Skip to content

Commit a8d99c1

Browse files
Copilotbootjp
andcommitted
Address review feedback: snapshot safety and correctness fixes
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
1 parent 9e5db56 commit a8d99c1

4 files changed

Lines changed: 69 additions & 5 deletions

File tree

kv/snapshot_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ func TestSnapshot(t *testing.T) {
5858
var buf bytes.Buffer
5959
_, err = kvFSMSnap.snapshot.WriteTo(&buf)
6060
assert.NoError(t, err)
61+
kvFSMSnap.Release()
6162
err = fsm2.Restore(io.NopCloser(bytes.NewReader(buf.Bytes())))
6263
assert.NoError(t, err)
6364

store/mvcc_store.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@ type VersionedValue struct {
2626
}
2727

2828
const (
29-
checksumSize = 4
30-
mvccSnapshotVersion = uint32(1)
29+
checksumSize = 4
30+
mvccSnapshotVersion = uint32(1)
31+
maxSnapshotKeySize = 1 << 20 // 1 MiB per key
32+
maxSnapshotVersionCount = 1 << 20 // 1M versions per key
33+
maxSnapshotValueSize = 64 << 20 // 64 MiB per value
3134
)
3235

3336
var mvccSnapshotMagic = [8]byte{'E', 'K', 'V', 'M', 'V', 'C', 'C', '2'}
@@ -606,7 +609,8 @@ func writeMVCCSnapshotHeader(f *os.File) (int64, error) {
606609

607610
func (s *mvccStore) writeSnapshotBody(f *os.File) (uint32, error) {
608611
hash := crc32.NewIEEE()
609-
w := io.MultiWriter(f, hash)
612+
bw := bufio.NewWriter(f)
613+
w := io.MultiWriter(bw, hash)
610614

611615
s.mtx.RLock()
612616
defer s.mtx.RUnlock()
@@ -631,6 +635,9 @@ func (s *mvccStore) writeSnapshotBody(f *os.File) (uint32, error) {
631635
return 0, err
632636
}
633637
}
638+
if err := bw.Flush(); err != nil {
639+
return 0, errors.WithStack(err)
640+
}
634641
return hash.Sum32(), nil
635642
}
636643

@@ -788,6 +795,9 @@ func readMVCCSnapshotEntry(r io.Reader) ([]byte, []VersionedValue, bool, error)
788795
}
789796
return nil, nil, false, errors.WithStack(err)
790797
}
798+
if keyLen > maxSnapshotKeySize {
799+
return nil, nil, false, errors.Newf("mvcc snapshot key too large: %d > %d", keyLen, maxSnapshotKeySize)
800+
}
791801

792802
key := make([]byte, keyLen)
793803
if _, err := io.ReadFull(r, key); err != nil {
@@ -798,6 +808,9 @@ func readMVCCSnapshotEntry(r io.Reader) ([]byte, []VersionedValue, bool, error)
798808
if err := binary.Read(r, binary.LittleEndian, &versionCount); err != nil {
799809
return nil, nil, false, errors.WithStack(err)
800810
}
811+
if versionCount > maxSnapshotVersionCount {
812+
return nil, nil, false, errors.Newf("mvcc snapshot version count too large: %d > %d", versionCount, maxSnapshotVersionCount)
813+
}
801814
versions := make([]VersionedValue, 0, versionCount)
802815
for i := uint64(0); i < versionCount; i++ {
803816
version, err := readMVCCSnapshotVersion(r)
@@ -829,6 +842,9 @@ func readMVCCSnapshotVersion(r io.Reader) (VersionedValue, error) {
829842
if err := binary.Read(r, binary.LittleEndian, &valueLen); err != nil {
830843
return VersionedValue{}, errors.WithStack(err)
831844
}
845+
if valueLen > maxSnapshotValueSize {
846+
return VersionedValue{}, errors.Newf("mvcc snapshot value too large: %d > %d", valueLen, maxSnapshotValueSize)
847+
}
832848
value := make([]byte, valueLen)
833849
if _, err := io.ReadFull(r, value); err != nil {
834850
return VersionedValue{}, errors.WithStack(err)

store/mvcc_store_snapshot_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package store
33
import (
44
"bytes"
55
"context"
6+
"encoding/binary"
7+
"encoding/gob"
8+
"hash/crc32"
69
"testing"
710

811
"github.com/stretchr/testify/require"
@@ -81,6 +84,43 @@ func TestMVCCStore_ApplyMutations_UnknownOp(t *testing.T) {
8184
require.ErrorIs(t, err, ErrUnknownOp)
8285
}
8386

87+
func TestMVCCStore_RestoreLegacySnapshot(t *testing.T) {
88+
t.Parallel()
89+
90+
ctx := context.Background()
91+
92+
// Build a legacy gob+crc32 snapshot payload.
93+
legacy := mvccSnapshot{
94+
LastCommitTS: 42,
95+
MinRetainedTS: 10,
96+
Entries: []mvccSnapshotEntry{
97+
{
98+
Key: []byte("legacy-key"),
99+
Versions: []VersionedValue{
100+
{TS: 42, Value: []byte("legacy-value"), Tombstone: false, ExpireAt: 0},
101+
},
102+
},
103+
},
104+
}
105+
106+
var payload bytes.Buffer
107+
require.NoError(t, gob.NewEncoder(&payload).Encode(legacy))
108+
109+
checksum := crc32.ChecksumIEEE(payload.Bytes())
110+
var cs [4]byte
111+
binary.LittleEndian.PutUint32(cs[:], checksum)
112+
full := append(payload.Bytes(), cs[:]...)
113+
114+
dst := newTestMVCCStore(t)
115+
require.NoError(t, dst.Restore(bytes.NewReader(full)))
116+
117+
require.Equal(t, uint64(42), dst.LastCommitTS())
118+
119+
v, err := dst.GetAt(ctx, []byte("legacy-key"), 100)
120+
require.NoError(t, err)
121+
require.Equal(t, []byte("legacy-value"), v)
122+
}
123+
84124
func snapshotBytes(t *testing.T, snap Snapshot) []byte {
85125
t.Helper()
86126

store/snapshot_pebble.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,26 +70,33 @@ func writePebbleSnapshotEntries(snap *pebble.Snapshot, w io.Writer) error {
7070
if err != nil {
7171
return errors.WithStack(err)
7272
}
73-
defer iter.Close()
7473

7574
for iter.First(); iter.Valid(); iter.Next() {
7675
k := iter.Key()
7776
v := iter.Value()
7877

7978
if err := binary.Write(w, binary.LittleEndian, uint64(len(k))); err != nil {
79+
_ = iter.Close()
8080
return errors.WithStack(err)
8181
}
8282
if _, err := w.Write(k); err != nil {
83+
_ = iter.Close()
8384
return errors.WithStack(err)
8485
}
8586
if err := binary.Write(w, binary.LittleEndian, uint64(len(v))); err != nil {
87+
_ = iter.Close()
8688
return errors.WithStack(err)
8789
}
8890
if _, err := w.Write(v); err != nil {
91+
_ = iter.Close()
8992
return errors.WithStack(err)
9093
}
9194
}
92-
return nil
95+
if err := iter.Error(); err != nil {
96+
_ = iter.Close()
97+
return errors.WithStack(err)
98+
}
99+
return errors.WithStack(iter.Close())
93100
}
94101

95102
type countingWriter struct {

0 commit comments

Comments
 (0)