Commit d412194

backup(stream): PR791 r1 codex P1 x2 — pendingTTL drain + duplicate-field preservation
Two P1 findings from chatgpt-codex on PR #791:

P1a: Buffer stream TTLs that arrive before stream rows

Pebble snapshots emit records in encoded-key order (store/snapshot_pebble.go::iter.First()+Next()), and `!redis|ttl|` lex-sorts BEFORE `!stream|...` because `r` < `s`. In real snapshot order the TTL arrives FIRST, kindByKey is still redisKindUnknown when HandleTTL fires, and the original code counted the TTL as an orphan and dropped it, so every TTL'd stream restored as permanent. Same root cause as the set encoder's latent bug in PR #758.

This commit adds a pendingTTL infrastructure (matching the parallel fix on PR #790) so the expiry parks during the redisKindUnknown window and drains when streamState first registers the user key. The set encoder gets the same retroactive drain.

P1b: Preserve duplicate stream fields instead of map-collapsing

XADD permits duplicate field names within one entry (e.g. `XADD s * f v1 f v2`). The protobuf entry stores the interleaved slice verbatim, but my marshalStreamJSONL collapsed pairs into `map[string]string`, silently dropping every duplicate. A restore of such an entry would lose the second (and later) pair.

Fix: emit `fields` as a JSON ARRAY of `{name, value}` records (streamFieldJSON). Order is the protobuf's interleaved order so a restore can replay the original XADD argv exactly.

The design example at docs/design/2026_04_29_proposed_snapshot_logical_decoder.md:338 showed object form. That representation was unsafe for streams (though fine for hashes where the wire-level encoder normalises field names earlier). The format is owned by Phase 0 and is adjusted in this PR before the format ships any consumers.

Caller audit (per /loop standing instruction):
- HandleTTL's redisKindUnknown branch: same semantic change as PR #790's r1. It previously incremented orphanTTLCount on intake; it now buffers in pendingTTL and lets Finalize count at end. Same audit conclusion: no external callers of orphanTTLCount; TestRedisDB_OrphanTTLCountedNotBuffered updated to assert the new intake/Finalize split.
- streamEntryJSON.Fields type change `map → slice`: only marshalled by encoding/json; the only reader is the test suite, which is updated in this commit. No on-disk format compatibility concerns because Phase 0 has not shipped a consumer yet.
- New caller claimPendingTTL: called by setState (retroactive) and streamState (new) in this PR. hashState/listState don't call it because their type prefixes lex-sort BEFORE `!redis|ttl|`. Verified via `grep -n 'claimPendingTTL' internal/backup/`.

New tests:
- TestRedisDB_StreamDuplicateFieldsPreserved: pins P1b fix.
- TestRedisDB_StreamTTLArrivesBeforeRows: pins P1a fix for streams.
- TestRedisDB_SetTTLArrivesBeforeRows: retroactive coverage for PR #758's set encoder (same root cause as the stream bug).
- TestRedisDB_StreamFieldsDecodedToArray (renamed from ToObject): updated to match the array shape.

Self-review:
1. Data loss: the original code DROPPED real TTL'd streams on every backup AND dropped duplicate-field entries' later pairs. This fix recovers both. No new data-loss surface introduced.
2. Concurrency: pendingTTL added to RedisDB which is already sequential-per-scope; no new locking required.
3. Performance: pendingTTL holds (string-userKey, uint64-expireAt) pairs; same allocation shape as kindByKey. Fields slice replaces a map of the same logical size and is slightly cheaper (no hash overhead).
4. Consistency: drain happens at FIRST state registration. The array form preserves insertion order from the protobuf so the restored XADD argv matches.
5. Coverage: 4 new tests + 2 updated. All 78 redis tests pass.
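
The park-and-drain flow described under P1a can be sketched roughly as follows. This is a minimal model under stated assumptions, not the code in internal/backup: `sketchDB`, `registerStream`, `handleTTL`, `finalize`, and the string-valued kind map are hypothetical stand-ins. Only the names `pendingTTL`, `claimPendingTTL`, `kindByKey`, and `orphanTTLCount` come from the commit message, and their real signatures and behaviour may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// sketchDB is a hypothetical, stripped-down stand-in for RedisDB.
type sketchDB struct {
	kindByKey      map[string]string // user key -> registered kind
	pendingTTL     map[string]uint64 // expiries seen before their rows
	expireAtMs     map[string]uint64 // expiries attached to a known key
	orphanTTLCount int
}

func newSketchDB() *sketchDB {
	return &sketchDB{
		kindByKey:  map[string]string{},
		pendingTTL: map[string]uint64{},
		expireAtMs: map[string]uint64{},
	}
}

// handleTTL parks the expiry while the key's kind is still unknown
// instead of counting it as an orphan on intake.
func (d *sketchDB) handleTTL(userKey string, expireAt uint64) {
	if _, known := d.kindByKey[userKey]; !known {
		d.pendingTTL[userKey] = expireAt
		return
	}
	d.expireAtMs[userKey] = expireAt // assumed behaviour for known kinds
}

// claimPendingTTL removes and returns a parked expiry, if one exists.
func (d *sketchDB) claimPendingTTL(userKey string) (uint64, bool) {
	v, ok := d.pendingTTL[userKey]
	if ok {
		delete(d.pendingTTL, userKey)
	}
	return v, ok
}

// registerStream models the drain on first registration.
func (d *sketchDB) registerStream(userKey string) {
	if _, known := d.kindByKey[userKey]; known {
		return
	}
	if v, ok := d.claimPendingTTL(userKey); ok {
		d.expireAtMs[userKey] = v
	}
	d.kindByKey[userKey] = "stream"
}

// finalize counts whatever is still parked as orphans.
func (d *sketchDB) finalize() { d.orphanTTLCount = len(d.pendingTTL) }

// ttlPrefixSortsFirst checks the byte-wise ordering of the key prefixes
// quoted in the commit message.
func ttlPrefixSortsFirst() bool {
	keys := []string{"!stream|", "!st|", "!redis|ttl|"}
	sort.Strings(keys)
	return keys[0] == "!redis|ttl|"
}

func main() {
	d := newSketchDB()
	d.handleTTL("k", 1714400000000) // TTL arrives before any stream row
	d.handleTTL("gone", 5)          // never claimed by any type
	d.registerStream("k")
	d.finalize()
	fmt.Println(ttlPrefixSortsFirst(), d.expireAtMs["k"], d.orphanTTLCount)
}
```

In this model the orphan count is only known at `finalize`, which mirrors the intake/Finalize split the caller audit describes.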
1 parent b2d0b82 commit d412194

5 files changed

Lines changed: 308 additions & 45 deletions

internal/backup/redis_set.go

Lines changed: 12 additions & 0 deletions
@@ -114,12 +114,24 @@ func (r *RedisDB) HandleSetMetaDelta(_, _ []byte) error { return nil }
 // setState lazily creates per-key state. Mirrors the hash/list
 // kindByKey-registration pattern so HandleSetMeta, HandleSetMember,
 // and the HandleTTL back-edge all agree on the kind.
+//
+// On first registration we drain any pendingTTL for the user key.
+// `!redis|ttl|<k>` lex-sorts BEFORE `!st|...` (because `r` < `s`),
+// so in real snapshot order the TTL arrives FIRST; HandleTTL parks
+// it in pendingTTL, and this function inlines it into the set's
+// `expire_at_ms`. Without this drain step, every TTL'd set would
+// restore as permanent — a latent bug in PR #758 surfaced by codex
+// on PR #791. Phase 0a tests added in the same PR pin the ordering.
 func (r *RedisDB) setState(userKey []byte) *redisSetState {
 	uk := string(userKey)
 	if st, ok := r.sets[uk]; ok {
 		return st
 	}
 	st := &redisSetState{members: make(map[string]struct{})}
+	if expireAtMs, ok := r.claimPendingTTL(userKey); ok {
+		st.expireAtMs = expireAtMs
+		st.hasTTL = true
+	}
 	r.sets[uk] = st
 	r.kindByKey[uk] = redisKindSet
 	return st

internal/backup/redis_stream.go

Lines changed: 39 additions & 12 deletions
@@ -160,12 +160,23 @@ func (r *RedisDB) HandleStreamEntry(key, value []byte) error {
 // hash/list/set/zset kindByKey-registration pattern so HandleStreamMeta,
 // HandleStreamEntry, and the HandleTTL back-edge all agree on the
 // kind.
+//
+// On first registration we drain any pendingTTL for the user key.
+// `!redis|ttl|<k>` lex-sorts BEFORE `!stream|...` (because `r` < `s`),
+// so in real snapshot order the TTL arrives FIRST; HandleTTL parks
+// it in pendingTTL, and this function inlines it into the stream's
+// JSONL `_meta.expire_at_ms`. Without this drain step, every TTL'd
+// stream would restore as permanent. Codex P1 finding on PR #791.
 func (r *RedisDB) streamState(userKey []byte) *redisStreamState {
 	uk := string(userKey)
 	if st, ok := r.streams[uk]; ok {
 		return st
 	}
 	st := &redisStreamState{}
+	if expireAtMs, ok := r.claimPendingTTL(userKey); ok {
+		st.expireAtMs = expireAtMs
+		st.hasTTL = true
+	}
 	r.streams[uk] = st
 	r.kindByKey[uk] = redisKindStream
 	return st
@@ -291,17 +302,30 @@ func (r *RedisDB) writeStreamJSONL(dir string, userKey []byte, st *redisStreamSt
 	return nil
 }

-// streamEntryJSON is the dump-format projection of one stream entry.
-// Fields is emitted as a JSON object keyed by name (matching the
-// design's `"fields": {"event":"login","user":"alice"}` example)
-// because XADD itself enforces name/value pair shape and Redis
-// stream field names are user-controlled strings rather than
-// binary-safe bytes. A future format-version bump can switch to a
-// `[{"name":..., "value":...}]` array if reviewers find names that
-// collide under JSON-object keying.
+// streamFieldJSON is the dump-format projection of one (name, value)
+// pair from a stream entry. We emit a list of name/value records
+// rather than a JSON object because XADD permits duplicate field
+// names within one entry — e.g. `XADD s * f v1 f v2` records BOTH
+// (f, v1) and (f, v2) as distinct interleaved entries in the
+// stored protobuf's Fields slice. The design example at
+// docs/design/2026_04_29_proposed_snapshot_logical_decoder.md:338
+// showed an object shape, but that representation silently drops
+// duplicates and a restore would not reproduce the original
+// stream entry. Codex P1 (PR #791) — switched to a name/value
+// record array.
+type streamFieldJSON struct {
+	Name  string `json:"name"`
+	Value string `json:"value"`
+}
+
+// streamEntryJSON is the dump-format projection of one stream
+// entry. Fields is an ARRAY so duplicate field names within a
+// single XADD round-trip correctly. The array preserves the
+// interleaved insertion order from the protobuf so consumers can
+// re-assemble the original XADD argv.
 type streamEntryJSON struct {
 	ID     string            `json:"id"`
-	Fields map[string]string `json:"fields"`
+	Fields []streamFieldJSON `json:"fields"`
 }

 // streamMetaJSON is the dump-format projection of the final _meta
@@ -330,13 +354,16 @@ func marshalStreamJSONL(st *redisStreamState) ([]byte, error) {
 	var buf bytes.Buffer
 	const xaddPairWidth = 2 // (name, value) — XADD enforces even arity
 	for _, e := range st.entries {
-		fieldsMap := make(map[string]string, len(e.fields)/xaddPairWidth)
+		fields := make([]streamFieldJSON, 0, len(e.fields)/xaddPairWidth)
 		for i := 0; i+1 < len(e.fields); i += xaddPairWidth {
-			fieldsMap[e.fields[i]] = e.fields[i+1]
+			fields = append(fields, streamFieldJSON{
+				Name:  e.fields[i],
+				Value: e.fields[i+1],
+			})
 		}
 		rec := streamEntryJSON{
 			ID:     formatStreamID(e.ms, e.seq),
-			Fields: fieldsMap,
+			Fields: fields,
 		}
 		line, err := json.Marshal(rec)
 		if err != nil {

internal/backup/redis_stream_test.go

Lines changed: 139 additions & 14 deletions
@@ -169,11 +169,62 @@ func TestRedisDB_StreamRoundTripBasic(t *testing.T) {
 	assertStreamMetaTerminator(t, meta, 3, 1714400000002, 0)
 }

-// TestRedisDB_StreamFieldsDecodedToObject confirms that interleaved
+// streamFieldsPair is the decoded counterpart of streamFieldJSON used
+// in assertions.
+type streamFieldsPair struct{ name, value string }
+
+// extractStreamFieldsAsPairs pulls the `fields` array out of a
+// decoded JSONL entry and returns it as a slice of (name, value)
+// pairs. Centralises the type assertions so the per-test bodies
+// stay below the cyclop ceiling and forcetypeassert lints don't
+// fire at every call site.
+func extractStreamFieldsAsPairs(t *testing.T, entry map[string]any) []streamFieldsPair {
+	t.Helper()
+	raw, ok := entry["fields"].([]any)
+	if !ok {
+		t.Fatalf("entry.fields = %T(%v), want array", entry["fields"], entry["fields"])
+	}
+	out := make([]streamFieldsPair, 0, len(raw))
+	for i, r := range raw {
+		rec, ok := r.(map[string]any)
+		if !ok {
+			t.Fatalf("entry.fields[%d] = %T(%v), want object", i, r, r)
+		}
+		name, ok := rec["name"].(string)
+		if !ok {
+			t.Fatalf("entry.fields[%d].name = %T(%v), want string", i, rec["name"], rec["name"])
+		}
+		value, ok := rec["value"].(string)
+		if !ok {
+			t.Fatalf("entry.fields[%d].value = %T(%v), want string", i, rec["value"], rec["value"])
+		}
+		out = append(out, streamFieldsPair{name: name, value: value})
+	}
+	return out
+}
+
+// assertStreamFieldsEqual checks that the decoded fields array
+// matches the expected ordered list of (name, value) pairs.
+func assertStreamFieldsEqual(t *testing.T, got []streamFieldsPair, want []streamFieldsPair) {
+	t.Helper()
+	if len(got) != len(want) {
+		t.Fatalf("len(fields) = %d, want %d", len(got), len(want))
+	}
+	for i := range want {
+		if got[i] != want[i] {
+			t.Fatalf("fields[%d] = %v, want %v", i, got[i], want[i])
+		}
+	}
+}
+
+// TestRedisDB_StreamFieldsDecodedToArray confirms that interleaved
 // `[name1, value1, name2, value2, ...]` arrays from the protobuf
-// land as `{"name1": "value1", "name2": "value2"}` JSON objects per
-// the design example (line 338).
-func TestRedisDB_StreamFieldsDecodedToObject(t *testing.T) {
+// land as a JSONL `[{"name":...,"value":...}]` array. Switched
+// from the design's original map shape (line 338) in response to
+// codex's P1 on PR #791: XADD allows duplicate field names within
+// one entry (e.g. `XADD s * f v1 f v2`) and the map representation
+// silently collapsed them.
+func TestRedisDB_StreamFieldsDecodedToArray(t *testing.T) {
 	t.Parallel()
 	db, root := newRedisDB(t)
 	if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 100, 5)); err != nil {
@@ -190,18 +241,92 @@ func TestRedisDB_StreamFieldsDecodedToObject(t *testing.T) {
 	if len(entries) != 1 {
 		t.Fatalf("entries = %d, want 1", len(entries))
 	}
-	fields, ok := entries[0]["fields"].(map[string]any)
-	if !ok {
-		t.Fatalf("entries[0].fields = %T(%v), want object", entries[0]["fields"], entries[0]["fields"])
+	assertStreamFieldsEqual(t, extractStreamFieldsAsPairs(t, entries[0]), []streamFieldsPair{
+		{"event", "login"},
+		{"user", "alice"},
+		{"ip", "10.0.0.1"},
+	})
+}
+
+// TestRedisDB_StreamDuplicateFieldsPreserved pins the codex P1 fix:
+// XADD permits duplicate field names within one entry (e.g.
+// `XADD s * f v1 f v2` stores both pairs verbatim in the
+// protobuf's Fields slice). The original map-based projection
+// silently collapsed duplicates; the array shape preserves them
+// and a restore can replay the original XADD argv exactly.
+func TestRedisDB_StreamDuplicateFieldsPreserved(t *testing.T) {
+	t.Parallel()
+	db, root := newRedisDB(t)
+	if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 1, 0)); err != nil {
+		t.Fatal(err)
+	}
+	val := encodeStreamEntryValue(t, "1-0", []string{"f", "v1", "f", "v2", "g", "v3"})
+	if err := db.HandleStreamEntry(streamEntryKey("s", 1, 0), val); err != nil {
+		t.Fatal(err)
 	}
-	want := map[string]any{"event": "login", "user": "alice", "ip": "10.0.0.1"}
-	if len(fields) != len(want) {
-		t.Fatalf("fields = %v, want %v", fields, want)
+	if err := db.Finalize(); err != nil {
+		t.Fatal(err)
 	}
-	for k, v := range want {
-		if fields[k] != v {
-			t.Fatalf("fields[%q] = %v, want %v", k, fields[k], v)
-		}
+	entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl"))
+	assertStreamFieldsEqual(t, extractStreamFieldsAsPairs(t, entries[0]), []streamFieldsPair{
+		{"f", "v1"},
+		{"f", "v2"},
+		{"g", "v3"},
+	})
+}
+
+// TestRedisDB_StreamTTLArrivesBeforeRows pins the codex P1 fix:
+// `!redis|ttl|<k>` lex-sorts BEFORE `!stream|...` because `r` <
+// `s`, so in real Pebble snapshot order the TTL arrives FIRST.
+// The encoder MUST buffer the expiry in pendingTTL and drain it
+// when streamState first registers the user key, inlining the
+// value into the JSONL `_meta.expire_at_ms`. Without this drain
+// every TTL'd stream would restore as permanent.
+func TestRedisDB_StreamTTLArrivesBeforeRows(t *testing.T) {
+	t.Parallel()
+	db, root := newRedisDB(t)
+	// Snapshot order: TTL first, then meta + entry.
+	if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil {
+		t.Fatal(err)
+	}
+	if err := db.HandleStreamMeta(streamMetaKey("k"), encodeStreamMetaValue(1, 100, 0)); err != nil {
+		t.Fatal(err)
+	}
+	val := encodeStreamEntryValue(t, "100-0", []string{"event", "login"})
+	if err := db.HandleStreamEntry(streamEntryKey("k", 100, 0), val); err != nil {
+		t.Fatal(err)
+	}
+	if err := db.Finalize(); err != nil {
+		t.Fatal(err)
+	}
+	_, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "k.jsonl"))
+	if streamFloat(t, meta, "expire_at_ms") != float64(fixedExpireMs) {
+		t.Fatalf("meta.expire_at_ms = %v want %d — pendingTTL drain failed", meta["expire_at_ms"], fixedExpireMs)
+	}
+}
+
+// TestRedisDB_SetTTLArrivesBeforeRows pins the same ordering fix
+// for sets (`!redis|ttl|` lex-sorts before `!st|...` because
+// `r` < `s`). Retroactive coverage for PR #758, which shipped the
+// set encoder before the pendingTTL infrastructure existed.
+func TestRedisDB_SetTTLArrivesBeforeRows(t *testing.T) {
+	t.Parallel()
+	db, root := newRedisDB(t)
+	if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil {
+		t.Fatal(err)
+	}
+	if err := db.HandleSetMeta(setMetaKey("k"), encodeSetMetaValue(1)); err != nil {
+		t.Fatal(err)
+	}
+	if err := db.HandleSetMember(setMemberKey("k", []byte("m")), nil); err != nil {
+		t.Fatal(err)
+	}
+	if err := db.Finalize(); err != nil {
+		t.Fatal(err)
+	}
+	got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "k.json"))
+	if setFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) {
+		t.Fatalf("expire_at_ms = %v want %d — pendingTTL drain failed", got["expire_at_ms"], fixedExpireMs)
 	}
 }
