Skip to content

Commit a4142fa

Browse files
authored
backup: add Redis hash encoder (#725)
## Summary - First of the Phase 0a wide-column encoders. Translates `!hs|meta|` + `!hs|fld|` records on the live store into one `hashes/<encoded>.json` file per Redis hash, per the design at `docs/design/2026_04_29_proposed_snapshot_logical_decoder.md:327`. - Stacked on top of #713 (the strings/HLL/TTL encoder); reuses `RedisDB`, the SHA-fallback KEYMAP path, and the dirsCreated cache. - TTL is folded inline into `expire_at_ms` (not a sidecar) so a restorer can replay each hash in one shot. `HandleTTL` adds a `redisKindHash` route. - Field values reuse the UTF-8-or-base64 envelope from PR #714's SQS body for binary safety. Field names percent-encode (`EncodeSegment`) when not valid UTF-8 to keep JSON object keys as strings. - Empty hashes still emit a file (HLEN==0 is observable). Length mismatches between `meta.Len` and observed `!hs|fld|` keys surface as `redis_hash_length_mismatch` warnings. - Output is sorted by user key + field name for deterministic dumps (matches the round-9 sort pattern from #716). ## Self-review (per CLAUDE.md "five lenses") 1. **Data loss** — empty hashes preserved; declared/observed length mismatch warned; field value bytes preserved verbatim via base64 envelope when not UTF-8; SHA-fallback paths feed KEYMAP. 2. **Concurrency** — `RedisDB` is single-writer per the existing contract; the new `hashes` map follows the same per-key state model used by `kindByKey`/`inlineTTLEmitted`. 3. **Performance** — fields buffered per key (Redis hashes are typically small); `flushHashes` runs once at Finalize. Sort is `O(n log n)` over user keys + per-hash field names — acceptable at dump cadence. 4. **Data consistency** — `kindByKey` registration on both meta and field arrival prevents TTL mis-routing across snapshot record-ordering scenarios. 5. **Test coverage** — 8 sub-tests: round-trip, empty hash, inline TTL routing, length mismatch warning, binary value envelope, malformed meta, truncated field key, SHA-fallback keymap. 
## Test plan - [x] `go test -race ./internal/backup/...` — clean. - [x] `GOOS=windows GOARCH=amd64 go build ./internal/backup/...` — clean. - [x] `GOOS=js GOARCH=wasm go build ./internal/backup/...` — clean. - [x] `golangci-lint run` — clean.
2 parents e8d4054 + b757dad commit a4142fa

3 files changed

Lines changed: 787 additions & 9 deletions

File tree

internal/backup/redis_hash.go

Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
package backup
2+
3+
import (
4+
"bytes"
5+
"encoding/base64"
6+
"encoding/binary"
7+
"encoding/json"
8+
"path/filepath"
9+
"sort"
10+
"unicode/utf8"
11+
12+
cockroachdberr "github.com/cockroachdb/errors"
13+
)
14+
15+
// Snapshot key prefixes the hash encoder dispatches on. Mirror the live
// store/hash_helpers.go constants — a renamed prefix on the live side
// surfaces here at compile time via the dispatch tests.
const (
	// RedisHashMetaPrefix precedes the per-hash metadata record whose
	// value is the 8-byte big-endian field count.
	RedisHashMetaPrefix = "!hs|meta|"
	// RedisHashFieldPrefix precedes one record per hash field; the key
	// carries <userKeyLen><userKey><fieldName> after the prefix.
	RedisHashFieldPrefix = "!hs|fld|"
	// RedisHashMetaDeltaPrefix marks delta records. It string-prefixes
	// RedisHashMetaPrefix, so prefix-based dispatch must check it first.
	RedisHashMetaDeltaPrefix = "!hs|meta|d|"

	// hashUserKeyLenSize is the fixed BE-uint32 width of the
	// per-key length prefix used by every wide-column key shape.
	// Mirrors store/wideColKeyLenSize.
	hashUserKeyLenSize = 4
)
28+
29+
// ErrRedisInvalidHashMeta is returned when the !hs|meta| value is not
30+
// the expected 8-byte big-endian field count.
31+
var ErrRedisInvalidHashMeta = cockroachdberr.New("backup: invalid !hs|meta| value")
32+
33+
// ErrRedisInvalidHashKey is returned when an !hs| key cannot be parsed
34+
// for its userKeyLen+userKey segment (truncated, malformed, etc).
35+
var ErrRedisInvalidHashKey = cockroachdberr.New("backup: malformed !hs| key")
36+
37+
// redisHashState buffers the per-userKey hash being assembled. The
// encoder accumulates fields as they arrive and flushes a single JSON
// record at Finalize time. We deliberately buffer per key (rather than
// stream) because the design's per-hash JSON shape requires the full
// field map up-front and Redis hashes are typically small.
type redisHashState struct {
	// declaredLen is the field count from the !hs|meta| record;
	// meaningful only when metaSeen is true.
	declaredLen int64
	// metaSeen records whether a !hs|meta| record arrived for this key.
	metaSeen bool
	fields   map[string][]byte // field-name → field-value bytes
	// expireAtMs feeds the JSON "expire_at_ms" field; meaningful only
	// when hasTTL is true. Presumably ms-since-epoch — set by HandleTTL,
	// which is outside this file's view.
	expireAtMs uint64
	hasTTL     bool
}
49+
50+
// HandleHashMeta processes one !hs|meta|<userKey> record. The value is
51+
// the 8-byte BE field count. We park the state for finalize-time flush
52+
// and register the user key so a later !redis|ttl|<userKey> record
53+
// routes back to this hash state.
54+
//
55+
// Delta keys (!hs|meta|d|...) share the !hs|meta| string prefix, so a
56+
// snapshot dispatcher that routes by "starts with RedisHashMetaPrefix"
57+
// will land delta records here too. Phase 0a's output (an array of
58+
// observed fields) doesn't need to apply the delta arithmetic — the
59+
// !hs|fld|... records are the source of truth — so we silently skip
60+
// delta keys instead of returning ErrRedisInvalidHashKey. Codex P1
61+
// round 14 (PR #725 #13).
62+
func (r *RedisDB) HandleHashMeta(key, value []byte) error {
63+
if bytes.HasPrefix(key, []byte(RedisHashMetaDeltaPrefix)) {
64+
return nil
65+
}
66+
userKey, ok := parseHashMetaKey(key)
67+
if !ok {
68+
return cockroachdberr.Wrapf(ErrRedisInvalidHashKey, "meta key: %q", key)
69+
}
70+
if len(value) != redisUint64Bytes {
71+
return cockroachdberr.Wrapf(ErrRedisInvalidHashMeta,
72+
"length %d != %d", len(value), redisUint64Bytes)
73+
}
74+
st := r.hashState(userKey)
75+
st.declaredLen = int64(binary.BigEndian.Uint64(value)) //nolint:gosec // signed int64 by design
76+
st.metaSeen = true
77+
return nil
78+
}
79+
80+
// HandleHashField processes one !hs|fld|<userKey><fieldName> record.
81+
// The value is the raw field-value bytes (binary-safe).
82+
//
83+
// Note: Redis hash field names are binary-safe and may legitimately
84+
// be empty — `HSET k "" v` is a valid command and the live store
85+
// emits a key shaped exactly `!hs|fld|<len><userKey>` with no
86+
// trailing field bytes. We deliberately do NOT reject zero-length
87+
// field names here so backup decoding succeeds on real data created
88+
// via HSET with empty names. Codex P1 round 13 (PR #725).
89+
func (r *RedisDB) HandleHashField(key, value []byte) error {
90+
userKey, fieldName, ok := parseHashFieldKey(key)
91+
if !ok {
92+
return cockroachdberr.Wrapf(ErrRedisInvalidHashKey, "field key: %q", key)
93+
}
94+
st := r.hashState(userKey)
95+
st.fields[string(fieldName)] = bytes.Clone(value)
96+
return nil
97+
}
98+
99+
// hashState lazily creates per-key state. The `kindByKey` registration
100+
// lives here (Gemini medium PR #725 #1/#3) so every code path that
101+
// touches a hash state — meta, field, and the TTL-routing back-edge
102+
// from HandleTTL — agrees on the kind. Caller audit (per the loop's
103+
// "audit semantics-changing edits" rule):
104+
//
105+
// - HandleHashMeta and HandleHashField both want kindByKey set;
106+
// centralising here means the explicit assignment is no longer
107+
// needed at the call site.
108+
// - HandleTTL only ever calls hashState() inside the
109+
// `case redisKindHash:` branch, where kindByKey == redisKindHash
110+
// already holds; the assignment here is idempotent for that path.
111+
// - No other caller exists; verified via
112+
// `grep -n "r\.hashState(" internal/backup/`.
113+
func (r *RedisDB) hashState(userKey []byte) *redisHashState {
114+
uk := string(userKey)
115+
if st, ok := r.hashes[uk]; ok {
116+
return st
117+
}
118+
st := &redisHashState{fields: make(map[string][]byte)}
119+
r.hashes[uk] = st
120+
r.kindByKey[uk] = redisKindHash
121+
return st
122+
}
123+
124+
// parseHashMetaKey strips !hs|meta| and the 4-byte BE userKeyLen prefix.
125+
// Returns (userKey, true) on success. Delta keys (!hs|meta|d|...)
126+
// share the meta string prefix and would otherwise be parsed as
127+
// base-meta with a garbage userKeyLen — refuse them at the boundary
128+
// so a misrouted delta surfaces a parse error rather than silent
129+
// state corruption. Callers that want delta-tolerant behavior
130+
// (HandleHashMeta) should detect the delta prefix BEFORE calling
131+
// this function. Codex P1 round 14 (PR #725 #13).
132+
func parseHashMetaKey(key []byte) ([]byte, bool) {
133+
if bytes.HasPrefix(key, []byte(RedisHashMetaDeltaPrefix)) {
134+
return nil, false
135+
}
136+
rest := bytes.TrimPrefix(key, []byte(RedisHashMetaPrefix))
137+
if len(rest) == len(key) {
138+
return nil, false
139+
}
140+
return parseUserKeyLenPrefix(rest)
141+
}
142+
143+
// parseHashFieldKey strips !hs|fld|, the 4-byte userKeyLen prefix, and
144+
// returns (userKey, fieldName, true).
145+
func parseHashFieldKey(key []byte) ([]byte, []byte, bool) {
146+
rest := bytes.TrimPrefix(key, []byte(RedisHashFieldPrefix))
147+
if len(rest) == len(key) {
148+
return nil, nil, false
149+
}
150+
userKey, ok := parseUserKeyLenPrefix(rest)
151+
if !ok {
152+
return nil, nil, false
153+
}
154+
fieldName := rest[hashUserKeyLenSize+len(userKey):]
155+
return userKey, fieldName, true
156+
}
157+
158+
// parseUserKeyLenPrefix decodes the shared <len(4)><userKey> shape used
159+
// by every wide-column !hs|/!st|/!zs| key. Returns the userKey slice
160+
// (aliasing the input) plus a presence flag.
161+
//
162+
// The length comparison is done in uint64 space because on 32-bit
163+
// architectures `int(uint32)` can wrap to a negative value when the
164+
// high bit is set, bypassing the bounds check and causing a slice
165+
// panic. Gemini high finding (PR #725 round 1).
166+
func parseUserKeyLenPrefix(b []byte) ([]byte, bool) {
167+
if len(b) < hashUserKeyLenSize {
168+
return nil, false
169+
}
170+
ukLen := binary.BigEndian.Uint32(b[:hashUserKeyLenSize])
171+
if uint64(len(b)) < uint64(hashUserKeyLenSize)+uint64(ukLen) {
172+
return nil, false
173+
}
174+
return b[hashUserKeyLenSize : hashUserKeyLenSize+int(ukLen)], true //nolint:gosec // bounded above
175+
}
176+
177+
// flushHashes writes one JSON file per accumulated hash to
178+
// hashes/<encoded>.json. Empty hashes (Len=0, no fields) still emit a
179+
// file because their existence is observable to clients (HEXISTS,
180+
// HLEN). Mismatched declared-vs-observed length surfaces an
181+
// `redis_hash_length_mismatch` warning.
182+
func (r *RedisDB) flushHashes() error {
183+
if len(r.hashes) == 0 {
184+
return nil
185+
}
186+
dir := filepath.Join(r.dbDir(), "hashes")
187+
if err := r.ensureDir(dir); err != nil {
188+
return err
189+
}
190+
// Stable order across runs (Codex pattern from #716): sort by user
191+
// key before flushing so identical snapshots produce identical
192+
// dump output regardless of Go's randomised map iteration.
193+
userKeys := make([]string, 0, len(r.hashes))
194+
for k := range r.hashes {
195+
userKeys = append(userKeys, k)
196+
}
197+
sort.Strings(userKeys)
198+
for _, uk := range userKeys {
199+
st := r.hashes[uk]
200+
if r.warn != nil && st.metaSeen && int64(len(st.fields)) != st.declaredLen {
201+
r.warn("redis_hash_length_mismatch",
202+
"user_key_len", len(uk),
203+
"declared_len", st.declaredLen,
204+
"observed_fields", len(st.fields),
205+
"hint", "meta record's Len does not match the count of !hs|fld| keys for this user key")
206+
}
207+
if err := r.writeHashJSON(dir, []byte(uk), st); err != nil {
208+
return err
209+
}
210+
}
211+
return nil
212+
}
213+
214+
func (r *RedisDB) writeHashJSON(dir string, userKey []byte, st *redisHashState) error {
215+
encoded := EncodeSegment(userKey)
216+
if err := r.recordIfFallback(encoded, userKey); err != nil {
217+
return err
218+
}
219+
path := filepath.Join(dir, encoded+".json")
220+
body, err := marshalHashJSON(st)
221+
if err != nil {
222+
return err
223+
}
224+
if err := writeFileAtomic(path, body); err != nil {
225+
return cockroachdberr.WithStack(err)
226+
}
227+
return nil
228+
}
229+
230+
// hashFieldRecord is the dump-format projection of one Redis hash
// field. Both name and value go through the same UTF-8-or-base64
// envelope (json.RawMessage produced by marshalRedisBinaryValue) so
// arbitrary binary bytes round-trip without lossy rewrites.
//
// We deliberately emit `fields` as an ARRAY of records rather than a
// JSON object keyed on the field name, because Redis hash field
// names are binary-safe and JSON object keys are not. With a map
// shape, two distinct fields could collapse to the same JSON key
// (Codex P1 round 12 #725: a UTF-8 literal `%FF` and a single byte
// `0xFF` both percent-encoded to `%FF` would overwrite one
// another), and a >240-byte non-UTF-8 field name would route
// through EncodeSegment's SHA fallback which is non-reversible at
// this layer (no per-field KEYMAP). The array form keeps every
// (name, value) pair distinct and binary-safe.
type hashFieldRecord struct {
	// Name is the envelope-projected field name; Value the
	// envelope-projected field-value bytes. Both are pre-marshalled
	// JSON fragments, hence json.RawMessage.
	Name  json.RawMessage `json:"name"`
	Value json.RawMessage `json:"value"`
}
249+
250+
func marshalHashJSON(st *redisHashState) ([]byte, error) {
251+
// Sort by raw byte order for deterministic output across runs.
252+
names := make([]string, 0, len(st.fields))
253+
for name := range st.fields {
254+
names = append(names, name)
255+
}
256+
sort.Strings(names)
257+
fields := make([]hashFieldRecord, 0, len(names))
258+
for _, name := range names {
259+
nameJSON, err := marshalRedisBinaryValue([]byte(name))
260+
if err != nil {
261+
return nil, err
262+
}
263+
valueJSON, err := marshalRedisBinaryValue(st.fields[name])
264+
if err != nil {
265+
return nil, err
266+
}
267+
fields = append(fields, hashFieldRecord{Name: nameJSON, Value: valueJSON})
268+
}
269+
type out struct {
270+
FormatVersion uint32 `json:"format_version"`
271+
Fields []hashFieldRecord `json:"fields"`
272+
ExpireAtMs *uint64 `json:"expire_at_ms"`
273+
}
274+
rec := out{FormatVersion: 1, Fields: fields}
275+
if st.hasTTL {
276+
ms := st.expireAtMs
277+
rec.ExpireAtMs = &ms
278+
}
279+
body, err := json.MarshalIndent(rec, "", " ")
280+
if err != nil {
281+
return nil, cockroachdberr.WithStack(err)
282+
}
283+
return body, nil
284+
}
285+
286+
// marshalRedisBinaryValue is the shared "binary-safe text or base64
287+
// envelope" projection used by every Redis wide-column type for value
288+
// bytes that may or may not be valid UTF-8. Mirrors the SQS body
289+
// projection for restore-roundtrip determinism: a UTF-8 bytestring
290+
// emits as a plain JSON string; non-UTF-8 emits as
291+
// `{"base64":"<base64url>"}`.
292+
func marshalRedisBinaryValue(b []byte) (json.RawMessage, error) {
293+
if utf8.Valid(b) {
294+
out, err := json.Marshal(string(b))
295+
if err != nil {
296+
return nil, cockroachdberr.WithStack(err)
297+
}
298+
return out, nil
299+
}
300+
envelope := struct {
301+
Base64 string `json:"base64"`
302+
}{Base64: base64.RawURLEncoding.EncodeToString(b)}
303+
out, err := json.Marshal(envelope)
304+
if err != nil {
305+
return nil, cockroachdberr.WithStack(err)
306+
}
307+
return out, nil
308+
}

0 commit comments

Comments
 (0)