Skip to content

Commit c159654

Browse files
committed
test(redis): add TTLBuffer unit tests, buffer-first integration tests, and update design doc
- adapter/redis_ttl_buffer_test.go (new): - TTLBuffer: Set/Get, nil PERSIST semantics, seq ordering (later wins), Drain snapshot+clear, MergeBack respects seq, nil receiver safety, overflow drops new keys, concurrent access with race detector - buildTTLFlushElems: nil→Del, non-nil→Put with round-trip decode, deterministic key sort, mixed batch - ttlAt buffer-first: hit skips store, miss falls back, PERSIST shadows stale store TTL - adapter/redis_ttl_compat_test.go (new): integration tests via createNode: EXPIRE/PEXPIRE/SET EX immediately visible via TTL/PTTL, INCR clears TTL (PERSIST), PERSIST command, MULTI/EXEC + EXPIRE, MULTI/EXEC + SET EX, key expiry, GETEX EX / GETEX PERSIST - docs/design/ttl-memory-buffer.md §5: add explicit note that MULTI/EXEC no longer guarantees TTL snapshot isolation; concurrent EXPIRE between MULTI and EXEC is not detected as ErrWriteConflict Fixes: gosec G115 (int→uint64) in buffer test, gci formatting in lua context
1 parent 4caf1da commit c159654

4 files changed

Lines changed: 653 additions & 2 deletions

File tree

adapter/redis_lua_context.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2426,8 +2426,8 @@ type luaCommitPlan struct {
24262426

24272427
// luaKeyPlan carries the data elements and TTL metadata for a single key commit.
24282428
type luaKeyPlan struct {
2429-
elems []*kv.Elem[kv.OP]
2430-
finalType redisValueType
2429+
elems []*kv.Elem[kv.OP]
2430+
finalType redisValueType
24312431
preserveExisting bool
24322432
}
24332433

adapter/redis_ttl_buffer_test.go

Lines changed: 366 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,366 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
"github.com/bootjp/elastickv/kv"
11+
"github.com/bootjp/elastickv/store"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
// ────────────────────────────────────────────────────────────────
17+
// TTLBuffer — unit tests
18+
// ────────────────────────────────────────────────────────────────
19+
20+
func TestTTLBuffer_SetAndGet(t *testing.T) {
21+
t.Parallel()
22+
b := newTTLBuffer()
23+
expireAt := time.Now().Add(time.Minute)
24+
25+
b.Set([]byte("k"), &expireAt)
26+
27+
got, found := b.Get([]byte("k"))
28+
require.True(t, found)
29+
require.NotNil(t, got)
30+
require.Equal(t, expireAt.UnixNano(), got.UnixNano())
31+
}
32+
33+
func TestTTLBuffer_GetMissReturnsFalse(t *testing.T) {
34+
t.Parallel()
35+
b := newTTLBuffer()
36+
37+
got, found := b.Get([]byte("missing"))
38+
require.False(t, found)
39+
require.Nil(t, got)
40+
}
41+
42+
// Set(key, nil) represents PERSIST semantics: the TTL was explicitly removed.
43+
// Get must return (nil, true) — "found but no expiry".
44+
func TestTTLBuffer_SetNilIsPersistFound(t *testing.T) {
45+
t.Parallel()
46+
b := newTTLBuffer()
47+
48+
b.Set([]byte("k"), nil)
49+
50+
got, found := b.Get([]byte("k"))
51+
require.True(t, found, "PERSIST entry must be found in buffer")
52+
require.Nil(t, got, "PERSIST entry must have nil expireAt")
53+
}
54+
55+
// A second Set with a higher seq must overwrite a previous Set.
56+
func TestTTLBuffer_LaterSetWins(t *testing.T) {
57+
t.Parallel()
58+
b := newTTLBuffer()
59+
t1 := time.Now().Add(10 * time.Second)
60+
t2 := time.Now().Add(20 * time.Second)
61+
62+
b.Set([]byte("k"), &t1)
63+
b.Set([]byte("k"), &t2)
64+
65+
got, found := b.Get([]byte("k"))
66+
require.True(t, found)
67+
require.Equal(t, t2.UnixNano(), got.UnixNano())
68+
}
69+
70+
func TestTTLBuffer_Drain_ReturnsSnapshotAndClearsBuffer(t *testing.T) {
71+
t.Parallel()
72+
b := newTTLBuffer()
73+
expireAt := time.Now().Add(time.Minute)
74+
b.Set([]byte("a"), &expireAt)
75+
b.Set([]byte("b"), nil)
76+
77+
snapshot := b.Drain()
78+
79+
require.Len(t, snapshot, 2)
80+
require.Contains(t, snapshot, "a")
81+
require.Contains(t, snapshot, "b")
82+
require.Equal(t, 0, b.Len(), "buffer must be empty after Drain")
83+
}
84+
85+
func TestTTLBuffer_Drain_EmptyReturnsNil(t *testing.T) {
86+
t.Parallel()
87+
b := newTTLBuffer()
88+
89+
snapshot := b.Drain()
90+
require.Nil(t, snapshot)
91+
}
92+
93+
// MergeBack must NOT restore a failed entry if a newer write already exists.
94+
func TestTTLBuffer_MergeBack_NewerWriteWins(t *testing.T) {
95+
t.Parallel()
96+
b := newTTLBuffer()
97+
98+
// First write — goes into the buffer.
99+
old := time.Now().Add(10 * time.Second)
100+
b.Set([]byte("k"), &old)
101+
snapshot := b.Drain() // grab the entry (seq == 1)
102+
103+
// New write arrives while the flush (using snapshot) was in-flight.
104+
newer := time.Now().Add(60 * time.Second)
105+
b.Set([]byte("k"), &newer) // seq == 2
106+
107+
// Simulate flush failure: merge the old snapshot back.
108+
b.MergeBack(snapshot)
109+
110+
// The buffer must keep the newer value (seq=2), not the merged-back one (seq=1).
111+
got, found := b.Get([]byte("k"))
112+
require.True(t, found)
113+
require.Equal(t, newer.UnixNano(), got.UnixNano(),
114+
"MergeBack must not overwrite a newer in-buffer write")
115+
}
116+
117+
// MergeBack must restore an entry when no concurrent write occurred.
118+
func TestTTLBuffer_MergeBack_RestoresWhenNoNewerWrite(t *testing.T) {
119+
t.Parallel()
120+
b := newTTLBuffer()
121+
expireAt := time.Now().Add(time.Minute)
122+
b.Set([]byte("k"), &expireAt)
123+
snapshot := b.Drain()
124+
125+
// No new Set — buffer is now empty.
126+
b.MergeBack(snapshot)
127+
128+
got, found := b.Get([]byte("k"))
129+
require.True(t, found)
130+
require.Equal(t, expireAt.UnixNano(), got.UnixNano())
131+
}
132+
133+
// A nil *TTLBuffer must not panic on Get.
134+
func TestTTLBuffer_NilReceiver_GetIsNoop(t *testing.T) {
135+
t.Parallel()
136+
var b *TTLBuffer
137+
got, found := b.Get([]byte("k"))
138+
require.False(t, found)
139+
require.Nil(t, got)
140+
}
141+
142+
// A nil *TTLBuffer must not panic on Set.
143+
func TestTTLBuffer_NilReceiver_SetIsNoop(t *testing.T) {
144+
t.Parallel()
145+
var b *TTLBuffer
146+
expireAt := time.Now().Add(time.Minute)
147+
require.NotPanics(t, func() { b.Set([]byte("k"), &expireAt) })
148+
}
149+
150+
// When the buffer is full, new (unseen) keys must be silently dropped.
151+
// Existing keys must still be updatable.
152+
func TestTTLBuffer_Full_DropsNewKey(t *testing.T) {
153+
t.Parallel()
154+
b := newTTLBuffer()
155+
156+
// Fill the map to exactly ttlBufferMaxSize via direct struct access so the
157+
// test runs in O(N) map writes without going through the seq machinery.
158+
b.mu.Lock()
159+
var seq uint64
160+
for i := range ttlBufferMaxSize {
161+
seq++
162+
b.entries[fmt.Sprintf("slot:%d", i)] = ttlBufferEntry{seq: seq}
163+
}
164+
b.mu.Unlock()
165+
166+
require.Equal(t, ttlBufferMaxSize, b.Len())
167+
168+
// A brand-new key must be dropped.
169+
expireAt := time.Now().Add(time.Minute)
170+
b.Set([]byte("brand-new"), &expireAt)
171+
_, found := b.Get([]byte("brand-new"))
172+
require.False(t, found, "new key must be dropped when the buffer is full")
173+
174+
// An existing key must still accept updates.
175+
b.Set([]byte("slot:0"), &expireAt)
176+
got, found := b.Get([]byte("slot:0"))
177+
require.True(t, found, "existing key must be updatable even when buffer is full")
178+
require.NotNil(t, got)
179+
}
180+
181+
// Concurrent Set/Get/Drain/MergeBack must not data-race.
182+
// Run with `go test -race` to exercise the race detector.
183+
func TestTTLBuffer_ConcurrentAccess(t *testing.T) {
184+
t.Parallel()
185+
b := newTTLBuffer()
186+
const goroutines = 50
187+
const opsPerGoroutine = 100
188+
189+
var wg sync.WaitGroup
190+
for g := range goroutines {
191+
wg.Add(1)
192+
go func(id int) {
193+
defer wg.Done()
194+
key := []byte(fmt.Sprintf("key:%d", id%10))
195+
expireAt := time.Now().Add(time.Duration(id) * time.Second)
196+
for range opsPerGoroutine {
197+
b.Set(key, &expireAt)
198+
_, _ = b.Get(key)
199+
if id%5 == 0 {
200+
drained := b.Drain()
201+
b.MergeBack(drained)
202+
}
203+
}
204+
}(g)
205+
}
206+
wg.Wait()
207+
}
208+
209+
// ────────────────────────────────────────────────────────────────
210+
// buildTTLFlushElems — unit tests
211+
// ────────────────────────────────────────────────────────────────
212+
213+
func TestBuildTTLFlushElems_Empty(t *testing.T) {
214+
t.Parallel()
215+
elems := buildTTLFlushElems(nil)
216+
require.Empty(t, elems)
217+
}
218+
219+
// nil expireAt (PERSIST) must produce a Del operation on the TTL key.
220+
func TestBuildTTLFlushElems_NilExpireGeneratesDel(t *testing.T) {
221+
t.Parallel()
222+
entries := map[string]ttlBufferEntry{
223+
"mykey": {expireAt: nil, seq: 1},
224+
}
225+
226+
elems := buildTTLFlushElems(entries)
227+
228+
require.Len(t, elems, 1)
229+
assert.Equal(t, kv.Del, elems[0].Op)
230+
assert.Equal(t, redisTTLKey([]byte("mykey")), elems[0].Key)
231+
assert.Nil(t, elems[0].Value)
232+
}
233+
234+
// non-nil expireAt must produce a Put with the encoded TTL.
235+
func TestBuildTTLFlushElems_NonNilExpireGeneratesPut(t *testing.T) {
236+
t.Parallel()
237+
expireAt := time.UnixMilli(9_000_000_000_000) // far future
238+
entries := map[string]ttlBufferEntry{
239+
"mykey": {expireAt: &expireAt, seq: 1},
240+
}
241+
242+
elems := buildTTLFlushElems(entries)
243+
244+
require.Len(t, elems, 1)
245+
assert.Equal(t, kv.Put, elems[0].Op)
246+
assert.Equal(t, redisTTLKey([]byte("mykey")), elems[0].Key)
247+
require.NotNil(t, elems[0].Value)
248+
249+
// Round-trip: decoded value must match the original expiry.
250+
decoded, err := decodeRedisTTL(elems[0].Value)
251+
require.NoError(t, err)
252+
assert.Equal(t, expireAt.UnixMilli(), decoded.UnixMilli())
253+
}
254+
255+
// Keys must appear in lexicographic order for deterministic Raft log entries.
256+
func TestBuildTTLFlushElems_KeysAreSorted(t *testing.T) {
257+
t.Parallel()
258+
exp := time.Now().Add(time.Minute)
259+
entries := map[string]ttlBufferEntry{
260+
"zzz": {expireAt: &exp, seq: 3},
261+
"aaa": {expireAt: &exp, seq: 1},
262+
"mmm": {expireAt: &exp, seq: 2},
263+
}
264+
265+
elems := buildTTLFlushElems(entries)
266+
267+
require.Len(t, elems, 3)
268+
// TTL key has the !redis|ttl| prefix; strip it to compare user keys.
269+
prefix := len(redisTTLKey(nil))
270+
userKeys := make([]string, len(elems))
271+
for i, e := range elems {
272+
userKeys[i] = string(e.Key[prefix:])
273+
}
274+
assert.Equal(t, []string{"aaa", "mmm", "zzz"}, userKeys)
275+
}
276+
277+
// Mixed nil / non-nil in one batch.
278+
func TestBuildTTLFlushElems_MixedDelAndPut(t *testing.T) {
279+
t.Parallel()
280+
exp := time.Now().Add(time.Minute)
281+
entries := map[string]ttlBufferEntry{
282+
"persist": {expireAt: nil, seq: 1},
283+
"expire": {expireAt: &exp, seq: 2},
284+
}
285+
286+
elems := buildTTLFlushElems(entries)
287+
require.Len(t, elems, 2)
288+
289+
ops := map[string]kv.OP{}
290+
prefix := len(redisTTLKey(nil))
291+
for _, e := range elems {
292+
ops[string(e.Key[prefix:])] = e.Op
293+
}
294+
assert.Equal(t, kv.Del, ops["persist"])
295+
assert.Equal(t, kv.Put, ops["expire"])
296+
}
297+
298+
// ────────────────────────────────────────────────────────────────
299+
// ttlAt — buffer-first read path (unit)
300+
// ────────────────────────────────────────────────────────────────
301+
302+
// When the buffer holds a TTL entry, ttlAt must return it without
303+
// touching the Raft/MVCCStore.
304+
func TestTTLAt_BufferHit_SkipsStore(t *testing.T) {
305+
t.Parallel()
306+
server, st := newRedisStorageMigrationTestServer(t)
307+
ctx := context.Background()
308+
key := []byte("bufhit:key")
309+
310+
// Write a TTL into the buffer only — the Raft store has no entry.
311+
expireAt := time.Now().Add(time.Hour)
312+
server.ttlBuffer.Set(key, &expireAt)
313+
314+
// Verify the store is indeed empty.
315+
_, err := st.GetAt(ctx, redisTTLKey(key), server.readTS())
316+
require.ErrorIs(t, err, store.ErrKeyNotFound, "store must not have a TTL entry yet")
317+
318+
// ttlAt must return the buffer value.
319+
got, err := server.ttlAt(ctx, key, server.readTS())
320+
require.NoError(t, err)
321+
require.NotNil(t, got)
322+
require.Equal(t, expireAt.UnixMilli(), got.UnixMilli())
323+
}
324+
325+
// When the buffer is empty, ttlAt must fall back to the Raft store.
326+
func TestTTLAt_BufferMiss_FallsBackToStore(t *testing.T) {
327+
t.Parallel()
328+
server, st := newRedisStorageMigrationTestServer(t)
329+
ctx := context.Background()
330+
key := []byte("bufmiss:key")
331+
332+
// Write TTL directly to the store, bypassing the buffer.
333+
expireAt := time.Now().Add(time.Hour)
334+
ts := server.readTS() + 1
335+
require.NoError(t, st.PutAt(ctx, redisTTLKey(key), encodeRedisTTL(expireAt), ts, 0))
336+
337+
// Buffer is empty.
338+
_, found := server.ttlBuffer.Get(key)
339+
require.False(t, found)
340+
341+
got, err := server.ttlAt(ctx, key, ts)
342+
require.NoError(t, err)
343+
require.NotNil(t, got)
344+
require.Equal(t, expireAt.UnixMilli(), got.UnixMilli())
345+
}
346+
347+
// A PERSIST entry in the buffer (nil expireAt, found=true) must return nil
348+
// without falling back to the store, even when the store has a stale TTL.
349+
func TestTTLAt_BufferPersist_ReturnsNil(t *testing.T) {
350+
t.Parallel()
351+
server, st := newRedisStorageMigrationTestServer(t)
352+
ctx := context.Background()
353+
key := []byte("persist:key")
354+
355+
// Store has an old TTL.
356+
oldExpiry := time.Now().Add(time.Hour)
357+
ts := server.readTS() + 1
358+
require.NoError(t, st.PutAt(ctx, redisTTLKey(key), encodeRedisTTL(oldExpiry), ts, 0))
359+
360+
// Buffer says "TTL was removed".
361+
server.ttlBuffer.Set(key, nil)
362+
363+
got, err := server.ttlAt(ctx, key, ts)
364+
require.NoError(t, err)
365+
require.Nil(t, got, "buffer PERSIST entry must shadow the stale store TTL")
366+
}

0 commit comments

Comments
 (0)