Merge pull request 'refactor: redis queue backend test cleanup' (#3850) from efertone/forgejo:refactor-redis-queue-test into forgejo
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/3850 Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org>
This commit is contained in:
commit
cf391f937a
9 changed files with 7582 additions and 117 deletions
3
Makefile
3
Makefile
|
@ -38,6 +38,7 @@ GO_LICENSES_PACKAGE ?= github.com/google/go-licenses@v1.6.0 # renovate: datasour
|
|||
GOVULNCHECK_PACKAGE ?= golang.org/x/vuln/cmd/govulncheck@v1 # renovate: datasource=go
|
||||
ACTIONLINT_PACKAGE ?= github.com/rhysd/actionlint/cmd/actionlint@v1.6.27 # renovate: datasource=go
|
||||
DEADCODE_PACKAGE ?= golang.org/x/tools/internal/cmd/deadcode@v0.14.0 # renovate: datasource=go
|
||||
GOMOCK_PACKAGE ?= go.uber.org/mock/mockgen@latest
|
||||
|
||||
DOCKER_IMAGE ?= gitea/gitea
|
||||
DOCKER_TAG ?= latest
|
||||
|
@ -783,6 +784,7 @@ generate-backend: $(TAGS_PREREQ) generate-go
|
|||
generate-go: $(TAGS_PREREQ)
|
||||
@echo "Running go generate..."
|
||||
@CC= GOOS= GOARCH= CGO_ENABLED=0 $(GO) generate -tags '$(TAGS)' ./...
|
||||
$(GO) run $(GOMOCK_PACKAGE) -package mock -destination ./modules/queue/mock/redisuniversalclient.go github.com/redis/go-redis/v9 UniversalClient
|
||||
|
||||
.PHONY: merge-locales
|
||||
merge-locales:
|
||||
|
@ -884,6 +886,7 @@ deps-tools:
|
|||
$(GO) install $(GO_LICENSES_PACKAGE)
|
||||
$(GO) install $(GOVULNCHECK_PACKAGE)
|
||||
$(GO) install $(ACTIONLINT_PACKAGE)
|
||||
$(GO) install $(GOMOCK_PACKAGE)
|
||||
|
||||
node_modules: package-lock.json
|
||||
npm install --no-save
|
||||
|
|
1
go.mod
1
go.mod
|
@ -101,6 +101,7 @@ require (
|
|||
github.com/yuin/goldmark v1.7.0
|
||||
github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc
|
||||
github.com/yuin/goldmark-meta v1.1.0
|
||||
go.uber.org/mock v0.4.0
|
||||
golang.org/x/crypto v0.23.0
|
||||
golang.org/x/image v0.15.0
|
||||
golang.org/x/net v0.25.0
|
||||
|
|
2
go.sum
2
go.sum
|
@ -877,6 +877,8 @@ go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0
|
|||
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
|
||||
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
|
||||
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
||||
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
|
||||
|
|
|
@ -26,8 +26,11 @@ type baseRedis struct {
|
|||
|
||||
var _ baseQueue = (*baseRedis)(nil)
|
||||
|
||||
func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
||||
client := nosql.GetManager().GetRedisClient(cfg.ConnStr)
|
||||
func newBaseRedisGeneric(cfg *BaseConfig, unique bool, client redis.UniversalClient) (baseQueue, error) {
|
||||
if client == nil {
|
||||
client = nosql.GetManager().GetRedisClient(cfg.ConnStr)
|
||||
}
|
||||
|
||||
prefix := ""
|
||||
uri := nosql.ToRedisURI(cfg.ConnStr)
|
||||
|
||||
|
@ -62,11 +65,11 @@ func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
|||
}
|
||||
|
||||
func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) {
|
||||
return newBaseRedisGeneric(cfg, false)
|
||||
return newBaseRedisGeneric(cfg, false, nil)
|
||||
}
|
||||
|
||||
func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) {
|
||||
return newBaseRedisGeneric(cfg, true)
|
||||
return newBaseRedisGeneric(cfg, true, nil)
|
||||
}
|
||||
|
||||
func (q *baseRedis) prefixedName(name string) string {
|
||||
|
|
|
@ -5,120 +5,134 @@ package queue
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/nosql"
|
||||
"code.gitea.io/gitea/modules/queue/mock"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
const defaultTestRedisServer = "127.0.0.1:6379"
|
||||
type baseRedisUnitTestSuite struct {
|
||||
suite.Suite
|
||||
|
||||
func testRedisHost() string {
|
||||
value := os.Getenv("TEST_REDIS_SERVER")
|
||||
if value != "" {
|
||||
return value
|
||||
}
|
||||
|
||||
return defaultTestRedisServer
|
||||
}
|
||||
|
||||
func waitRedisReady(conn string, dur time.Duration) (ready bool) {
|
||||
ctxTimed, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
for t := time.Now(); ; time.Sleep(50 * time.Millisecond) {
|
||||
ret := nosql.GetManager().GetRedisClient(conn).Ping(ctxTimed)
|
||||
if ret.Err() == nil {
|
||||
return true
|
||||
}
|
||||
if time.Since(t) > dur {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func redisServerCmd(t *testing.T) *exec.Cmd {
|
||||
redisServerProg, err := exec.LookPath("redis-server")
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
c := &exec.Cmd{
|
||||
Path: redisServerProg,
|
||||
Args: []string{redisServerProg, "--bind", "127.0.0.1", "--port", "6379"},
|
||||
Dir: t.TempDir(),
|
||||
Stdin: os.Stdin,
|
||||
Stdout: os.Stdout,
|
||||
Stderr: os.Stderr,
|
||||
}
|
||||
return c
|
||||
mockController *gomock.Controller
|
||||
}
|
||||
|
||||
func TestBaseRedis(t *testing.T) {
|
||||
redisAddress := "redis://" + testRedisHost() + "/0"
|
||||
queueSettings := setting.QueueSettings{
|
||||
Length: 10,
|
||||
ConnStr: redisAddress,
|
||||
}
|
||||
|
||||
var redisServer *exec.Cmd
|
||||
if !waitRedisReady(redisAddress, 0) {
|
||||
redisServer = redisServerCmd(t)
|
||||
|
||||
if redisServer == nil {
|
||||
t.Skip("redis-server not found in Forgejo test yet")
|
||||
return
|
||||
}
|
||||
|
||||
assert.NoError(t, redisServer.Start())
|
||||
if !assert.True(t, waitRedisReady(redisAddress, 5*time.Second), "start redis-server") {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if redisServer != nil {
|
||||
_ = redisServer.Process.Signal(os.Interrupt)
|
||||
_ = redisServer.Wait()
|
||||
}
|
||||
}()
|
||||
|
||||
testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false)
|
||||
testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true)
|
||||
suite.Run(t, &baseRedisUnitTestSuite{})
|
||||
}
|
||||
|
||||
func TestBaseRedisWithPrefix(t *testing.T) {
|
||||
redisAddress := "redis://" + testRedisHost() + "/0?prefix=forgejo:queue:"
|
||||
queueSettings := setting.QueueSettings{
|
||||
Length: 10,
|
||||
ConnStr: redisAddress,
|
||||
}
|
||||
|
||||
var redisServer *exec.Cmd
|
||||
if !waitRedisReady(redisAddress, 0) {
|
||||
redisServer = redisServerCmd(t)
|
||||
|
||||
if redisServer == nil {
|
||||
t.Skip("redis-server not found in Forgejo test yet")
|
||||
return
|
||||
}
|
||||
|
||||
assert.NoError(t, redisServer.Start())
|
||||
if !assert.True(t, waitRedisReady(redisAddress, 5*time.Second), "start redis-server") {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if redisServer != nil {
|
||||
_ = redisServer.Process.Signal(os.Interrupt)
|
||||
_ = redisServer.Wait()
|
||||
}
|
||||
}()
|
||||
|
||||
testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false)
|
||||
testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true)
|
||||
func (suite *baseRedisUnitTestSuite) SetupSuite() {
|
||||
suite.mockController = gomock.NewController(suite.T())
|
||||
}
|
||||
|
||||
func (suite *baseRedisUnitTestSuite) TestBasic() {
|
||||
queueName := "test-queue"
|
||||
testCases := []struct {
|
||||
Name string
|
||||
ConnectionString string
|
||||
QueueName string
|
||||
Unique bool
|
||||
}{
|
||||
{
|
||||
Name: "unique",
|
||||
ConnectionString: "redis://127.0.0.1/0",
|
||||
QueueName: queueName,
|
||||
Unique: true,
|
||||
},
|
||||
{
|
||||
Name: "non-unique",
|
||||
ConnectionString: "redis://127.0.0.1/0",
|
||||
QueueName: queueName,
|
||||
Unique: false,
|
||||
},
|
||||
{
|
||||
Name: "unique with prefix",
|
||||
ConnectionString: "redis://127.0.0.1/0?prefix=forgejo:queue:",
|
||||
QueueName: "forgejo:queue:" + queueName,
|
||||
Unique: true,
|
||||
},
|
||||
{
|
||||
Name: "non-unique with prefix",
|
||||
ConnectionString: "redis://127.0.0.1/0?prefix=forgejo:queue:",
|
||||
QueueName: "forgejo:queue:" + queueName,
|
||||
Unique: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
suite.Run(testCase.Name, func() {
|
||||
queueSettings := setting.QueueSettings{
|
||||
Length: 10,
|
||||
ConnStr: testCase.ConnectionString,
|
||||
}
|
||||
|
||||
// Configure expectations.
|
||||
mockRedisStore := mock.NewInMemoryMockRedis()
|
||||
redisClient := mock.NewMockUniversalClient(suite.mockController)
|
||||
|
||||
redisClient.EXPECT().
|
||||
Ping(gomock.Any()).
|
||||
Times(1).
|
||||
Return(&redis.StatusCmd{})
|
||||
redisClient.EXPECT().
|
||||
LLen(gomock.Any(), testCase.QueueName).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.LLen)
|
||||
redisClient.EXPECT().
|
||||
LPop(gomock.Any(), testCase.QueueName).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.LPop)
|
||||
redisClient.EXPECT().
|
||||
RPush(gomock.Any(), testCase.QueueName, gomock.Any()).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.RPush)
|
||||
|
||||
if testCase.Unique {
|
||||
redisClient.EXPECT().
|
||||
SAdd(gomock.Any(), testCase.QueueName+"_unique", gomock.Any()).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.SAdd)
|
||||
redisClient.EXPECT().
|
||||
SRem(gomock.Any(), testCase.QueueName+"_unique", gomock.Any()).
|
||||
Times(1).
|
||||
DoAndReturn(mockRedisStore.SRem)
|
||||
redisClient.EXPECT().
|
||||
SIsMember(gomock.Any(), testCase.QueueName+"_unique", gomock.Any()).
|
||||
Times(2).
|
||||
DoAndReturn(mockRedisStore.SIsMember)
|
||||
}
|
||||
|
||||
client, err := newBaseRedisGeneric(
|
||||
toBaseConfig(queueName, queueSettings),
|
||||
testCase.Unique,
|
||||
redisClient,
|
||||
)
|
||||
suite.Require().NoError(err)
|
||||
|
||||
ctx := context.Background()
|
||||
expectedContent := []byte("test")
|
||||
|
||||
suite.Require().NoError(client.PushItem(ctx, expectedContent))
|
||||
|
||||
found, err := client.HasItem(ctx, expectedContent)
|
||||
suite.Require().NoError(err)
|
||||
if testCase.Unique {
|
||||
suite.True(found)
|
||||
} else {
|
||||
suite.False(found)
|
||||
}
|
||||
|
||||
found, err = client.HasItem(ctx, []byte("not found content"))
|
||||
suite.Require().NoError(err)
|
||||
suite.False(found)
|
||||
|
||||
content, err := client.PopItem(ctx)
|
||||
suite.Require().NoError(err)
|
||||
suite.Equal(expectedContent, content)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
133
modules/queue/base_redis_with_server_test.go
Normal file
133
modules/queue/base_redis_with_server_test.go
Normal file
|
@ -0,0 +1,133 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/nosql"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
const defaultTestRedisServer = "127.0.0.1:6379"
|
||||
|
||||
type baseRedisWithServerTestSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestBaseRedisWithServer(t *testing.T) {
|
||||
suite.Run(t, &baseRedisWithServerTestSuite{})
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) TestNormal() {
|
||||
redisAddress := "redis://" + suite.testRedisHost() + "/0"
|
||||
queueSettings := setting.QueueSettings{
|
||||
Length: 10,
|
||||
ConnStr: redisAddress,
|
||||
}
|
||||
|
||||
redisServer, accessible := suite.startRedisServer(redisAddress)
|
||||
|
||||
// If it's accessible, but redisServer command is nil, that means we are using
|
||||
// an already running redis server.
|
||||
if redisServer == nil && !accessible {
|
||||
suite.T().Skip("redis-server not found in Forgejo test yet")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if redisServer != nil {
|
||||
_ = redisServer.Process.Signal(os.Interrupt)
|
||||
_ = redisServer.Wait()
|
||||
}
|
||||
}()
|
||||
|
||||
testQueueBasic(suite.T(), newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false)
|
||||
testQueueBasic(suite.T(), newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true)
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) TestWithPrefix() {
|
||||
redisAddress := "redis://" + suite.testRedisHost() + "/0?prefix=forgejo:queue:"
|
||||
queueSettings := setting.QueueSettings{
|
||||
Length: 10,
|
||||
ConnStr: redisAddress,
|
||||
}
|
||||
|
||||
redisServer, accessible := suite.startRedisServer(redisAddress)
|
||||
|
||||
// If it's accessible, but redisServer command is nil, that means we are using
|
||||
// an already running redis server.
|
||||
if redisServer == nil && !accessible {
|
||||
suite.T().Skip("redis-server not found in Forgejo test yet")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if redisServer != nil {
|
||||
_ = redisServer.Process.Signal(os.Interrupt)
|
||||
_ = redisServer.Wait()
|
||||
}
|
||||
}()
|
||||
|
||||
testQueueBasic(suite.T(), newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false)
|
||||
testQueueBasic(suite.T(), newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true)
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) startRedisServer(address string) (*exec.Cmd, bool) {
|
||||
var redisServer *exec.Cmd
|
||||
|
||||
if !suite.waitRedisReady(address, 0) {
|
||||
redisServerProg, err := exec.LookPath("redis-server")
|
||||
if err != nil {
|
||||
return nil, false
|
||||
}
|
||||
redisServer = &exec.Cmd{
|
||||
Path: redisServerProg,
|
||||
Args: []string{redisServerProg, "--bind", "127.0.0.1", "--port", "6379"},
|
||||
Dir: suite.T().TempDir(),
|
||||
Stdin: os.Stdin,
|
||||
Stdout: os.Stdout,
|
||||
Stderr: os.Stderr,
|
||||
}
|
||||
|
||||
suite.Require().NoError(redisServer.Start())
|
||||
|
||||
if !suite.True(suite.waitRedisReady(address, 5*time.Second), "start redis-server") {
|
||||
// Return with redis server even if it's not available. It was started,
|
||||
// even if it's not reachable for any reasons, it's still started, the
|
||||
// parent will close it.
|
||||
return redisServer, false
|
||||
}
|
||||
}
|
||||
|
||||
return redisServer, true
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) waitRedisReady(conn string, dur time.Duration) (ready bool) {
|
||||
ctxTimed, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
for t := time.Now(); ; time.Sleep(50 * time.Millisecond) {
|
||||
ret := nosql.GetManager().GetRedisClient(conn).Ping(ctxTimed)
|
||||
if ret.Err() == nil {
|
||||
return true
|
||||
}
|
||||
if time.Since(t) > dur {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *baseRedisWithServerTestSuite) testRedisHost() string {
|
||||
value := os.Getenv("TEST_REDIS_SERVER")
|
||||
if value != "" {
|
||||
return value
|
||||
}
|
||||
|
||||
return defaultTestRedisServer
|
||||
}
|
133
modules/queue/mock/inmemorymockredis.go
Normal file
133
modules/queue/mock/inmemorymockredis.go
Normal file
|
@ -0,0 +1,133 @@
|
|||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
redis "github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// InMemoryMockRedis is a very primitive in-memory redis-like feature. The main
|
||||
// purpose of this struct is to give some backend to mocked unit tests.
|
||||
type InMemoryMockRedis struct {
|
||||
queues map[string][][]byte
|
||||
}
|
||||
|
||||
func NewInMemoryMockRedis() InMemoryMockRedis {
|
||||
return InMemoryMockRedis{
|
||||
queues: map[string][][]byte{},
|
||||
}
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) LLen(ctx context.Context, key string) *redis.IntCmd {
|
||||
cmd := redis.NewIntCmd(ctx)
|
||||
cmd.SetVal(int64(len(r.queues[key])))
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) SAdd(ctx context.Context, key string, content []byte) *redis.IntCmd {
|
||||
cmd := redis.NewIntCmd(ctx)
|
||||
|
||||
for _, value := range r.queues[key] {
|
||||
if string(value) == string(content) {
|
||||
cmd.SetVal(0)
|
||||
|
||||
return cmd
|
||||
}
|
||||
}
|
||||
|
||||
r.queues[key] = append(r.queues[key], content)
|
||||
|
||||
cmd.SetVal(1)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) RPush(ctx context.Context, key string, content []byte) *redis.IntCmd {
|
||||
cmd := redis.NewIntCmd(ctx)
|
||||
|
||||
r.queues[key] = append(r.queues[key], content)
|
||||
|
||||
cmd.SetVal(1)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) LPop(ctx context.Context, key string) *redis.StringCmd {
|
||||
cmd := redis.NewStringCmd(ctx)
|
||||
|
||||
queue, found := r.queues[key]
|
||||
if !found {
|
||||
cmd.SetErr(errors.New("queue not found"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
if len(queue) < 1 {
|
||||
cmd.SetErr(errors.New("queue is empty"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
value, rest := queue[0], queue[1:]
|
||||
|
||||
r.queues[key] = rest
|
||||
|
||||
cmd.SetVal(string(value))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) SRem(ctx context.Context, key string, content []byte) *redis.IntCmd {
|
||||
cmd := redis.NewIntCmd(ctx)
|
||||
|
||||
queue, found := r.queues[key]
|
||||
if !found {
|
||||
cmd.SetErr(errors.New("queue not found"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
if len(queue) < 1 {
|
||||
cmd.SetErr(errors.New("queue is empty"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
newList := [][]byte{}
|
||||
|
||||
for _, value := range queue {
|
||||
if string(value) != string(content) {
|
||||
newList = append(newList, value)
|
||||
}
|
||||
}
|
||||
|
||||
r.queues[key] = newList
|
||||
|
||||
cmd.SetVal(1)
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
func (r *InMemoryMockRedis) SIsMember(ctx context.Context, key string, content []byte) *redis.BoolCmd {
|
||||
cmd := redis.NewBoolCmd(ctx)
|
||||
|
||||
queue, found := r.queues[key]
|
||||
if !found {
|
||||
cmd.SetErr(errors.New("queue not found"))
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
for _, value := range queue {
|
||||
if string(value) == string(content) {
|
||||
cmd.SetVal(true)
|
||||
|
||||
return cmd
|
||||
}
|
||||
}
|
||||
|
||||
cmd.SetVal(false)
|
||||
|
||||
return cmd
|
||||
}
|
7168
modules/queue/mock/redisuniversalclient.go
Normal file
7168
modules/queue/mock/redisuniversalclient.go
Normal file
File diff suppressed because it is too large
Load diff
|
@ -192,16 +192,24 @@ func (q *WorkerPoolQueue[T]) ShutdownWait(timeout time.Duration) {
|
|||
<-q.shutdownDone
|
||||
}
|
||||
|
||||
func getNewQueueFn(t string) (string, func(cfg *BaseConfig, unique bool) (baseQueue, error)) {
|
||||
func getNewQueue(t string, cfg *BaseConfig, unique bool) (string, baseQueue, error) {
|
||||
switch t {
|
||||
case "dummy", "immediate":
|
||||
return t, newBaseDummy
|
||||
queue, err := newBaseDummy(cfg, unique)
|
||||
|
||||
return t, queue, err
|
||||
case "channel":
|
||||
return t, newBaseChannelGeneric
|
||||
queue, err := newBaseChannelGeneric(cfg, unique)
|
||||
|
||||
return t, queue, err
|
||||
case "redis":
|
||||
return t, newBaseRedisGeneric
|
||||
queue, err := newBaseRedisGeneric(cfg, unique, nil)
|
||||
|
||||
return t, queue, err
|
||||
default: // level(leveldb,levelqueue,persistable-channel)
|
||||
return "level", newBaseLevelQueueGeneric
|
||||
queue, err := newBaseLevelQueueGeneric(cfg, unique)
|
||||
|
||||
return "level", queue, err
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -217,14 +225,14 @@ func NewWorkerPoolQueueWithContext[T any](ctx context.Context, name string, queu
|
|||
|
||||
var w WorkerPoolQueue[T]
|
||||
var err error
|
||||
queueType, newQueueFn := getNewQueueFn(queueSetting.Type)
|
||||
w.baseQueueType = queueType
|
||||
|
||||
w.baseConfig = toBaseConfig(name, queueSetting)
|
||||
w.baseQueue, err = newQueueFn(w.baseConfig, unique)
|
||||
|
||||
w.baseQueueType, w.baseQueue, err = getNewQueue(queueSetting.Type, w.baseConfig, unique)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Trace("Created queue %q of type %q", name, queueType)
|
||||
log.Trace("Created queue %q of type %q", name, w.baseQueueType)
|
||||
|
||||
w.ctxRun, _, w.ctxRunCancel = process.GetManager().AddTypedContext(ctx, "Queue: "+w.GetName(), process.SystemProcessType, false)
|
||||
w.batchChan = make(chan []T)
|
||||
|
|
Loading…
Reference in a new issue