@@ -10,6 +10,7 @@ import (
1010 "math"
1111 "os"
1212 "path/filepath"
13+ "syscall"
1314
1415 cockroachdberr "github.com/cockroachdb/errors"
1516)
@@ -75,18 +76,20 @@ const (
7576// RedisDB encodes one logical Redis database (`redis/db_<n>/`). All
7677// operations are scoped to its outRoot; the caller wires per-database
7778// instances when the producer supports multiple databases (today only
78- // db_0 is meaningful).
79+ // db_0 is meaningful, but the encoder is wired to take any non-negative
80+ // index so a future multi-db dump does not silently collide on db_0).
7981//
8082// Lifecycle:
8183//
82- // r := NewRedisDB(outRoot)
84+ // r := NewRedisDB(outRoot, dbIndex )
8385// for each snapshot record matching a redis prefix: r.Handle*(...)
8486// r.Finalize()
8587//
8688// Handle* methods are NOT goroutine-safe; the decoder pipeline is
8789// inherently sequential per scope, so a mutex would only add cost.
8890type RedisDB struct {
8991 outRoot string
92+ dbIndex int
9093
9194 // kindByKey records the Redis type each user key was first seen as.
9295 // Populated by HandleString and HandleHLL; consulted by HandleTTL.
@@ -105,9 +108,19 @@ type RedisDB struct {
105108 // has not been claimed by HandleString / HandleHLL. These are
106109 // candidates for hashes/lists/sets/zsets/streams (handled in a
107110 // follow-up PR) — for now Finalize logs them via the warning hook
108- // rather than dropping silently.
111+ // rather than dropping silently. Bounded by maxPendingWideColumnTTL
112+ // so a malformed snapshot with millions of orphan TTL records cannot
113+ // drive the encoder OOM; the bound is high enough that real
114+ // production state (where wide-column type encoders eventually
115+ // claim every TTL) is never affected.
109116 pendingWideColumnTTL []redisTTLPending
110117
118+ // dirsCreated caches the per-encoder directories writeBlob and
119+ // appendTTL have already MkdirAll'd. Avoids the per-record syscalls
120+ // flagged by Gemini #218; for a 10M-key dump this saves ~10M
121+ // stat+mkdir(EEXIST) round-trips.
122+ dirsCreated map [string ]struct {}
123+
111124 // warn is the structured-warning sink. Non-nil in production
112125 // (fed by the decoder driver); nil in tests if the test does not
113126 // care about warnings.
@@ -119,12 +132,27 @@ type redisTTLPending struct {
119132 ExpireAtMs uint64
120133}
121134
122- // NewRedisDB constructs a RedisDB rooted at <outRoot>/redis/db_<n>/. The
123- // caller is responsible for choosing <n>; today only 0 is meaningful.
124- func NewRedisDB (outRoot string ) * RedisDB {
135+ // maxPendingWideColumnTTL caps the orphan-TTL buffer. 1M entries is well
136+ // past anything a real Redis instance produces under normal operation
137+ // (each entry is ~50 bytes, so the cap is ~50 MiB) but small enough that
138+ // a snapshot loaded with a billion synthetic !redis|ttl| records cannot
139+ // drive the encoder OOM. When the cap is hit, HandleTTL drops further
140+ // orphans and surfaces a structured warning at Finalize.
141+ const maxPendingWideColumnTTL = 1_000_000
142+
143+ // NewRedisDB constructs a RedisDB rooted at <outRoot>/redis/db_<n>/.
144+ // dbIndex selects <n>; today the producer always passes 0, but accepting
145+ // the index as a parameter prevents a future multi-db dump from silently
146+ // colliding on db_0.
147+ func NewRedisDB (outRoot string , dbIndex int ) * RedisDB {
148+ if dbIndex < 0 {
149+ dbIndex = 0
150+ }
125151 return & RedisDB {
126- outRoot : outRoot ,
127- kindByKey : make (map [string ]redisKeyKind ),
152+ outRoot : outRoot ,
153+ dbIndex : dbIndex ,
154+ kindByKey : make (map [string ]redisKeyKind ),
155+ dirsCreated : make (map [string ]struct {}),
128156 }
129157}
130158
@@ -183,10 +211,17 @@ func (r *RedisDB) HandleTTL(userKey, value []byte) error {
183211 case redisKindString :
184212 return r .appendTTL (& r .stringsTTL , redisStringsTTLFile , userKey , expireAtMs )
185213 case redisKindUnknown :
186- r .pendingWideColumnTTL = append (r .pendingWideColumnTTL , redisTTLPending {
187- UserKey : bytes .Clone (userKey ),
188- ExpireAtMs : expireAtMs ,
189- })
214+ // Bounded to prevent OOM on a snapshot that contains a
215+ // runaway number of orphan TTL records (e.g., many wide-
216+ // column types whose meta records were dropped). After the
217+ // cap, additional records are dropped here without being counted.
218+ // NOTE(review): Finalize cannot report how many were lost — confirm intent.
219+ if len (r .pendingWideColumnTTL ) < maxPendingWideColumnTTL {
220+ r .pendingWideColumnTTL = append (r .pendingWideColumnTTL , redisTTLPending {
221+ UserKey : bytes .Clone (userKey ),
222+ ExpireAtMs : expireAtMs ,
223+ })
224+ }
190225 return nil
191226 }
192227 return nil
@@ -212,12 +247,57 @@ func (r *RedisDB) Finalize() error {
212247 return firstErr
213248}
214249
215- func (r * RedisDB ) writeBlob (subdir string , userKey , value []byte ) error {
216- encoded := EncodeSegment (userKey )
217- dir := filepath .Join (r .outRoot , "redis" , "db_0" , subdir )
250+ // dbDir returns the per-encoder root, e.g. "<outRoot>/redis/db_0/".
251+ // Computed once per call rather than at construction so the encoder's
252+ // outRoot remains a plain field — easier to reason about in tests.
253+ func (r * RedisDB ) dbDir () string {
254+ return filepath .Join (r .outRoot , "redis" , redisDBSegment (r .dbIndex ))
255+ }
256+
257+ func redisDBSegment (idx int ) string {
258+ if idx < 0 {
259+ idx = 0
260+ }
261+ return "db_" + intToDecimal (idx )
262+ }
263+
// intToDecimal formats v in base 10 without pulling in strconv. Digits
// are produced right-to-left into a fixed stack buffer; the only
// allocation is the returned string.
//
// Negative values are rendered with a leading '-'. Previously the digit
// loop was skipped for v < 0 and the function returned "", which a caller
// could splice into a path unnoticed. Output for v >= 0 is unchanged.
func intToDecimal(v int) string {
	const (
		radix               = 10
		maxIntDecimalDigits = 20 // sign + 19 digits covers MinInt64..MaxInt64
	)
	var buf [maxIntDecimalDigits]byte
	pos := len(buf)

	// Work on the unsigned magnitude so the most negative int does not
	// overflow when negated.
	negative := v < 0
	mag := uint64(v)
	if negative {
		mag = -mag
	}

	// Emit at least one digit so that 0 renders as "0".
	for {
		pos--
		buf[pos] = '0' + byte(mag%radix)
		mag /= radix
		if mag == 0 {
			break
		}
	}
	if negative {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}
280+
281+ // ensureDir runs MkdirAll once per directory and remembers the result
282+ // in r.dirsCreated, so repeated calls on the hot path (one per blob
283+ // record) collapse to a map lookup.
284+ func (r * RedisDB ) ensureDir (dir string ) error {
285+ if _ , ok := r .dirsCreated [dir ]; ok {
286+ return nil
287+ }
218288 if err := os .MkdirAll (dir , 0o755 ); err != nil { //nolint:mnd // 0755 == standard dir mode
219289 return cockroachdberr .WithStack (err )
220290 }
291+ r .dirsCreated [dir ] = struct {}{}
292+ return nil
293+ }
294+
295+ func (r * RedisDB ) writeBlob (subdir string , userKey , value []byte ) error {
296+ encoded := EncodeSegment (userKey )
297+ dir := filepath .Join (r .dbDir (), subdir )
298+ if err := r .ensureDir (dir ); err != nil {
299+ return err
300+ }
221301 path := filepath .Join (dir , encoded + ".bin" )
222302 if err := writeFileAtomic (path , value ); err != nil {
223303 return cockroachdberr .WithStack (err )
@@ -227,7 +307,7 @@ func (r *RedisDB) writeBlob(subdir string, userKey, value []byte) error {
227307
228308func (r * RedisDB ) appendTTL (slot * * jsonlFile , baseName string , userKey []byte , expireAtMs uint64 ) error {
229309 if * slot == nil {
230- f , err := openJSONL (filepath .Join (r .outRoot , "redis" , "db_0" , baseName ))
310+ f , err := openJSONL (filepath .Join (r .dbDir () , baseName ))
231311 if err != nil {
232312 return err
233313 }
@@ -369,15 +449,27 @@ func HasInlineTTL(value []byte) bool {
369449 return value [2 ]& redisStrHasTTL != 0
370450}
371451
372- // IsBlobAtomicWriteRetriable reports whether err from writeFileAtomic is
373- // a retriable I/O failure (no-space, transient FS error). Today this is a
374- // stub that returns false for any error; exposed so the master decoder
375- // loop can decide whether to abort the whole dump on encountering one.
// IsBlobAtomicWriteRetriable reports whether err is an I/O failure worth
// retrying. The only signal treated as retriable is io.ErrShortWrite,
// matched anywhere in err's wrap chain; a nil err is not retriable.
//
// syscall.ENOSPC (disk full) is deliberately not matched here, so a
// caller that retries on true does not spin against a full disk. Use
// IsBlobAtomicWriteOutOfSpace to detect that case.
func IsBlobAtomicWriteRetriable(err error) bool {
	retriable := err != nil && errors.Is(err, io.ErrShortWrite)
	return retriable
}
465+
// IsBlobAtomicWriteOutOfSpace reports whether err's wrap chain contains
// syscall.ENOSPC, i.e. the write failed because the disk is full. Wrapped
// forms such as an *os.PathError carrying ENOSPC match; a nil err does
// not.
//
// NOTE(review): on Windows a full disk is presumably reported as
// ERROR_DISK_FULL / ERROR_HANDLE_DISK_FULL, which are different Errno
// values from syscall.ENOSPC — confirm whether this probe needs a
// Windows-specific branch before relying on it there.
func IsBlobAtomicWriteOutOfSpace(err error) bool {
	if err != nil {
		return errors.Is(err, syscall.ENOSPC)
	}
	return false
}
0 commit comments