diff --git a/internal/backup/redis_set.go b/internal/backup/redis_set.go index 7cd4a440..f6f80975 100644 --- a/internal/backup/redis_set.go +++ b/internal/backup/redis_set.go @@ -114,12 +114,24 @@ func (r *RedisDB) HandleSetMetaDelta(_, _ []byte) error { return nil } // setState lazily creates per-key state. Mirrors the hash/list // kindByKey-registration pattern so HandleSetMeta, HandleSetMember, // and the HandleTTL back-edge all agree on the kind. +// +// On first registration we drain any pendingTTL for the user key. +// `!redis|ttl|` lex-sorts BEFORE `!st|...` (because `r` < `s`), +// so in real snapshot order the TTL arrives FIRST; HandleTTL parks +// it in pendingTTL, and this function inlines it into the set's +// `expire_at_ms`. Without this drain step, every TTL'd set would +// restore as permanent — a latent bug in PR #758 surfaced by codex +// on PR #791. Phase 0a tests added in the same PR pin the ordering. func (r *RedisDB) setState(userKey []byte) *redisSetState { uk := string(userKey) if st, ok := r.sets[uk]; ok { return st } st := &redisSetState{members: make(map[string]struct{})} + if expireAtMs, ok := r.claimPendingTTL(userKey); ok { + st.expireAtMs = expireAtMs + st.hasTTL = true + } r.sets[uk] = st r.kindByKey[uk] = redisKindSet return st diff --git a/internal/backup/redis_stream.go b/internal/backup/redis_stream.go new file mode 100644 index 00000000..a4cd6d17 --- /dev/null +++ b/internal/backup/redis_stream.go @@ -0,0 +1,432 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "math" + "path/filepath" + "sort" + "strconv" + + pb "github.com/bootjp/elastickv/proto" + cockroachdberr "github.com/cockroachdb/errors" + gproto "google.golang.org/protobuf/proto" +) + +// Redis stream encoder. Translates raw !stream|... snapshot records +// into the per-stream `streams/.jsonl` shape defined by Phase 0 +// (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md, lines +// 336-344). 
//
// Wire format mirrors store/stream_helpers.go and
// adapter/redis_storage_codec.go:
//   - !stream|meta|<userKeyLen(4)><userKey>
//     → 24-byte BE Length(8) || LastMs(8) || LastSeq(8)
//   - !stream|entry|<userKeyLen(4)><userKey><ms(8)><seq(8)>
//     → magic-prefixed pb.RedisStreamEntry protobuf with fields
//     {id string, fields []string} where Fields is the
//     interleaved (name1, value1, name2, value2, ...) XADD
//     field list.
//
// The protobuf entry value carries a magic prefix
// `0x00 'R' 'X' 'E' 0x01` (mirror of
// adapter/redis_storage_codec.go:17 storedRedisStreamEntryProtoPrefix);
// re-declared here so this package stays adapter-independent.
//
// Output is JSONL (one record per line) plus a trailing `_meta`
// terminator line that captures length, last_ms, last_seq, and TTL.
// The design (lines 336-339) sketched `fields` as a JSON object; the
// encoder emits an array of name/value records instead (see
// streamFieldJSON) so duplicate field names within one entry survive:
//
//	{"id":"1714400000000-0","fields":[{"name":"event","value":"login"},{"name":"user","value":"alice"}]}
//	{"_meta":true,"length":2,"last_ms":1714400000001,"last_seq":0,"expire_at_ms":null}
//
// JSONL was chosen for streams over per-entry files because real
// streams routinely hold tens of thousands of entries and per-entry
// inode pressure would dominate `tar`/`find` runtime.
const (
	// RedisStreamMetaPrefix is the key prefix of per-stream meta records.
	RedisStreamMetaPrefix = "!stream|meta|"
	// RedisStreamEntryPrefix is the key prefix of per-entry records.
	RedisStreamEntryPrefix = "!stream|entry|"

	// redisStreamMetaSize is the on-disk size of one !stream|meta|
	// value: Length(8) || LastMs(8) || LastSeq(8). Mirrors
	// store.streamMetaBinarySize; duplicated here to keep the backup
	// package free of `store` imports.
	redisStreamMetaSize = 24

	// redisStreamIDBytes is the per-entry-key suffix size: ms(8)
	// || seq(8). Mirrors store.StreamIDBytes.
	redisStreamIDBytes = 16

	// redisStreamProtoPrefixLen is the byte length of
	// redisStreamProtoPrefix, the magic prefix on the stored
	// pb.RedisStreamEntry serialization, and must stay equal to
	// len(redisStreamProtoPrefix). The prefix mirrors
	// adapter/redis_storage_codec.go:storedRedisStreamEntryProtoPrefix.
	// A live-side rename there without an accompanying backup update
	// would surface as ErrRedisInvalidStreamEntry on decode of any
	// real cluster dump — caught at the property tests.
	redisStreamProtoPrefixLen = 5
)

// redisStreamProtoPrefix is the magic byte sequence every stored
// !stream|entry| value starts with; decodeStreamEntryValue rejects
// values that do not carry it.
var redisStreamProtoPrefix = []byte{0x00, 'R', 'X', 'E', 0x01}

// ErrRedisInvalidStreamMeta is returned when an !stream|meta| value
// is not the expected 24 bytes or declares a length above
// math.MaxInt64 (i.e. one that would be negative as an int64).
var ErrRedisInvalidStreamMeta = cockroachdberr.New("backup: invalid !stream|meta| value")

// ErrRedisInvalidStreamEntry is returned when an !stream|entry|
// value's magic prefix is missing, its protobuf body fails to
// unmarshal, or its field list has odd arity.
var ErrRedisInvalidStreamEntry = cockroachdberr.New("backup: invalid !stream|entry| value")

// ErrRedisInvalidStreamKey is returned when a !stream| key cannot
// be parsed for its userKeyLen+userKey (or trailing ID) segments.
var ErrRedisInvalidStreamKey = cockroachdberr.New("backup: malformed !stream| key")

// redisStreamEntry buffers one decoded XADD entry while the encoder
// assembles the per-stream JSONL output. Only the numeric ms and seq
// halves of the ID are stored; the "ms-seq" string is formatted at
// marshal time. Keeping them numeric lets marshalStreamJSONL sort by
// (ms, seq) deterministically; sorting by the formatted "ms-seq"
// string would put "10-0" before "2-0".
type redisStreamEntry struct {
	ms     uint64
	seq    uint64
	fields []string // interleaved (name, value) pairs, XADD order
}

// redisStreamState buffers one userKey's stream during a snapshot
// scan. Like the hash/list/set/zset encoders we accumulate per-key
// state in memory; a single stream is bounded by maxWideColumnItems
// on the live side, so this remains tractable.
type redisStreamState struct {
	metaSeen   bool               // true once a !stream|meta| record was handled
	length     int64              // Length declared by the meta record
	lastMs     uint64             // LastMs declared by the meta record
	lastSeq    uint64             // LastSeq declared by the meta record
	entries    []redisStreamEntry // entries in arrival order; sorted at marshal time
	expireAtMs uint64             // valid only when hasTTL is true
	hasTTL     bool
}

// HandleStreamMeta processes one !stream|meta| record.
// Value layout: Length(8) || LastMs(8) || LastSeq(8).
The encoder +// uses the meta's last_ms / last_seq verbatim in the JSONL _meta +// terminator so a restorer can replay them into the same XADD '*' +// monotonicity window. Length mismatches against the observed +// entry count surface as `redis_stream_length_mismatch` at flush +// time. +func (r *RedisDB) HandleStreamMeta(key, value []byte) error { + userKey, ok := parseStreamMetaKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidStreamKey, "meta key: %q", key) + } + if len(value) != redisStreamMetaSize { + return cockroachdberr.Wrapf(ErrRedisInvalidStreamMeta, + "length %d != %d", len(value), redisStreamMetaSize) + } + rawLen := binary.BigEndian.Uint64(value[0:8]) + if rawLen > math.MaxInt64 { + return cockroachdberr.Wrapf(ErrRedisInvalidStreamMeta, + "declared length %d overflows int64", rawLen) + } + st := r.streamState(userKey) + st.length = int64(rawLen) //nolint:gosec // bounded above + st.lastMs = binary.BigEndian.Uint64(value[8:16]) + st.lastSeq = binary.BigEndian.Uint64(value[16:24]) + st.metaSeen = true + return nil +} + +// HandleStreamEntry processes one !stream|entry| +// record. The ID is recovered from the trailing 16 bytes of the +// key; the value is the magic-prefixed `pb.RedisStreamEntry` +// protobuf carrying the entry's interleaved (name, value) field +// list. +func (r *RedisDB) HandleStreamEntry(key, value []byte) error { + userKey, ms, seq, ok := parseStreamEntryKey(key) + if !ok { + return cockroachdberr.Wrapf(ErrRedisInvalidStreamKey, "entry key: %q", key) + } + fields, err := decodeStreamEntryValue(value) + if err != nil { + return err + } + st := r.streamState(userKey) + st.entries = append(st.entries, redisStreamEntry{ms: ms, seq: seq, fields: fields}) + return nil +} + +// streamState lazily creates per-key state. Mirrors the +// hash/list/set/zset kindByKey-registration pattern so HandleStreamMeta, +// HandleStreamEntry, and the HandleTTL back-edge all agree on the +// kind. 
+// +// On first registration we drain any pendingTTL for the user key. +// `!redis|ttl|` lex-sorts BEFORE `!stream|...` (because `r` < `s`), +// so in real snapshot order the TTL arrives FIRST; HandleTTL parks +// it in pendingTTL, and this function inlines it into the stream's +// JSONL `_meta.expire_at_ms`. Without this drain step, every TTL'd +// stream would restore as permanent. Codex P1 finding on PR #791. +func (r *RedisDB) streamState(userKey []byte) *redisStreamState { + uk := string(userKey) + if st, ok := r.streams[uk]; ok { + return st + } + st := &redisStreamState{} + if expireAtMs, ok := r.claimPendingTTL(userKey); ok { + st.expireAtMs = expireAtMs + st.hasTTL = true + } + r.streams[uk] = st + r.kindByKey[uk] = redisKindStream + return st +} + +// parseStreamMetaKey strips !stream|meta| and the 4-byte BE +// userKeyLen prefix. Returns (userKey, true) on success. Unlike +// the hash/set encoders there is no `!stream|meta|d|...` delta +// family — streams update meta in-place rather than via per-XADD +// deltas — so we do not need a delta-skip guard here. +func parseStreamMetaKey(key []byte) ([]byte, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisStreamMetaPrefix)) + if len(rest) == len(key) { + return nil, false + } + return parseUserKeyLenPrefix(rest) +} + +// parseStreamEntryKey strips !stream|entry| and the 4-byte BE +// userKeyLen prefix, then peels off the trailing 16-byte StreamID +// (ms || seq). Returns (userKey, ms, seq, true) on success. +func parseStreamEntryKey(key []byte) ([]byte, uint64, uint64, bool) { + rest := bytes.TrimPrefix(key, []byte(RedisStreamEntryPrefix)) + if len(rest) == len(key) { + return nil, 0, 0, false + } + userKey, ok := parseUserKeyLenPrefix(rest) + if !ok { + return nil, 0, 0, false + } + // After (userKeyLen(4) + userKey), exactly StreamIDBytes must remain. 
+ tail := rest[wideColumnUserKeyLenSize+len(userKey):] + if len(tail) != redisStreamIDBytes { + return nil, 0, 0, false + } + ms := binary.BigEndian.Uint64(tail[0:8]) + seq := binary.BigEndian.Uint64(tail[8:16]) + return userKey, ms, seq, true +} + +// decodeStreamEntryValue strips the magic prefix and protobuf-decodes +// the entry payload. Returns the interleaved field list (name1, +// value1, name2, value2, ...) used by every Redis stream consumer. +func decodeStreamEntryValue(value []byte) ([]string, error) { + if len(value) < redisStreamProtoPrefixLen || + !bytes.Equal(value[:redisStreamProtoPrefixLen], redisStreamProtoPrefix) { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidStreamEntry, + "missing or corrupt magic prefix (len=%d)", len(value)) + } + msg := &pb.RedisStreamEntry{} + if err := gproto.Unmarshal(value[redisStreamProtoPrefixLen:], msg); err != nil { + return nil, cockroachdberr.Wrapf(ErrRedisInvalidStreamEntry, + "unmarshal: %v", err) + } + if len(msg.GetFields())%2 != 0 { + // Live XADD enforces even arity (name/value pairs). An odd + // field count at backup time indicates corruption that would + // silently lose the dangling field if we accepted it — fail + // closed. + return nil, cockroachdberr.Wrapf(ErrRedisInvalidStreamEntry, + "odd field count %d (XADD enforces name/value pairs)", len(msg.GetFields())) + } + return cloneStringSlice(msg.GetFields()), nil +} + +func cloneStringSlice(src []string) []string { + if src == nil { + return nil + } + out := make([]string, len(src)) + copy(out, src) + return out +} + +// flushStreams writes one JSONL file per accumulated stream to +// streams/.jsonl. Empty streams (Length==0, no entries) +// still emit a file when meta was seen, mirroring the wide-column +// encoders' policy: their existence is observable to clients (TYPE +// returns "stream", XLEN returns 0). Mismatched declared-vs-observed +// length surfaces an `redis_stream_length_mismatch` warning. 
+func (r *RedisDB) flushStreams() error { + if len(r.streams) == 0 { + return nil + } + dir := filepath.Join(r.dbDir(), "streams") + if err := r.ensureDir(dir); err != nil { + return err + } + userKeys := make([]string, 0, len(r.streams)) + for k := range r.streams { + userKeys = append(userKeys, k) + } + sort.Strings(userKeys) + for _, uk := range userKeys { + st := r.streams[uk] + if r.warn != nil && st.metaSeen && int64(len(st.entries)) != st.length { + r.warn("redis_stream_length_mismatch", + "user_key_len", len(uk), + "declared_len", st.length, + "observed_entries", len(st.entries), + "hint", "meta record's Length does not match the count of !stream|entry| keys for this user key") + } + if err := r.writeStreamJSONL(dir, []byte(uk), st); err != nil { + return err + } + } + return nil +} + +func (r *RedisDB) writeStreamJSONL(dir string, userKey []byte, st *redisStreamState) error { + encoded := EncodeSegment(userKey) + if err := r.recordIfFallback(encoded, userKey); err != nil { + return err + } + path := filepath.Join(dir, encoded+".jsonl") + body, err := marshalStreamJSONL(st) + if err != nil { + return err + } + if err := writeFileAtomic(path, body); err != nil { + return cockroachdberr.WithStack(err) + } + return nil +} + +// streamFieldJSON is the dump-format projection of one (name, value) +// pair from a stream entry. We emit a list of name/value records +// rather than a JSON object because XADD permits duplicate field +// names within one entry — e.g. `XADD s * f v1 f v2` records BOTH +// (f, v1) and (f, v2) as distinct interleaved entries in the +// stored protobuf's Fields slice. The design example at +// docs/design/2026_04_29_proposed_snapshot_logical_decoder.md:338 +// showed an object shape, but that representation silently drops +// duplicates and a restore would not reproduce the original +// stream entry. Codex P1 (PR #791) — switched to a name/value +// record array. 
+// +// Name and Value are `json.RawMessage` populated via +// `marshalRedisBinaryValue` so non-UTF-8 bytes round-trip via the +// `{"base64":"..."}` envelope. Without this, `json.Marshal` of a +// plain `string` carrying invalid UTF-8 silently substitutes U+FFFD +// for each ill-formed byte sequence, and the restored stream entry +// would carry the replacement-character mangle instead of the +// original bytes. Redis stream field names and values are +// binary-safe (the live store keeps them as protobuf `bytes` +// despite the wire-format `repeated string` shape), so the +// projection must preserve every byte. Mirrors hashFieldRecord +// (redis_hash.go:235-238). Claude bot Critical (PR #791 round 2). +type streamFieldJSON struct { + Name json.RawMessage `json:"name"` + Value json.RawMessage `json:"value"` +} + +// streamEntryJSON is the dump-format projection of one stream +// entry. Fields is an ARRAY so duplicate field names within a +// single XADD round-trip correctly. The array preserves the +// interleaved insertion order from the protobuf so consumers can +// re-assemble the original XADD argv. +type streamEntryJSON struct { + ID string `json:"id"` + Fields []streamFieldJSON `json:"fields"` +} + +// streamMetaJSON is the dump-format projection of the final _meta +// terminator line. +type streamMetaJSON struct { + Meta bool `json:"_meta"` + Length int64 `json:"length"` + LastMs uint64 `json:"last_ms"` + LastSeq uint64 `json:"last_seq"` + ExpireAtMs *uint64 `json:"expire_at_ms"` +} + +// marshalStreamJSONL renders one stream state as JSONL. Entries are +// sorted by (ms, seq) so identical snapshots produce identical +// output across runs regardless of XADD insertion order. Each line +// uses encoding/json (compact, no MarshalIndent) so the format is +// stable enough for `diff -r`. 
+func marshalStreamJSONL(st *redisStreamState) ([]byte, error) { + sort.Slice(st.entries, func(i, j int) bool { + a, b := st.entries[i], st.entries[j] + if a.ms != b.ms { + return a.ms < b.ms + } + return a.seq < b.seq + }) + var buf bytes.Buffer + const xaddPairWidth = 2 // (name, value) — XADD enforces even arity + for _, e := range st.entries { + fields, err := buildStreamFieldRecords(e.fields, xaddPairWidth) + if err != nil { + return nil, err + } + rec := streamEntryJSON{ + ID: formatStreamID(e.ms, e.seq), + Fields: fields, + } + line, err := json.Marshal(rec) + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + buf.Write(line) + buf.WriteByte('\n') + } + meta := streamMetaJSON{ + Meta: true, + Length: st.length, + LastMs: st.lastMs, + LastSeq: st.lastSeq, + } + if st.hasTTL { + ms := st.expireAtMs + meta.ExpireAtMs = &ms + } + line, err := json.Marshal(meta) + if err != nil { + return nil, cockroachdberr.WithStack(err) + } + buf.Write(line) + buf.WriteByte('\n') + return buf.Bytes(), nil +} + +// formatStreamID emits a stream ID in Redis's "ms-seq" wire format +// (the same shape XADD/XRANGE clients exchange on the wire). +func formatStreamID(ms, seq uint64) string { + return strconv.FormatUint(ms, 10) + "-" + strconv.FormatUint(seq, 10) +} + +// buildStreamFieldRecords converts one entry's interleaved +// (name1, value1, name2, value2, ...) field slice into a +// streamFieldJSON array. Each name/value goes through +// marshalRedisBinaryValue so non-UTF-8 bytes round-trip via the +// `{"base64":"..."}` envelope. Without this projection, plain +// `string` fields would surrender every ill-formed UTF-8 byte to +// json.Marshal's silent U+FFFD substitution and the restored +// stream entry would not be byte-identical to the source. Claude +// bot Critical (PR #791 round 2). 
+func buildStreamFieldRecords(fields []string, pairWidth int) ([]streamFieldJSON, error) { + out := make([]streamFieldJSON, 0, len(fields)/pairWidth) + for i := 0; i+1 < len(fields); i += pairWidth { + nameJSON, err := marshalRedisBinaryValue([]byte(fields[i])) + if err != nil { + return nil, err + } + valueJSON, err := marshalRedisBinaryValue([]byte(fields[i+1])) + if err != nil { + return nil, err + } + out = append(out, streamFieldJSON{Name: nameJSON, Value: valueJSON}) + } + return out, nil +} diff --git a/internal/backup/redis_stream_test.go b/internal/backup/redis_stream_test.go new file mode 100644 index 00000000..234faebe --- /dev/null +++ b/internal/backup/redis_stream_test.go @@ -0,0 +1,662 @@ +package backup + +import ( + "bufio" + "bytes" + "encoding/base64" + "encoding/binary" + "encoding/json" + "math" + "os" + "path/filepath" + "testing" + + pb "github.com/bootjp/elastickv/proto" + "github.com/cockroachdb/errors" + gproto "google.golang.org/protobuf/proto" +) + +// streamMetaKey is the test-side mirror of store.StreamMetaKey. +func streamMetaKey(userKey string) []byte { + out := []byte(RedisStreamMetaPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + return append(out, userKey...) +} + +// streamEntryKey is the test-side mirror of store.StreamEntryKey. +func streamEntryKey(userKey string, ms, seq uint64) []byte { + out := []byte(RedisStreamEntryPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], uint32(len(userKey))) //nolint:gosec + out = append(out, l[:]...) + out = append(out, userKey...) + var id [16]byte + binary.BigEndian.PutUint64(id[0:8], ms) + binary.BigEndian.PutUint64(id[8:16], seq) + return append(out, id[:]...) +} + +// encodeStreamMetaValue mirrors store.MarshalStreamMeta: +// Length(8) || LastMs(8) || LastSeq(8), all big-endian. 
+func encodeStreamMetaValue(length int64, lastMs, lastSeq uint64) []byte { + v := make([]byte, redisStreamMetaSize) + binary.BigEndian.PutUint64(v[0:8], uint64(length)) //nolint:gosec + binary.BigEndian.PutUint64(v[8:16], lastMs) + binary.BigEndian.PutUint64(v[16:24], lastSeq) + return v +} + +// encodeStreamEntryValue produces the magic-prefixed protobuf wire +// format the live store writes for !stream|entry| values. +func encodeStreamEntryValue(t *testing.T, id string, fields []string) []byte { + t.Helper() + body, err := gproto.Marshal(&pb.RedisStreamEntry{Id: id, Fields: fields}) + if err != nil { + t.Fatalf("marshal pb.RedisStreamEntry: %v", err) + } + out := make([]byte, 0, redisStreamProtoPrefixLen+len(body)) + out = append(out, redisStreamProtoPrefix...) + out = append(out, body...) + return out +} + +// readStreamJSONL parses the JSONL output (one record per line) and +// splits it into entries (no `_meta` key) plus the trailing meta +// terminator. The terminator is always the last line per spec. 
func readStreamJSONL(t *testing.T, path string) (entries []map[string]any, meta map[string]any) {
	t.Helper()
	f, err := os.Open(path) //nolint:gosec // test path
	if err != nil {
		t.Fatalf("open %s: %v", path, err)
	}
	defer f.Close()

	// bufio.Scanner refuses tokens above 64 KiB by default, and a
	// stream entry with many or large fields easily exceeds that on a
	// single JSONL line. Raise the cap so the helper does not fail on
	// valid encoder output.
	const maxLineBytes = 16 << 20
	scanner := bufio.NewScanner(f)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	for scanner.Scan() {
		line := scanner.Bytes()
		if len(bytes.TrimSpace(line)) == 0 {
			continue
		}
		// The `_meta` terminator must be the last record. Anything
		// after it (an entry or a second terminator) is a format
		// violation the tests should catch rather than absorb.
		if meta != nil {
			t.Fatalf("record after _meta terminator in %s: %q", path, line)
		}
		var rec map[string]any
		if err := json.Unmarshal(line, &rec); err != nil {
			t.Fatalf("unmarshal %q: %v", line, err)
		}
		if _, ok := rec["_meta"]; ok {
			meta = rec
			continue
		}
		entries = append(entries, rec)
	}
	if err := scanner.Err(); err != nil {
		t.Fatalf("scan: %v", err)
	}
	if meta == nil {
		t.Fatalf("no _meta terminator in %s", path)
	}
	return entries, meta
}

// streamFloat returns m[key] as a float64 (the type encoding/json
// gives every JSON number decoded into `any`). It fails the test if
// the key is absent or holds any other type, including JSON null.
func streamFloat(t *testing.T, m map[string]any, key string) float64 {
	t.Helper()
	v, ok := m[key]
	if !ok {
		t.Fatalf("field %q missing in %+v", key, m)
	}
	f, ok := v.(float64)
	if !ok {
		t.Fatalf("field %q = %T(%v), want float64", key, v, v)
	}
	return f
}

// assertStreamMetaTerminator pins the trailing `_meta` line shape.
// Extracted from the round-trip tests to keep the per-test bodies
// below the cyclop ceiling.
+func assertStreamMetaTerminator(t *testing.T, meta map[string]any, wantLen int64, wantLastMs, wantLastSeq uint64) { + t.Helper() + if streamFloat(t, meta, "length") != float64(wantLen) { + t.Fatalf("meta.length = %v want %d", meta["length"], wantLen) + } + if streamFloat(t, meta, "last_ms") != float64(wantLastMs) { + t.Fatalf("meta.last_ms = %v want %d", meta["last_ms"], wantLastMs) + } + if streamFloat(t, meta, "last_seq") != float64(wantLastSeq) { + t.Fatalf("meta.last_seq = %v want %d", meta["last_seq"], wantLastSeq) + } + if meta["expire_at_ms"] != nil { + t.Fatalf("meta.expire_at_ms must be nil without TTL, got %v", meta["expire_at_ms"]) + } +} + +// TestRedisDB_StreamRoundTripBasic confirms a multi-entry stream +// round-trips through the encoder in (ms, seq) order, with the +// fields decoded from the protobuf entry value and the trailing +// _meta line preserving length / last_ms / last_seq. +func TestRedisDB_StreamRoundTripBasic(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("events"), encodeStreamMetaValue(3, 1714400000002, 0)); err != nil { + t.Fatalf("HandleStreamMeta: %v", err) + } + // Submit out of (ms, seq) order — encoder must sort. 
+ for _, e := range []struct { + ms, seq uint64 + id string + fields []string + }{ + {1714400000002, 0, "1714400000002-0", []string{"event", "logout", "user", "alice"}}, + {1714400000000, 0, "1714400000000-0", []string{"event", "login", "user", "alice"}}, + {1714400000001, 0, "1714400000001-0", []string{"event", "click", "user", "alice"}}, + } { + key := streamEntryKey("events", e.ms, e.seq) + val := encodeStreamEntryValue(t, e.id, e.fields) + if err := db.HandleStreamEntry(key, val); err != nil { + t.Fatalf("HandleStreamEntry(%s): %v", e.id, err) + } + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + entries, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "events.jsonl")) + if len(entries) != 3 { + t.Fatalf("entries = %d, want 3 (got %v)", len(entries), entries) + } + for i, w := range []string{"1714400000000-0", "1714400000001-0", "1714400000002-0"} { + if entries[i]["id"] != w { + t.Fatalf("entries[%d].id = %v, want %v", i, entries[i]["id"], w) + } + } + assertStreamMetaTerminator(t, meta, 3, 1714400000002, 0) +} + +// streamFieldsPair is the decoded counterpart of streamFieldJSON used +// in assertions. +type streamFieldsPair struct{ name, value string } + +// extractStreamFieldsAsPairs pulls the `fields` array out of a +// decoded JSONL entry and returns it as a slice of (name, value) +// pairs. Centralises the type assertions so the per-test bodies +// stay below the cyclop ceiling and forcetypeassert lints don't +// fire at every call site. +// +// Each name/value can be EITHER a plain JSON string (UTF-8 content) +// or a `{"base64":"..."}` envelope object (non-UTF-8 binary bytes). +// The fields are emitted via marshalRedisBinaryValue so binary +// stream payloads round-trip byte-identical; this helper hides the +// per-pair envelope detection from the per-test assertions. 
+func extractStreamFieldsAsPairs(t *testing.T, entry map[string]any) []streamFieldsPair { + t.Helper() + raw, ok := entry["fields"].([]any) + if !ok { + t.Fatalf("entry.fields = %T(%v), want array", entry["fields"], entry["fields"]) + } + out := make([]streamFieldsPair, 0, len(raw)) + for i, r := range raw { + rec, ok := r.(map[string]any) + if !ok { + t.Fatalf("entry.fields[%d] = %T(%v), want object", i, r, r) + } + out = append(out, streamFieldsPair{ + name: decodeRedisBinaryEnvelope(t, "name", rec["name"]), + value: decodeRedisBinaryEnvelope(t, "value", rec["value"]), + }) + } + return out +} + +// decodeRedisBinaryEnvelope reverses marshalRedisBinaryValue for +// tests: a plain JSON string round-trips as a string; a +// `{"base64":"..."}` envelope decodes via base64url back to the +// original byte string. Returns the recovered bytes as a Go +// string so the test assertions can compare against the input +// regardless of which projection the encoder chose. +func decodeRedisBinaryEnvelope(t *testing.T, label string, raw any) string { + t.Helper() + if s, ok := raw.(string); ok { + return s + } + env, ok := raw.(map[string]any) + if !ok { + t.Fatalf("%s = %T(%v), want string or base64 envelope", label, raw, raw) + } + encoded, ok := env["base64"].(string) + if !ok { + t.Fatalf("%s base64 envelope missing payload: %v", label, env) + } + decoded, err := base64.RawURLEncoding.DecodeString(encoded) + if err != nil { + t.Fatalf("%s base64 decode: %v (payload %q)", label, err, encoded) + } + return string(decoded) +} + +// assertStreamFieldsEqual checks that the decoded fields array +// matches the expected ordered list of (name, value) pairs. 
+func assertStreamFieldsEqual(t *testing.T, got []streamFieldsPair, want []streamFieldsPair) { + t.Helper() + if len(got) != len(want) { + t.Fatalf("len(fields) = %d, want %d", len(got), len(want)) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("fields[%d] = %v, want %v", i, got[i], want[i]) + } + } +} + +// TestRedisDB_StreamFieldsDecodedToArray confirms that interleaved +// `[name1, value1, name2, value2, ...]` arrays from the protobuf +// land as a JSONL `[{"name":...,"value":...}]` array. Switched +// from the design's original map shape (line 338) in response to +// codex's P1 on PR #791: XADD allows duplicate field names within +// one entry (e.g. `XADD s * f v1 f v2`) and the map representation +// silently collapsed them. +func TestRedisDB_StreamFieldsDecodedToArray(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 100, 5)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "100-5", []string{"event", "login", "user", "alice", "ip", "10.0.0.1"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 100, 5), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl")) + if len(entries) != 1 { + t.Fatalf("entries = %d, want 1", len(entries)) + } + assertStreamFieldsEqual(t, extractStreamFieldsAsPairs(t, entries[0]), []streamFieldsPair{ + {"event", "login"}, + {"user", "alice"}, + {"ip", "10.0.0.1"}, + }) +} + +// TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip pins the +// claude-bot Critical fix on PR #791: stream field names and +// values are binary-safe in Redis. 
Previously +// streamFieldJSON.Name/Value were plain Go `string` and went +// through json.Marshal, which silently substitutes U+FFFD for +// every ill-formed UTF-8 byte sequence — a stream entry carrying +// raw binary would be silently corrupted in the dump. The fix +// routes both fields through marshalRedisBinaryValue so non-UTF-8 +// bytes emit as `{"base64":"..."}` and round-trip byte-identical. +// +// The protobuf wire format itself enforces UTF-8 on `string` fields +// (proto3 `string` is by spec UTF-8, and `gproto.Marshal` rejects +// invalid bytes), so the path "live store → snapshot → decoder" +// cannot actually carry non-UTF-8 stream fields today; it's a +// defensive invariant in case a future schema migration switches +// `Fields` to `bytes`, or a code path bypasses the proto marshaler. +// We pin the projection's behavior directly on +// buildStreamFieldRecords + extractStreamFieldsAsPairs rather than +// trying to push bytes through a gproto.Marshal step that would +// reject them. +func TestBuildStreamFieldRecords_NonUTF8BytesRoundTrip(t *testing.T) { + t.Parallel() + const xaddPairWidth = 2 + binaryName := "\xff\xfe\x80" + binaryValue := "\x00\x01\xc3\x28\x02" + records, err := buildStreamFieldRecords([]string{binaryName, binaryValue, "utf8-key", "utf8-val"}, xaddPairWidth) + if err != nil { + t.Fatalf("buildStreamFieldRecords: %v", err) + } + if len(records) != xaddPairWidth { // we passed 2 pairs + t.Fatalf("len(records) = %d, want %d", len(records), xaddPairWidth) + } + // Marshal one entry to JSONL bytes, parse it back, and assert + // that the recovered pairs match the input byte-identical. 
+ body, err := json.Marshal(streamEntryJSON{ID: "1-0", Fields: records}) + if err != nil { + t.Fatalf("json.Marshal streamEntryJSON: %v", err) + } + var parsed map[string]any + if err := json.Unmarshal(body, &parsed); err != nil { + t.Fatalf("json.Unmarshal: %v", err) + } + assertStreamFieldsEqual(t, extractStreamFieldsAsPairs(t, parsed), []streamFieldsPair{ + {binaryName, binaryValue}, + {"utf8-key", "utf8-val"}, + }) +} + +// TestRedisDB_StreamDuplicateFieldsPreserved pins the codex P1 fix: +// XADD permits duplicate field names within one entry (e.g. +// `XADD s * f v1 f v2` stores both pairs verbatim in the +// protobuf's Fields slice). The original map-based projection +// silently collapsed duplicates; the array shape preserves them +// and a restore can replay the original XADD argv exactly. +func TestRedisDB_StreamDuplicateFieldsPreserved(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 1, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "1-0", []string{"f", "v1", "f", "v2", "g", "v3"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 1, 0), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl")) + assertStreamFieldsEqual(t, extractStreamFieldsAsPairs(t, entries[0]), []streamFieldsPair{ + {"f", "v1"}, + {"f", "v2"}, + {"g", "v3"}, + }) +} + +// TestRedisDB_StreamTTLArrivesBeforeRows pins the codex P1 fix: +// `!redis|ttl|` lex-sorts BEFORE `!stream|...` because `r` < +// `s`, so in real Pebble snapshot order the TTL arrives FIRST. +// The encoder MUST buffer the expiry in pendingTTL and drain it +// when streamState first registers the user key, inlining the +// value into the JSONL `_meta.expire_at_ms`. Without this drain +// every TTL'd stream would restore as permanent. 
+func TestRedisDB_StreamTTLArrivesBeforeRows(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + // Snapshot order: TTL first, then meta + entry. + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.HandleStreamMeta(streamMetaKey("k"), encodeStreamMetaValue(1, 100, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "100-0", []string{"event", "login"}) + if err := db.HandleStreamEntry(streamEntryKey("k", 100, 0), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + _, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "k.jsonl")) + if streamFloat(t, meta, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("meta.expire_at_ms = %v want %d — pendingTTL drain failed", meta["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_SetTTLArrivesBeforeRows pins the same ordering fix +// for sets (`!redis|ttl|` lex-sorts before `!st|...` because +// `r` < `s`). Retroactive coverage for PR #758, which shipped the +// set encoder before the pendingTTL infrastructure existed. 
+func TestRedisDB_SetTTLArrivesBeforeRows(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMeta(setMetaKey("k"), encodeSetMetaValue(1)); err != nil { + t.Fatal(err) + } + if err := db.HandleSetMember(setMemberKey("k", []byte("m")), nil); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + got := readSetJSON(t, filepath.Join(root, "redis", "db_0", "sets", "k.json")) + if setFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("expire_at_ms = %v want %d — pendingTTL drain failed", got["expire_at_ms"], fixedExpireMs) + } +} + +// TestRedisDB_StreamEmptyStreamStillEmitsFile mirrors the other +// wide-column encoders: an empty stream (Length=0) must still emit +// a file because TYPE returns "stream" and XLEN returns 0 to clients. +func TestRedisDB_StreamEmptyStreamStillEmitsFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("empty"), encodeStreamMetaValue(0, 0, 0)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "empty.jsonl")) + if len(entries) != 0 { + t.Fatalf("empty stream should emit no entry lines, got %v", entries) + } + if streamFloat(t, meta, "length") != 0 { + t.Fatalf("meta.length = %v want 0", meta["length"]) + } +} + +// TestRedisDB_StreamTTLInlinedFromScanIndex pins that +// !redis|ttl| records for a stream user key fold into the +// _meta terminator's expire_at_ms field — design line 341-344. +// Without this routing, restoring a TTL'd stream would silently +// drop the TTL. 
+func TestRedisDB_StreamTTLInlinedFromScanIndex(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("k"), encodeStreamMetaValue(1, 100, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "100-0", []string{"event", "login"}) + if err := db.HandleStreamEntry(streamEntryKey("k", 100, 0), val); err != nil { + t.Fatal(err) + } + if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + _, meta := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "k.jsonl")) + if streamFloat(t, meta, "expire_at_ms") != float64(fixedExpireMs) { + t.Fatalf("meta.expire_at_ms = %v want %d", meta["expire_at_ms"], fixedExpireMs) + } + if _, err := os.Stat(filepath.Join(root, "redis", "db_0", "streams_ttl.jsonl")); !os.IsNotExist(err) { + t.Fatalf("unexpected stream TTL sidecar: stat err=%v", err) + } +} + +// TestRedisDB_StreamLengthMismatchWarns pins the warn-on-mismatch +// contract — same shape as the hash/list/set/zset encoders. 
+func TestRedisDB_StreamLengthMismatchWarns(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(5, 100, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "100-0", []string{"k", "v"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 100, 0), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + want := "redis_stream_length_mismatch" + found := false + for _, e := range events { + if e == want { + found = true + break + } + } + if !found { + t.Fatalf("expected %q warning, got %v", want, events) + } +} + +// TestRedisDB_StreamRejectsMalformedMetaValueLength pins that a +// !stream|meta| value of the wrong length surfaces as an error. +// The 24-byte fixed shape is the wire-format contract — any other +// length means the on-disk record is corrupt. +func TestRedisDB_StreamRejectsMalformedMetaValueLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + err := db.HandleStreamMeta(streamMetaKey("k"), []byte{0x00, 0x01, 0x02}) + if !errors.Is(err, ErrRedisInvalidStreamMeta) { + t.Fatalf("err=%v want ErrRedisInvalidStreamMeta", err) + } +} + +// TestRedisDB_StreamRejectsOverflowingMetaLength pins the high-bit +// overflow guard — same shape as hash/list/set/zset encoders. +func TestRedisDB_StreamRejectsOverflowingMetaLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + overflow := make([]byte, redisStreamMetaSize) + binary.BigEndian.PutUint64(overflow[0:8], 1<<63) + err := db.HandleStreamMeta(streamMetaKey("k"), overflow) + if !errors.Is(err, ErrRedisInvalidStreamMeta) { + t.Fatalf("err=%v want ErrRedisInvalidStreamMeta", err) + } +} + +// TestRedisDB_StreamMaxInt64MetaLength pins the math.MaxInt64 +// boundary — declaredLen=math.MaxInt64 must be accepted, only > that +// rejected. 
+func TestRedisDB_StreamMaxInt64MetaLength(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + boundary := make([]byte, redisStreamMetaSize) + binary.BigEndian.PutUint64(boundary[0:8], math.MaxInt64) + if err := db.HandleStreamMeta(streamMetaKey("k"), boundary); err != nil { + t.Fatalf("math.MaxInt64 boundary must be accepted, got %v", err) + } +} + +// TestRedisDB_StreamRejectsEntryWithMissingMagic pins that a value +// missing the `0x00 'R' 'X' 'E' 0x01` magic prefix fails closed. +// The live store always writes this prefix; its absence indicates +// the value came from a stale legacy format or from a corrupted +// store, and decoding raw protobuf bytes without the prefix would +// either silently misparse or panic deep inside the proto library. +func TestRedisDB_StreamRejectsEntryWithMissingMagic(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + body, err := gproto.Marshal(&pb.RedisStreamEntry{Id: "1-0", Fields: []string{"k", "v"}}) + if err != nil { + t.Fatalf("marshal: %v", err) + } + // Pass the raw body without the magic prefix. + err = db.HandleStreamEntry(streamEntryKey("k", 1, 0), body) + if !errors.Is(err, ErrRedisInvalidStreamEntry) { + t.Fatalf("err=%v want ErrRedisInvalidStreamEntry", err) + } +} + +// TestRedisDB_StreamRejectsEntryWithOddFieldCount pins the +// even-arity invariant. Live XADD enforces name/value pairs at the +// wire level (XADD rejects odd argument counts), so an odd count +// at backup time indicates corruption. Silently emitting the +// dangling field as `{"": null}` would re-corrupt the +// restored cluster. 
+func TestRedisDB_StreamRejectsEntryWithOddFieldCount(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + val := encodeStreamEntryValue(t, "1-0", []string{"event"}) // 1 element, missing value + err := db.HandleStreamEntry(streamEntryKey("k", 1, 0), val) + if !errors.Is(err, ErrRedisInvalidStreamEntry) { + t.Fatalf("err=%v want ErrRedisInvalidStreamEntry", err) + } +} + +// TestRedisDB_StreamRejectsMalformedEntryKey pins that an entry key +// without the trailing 16-byte StreamID fails parse cleanly. +func TestRedisDB_StreamRejectsMalformedEntryKey(t *testing.T) { + t.Parallel() + db, _ := newRedisDB(t) + // Construct a key with the entry prefix + userKeyLen + userKey + // but only a 4-byte trailing suffix (should be 16 for StreamID). + out := []byte(RedisStreamEntryPrefix) + var l [4]byte + binary.BigEndian.PutUint32(l[:], 1) + out = append(out, l[:]...) + out = append(out, 'k') + out = append(out, 0x00, 0x00, 0x00, 0x01) + val := encodeStreamEntryValue(t, "1-0", []string{"a", "b"}) + err := db.HandleStreamEntry(out, val) + if !errors.Is(err, ErrRedisInvalidStreamKey) { + t.Fatalf("err=%v want ErrRedisInvalidStreamKey", err) + } +} + +// TestRedisDB_StreamEntriesWithoutMetaStillEmitFile pins the +// entries-without-meta contract: stream entries may arrive before +// (or without) meta, and the encoder must still emit the JSONL +// without firing the length-mismatch warning. Mirrors the +// items-without-meta rule from list (#755 round 2) and set (#758). 
+func TestRedisDB_StreamEntriesWithoutMetaStillEmitFile(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + var events []string + db.WithWarnSink(func(event string, _ ...any) { events = append(events, event) }) + val := encodeStreamEntryValue(t, "1-0", []string{"a", "b"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 1, 0), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl")) + if len(entries) != 1 { + t.Fatalf("entries = %v, want 1", entries) + } + for _, e := range events { + if e == "redis_stream_length_mismatch" { + t.Fatalf("entries-without-meta must not fire length-mismatch warning, got events %v", events) + } + } +} + +// TestRedisDB_StreamIDFormatMatchesRedisWire pins the wire format +// of the JSON `id` field: `-` decimal, matching what +// XADD/XRANGE expose to clients. A future encoder bug that emitted +// hex or base10-with-leading-zeros would silently corrupt the +// stream-restore replay path. +func TestRedisDB_StreamIDFormatMatchesRedisWire(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + if err := db.HandleStreamMeta(streamMetaKey("s"), encodeStreamMetaValue(1, 1714400000000, 7)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "1714400000000-7", []string{"k", "v"}) + if err := db.HandleStreamEntry(streamEntryKey("s", 1714400000000, 7), val); err != nil { + t.Fatal(err) + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + entries, _ := readStreamJSONL(t, filepath.Join(root, "redis", "db_0", "streams", "s.jsonl")) + if entries[0]["id"] != "1714400000000-7" { + t.Fatalf("id = %v want %q", entries[0]["id"], "1714400000000-7") + } +} + +// TestRedisDB_StreamMultipleStreamsSortedByUserKey pins that +// flushStreams iterates user keys in sorted byte order so two +// runs producing the same logical content produce identical +// directory output. 
+func TestRedisDB_StreamMultipleStreamsSortedByUserKey(t *testing.T) { + t.Parallel() + db, root := newRedisDB(t) + for _, uk := range []string{"zeta", "alpha", "mango"} { + if err := db.HandleStreamMeta(streamMetaKey(uk), encodeStreamMetaValue(1, 1, 0)); err != nil { + t.Fatal(err) + } + val := encodeStreamEntryValue(t, "1-0", []string{"k", "v"}) + if err := db.HandleStreamEntry(streamEntryKey(uk, 1, 0), val); err != nil { + t.Fatal(err) + } + } + if err := db.Finalize(); err != nil { + t.Fatal(err) + } + for _, uk := range []string{"alpha", "mango", "zeta"} { + path := filepath.Join(root, "redis", "db_0", "streams", uk+".jsonl") + if _, err := os.Stat(path); err != nil { + t.Fatalf("stat %s: %v", path, err) + } + } +} diff --git a/internal/backup/redis_string.go b/internal/backup/redis_string.go index 688225d1..f9adeb69 100644 --- a/internal/backup/redis_string.go +++ b/internal/backup/redis_string.go @@ -85,6 +85,7 @@ const ( redisKindHash redisKindList redisKindSet + redisKindStream ) // RedisDB encodes one logical Redis database (`redis/db_/`). All @@ -185,6 +186,33 @@ type RedisDB struct { // Finalize into sets/.json with members sorted by raw byte // order for deterministic dump output. sets map[string]*redisSetState + + // streams buffers per-userKey stream state (meta + entry list). + // Flushed at Finalize into streams/.jsonl as one record per + // entry plus a trailing `_meta` line. Streams are bounded by + // maxWideColumnItems on the live side, so a single in-memory + // slice per stream is tractable. + streams map[string]*redisStreamState + + // pendingTTL buffers expiries whose user-key prefix sorts AFTER + // `!redis|ttl|` in the snapshot's lex-ordered stream. Pebble + // snapshots emit records in encoded-key order + // (`store/snapshot_pebble.go::iter.First()/Next()`), and + // `!redis|ttl|` lex-sorts before all `!st|`/`!stream|`/`!zs|` + // prefixes (`r` < `s`/`s`/`z`). 
Without buffering, HandleTTL + // would see kindByKey == redisKindUnknown and count the TTL + // as an orphan, dropping it before setState / streamState / + // zsetState had a chance to claim the user key — TTL'd + // sets, streams, and sorted sets would silently restore as + // permanent. + // + // Lifecycle: HandleTTL files the expiry here when kind is + // still unknown. Each wide-column state-init function + // (setState / streamState / zsetState) drains the entry when + // it first registers the user key. Finalize fires the + // orphan-TTL warning for whatever remains (those keys never + // appeared as a typed record — likely a corrupted store). + pendingTTL map[string]uint64 } // NewRedisDB constructs a RedisDB rooted at /redis/db_/. @@ -204,6 +232,8 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB { hashes: make(map[string]*redisHashState), lists: make(map[string]*redisListState), sets: make(map[string]*redisSetState), + streams: make(map[string]*redisStreamState), + pendingTTL: make(map[string]uint64), } } @@ -247,14 +277,32 @@ func (r *RedisDB) HandleHLL(userKey, value []byte) error { } // HandleTTL processes one !redis|ttl| record. Routing depends on -// what HandleString/HandleHLL recorded for the same userKey: +// what HandleString/HandleHLL recorded for the same userKey. +// +// There are two ordering regimes the snapshot stream presents: +// +// 1. Prefix sorts BEFORE !redis|ttl| in encoded-key order +// (!hs|, !lst|, !redis|str|, !redis|hll|). The typed record +// arrives FIRST, kindByKey is already set when HandleTTL fires, +// and we route directly to the per-type sidecar / inline field. +// 2. Prefix sorts AFTER !redis|ttl| (!st|, !stream|, !zs|, because +// `r` < `s`/`s`/`z`). The TTL arrives FIRST and kindByKey is +// still redisKindUnknown. We park the expiry in pendingTTL and +// let each wide-column state-init function (setState / +// streamState / zsetState) drain it when the user key finally +// surfaces as a typed record. 
Codex P1 finding on PR #791. // -// - redisKindHLL -> hll_ttl.jsonl -// - redisKindString -> strings_ttl.jsonl (legacy strings, whose TTL -// lives in !redis|ttl| rather than the inline magic-prefix header) -// - redisKindUnknown -> counted in orphanTTLCount; reported via the -// warn sink on Finalize because Phase 0a's wide-column encoders -// have not landed yet. +// Routing: +// +// - redisKindHLL -> hll_ttl.jsonl (case 1) +// - redisKindString -> strings_ttl.jsonl (case 1; legacy strings +// whose TTL lives in !redis|ttl| rather than the inline header) +// - redisKindHash/List/Set/Stream/ZSet -> inlined into the +// per-key JSON (case 1 for hash/list, case 2 for set/stream/zset +// where the state-init already drained from pendingTTL before +// HandleTTL would even be called the second time) +// - redisKindUnknown -> bufferPendingTTL. Finalize counts truly +// unmatched entries (key never registered as a typed record). func (r *RedisDB) HandleTTL(userKey, value []byte) error { expireAtMs, err := decodeRedisTTLValue(value) if err != nil { @@ -297,17 +345,57 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error { st.expireAtMs = expireAtMs st.hasTTL = true return nil + case redisKindStream: + // Same per-record TTL inlining: XADD + EXPIRE replay in + // one shot from the per-stream JSONL `_meta` terminator, + // no separate sidecar. + st := r.streamState(userKey) + st.expireAtMs = expireAtMs + st.hasTTL = true + return nil case redisKindUnknown: - // Track orphan TTL counts only — keys are unused before the - // remaining wide-column encoders (set/zset/stream) land, and - // buffering them allocates proportional to user-key size - // (up to 1 MiB per key) for no benefit. Codex P2 round 6. - r.orphanTTLCount++ + // Park the expiry until a wide-column Handle*Meta / + // Handle*Member / Handle*Entry registers the user key. 
+ // Without this buffering, sorted-set / set / stream TTLs + // would be counted as orphans and dropped because their + // type prefixes lex-sort AFTER `!redis|ttl|` in the + // snapshot's encoded-key order. Codex P1 (PR #791). + // + // We store userKey as a string copy (`string([]byte)` + // allocates) rather than the alias slice — the snapshot + // reader reuses key buffers across iterations, so a slice + // alias would race with the next record. + r.pendingTTL[string(userKey)] = expireAtMs return nil } return nil } +// claimPendingTTL drains any buffered TTL for userKey into the +// caller-provided state. Called by the wide-column state-init +// functions (setState / streamState / zsetState) when they first +// register a user key, so the parked expiry inlines into the same +// per-key JSON the rest of the record assembles. +// +// Returns (expireAtMs, true) when a buffered TTL existed. The +// caller should set state.expireAtMs / state.hasTTL on the +// returned value. The pending entry is removed so Finalize's +// orphan-count loop only sees truly-unmatched TTLs. +// +// Safe to call from hashState/listState too even though those +// types' typed records sort before `!redis|ttl|`; pendingTTL will +// always be empty for them. Keeping the call site uniform keeps +// the state-init contract simple. +func (r *RedisDB) claimPendingTTL(userKey []byte) (uint64, bool) { + uk := string(userKey) + expireAtMs, ok := r.pendingTTL[uk] + if !ok { + return 0, false + } + delete(r.pendingTTL, uk) + return expireAtMs, true +} + // Finalize flushes all open sidecar writers and emits warnings for any // pending TTL records whose user key was never claimed by the wide-column // encoders. 
Call exactly once after every snapshot record has been @@ -318,6 +406,7 @@ func (r *RedisDB) Finalize() error { r.flushHashes, r.flushLists, r.flushSets, + r.flushStreams, func() error { return closeJSONL(r.stringsTTL) }, func() error { return closeJSONL(r.hllTTL) }, r.closeKeymap, @@ -326,10 +415,19 @@ func (r *RedisDB) Finalize() error { firstErr = err } } + // At this point all type-prefixed records have been processed + // and every wide-column state-init drained its claimPendingTTL. + // Whatever remains in pendingTTL is truly unmatched — the + // user key never appeared as a typed record. Likely causes: + // store corruption, a snapshot mid-write where the typed + // record was dropped, or a `!redis|ttl|` entry written for a + // key whose type prefix we don't recognise (a future Redis + // type added on the live side without a backup-encoder update). + r.orphanTTLCount += len(r.pendingTTL) if r.warn != nil && r.orphanTTLCount > 0 { r.warn("redis_orphan_ttl", "count", r.orphanTTLCount, - "hint", "remaining wide-column encoders (zset/stream) have not landed yet") + "hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix") } return firstErr } diff --git a/internal/backup/redis_string_test.go b/internal/backup/redis_string_test.go index 152a55d4..4654a9d5 100644 --- a/internal/backup/redis_string_test.go +++ b/internal/backup/redis_string_test.go @@ -579,11 +579,21 @@ func TestRedisDB_NoKeymapWhenAllReversible(t *testing.T) { func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { t.Parallel() - // Codex P2 round 6: orphan TTL records (those with no prior - // HandleString/HandleHLL claim) must be counted only — the - // per-key payload would allocate proportional to user-key size - // and is unused before the wide-column encoders land. Drive a - // sample of orphan records and assert the count, not a buffer. 
+ // Codex P1 (PR #791): orphan TTL records are now BUFFERED in + // pendingTTL during intake — the wide-column state-init + // functions need to drain them when a typed record finally + // registers a user key. The buffer holds (string-userKey, + // uint64-expireAt) pairs; the per-record allocation cost is + // the same as kindByKey's, which we already pay for every + // typed record. The original Codex P2 round 6 concern (don't + // buffer arbitrarily-large payload bytes) is preserved — we + // still don't keep the value bytes. + // + // At Finalize, entries still in pendingTTL are counted as + // truly unmatched orphans. This test asserts: + // - During intake: orphanTTLCount stays 0, pendingTTL grows. + // - After Finalize: orphanTTLCount == n (no typed record + // ever drained the entries). db, _ := newRedisDB(t) const n = 10_000 for i := 0; i < n; i++ { @@ -593,8 +603,17 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) { t.Fatalf("HandleTTL[%d]: %v", i, err) } } + if db.orphanTTLCount != 0 { + t.Fatalf("orphanTTLCount = %d at intake, want 0 (buffered)", db.orphanTTLCount) + } + if len(db.pendingTTL) != n { + t.Fatalf("pendingTTL len = %d, want %d", len(db.pendingTTL), n) + } + if err := db.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } if db.orphanTTLCount != n { - t.Fatalf("orphanTTLCount = %d, want %d", db.orphanTTLCount, n) + t.Fatalf("orphanTTLCount = %d after Finalize, want %d", db.orphanTTLCount, n) } }