Skip to content

Commit bac2368

Browse files
authored
Merge pull request #384 from bootjp/feature/reduce-lock
adapter: reduce Lua list lock contention
2 parents 10a46af + 18547c5 commit bac2368

13 files changed

Lines changed: 700 additions & 90 deletions

adapter/redis_lua_compat_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package adapter
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
"github.com/redis/go-redis/v9"
89
"github.com/stretchr/testify/require"
@@ -111,3 +112,94 @@ func TestRedis_LuaReplyHelpers(t *testing.T) {
111112
_, err = rdb.Eval(ctx, `return redis.error_reply("boom")`, nil).Result()
112113
require.ErrorContains(t, err, "boom")
113114
}
115+
116+
func TestRedis_LuaRPopLPushBullMQLikeLists(t *testing.T) {
117+
nodes, _, _ := createNode(t, 3)
118+
defer shutdown(nodes)
119+
120+
ctx := context.Background()
121+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
122+
defer func() { _ = rdb.Close() }()
123+
124+
require.NoError(t, rdb.RPush(ctx, "bull:test:wait", "job-1", "job-2", "job-3").Err())
125+
require.NoError(t, rdb.LPush(ctx, "bull:test:active", "job-0").Err())
126+
127+
result, err := rdb.Eval(ctx, `
128+
local moved = redis.call("RPOPLPUSH", KEYS[1], KEYS[2])
129+
return {moved, redis.call("LLEN", KEYS[1]), redis.call("LLEN", KEYS[2])}
130+
`, []string{"bull:test:wait", "bull:test:active"}).Result()
131+
require.NoError(t, err)
132+
133+
values, ok := result.([]any)
134+
require.True(t, ok)
135+
require.Equal(t, []any{"job-3", int64(2), int64(2)}, values)
136+
137+
waiting, err := rdb.LRange(ctx, "bull:test:wait", 0, -1).Result()
138+
require.NoError(t, err)
139+
require.Equal(t, []string{"job-1", "job-2"}, waiting)
140+
141+
active, err := rdb.LRange(ctx, "bull:test:active", 0, -1).Result()
142+
require.NoError(t, err)
143+
require.Equal(t, []string{"job-3", "job-0"}, active)
144+
}
145+
146+
func TestRedis_LuaRPopLPushPreservesTTL(t *testing.T) {
147+
nodes, _, _ := createNode(t, 3)
148+
defer shutdown(nodes)
149+
150+
ctx := context.Background()
151+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
152+
defer func() { _ = rdb.Close() }()
153+
154+
require.NoError(t, rdb.RPush(ctx, "bull:test:rotate", "job-1", "job-2", "job-3").Err())
155+
require.NoError(t, rdb.PExpire(ctx, "bull:test:rotate", time.Minute).Err())
156+
157+
result, err := rdb.Eval(ctx, `
158+
local moved = redis.call("RPOPLPUSH", KEYS[1], KEYS[1])
159+
return {moved, redis.call("LLEN", KEYS[1])}
160+
`, []string{"bull:test:rotate"}).Result()
161+
require.NoError(t, err)
162+
163+
values, ok := result.([]any)
164+
require.True(t, ok)
165+
require.Equal(t, []any{"job-3", int64(3)}, values)
166+
167+
rotated, err := rdb.LRange(ctx, "bull:test:rotate", 0, -1).Result()
168+
require.NoError(t, err)
169+
require.Equal(t, []string{"job-3", "job-1", "job-2"}, rotated)
170+
171+
ttl, err := rdb.PTTL(ctx, "bull:test:rotate").Result()
172+
require.NoError(t, err)
173+
require.Greater(t, ttl, time.Duration(0))
174+
}
175+
176+
func TestRedis_LuaDelAndRecreateListNoOrphan(t *testing.T) {
177+
nodes, _, _ := createNode(t, 3)
178+
defer shutdown(nodes)
179+
180+
ctx := context.Background()
181+
rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
182+
defer func() { _ = rdb.Close() }()
183+
184+
// Pre-populate a list so there are existing storage items.
185+
require.NoError(t, rdb.RPush(ctx, "mylist", "a", "b", "c").Err())
186+
187+
// In a single Lua script: DEL the key, then recreate it as a list.
188+
// After the script, only the new items should be visible; the old
189+
// [a, b, c] items must not be orphaned/leaked in storage.
190+
result, err := rdb.Eval(ctx, `
191+
redis.call("DEL", KEYS[1])
192+
redis.call("RPUSH", KEYS[1], "d")
193+
return redis.call("LRANGE", KEYS[1], 0, -1)
194+
`, []string{"mylist"}).Result()
195+
require.NoError(t, err)
196+
197+
values, ok := result.([]any)
198+
require.True(t, ok)
199+
require.Equal(t, []any{"d"}, values, "list should contain only the newly pushed item")
200+
201+
// Verify via a plain LRANGE that the old items are gone.
202+
final, err := rdb.LRange(ctx, "mylist", 0, -1).Result()
203+
require.NoError(t, err)
204+
require.Equal(t, []string{"d"}, final)
205+
}

0 commit comments

Comments
 (0)