backup: Redis stream encoder (Phase 0a) #791

Open: bootjp wants to merge 6 commits into main from backup/redis-stream-encoder

bootjp (Owner) commented May 19, 2026

Summary

Adds the Redis stream encoder for the Phase 0 logical snapshot
decoder (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md
lines 336-344). Mirrors the hash/list/set/zset encoders shipped in
#725/#755/#758/#790. After this lands, the only Phase 0a Redis work
remaining is HLL TTL routing (the HLL blob path itself shipped in
#713; what's missing is hll_ttl.jsonl sidecar wiring per design
line 345-348).

Wire format mirrors store/stream_helpers.go and
adapter/redis_storage_codec.go:

  • !stream|meta|<userKeyLen(4)><userKey>
    24-byte BE Length(8) || LastMs(8) || LastSeq(8)
  • !stream|entry|<userKeyLen(4)><userKey><ms(8)><seq(8)>
    magic-prefixed pb.RedisStreamEntry protobuf
    (0x00 'R' 'X' 'E' 0x01 || pb.Marshal(...))

Output is JSONL — one record per line, sorted by (ms, seq) — plus a
trailing _meta terminator:

{"id":"1714400000000-0","fields":{"event":"login","user":"alice"}}
{"_meta":true,"length":2,"last_ms":1714400000001,"last_seq":0,"expire_at_ms":null}

The interleaved (name, value) protobuf field list decodes into the
"fields" JSON object matching the design example at line 338. JSONL
was chosen over per-entry files because real streams routinely hold
tens of thousands of entries — one file per entry would dominate
tar / find runtime by inode pressure.
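
To illustrate how a consumer could use the `_meta` terminator, here is a hedged sketch of a line-by-line reader. The `streamLine` struct and `readStream` function are hypothetical; the field names are inferred from the example above, and `fields` is kept as raw JSON so the sketch does not depend on its exact shape.

```go
package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// streamLine covers both record shapes in one struct. The layout is
// inferred from the example output, not taken from the PR's Go types.
type streamLine struct {
	Meta   bool            `json:"_meta"`
	ID     string          `json:"id"`
	Fields json.RawMessage `json:"fields"`
	Length int64           `json:"length"`
}

// readStream returns the entry IDs seen before the _meta terminator and
// the length the terminator reports. bufio.Scanner's default token limit
// (64 KiB per line) applies; a real reader may need a larger buffer.
func readStream(jsonl string) (ids []string, length int64, err error) {
	sc := bufio.NewScanner(strings.NewReader(jsonl))
	for sc.Scan() {
		var rec streamLine
		if err := json.Unmarshal(sc.Bytes(), &rec); err != nil {
			return nil, 0, err
		}
		if rec.Meta {
			return ids, rec.Length, nil // terminator reached: stop reading
		}
		ids = append(ids, rec.ID)
	}
	if err := sc.Err(); err != nil {
		return nil, 0, err
	}
	return nil, 0, errors.New("missing _meta terminator")
}

func main() {
	const doc = `{"id":"1714400000000-0","fields":{"event":"login","user":"alice"}}
{"_meta":true,"length":1,"last_ms":1714400000000,"last_seq":0,"expire_at_ms":null}
`
	ids, length, err := readStream(doc)
	fmt.Println(ids, length, err)
}
```

Treating a file without a terminator as an error lets a restorer distinguish a truncated backup from a complete one.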

Fail-closed behavior

  • Magic-prefix missing on entry value → ErrRedisInvalidStreamEntry.
    Live store always writes the prefix; absence indicates corruption
    or a stale legacy value. Decoding raw protobuf without the prefix
    would either silently misparse or panic inside protobuf.
  • Odd field count → ErrRedisInvalidStreamEntry. Live XADD
    enforces even arity at the wire level; an odd count at backup time
    would silently drop the dangling field if we accepted it.
  • Meta value wrong length / overflow → ErrRedisInvalidStreamMeta
    (same shape as hash/list/set/zset overflow guards).
  • Entry key without trailing 16-byte StreamID →
    ErrRedisInvalidStreamKey.

TTL routing

!redis|ttl|<userKey> for a registered stream folds into the JSONL
_meta.expire_at_ms field — design line 341-344's explicit
requirement. Without this routing, a TTL'd stream restores as
permanent.

Self-review (5 lenses)

  1. Data loss — magic-prefix, even-arity, overflow guards all fail
    closed. Field slice cloned at decode so snapshot buffer mutations
    can't bleed into emitted state.
  2. Concurrency / distributed: RedisDB is sequential per scope;
    no shared state.
  3. Performance — per-stream slice (not map) for XADD-order
    accumulation; sort at flush is O(n log n) on (ms, seq). JSONL
    via bytes.Buffer single growth. Matches list/zset cost shape.
  4. Data consistency — entries sorted by (ms, seq) tuple, not
    string ("10-0" < "2-0" lexicographically would emit out of XADD
    order). _meta.last_ms / last_seq preserved verbatim so XADD
    * monotonicity survives restore.
  5. Test coverage — 14 table-driven tests under
    internal/backup/redis_stream_test.go.
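
Point 4 can be made concrete with a short sketch. The `entryID` type and helper names are hypothetical; the example only shows why numeric (ms, seq) ordering and string ordering of the formatted IDs disagree.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// entryID is a hypothetical stand-in for the decoded StreamID pair.
type entryID struct{ ms, seq uint64 }

// formatID renders the "<ms>-<seq>" decimal form used in the JSONL "id" field.
func formatID(id entryID) string {
	return strconv.FormatUint(id.ms, 10) + "-" + strconv.FormatUint(id.seq, 10)
}

// sortByTuple orders IDs numerically by ms, then by seq.
func sortByTuple(ids []entryID) {
	sort.Slice(ids, func(i, j int) bool {
		if ids[i].ms != ids[j].ms {
			return ids[i].ms < ids[j].ms
		}
		return ids[i].seq < ids[j].seq
	})
}

func main() {
	ids := []entryID{{10, 0}, {2, 1}, {2, 0}}
	sortByTuple(ids)
	for _, id := range ids {
		fmt.Println(formatID(id)) // 2-0, 2-1, 10-0
	}
	// Byte-wise string comparison would put "10-0" ahead of "2-0".
	fmt.Println("10-0" < "2-0") // true
}
```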

Caller audit (per /loop standing instruction)

Semantics-changing edit: new case redisKindStream: branch in
HandleTTL (redis_string.go:309). Purely additive. Verified:

grep -n 'redisKindStream' internal/backup/
# internal/backup/redis_string.go:88
# internal/backup/redis_string.go:309
# internal/backup/redis_stream.go:170

Three references, all new in this PR. No prior caller maps to
redisKindStream.

Test plan

  • go test -race ./internal/backup/ → ok
  • golangci-lint run ./internal/backup/... → 0 issues
  • go build ./... → ok
  • go vet ./internal/backup/... → ok

Decodes !stream|meta|/!stream|entry| snapshot records into per-stream
streams/<key>.jsonl files per the Phase 0 design (lines 336-344).
Mirrors the hash/list/set/zset encoders (#725/#755/#758/#790).

Wire format:
- !stream|meta|<userKeyLen(4)><userKey>
    -> 24-byte BE Length(8) || LastMs(8) || LastSeq(8)
- !stream|entry|<userKeyLen(4)><userKey><ms(8)><seq(8)>
    -> magic-prefixed pb.RedisStreamEntry protobuf
      (0x00 'R' 'X' 'E' 0x01 || pb.Marshal(...))

Output is JSONL -- one record per line, sorted by (ms, seq) -- plus
a trailing _meta terminator that captures length, last_ms, last_seq,
and expire_at_ms (the design's pattern at line 338-339). The
interleaved (name, value) field list from the protobuf decodes into
the "fields" JSON object matching the design example. Per-line
JSONL was chosen over per-entry files because real streams routinely
hold tens of thousands of entries (one file per entry would dominate
tar + find runtime by inode pressure).

Fail-closed behavior:
- Magic-prefix missing on an entry value -> ErrRedisInvalidStreamEntry.
  The live store always writes the prefix; its absence indicates
  corruption or a stale legacy value. Decoding raw protobuf without
  the prefix would either silently misparse or panic inside protobuf.
- Odd field count -> ErrRedisInvalidStreamEntry. Live XADD enforces
  even arity at the wire level; an odd count at backup time would
  silently drop the dangling field if accepted.
- Meta value of wrong length / overflow -> ErrRedisInvalidStreamMeta
  (same shape as the hash/list/set/zset overflow guards).
- Entry key without the trailing 16-byte StreamID ->
  ErrRedisInvalidStreamKey.

TTL routing: !redis|ttl|<userKey> for a registered stream key folds
into the JSONL _meta terminator's expire_at_ms field, matching the
design's line 341-344 explicit requirement. Without this routing, a
TTL'd stream would silently restore as permanent.

Self-review:
1. Data loss -- magic-prefix, even-arity, and overflow guards all
   fail closed. Entry value is cloned (cloneStringSlice on the
   protobuf output) so a follow-up edit to the snapshot buffer
   cannot mutate emitted state.
2. Concurrency -- RedisDB is sequential per scope; no shared state.
3. Performance -- per-stream state in a slice (not a map) so XADD-
   order accumulation costs O(n); sort at flush is O(n log n) on
   (ms, seq). JSONL output is streamed via bytes.Buffer (single
   allocation grow). Matches list/zset cost shape.
4. Consistency -- entries sorted by (ms, seq) tuple, NOT by formatted
   string (sorting "10-0" vs "2-0" lexicographically would emit them
   out of XADD order); _meta last_ms/last_seq preserved verbatim so
   a restorer keeps XADD '*' monotonicity. JSONL terminator marker
   _meta:true is on a dedicated line so streaming consumers can
   detect end-of-stream without reading the whole file.
5. Coverage -- 14 table-driven tests under redis_stream_test.go:
   - round-trip basic (out-of-order entries, sorted at flush)
   - fields decoded to JSON object (design example match)
   - empty stream still emits file
   - TTL inlining into _meta.expire_at_ms
   - length-mismatch warning
   - malformed meta length / overflow / MaxInt64 boundary
   - missing magic prefix rejection
   - odd field count rejection
   - malformed entry key (wrong StreamID suffix length)
   - entries-without-meta still emit file
   - ID wire format ("<ms>-<seq>" decimal)
   - multi-stream user-key sort order

Caller audit for semantics-changing edit (new case redisKindStream
branch in HandleTTL, redis_string.go:309): purely additive -- the
new branch fires only when streamState() has previously registered
the key. No prior call site changes behavior. Verified via
'grep -n redisKindStream internal/backup/': three refs, all new in
this PR.
bootjp (Owner, Author) commented May 19, 2026

@claude review

coderabbitai Bot commented May 19, 2026

Warning

Rate limit exceeded

@bootjp has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 12 minutes and 23 seconds before requesting another review.

📥 Commits

Reviewing files that changed from the base of the PR and between 8c0d07f and 421254e.

📒 Files selected for processing (5)
  • internal/backup/redis_set.go
  • internal/backup/redis_stream.go
  • internal/backup/redis_stream_test.go
  • internal/backup/redis_string.go
  • internal/backup/redis_string_test.go

claude Bot commented May 19, 2026

Claude finished @bootjp's task in 3m 45s


Code Review — PR #791: backup: Redis stream encoder (Phase 0a)

Tasks

  • Gather context (PR description, changed files list)
  • Read internal/backup/redis_stream.go
  • Read internal/backup/redis_stream_test.go
  • Read internal/backup/redis_string.go diff
  • Read related files for context (stream_helpers, storage codec, existing encoders)
  • Review: correctness, error handling, data consistency
  • Review: performance & memory
  • Review: test coverage
  • Post final review

The encoder follows the established hash/list/set pattern cleanly and the fail-closed guards, overflow checks, and TTL routing are all correct. Two data-loss issues need fixing before merge; everything else is minor.


Critical — Data loss

1. Duplicate field names silently dropped (redis_stream.go:333-336)

fieldsMap := make(map[string]string, len(e.fields)/xaddPairWidth)
for i := 0; i+1 < len(e.fields); i += xaddPairWidth {
    fieldsMap[e.fields[i]] = e.fields[i+1]   // last-write-wins for dupe names
}

Redis streams explicitly allow duplicate field names within a single entry (unlike hashes). XADD key * event login event logout produces an entry with two event fields, and pb.RedisStreamEntry.Fields is repeated string — an ordered list — precisely to preserve that ordering. Converting to map[string]string silently drops all but the last value for any repeated name. The backup would produce a JSONL that cannot restore the original entry faithfully.

The hash encoder anticipates this with its array-of-records shape and the comment: "Redis hash field names are binary-safe and JSON object keys are not." The same argument applies here. The design doc's example ({"event":"login","user":"alice"}) uses non-colliding names; a format-version bump to [{"name":…,"value":…}] as the PR itself suggests is the right fix. Alternatively, document that streams with duplicate field names produce corrupt backups — but that's a worse outcome.

Per CLAUDE.md convention: a failing test should accompany the fix. e.g.:

// TestRedisDB_StreamDuplicateFieldNamesPreserved pins that duplicate
// field names within one entry survive the encode/decode round-trip.
val := encodeStreamEntryValue(t, "1-0", []string{"k", "v1", "k", "v2"})
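
The collapse described above is easy to reproduce in isolation. The sketch below uses hypothetical helpers (`toMap`, `toPairs`, `fieldPair`) and plain `string` fields to stay short; it only contrasts the two projections and does not address binary safety.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fieldPair is a hypothetical array-of-records element.
type fieldPair struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

// toMap reproduces the lossy projection: a repeated name overwrites
// the earlier value.
func toMap(fields []string) map[string]string {
	m := make(map[string]string, len(fields)/2)
	for i := 0; i+1 < len(fields); i += 2 {
		m[fields[i]] = fields[i+1]
	}
	return m
}

// toPairs keeps every (name, value) pair in its original order.
func toPairs(fields []string) []fieldPair {
	out := make([]fieldPair, 0, len(fields)/2)
	for i := 0; i+1 < len(fields); i += 2 {
		out = append(out, fieldPair{Name: fields[i], Value: fields[i+1]})
	}
	return out
}

func main() {
	// Equivalent to: XADD key * event login event logout
	fields := []string{"event", "login", "event", "logout"}
	asMap, _ := json.Marshal(toMap(fields))
	asPairs, _ := json.Marshal(toPairs(fields))
	fmt.Println(string(asMap))   // {"event":"logout"}
	fmt.Println(string(asPairs)) // [{"name":"event","value":"login"},{"name":"event","value":"logout"}]
}
```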

2. Non-UTF-8 field names and values silently corrupted (redis_stream.go:302-305)

type streamEntryJSON struct {
    ID     string            `json:"id"`
    Fields map[string]string `json:"fields"`  // ← string, not binary-safe
}

Go's json.Marshal for string coerces invalid UTF-8 bytes to the Unicode replacement character (U+FFFD). Any stream entry whose field name or value contains non-UTF-8 bytes (e.g., a Protobuf-encoded payload, a UUID with raw bytes, arbitrary sensor data) will have those bytes silently destroyed on backup. The hash encoder solves this identically via marshalRedisBinaryValue, which emits {"base64":"…"} for non-UTF-8 values. The same treatment is needed here.

The PR description argues stream field names are "strings rather than binary-safe bytes" — but the live adapter/redis_storage_codec.go stores and retrieves them as []byte via the same protobuf repeated string fields field that hashes use. There is no ASCII/UTF-8 constraint enforced at the wire level.


Medium — Style consistency

3. flushStreams duplicates the flushWideColumnDir boilerplate (redis_stream.go:249-276)

Every other wide-column encoder (flushHashes, flushLists, flushSets) delegates to flushWideColumnDir. flushStreams hand-rolls the identical pattern: nil-check, ensureDir, sorted key iteration, per-key callback. Given that flushWideColumnDir[T any] accepts any state type, streams slot in directly:

func (r *RedisDB) flushStreams() error {
    return flushWideColumnDir(r, r.streams, "streams", func(dir, uk string, st *redisStreamState) error {
        if r.warn != nil && st.metaSeen && int64(len(st.entries)) != st.length {
            r.warn("redis_stream_length_mismatch", ...)
        }
        return r.writeStreamJSONL(dir, []byte(uk), st)
    })
}

Not a bug, but the divergence means a future change to flushWideColumnDir (error policy, security check, etc.) won't automatically propagate to streams.


Minor

4. In-place sort of st.entries contradicts the clone-at-decode rationale (redis_stream.go:323-329)

marshalStreamJSONL calls sort.Slice(st.entries, …), mutating the live state. The self-review point 1 notes that "field slice cloned at decode so snapshot buffer mutations can't bleed into emitted state" — but that same care isn't applied to the entries slice itself. Currently harmless (flush is a one-shot path), but inconsistent. A sort.Slice on a copy — or switching to slices.SortedFunc over a copy — would be strictly consistent.
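
One way to sort without touching the live slice, sketched with a hypothetical `entry` type and Go 1.21's `slices` and `cmp` packages:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

// entry is a hypothetical stand-in for the per-stream entry record.
type entry struct{ ms, seq uint64 }

// sortedEntries returns a sorted copy and leaves the input order untouched.
func sortedEntries(in []entry) []entry {
	out := slices.Clone(in)
	slices.SortFunc(out, func(a, b entry) int {
		if c := cmp.Compare(a.ms, b.ms); c != 0 {
			return c
		}
		return cmp.Compare(a.seq, b.seq)
	})
	return out
}

func main() {
	live := []entry{{10, 0}, {2, 0}}
	sorted := sortedEntries(live)
	fmt.Println(live, sorted) // [{10 0} {2 0}] [{2 0} {10 0}]
}
```

The copy costs one extra O(n) allocation per stream at flush time, which is the trade-off against mutating state in a one-shot path.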

5. _meta last_ms/last_seq are zero-valued when no meta record arrived

When only entries arrive (test TestRedisDB_StreamEntriesWithoutMetaStillEmitFile), st.metaSeen == false, st.lastMs == 0, st.lastSeq == 0. The emitted _meta line then asserts "last_ms": 0, "last_seq": 0, which a restorer might interpret as "stream never had an entry with ms > 0". This is arguably correct (unknown = zero), but a comment in marshalStreamJSONL explaining the invariant would help a reader of the output format.

6. Large uint64 values lose precision in standard JSON readers (informational)

streamMetaJSON.LastMs and LastSeq are uint64, and streamMetaJSON.Length is int64. Integers above 2^53 are not all exactly representable as IEEE 754 doubles, so a standard json.Unmarshal into any (as the tests do with map[string]any) silently rounds them. A restorer using json.Decoder.UseNumber() avoids this; worth a note in the format doc or an inline comment.
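
A brief sketch of the difference, using a hypothetical `decodeLastSeq` helper and a made-up `_meta` line with the largest uint64 value:

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// decodeLastSeq (hypothetical) reads last_seq from a _meta line without
// routing the number through float64.
func decodeLastSeq(line []byte) (string, error) {
	dec := json.NewDecoder(bytes.NewReader(line))
	dec.UseNumber() // numbers decode as json.Number (a string) instead of float64
	var rec map[string]any
	if err := dec.Decode(&rec); err != nil {
		return "", err
	}
	n, ok := rec["last_seq"].(json.Number)
	if !ok {
		return "", errors.New("last_seq missing or not a number")
	}
	return n.String(), nil
}

func main() {
	line := []byte(`{"_meta":true,"last_seq":18446744073709551615}`)

	// Default decoding into any yields a float64, which rounds the value.
	var lossy map[string]any
	_ = json.Unmarshal(line, &lossy)
	fmt.Println(lossy["last_seq"])

	exact, err := decodeLastSeq(line)
	fmt.Println(exact, err)
}
```

Decoding into a struct with `uint64` fields also preserves the value; the rounding only affects untyped targets such as `map[string]any`.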


What's good

  • The three error sentinels (ErrRedisInvalidStreamMeta, ErrRedisInvalidStreamEntry, ErrRedisInvalidStreamKey) are correctly wrapped and cover every parse boundary.
  • (ms, seq) tuple sort is correct — the comment about "10-0" < "2-0" lexicographically is exactly the right justification.
  • TTL routing through kindByKey is clean and consistent with hash/list/set.
  • The 14 table-driven tests cover the happy path, error paths, boundary conditions, and warn-sink contract well. The assertStreamMetaTerminator helper keeps per-test bodies tight.
  • cloneStringSlice in decodeStreamEntryValue is the right defensive move against protobuf buffer reuse.
  • Magic-prefix check before protobuf unmarshal is the correct fail-closed order.

chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b2d0b82c74

Comment on lines +309 to +315
case redisKindStream:
	// Same per-record TTL inlining: XADD + EXPIRE replay in
	// one shot from the per-stream JSONL `_meta` terminator,
	// no separate sidecar.
	st := r.streamState(userKey)
	st.expireAtMs = expireAtMs
	st.hasTTL = true

P1: Buffer stream TTLs that arrive before stream keys

When processing the sorted snapshot stream, !redis|ttl|<key> sorts before !stream|meta|.../!stream|entry|..., so kindByKey is still redisKindUnknown when HandleTTL sees a TTL for a stream. This new stream case only works if the caller happens to dispatch stream records before the TTL record; in the normal sorted pipeline the TTL is counted as an orphan and the later _meta line is emitted with expire_at_ms:null, so TTL'd streams restore as permanent.

Comment thread internal/backup/redis_stream.go Outdated
Comment on lines +333 to +335
fieldsMap := make(map[string]string, len(e.fields)/xaddPairWidth)
for i := 0; i+1 < len(e.fields); i += xaddPairWidth {
	fieldsMap[e.fields[i]] = e.fields[i+1]

P1: Preserve duplicate stream fields instead of map-collapsing

For streams created with duplicate field names (e.g. XADD s * f v1 f v2), the adapter stores and returns the interleaved entry.Fields slice verbatim, but this projection overwrites earlier values in a map[string]string. Such entries are valid because parseXAddFields only enforces even arity, so the backup silently drops duplicate field/value pairs and a restore would not reproduce the stream entry.

gemini-code-assist Bot (Contributor) left a comment

Code Review

This pull request implements a Redis stream encoder that translates raw snapshot records into per-stream JSONL files, including support for metadata, entries, and TTL inlining. The reviewer provided several performance-oriented suggestions aimed at reducing memory allocations and GC pressure during scanning. These include pre-allocating prefix byte slices, optimizing map lookups to avoid redundant string conversions, and utilizing json.Encoder with reused maps to process large streams more efficiently.

redisStreamProtoPrefixLen = 5
)

var redisStreamProtoPrefix = []byte{0x00, 'R', 'X', 'E', 0x01}

medium

To avoid repeated []byte(string) allocations during snapshot scanning, these prefixes should be pre-allocated as package-level byte slices, following the pattern established in store/stream_helpers.go.

Suggested change
- var redisStreamProtoPrefix = []byte{0x00, 'R', 'X', 'E', 0x01}
+ var (
+ 	redisStreamProtoPrefix      = []byte{0x00, 'R', 'X', 'E', 0x01}
+ 	redisStreamMetaPrefixBytes  = []byte(RedisStreamMetaPrefix)
+ 	redisStreamEntryPrefixBytes = []byte(RedisStreamEntryPrefix)
+ )

Comment on lines +164 to +167
uk := string(userKey)
if st, ok := r.streams[uk]; ok {
	return st
}

medium

This lookup allocates a new string for every stream entry processed. Since Redis streams can contain tens of thousands of entries, this creates significant GC pressure. Using the m[string(b)] compiler optimization for the initial check avoids allocation for all entries after the first one for each stream.

Suggested change
- uk := string(userKey)
- if st, ok := r.streams[uk]; ok {
- 	return st
- }
+ if st, ok := r.streams[string(userKey)]; ok {
+ 	return st
+ }
+ uk := string(userKey)
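
The lookup pattern in this suggestion can be sketched in isolation. The `db` and `streamState` types below are hypothetical simplifications; the point is that the Go compiler can evaluate `m[string(b)]` as a map index expression without allocating a string, so the conversion cost is only paid on a miss.

```go
package main

import "fmt"

// streamState and db are hypothetical simplifications of the encoder state.
type streamState struct{ entries int }

type db struct{ streams map[string]*streamState }

// state looks up by m[string(b)] first, a form the gc compiler can
// evaluate without materialising a string. The key is converted and
// stored only on a miss.
func (d *db) state(userKey []byte) *streamState {
	if st, ok := d.streams[string(userKey)]; ok {
		return st
	}
	st := &streamState{}
	d.streams[string(userKey)] = st
	return st
}

func main() {
	d := &db{streams: map[string]*streamState{}}
	key := []byte("orders")
	for i := 0; i < 3; i++ {
		d.state(key).entries++
	}
	fmt.Println(len(d.streams), d.streams["orders"].entries) // 1 3
}
```

Whether the saving matters in practice depends on key sizes and entry counts, so a benchmark on real snapshot data would be the way to confirm it.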

// family — streams update meta in-place rather than via per-XADD
// deltas — so we do not need a delta-skip guard here.
func parseStreamMetaKey(key []byte) ([]byte, bool) {
	rest := bytes.TrimPrefix(key, []byte(RedisStreamMetaPrefix))

medium

Use the pre-allocated byte slice to avoid conversion overhead.

Suggested change
- rest := bytes.TrimPrefix(key, []byte(RedisStreamMetaPrefix))
+ rest := bytes.TrimPrefix(key, redisStreamMetaPrefixBytes)

// userKeyLen prefix, then peels off the trailing 16-byte StreamID
// (ms || seq). Returns (userKey, ms, seq, true) on success.
func parseStreamEntryKey(key []byte) ([]byte, uint64, uint64, bool) {
	rest := bytes.TrimPrefix(key, []byte(RedisStreamEntryPrefix))

medium

Use the pre-allocated byte slice to avoid conversion overhead.

Suggested change
- rest := bytes.TrimPrefix(key, []byte(RedisStreamEntryPrefix))
+ rest := bytes.TrimPrefix(key, redisStreamEntryPrefixBytes)

Comment on lines +330 to +363
	var buf bytes.Buffer
	const xaddPairWidth = 2 // (name, value) — XADD enforces even arity
	for _, e := range st.entries {
		fieldsMap := make(map[string]string, len(e.fields)/xaddPairWidth)
		for i := 0; i+1 < len(e.fields); i += xaddPairWidth {
			fieldsMap[e.fields[i]] = e.fields[i+1]
		}
		rec := streamEntryJSON{
			ID:     formatStreamID(e.ms, e.seq),
			Fields: fieldsMap,
		}
		line, err := json.Marshal(rec)
		if err != nil {
			return nil, cockroachdberr.WithStack(err)
		}
		buf.Write(line)
		buf.WriteByte('\n')
	}
	meta := streamMetaJSON{
		Meta:    true,
		Length:  st.length,
		LastMs:  st.lastMs,
		LastSeq: st.lastSeq,
	}
	if st.hasTTL {
		ms := st.expireAtMs
		meta.ExpireAtMs = &ms
	}
	line, err := json.Marshal(meta)
	if err != nil {
		return nil, cockroachdberr.WithStack(err)
	}
	buf.Write(line)
	buf.WriteByte('\n')

medium

For large streams, using json.Encoder is more efficient than json.Marshal as it writes directly to the buffer and avoids intermediate byte slice allocations. Additionally, reusing the fieldsMap across entries significantly reduces allocations in the hot loop. Ensure that the number of entries in the stream is bounded to prevent potential OOM issues.

	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	const xaddPairWidth = 2 // (name, value) — XADD enforces even arity
	fieldsMap := make(map[string]string)
	for _, e := range st.entries {
		for k := range fieldsMap {
			delete(fieldsMap, k)
		}
		for i := 0; i+1 < len(e.fields); i += xaddPairWidth {
			fieldsMap[e.fields[i]] = e.fields[i+1]
		}
		rec := streamEntryJSON{
			ID:     formatStreamID(e.ms, e.seq),
			Fields: fieldsMap,
		}
		if err := enc.Encode(rec); err != nil {
			return nil, cockroachdberr.WithStack(err)
		}
	}
	meta := streamMetaJSON{
		Meta:    true,
		Length:  st.length,
		LastMs:  st.lastMs,
		LastSeq: st.lastSeq,
	}
	if st.hasTTL {
		ms := st.expireAtMs
		meta.ExpireAtMs = &ms
	}
	if err := enc.Encode(meta); err != nil {
		return nil, cockroachdberr.WithStack(err)
	}
References
  1. To prevent unbounded memory growth and potential OOM issues, apply a fixed bound to collections that can grow from external requests.

bootjp added a commit that referenced this pull request May 19, 2026
…d records

Pebble snapshots are emitted in encoded-key order
(store/snapshot_pebble.go::iter.First()+Next()), and `!redis|ttl|`
lex-sorts BEFORE every wide-column prefix where the type letter is
>= 's' (`!st|` set, `!stream|`, `!zs|` zset — because `r` < `s`/`z`).
The original HandleTTL routed any unknown-kind expiry straight into
orphanTTLCount, so for sets, streams, and zsets the TTL was DROPPED
before zsetState/setState/streamState could claim it. Restored
sets/zsets/streams with TTL became permanent.

Codex P1 finding on PR #790. The same bug exists in the already-
merged set encoder (PR #758); this commit fixes both retroactively.
Stream encoder (PR #791) inherits the fix once rebased.

Fix: HandleTTL parks unknown-kind expiries in a new pendingTTL
map. Each wide-column state-init that may face the bad ordering
(setState, zsetState — and streamState once PR #791 lands) drains
the entry on first user-key registration via claimPendingTTL().
Finalize counts whatever remains in pendingTTL as truly-unmatched
orphans (a TTL whose user key never appeared in any typed record —
indicates store corruption or an unknown type prefix).

Semantic-change caller audit (per /loop standing instruction):
- HandleTTL's redisKindUnknown branch: previously incremented
  orphanTTLCount immediately; now buffers and lets Finalize count.
  All callers: only the per-record dispatcher in
  cmd/elastickv-snapshot-decode (not yet built — Phase 0a follow-
  up). No external caller mutates orphanTTLCount today.
- TestRedisDB_OrphanTTLCountedNotBuffered: updated to assert
  intake-time orphanTTLCount==0 + pendingTTL grows, then post-
  Finalize orphanTTLCount==n.
- New caller claimPendingTTL: called only by zsetState and setState
  in this PR. hashState/listState don't call it because their
  type prefixes (`!hs|`/`!lst|`) lex-sort BEFORE `!redis|ttl|`
  so the typed record arrives first; pendingTTL is always empty
  for them. Verified via `grep -n 'claimPendingTTL' internal/backup/`.

New tests:
- TestRedisDB_ZSetTTLArrivesBeforeRows — pins the fix for zsets.
- TestRedisDB_SetTTLArrivesBeforeRows — retroactive coverage for
  PR #758's set encoder.
- TestRedisDB_OrphanTTLCountsTrulyUnmatchedKeys — pins the new
  Finalize-time orphan semantics.

Self-review:
1. Data loss — the original code DROPPED real TTL'd
   sets/zsets/streams on every backup. This fix recovers them.
   No new data-loss surface introduced.
2. Concurrency — pendingTTL is added to RedisDB which is already
   sequential-per-scope; no new locking required.
3. Performance — pendingTTL holds (string-userKey, uint64-expireAt)
   pairs. The string allocation cost matches kindByKey's, which
   we already pay for every typed record. The original P2 round
   6 concern (don't buffer arbitrarily-large value payloads) is
   preserved: we still only buffer 8-byte expiry, not value bytes.
4. Consistency — drain happens at FIRST state registration so a
   later HandleTTL re-arrival (which would route through the
   redisKindSet/ZSet case in HandleTTL) is a no-op (st.expireAtMs
   gets overwritten with the same value).
5. Coverage — 3 new tests + 1 updated test. All 64 redis tests pass.
@claude claude Bot mentioned this pull request May 19, 2026
4 tasks
…ield preservation

Two P1 findings from chatgpt-codex on PR #791:

P1a: Buffer stream TTLs that arrive before stream rows

Pebble snapshots emit records in encoded-key order
(store/snapshot_pebble.go::iter.First()+Next()), and
`!redis|ttl|` lex-sorts BEFORE `!stream|...` because `r` < `s`.
In real snapshot order the TTL arrives FIRST, kindByKey is still
redisKindUnknown when HandleTTL fires, and the original code
counted the TTL as an orphan and dropped it — every TTL'd stream
restored as permanent.

Same root cause as the set encoder's latent bug in PR #758. This
commit adds a pendingTTL infrastructure (matching the parallel fix
on PR #790) so the expiry parks during the redisKindUnknown
window and drains when streamState first registers the user key.
The set encoder gets the same retroactive drain.

P1b: Preserve duplicate stream fields instead of map-collapsing

XADD permits duplicate field names within one entry (e.g.
`XADD s * f v1 f v2`). The protobuf entry stores the interleaved
slice verbatim, but my marshalStreamJSONL collapsed pairs into
`map[string]string`, silently dropping every duplicate. A restore
of such an entry would lose the second (and later) pair.

Fix: emit `fields` as a JSON ARRAY of `{name, value}` records
(streamFieldJSON). Order is the protobuf's interleaved order so
a restore can replay the original XADD argv exactly.

The design example at
docs/design/2026_04_29_proposed_snapshot_logical_decoder.md:338
showed object form. That representation was unsafe for streams
(though fine for hashes where the wire-level encoder normalises
field names earlier). The format is owned by Phase 0 — adjusted
in this PR before the format ships any consumers.

Caller audit (per /loop standing instruction):
- HandleTTL's redisKindUnknown branch: same semantic change as
  PR #790's r1 — previously incremented orphanTTLCount on intake;
  now buffers in pendingTTL and lets Finalize count at end.
  Same audit conclusion: no external callers of orphanTTLCount;
  TestRedisDB_OrphanTTLCountedNotBuffered updated to assert the
  new intake/Finalize split.
- streamEntryJSON.Fields type change `map → slice`: only
  marshalled by encoding/json; the only reader is the test suite,
  which is updated in this commit. No on-disk format compatibility
  concerns because Phase 0 has not shipped a consumer yet.
- New caller claimPendingTTL: called by setState (retroactive) and
  streamState (new) in this PR. hashState/listState don't call it
  because their type prefixes lex-sort BEFORE `!redis|ttl|`.
  Verified via `grep -n 'claimPendingTTL' internal/backup/`.

New tests:
- TestRedisDB_StreamDuplicateFieldsPreserved — pins P1b fix.
- TestRedisDB_StreamTTLArrivesBeforeRows — pins P1a fix for streams.
- TestRedisDB_SetTTLArrivesBeforeRows — retroactive coverage for
  PR #758's set encoder (same root cause as the stream bug).
- TestRedisDB_StreamFieldsDecodedToArray (renamed from
  ToObject) — updated to match the array shape.

Self-review:
1. Data loss — the original code DROPPED real TTL'd streams on
   every backup AND dropped duplicate-field entries' later pairs.
   This fix recovers both. No new data-loss surface introduced.
2. Concurrency — pendingTTL added to RedisDB which is already
   sequential-per-scope; no new locking required.
3. Performance — pendingTTL holds (string-userKey, uint64-expireAt)
   pairs; same allocation shape as kindByKey. Fields slice
   replaces a map of the same logical size — slightly cheaper
   actually (no hash overhead).
4. Consistency — drain happens at FIRST state registration. The
   array form preserves insertion order from the protobuf so the
   restored XADD argv matches.
5. Coverage — 4 new tests + 2 updated. All 78 redis tests pass.
bootjp (Owner, Author) commented May 19, 2026

@claude review

claude Bot commented May 19, 2026

Claude finished @bootjp's task in 2m 35s


Code Review — PR #791: backup: Redis stream encoder (Phase 0a)

Tasks

  • Gather context (PR description, changed files list)
  • Read internal/backup/redis_stream.go
  • Read internal/backup/redis_stream_test.go
  • Read internal/backup/redis_string.go diff
  • Read related context files (redis_set.go, redis_hash.go, marshalRedisBinaryValue)
  • Review: P1 correctness issues (TTL ordering, duplicate fields, binary safety)
  • Review: performance & memory
  • Review: test coverage gaps
  • Post final review

The encoder is well-structured and the two P1 issues from the previous review round have both been fixed and tested. One data-loss issue remains open. Everything else is minor/informational.


Previously Fixed — Confirmed

Duplicate field names (prior P1): stream entry fields are now emitted as a []streamFieldJSON array of records rather than a map[string]string. TestRedisDB_StreamDuplicateFieldsPreserved pins the fix. ✓

TTL arrives before stream rows (Codex P1): streamState now calls claimPendingTTL on first registration, draining pendingTTL into st.expireAtMs/st.hasTTL. HandleTTL parks unknown-kind TTLs in pendingTTL and streamState drains them. TestRedisDB_StreamTTLArrivesBeforeRows and TestRedisDB_StreamTTLInlinedFromScanIndex cover both arrival orderings. ✓
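The park-then-claim flow described above can be sketched as follows. This is a minimal illustration, not the PR's code: `ttlBuffer`, `park`, and `claim` are hypothetical names standing in for the pendingTTL map, HandleTTL's unknown-kind branch, and claimPendingTTL.

```go
package main

import "fmt"

// ttlBuffer is a hypothetical stand-in for the pendingTTL state on RedisDB.
type ttlBuffer struct {
	pending map[string]uint64 // userKey -> expireAtMs, parked until claimed
}

func newTTLBuffer() *ttlBuffer {
	return &ttlBuffer{pending: make(map[string]uint64)}
}

// park stores a TTL whose owning key type is not known yet. string(userKey)
// copies the bytes, so a reader that reuses its key buffer cannot alias it.
func (b *ttlBuffer) park(userKey []byte, expireAtMs uint64) {
	b.pending[string(userKey)] = expireAtMs
}

// claim removes and returns a parked TTL, if any. A typed state would call
// this once, when it is first registered for userKey.
func (b *ttlBuffer) claim(userKey []byte) (uint64, bool) {
	v, ok := b.pending[string(userKey)]
	if ok {
		delete(b.pending, string(userKey))
	}
	return v, ok
}

func main() {
	b := newTTLBuffer()
	key := []byte("events")
	b.park(key, 1714400000000) // the TTL record arrives first
	key[0] = 'X'               // the reader reuses its buffer
	exp, ok := b.claim([]byte("events")) // the first stream row arrives later
	fmt.Println(exp, ok, len(b.pending))
}
```

Because the claim deletes the parked entry, whatever is still in the map at the end of the run is by construction unmatched.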


Critical — Data loss

Non-UTF-8 field names and values silently corrupted (redis_stream.go:316-319)

streamFieldJSON uses plain string for both name and value:

type streamFieldJSON struct {
    Name  string `json:"name"`
    Value string `json:"value"`
}

Go's json.Marshal silently replaces invalid UTF-8 bytes with the Unicode replacement character (U+FFFD). Redis stream field names and values are stored as []byte in pb.RedisStreamEntry.Fields (a repeated string field in protobuf, which is bytes-on-the-wire). Any stream entry whose field name or value contains non-UTF-8 bytes (raw binary payloads, sensor data, legacy encodings) will be silently corrupted in the backup JSONL — those bytes cannot be recovered at restore time.
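The replacement behavior is easy to reproduce with the standard library alone. The sketch below uses a hypothetical `field` struct with the same plain-string shape and shows that invalid UTF-8 does not survive a marshal/unmarshal round trip, and that no error is reported along the way.

```go
package main

import (
	"encoding/json"
	"fmt"
	"unicode/utf8"
)

// field mirrors the plain-string shape quoted above (hypothetical name).
type field struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

// roundTrip marshals f to JSON and decodes it back again.
func roundTrip(f field) (field, string, error) {
	raw, err := json.Marshal(f)
	if err != nil {
		return field{}, "", err
	}
	var out field
	if err := json.Unmarshal(raw, &out); err != nil {
		return field{}, "", err
	}
	return out, string(raw), nil
}

func main() {
	in := field{Name: "payload", Value: "\xff\xfe"}
	out, raw, err := roundTrip(in)
	if err != nil {
		panic(err) // not reached: Marshal does not reject invalid UTF-8
	}
	fmt.Println(raw)
	fmt.Println("input valid UTF-8:", utf8.ValidString(in.Value))
	fmt.Println("round trip preserved bytes:", out.Value == in.Value)
}
```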

The hash encoder already solves exactly this problem: hashFieldRecord uses json.RawMessage for both Name and Value, populated by marshalRedisBinaryValue, which emits {"base64":"..."} for non-UTF-8 content (redis_hash.go:235-238). The stream encoder needs the same treatment:

// Change streamFieldJSON to mirror hashFieldRecord:
type streamFieldJSON struct {
    Name  json.RawMessage `json:"name"`
    Value json.RawMessage `json:"value"`
}

// In marshalStreamJSONL, replace the field construction loop:
for i := 0; i+1 < len(e.fields); i += xaddPairWidth {
    nameJSON, err := marshalRedisBinaryValue([]byte(e.fields[i]))
    if err != nil {
        return nil, err
    }
    valueJSON, err := marshalRedisBinaryValue([]byte(e.fields[i+1]))
    if err != nil {
        return nil, err
    }
    fields = append(fields, streamFieldJSON{Name: nameJSON, Value: valueJSON})
}

Per CLAUDE.md convention, a failing test should come first — e.g.:

// TestRedisDB_StreamNonUTF8FieldsRoundTrip pins that a stream entry
// whose field name or value contains non-UTF-8 bytes survives the
// encode step without silent U+FFFD replacement.
val := encodeStreamEntryValue(t, "1-0", []string{"\xff\xfe", "\x00\x01\x02"})

The readStreamJSONL → extractStreamFieldsAsPairs chain would need to handle the {"base64":"..."} shape, but locking the format in a test is the right investment before the decoder is written.
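For orientation, a binary-safe projection and its reversal might look like the sketch below. `encodeBinarySafe` and `decodeBinarySafe` are hypothetical stand-ins, not the repository's marshalRedisBinaryValue, and the choice of unpadded base64url is an assumption.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"unicode/utf8"
)

// envelope is the assumed {"base64":"..."} shape for non-UTF-8 content.
type envelope struct {
	Base64 string `json:"base64"`
}

// encodeBinarySafe emits valid UTF-8 as a plain JSON string and anything
// else as an envelope (hypothetical analogue of marshalRedisBinaryValue).
func encodeBinarySafe(b []byte) (json.RawMessage, error) {
	if utf8.Valid(b) {
		return json.Marshal(string(b))
	}
	return json.Marshal(envelope{Base64: base64.RawURLEncoding.EncodeToString(b)})
}

// decodeBinarySafe accepts either shape and returns the original bytes.
func decodeBinarySafe(raw json.RawMessage) ([]byte, error) {
	var s string
	if err := json.Unmarshal(raw, &s); err == nil {
		return []byte(s), nil
	}
	var env envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return nil, err
	}
	return base64.RawURLEncoding.DecodeString(env.Base64)
}

func main() {
	for _, in := range [][]byte{[]byte("login"), {0xff, 0xfe, 0x00}} {
		raw, err := encodeBinarySafe(in)
		if err != nil {
			panic(err)
		}
		out, err := decodeBinarySafe(raw)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s round-trips: %v\n", raw, bytes.Equal(in, out))
	}
}
```

Keeping UTF-8 content as a plain string means the common case stays readable in the JSONL, while only binary content pays for the envelope.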


Medium — Style consistency

flushStreams duplicates the flushWideColumnDir boilerplate (redis_stream.go:260-287)

flushSets, flushLists, and flushHashes all delegate to the generic flushWideColumnDir[T]. flushStreams hand-rolls the same pattern: nil-check, ensureDir, sorted user-key slice, per-key callback. A future change to flushWideColumnDir (error policy, security check) won't propagate here automatically. *redisStreamState slots directly into the generic:

func (r *RedisDB) flushStreams() error {
    return flushWideColumnDir(r, r.streams, "streams", func(dir, uk string, st *redisStreamState) error {
        if r.warn != nil && st.metaSeen && int64(len(st.entries)) != st.length {
            r.warn("redis_stream_length_mismatch",
                "user_key_len", len(uk),
                "declared_len", st.length,
                "observed_entries", len(st.entries),
                "hint", "meta record's Length does not match the count of !stream|entry| keys for this user key")
        }
        return r.writeStreamJSONL(dir, []byte(uk), st)
    })
}
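To make the delegation argument concrete, here is a simplified sketch of what a generic flush helper buys. `flushSorted` is a hypothetical, directory-free analogue; the signature of the real flushWideColumnDir is only known from the call shape above.

```go
package main

import (
	"fmt"
	"sort"
)

// flushSorted visits every entry in sorted key order and stops at the first
// error. It is a hypothetical, simplified analogue of a generic flush helper;
// it performs no directory handling.
func flushSorted[T any](states map[string]T, fn func(key string, st T) error) error {
	if len(states) == 0 {
		return nil
	}
	keys := make([]string, 0, len(states))
	for k := range states {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		if err := fn(k, states[k]); err != nil {
			return fmt.Errorf("flush %q: %w", k, err)
		}
	}
	return nil
}

func main() {
	streams := map[string]int{"orders": 3, "audit": 7, "clicks": 1}
	err := flushSorted(streams, func(key string, entries int) error {
		fmt.Printf("%s: %d entries\n", key, entries)
		return nil
	})
	fmt.Println("err:", err)
}
```

Any policy added inside the helper, such as a different error wrapping, then applies to every caller without further edits.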


Minor

In-place sort of st.entries mutates live state (redis_stream.go:347)

sort.Slice(st.entries, ...) mutates the accumulated slice in-place. Currently harmless (flush is one-shot), but inconsistent with the cloneStringSlice rationale in decodeStreamEntryValue ("field slice cloned so snapshot buffer mutations can't bleed into emitted state"). Sort a shallow copy for consistency.
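A copy-then-sort variant is short. The sketch below uses a hypothetical `entry` type carrying only the (ms, seq) pair and leaves the input slice untouched.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical reduction of a stream entry to its ID parts.
type entry struct{ ms, seq uint64 }

// sortedCopy returns the entries ordered by (ms, seq) without mutating in.
func sortedCopy(in []entry) []entry {
	out := make([]entry, len(in))
	copy(out, in)
	sort.Slice(out, func(i, j int) bool {
		if out[i].ms != out[j].ms {
			return out[i].ms < out[j].ms
		}
		return out[i].seq < out[j].seq
	})
	return out
}

func main() {
	accumulated := []entry{{ms: 2, seq: 0}, {ms: 1, seq: 5}, {ms: 1, seq: 2}}
	fmt.Println(sortedCopy(accumulated))
	fmt.Println(accumulated) // still in arrival order
}
```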

Large uint64 values lose precision in JSON readers (informational)

streamMetaJSON.LastMs and LastSeq are uint64 encoded as JSON numbers. Values above 2^53 can't be represented as IEEE 754 doubles; a restorer using json.Unmarshal into any (as the test helpers do) would silently round them. For timestamps this won't matter in practice (~year 287,000). Worth a one-line comment on streamMetaJSON noting that restorers should use json.Decoder.UseNumber() for strict fidelity. Note also: the assertStreamMetaTerminator test helper compares via float64 — assertions on seq values above 2^53 would silently pass even if rounded.
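The rounding is straightforward to demonstrate. The sketch below decodes a hand-written `_meta` line (the value 2^53 + 1 is chosen for illustration) once into `any` and once with `Decoder.UseNumber`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// line is an illustrative record; 9007199254740993 is 2^53 + 1.
const line = `{"_meta":true,"last_seq":9007199254740993}`

// lossy decodes into map[string]any, so the number becomes a float64.
func lossy(s string) (float64, error) {
	var m map[string]any
	if err := json.Unmarshal([]byte(s), &m); err != nil {
		return 0, err
	}
	return m["last_seq"].(float64), nil
}

// exact uses Decoder.UseNumber so the digits survive as a json.Number.
func exact(s string) (string, error) {
	dec := json.NewDecoder(strings.NewReader(s))
	dec.UseNumber()
	var m map[string]any
	if err := dec.Decode(&m); err != nil {
		return "", err
	}
	return m["last_seq"].(json.Number).String(), nil
}

func main() {
	f, err := lossy(line)
	if err != nil {
		panic(err)
	}
	s, err := exact(line)
	if err != nil {
		panic(err)
	}
	fmt.Println(uint64(f), s)
}
```

A restorer that decodes into a struct with uint64 fields avoids the problem as well, since encoding/json parses the digits directly in that case.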


What's good

  • The pendingTTL / claimPendingTTL infrastructure is a clean, general solution that also retroactively fixes set TTL ordering (backup: Redis set encoder (Phase 0a) #758).
  • The (ms, seq) tuple sort is correct with the right justification.
  • cloneStringSlice defensive copy in decodeStreamEntryValue is appropriate.
  • Magic-prefix check before protobuf unmarshal is fail-closed in the right order.
  • All four error sentinels are correctly wrapped.
  • 14 table-driven tests cover the happy path, all four error types, TTL inlining (both orderings), empty stream, length-mismatch warning, and sort determinism.
  • TestRedisDB_StreamDuplicateFieldsPreserved and TestRedisDB_StreamTTLArrivesBeforeRows are the right regression pins for the two fixed P1s.

bootjp added a commit that referenced this pull request May 19, 2026
…lob layout

Codex flagged that the wide-column zset encoder skips the legacy
consolidated single-key blob layout the live store still writes.
A zset stored only as `!redis|zset|<userKey>` (with the magic-
prefixed pb.RedisZSetValue body) is silently dropped from backup
output and any inline TTL becomes an orphan — user-visible
sorted-set data loss.

Live-side references (adapter, not changed by this commit):
- adapter/redis_compat_types.go:82 — redisZSetPrefix
- adapter/redis_compat_commands.go:3495-3508 — writes the blob for
  non-empty persisted zset updates
- adapter/redis_compat_helpers.go:610-631 — reads it as the
  fallback when no wide-column members exist

Fix: new public RedisDB.HandleZSetLegacyBlob method that decodes
the magic-prefixed pb.RedisZSetValue and registers the same per-
member state HandleZSetMember would. The wide-column merge case
(mid-migration snapshot containing BOTH layouts for the same user
key) works because `!redis|zset|` sorts BEFORE `!zs|...` so the
blob arrives first and wide-column rows then update / add members
via the latest-wins map.

Inline TTL: `!redis|zset|<k>` sorts BEFORE `!redis|ttl|<k>`, so
HandleTTL after this handler sees redisKindZSet already and
folds via the case-redisKindZSet branch. No pendingTTL detour
needed for this ordering.
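Prefix-ordering claims like these can be sanity-checked by sorting the literal prefixes byte-wise. The sketch below assumes the snapshot iterates keys in plain byte order and uses only prefixes named in this thread. Under that assumption `!redis|ttl|` sorts ahead of `!redis|zset|` ('t' < 'z'), so the inline-TTL ordering stated above may deserve a second look.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedPrefixes returns the key prefixes in byte-wise ascending order.
// Go compares strings byte by byte, matching a raw lexicographic key scan.
func sortedPrefixes() []string {
	p := []string{"!zs|", "!redis|zset|", "!stream|", "!redis|ttl|", "!st|"}
	sort.Strings(p)
	return p
}

func main() {
	for i, p := range sortedPrefixes() {
		fmt.Println(i, p)
	}
}
```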

Fail-closed contract (matches existing wide-column path):
- Missing magic prefix → ErrRedisInvalidZSetLegacyBlob
- Unmarshal error    → ErrRedisInvalidZSetLegacyBlob
- NaN score          → ErrRedisInvalidZSetLegacyBlob (Redis ZADD
                       rejects NaN at wire level)
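The fail-closed guards and the latest-wins merge can be sketched without the protobuf layer. Everything here is illustrative: the magic bytes are a placeholder (the real zset prefix is not shown in this thread), and `errInvalidBlob`, `stripMagic`, and `putMember` are hypothetical names.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"math"
)

// errInvalidBlob stands in for ErrRedisInvalidZSetLegacyBlob.
var errInvalidBlob = errors.New("invalid legacy zset blob")

// magic is a placeholder prefix, assumed for illustration only.
var magic = []byte{0x00, 'Z', 0x01}

// stripMagic fails closed when the prefix is missing instead of trying to
// parse the value as raw protobuf.
func stripMagic(value []byte) ([]byte, error) {
	if !bytes.HasPrefix(value, magic) {
		return nil, fmt.Errorf("%w: missing magic prefix", errInvalidBlob)
	}
	return value[len(magic):], nil
}

// putMember rejects NaN scores and otherwise overwrites any earlier score
// for the same member (latest wins).
func putMember(members map[string]float64, name string, score float64) error {
	if math.IsNaN(score) {
		return fmt.Errorf("%w: NaN score for member %q", errInvalidBlob, name)
	}
	members[name] = score
	return nil
}

func main() {
	members := map[string]float64{}
	_ = putMember(members, "alice", 1) // from the legacy blob, seen first
	_ = putMember(members, "alice", 2) // wide-column row arrives later and wins
	fmt.Println(members["alice"])
	fmt.Println(putMember(members, "bob", math.NaN()))
	_, err := stripMagic([]byte("raw protobuf"))
	fmt.Println(err)
}
```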

Caller audit (per /loop standing instruction): new public method
HandleZSetLegacyBlob has no external callers. Verified via
'grep -rn HandleZSetLegacyBlob --include=*.go' — all matches inside
the test file in this PR. The cmd/elastickv-snapshot-decode
dispatcher (Phase 0a follow-up, not yet built) will route the
`!redis|zset|` prefix to this handler.

Parallel bug class: the SAME issue exists for `!redis|hash|`,
`!redis|set|`, and `!redis|stream|` legacy blob prefixes. Those
encoders shipped in earlier PRs (#725, #758, #791). Each needs
its own legacy-blob handler in a follow-up PR; this commit fixes
only the zset case codex flagged on PR #790.

New tests:
- TestRedisDB_ZSetLegacyBlobRoundTrip — basic round-trip
- TestRedisDB_ZSetLegacyBlobThenWideColumnMerges — mid-migration
- TestRedisDB_ZSetLegacyBlobWithInlineTTL — TTL ordering
- TestRedisDB_ZSetLegacyBlobRejectsMissingMagic — fail-closed
- TestRedisDB_ZSetLegacyBlobRejectsNaNScore — fail-closed
- TestRedisDB_ZSetLegacyBlobRejectsMalformedKey — fail-closed

Self-review:
1. Data loss — exact opposite: this commit RECOVERS zsets that
   were silently dropped. New fail-closed guards prevent silently
   importing a corrupt blob.
2. Concurrency — no new shared state; per-DB sequential as before.
3. Performance — one protobuf Unmarshal per legacy zset key
   (same as the live read path). Member map shares the same
   latest-wins behavior as wide-column intake.
4. Consistency — merge order (blob first, wide-column second) is
   determined by snapshot lex order; tested explicitly.
5. Coverage — 6 new tests. All 84 redis tests pass.
…fields

Claude-bot Critical finding (PR #791 round 2): the stream encoder
emitted field names and values as plain Go strings, and
`json.Marshal(string)` silently substitutes U+FFFD for every
ill-formed UTF-8 byte sequence. A future schema migration switching
`pb.RedisStreamEntry.Fields` from `repeated string` to `bytes`, or
a code path that bypasses the proto marshaler's UTF-8 validation,
would surface as silent backup corruption of binary stream
payloads (sensor data, legacy encodings, raw bytes).

The hash encoder already solves this via `marshalRedisBinaryValue`
+ `json.RawMessage`, which emits non-UTF-8 bytes as
`{"base64":"..."}` envelopes (redis_hash.go:235-238). This commit
applies the same projection to streams.

Changes:
- streamFieldJSON.Name/Value: string -> json.RawMessage.
- New buildStreamFieldRecords helper: extracted the per-pair
  marshaling out of marshalStreamJSONL so the projection is
  unit-testable independently of the protobuf pipeline (proto3
  string fields enforce UTF-8 at gproto.Marshal so we cannot push
  binary bytes through that path).
- extractStreamFieldsAsPairs test helper now accepts EITHER a
  plain JSON string OR the `{"base64":"..."}` envelope, hiding the
  per-pair envelope detection from per-test assertions. A new
  decodeRedisBinaryEnvelope helper handles the base64url reversal.

Caller audit (per /loop standing instruction): buildStreamFieldRecords
is private to the package. streamFieldJSON's two-field shape did
not change publicly visible behavior (the JSON output for UTF-8
fields is byte-identical to before; only non-UTF-8 fields now route
through the envelope). The only consumer of streamFieldJSON is
marshalStreamJSONL (line 369). Verified via
`grep -rnE 'buildStreamFieldRecords|streamFieldJSON' internal/backup/`
- all matches inside the same file or the test file.

New test:
- TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip exercises
  buildStreamFieldRecords directly with non-UTF-8 input. Marshals
  one streamEntryJSON, re-parses, and asserts byte-identical
  round-trip via the binary envelope. Existing
  TestRedisDB_StreamFieldsDecodedToArray and
  TestRedisDB_StreamDuplicateFieldsPreserved still pin the UTF-8
  plain-string path.

Self-review:
1. Data loss - opposite of the original concern: this commit
   preserves binary stream bytes that the previous code would have
   mangled into U+FFFD. The protobuf wire format prevents the bad
   bytes from reaching us today, but the projection now matches
   the hash encoder's defensive shape.
2. Concurrency - no new shared state.
3. Performance - one extra allocation per (name, value) pair for
   the marshaled RawMessage. Matches hash encoder's cost shape;
   bounded by maxWideColumnItems on the live side.
4. Consistency - stream encoder's binary-safe handling now matches
   the hash encoder's. A future format-version bump can canonicalize
   the JSON projection across all wide-column types.
5. Coverage - 1 new test pinning the projection; existing
   round-trip and duplicate-fields tests confirm no UTF-8 regression.
@bootjp
Owner Author

bootjp commented May 19, 2026

@claude review

@claude

claude Bot commented May 19, 2026

Claude finished @bootjp's task in 3m 6s


Code Review — PR #791: backup: Redis stream encoder (Phase 0a) — Round 3

Tasks

  • Read trigger comment and prior review context
  • Read current internal/backup/redis_stream.go
  • Read current internal/backup/redis_stream_test.go
  • Read internal/backup/redis_string.go (modified)
  • Read internal/backup/redis_set.go (modified)
  • Check: non-UTF-8 field corruption fix status
  • Check: flushStreams boilerplate dedup status
  • Review: any new issues
  • Post final review

All three P1 issues from prior rounds are now fixed and regression-tested. One medium issue from both previous reviews is still open. Everything else is minor.


Previously Critical — All Confirmed Fixed

Duplicate field names: entry fields are now emitted as []streamFieldJSON (array of records with json.RawMessage members). TestRedisDB_StreamDuplicateFieldsPreserved pins the round-trip. ✓

Non-UTF-8 field corruption: streamFieldJSON.Name/Value are json.RawMessage populated by marshalRedisBinaryValue in buildStreamFieldRecords (redis_stream.go:418-432). TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip pins the projection directly. ✓

TTL arrives before stream rows: streamState calls claimPendingTTL on first registration; TestRedisDB_StreamTTLArrivesBeforeRows and TestRedisDB_StreamTTLInlinedFromScanIndex cover both orderings. setState in redis_set.go:131-134 retroactively applies the same fix to sets. ✓


Medium — Still Open

flushStreams duplicates the flushWideColumnDir boilerplate (redis_stream.go:260-287)

This has been called out in both previous rounds and remains unfixed. flushSets, flushLists, and flushHashes all delegate to the generic flushWideColumnDir[T]. flushStreams still hand-rolls the identical nil-check / ensureDir / sorted-key-slice / per-key-callback pattern. *redisStreamState slots in directly:

func (r *RedisDB) flushStreams() error {
    return flushWideColumnDir(r, r.streams, "streams", func(dir, uk string, st *redisStreamState) error {
        if r.warn != nil && st.metaSeen && int64(len(st.entries)) != st.length {
            r.warn("redis_stream_length_mismatch",
                "user_key_len", len(uk),
                "declared_len", st.length,
                "observed_entries", len(st.entries),
                "hint", "meta record's Length does not match the count of !stream|entry| keys for this user key")
        }
        return r.writeStreamJSONL(dir, []byte(uk), st)
    })
}

A future change to flushWideColumnDir (error policy, security check) won't propagate to the hand-rolled copy.


Minor

Stale file-header comment still shows old map-based field format (redis_stream.go:41-43)

The package doc example shows "fields":{"event":"login","user":"alice"} — the map shape from the design doc that was intentionally abandoned in favour of the [{"name":...,"value":...}] array. A reader scanning the file header would get a misleading picture of the actual output format.

// {"id":"1714400000000-0","fields":{"event":"login","user":"alice"}}

Should be updated to reflect the current array shape, e.g.:

// {"id":"1714400000000-0","fields":[{"name":"event","value":"login"},{"name":"user","value":"alice"}]}

In-place sort of st.entries mutates live state (redis_stream.go:359)

sort.Slice(st.entries, ...) still sorts the accumulated slice in-place. Currently harmless (flush is one-shot), but inconsistent with the cloneStringSlice rationale in decodeStreamEntryValue.

buildStreamFieldRecords binary-safety is a future-only guard (informational)

The test comment at redis_stream_test.go:292-295 correctly identifies a subtlety worth noting in the production code too: proto3 string fields enforce UTF-8 — gproto.Marshal rejects non-UTF-8 bytes — so the {"base64":"..."} branch in the JSONL output cannot be triggered via the live store path today. The marshalRedisBinaryValue guard is the right defensive move (it costs nothing and protects against a future Fields bytes schema migration), but a one-line comment on buildStreamFieldRecords clarifying this would help a reader understand why the test goes directly to buildStreamFieldRecords rather than through HandleStreamEntry.

Large uint64 JSON precision (informational, carried from prior rounds)

streamMetaJSON.LastMs / LastSeq are uint64; assertStreamMetaTerminator compares them as float64. Values above 2^53 silently round in both json.Unmarshal and the test assertions. Not a practical risk for current timestamps, but worth a one-line comment on streamMetaJSON noting that restorers should use json.Decoder.UseNumber() for strict fidelity.


What's good

  • All three prior P1s fixed cleanly with companion regression tests per CLAUDE.md convention.
  • buildStreamFieldRecords + marshalRedisBinaryValue is the correct binary-safe projection, exactly mirroring hashFieldRecord.
  • pendingTTL / claimPendingTTL infrastructure is a general solution that also retroactively corrects the set TTL ordering bug from PR #758 (backup: Redis set encoder (Phase 0a)).
  • TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip correctly tests the buildStreamFieldRecords function directly (since gproto.Marshal would reject the bytes) and uses extractStreamFieldsAsPairs + decodeRedisBinaryEnvelope to verify the full round-trip. The helpers are clean and reusable.
  • TestRedisDB_StreamDuplicateFieldsPreserved, TestRedisDB_StreamTTLArrivesBeforeRows, and TestRedisDB_SetTTLArrivesBeforeRows are the correct regression pins.
  • (ms, seq) tuple sort is correct with the right justification; cloneStringSlice defensive copy is appropriate.
  • All four error sentinels are correctly wrapped and cover every parse boundary.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e5bd65048f

Comment thread internal/backup/redis_string.go Outdated
// allocates) rather than the alias slice — the snapshot
// reader reuses key buffers across iterations, so a slice
// alias would race with the next record.
r.pendingTTL[string(userKey)] = expireAtMs

P1: Avoid buffering all unknown TTL keys in memory

The new redisKindUnknown branch stores every unmatched TTL in pendingTTL, which keeps a copied string(userKey) alive until Finalize. In this repository snapshot there is still no zset backup handler under internal/backup, so TTLs for expiring sorted sets never get drained and accumulate for the entire run; on large datasets this turns the previous constant-space orphan path into O(number of unknown TTL keys) memory and can exhaust RAM before the backup finishes.

Codex P1 on round 2: the pendingTTL buffer introduced in r1 grew
without bound. An adversarial or corrupt snapshot whose
`!redis|ttl|<k>` records never find a typed-record claimer would
accumulate (string-userKey, uint64-expireAt) entries until the
decoder runs out of memory. The reviewer notes that this PR's
branch has no zset handler yet, so every zset TTL accumulates for
the whole run; on a real cluster's snapshot that's potentially
millions of buffered keys.

Fix: cap pendingTTL at pendingTTLCap entries (default 1 << 20
entries, ~64 MiB worst-case memory). Once the cap is reached,
subsequent unknown-kind TTLs fall back to the original immediate-
orphan-count path without buffering the user-key bytes. Already-
buffered entries can still be drained by later state-inits; new
entries beyond the cap are counted but not buffered.

API:
- new RedisDB.WithPendingTTLCap(int) chainable setter.
- new pendingTTLOverflow counter (surfaced in the orphan-TTL warning).
- Finalize's warning includes the overflow count and the cap so an
  operator can detect a snapshot that exceeded the buffer budget.

Caller audit (per /loop standing instruction):
- HandleTTL's redisKindUnknown branch behavior changes: previously
  ALWAYS buffered, now buffers up to cap then orphan-counts. Other
  branches unchanged.
- Callers of HandleTTL: only test files and the future
  cmd/elastickv-snapshot-decode driver. No external caller depends
  on the unbounded-buffering shape; the cap default (1 << 20 entries) covers
  every realistic legitimate workload, and the WithPendingTTLCap
  override lets callers tune to their host budget.
- New helper parkUnknownTTL is package-private with one call site
  (HandleTTL line 401). No prior call sites to audit.
- Verified via `grep -rnE 'HandleTTL|pendingTTLCap|WithPendingTTLCap'
  --include='*.go' internal/backup/`.

New tests:
- TestRedisDB_PendingTTLBoundedByCap drives 2*cap unknown TTLs and
  asserts: pendingTTL stays at cap, orphanTTLCount tracks the
  overflow at intake, post-Finalize total == 2*cap.
- TestRedisDB_WithPendingTTLCapZeroDisablesBuffering pins that
  cap==0 reverts to the original immediate-orphan path.
- TestRedisDB_WithPendingTTLCapNegativeCoercedToZero pins input
  sanitisation.

Self-review:
1. Data loss - the cap can mis-classify TTLs that arrive in the
   overflow window AND would have been drained by a later
   state-init. Mitigation: the default cap (1 << 20 entries) is well
   above the count of legitimately-buffered TTL'd wide-column keys
   on any real cluster; operators with abnormal workloads can raise
   the cap via WithPendingTTLCap.
2. Concurrency - no new shared state.
3. Performance - one extra comparison per unknown-kind TTL.
   Bounded memory is the win.
4. Consistency - parkUnknownTTL keeps the userKey-copy contract
   (allocates string) when buffering, matches the prior intent.
5. Coverage - 3 new tests + 1 updated test
   (TestRedisDB_OrphanTTLCountedNotBuffered already pins the
   buffered + drained-at-Finalize semantics under the default cap).
@bootjp
Owner Author

bootjp commented May 19, 2026

@claude review

@claude

claude Bot commented May 19, 2026

Claude finished @bootjp's task in 2m 25s


Code Review — PR #791: backup: Redis stream encoder (Phase 0a) — Round 4

Tasks

  • Read trigger comment and prior review context
  • Read current internal/backup/redis_stream.go
  • Read current internal/backup/redis_stream_test.go
  • Read current internal/backup/redis_string.go
  • Read current internal/backup/redis_set.go
  • Check: flushStreams boilerplate dedup (medium, open since round 1)
  • Check: stale file-header comment (minor, round 3)
  • Check: pendingTTL OOM bound (Codex P1, latest commit f59acc0)
  • Review: any new issues

The pendingTTL OOM fix from the latest commit is clean, well-structured, and fully tested. All three P1 issues from prior rounds remain closed. One medium issue is still open for the fourth consecutive round; two new minor issues surface in the OOM fix.


Previously Fixed — All Confirmed

Non-UTF-8 field corruption: streamFieldJSON.Name/Value are json.RawMessage via marshalRedisBinaryValue. TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip pins it. ✓

Duplicate field names: entry fields are a []streamFieldJSON array of records. TestRedisDB_StreamDuplicateFieldsPreserved pins it. ✓

TTL ordering (streams + sets): streamState / setState both call claimPendingTTL on first registration. Both orderings tested. ✓

pendingTTL OOM (Codex P1, latest commit): parkUnknownTTL caps the buffer at defaultPendingTTLCap = 1 << 20 entries; overflow falls back to orphanTTLCount immediately. WithPendingTTLCap allows override. Four regression tests: TestRedisDB_OrphanTTLCountedNotBuffered, TestRedisDB_PendingTTLBoundedByCap, TestRedisDB_WithPendingTTLCapZeroDisablesBuffering, TestRedisDB_WithPendingTTLCapNegativeCoercedToZero. The cap == 0 short-circuit is correct (len(m) >= 0 is always true). ✓


Medium — Still Open (4th round)

flushStreams still duplicates the flushWideColumnDir boilerplate (redis_stream.go:260-287)

This has been called out in every review round. flushSets delegates cleanly (redis_set.go:182); flushStreams still hand-rolls the identical nil-check / ensureDir / sorted-key-slice / per-key-callback pattern. A future change to flushWideColumnDir (error policy, security check) won't propagate here. *redisStreamState slots directly into the generic:

func (r *RedisDB) flushStreams() error {
    return flushWideColumnDir(r, r.streams, "streams", func(dir, uk string, st *redisStreamState) error {
        if r.warn != nil && st.metaSeen && int64(len(st.entries)) != st.length {
            r.warn("redis_stream_length_mismatch",
                "user_key_len", len(uk),
                "declared_len", st.length,
                "observed_entries", len(st.entries),
                "hint", "meta record's Length does not match the count of !stream|entry| keys for this user key")
        }
        return r.writeStreamJSONL(dir, []byte(uk), st)
    })
}


Minor — New This Round

orphanTTLCount field comment is stale (redis_string.go:127-132)

The comment still describes the old count-only approach:

"We deliberately track only the count (not the keys themselves) because the keys are unused before the wide-column encoders land; buffering full keys would allocate proportional to user-key size (up to 1 MiB per key)"

That was accurate before PR #791-r1. The code now buffers keys in pendingTTL (capped at pendingTTLCap), and orphanTTLCount only counts genuinely unmatched entries (those that overflowed the cap or remained in pendingTTL at Finalize). A future reader will find the comment and the code in direct contradiction.

// Suggest updating to something like:
// orphanTTLCount counts TTL records that were not matched by any typed
// record. Unknown-kind TTLs are first parked in pendingTTL so wide-column
// state-inits can drain them; entries that overflow pendingTTLCap, or
// that remain in pendingTTL at Finalize, increment this counter.

Test name contradicts its body (redis_string_test.go:580)

TestRedisDB_OrphanTTLCountedNotBuffered asserts the opposite of what the name says: the inline comment explicitly says "orphan TTL records are now BUFFERED in pendingTTL during intake" and the assertions confirm pendingTTL grows to n entries. A reader scanning function names would infer this test exercises the cap-zero / no-buffering path. Consider renaming to TestRedisDB_OrphanTTLBufferedDuringIntake to match the tested behavior.


Minor — Carried from Round 3 (Still Open)

Stale file-header comment shows old map shape (redis_stream.go:41-43)

// {"id":"1714400000000-0","fields":{"event":"login","user":"alice"}}

Should show the current array shape:

// {"id":"1714400000000-0","fields":[{"name":"event","value":"login"},{"name":"user","value":"alice"}]}

In-place sort mutates live state (redis_stream.go:359)

sort.Slice(st.entries, ...) still sorts the accumulated slice in-place. Currently harmless (flush is one-shot), but inconsistent with the cloneStringSlice rationale noted in decodeStreamEntryValue.


What's good

  • parkUnknownTTL + WithPendingTTLCap is a clean, independently-testable extraction of the overflow logic. The function-level comment correctly describes the TOCTOU-free guarantee (we copy string(userKey) because the snapshot reader reuses key buffers).
  • All four cap-behavior tests are well-targeted: cap=8 exercises the split between buffered and immediate-orphan paths in a single run; cap=0 pins the pre-PR backward-compatible mode; negative coercion is tested independently.
  • The Finalize overflow-detection branch emits distinct pending_ttl_buffer_overflow and pending_ttl_buffer_cap fields so operators can distinguish "snapshot had more unmatched TTLs than the buffer cap" from "TTLs remained unmatched after the cap was sufficient". That's the right operator-facing signal.
  • pendingTTL field comment (redis_string.go:196-215) is accurate and covers the lex-ordering rationale and lifecycle correctly.

bootjp added a commit that referenced this pull request May 19, 2026
Codex flagged the same P1 on PR #791 round 2 — pendingTTL grows
unboundedly when no zset handler is present to drain the parked
TTLs. The same bug exists on PR #790's branch (the two PRs share
the pendingTTL infrastructure; the codex finding's text applies
verbatim).

Fix: cap pendingTTL at pendingTTLCap entries (default 1 << 20). Once
the cap is reached, subsequent unknown-kind TTLs fall back to
immediate-orphan counting via the new parkUnknownTTL helper.

New API:
- RedisDB.WithPendingTTLCap(int) chainable setter.
- pendingTTLOverflow counter surfaced in the orphan-TTL warning.

Caller audit (per /loop standing instruction):
- HandleTTL's redisKindUnknown branch behavior changes: previously
  ALWAYS buffered, now buffers up to cap then orphan-counts. Other
  branches unchanged.
- Callers of HandleTTL: tests only. The cmd/elastickv-snapshot-decode
  driver (Phase 0a follow-up) will be the first production caller.
- New helper parkUnknownTTL is package-private with one call site
  (HandleTTL). No prior call sites to audit.
- Verified via `grep -rnE 'HandleTTL|pendingTTLCap|WithPendingTTLCap'
  --include='*.go' internal/backup/`.

New tests:
- TestRedisDB_PendingTTLBoundedByCap.
- TestRedisDB_WithPendingTTLCapZeroDisablesBuffering.
- TestRedisDB_WithPendingTTLCapNegativeCoercedToZero.

Self-review:
1. Data loss - cap can mis-classify TTLs in the overflow window
   that COULD have been drained by a later state-init. Mitigation:
   default cap (1 << 20 entries) is well above the count of legitimately-
   buffered wide-column TTL'd keys on real clusters; operators
   can tune via WithPendingTTLCap.
2. Concurrency - no new shared state.
3. Performance - one extra comparison per unknown-kind TTL.
4. Consistency - mirrors the fix landing on PR #791 r3; when
   either PR merges first the other can rebase cleanly.
5. Coverage - 3 new tests + the existing
   TestRedisDB_OrphanTTLCountedNotBuffered (still pins the
   default-cap buffered + drained-at-Finalize semantics).
Mirror of PR #790 round 5 codex P1 fix: the cap-with-silent-orphan
fallback introduced in r3 is itself a data-loss path. In real
Pebble snapshot order (`!redis|ttl|` < `!st|`/`!stream|`/`!zs|`),
an overflowed TTL likely belongs to a wide-column key arriving
later in the scan; dropping it permanently loses `expire_at_ms`.

Fix: replace the silent-orphan fallback with fail-closed semantics.
Three modes:

- cap > 0, buffer NOT full: buffer the entry as before.
- cap == 0: counter-only mode (operator explicit opt-out).
- cap > 0, buffer FULL: return ErrPendingTTLBufferFull. Operator
  must raise WithPendingTTLCap or accept counter-only via cap=0.
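The three modes listed above reduce to a small switch. The sketch below is an illustration under assumed names: `ttlParker`, `newTTLParker`, and `errBufferFull` are hypothetical stand-ins for the RedisDB fields, WithPendingTTLCap, and ErrPendingTTLBufferFull.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferFull stands in for ErrPendingTTLBufferFull.
var errBufferFull = errors.New("pending TTL buffer full")

// ttlParker is a hypothetical reduction of the pendingTTL state.
type ttlParker struct {
	limit   int               // 0 means counter-only mode
	pending map[string]uint64 // userKey -> expireAtMs
	orphans int               // TTLs counted without buffering
}

// newTTLParker coerces a negative limit to zero, as the tests above pin.
func newTTLParker(limit int) *ttlParker {
	if limit < 0 {
		limit = 0
	}
	return &ttlParker{limit: limit, pending: make(map[string]uint64)}
}

// park buffers the TTL, counts it, or fails closed, depending on the mode.
func (p *ttlParker) park(userKey []byte, expireAtMs uint64) error {
	switch {
	case p.limit == 0:
		p.orphans++ // explicit opt-out: count, never buffer
		return nil
	case len(p.pending) >= p.limit:
		return fmt.Errorf("%w (cap=%d)", errBufferFull, p.limit)
	default:
		p.pending[string(userKey)] = expireAtMs
		return nil
	}
}

func main() {
	p := newTTLParker(1)
	fmt.Println(p.park([]byte("a"), 100)) // buffered
	fmt.Println(p.park([]byte("b"), 200)) // buffer full: error, nothing dropped silently
	z := newTTLParker(0)
	fmt.Println(z.park([]byte("a"), 100), z.orphans) // counter-only
}
```

Returning an error at the cap turns a silent loss of `expire_at_ms` into a visible failure the operator can act on.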

Caller audit (per /loop standing instruction): same as the parallel
PR #790 r5 fix. HandleTTL's redisKindUnknown branch CAN return a
non-nil error in a new condition. All call sites check the error
(grep -rn '\.HandleTTL(' --include=*.go). parkUnknownTTL changes
signature from `func(...)` to `func(...) error`; single caller
(HandleTTL) updated.

Test renames:
- TestRedisDB_PendingTTLBoundedByCap → TestRedisDB_PendingTTLFailsClosedAtCap
- TestRedisDB_WithPendingTTLCapZeroDisablesBuffering → TestRedisDB_WithPendingTTLCapZeroOpts

Self-review: same shape as PR #790 r5 commit's review. Data loss
avoidance via fail-closed; semantics match CLAUDE.md's pattern
for replication/HLC/snapshot invariants.
@bootjp
Owner Author

bootjp commented May 19, 2026

@claude review

@claude

claude Bot commented May 19, 2026

Claude finished @bootjp's task in 3m 42s


Code Review — PR #791: backup: Redis stream encoder (Phase 0a) — Round 5

Tasks

  • Read trigger comment and prior review context
  • Read current internal/backup/redis_stream.go
  • Read current internal/backup/redis_stream_test.go
  • Read current internal/backup/redis_string.go
  • Read current internal/backup/redis_string_test.go
  • Read current internal/backup/redis_set.go
  • Verify all P1 status from Round 4
  • Check medium / minor issues from prior rounds
  • Review latest commit for new issues
  • Post final review

All five P1 issues across four prior rounds remain fixed. One medium issue enters its fifth consecutive round without being addressed. Two new minor observations surface in the latest commit.


Previously Fixed — All Confirmed

Duplicate field names: streamFieldJSON is []streamFieldJSON with json.RawMessage. TestRedisDB_StreamDuplicateFieldsPreserved pins the round-trip. ✓

Non-UTF-8 field corruption: streamFieldJSON.Name/Value populated via marshalRedisBinaryValue in buildStreamFieldRecords (redis_stream.go:418-432). TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip pins the projection directly. ✓

TTL ordering (streams + sets): streamState / setState both call claimPendingTTL on first registration. Both arrival orderings tested. ✓

pendingTTL OOM: parkUnknownTTL caps the buffer at defaultPendingTTLCap = 1 << 20. ✓

pendingTTL fail-closed: ErrPendingTTLBufferFull returned when buffer is at cap; pendingTTLOverflow incremented; TestRedisDB_PendingTTLFailsClosedAtCap pins the error path. ✓


Medium — Still Open (5th Round)

flushStreams still duplicates the flushWideColumnDir boilerplate (redis_stream.go:260-287)

This has been called out in every review round. flushSets delegates cleanly via flushWideColumnDir (redis_set.go:182). flushStreams still hand-rolls the identical nil-check / ensureDir / sorted-key-slice / per-key-callback pattern. *redisStreamState slots directly into the generic:

func (r *RedisDB) flushStreams() error {
    return flushWideColumnDir(r, r.streams, "streams", func(dir, uk string, st *redisStreamState) error {
        if r.warn != nil && st.metaSeen && int64(len(st.entries)) != st.length {
            r.warn("redis_stream_length_mismatch",
                "user_key_len", len(uk),
                "declared_len", st.length,
                "observed_entries", len(st.entries),
                "hint", "meta record's Length does not match the count of !stream|entry| keys for this user key")
        }
        return r.writeStreamJSONL(dir, []byte(uk), st)
    })
}

A future change to flushWideColumnDir (error policy, security check, e.g. the symlink guard at redis_string.go:626-638) won't propagate to the hand-rolled copy.
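
For readers without the repository open, the shape being delegated to can be sketched roughly as below. This is an assumption-laden illustration: `flushSorted` is a hypothetical, simplified stand-in for flushWideColumnDir that keeps only the empty-check, sorted-key-slice, and per-key-callback steps, and omits the ensureDir and symlink handling the real helper is described as performing.

```go
package main

import (
	"fmt"
	"sort"
)

// flushSorted is a hypothetical, stripped-down analogue of a generic
// flush helper: visit every per-key state in sorted user-key order and
// stop at the first error.
func flushSorted[T any](states map[string]T, visit func(userKey string, st T) error) error {
	if len(states) == 0 {
		return nil // nothing to write for this kind
	}
	keys := make([]string, 0, len(states))
	for k := range states {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic output order across runs
	for _, k := range keys {
		if err := visit(k, states[k]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	streams := map[string]int{"orders": 3, "audit": 1}
	_ = flushSorted(streams, func(uk string, n int) error {
		fmt.Println(uk, n)
		return nil
	})
}
```

With one shared loop, a per-kind function such as flushStreams only supplies the callback, which is why a later policy change in the helper reaches every caller.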



Minor — Still Open from Round 4

orphanTTLCount field comment is now actively misleading (redis_string.go:138-147)

The comment still reads:

"We deliberately track only the count (not the keys themselves) because the keys are unused before the wide-column encoders land; buffering full keys would allocate proportional to user-key size (up to 1 MiB per key), and the warning sink only ever reads len(). Codex P2 round 6."

This contradicts the current implementation on three counts: (1) wide-column encoders have landed in this PR; (2) keys are buffered — in pendingTTL up to pendingTTLCap; (3) orphanTTLCount no longer counts intake TTLs — it counts truly-unmatched entries (those remaining in pendingTTL at Finalize, plus pendingTTLOverflow). A reader who trusts this comment will have a wrong mental model of the encoding pipeline.

// Suggest updating to:
// orphanTTLCount counts TTL records not matched by any typed record.
// Unknown-kind TTLs are first parked in pendingTTL; at Finalize,
// whatever remains in pendingTTL (never claimed by a wide-column
// state-init) plus pendingTTLOverflow (entries dropped when the
// buffer was at cap) are folded into this counter.


Test name TestRedisDB_OrphanTTLCountedNotBuffered contradicts its body (redis_string_test.go:580)

The test's own inline comment (lines 583-591) says "orphan TTL records are now BUFFERED in pendingTTL during intake" and the assertions confirm len(db.pendingTTL) == n. The function name asserts the opposite — "CountedNotBuffered". A reader scanning function names will infer this test exercises the cap-zero / no-buffering path.

// Rename to:
func TestRedisDB_OrphanTTLBufferedDuringIntake(t *testing.T) {



Minor — Still Open from Round 3

Stale file-header comment shows old map-based field format (redis_stream.go:41-43)

// {"id":"1714400000000-0","fields":{"event":"login","user":"alice"}}

The {"event":"login","user":"alice"} map shape was intentionally replaced with the [{"name":...,"value":...}] array in response to the duplicate-field P1. The example in the package doc should match the actual output:

// {"id":"1714400000000-0","fields":[{"name":"event","value":"login"},{"name":"user","value":"alice"}]}
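
A small sketch of why the array shape matters: an interleaved (name, value) list with a repeated name survives as ordered records, whereas a JSON object would keep only one value per name. The type and function names here (`fieldRecord`, `entryRecord`, `pairFields`, `encodeEntry`) are hypothetical, and plain strings are used where the PR is described as using json.RawMessage for binary safety.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// fieldRecord is a simplified, string-only version of one field entry.
type fieldRecord struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

// entryRecord is one JSONL line in the array-of-records shape.
type entryRecord struct {
	ID     string        `json:"id"`
	Fields []fieldRecord `json:"fields"`
}

// pairFields converts name, value, name, value, ... into ordered records.
// It reports false for an odd-length list.
func pairFields(flat []string) ([]fieldRecord, bool) {
	if len(flat)%2 != 0 {
		return nil, false
	}
	out := make([]fieldRecord, 0, len(flat)/2)
	for i := 0; i < len(flat); i += 2 {
		out = append(out, fieldRecord{Name: flat[i], Value: flat[i+1]})
	}
	return out, true
}

// encodeEntry renders one entry as a single JSON line.
func encodeEntry(id string, flat []string) (string, error) {
	fields, ok := pairFields(flat)
	if !ok {
		return "", errors.New("odd field count")
	}
	line, err := json.Marshal(entryRecord{ID: id, Fields: fields})
	return string(line), err
}

func main() {
	// "event" appears twice; both values are preserved in order.
	line, err := encodeEntry("1714400000000-0", []string{"event", "login", "event", "retry"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(line)
}
```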


In-place sort of st.entries mutates live state (redis_stream.go:359)

sort.Slice(st.entries, ...) still sorts the accumulated slice in-place. Currently harmless (flush is one-shot), but inconsistent with the cloneStringSlice rationale noted in decodeStreamEntryValue ("field slice cloned so snapshot buffer mutations can't bleed into emitted state").
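
One way to keep the accumulated slice untouched is to sort a copy. The sketch below assumes a hypothetical `streamEntry` type carrying only the (ms, seq) pair; the real entry type holds more than this, and whether the extra allocation is worthwhile for a one-shot flush is a judgment call.

```go
package main

import (
	"fmt"
	"sort"
)

// streamEntry is a hypothetical reduction of an accumulated entry to its ID parts.
type streamEntry struct {
	ms, seq uint64
}

// sortedCopy returns the entries ordered by (ms, seq) without
// reordering the caller's slice.
func sortedCopy(entries []streamEntry) []streamEntry {
	out := make([]streamEntry, len(entries))
	copy(out, entries)
	sort.Slice(out, func(i, j int) bool {
		if out[i].ms != out[j].ms {
			return out[i].ms < out[j].ms
		}
		return out[i].seq < out[j].seq
	})
	return out
}

func main() {
	live := []streamEntry{{2, 0}, {1, 5}, {1, 2}}
	fmt.Println("sorted:", sortedCopy(live))
	fmt.Println("live:  ", live) // arrival order is preserved
}
```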


Minor — New This Round

TestRedisDB_WithPendingTTLCapZeroOpts is less descriptive than the Round 4 name (redis_string_test.go:654)

Round 4 confirmed the test as TestRedisDB_WithPendingTTLCapZeroDisablesBuffering — the latest commit renamed it to TestRedisDB_WithPendingTTLCapZeroOpts. The new name obscures what the test exercises: that cap==0 disables buffering entirely so every unknown-kind TTL becomes an immediate orphan without ErrPendingTTLBufferFull. Consider restoring the more descriptive name.


Informational (carried)

Large uint64 values lose precision in JSON readers

streamMetaJSON.LastMs, LastSeq, and streamMetaJSON.Length (int64) are encoded as JSON numbers. assertStreamMetaTerminator compares them via float64 — assertions on values above 2^53 would silently pass even if rounded. In practice, UNIX-ms timestamps won't reach that range, but a one-line comment on streamMetaJSON noting that restorers should use json.Decoder.UseNumber() for strict fidelity would help.
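
The rounding can be reproduced with the standard library alone. The sketch below decodes a `_meta`-style line twice, once through the default float64 path and once with json.Decoder.UseNumber(); the helper names `lastMsViaFloat` and `lastMsViaNumber` are hypothetical, and the value 2^53 + 1 is chosen only because it is the smallest integer a float64 cannot represent.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// lastMsViaFloat decodes last_ms through the default float64 path.
func lastMsViaFloat(line string) (uint64, error) {
	var rec map[string]any
	if err := json.Unmarshal([]byte(line), &rec); err != nil {
		return 0, err
	}
	f, ok := rec["last_ms"].(float64)
	if !ok {
		return 0, fmt.Errorf("last_ms is not a number")
	}
	return uint64(f), nil
}

// lastMsViaNumber keeps the original digits by decoding with UseNumber.
func lastMsViaNumber(line string) (uint64, error) {
	dec := json.NewDecoder(strings.NewReader(line))
	dec.UseNumber()
	var rec map[string]any
	if err := dec.Decode(&rec); err != nil {
		return 0, err
	}
	n, ok := rec["last_ms"].(json.Number)
	if !ok {
		return 0, fmt.Errorf("last_ms is not a number")
	}
	return strconv.ParseUint(n.String(), 10, 64)
}

func main() {
	line := `{"_meta":true,"last_ms":9007199254740993}` // 2^53 + 1
	viaFloat, _ := lastMsViaFloat(line)
	viaNumber, _ := lastMsViaNumber(line)
	fmt.Println("float64 path:  ", viaFloat)
	fmt.Println("UseNumber path:", viaNumber)
}
```

A test helper that compares via float64 would treat these two values as equal, which is the silent-pass case described above.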

buildStreamFieldRecords binary-safety is a future-only guard

As the test comment at redis_stream_test.go:290-295 correctly notes, proto3 string enforces UTF-8 — gproto.Marshal rejects non-UTF-8 bytes — so the {"base64":"..."} branch in the JSONL cannot be triggered via the live store path today. The guard is the right defensive move (costs nothing, protects against a future Fields bytes schema migration). A one-line comment on buildStreamFieldRecords making this explicit would save a future reader from wondering whether the base64 branch is dead code.


What's good

  • All five P1 issues from across four rounds are closed and regression-tested per CLAUDE.md convention.
  • parkUnknownTTL extraction with ErrPendingTTLBufferFull is clean: the cap==0 short-circuit is correct, pendingTTLOverflow is incremented before the error return so Finalize's warning includes the overflow count even if the caller aborts, and the error message includes recovery instructions.
  • TestRedisDB_PendingTTLFailsClosedAtCap correctly verifies the split between buffered (within-cap) and failed-closed (at-cap) paths in one run.
  • flushWideColumnDir ensureDir + symlink-ancestor check is applied correctly in the hand-rolled flushStreams — the symlink security invariant propagates even though the boilerplate wasn't deduplicated.
  • (ms, seq) tuple sort is correct with the right justification; all four error sentinels are correctly wrapped; cloneStringSlice defensive copy is appropriate.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: be430fe29d


return nil, cockroachdberr.Wrapf(ErrRedisInvalidStreamEntry,
"unmarshal: %v", err)
}
if len(msg.GetFields())%2 != 0 {

P2 Badge Reject stream entries that contain zero field/value pairs

decodeStreamEntryValue only rejects odd-length field lists, so a payload with len(msg.GetFields()) == 0 is currently accepted as valid and written into backup output. In this codebase, stream writes require at least one field/value pair (parseXAddFields in adapter/redis_compat_commands.go errors when no pairs are provided), so an empty-field entry can only come from corruption or an invalid legacy payload and will not replay cleanly during restore. Treating zero-length fields as ErrRedisInvalidStreamEntry would keep this path fail-closed like the existing odd-arity guard.
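
A sketch of the suggested guard, placed next to the existing odd-arity check. `validateFieldArity` is a hypothetical helper, and the standard library's `%w` wrapping stands in for the cockroachdberr.Wrapf call the PR uses; only the sentinel name ErrRedisInvalidStreamEntry comes from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrRedisInvalidStreamEntry reuses the sentinel name from the PR.
var ErrRedisInvalidStreamEntry = errors.New("invalid redis stream entry")

// validateFieldArity rejects field lists that could not have been
// produced by a well-formed XADD: empty lists and odd-length lists.
func validateFieldArity(fields []string) error {
	switch {
	case len(fields) == 0:
		return fmt.Errorf("%w: no field/value pairs", ErrRedisInvalidStreamEntry)
	case len(fields)%2 != 0:
		return fmt.Errorf("%w: odd field count %d", ErrRedisInvalidStreamEntry, len(fields))
	}
	return nil
}

func main() {
	for _, fields := range [][]string{nil, {"event"}, {"event", "login"}} {
		err := validateFieldArity(fields)
		fmt.Printf("%d fields -> invalid=%v\n", len(fields), errors.Is(err, ErrRedisInvalidStreamEntry))
	}
}
```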


…tries

Mirror of PR #790 round 6 codex P1 fix. The entry-count cap from
r3/r4 doesn't actually bound memory (Redis user keys up to 1 MiB
each → entry-count cap permits N × 1 MiB of buffered key bytes).

Fix: replace entry-count cap with a byte budget
(pendingTTLBytesCap, default 64 MiB). claimPendingTTL decrements
the byte counter so a long-running scan that drains as it goes
can reuse the budget.
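
The byte-budget accounting can be sketched as below. The names `byteBudgetBuffer`, `park`, and `claim` are hypothetical stand-ins for the RedisDB state, parkUnknownTTL, and claimPendingTTL; the per-entry overhead of 8 bytes follows the commit message. The treatment of a key that is already parked (overwrite without charging again) is an assumption of this sketch, not something the commit states.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrPendingTTLBufferFull = errors.New("pending TTL buffer full")

// entryOverheadBytes mirrors pendingTTLEntryOverheadBytes (= 8).
const entryOverheadBytes = 8

// byteBudgetBuffer is a hypothetical stand-in for the pendingTTL state.
type byteBudgetBuffer struct {
	pending   map[string]uint64 // user key -> expire_at_ms
	usedBytes int               // sum of len(key)+overhead for parked keys
	capBytes  int               // 0 means counter-only mode
	orphans   int
	overflow  int
}

// park charges len(userKey)+overhead against the budget, failing closed.
func (b *byteBudgetBuffer) park(userKey string, expireAtMs uint64) error {
	if b.capBytes == 0 {
		b.orphans++
		return nil
	}
	if _, ok := b.pending[userKey]; ok {
		b.pending[userKey] = expireAtMs // assumption: re-park is not charged twice
		return nil
	}
	cost := len(userKey) + entryOverheadBytes
	if b.usedBytes+cost > b.capBytes {
		b.overflow++
		return fmt.Errorf("%w: need %d bytes, %d of %d used",
			ErrPendingTTLBufferFull, cost, b.usedBytes, b.capBytes)
	}
	b.pending[userKey] = expireAtMs
	b.usedBytes += cost
	return nil
}

// claim removes a parked TTL and returns its bytes to the budget.
func (b *byteBudgetBuffer) claim(userKey string) (uint64, bool) {
	v, ok := b.pending[userKey]
	if !ok {
		return 0, false
	}
	delete(b.pending, userKey)
	b.usedBytes -= len(userKey) + entryOverheadBytes
	return v, true
}

func main() {
	b := &byteBudgetBuffer{pending: map[string]uint64{}, capBytes: 20}
	fmt.Println(b.park("aaaa", 1)) // costs 12 bytes, fits
	fmt.Println(b.park("bbbb", 2)) // 24 > 20, fails closed
	b.claim("aaaa")                // frees 12 bytes
	fmt.Println(b.park("bbbb", 2)) // fits after the drain
}
```

Because a claim deletes the map entry before subtracting, a second claim for the same key finds nothing and cannot drive the counter negative.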

Renames (no external callers existed — symbols introduced in
r3/r4 of this PR):
- WithPendingTTLCap → WithPendingTTLByteCap
- pendingTTLCap → pendingTTLBytesCap
- defaultPendingTTLCap → defaultPendingTTLBytesCap (64 MiB)
- Added pendingTTLBytes counter
- Added pendingTTLEntryOverheadBytes (= 8)

Caller audit (per /loop standing instruction): WithPendingTTLCap
had zero external callers in this branch's tree
(`grep -rn 'WithPendingTTLCap|pendingTTLCap\b' --include=*.go`
returns nothing post-rename). parkUnknownTTL keeps its error
return signature; only the comparison shifts from entry count to
byte budget. claimPendingTTL gains a counter-decrement matching
parkUnknownTTL's add side.

Tests renamed/added (same as PR #790 r6):
- TestRedisDB_PendingTTLFailsClosedAtByteCap (renamed)
- TestRedisDB_PendingTTLByteCapBoundedByLargeKey (new)
- TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim (new)
- TestRedisDB_WithPendingTTLByteCapZeroOpts (renamed)
- TestRedisDB_WithPendingTTLByteCapNegativeCoercedToZero (renamed)

Self-review:
1. Data loss - byte budget is a strictly tighter memory bound than
   the previous entry-count cap.
2. Concurrency - no new shared state.
3. Performance - one int comparison + add per parkUnknownTTL; one
   subtract per claimPendingTTL.
4. Consistency - matches PR #790 r6 exactly; the two PRs share the
   pendingTTL infrastructure and the fix is identical.
5. Coverage - 5 tests (3 new, 2 renamed). All pass with -race.
@bootjp
Owner Author

bootjp commented May 19, 2026

@claude review

@claude

claude Bot commented May 19, 2026

Claude finished @bootjp's task in 4m 9s


Code Review — PR #791: backup: Redis stream encoder (Phase 0a) — Round 6

Tasks

  • Read trigger comment and prior review context
  • Read current internal/backup/redis_stream.go
  • Read current internal/backup/redis_string.go
  • Read current internal/backup/redis_string_test.go
  • Read current internal/backup/redis_stream_test.go
  • Verify prior-round fixes and open issues
  • Analyze latest commit: bytes-based pendingTTL bounding
  • Post final review

All six P1 issues from prior rounds remain fixed. The bytes-based bounding in the latest commit (421254e) is correct and adds three good new tests. One medium issue enters its sixth consecutive round without being addressed. Four minor issues carry over unchanged from prior rounds.


Previously Fixed — All Confirmed

Duplicate field names: streamFieldJSON is []streamFieldJSON array-of-records with json.RawMessage. ✓

Non-UTF-8 field corruption: streamFieldJSON.Name/Value populated via marshalRedisBinaryValue in buildStreamFieldRecords. ✓

TTL ordering (streams + sets): streamState / setState both call claimPendingTTL on first registration. Both arrival orderings tested. ✓

pendingTTL OOM: parkUnknownTTL caps by entry count. ✓

pendingTTL fail-closed: ErrPendingTTLBufferFull returned when buffer is at cap. ✓

pendingTTL bytes vs entries (latest commit): pendingTTLBytesCap / pendingTTLBytes with pendingTTLEntryOverheadBytes=8. claimPendingTTL correctly decrements by len(uk) + 8. The cap-zero short-circuit, fail-closed path, and overflow increment are all correct. Three new regression tests: TestRedisDB_PendingTTLByteCapBoundedByLargeKey (single oversized key), TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim (drain reclaims budget), TestRedisDB_PendingTTLFailsClosedAtByteCap (within-cap succeeds, at-cap fails). ✓


Medium — Still Open (6th Round)

flushStreams still duplicates the flushWideColumnDir boilerplate (redis_stream.go:260-287)

This has been called out in every review round. flushSets delegates cleanly (redis_set.go:182). flushStreams still hand-rolls the identical nil-check / ensureDir / sorted-key-slice / per-key-callback pattern. A future change to flushWideColumnDir (error policy, security check, e.g. the symlink guard in ensureDir) won't propagate here:

func (r *RedisDB) flushStreams() error {
    return flushWideColumnDir(r, r.streams, "streams", func(dir, uk string, st *redisStreamState) error {
        if r.warn != nil && st.metaSeen && int64(len(st.entries)) != st.length {
            r.warn("redis_stream_length_mismatch",
                "user_key_len", len(uk),
                "declared_len", st.length,
                "observed_entries", len(st.entries),
                "hint", "meta record's Length does not match the count of !stream|entry| keys for this user key")
        }
        return r.writeStreamJSONL(dir, []byte(uk), st)
    })
}



Minor — Still Open from Prior Rounds

orphanTTLCount field comment is actively misleading (redis_string.go:145-155)

The comment still says "We deliberately track only the count (not the keys themselves) because the keys are unused before the wide-column encoders land" — but they have landed in this PR. It also says "buffering full keys would allocate proportional to user-key size" as a reason not to buffer — but keys are now buffered in pendingTTL (up to pendingTTLBytesCap). And orphanTTLCount no longer counts intake TTLs — it counts entries remaining in pendingTTL at Finalize plus pendingTTLOverflow.

// Suggest updating to:
// orphanTTLCount counts TTL records not matched by any typed record.
// Unknown-kind TTLs are first parked in pendingTTL up to
// pendingTTLBytesCap bytes; at Finalize, whatever remains in
// pendingTTL (never claimed by a wide-column state-init) plus
// pendingTTLOverflow (entries that exceeded the byte budget) are
// folded into this counter.


Test name TestRedisDB_OrphanTTLCountedNotBuffered contradicts its body (redis_string_test.go:580)

The test's own inline comment (lines 583-591) says "orphan TTL records are now BUFFERED in pendingTTL during intake" and the assertions confirm len(db.pendingTTL) == n. The function name asserts the opposite. A reader scanning test names will misread this test as covering the cap-zero path:

// Rename to:
func TestRedisDB_OrphanTTLBufferedDuringIntake(t *testing.T) {


Stale file-header comment shows old map-based field format (redis_stream.go:41-43)

//	{"id":"1714400000000-0","fields":{"event":"login","user":"alice"}}

The {"event":"login","user":"alice"} map shape was intentionally abandoned in favour of [{"name":...,"value":...}] to fix the duplicate-field P1. The package doc example should match the actual output:

//	{"id":"1714400000000-0","fields":[{"name":"event","value":"login"},{"name":"user","value":"alice"}]}


In-place sort mutates live state (redis_stream.go:359)

sort.Slice(st.entries, ...) still sorts the accumulated slice in-place. Currently harmless (flush is one-shot), but inconsistent with the cloneStringSlice rationale in decodeStreamEntryValue.


Minor — Latest Commit

TestRedisDB_WithPendingTTLByteCapZeroOpts is less descriptive than the Round 5 name

"Opts" does not convey what the test exercises. The prior name TestRedisDB_WithPendingTTLCapZeroDisablesBuffering communicated the observable behaviour; the update to prefix "ByteCap" (to match the byte-based rename) is correct, but "Opts" should complete the intent: TestRedisDB_WithPendingTTLByteCapZeroDisablesBuffering.


What's good (latest commit)

  • Byte-based bounding is the right model: an entry-count cap of N still permits ~N MiB of accumulated key bytes on adversarial snapshots; charging len(userKey) + 8 per entry makes the cap a deterministic byte budget the operator can reason about.
  • claimPendingTTL correctly decrements pendingTTLBytes -= len(uk) + pendingTTLEntryOverheadBytes, matching the cost charged in parkUnknownTTL exactly. No underflow possible because delete(r.pendingTTL, uk) prevents double-drain.
  • pendingTTLOverflow is incremented before the error return, so Finalize's warning surfaces the overflow count even if the caller aborts on the first error.
  • TestRedisDB_PendingTTLByteCapBoundedByLargeKey correctly pins the single-oversized-key case (100 + 8 = 108 > 64), which an entry-count cap would have missed entirely.
  • TestRedisDB_PendingTTLByteBudgetReclaimedOnClaim correctly verifies that claimPendingTTL frees the byte budget so subsequent entries can be buffered even after a drain.
  • The defaultPendingTTLBytesCap = 64 << 20 choice is well-justified and the pendingTTLEntryOverheadBytes constant is documented to exclude Go-map bucket overhead intentionally (deterministic budget).

