Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/backup/redis_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,11 @@ type hashFieldRecord struct {
Value json.RawMessage `json:"value"`
}

func marshalHashJSON(st *redisHashState) ([]byte, error) {
// nolint comment lives at the function head: dupl pairs this with
// marshalZSetJSON, which carries the rationale (parallel design-spec
// wrappers that can't collapse into a shared helper without breaking
// JSON field-order determinism). See redis_zset.go:marshalZSetJSON.
func marshalHashJSON(st *redisHashState) ([]byte, error) { //nolint:dupl // see comment above + redis_zset.go
// Sort by raw byte order for deterministic output across runs.
names := make([]string, 0, len(st.fields))
for name := range st.fields {
Expand Down
12 changes: 12 additions & 0 deletions internal/backup/redis_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,24 @@ func (r *RedisDB) HandleSetMetaDelta(_, _ []byte) error { return nil }
// setState lazily creates per-key state. Mirrors the hash/list
// kindByKey-registration pattern so HandleSetMeta, HandleSetMember,
// and the HandleTTL back-edge all agree on the kind.
//
// On first registration we drain any pendingTTL for the user key.
// `!redis|ttl|<k>` lex-sorts BEFORE `!st|...` (because `r` < `s`),
// so in real snapshot order the TTL arrives FIRST; HandleTTL parks
// it in pendingTTL, and this function inlines it into the set's
// `expire_at_ms`. Without this drain step, every TTL'd set would
// restore as permanent — a latent bug in PR #758 surfaced by codex
// on PR #790. Phase 0a tests added in the same PR pin the ordering.
func (r *RedisDB) setState(userKey []byte) *redisSetState {
uk := string(userKey)
if st, ok := r.sets[uk]; ok {
return st
}
st := &redisSetState{members: make(map[string]struct{})}
if expireAtMs, ok := r.claimPendingTTL(userKey); ok {
st.expireAtMs = expireAtMs
st.hasTTL = true
}
r.sets[uk] = st
r.kindByKey[uk] = redisKindSet
return st
Expand Down
125 changes: 111 additions & 14 deletions internal/backup/redis_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
redisKindHash
redisKindList
redisKindSet
redisKindZSet
)

// RedisDB encodes one logical Redis database (`redis/db_<n>/`). All
Expand Down Expand Up @@ -185,6 +186,34 @@ type RedisDB struct {
// Finalize into sets/<key>.json with members sorted by raw byte
// order for deterministic dump output.
sets map[string]*redisSetState

// zsets buffers per-userKey sorted-set state. Score lives in the
// !zs|mem| value (8-byte IEEE 754 big-endian); member name is the
// trailing key bytes (binary-safe). Flushed at Finalize into
// zsets/<key>.json sorted by member-name bytes (not by score) so
// `diff -r` between dumps stays line-stable across score-only
// mutations.
zsets map[string]*redisZSetState

// pendingTTL buffers expiries whose user-key prefix sorts AFTER
// `!redis|ttl|` in the snapshot's lex-ordered stream. Pebble
// snapshots emit records in encoded-key order
// (`store/snapshot_pebble.go::iter.First()/Next()`), and
// `!redis|ttl|` lex-sorts before all `!st|`/`!stream|`/`!zs|`
// prefixes (`r` < `s`/`s`/`z`). Without buffering, HandleTTL
// would see kindByKey == redisKindUnknown and count the TTL
// as an orphan, dropping it before zsetState / setState /
// streamState had a chance to claim the user key — TTL'd
// sorted sets, sets, and streams would silently restore as
// permanent.
//
// Lifecycle: HandleTTL files the expiry here when kind is
// still unknown. Each wide-column state-init function
// (setState / zsetState / streamState etc.) drains the entry
// when it first registers the user key. Finalize fires the
// orphan-TTL warning for whatever remains (those keys never
// appeared as a typed record — likely a corrupted store).
pendingTTL map[string]uint64
}

// NewRedisDB constructs a RedisDB rooted at <outRoot>/redis/db_<n>/.
Expand All @@ -204,6 +233,8 @@ func NewRedisDB(outRoot string, dbIndex int) *RedisDB {
hashes: make(map[string]*redisHashState),
lists: make(map[string]*redisListState),
sets: make(map[string]*redisSetState),
zsets: make(map[string]*redisZSetState),
pendingTTL: make(map[string]uint64),
}
}

Expand Down Expand Up @@ -246,15 +277,32 @@ func (r *RedisDB) HandleHLL(userKey, value []byte) error {
return r.writeBlob("hll", userKey, value)
}

// HandleTTL processes one !redis|ttl|<userKey> record. Routing depends on
// what HandleString/HandleHLL recorded for the same userKey:
// HandleTTL processes one !redis|ttl|<userKey> record. Routing
// depends on what the encoder has previously recorded for the user
// key. There are two ordering regimes the snapshot stream presents:
//
// 1. Prefix sorts BEFORE !redis|ttl| in encoded-key order
// (!hs|, !lst|, !redis|str|, !redis|hll|). The typed record
// arrives FIRST, kindByKey is already set when HandleTTL fires,
// and we route directly to the per-type sidecar / inline field.
// 2. Prefix sorts AFTER !redis|ttl| (!st|, !stream|, !zs|, because
// `r` < `s`/`s`/`z`). The TTL arrives FIRST and kindByKey is
// still redisKindUnknown. We park the expiry in pendingTTL and
// let each wide-column state-init function (setState /
// zsetState / streamState) drain it when the user key finally
// surfaces as a typed record. Codex P1 finding on PR #790.
//
// Routing:
//
// - redisKindHLL -> hll_ttl.jsonl
// - redisKindString -> strings_ttl.jsonl (legacy strings, whose TTL
// lives in !redis|ttl| rather than the inline magic-prefix header)
// - redisKindUnknown -> counted in orphanTTLCount; reported via the
// warn sink on Finalize because Phase 0a's wide-column encoders
// have not landed yet.
// - redisKindHLL -> hll_ttl.jsonl (case 1)
// - redisKindString -> strings_ttl.jsonl (case 1; legacy strings
// whose TTL lives in !redis|ttl| rather than the inline header)
// - redisKindHash/List/Set/ZSet/Stream -> inlined into the
// per-key JSON (case 1 for hash/list, case 2 for set/zset/stream
// where the state-init already drained from pendingTTL before
// HandleTTL would even be called the second time)
// - redisKindUnknown -> bufferPendingTTL. Finalize counts truly
// unmatched entries (key never registered as a typed record).
func (r *RedisDB) HandleTTL(userKey, value []byte) error {
expireAtMs, err := decodeRedisTTLValue(value)
if err != nil {
Expand Down Expand Up @@ -297,17 +345,56 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error {
st.expireAtMs = expireAtMs
st.hasTTL = true
return nil
case redisKindZSet:
// Same per-record TTL inlining: ZADD + EXPIRE replay in
// one shot from the per-zset JSON, no separate sidecar.
st := r.zsetState(userKey)
st.expireAtMs = expireAtMs
st.hasTTL = true
Comment on lines +348 to +353
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Buffer zset TTLs that arrive before zset rows

For expiring sorted sets in real snapshot order, this redisKindZSet branch is not reached: Pebble snapshots are emitted by iterator order (store/snapshot_pebble.go uses iter.First(); iter.Next()), and the TTL key prefix !redis|ttl| sorts before all zset prefixes !zs|.... That means HandleTTL sees redisKindUnknown, increments orphanTTLCount, and discards the expiry before zsetState is created, so every restored zset with a TTL becomes persistent. The zset TTL needs to be buffered/rerouted when the zset row is observed rather than relying on prior type observation.

Useful? React with 👍 / 👎.

return nil
case redisKindUnknown:
// Track orphan TTL counts only — keys are unused before the
// remaining wide-column encoders (set/zset/stream) land, and
// buffering them allocates proportional to user-key size
// (up to 1 MiB per key) for no benefit. Codex P2 round 6.
r.orphanTTLCount++
// Park the expiry until a wide-column Handle*Meta /
// Handle*Member / Handle*Entry registers the user key.
// Without this buffering, sorted-set / set / stream TTLs
// would be counted as orphans and dropped because their
// type prefixes lex-sort AFTER `!redis|ttl|` in the
// snapshot's encoded-key order. Codex P1 (PR #790).
//
// We store userKey as a string copy (`string([]byte)`
// allocates) rather than the alias slice — the snapshot
// reader reuses key buffers across iterations, so a slice
// alias would race with the next record.
r.pendingTTL[string(userKey)] = expireAtMs
return nil
}
return nil
}

// claimPendingTTL drains any buffered TTL for userKey into the
// caller-provided state. Called by the wide-column state-init
// functions (setState / zsetState / streamState) when they first
// register a user key, so the parked expiry inlines into the same
// per-key JSON the rest of the record assembles.
//
// Returns (expireAtMs, true) when a buffered TTL existed. The
// caller should set state.expireAtMs / state.hasTTL on the
// returned value. The pending entry is removed so Finalize's
// orphan-count loop only sees truly-unmatched TTLs.
//
// Safe to call from hashState/listState too even though those
// types' typed records sort before `!redis|ttl|`; pendingTTL will
// always be empty for them. Keeping the call site uniform keeps
// the state-init contract simple.
func (r *RedisDB) claimPendingTTL(userKey []byte) (uint64, bool) {
uk := string(userKey)
expireAtMs, ok := r.pendingTTL[uk]
if !ok {
return 0, false
}
delete(r.pendingTTL, uk)
return expireAtMs, true
}

// Finalize flushes all open sidecar writers and emits warnings for any
// pending TTL records whose user key was never claimed by the wide-column
// encoders. Call exactly once after every snapshot record has been
Expand All @@ -318,6 +405,7 @@ func (r *RedisDB) Finalize() error {
r.flushHashes,
r.flushLists,
r.flushSets,
r.flushZSets,
func() error { return closeJSONL(r.stringsTTL) },
func() error { return closeJSONL(r.hllTTL) },
r.closeKeymap,
Expand All @@ -326,10 +414,19 @@ func (r *RedisDB) Finalize() error {
firstErr = err
}
}
// At this point all type-prefixed records have been processed
// and every wide-column state-init drained its claimPendingTTL.
// Whatever remains in pendingTTL is truly unmatched — the
// user key never appeared as a typed record. Likely causes:
// store corruption, a snapshot mid-write where the typed
// record was dropped, or a `!redis|ttl|` entry written for a
// key whose type prefix we don't recognise (a future Redis
// type added on the live side without a backup-encoder update).
r.orphanTTLCount += len(r.pendingTTL)
if r.warn != nil && r.orphanTTLCount > 0 {
r.warn("redis_orphan_ttl",
"count", r.orphanTTLCount,
"hint", "remaining wide-column encoders (zset/stream) have not landed yet")
"hint", "TTL records whose user key never appeared in a typed record — possible store corruption or an unknown type prefix")
}
return firstErr
}
Expand Down
31 changes: 25 additions & 6 deletions internal/backup/redis_string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,21 @@ func TestRedisDB_NoKeymapWhenAllReversible(t *testing.T) {

func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) {
t.Parallel()
// Codex P2 round 6: orphan TTL records (those with no prior
// HandleString/HandleHLL claim) must be counted only — the
// per-key payload would allocate proportional to user-key size
// and is unused before the wide-column encoders land. Drive a
// sample of orphan records and assert the count, not a buffer.
// Codex P1 (PR #790): orphan TTL records are now BUFFERED in
// pendingTTL during intake — the wide-column state-init
// functions need to drain them when a typed record finally
// registers a user key. The buffer holds (string-userKey,
// uint64-expireAt) pairs; the per-record allocation cost is the
// same as kindByKey's, which we already pay for every typed
// record. The original Codex P2 round 6 concern (don't buffer
// arbitrarily-large payload bytes) is preserved — we still
// don't keep the value bytes.
//
// At Finalize, entries still in pendingTTL are counted as truly
// unmatched orphans. This test now asserts:
// - During intake: orphanTTLCount stays at 0, pendingTTL grows.
// - After Finalize: orphanTTLCount == n (no typed record ever
// drained the entries).
db, _ := newRedisDB(t)
const n = 10_000
for i := 0; i < n; i++ {
Expand All @@ -593,8 +603,17 @@ func TestRedisDB_OrphanTTLCountedNotBuffered(t *testing.T) {
t.Fatalf("HandleTTL[%d]: %v", i, err)
}
}
if db.orphanTTLCount != 0 {
t.Fatalf("orphanTTLCount = %d at intake, want 0 (buffered)", db.orphanTTLCount)
}
if len(db.pendingTTL) != n {
t.Fatalf("pendingTTL len = %d, want %d", len(db.pendingTTL), n)
}
if err := db.Finalize(); err != nil {
t.Fatalf("Finalize: %v", err)
}
if db.orphanTTLCount != n {
t.Fatalf("orphanTTLCount = %d, want %d", db.orphanTTLCount, n)
t.Fatalf("orphanTTLCount = %d after Finalize, want %d", db.orphanTTLCount, n)
}
}

Expand Down
Loading
Loading