Skip to content

Commit 5446190

Browse files
committed
backup(zset): PR790 r2 codex P1 — handle legacy !redis|zset| single-blob layout
Codex flagged that the wide-column zset encoder skips the legacy consolidated single-key blob layout the live store still writes. A zset stored only as `!redis|zset|<userKey>` (with the magic- prefixed pb.RedisZSetValue body) is silently dropped from backup output and any inline TTL becomes an orphan — user-visible sorted-set data loss. Live-side references (adapter, not changed by this commit): - adapter/redis_compat_types.go:82 — redisZSetPrefix - adapter/redis_compat_commands.go:3495-3508 — writes the blob for non-empty persisted zset updates - adapter/redis_compat_helpers.go:610-631 — reads it as the fallback when no wide-column members exist Fix: new public RedisDB.HandleZSetLegacyBlob method that decodes the magic-prefixed pb.RedisZSetValue and registers the same per- member state HandleZSetMember would. The wide-column merge case (mid-migration snapshot containing BOTH layouts for the same user key) works because `!redis|zset|` sorts BEFORE `!zs|...` so the blob arrives first and wide-column rows then update / add members via the latest-wins map. Inline TTL: `!redis|zset|<k>` sorts BEFORE `!redis|ttl|<k>`, so HandleTTL after this handler sees redisKindZSet already and folds via the case-redisKindZSet branch. No pendingTTL detour needed for this ordering. Fail-closed contract (matches existing wide-column path): - Missing magic prefix → ErrRedisInvalidZSetLegacyBlob - Unmarshal error → ErrRedisInvalidZSetLegacyBlob - NaN score → ErrRedisInvalidZSetLegacyBlob (Redis ZADD rejects NaN at wire level) Caller audit (per /loop standing instruction): new public method HandleZSetLegacyBlob has no external callers. Verified via 'grep -rn HandleZSetLegacyBlob --include=*.go' — all matches inside the test file in this PR. The cmd/elastickv-snapshot-decode dispatcher (Phase 0a follow-up, not yet built) will route the `!redis|zset|` prefix to this handler. Parallel bug class: the SAME issue exists for `!redis|hash|`, `!redis|set|`, and `!redis|stream|` legacy blob prefixes. Those encoders shipped in earlier PRs (#725, #758, #791). Each needs its own legacy-blob handler in a follow-up PR; this commit fixes only the zset case codex flagged on PR #790. New tests: - TestRedisDB_ZSetLegacyBlobRoundTrip — basic round-trip - TestRedisDB_ZSetLegacyBlobThenWideColumnMerges — mid-migration - TestRedisDB_ZSetLegacyBlobWithInlineTTL — TTL ordering - TestRedisDB_ZSetLegacyBlobRejectsMissingMagic — fail-closed - TestRedisDB_ZSetLegacyBlobRejectsNaNScore — fail-closed - TestRedisDB_ZSetLegacyBlobRejectsMalformedKey — fail-closed Self-review: 1. Data loss — exact opposite: this commit RECOVERS zsets that were silently dropped. New fail-closed guards prevent silently importing a corrupt blob. 2. Concurrency — no new shared state; per-DB sequential as before. 3. Performance — one protobuf Unmarshal per legacy zset key (same as the live read path). Member map shares the same latest-wins behavior as wide-column intake. 4. Consistency — merge order (blob first, wide-column second) is determined by snapshot lex order; tested explicitly. 5. Coverage — 6 new tests. All 84 redis tests pass.
1 parent 63e54d9 commit 5446190

2 files changed

Lines changed: 285 additions & 0 deletions

File tree

internal/backup/redis_zset.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"path/filepath"
99
"sort"
1010

11+
pb "github.com/bootjp/elastickv/proto"
1112
cockroachdberr "github.com/cockroachdb/errors"
13+
gproto "google.golang.org/protobuf/proto"
1214
)
1315

1416
// Redis zset encoder. Translates raw !zs|... snapshot records into the
@@ -36,17 +38,48 @@ const (
3638
RedisZSetScorePrefix = "!zs|scr|"
3739
RedisZSetMetaDeltaPrefix = "!zs|meta|d|"
3840

41+
// RedisZSetLegacyBlobPrefix is the consolidated single-key
42+
// layout the live store still writes for non-empty persisted
43+
// zsets (`adapter/redis_compat_types.go:82` redisZSetPrefix,
44+
// produced by `adapter/redis_compat_commands.go:3495-3508` and
45+
// read by `adapter/redis_compat_helpers.go:610-631` as the
46+
// fallback when no wide-column members exist). A backup that
47+
// skipped this prefix would silently drop legacy-only zsets;
48+
// HandleZSetLegacyBlob decodes the blob and registers the same
49+
// per-member state HandleZSetMeta + HandleZSetMember would.
50+
// Codex P1 finding on PR #790 (round 2).
51+
RedisZSetLegacyBlobPrefix = "!redis|zset|"
52+
3953
// redisZSetScoreSize is the size of the IEEE 754 big-endian score
4054
// stored in !zs|mem| values. Same constant as zsetMetaSizeBytes in
4155
// store/zset_helpers.go; duplicated here to keep the backup
4256
// package free of internal/storage imports.
4357
redisZSetScoreSize = 8
58+
59+
// redisZSetLegacyProtoPrefixLen is the on-disk magic prefix size
60+
// for `!redis|zset|` values
61+
// (`adapter/redis_storage_codec.go:15` storedRedisZSetProtoPrefix).
62+
redisZSetLegacyProtoPrefixLen = 4
4463
)
4564

65+
// redisZSetLegacyProtoPrefix mirrors
66+
// adapter/redis_storage_codec.go:15 storedRedisZSetProtoPrefix. A
67+
// rename on the live side without an accompanying backup update
68+
// surfaces as ErrRedisInvalidZSetLegacyBlob on decode of any real
69+
// cluster dump.
70+
var redisZSetLegacyProtoPrefix = []byte{0x00, 'R', 'Z', 0x01}
71+
4672
// ErrRedisInvalidZSetMeta is returned when an !zs|meta| value is not
4773
// the expected 8-byte big-endian member count.
4874
var ErrRedisInvalidZSetMeta = cockroachdberr.New("backup: invalid !zs|meta| value")
4975

76+
// ErrRedisInvalidZSetLegacyBlob is returned when a `!redis|zset|`
77+
// value's magic prefix is missing, its protobuf body fails to
78+
// unmarshal, or its decoded scores include NaN. Fail-closed for
79+
// the same reason as ErrRedisInvalidZSetMember: silently accepting
80+
// a corrupt blob would lose the entire zset's contents at restore.
81+
var ErrRedisInvalidZSetLegacyBlob = cockroachdberr.New("backup: invalid !redis|zset| value")
82+
5083
// ErrRedisInvalidZSetMember is returned when an !zs|mem| value is not
5184
// the expected 8-byte IEEE 754 score, or contains a NaN score (Redis's
5285
// ZADD command rejects NaN, so a NaN at backup time indicates store
@@ -153,6 +186,91 @@ func (r *RedisDB) HandleZSetScore(_, _ []byte) error { return nil }
153186
// source of truth at backup time.
154187
func (r *RedisDB) HandleZSetMetaDelta(_, _ []byte) error { return nil }
155188

189+
// HandleZSetLegacyBlob processes one `!redis|zset|<userKey>` record.
190+
// This is the consolidated single-key layout the live store still
191+
// writes for non-empty persisted zsets (see RedisZSetLegacyBlobPrefix
192+
// docstring). The encoded value is a magic-prefixed
193+
// `pb.RedisZSetValue` carrying every (member, score) pair.
194+
//
195+
// Decoded entries land in the same per-key state HandleZSetMember
196+
// would have produced, so the per-zset JSON output is identical
197+
// regardless of which layout the live store used. NaN scores fail
198+
// closed at intake, matching HandleZSetMember's contract.
199+
//
200+
// The legacy prefix `!redis|zset|` lex-sorts BEFORE `!zs|...` and
201+
// BEFORE `!redis|ttl|`, so when a zset is stored in both formats
202+
// (mid-migration), this handler creates the state first and the
203+
// later wide-column records merge into it — duplicate
204+
// HandleZSetMember calls follow the same latest-wins policy.
205+
//
206+
// `!redis|zset|` ALSO sorts BEFORE `!redis|ttl|`, so an inline TTL
207+
// on the same user key will reach HandleTTL after this handler has
208+
// already registered redisKindZSet. The HandleTTL redisKindZSet
209+
// branch then folds the expiry into st.expireAtMs via zsetState
210+
// (which itself drains pendingTTL — a no-op here since the typed
211+
// record came first).
212+
func (r *RedisDB) HandleZSetLegacyBlob(key, value []byte) error {
213+
userKey, ok := parseZSetLegacyBlobKey(key)
214+
if !ok {
215+
return cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob, "key: %q", key)
216+
}
217+
entries, err := decodeZSetLegacyBlobValue(value)
218+
if err != nil {
219+
return err
220+
}
221+
st := r.zsetState(userKey)
222+
for _, e := range entries {
223+
st.members[e.member] = e.score
224+
}
225+
return nil
226+
}
227+
228+
// zsetLegacyEntry is the per-(member, score) projection extracted
229+
// from a `!redis|zset|` blob's protobuf body.
230+
type zsetLegacyEntry struct {
231+
member string
232+
score float64
233+
}
234+
235+
// parseZSetLegacyBlobKey strips `!redis|zset|` and returns the
236+
// user-key bytes. Unlike the wide-column meta key there is no
237+
// userKeyLen prefix — the live store appends the user key directly
238+
// (`adapter/redis_compat_types.go:177` ZSetKey).
239+
func parseZSetLegacyBlobKey(key []byte) ([]byte, bool) {
240+
rest := bytes.TrimPrefix(key, []byte(RedisZSetLegacyBlobPrefix))
241+
if len(rest) == len(key) {
242+
return nil, false
243+
}
244+
return rest, true
245+
}
246+
247+
// decodeZSetLegacyBlobValue strips the magic prefix and unmarshals
248+
// the protobuf body into a slice of (member, score) entries.
249+
// Rejects NaN scores (same fail-closed contract as
250+
// HandleZSetMember).
251+
func decodeZSetLegacyBlobValue(value []byte) ([]zsetLegacyEntry, error) {
252+
if len(value) < redisZSetLegacyProtoPrefixLen ||
253+
!bytes.Equal(value[:redisZSetLegacyProtoPrefixLen], redisZSetLegacyProtoPrefix) {
254+
return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob,
255+
"missing or corrupt magic prefix (len=%d)", len(value))
256+
}
257+
msg := &pb.RedisZSetValue{}
258+
if err := gproto.Unmarshal(value[redisZSetLegacyProtoPrefixLen:], msg); err != nil {
259+
return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob,
260+
"unmarshal: %v", err)
261+
}
262+
out := make([]zsetLegacyEntry, 0, len(msg.GetEntries()))
263+
for _, e := range msg.GetEntries() {
264+
score := e.GetScore()
265+
if math.IsNaN(score) {
266+
return nil, cockroachdberr.Wrapf(ErrRedisInvalidZSetLegacyBlob,
267+
"NaN score for member %q", e.GetMember())
268+
}
269+
out = append(out, zsetLegacyEntry{member: e.GetMember(), score: score})
270+
}
271+
return out, nil
272+
}
273+
156274
// zsetState lazily creates per-key state. Mirrors the hash/list/set
157275
// kindByKey-registration pattern so HandleZSetMeta, HandleZSetMember,
158276
// and the HandleTTL back-edge all agree on the kind.

internal/backup/redis_zset_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,41 @@ import (
88
"path/filepath"
99
"testing"
1010

11+
pb "github.com/bootjp/elastickv/proto"
1112
"github.com/cockroachdb/errors"
13+
gproto "google.golang.org/protobuf/proto"
1214
)
1315

16+
// encodeZSetLegacyBlobValue produces the magic-prefixed protobuf
17+
// wire format the live store writes for `!redis|zset|<userKey>`
18+
// values (mirror of adapter/redis_storage_codec.go::marshalZSetValue).
19+
func encodeZSetLegacyBlobValue(t *testing.T, entries []zsetLegacyEntry) []byte {
20+
t.Helper()
21+
msg := &pb.RedisZSetValue{}
22+
for _, e := range entries {
23+
msg.Entries = append(msg.Entries, &pb.RedisZSetEntry{
24+
Member: e.member,
25+
Score: e.score,
26+
})
27+
}
28+
body, err := gproto.Marshal(msg)
29+
if err != nil {
30+
t.Fatalf("marshal pb.RedisZSetValue: %v", err)
31+
}
32+
out := make([]byte, 0, redisZSetLegacyProtoPrefixLen+len(body))
33+
out = append(out, redisZSetLegacyProtoPrefix...)
34+
out = append(out, body...)
35+
return out
36+
}
37+
38+
// zsetLegacyBlobKey is the test-side mirror of
39+
// adapter/redis_compat_types.go:177 ZSetKey:
40+
// `!redis|zset|<userKey>` (no userKeyLen prefix).
41+
func zsetLegacyBlobKey(userKey string) []byte {
42+
out := []byte(RedisZSetLegacyBlobPrefix)
43+
return append(out, userKey...)
44+
}
45+
1446
// encodeZSetMetaValue builds the 8-byte BE member-count value used by
1547
// the live store/zset_helpers.go (mirror of store.MarshalZSetMeta).
1648
func encodeZSetMetaValue(memberCount int64) []byte {
@@ -608,3 +640,138 @@ func TestRedisDB_ZSetMaxInt64DeclaredLen(t *testing.T) {
608640
t.Fatalf("math.MaxInt64 boundary must be accepted, got %v", err)
609641
}
610642
}
643+
644+
// TestRedisDB_ZSetLegacyBlobRoundTrip pins the codex P1 fix: a
645+
// zset stored only via the legacy `!redis|zset|<userKey>` blob
646+
// must surface in the dump with all its members. Without
647+
// HandleZSetLegacyBlob, the encoder would skip the record and
648+
// produce an empty zsets/ output for that key — silent backup
649+
// data loss.
650+
func TestRedisDB_ZSetLegacyBlobRoundTrip(t *testing.T) {
651+
t.Parallel()
652+
db, root := newRedisDB(t)
653+
value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{
654+
{member: "alice", score: 100},
655+
{member: "bob", score: 50},
656+
{member: "charlie", score: 30},
657+
})
658+
if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("leaderboard"), value); err != nil {
659+
t.Fatalf("HandleZSetLegacyBlob: %v", err)
660+
}
661+
if err := db.Finalize(); err != nil {
662+
t.Fatal(err)
663+
}
664+
got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "leaderboard.json"))
665+
// Members sorted by member-name bytes (matches the wide-column
666+
// encoder's output policy).
667+
assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{
668+
{member: "alice", scoreNum: 100, scoreKind: "number"},
669+
{member: "bob", scoreNum: 50, scoreKind: "number"},
670+
{member: "charlie", scoreNum: 30, scoreKind: "number"},
671+
})
672+
}
673+
674+
// TestRedisDB_ZSetLegacyBlobThenWideColumnMerges pins the mid-
675+
// migration shape: when a snapshot carries BOTH the legacy
676+
// `!redis|zset|<k>` blob AND wide-column `!zs|mem|<k>...` rows for
677+
// the same user key (which the live store does during the
678+
// migration window), the encoder must produce a single merged
679+
// zset. `!redis|zset|` lex-sorts before `!zs|...` so the blob
680+
// arrives first; the wide-column rows then update / add members.
681+
func TestRedisDB_ZSetLegacyBlobThenWideColumnMerges(t *testing.T) {
682+
t.Parallel()
683+
db, root := newRedisDB(t)
684+
legacy := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{
685+
{member: "alice", score: 1},
686+
{member: "bob", score: 2},
687+
})
688+
if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), legacy); err != nil {
689+
t.Fatal(err)
690+
}
691+
// Wide-column rows: update alice's score, add charlie.
692+
if err := db.HandleZSetMember(zsetMemberKey("k", []byte("alice")), encodeZSetScoreValue(99)); err != nil {
693+
t.Fatal(err)
694+
}
695+
if err := db.HandleZSetMember(zsetMemberKey("k", []byte("charlie")), encodeZSetScoreValue(3)); err != nil {
696+
t.Fatal(err)
697+
}
698+
if err := db.Finalize(); err != nil {
699+
t.Fatal(err)
700+
}
701+
got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json"))
702+
assertZSetMembersEqual(t, zsetMembersArray(t, got), []zsetMemberEntry{
703+
{member: "alice", scoreNum: 99, scoreKind: "number"}, // wide-column won
704+
{member: "bob", scoreNum: 2, scoreKind: "number"}, // legacy survived
705+
{member: "charlie", scoreNum: 3, scoreKind: "number"},
706+
})
707+
}
708+
709+
// TestRedisDB_ZSetLegacyBlobWithInlineTTL pins that a TTL'd zset
710+
// stored only via the legacy blob round-trips its expiry. The
711+
// snapshot order is `!redis|zset|<k>` (sorts before `!redis|ttl|`),
712+
// so HandleZSetLegacyBlob runs first and registers redisKindZSet,
713+
// then HandleTTL routes via the redisKindZSet branch (no
714+
// pendingTTL detour needed for this ordering).
715+
func TestRedisDB_ZSetLegacyBlobWithInlineTTL(t *testing.T) {
716+
t.Parallel()
717+
db, root := newRedisDB(t)
718+
value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: 1}})
719+
if err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), value); err != nil {
720+
t.Fatal(err)
721+
}
722+
if err := db.HandleTTL([]byte("k"), encodeTTLValue(fixedExpireMs)); err != nil {
723+
t.Fatal(err)
724+
}
725+
if err := db.Finalize(); err != nil {
726+
t.Fatal(err)
727+
}
728+
got := readZSetJSON(t, filepath.Join(root, "redis", "db_0", "zsets", "k.json"))
729+
if zsetFloat(t, got, "expire_at_ms") != float64(fixedExpireMs) {
730+
t.Fatalf("expire_at_ms = %v want %d", got["expire_at_ms"], fixedExpireMs)
731+
}
732+
}
733+
734+
// TestRedisDB_ZSetLegacyBlobRejectsMissingMagic pins fail-closed
735+
// behaviour: a `!redis|zset|<k>` value without the magic prefix
736+
// fails at intake rather than silently decoding garbage protobuf.
737+
func TestRedisDB_ZSetLegacyBlobRejectsMissingMagic(t *testing.T) {
738+
t.Parallel()
739+
db, _ := newRedisDB(t)
740+
body, err := gproto.Marshal(&pb.RedisZSetValue{})
741+
if err != nil {
742+
t.Fatalf("marshal: %v", err)
743+
}
744+
err = db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), body) // no magic prefix
745+
if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) {
746+
t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err)
747+
}
748+
}
749+
750+
// TestRedisDB_ZSetLegacyBlobRejectsNaNScore pins NaN-fail-closed
751+
// parallel to HandleZSetMember's contract. Redis ZADD rejects NaN
752+
// at the wire level, so a NaN in storage indicates corruption.
753+
func TestRedisDB_ZSetLegacyBlobRejectsNaNScore(t *testing.T) {
754+
t.Parallel()
755+
db, _ := newRedisDB(t)
756+
value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: math.NaN()}})
757+
err := db.HandleZSetLegacyBlob(zsetLegacyBlobKey("k"), value)
758+
if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) {
759+
t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err)
760+
}
761+
}
762+
763+
// TestRedisDB_ZSetLegacyBlobRejectsMalformedKey pins that a
764+
// `!redis|zset|` key with no trailing user-key bytes fails parse.
765+
func TestRedisDB_ZSetLegacyBlobRejectsMalformedKey(t *testing.T) {
766+
t.Parallel()
767+
db, _ := newRedisDB(t)
768+
value := encodeZSetLegacyBlobValue(t, []zsetLegacyEntry{{member: "m", score: 1}})
769+
// Key has the prefix but no trailing user-key bytes — parser must
770+
// still accept it (empty user key is technically valid Redis).
771+
// Use a key that doesn't have the prefix to trigger the parse
772+
// failure.
773+
err := db.HandleZSetLegacyBlob([]byte("not-the-right-prefix|k"), value)
774+
if !errors.Is(err, ErrRedisInvalidZSetLegacyBlob) {
775+
t.Fatalf("err=%v want ErrRedisInvalidZSetLegacyBlob", err)
776+
}
777+
}

0 commit comments

Comments
 (0)