package redis

import (
	"sync"
	"testing"
	"time"

	"github.com/gomodule/redigo/redis"
	"github.com/rafaeljusto/redigomock/v3"
	"github.com/stretchr/testify/require"
)

const (
	runnerKey = "runner:build_queue:10"
)

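// createSubscriptionMessage builds the raw reply slice redigomock delivers for a
// pub/sub "message" notification on the given channel carrying the given payload.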
func createSubscriptionMessage(key, data string) []interface{} {
	return []interface{}{
		[]byte("message"),
		[]byte(key),
		[]byte(data),
	}
}

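// createSubscribeMessage builds the raw reply slice for a "subscribe"
// confirmation on the given channel with a subscriber count of 1.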
func createSubscribeMessage(key string) []interface{} {
	return []interface{}{
		[]byte("subscribe"),
		[]byte(key),
		[]byte("1"),
	}
}

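// createUnsubscribeMessage builds the raw reply slice for an "unsubscribe"
// confirmation on the given channel.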
func createUnsubscribeMessage(key string) []interface{} {
	return []interface{}{
		[]byte("unsubscribe"),
		[]byte(key),
		[]byte("1"),
	}
}

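// countSubscribers returns the number of watchers currently registered for key,
// holding the KeyWatcher mutex while reading the subscriber map.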
func (kw *KeyWatcher) countSubscribers(key string) int {
	kw.mu.Lock()
	defer kw.mu.Unlock()
	return len(kw.subscribers[key])
}

// processMessages forces a run of the receivePubSubStream loop against a mock
// PubSubConn: it subscribes to the runnerKey channel, waits until numWatchers
// watchers are registered, then delivers value as a pub/sub message.
func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}) {
	psc := redigomock.NewConn()
	psc.ReceiveWait = true

	channel := channelPrefix + runnerKey
	psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel))
	psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel))
	psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value))

	errC := make(chan error)
	go func() { errC <- kw.receivePubSubStream(psc) }()

	require.Eventually(t, func() bool {
		kw.mu.Lock()
		defer kw.mu.Unlock()
		return kw.conn != nil
	}, time.Second, time.Millisecond)
	close(ready)

	require.Eventually(t, func() bool {
		return kw.countSubscribers(runnerKey) == numWatchers
	}, time.Second, time.Millisecond)
	close(psc.ReceiveNow)

	require.NoError(t, <-errC)
}

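// keyChangeTestCase is a table-driven case: returnValue and isKeyMissing control
// the initial GET reply, watchValue is the value passed to WatchKey, processedValue
// is the value delivered over pub/sub, and expectedStatus is the WatchKeyStatus the
// watcher should report within timeout.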
type keyChangeTestCase struct {
	desc           string
	returnValue    string
	isKeyMissing   bool
	watchValue     string
	processedValue string
	expectedStatus WatchKeyStatus
	timeout        time.Duration
}

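// TestKeyChangesInstantReturn verifies that WatchKey returns immediately, without
// waiting for a pub/sub notification, when the key has already changed or when the
// timeout elapses.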
func TestKeyChangesInstantReturn(t *testing.T) {
	testCases := []keyChangeTestCase{
		// WatchKeyStatusAlreadyChanged
		{
			desc:           "sees change with key existing and changed",
			returnValue:    "somethingelse",
			watchValue:     "something",
			expectedStatus: WatchKeyStatusAlreadyChanged,
			timeout:        time.Second,
		},
		{
			desc:           "sees change with key non-existing",
			isKeyMissing:   true,
			watchValue:     "something",
			processedValue: "somethingelse",
			expectedStatus: WatchKeyStatusAlreadyChanged,
			timeout:        time.Second,
		},
		// WatchKeyStatusTimeout
		{
			desc:           "sees timeout with key existing and unchanged",
			returnValue:    "something",
			watchValue:     "something",
			expectedStatus: WatchKeyStatusTimeout,
			timeout:        time.Millisecond,
		},
		{
			desc:           "sees timeout with key non-existing and unchanged",
			isKeyMissing:   true,
			watchValue:     "",
			expectedStatus: WatchKeyStatusTimeout,
			timeout:        time.Millisecond,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			conn, td := setupMockPool()
			defer td()

			if tc.isKeyMissing {
				conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
			} else {
				conn.Command("GET", runnerKey).Expect(tc.returnValue)
			}

			kw := NewKeyWatcher()
			defer kw.Shutdown()
			kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}

			val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout)

			require.NoError(t, err, "Expected no error")
			require.Equal(t, tc.expectedStatus, val, "Expected value")
		})
	}
}

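// TestKeyChangesWhenWatching verifies that a blocked WatchKey call observes the
// value delivered over pub/sub and reports WatchKeyStatusSeenChange or
// WatchKeyStatusNoChange accordingly.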
func TestKeyChangesWhenWatching(t *testing.T) {
	testCases := []keyChangeTestCase{
		// WatchKeyStatusSeenChange
		{
			desc:           "sees change with key existing",
			returnValue:    "something",
			watchValue:     "something",
			processedValue: "somethingelse",
			expectedStatus: WatchKeyStatusSeenChange,
		},
		{
			desc:           "sees change with key non-existing, when watching empty value",
			isKeyMissing:   true,
			watchValue:     "",
			processedValue: "something",
			expectedStatus: WatchKeyStatusSeenChange,
		},
		// WatchKeyStatusNoChange
		{
			desc:           "sees no change with key existing",
			returnValue:    "something",
			watchValue:     "something",
			processedValue: "something",
			expectedStatus: WatchKeyStatusNoChange,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			conn, td := setupMockPool()
			defer td()

			if tc.isKeyMissing {
				conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
			} else {
				conn.Command("GET", runnerKey).Expect(tc.returnValue)
			}

			kw := NewKeyWatcher()
			defer kw.Shutdown()

			wg := &sync.WaitGroup{}
			wg.Add(1)
			ready := make(chan struct{})

			go func() {
				defer wg.Done()
				<-ready
				val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)

				require.NoError(t, err, "Expected no error")
				require.Equal(t, tc.expectedStatus, val, "Expected value")
			}()

			kw.processMessages(t, 1, tc.processedValue, ready)
			wg.Wait()
		})
	}
}

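// TestKeyChangesParallel runs many concurrent WatchKey calls against the same key
// and checks that a single pub/sub message wakes all of them.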
func TestKeyChangesParallel(t *testing.T) {
	testCases := []keyChangeTestCase{
		{
			desc:           "massively parallel, sees change with key existing",
			returnValue:    "something",
			watchValue:     "something",
			processedValue: "somethingelse",
			expectedStatus: WatchKeyStatusSeenChange,
		},
		{
			desc:           "massively parallel, sees change with key existing, watching missing keys",
			isKeyMissing:   true,
			watchValue:     "",
			processedValue: "somethingelse",
			expectedStatus: WatchKeyStatusSeenChange,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			runTimes := 100

			conn, td := setupMockPool()
			defer td()

			getCmd := conn.Command("GET", runnerKey)

			for i := 0; i < runTimes; i++ {
				if tc.isKeyMissing {
					getCmd = getCmd.ExpectError(redis.ErrNil)
				} else {
					getCmd = getCmd.Expect(tc.returnValue)
				}
			}

			wg := &sync.WaitGroup{}
			wg.Add(runTimes)
			ready := make(chan struct{})

			kw := NewKeyWatcher()
			defer kw.Shutdown()

			for i := 0; i < runTimes; i++ {
				go func() {
					defer wg.Done()
					<-ready
					val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)

					require.NoError(t, err, "Expected no error")
					require.Equal(t, tc.expectedStatus, val, "Expected value")
				}()
			}

			kw.processMessages(t, runTimes, tc.processedValue, ready)
			wg.Wait()
		})
	}
}

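// TestShutdown verifies that Shutdown unblocks an in-flight WatchKey call with
// WatchKeyStatusNoChange and that watches started after shutdown return immediately.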
func TestShutdown(t *testing.T) {
	conn, td := setupMockPool()
	defer td()

	kw := NewKeyWatcher()
	kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
	defer kw.Shutdown()

	conn.Command("GET", runnerKey).Expect("something")

	wg := &sync.WaitGroup{}
	wg.Add(2)

	go func() {
		defer wg.Done()
		val, err := kw.WatchKey(runnerKey, "something", 10*time.Second)

		require.NoError(t, err, "Expected no error")
		require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
	}()

	go func() {
		defer wg.Done()
		require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)

		kw.Shutdown()
	}()

	wg.Wait()

	require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)

	// Adding a key after the shutdown should result in an immediate response
	var val WatchKeyStatus
	var err error
	done := make(chan struct{})
	go func() {
		val, err = kw.WatchKey(runnerKey, "something", 10*time.Second)
		close(done)
	}()

	select {
	case <-done:
		require.NoError(t, err, "Expected no error")
		require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for WatchKey")
	}
}