debian-mirror-gitlab/workhorse/internal/redis/keywatcher_test.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

310 lines
7.5 KiB
Go
Raw Normal View History

2021-02-22 17:27:13 +05:30
package redis
import (
"sync"
"testing"
"time"
2022-07-16 23:28:13 +05:30
"github.com/gomodule/redigo/redis"
2022-08-13 15:12:31 +05:30
"github.com/rafaeljusto/redigomock/v3"
2021-02-22 17:27:13 +05:30
"github.com/stretchr/testify/require"
)
// runnerKey is the Redis key that the tests below watch and mock GET replies for.
const runnerKey = "runner:build_queue:10"
// createSubscriptionMessage builds the three-element pub/sub frame
// ("message", key, data) that the tests queue on the mock connection.
// Every element is a []byte.
func createSubscriptionMessage(key, data string) []interface{} {
	var (
		kind    = []byte("message")
		channel = []byte(key)
		payload = []byte(data)
	)
	return []interface{}{kind, channel, payload}
}
// createSubscribeMessage builds the three-element frame ("subscribe", key, "1")
// that the tests register as the mocked reply to a SUBSCRIBE command.
// Every element is a []byte.
func createSubscribeMessage(key string) []interface{} {
	var (
		kind    = []byte("subscribe")
		channel = []byte(key)
		count   = []byte("1")
	)
	return []interface{}{kind, channel, count}
}
// createUnsubscribeMessage builds the three-element frame
// ("unsubscribe", key, "1") that the tests register as the mocked reply to an
// UNSUBSCRIBE command. Every element is a []byte.
func createUnsubscribeMessage(key string) []interface{} {
	var (
		kind    = []byte("unsubscribe")
		channel = []byte(key)
		count   = []byte("1")
	)
	return []interface{}{kind, channel, count}
}
2022-10-11 01:57:18 +05:30
func (kw *KeyWatcher) countSubscribers(key string) int {
kw.mu.Lock()
defer kw.mu.Unlock()
return len(kw.subscribers[key])
2021-02-22 17:27:13 +05:30
}
// Forces a run of the `Process` loop against a mock PubSubConn.
2022-10-11 01:57:18 +05:30
func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}) {
2021-02-22 17:27:13 +05:30
psc := redigomock.NewConn()
2022-10-11 01:57:18 +05:30
psc.ReceiveWait = true
2021-02-22 17:27:13 +05:30
2022-10-11 01:57:18 +05:30
channel := channelPrefix + runnerKey
psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel))
psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel))
psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value))
2021-02-22 17:27:13 +05:30
2022-10-11 01:57:18 +05:30
errC := make(chan error)
go func() { errC <- kw.receivePubSubStream(psc) }()
require.Eventually(t, func() bool {
kw.mu.Lock()
defer kw.mu.Unlock()
return kw.conn != nil
}, time.Second, time.Millisecond)
close(ready)
require.Eventually(t, func() bool {
return kw.countSubscribers(runnerKey) == numWatchers
}, time.Second, time.Millisecond)
close(psc.ReceiveNow)
2021-02-22 17:27:13 +05:30
2022-10-11 01:57:18 +05:30
require.NoError(t, <-errC)
2021-02-22 17:27:13 +05:30
}
2022-07-16 23:28:13 +05:30
// keyChangeTestCase describes one table-driven scenario for the WatchKey tests.
type keyChangeTestCase struct {
// desc is the subtest name passed to t.Run.
desc string
// returnValue is what the mocked GET returns when isKeyMissing is false.
returnValue string
// isKeyMissing makes the mocked GET fail with redis.ErrNil instead.
isKeyMissing bool
// watchValue is the value handed to WatchKey.
watchValue string
// processedValue is the payload of the pub/sub message delivered through
// processMessages; tests that never call processMessages do not read it.
processedValue string
// expectedStatus is the status WatchKey is expected to return.
expectedStatus WatchKeyStatus
// timeout is the WatchKey timeout; only TestKeyChangesInstantReturn reads it.
timeout time.Duration
}
func TestKeyChangesInstantReturn(t *testing.T) {
testCases := []keyChangeTestCase{
// WatchKeyStatusAlreadyChanged
{
desc: "sees change with key existing and changed",
returnValue: "somethingelse",
watchValue: "something",
expectedStatus: WatchKeyStatusAlreadyChanged,
timeout: time.Second,
},
{
desc: "sees change with key non-existing",
isKeyMissing: true,
watchValue: "something",
processedValue: "somethingelse",
expectedStatus: WatchKeyStatusAlreadyChanged,
timeout: time.Second,
},
// WatchKeyStatusTimeout
{
desc: "sees timeout with key existing and unchanged",
returnValue: "something",
watchValue: "something",
expectedStatus: WatchKeyStatusTimeout,
timeout: time.Millisecond,
},
{
desc: "sees timeout with key non-existing and unchanged",
isKeyMissing: true,
watchValue: "",
expectedStatus: WatchKeyStatusTimeout,
timeout: time.Millisecond,
},
}
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
conn, td := setupMockPool()
defer td()
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
if tc.isKeyMissing {
conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
} else {
conn.Command("GET", runnerKey).Expect(tc.returnValue)
}
2021-02-22 17:27:13 +05:30
2022-10-11 01:57:18 +05:30
kw := NewKeyWatcher()
defer kw.Shutdown()
kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout)
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
})
}
}
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
func TestKeyChangesWhenWatching(t *testing.T) {
testCases := []keyChangeTestCase{
// WatchKeyStatusSeenChange
{
desc: "sees change with key existing",
returnValue: "something",
watchValue: "something",
processedValue: "somethingelse",
expectedStatus: WatchKeyStatusSeenChange,
},
{
desc: "sees change with key non-existing, when watching empty value",
isKeyMissing: true,
watchValue: "",
processedValue: "something",
expectedStatus: WatchKeyStatusSeenChange,
},
// WatchKeyStatusNoChange
{
desc: "sees no change with key existing",
returnValue: "something",
watchValue: "something",
processedValue: "something",
expectedStatus: WatchKeyStatusNoChange,
},
}
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
conn, td := setupMockPool()
defer td()
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
if tc.isKeyMissing {
conn.Command("GET", runnerKey).ExpectError(redis.ErrNil)
} else {
conn.Command("GET", runnerKey).Expect(tc.returnValue)
}
2021-02-22 17:27:13 +05:30
2022-10-11 01:57:18 +05:30
kw := NewKeyWatcher()
defer kw.Shutdown()
2022-07-16 23:28:13 +05:30
wg := &sync.WaitGroup{}
wg.Add(1)
2022-10-11 01:57:18 +05:30
ready := make(chan struct{})
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
go func() {
defer wg.Done()
2022-10-11 01:57:18 +05:30
<-ready
val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
2021-02-22 17:27:13 +05:30
2022-10-11 01:57:18 +05:30
kw.processMessages(t, 1, tc.processedValue, ready)
2022-07-16 23:28:13 +05:30
wg.Wait()
})
}
2021-02-22 17:27:13 +05:30
}
2022-07-16 23:28:13 +05:30
func TestKeyChangesParallel(t *testing.T) {
testCases := []keyChangeTestCase{
{
desc: "massively parallel, sees change with key existing",
returnValue: "something",
watchValue: "something",
processedValue: "somethingelse",
expectedStatus: WatchKeyStatusSeenChange,
},
{
desc: "massively parallel, sees change with key existing, watching missing keys",
isKeyMissing: true,
watchValue: "",
processedValue: "somethingelse",
expectedStatus: WatchKeyStatusSeenChange,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
runTimes := 100
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
conn, td := setupMockPool()
defer td()
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
getCmd := conn.Command("GET", runnerKey)
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
for i := 0; i < runTimes; i++ {
if tc.isKeyMissing {
getCmd = getCmd.ExpectError(redis.ErrNil)
} else {
getCmd = getCmd.Expect(tc.returnValue)
}
}
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
wg := &sync.WaitGroup{}
wg.Add(runTimes)
2022-10-11 01:57:18 +05:30
ready := make(chan struct{})
kw := NewKeyWatcher()
defer kw.Shutdown()
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
for i := 0; i < runTimes; i++ {
go func() {
defer wg.Done()
2022-10-11 01:57:18 +05:30
<-ready
val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)
2021-02-22 17:27:13 +05:30
2022-07-16 23:28:13 +05:30
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
}
2022-10-11 01:57:18 +05:30
kw.processMessages(t, runTimes, tc.processedValue, ready)
2022-07-16 23:28:13 +05:30
wg.Wait()
})
}
2021-02-22 17:27:13 +05:30
}
2021-09-04 01:27:46 +05:30
func TestShutdown(t *testing.T) {
conn, td := setupMockPool()
defer td()
2022-10-11 01:57:18 +05:30
kw := NewKeyWatcher()
kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
defer kw.Shutdown()
2021-09-04 01:27:46 +05:30
conn.Command("GET", runnerKey).Expect("something")
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
2022-10-11 01:57:18 +05:30
defer wg.Done()
val, err := kw.WatchKey(runnerKey, "something", 10*time.Second)
2021-09-04 01:27:46 +05:30
require.NoError(t, err, "Expected no error")
require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
}()
go func() {
2022-10-11 01:57:18 +05:30
defer wg.Done()
require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
2021-09-04 01:27:46 +05:30
2022-10-11 01:57:18 +05:30
kw.Shutdown()
2021-09-04 01:27:46 +05:30
}()
wg.Wait()
2022-10-11 01:57:18 +05:30
require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
2021-09-04 01:27:46 +05:30
// Adding a key after the shutdown should result in an immediate response
var val WatchKeyStatus
var err error
done := make(chan struct{})
go func() {
2022-10-11 01:57:18 +05:30
val, err = kw.WatchKey(runnerKey, "something", 10*time.Second)
2021-09-04 01:27:46 +05:30
close(done)
}()
select {
case <-done:
require.NoError(t, err, "Expected no error")
require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for WatchKey")
}
}