Commit 4db754f

fix(redis,stream): shutdown-aware XREAD ctx + clock-epoch defence (#620 follow-up) (#631)
## Summary

Follow-up to merged PR #620, addressing the [Gemini medium-priority review](#620 (review)) that landed after the merge. Three changes, all with regression tests:

- `safeUnixMilliToUint64` defends `nextXAddID`'s `*` branch against a system clock set before the Unix epoch (a negative `UnixMilli` would wrap to a `math.MaxUint64`-adjacent value and wedge every subsequent `XADD '*'` chasing that pathological future-ms).
- `xread`'s `$`-resolve and `xreadBusyPoll`'s per-iteration ctx now derive from `r.handlerContext()` instead of `context.Background()`, so a server `Close()` aborts in-flight BLOCK XREADs within ~one `redisBusyPollBackoff` (10 ms) instead of running until the BLOCK deadline. The busy-poll loop also short-circuits at the top of each iteration when `handlerCtx` is cancelled — necessary because `isXReadIterCtxError` would otherwise silently translate the per-iteration cancel into "empty iteration" and let the loop spin at backoff cadence.
- `resolveXReadDollarID`'s doc comment said "legacy blobs fall back to a full load"; PR #620 deliberately removed that fallback in favour of the "discard-on-read, delete-on-write" contract. The comment now matches the implementation.

## Test plan

- [x] `TestSafeUnixMilliToUint64` (unit) — clamp-at-0 for negatives (`-1`, large negative, `MinInt64`); pass-through for `0`, positive, `MaxInt64`.
- [x] `TestRedis_StreamXReadShutdownShortCircuits` (e2e) — `XREAD ... BLOCK 5s` in a goroutine, `redisServer.Close()` after 50 ms, assert reply is `redis.Nil` within 2 s.
- [x] Build / vet / lint clean.
- [x] Existing `TestRedis_StreamXReadShortBlockReturnsNullNotError` and `TestRedis_StreamXReadIterCtxDeadlineReturnsNull` still pass — the shutdown short-circuit doesn't perturb the existing BLOCK-timeout-returns-null path.

/gemini review

@codex review
2 parents 21cf2c9 + f726981 commit 4db754f

3 files changed: 224 additions & 9 deletions
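Before the per-file hunks, a standalone illustration (not part of the diff) of the wrap hazard the Summary's first bullet describes: in Go, converting a negative `int64` to `uint64` wraps modulo 2^64.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// A clock set before 1970-01-01 makes UnixMilli() negative.
	preEpoch := time.Date(1969, 12, 31, 0, 0, 0, 0, time.UTC)
	ms := preEpoch.UnixMilli() // -86400000 (one day before the epoch)

	// The naive cast wraps to a near-MaxUint64 value; every later
	// XADD '*' would treat it as a far-future lastMs and chase it.
	fmt.Println(uint64(ms))             // 18446744073623151616
	fmt.Println(uint64(math.MaxUint64)) // 18446744073709551615
}
```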


adapter/redis_compat_commands.go

Lines changed: 73 additions & 9 deletions
```diff
@@ -3702,10 +3702,34 @@ func nextXAddID(hasLast bool, lastMs, lastSeq uint64, requested string) (string,
 		}
 		return requested, nil
 	}
+	return autoXAddID(safeUnixMilliToUint64(time.Now().UnixMilli()), hasLast, lastMs, lastSeq)
+}
 
-	nowMs := uint64(time.Now().UnixMilli()) //nolint:gosec // always non-negative
+// autoXAddID resolves XADD '*' to a concrete stream ID given a wall-clock
+// nowMs. Pulled out of nextXAddID so the auto-ID branch is testable
+// without depending on time.Now() — the only un-injectable dependency is
+// already isolated in the caller.
+//
+// Two corner cases the caller cannot rely on the wall clock to avoid:
+//
+//   - nowMs == 0 on a fresh stream (!hasLast). A naive "<nowMs>-0" reply
+//     yields "0-0", which Redis explicitly rejects as a stream ID and
+//     which XREAD ... 0 would treat as the empty after-marker. Bump the
+//     seq to 1 so the first auto-generated entry is "0-1" — strictly
+//     greater than 0-0 and reachable via XREAD ... 0. (This case fires
+//     only when safeUnixMilliToUint64 clamped a pre-epoch clock to 0;
+//     under any sane clock, nowMs is well above 0.)
+//
+//   - nowMs <= lastMs. Advance past lastMs/lastSeq via bumpStreamID so
+//     the stream stays strictly monotonic even across a backwards clock
+//     step or a corrupted meta where lastMs is far in the future.
+func autoXAddID(nowMs uint64, hasLast bool, lastMs, lastSeq uint64) (string, error) {
 	if !hasLast || nowMs > lastMs {
-		return strconv.FormatUint(nowMs, 10) + "-0", nil
+		seq := uint64(0)
+		if nowMs == 0 {
+			seq = 1
+		}
+		return strconv.FormatUint(nowMs, 10) + "-" + strconv.FormatUint(seq, 10), nil
 	}
 	// Either nowMs == lastMs (same millisecond), or lastMs is in the future
 	// (monotonic guarantee across a backwards clock step or a corrupted
@@ -3718,6 +3742,21 @@ func nextXAddID(hasLast bool, lastMs, lastSeq uint64, requested string) (string,
 	return strconv.FormatUint(ms, 10) + "-" + strconv.FormatUint(seq, 10), nil
 }
 
+// safeUnixMilliToUint64 returns ms as uint64, clamping any negative value
+// (caused by a system clock set before the Unix epoch) to 0. Without this
+// clamp, a direct uint64 cast of a negative int64 would yield a value
+// near math.MaxUint64, which would then make nextXAddID's "future-ms"
+// branch chase that pathological value forever — effectively wedging
+// every subsequent XADD '*' on the stream until the clock recovers.
+// The lastMs/lastSeq monotonic guarantee carries the stream forward
+// from there via bumpStreamID.
+func safeUnixMilliToUint64(ms int64) uint64 {
+	if ms < 0 {
+		return 0
+	}
+	return uint64(ms) //nolint:gosec // negative values handled above
+}
+
 // bumpStreamID returns the strictly-greater successor of (ms, seq) within
 // the uint64-uint64 stream ID space. Bumps seq; on seq overflow carries
 // to ms+1, seq=0; on ms overflow returns an error (no representable
```
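To make the two corner cases in the new doc comment concrete, a quick hypothetical fragment (assuming `autoXAddID`, `safeUnixMilliToUint64`, and `bumpStreamID` from the hunk above are in scope, plus `fmt`):

```go
// Pre-epoch clock, fresh stream: the clamp yields nowMs == 0 and the
// fresh-stream branch bumps seq, so the first auto-ID is the valid "0-1".
id, _ := autoXAddID(safeUnixMilliToUint64(-86_400_000), false, 0, 0)
fmt.Println(id) // 0-1

// Backwards clock step on an existing stream (lastMs=100, lastSeq=5):
// the monotonic carry wins, advancing strictly past the last ID.
id, _ = autoXAddID(safeUnixMilliToUint64(50), true, 100, 5)
fmt.Println(id) // 100-6
```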
```diff
@@ -4305,11 +4344,14 @@ func (r *RedisServer) resolveXReadAfterIDs(ctx context.Context, req *xreadReques
 }
 
 // resolveXReadDollarID resolves the "$" after-ID for a single stream by
-// asking the store for the highest ID ever assigned. New-layout streams
-// answer from meta in one read; legacy blobs fall back to a full load.
-// Returns streamZeroID for non-existent and empty-never-written streams.
-// ctx threads through the caller's cancellation/deadline so the resolve
-// step doesn't survive past a BLOCK-window cancel.
+// asking the store for the highest ID ever assigned. The new-layout meta
+// answers in one read; when meta is absent the stream is treated as
+// empty — legacy single-blob data is intentionally ignored under the
+// "discard-on-read, delete-on-write" contract documented on
+// dollarIDFromState (and matching loadStreamAt). Returns streamZeroID
+// for non-existent and empty-never-written streams. ctx threads through
+// the caller's cancellation/deadline so the resolve step doesn't survive
+// past a BLOCK-window cancel.
 func (r *RedisServer) resolveXReadDollarID(ctx context.Context, key []byte) (string, error) {
 	readTS := r.readTS()
 	typ, err := r.keyTypeAt(ctx, key, readTS)
@@ -4567,7 +4609,12 @@ func (r *RedisServer) xread(conn redcon.Conn, cmd redcon.Command) {
 	// the resolve either succeeds quickly or fails cleanly, leaving
 	// the BLOCK-window timeout semantics (null on expiry) to the
 	// busy-poll below.
-	resolveCtx, resolveCancel := context.WithTimeout(context.Background(), redisDispatchTimeout)
+	//
+	// Parent on r.handlerContext() (not context.Background()) so an
+	// in-flight resolve aborts promptly when the server is shutting
+	// down — otherwise the per-resolve ScanAt could survive past
+	// graceful-shutdown's drain window.
+	resolveCtx, resolveCancel := context.WithTimeout(r.handlerContext(), redisDispatchTimeout)
 	err = r.resolveXReadAfterIDs(resolveCtx, &req)
 	resolveCancel()
 	if err != nil {
@@ -4581,7 +4628,19 @@
 // xreadBusyPoll runs the BLOCK-window busy-poll loop. Extracted from xread
 // so the parent function stays under the cyclop budget.
 func (r *RedisServer) xreadBusyPoll(conn redcon.Conn, req xreadRequest, deadline time.Time) {
+	handlerCtx := r.handlerContext()
 	for {
+		// Server-shutdown short-circuit: if the parent handlerContext
+		// has been cancelled, abandon the busy-poll immediately rather
+		// than spin until the BLOCK deadline. iterCtx below is rooted
+		// in handlerCtx, so it would cancel-on-call too — but routing
+		// through isXReadIterCtxError silently translates that into an
+		// empty iteration and the loop would burn CPU at
+		// redisBusyPollBackoff cadence until the deadline.
+		if handlerCtx.Err() != nil {
+			conn.WriteNull()
+			return
+		}
 		// BLOCK-expired before the loop body: respect the Redis contract
 		// that a BLOCK timeout returns null, not an error. If we fell
 		// through here without remaining time (very small BLOCK, or
@@ -4598,7 +4657,12 @@ func (r *RedisServer) xreadBusyPoll(conn redcon.Conn, req xreadRequest, deadline
 		if iterTimeout > redisDispatchTimeout {
 			iterTimeout = redisDispatchTimeout
 		}
-		iterCtx, iterCancel := context.WithTimeout(context.Background(), iterTimeout)
+		// iterCtx is rooted in handlerCtx so its underlying storage
+		// scans abort promptly on server shutdown rather than running
+		// until iterTimeout fires. The handlerCtx.Err() guard at the
+		// top of each iteration prevents the loop from spinning once
+		// the parent ctx is cancelled.
+		iterCtx, iterCancel := context.WithTimeout(handlerCtx, iterTimeout)
 		results, err := r.xreadOnce(iterCtx, req)
 		iterCancel()
 		// Per-iteration ctx hitting its deadline (or being cancelled by
```
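The shutdown behaviour both ctx changes rely on is plain `context` package semantics: a child created with `context.WithTimeout` can never outlive its parent. A minimal standalone demonstration (not project code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	parent, cancel := context.WithCancel(context.Background())
	cancel() // stand-in for the server Close() that cancels handlerContext

	// A timeout context rooted in the cancelled parent is born Done:
	// WithTimeout never grants a child more lifetime than its parent has.
	iterCtx, iterCancel := context.WithTimeout(parent, time.Second)
	defer iterCancel()

	select {
	case <-iterCtx.Done():
		fmt.Println(iterCtx.Err()) // context.Canceled, not DeadlineExceeded
	default:
		fmt.Println("still live (the old context.Background() behaviour)")
	}
}
```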

adapter/redis_compat_commands_stream_test.go

Lines changed: 62 additions & 0 deletions
```diff
@@ -763,3 +763,65 @@ func TestRedis_UserKeyShadowingStreamPrefixSurvivesMultiExec(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, "user-value", plain)
 }
+
+// TestRedis_StreamXReadShutdownShortCircuits guards Gemini's medium
+// concern: the XREAD busy-poll loop's per-iteration ctx must be rooted
+// in r.handlerContext() so that a server shutdown aborts the loop
+// promptly instead of running until the BLOCK deadline. A handlerCtx
+// cancellation also short-circuits the loop entry to a null reply, so
+// the client sees BLOCK timeout semantics rather than a hung
+// connection or a delayed -ERR.
+func TestRedis_StreamXReadShutdownShortCircuits(t *testing.T) {
+	t.Parallel()
+	nodes, _, _ := createNode(t, 3)
+	defer shutdown(nodes)
+
+	rdb := redis.NewClient(&redis.Options{Addr: nodes[0].redisAddress})
+	defer func() { _ = rdb.Close() }()
+	ctx := context.Background()
+
+	// Seed one entry and then start an XREAD that will block waiting
+	// for *new* entries on top of "1-0". With a 5-second BLOCK budget,
+	// pre-fix this would happily run for the full 5 s after Close()
+	// because iterCtx was rooted in context.Background(). Post-fix the
+	// handlerCtx.Err() guard at the top of each loop iteration kicks
+	// in within ~one redisBusyPollBackoff (10 ms) and we reply null.
+	_, err := rdb.XAdd(ctx, &redis.XAddArgs{
+		Stream: "stream-shutdown",
+		ID:     "1-0",
+		Values: []string{"k", "v"},
+	}).Result()
+	require.NoError(t, err)
+
+	type xreadOutcome struct {
+		streams []redis.XStream
+		err     error
+	}
+	xreadDone := make(chan xreadOutcome, 1)
+	go func() {
+		streams, err := rdb.XRead(ctx, &redis.XReadArgs{
+			Streams: []string{"stream-shutdown", "1-0"},
+			Count:   1,
+			Block:   5 * time.Second,
+		}).Result()
+		xreadDone <- xreadOutcome{streams: streams, err: err}
+	}()
+
+	// Give the XREAD a moment to enter the busy-poll loop, then cancel
+	// the server's base context. The poll loop must observe the
+	// cancellation and reply null well before the 5 s BLOCK deadline.
+	time.Sleep(50 * time.Millisecond)
+	require.NoError(t, nodes[0].redisServer.Close())
+
+	select {
+	case res := <-xreadDone:
+		// BLOCK timeout returns redis.Nil — the same wire-level reply
+		// the client would have seen if the BLOCK had expired
+		// naturally. The server must NOT surface the cancellation as a
+		// -ERR or hang the connection.
+		require.True(t, errors.Is(res.err, redis.Nil),
+			"BLOCK after shutdown must return redis.Nil, got err=%v streams=%v", res.err, res.streams)
+	case <-time.After(2 * time.Second):
+		t.Fatal("XREAD did not return within 2 s of server Close — busy-poll did not honour handlerContext cancel")
+	}
+}
```
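From the client's side, the test's assertion mirrors the pattern any consumer of this server would use. A hypothetical helper (names and the go-redis import path are assumptions, not from the repo) that treats `redis.Nil` as "nothing new" rather than a failure:

```go
package client

import (
	"context"
	"errors"
	"time"

	"github.com/redis/go-redis/v9" // import path assumed; repo may pin another major version
)

// pollOnce issues one short BLOCK XREAD and reports whether new entries
// arrived. A redis.Nil reply (BLOCK expiry, or server shutdown after
// this commit) means "nothing new", not a failure.
func pollOnce(ctx context.Context, rdb *redis.Client, stream, afterID string) ([]redis.XStream, bool, error) {
	streams, err := rdb.XRead(ctx, &redis.XReadArgs{
		Streams: []string{stream, afterID},
		Count:   1,
		Block:   100 * time.Millisecond,
	}).Result()
	if errors.Is(err, redis.Nil) {
		return nil, false, nil // null reply: no entries before the BLOCK window closed
	}
	if err != nil {
		return nil, false, err // real failure: network error, -ERR, ...
	}
	return streams, true, nil
}
```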

adapter/redis_stream_limit_test.go

Lines changed: 89 additions & 0 deletions
```diff
@@ -304,3 +304,92 @@ func TestIsKnownInternalKey_StreamPrefixNarrowed(t *testing.T) {
 		})
 	}
 }
+
+// TestSafeUnixMilliToUint64 guards Gemini's medium concern: a system
+// clock set before the Unix epoch makes time.Now().UnixMilli() return a
+// negative int64; a naive uint64 cast wraps to a value near
+// math.MaxUint64 that wedges every subsequent XADD '*' (the future-ms
+// branch in nextXAddID would chase that pathological value forever). The
+// helper must clamp at 0 so the lastMs/lastSeq monotonic carry takes
+// over.
+func TestSafeUnixMilliToUint64(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		name string
+		in   int64
+		want uint64
+	}{
+		{"zero", 0, 0},
+		{"positive epoch ms", 1_777_000_000_000, 1_777_000_000_000},
+		{"max int64", math.MaxInt64, uint64(math.MaxInt64)},
+		// Negative values represent a clock set before the Unix epoch
+		// (1970-01-01). All must clamp at 0.
+		{"minus one", -1, 0},
+		{"large negative", -1_000_000_000_000, 0},
+		{"min int64", math.MinInt64, 0},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			if got := safeUnixMilliToUint64(tc.in); got != tc.want {
+				t.Fatalf("safeUnixMilliToUint64(%d): want %d, got %d", tc.in, tc.want, got)
+			}
+		})
+	}
+}
+
+// TestAutoXAddID covers the XADD '*' path of nextXAddID with synthetic
+// nowMs values, including the Codex P2 / Gemini-medium edge case:
+// safeUnixMilliToUint64 clamps a pre-epoch clock to 0, and a naive
+// "0-0" auto-ID is rejected by Redis (XREAD ... 0 treats it as the
+// after-marker and skips it). autoXAddID must bump seq to 1 in that
+// case so the first auto-generated entry is "0-1".
+func TestAutoXAddID(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		name    string
+		nowMs   uint64
+		hasLast bool
+		lastMs  uint64
+		lastSeq uint64
+		want    string
+		wantErr bool
+	}{
+		// Fresh stream, healthy clock: nowMs > 0, seq starts at 0.
+		{"fresh stream, sane clock", 1_777_000_000_000, false, 0, 0, "1777000000000-0", false},
+		// Fresh stream, clock pre-epoch (clamped to 0): MUST yield 0-1
+		// rather than 0-0 — the original Codex P2 / Gemini-medium case.
+		{"fresh stream, clamped clock → 0-1", 0, false, 0, 0, "0-1", false},
+		// Existing stream, nowMs strictly greater: seq resets to 0.
+		{"clock advanced past lastMs", 200, true, 100, 5, "200-0", false},
+		// Existing stream, nowMs == lastMs: bumpStreamID seq carry.
+		{"same ms as lastMs", 100, true, 100, 5, "100-6", false},
+		// Existing stream, nowMs < lastMs (clock went backwards):
+		// bumpStreamID carries from lastMs/lastSeq, NOT from nowMs.
+		{"clock behind lastMs", 50, true, 100, 5, "100-6", false},
+		// seq at MaxUint64 carries to ms+1.
+		{"seq at max carries", 100, true, 100, ^uint64(0), "101-0", false},
+		// Both ms and seq at MaxUint64: ID space exhausted, error.
+		{"ID space exhausted", 100, true, ^uint64(0), ^uint64(0), "", true},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			got, err := autoXAddID(tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq)
+			if tc.wantErr {
+				if err == nil {
+					t.Fatalf("autoXAddID(%d,%v,%d,%d): expected error, got %q", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, got)
+				}
+				return
+			}
+			if err != nil {
+				t.Fatalf("autoXAddID(%d,%v,%d,%d): unexpected error %v", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, err)
+			}
+			if got != tc.want {
+				t.Fatalf("autoXAddID(%d,%v,%d,%d): want %q, got %q", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, tc.want, got)
+			}
+		})
+	}
+}
```
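The last two table rows lean on `bumpStreamID`'s documented carry behaviour ("bumps seq; on seq overflow carries to ms+1, seq=0; on ms overflow returns an error"). A sketch of that contract, an illustration only (fragment assuming `errors` and `math` imports; the real implementation lives in adapter/redis_compat_commands.go):

```go
// bumpStreamID (sketch): the strictly-greater successor of (ms, seq)
// in the uint64-uint64 stream ID space, per its doc comment above.
func bumpStreamID(ms, seq uint64) (uint64, uint64, error) {
	if seq < math.MaxUint64 {
		return ms, seq + 1, nil // common case: bump seq within the same ms
	}
	if ms < math.MaxUint64 {
		return ms + 1, 0, nil // seq overflow: carry into ms, reset seq
	}
	return 0, 0, errors.New("stream ID space exhausted") // no representable successor
}
```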
