Skip to content

Commit f726981

Browse files
committed
fix(redis,stream): XADD '*' avoids 0-0 when clock is pre-epoch
Gemini medium + Codex P2 round 1 on PR #631: the original change clamped a pre-epoch clock to nowMs=0 via safeUnixMilliToUint64, but on a fresh stream the auto-ID branch then returned "0-0" — the very ID the same function rejects as invalid for explicitly-requested IDs and which XREAD ... 0 treats as the empty after-marker. Fix: extract the auto-ID branch into autoXAddID(nowMs, hasLast, ...) so it can be unit-tested without the wall clock; bump seq to 1 when nowMs is 0 so the first auto-generated entry is "0-1" rather than "0-0". The existing bumpStreamID-on-collision path is unchanged. Tests: - TestAutoXAddID covers the fresh-stream branch (sane clock, clamped clock → 0-1), the same-ms collision (bump to seq+1), the clock-behind-lastMs case (carry from lastMs/lastSeq, not nowMs), the seq-at-MaxUint64 carry to ms+1, and the ID-space-exhausted error. Build / vet / lint clean.
1 parent 0a969c3 commit f726981

2 files changed

Lines changed: 81 additions & 2 deletions

File tree

adapter/redis_compat_commands.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3702,10 +3702,34 @@ func nextXAddID(hasLast bool, lastMs, lastSeq uint64, requested string) (string,
37023702
}
37033703
return requested, nil
37043704
}
3705+
return autoXAddID(safeUnixMilliToUint64(time.Now().UnixMilli()), hasLast, lastMs, lastSeq)
3706+
}
37053707

3706-
nowMs := safeUnixMilliToUint64(time.Now().UnixMilli())
3708+
// autoXAddID resolves XADD '*' to a concrete stream ID given a wall-clock
3709+
// nowMs. Pulled out of nextXAddID so the auto-ID branch is testable
3710+
// without depending on time.Now() — the only un-injectable dependency is
3711+
// already isolated in the caller.
3712+
//
3713+
// Two corner cases the caller cannot rely on the wall clock to avoid:
3714+
//
3715+
// - nowMs == 0 on a fresh stream (!hasLast). A naive "<nowMs>-0" reply
3716+
// yields "0-0", which Redis explicitly rejects as a stream ID and
3717+
// which XREAD ... 0 would treat as the empty after-marker. Bump the
3718+
// seq to 1 so the first auto-generated entry is "0-1" — strictly
3719+
// greater than 0-0 and reachable via XREAD ... 0. (This case fires
3720+
// only when safeUnixMilliToUint64 clamped a pre-epoch clock to 0;
3721+
// under any sane clock, nowMs is well above 0.)
3722+
//
3723+
// - nowMs <= lastMs. Advance past lastMs/lastSeq via bumpStreamID so
3724+
// the stream stays strictly monotonic even across a backwards clock
3725+
// step or a corrupted meta where lastMs is far in the future.
3726+
func autoXAddID(nowMs uint64, hasLast bool, lastMs, lastSeq uint64) (string, error) {
37073727
if !hasLast || nowMs > lastMs {
3708-
return strconv.FormatUint(nowMs, 10) + "-0", nil
3728+
seq := uint64(0)
3729+
if nowMs == 0 {
3730+
seq = 1
3731+
}
3732+
return strconv.FormatUint(nowMs, 10) + "-" + strconv.FormatUint(seq, 10), nil
37093733
}
37103734
// Either nowMs == lastMs (same millisecond), or lastMs is in the future
37113735
// (monotonic guarantee across a backwards clock step or a corrupted

adapter/redis_stream_limit_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,3 +338,58 @@ func TestSafeUnixMilliToUint64(t *testing.T) {
338338
})
339339
}
340340
}
341+
342+
// TestAutoXAddID covers the XADD '*' path of nextXAddID with synthetic
343+
// nowMs values, including the Codex P2 / Gemini-medium edge case:
344+
// safeUnixMilliToUint64 clamps a pre-epoch clock to 0, and a naive
345+
// "0-0" auto-ID is rejected by Redis (XREAD ... 0 treats it as the
346+
// after-marker and skips it). autoXAddID must bump seq to 1 in that
347+
// case so the first auto-generated entry is "0-1".
348+
func TestAutoXAddID(t *testing.T) {
349+
t.Parallel()
350+
351+
cases := []struct {
352+
name string
353+
nowMs uint64
354+
hasLast bool
355+
lastMs uint64
356+
lastSeq uint64
357+
want string
358+
wantErr bool
359+
}{
360+
// Fresh stream, healthy clock: nowMs > 0, seq starts at 0.
361+
{"fresh stream, sane clock", 1_777_000_000_000, false, 0, 0, "1777000000000-0", false},
362+
// Fresh stream, clock pre-epoch (clamped to 0): MUST yield 0-1
363+
// rather than 0-0 — the original Codex P2 / Gemini-medium case.
364+
{"fresh stream, clamped clock → 0-1", 0, false, 0, 0, "0-1", false},
365+
// Existing stream, nowMs strictly greater: seq resets to 0.
366+
{"clock advanced past lastMs", 200, true, 100, 5, "200-0", false},
367+
// Existing stream, nowMs == lastMs: bumpStreamID seq carry.
368+
{"same ms as lastMs", 100, true, 100, 5, "100-6", false},
369+
// Existing stream, nowMs < lastMs (clock went backwards):
370+
// bumpStreamID carries from lastMs/lastSeq, NOT from nowMs.
371+
{"clock behind lastMs", 50, true, 100, 5, "100-6", false},
372+
// seq at MaxUint64 carries to ms+1.
373+
{"seq at max carries", 100, true, 100, ^uint64(0), "101-0", false},
374+
// Both ms and seq at MaxUint64: ID space exhausted, error.
375+
{"ID space exhausted", 100, true, ^uint64(0), ^uint64(0), "", true},
376+
}
377+
for _, tc := range cases {
378+
t.Run(tc.name, func(t *testing.T) {
379+
t.Parallel()
380+
got, err := autoXAddID(tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq)
381+
if tc.wantErr {
382+
if err == nil {
383+
t.Fatalf("autoXAddID(%d,%v,%d,%d): expected error, got %q", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, got)
384+
}
385+
return
386+
}
387+
if err != nil {
388+
t.Fatalf("autoXAddID(%d,%v,%d,%d): unexpected error %v", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, err)
389+
}
390+
if got != tc.want {
391+
t.Fatalf("autoXAddID(%d,%v,%d,%d): want %q, got %q", tc.nowMs, tc.hasLast, tc.lastMs, tc.lastSeq, tc.want, got)
392+
}
393+
})
394+
}
395+
}

0 commit comments

Comments
 (0)