@@ -20,12 +20,20 @@ import (
2020)
2121
2222const (
23- timestampSize = 8
24- valueHeaderSize = 9 // 1 byte tombstone + 8 bytes expireAt
25- snapshotBatchSize = 1000
26- dirPerms = 0755
27- metaLastCommitTS = "_meta_last_commit_ts"
28- spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore
23+ timestampSize = 8
24+ valueHeaderSize = 9 // 1 byte tombstone + 8 bytes expireAt
25+ snapshotBatchCountLimit = 1000
26+ snapshotBatchByteLimit = 8 << 20 // 8 MiB; balances restore write amplification vs peak memory usage
27+ dirPerms = 0755
28+ metaLastCommitTS = "_meta_last_commit_ts"
29+ spoolBufSize = 32 * 1024 // buffer size for streaming I/O during restore
30+
31+ // maxPebbleEncodedKeySize is the limit for encoded Pebble on-disk keys,
32+ // which are the user key concatenated with the 8-byte inverted timestamp.
33+ // Using maxSnapshotKeySize+timestampSize (instead of just maxSnapshotKeySize)
34+ // avoids rejecting keys that are valid at the user-key level but slightly
35+ // exceed maxSnapshotKeySize once the timestamp suffix is appended.
36+ maxPebbleEncodedKeySize = maxSnapshotKeySize + timestampSize
2937)
3038
3139var metaLastCommitTSBytes = []byte (metaLastCommitTS )
@@ -106,13 +114,21 @@ func NewPebbleStore(dir string, opts ...PebbleStoreOption) (MVCCStore, error) {
106114// Key = [UserKeyBytes] [8-byte Inverted Timestamp]
107115
108116func encodeKey (key []byte , ts uint64 ) []byte {
109- k := make ([]byte , len (key )+ timestampSize )
110- copy (k , key )
111- // Invert TS for descending order (newer first)
112- binary .BigEndian .PutUint64 (k [len (key ):], ^ ts )
117+ k := make ([]byte , encodedKeyLen (key ))
118+ fillEncodedKey (k , key , ts )
113119 return k
114120}
115121
122+ func encodedKeyLen (key []byte ) int {
123+ return len (key ) + timestampSize
124+ }
125+
126+ func fillEncodedKey (dst []byte , key []byte , ts uint64 ) {
127+ copy (dst , key )
128+ // Invert TS for descending order (newer first)
129+ binary .BigEndian .PutUint64 (dst [len (key ):], ^ ts )
130+ }
131+
116132func decodeKey (k []byte ) ([]byte , uint64 ) {
117133 if len (k ) < timestampSize {
118134 return nil , 0
@@ -133,13 +149,23 @@ type storedValue struct {
133149
134150func encodeValue (val []byte , tombstone bool , expireAt uint64 ) []byte {
135151 // Format: [Tombstone(1)] [ExpireAt(8)] [Value(...)]
136- buf := make ([]byte , valueHeaderSize + len (val ))
152+ buf := make ([]byte , encodedValueLen (len (val )))
153+ fillEncodedValue (buf , val , tombstone , expireAt )
154+ return buf
155+ }
156+
157+ func encodedValueLen (valueLen int ) int {
158+ return valueHeaderSize + valueLen
159+ }
160+
161+ func fillEncodedValue (dst []byte , val []byte , tombstone bool , expireAt uint64 ) {
137162 if tombstone {
138- buf [0 ] = 1
163+ dst [0 ] = 1
164+ } else {
165+ dst [0 ] = 0
139166 }
140- binary .LittleEndian .PutUint64 (buf [1 :], expireAt )
141- copy (buf [valueHeaderSize :], val )
142- return buf
167+ binary .LittleEndian .PutUint64 (dst [1 :], expireAt )
168+ copy (dst [valueHeaderSize :], val )
143169}
144170
145171func decodeValue (data []byte ) (storedValue , error ) {
@@ -498,6 +524,9 @@ func (s *pebbleStore) ReverseScanAt(ctx context.Context, start []byte, end []byt
498524}
499525
500526func (s * pebbleStore ) PutAt (ctx context.Context , key []byte , value []byte , commitTS uint64 , expireAt uint64 ) error {
527+ if err := validateValueSize (value ); err != nil {
528+ return err
529+ }
501530 commitTS = s .alignCommitTS (commitTS )
502531
503532 k := encodeKey (key , commitTS )
@@ -582,6 +611,9 @@ func (s *pebbleStore) applyMutationsBatch(b *pebble.Batch, mutations []*KVPairMu
582611
583612 switch mut .Op {
584613 case OpTypePut :
614+ if err := validateValueSize (mut .Value ); err != nil {
615+ return err
616+ }
585617 v = encodeValue (mut .Value , false , mut .ExpireAt )
586618 case OpTypeDelete :
587619 v = encodeValue (nil , true , 0 )
@@ -634,43 +666,105 @@ func (s *pebbleStore) Snapshot() (Snapshot, error) {
634666 return newPebbleSnapshot (snap , lastCommitTS ), nil
635667}
636668
637- func restoreOneEntry (r io.Reader , batch * pebble.Batch ) (bool , error ) {
638- var kLen uint64
639- if err := binary .Read (r , binary .LittleEndian , & kLen ); err != nil {
669+ // readRestoreEntry reads one entry's key-length, key bytes, and value-length
670+ // from r. The key bytes are stored in *keyBuf (grown as needed to avoid per-entry
671+ // allocations). Returns (kLen, vLen, eof=true, nil) on clean EOF at the key-length field.
672+ func readRestoreEntry (r io.Reader , keyBuf * []byte ) (kLen , vLen int , eof bool , err error ) {
673+ kLen , err = readRestoreFieldLen (r , "snapshot key" , maxPebbleEncodedKeySize )
674+ if err != nil {
640675 if errors .Is (err , io .EOF ) {
641- return true , nil
676+ return 0 , 0 , true , nil
642677 }
643- return false , errors .WithStack (err )
678+ return 0 , 0 , false , err
679+ }
680+ if cap (* keyBuf ) < kLen {
681+ * keyBuf = make ([]byte , kLen )
682+ }
683+ if _ , err = io .ReadFull (r , (* keyBuf )[:kLen ]); err != nil {
684+ return 0 , 0 , false , errors .WithStack (err )
685+ }
686+ vLen , err = readRestoreFieldLen (r , "snapshot value" , maxSnapshotValueSize + valueHeaderSize )
687+ if err != nil {
688+ return 0 , 0 , false , err
689+ }
690+ return kLen , vLen , false , nil
691+ }
692+
693+ func readRestoreFieldLen (r io.Reader , field string , maxSize int ) (int , error ) {
694+ var raw uint64
695+ if err := binary .Read (r , binary .LittleEndian , & raw ); err != nil {
696+ return 0 , errors .WithStack (err )
644697 }
645- key := make ([]byte , kLen )
646- if _ , err := io .ReadFull (r , key ); err != nil {
647- return false , errors .WithStack (err )
698+ return restoreFieldLenInt (raw , field , maxSize )
699+ }
700+
701+ func restoreFieldLenInt (raw uint64 , field string , maxSize int ) (int , error ) {
702+ if raw > uint64 (math .MaxInt ) || int (raw ) > maxSize {
703+ switch field {
704+ case "snapshot key" :
705+ return 0 , errors .WithStack (errors .Wrapf (ErrSnapshotKeyTooLarge , "length %d > %d" , raw , maxSize ))
706+ case "snapshot value" :
707+ return 0 , errors .WithStack (errors .Wrapf (ErrValueTooLarge , "length %d > %d" , raw , maxSize ))
708+ default :
709+ return 0 , errors .WithStack (errors .Newf ("%s length %d > %d" , field , raw , maxSize ))
710+ }
648711 }
712+ return int (raw ), nil
713+ }
714+
715+ func snapshotBatchShouldFlush (batch * pebble.Batch ) bool {
716+ return batch .Count () >= snapshotBatchCountLimit || batch .Len () >= snapshotBatchByteLimit
717+ }
649718
650- var vLen uint64
651- if err := binary . Read ( r , binary . LittleEndian , & vLen ); err ! = nil {
652- return false , errors . WithStack ( err )
719+ func commitSnapshotBatch ( batch * pebble. Batch , opts * pebble. WriteOptions ) error {
720+ if batch = = nil {
721+ return nil
653722 }
654- val := make ([]byte , vLen )
655- if _ , err := io .ReadFull (r , val ); err != nil {
656- return false , errors .WithStack (err )
723+ defer func () {
724+ _ = batch .Close ()
725+ }()
726+ if batch .Empty () && (opts == nil || ! opts .Sync ) {
727+ return nil
728+ }
729+ return errors .WithStack (batch .Commit (opts ))
730+ }
731+
732+ func flushSnapshotBatch (db * pebble.DB , batch * * pebble.Batch , opts * pebble.WriteOptions ) error {
733+ if err := commitSnapshotBatch (* batch , opts ); err != nil {
734+ return err
657735 }
736+ * batch = db .NewBatch ()
737+ return nil
738+ }
658739
659- if err := batch .Set (key , val , nil ); err != nil {
660- return false , errors .WithStack (err )
740+ func setEncodedVersionInBatch (batch * pebble.Batch , key []byte , version VersionedValue ) error {
741+ deferred := batch .SetDeferred (encodedKeyLen (key ), encodedValueLen (len (version .Value )))
742+ fillEncodedKey (deferred .Key , key , version .TS )
743+ fillEncodedValue (deferred .Value , version .Value , version .Tombstone , version .ExpireAt )
744+ return errors .WithStack (deferred .Finish ())
745+ }
746+
747+ // writeRestoreEntry writes one key-value entry from the stream into batch,
748+ // reading the value bytes directly into the deferred write buffer. keyBuf must
749+ // already contain kLen key bytes.
750+ func writeRestoreEntry (r io.Reader , batch * pebble.Batch , keyBuf []byte , kLen , vLen int ) error {
751+ deferred := batch .SetDeferred (kLen , vLen )
752+ copy (deferred .Key , keyBuf [:kLen ])
753+ if _ , err := io .ReadFull (r , deferred .Value ); err != nil {
754+ return errors .WithStack (err )
661755 }
662- return false , nil
756+ return errors . WithStack ( deferred . Finish ())
663757}
664758
665759// restoreBatchLoopInto reads raw Pebble key-value entries from r and writes
666760// them into db using batched commits. It is used for both the direct and the
667761// temp-dir atomic native Pebble restore paths.
668762func restoreBatchLoopInto (r io.Reader , db * pebble.DB ) error {
669763 batch := db .NewBatch ()
670- batchCnt := 0
764+ var keyBuf [] byte // reused across entries to reduce per-entry allocations
671765
672766 for {
673- eof , err := restoreOneEntry (r , batch )
767+ kLen , vLen , eof , err := readRestoreEntry (r , & keyBuf )
674768 if err != nil {
675769 _ = batch .Close ()
676770 return err
@@ -679,20 +773,26 @@ func restoreBatchLoopInto(r io.Reader, db *pebble.DB) error {
679773 break
680774 }
681775
682- batchCnt ++
683- if batchCnt >= snapshotBatchSize {
684- if err := batch .Commit ( pebble . NoSync ); err != nil {
685- _ = batch . Close ()
686- return errors . WithStack ( err )
776+ // Flush before adding when the batch is non-empty and the anticipated
777+ // entry size would push the batch over the byte limit.
778+ if ! batch . Empty () && batch .Len () + kLen + vLen >= snapshotBatchByteLimit {
779+ if err := flushSnapshotBatch ( db , & batch , pebble . NoSync ); err != nil {
780+ return err
687781 }
782+ }
783+
784+ if err := writeRestoreEntry (r , batch , keyBuf , kLen , vLen ); err != nil {
688785 _ = batch .Close ()
689- batch = db .NewBatch ()
690- batchCnt = 0
786+ return err
787+ }
788+
789+ if snapshotBatchShouldFlush (batch ) {
790+ if err := flushSnapshotBatch (db , & batch , pebble .NoSync ); err != nil {
791+ return err
792+ }
691793 }
692794 }
693- err := batch .Commit (pebble .Sync )
694- _ = batch .Close ()
695- return errors .WithStack (err )
795+ return commitSnapshotBatch (batch , pebble .Sync )
696796}
697797
698798func (s * pebbleStore ) Restore (r io.Reader ) error {
@@ -951,7 +1051,6 @@ func (s *pebbleStore) restoreFromStreamingMVCC(r io.Reader) error {
9511051// into the given Pebble database using batched commits.
9521052func writeMVCCEntriesToDB (body io.Reader , db * pebble.DB ) error {
9531053 batch := db .NewBatch ()
954- batchCnt := 0
9551054 for {
9561055 key , versions , eof , err := readMVCCSnapshotEntry (body )
9571056 if err != nil {
@@ -962,30 +1061,18 @@ func writeMVCCEntriesToDB(body io.Reader, db *pebble.DB) error {
9621061 break
9631062 }
9641063 for _ , v := range versions {
965- pKey := encodeKey (key , v .TS )
966- pVal := encodeValue (v .Value , v .Tombstone , v .ExpireAt )
967- if err := batch .Set (pKey , pVal , nil ); err != nil {
1064+ if err := setEncodedVersionInBatch (batch , key , v ); err != nil {
9681065 _ = batch .Close ()
969- return errors . WithStack ( err )
1066+ return err
9701067 }
971- batchCnt ++
972- if batchCnt >= snapshotBatchSize {
973- if err := batch .Commit (pebble .NoSync ); err != nil {
974- _ = batch .Close ()
975- return errors .WithStack (err )
1068+ if snapshotBatchShouldFlush (batch ) {
1069+ if err := flushSnapshotBatch (db , & batch , pebble .NoSync ); err != nil {
1070+ return err
9761071 }
977- _ = batch .Close ()
978- batch = db .NewBatch ()
979- batchCnt = 0
9801072 }
9811073 }
9821074 }
983- if err := batch .Commit (pebble .Sync ); err != nil {
984- _ = batch .Close ()
985- return errors .WithStack (err )
986- }
987- _ = batch .Close ()
988- return nil
1075+ return commitSnapshotBatch (batch , pebble .Sync )
9891076}
9901077
9911078// swapInTempDB closes the current DB, removes its directory, and renames tmpDir
@@ -1137,33 +1224,31 @@ func spoolGobPayload(r io.Reader, dst io.Writer) error {
11371224}
11381225
11391226// writeGobEntriesToDB writes the decoded gob snapshot entries into db using
1140- // batched commits.
1227+ // batched commits. NOTE: this function mutates entries in-place by clearing
1228+ // each version's Value and then nilling the entry's Key and Versions slice
1229+ // after encoding to reduce peak memory usage. Callers must not reuse entries
1230+ // after this call.
11411231func writeGobEntriesToDB (entries []mvccSnapshotEntry , db * pebble.DB ) error {
11421232 batch := db .NewBatch ()
1143- batchCnt := 0
1144- for _ , entry := range entries {
1145- for _ , v := range entry .Versions {
1146- pKey := encodeKey (entry .Key , v .TS )
1147- pVal := encodeValue (v .Value , v .Tombstone , v .ExpireAt )
1148- if err := batch .Set (pKey , pVal , nil ); err != nil {
1233+ for i := range entries {
1234+ entry := & entries [i ]
1235+ for j := range entry .Versions {
1236+ version := entry .Versions [j ]
1237+ if err := setEncodedVersionInBatch (batch , entry .Key , version ); err != nil {
11491238 _ = batch .Close ()
1150- return errors . WithStack ( err )
1239+ return err
11511240 }
1152- batchCnt ++
1153- if batchCnt >= snapshotBatchSize {
1154- if err := batch .Commit (pebble .NoSync ); err != nil {
1155- _ = batch .Close ()
1156- return errors .WithStack (err )
1241+ entry .Versions [j ].Value = nil
1242+ if snapshotBatchShouldFlush (batch ) {
1243+ if err := flushSnapshotBatch (db , & batch , pebble .NoSync ); err != nil {
1244+ return err
11571245 }
1158- _ = batch .Close ()
1159- batch = db .NewBatch ()
1160- batchCnt = 0
11611246 }
11621247 }
1248+ entry .Key = nil
1249+ entry .Versions = nil
11631250 }
1164- err := batch .Commit (pebble .Sync )
1165- _ = batch .Close ()
1166- return errors .WithStack (err )
1251+ return commitSnapshotBatch (batch , pebble .Sync )
11671252}
11681253
11691254func (s * pebbleStore ) Close () error {
0 commit comments