Skip to content

Commit 1eab525

Browse files
authored
Merge pull request #423 from bootjp/feature/pebble-compaction
store: harden Pebble MVCC compaction
2 parents 8607456 + fbb069f commit 1eab525

11 files changed

Lines changed: 1193 additions & 123 deletions

adapter/dynamodb.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ type DynamoDBServer struct {
114114
coordinator kv.Coordinator
115115
httpServer *http.Server
116116
targetHandlers map[string]func(http.ResponseWriter, *http.Request)
117+
readTracker *kv.ActiveTimestampTracker
117118
requestObserver monitoring.DynamoDBRequestObserver
118119
itemUpdateLocks [itemUpdateLockStripeCount]sync.Mutex
119120
tableLocks [tableLockStripeCount]sync.Mutex
@@ -126,6 +127,12 @@ func WithDynamoDBRequestObserver(observer monitoring.DynamoDBRequestObserver) Dy
126127
}
127128
}
128129

130+
func WithDynamoDBActiveTimestampTracker(tracker *kv.ActiveTimestampTracker) DynamoDBServerOption {
131+
return func(server *DynamoDBServer) {
132+
server.readTracker = tracker
133+
}
134+
}
135+
129136
type dynamoRequestMetricsContextKey struct{}
130137

131138
type dynamoRequestMetricsState struct {
@@ -1888,6 +1895,8 @@ func (d *DynamoDBServer) queryItems(ctx context.Context, in queryInput) (*queryO
18881895
if err != nil {
18891896
return nil, err
18901897
}
1898+
readPin := d.pinReadTS(readTS)
1899+
defer readPin.Release()
18911900
keySchema, cond, err := resolveQueryCondition(in, schema)
18921901
if err != nil {
18931902
return nil, err
@@ -1926,6 +1935,8 @@ func (d *DynamoDBServer) scanItems(ctx context.Context, in scanInput) (*queryOut
19261935
if err != nil {
19271936
return nil, err
19281937
}
1938+
readPin := d.pinReadTS(readTS)
1939+
defer readPin.Release()
19291940
indexKeySchema := schema.PrimaryKey
19301941
if strings.TrimSpace(in.IndexName) != "" {
19311942
indexKeySchema, err = schema.keySchemaForQuery(in.IndexName)
@@ -7367,6 +7378,13 @@ func (d *DynamoDBServer) nextTxnReadTS() uint64 {
73677378
return clock.Next()
73687379
}
73697380

7381+
func (d *DynamoDBServer) pinReadTS(ts uint64) *kv.ActiveTimestampToken {
7382+
if d == nil || d.readTracker == nil {
7383+
return &kv.ActiveTimestampToken{}
7384+
}
7385+
return d.readTracker.Pin(ts)
7386+
}
7387+
73707388
func (d *DynamoDBServer) loadTableSchema(ctx context.Context, tableName string) (*dynamoTableSchema, bool, error) {
73717389
return d.loadTableSchemaAt(ctx, tableName, snapshotTS(d.coordinator.Clock(), d.store))
73727390
}
@@ -7544,6 +7562,9 @@ func (d *DynamoDBServer) migrateLegacySourceItems(
75447562
sourceSchema *dynamoTableSchema,
75457563
readTS uint64,
75467564
) error {
7565+
readPin := d.pinReadTS(readTS)
7566+
defer readPin.Release()
7567+
75477568
prefix := dynamoItemPrefixForTable(targetSchema.TableName, sourceSchema.Generation)
75487569
upper := prefixScanEnd(prefix)
75497570
cursor := bytes.Clone(prefix)
@@ -7741,6 +7762,9 @@ func (d *DynamoDBServer) scanAllByPrefix(ctx context.Context, prefix []byte) ([]
77417762
}
77427763

77437764
func (d *DynamoDBServer) scanAllByPrefixAt(ctx context.Context, prefix []byte, readTS uint64) ([]*store.KVPair, error) {
7765+
readPin := d.pinReadTS(readTS)
7766+
defer readPin.Release()
7767+
77447768
end := prefixScanEnd(prefix)
77457769
start := bytes.Clone(prefix)
77467770

adapter/dynamodb_retention_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package adapter
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/bootjp/elastickv/kv"
10+
"github.com/bootjp/elastickv/store"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
type blockingDynamoScanStore struct {
15+
store.MVCCStore
16+
started chan uint64
17+
release chan struct{}
18+
once sync.Once
19+
}
20+
21+
func (s *blockingDynamoScanStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error) {
22+
s.once.Do(func() {
23+
s.started <- ts
24+
<-s.release
25+
})
26+
return s.MVCCStore.ScanAt(ctx, start, end, limit, ts)
27+
}
28+
29+
func requireReadTSPinned(t *testing.T, tracker *kv.ActiveTimestampTracker, want uint64) {
30+
t.Helper()
31+
require.Eventually(t, func() bool {
32+
return tracker.Oldest() == want
33+
}, time.Second, 10*time.Millisecond)
34+
}
35+
36+
func waitForScanStart(t *testing.T, started <-chan uint64) uint64 {
37+
t.Helper()
38+
select {
39+
case ts := <-started:
40+
return ts
41+
case <-time.After(time.Second):
42+
t.Fatal("timed out waiting for scan")
43+
return 0
44+
}
45+
}
46+
47+
func TestDynamoDBScanAllByPrefixAtPinsReadTS(t *testing.T) {
48+
t.Parallel()
49+
50+
tracker := kv.NewActiveTimestampTracker()
51+
backing := store.NewMVCCStore()
52+
require.NoError(t, backing.PutAt(context.Background(), []byte("prefix/item"), []byte("value"), 10, 0))
53+
54+
scanStore := &blockingDynamoScanStore{
55+
MVCCStore: backing,
56+
started: make(chan uint64, 1),
57+
release: make(chan struct{}),
58+
}
59+
server := NewDynamoDBServer(nil, scanStore, &stubAdapterCoordinator{}, WithDynamoDBActiveTimestampTracker(tracker))
60+
61+
const readTS uint64 = 42
62+
done := make(chan error, 1)
63+
go func() {
64+
_, err := server.scanAllByPrefixAt(context.Background(), []byte("prefix/"), readTS)
65+
done <- err
66+
}()
67+
68+
require.Equal(t, readTS, waitForScanStart(t, scanStore.started))
69+
requireReadTSPinned(t, tracker, readTS)
70+
71+
close(scanStore.release)
72+
73+
select {
74+
case err := <-done:
75+
require.NoError(t, err)
76+
case <-time.After(time.Second):
77+
t.Fatal("timed out waiting for scanAllByPrefixAt")
78+
}
79+
require.Zero(t, tracker.Oldest())
80+
}
81+
82+
func TestDynamoDBMigrateLegacySourceItemsPinsReadTS(t *testing.T) {
83+
t.Parallel()
84+
85+
tracker := kv.NewActiveTimestampTracker()
86+
scanStore := &blockingDynamoScanStore{
87+
MVCCStore: store.NewMVCCStore(),
88+
started: make(chan uint64, 1),
89+
release: make(chan struct{}),
90+
}
91+
server := NewDynamoDBServer(nil, scanStore, &stubAdapterCoordinator{}, WithDynamoDBActiveTimestampTracker(tracker))
92+
93+
targetSchema := &dynamoTableSchema{TableName: "tbl", Generation: 2}
94+
sourceSchema := &dynamoTableSchema{TableName: "tbl", Generation: 1}
95+
96+
const readTS uint64 = 55
97+
done := make(chan error, 1)
98+
go func() {
99+
done <- server.migrateLegacySourceItems(context.Background(), targetSchema, sourceSchema, readTS)
100+
}()
101+
102+
require.Equal(t, readTS, waitForScanStart(t, scanStore.started))
103+
requireReadTSPinned(t, tracker, readTS)
104+
105+
close(scanStore.release)
106+
107+
select {
108+
case err := <-done:
109+
require.NoError(t, err)
110+
case <-time.After(time.Second):
111+
t.Fatal("timed out waiting for migrateLegacySourceItems")
112+
}
113+
require.Zero(t, tracker.Oldest())
114+
}

kv/compactor.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
)
1212

1313
const (
14-
defaultFSMCompactorInterval = 30 * time.Second
14+
defaultFSMCompactorInterval = 5 * time.Minute
1515
defaultFSMCompactorRetentionWindow = 30 * time.Minute
1616
defaultFSMCompactorTimeout = 5 * time.Second
1717
)
@@ -172,7 +172,6 @@ func (c *FSMCompactor) compactRuntime(ctx context.Context, runtime FSMCompactRun
172172
if err := runtime.Store.Compact(compactCtx, safeMinTS); err != nil {
173173
return errors.Wrapf(err, "compact group %d", runtime.GroupID)
174174
}
175-
retention.SetMinRetainedTS(safeMinTS)
176175
c.logger.InfoContext(compactCtx, "fsm compacted",
177176
"group_id", runtime.GroupID,
178177
"min_retained_ts", safeMinTS,

kv/compactor_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kv
22

33
import (
44
"context"
5+
"os"
56
"testing"
67
"time"
78

@@ -110,3 +111,42 @@ func TestFSMCompactorSkipsLaggingRuntime(t *testing.T) {
110111
require.NoError(t, err)
111112
require.Equal(t, []byte("v10"), val)
112113
}
114+
115+
func TestFSMCompactorCompactsEligiblePebbleRuntime(t *testing.T) {
116+
dir, err := os.MkdirTemp("", "fsm-compactor-pebble-*")
117+
require.NoError(t, err)
118+
defer os.RemoveAll(dir)
119+
120+
st, err := store.NewPebbleStore(dir)
121+
require.NoError(t, err)
122+
defer st.Close()
123+
124+
ctx := context.Background()
125+
require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v10"), 10, 0))
126+
require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v20"), 20, 0))
127+
require.NoError(t, st.PutAt(ctx, []byte("k"), []byte("v30"), 30, 0))
128+
129+
compactor := NewFSMCompactor(
130+
[]FSMCompactRuntime{{
131+
GroupID: 1,
132+
Raft: fakeRaftStats{stats: map[string]string{
133+
"state": "Follower",
134+
"fsm_pending": "0",
135+
"applied_index": "10",
136+
"commit_index": "10",
137+
}},
138+
Store: st,
139+
}},
140+
WithFSMCompactorInterval(time.Hour),
141+
WithFSMCompactorRetentionWindow(time.Millisecond),
142+
)
143+
144+
require.NoError(t, compactor.SyncOnce(ctx))
145+
146+
_, err = st.GetAt(ctx, []byte("k"), 20)
147+
require.ErrorIs(t, err, store.ErrReadTSCompacted)
148+
149+
val, err := st.GetAt(ctx, []byte("k"), 30)
150+
require.NoError(t, err)
151+
require.Equal(t, []byte("v30"), val)
152+
}

kv/shard_store.go

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -187,40 +187,59 @@ func (s *ShardStore) reverseScanRoutesAt(
187187
seenGroups := make(map[uint64]struct{})
188188
for i := len(routes) - 1; i >= 0; i-- {
189189
route := routes[i]
190-
scanStart := start
191-
scanEnd := end
192190
if clampToRoutes {
193-
if len(out) >= limit {
194-
break
195-
}
196-
scanStart = clampScanStart(start, route.Start)
197-
scanEnd = clampScanEnd(end, route.End)
198-
kvs, err := s.scanRouteAtDirection(ctx, route, scanStart, scanEnd, limit-len(out), ts, true)
191+
kvs, done, err := s.clampedReverseScanRouteAt(ctx, route, start, end, limit, len(out), ts)
199192
if err != nil {
200193
return nil, err
201194
}
202-
out = append(out, kvs...)
203-
} else {
204-
// When clampToRoutes is false (e.g. S3 manifest scans spanning multiple
205-
// shards), keys from different routes may interleave in descending order.
206-
// Fetch up to limit from every route and merge+sort descending so the
207-
// result honours the ReverseScanAt contract.
208-
// De-duplicate by GroupID: after a range split both halves share the same
209-
// GroupID (same backing shard store), so only scan each group once.
210-
if _, seen := seenGroups[route.GroupID]; seen {
211-
continue
212-
}
213-
seenGroups[route.GroupID] = struct{}{}
214-
kvs, err := s.scanRouteAtDirection(ctx, route, scanStart, scanEnd, limit, ts, true)
215-
if err != nil {
216-
return nil, err
195+
if done {
196+
break
217197
}
218-
out = mergeAndTrimReverseScanResults(out, kvs, limit)
198+
out = append(out, kvs...)
199+
continue
219200
}
201+
202+
// When clampToRoutes is false (e.g. S3 manifest scans spanning multiple
203+
// shards), keys from different routes may interleave in descending order.
204+
// Fetch up to limit from every route and merge+sort descending so the
205+
// result honours the ReverseScanAt contract.
206+
// De-duplicate by GroupID: after a range split both halves share the same
207+
// GroupID (same backing shard store), so only scan each group once.
208+
if _, seen := seenGroups[route.GroupID]; seen {
209+
continue
210+
}
211+
seenGroups[route.GroupID] = struct{}{}
212+
kvs, err := s.scanRouteAtDirection(ctx, route, start, end, limit, ts, true)
213+
if err != nil {
214+
return nil, err
215+
}
216+
out = mergeAndTrimReverseScanResults(out, kvs, limit)
220217
}
221218
return out, nil
222219
}
223220

221+
func (s *ShardStore) clampedReverseScanRouteAt(
222+
ctx context.Context,
223+
route distribution.Route,
224+
start []byte,
225+
end []byte,
226+
limit int,
227+
currentLen int,
228+
ts uint64,
229+
) ([]*store.KVPair, bool, error) {
230+
if currentLen >= limit {
231+
return nil, true, nil
232+
}
233+
234+
scanStart := clampScanStart(start, route.Start)
235+
scanEnd := clampScanEnd(end, route.End)
236+
kvs, err := s.scanRouteAtDirection(ctx, route, scanStart, scanEnd, limit-currentLen, ts, true)
237+
if err != nil {
238+
return nil, false, err
239+
}
240+
return kvs, false, nil
241+
}
242+
224243
func (s *ShardStore) scanRouteAtDirection(
225244
ctx context.Context,
226245
route distribution.Route,

main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ func startRedisServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Gr
398398
return nil
399399
}
400400

401-
func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, dynamoAddr string, shardStore *kv.ShardStore, coordinate kv.Coordinator, metricsRegistry *monitoring.Registry) error {
401+
func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, dynamoAddr string, shardStore *kv.ShardStore, coordinate kv.Coordinator, metricsRegistry *monitoring.Registry, readTracker *kv.ActiveTimestampTracker) error {
402402
dynamoL, err := lc.Listen(ctx, "tcp", dynamoAddr)
403403
if err != nil {
404404
return errors.Wrapf(err, "failed to listen on %s", dynamoAddr)
@@ -407,6 +407,7 @@ func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup
407407
dynamoL,
408408
shardStore,
409409
coordinate,
410+
adapter.WithDynamoDBActiveTimestampTracker(readTracker),
410411
adapter.WithDynamoDBRequestObserver(metricsRegistry.DynamoDBObserver()),
411412
)
412413
eg.Go(func() error {
@@ -563,7 +564,7 @@ func (r runtimeServerRunner) start() error {
563564
if err := startRaftServers(r.ctx, r.lc, r.eg, r.runtimes, r.shardStore, r.coordinate, r.distServer, r.pubsubRelay); err != nil {
564565
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
565566
}
566-
if err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.metricsRegistry); err != nil {
567+
if err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.metricsRegistry, r.readTracker); err != nil {
567568
return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
568569
}
569570
if err := startMetricsServer(r.ctx, r.lc, r.eg, r.metricsAddress, r.metricsToken, r.metricsRegistry.Handler()); err != nil {

0 commit comments

Comments
 (0)