Skip to content

Commit a779712

Browse files
authored
Merge pull request #396 from bootjp/feature/s3-impl
add s3 adapter
2 parents cbf0c99 + 9a09659 commit a779712

18 files changed

Lines changed: 3531 additions & 137 deletions

adapter/dynamodb_migration_test.go

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,57 @@ func (c *localAdapterCoordinator) Dispatch(ctx context.Context, req *kv.Operatio
2828
if req == nil {
2929
return &kv.CoordinateResponse{}, nil
3030
}
31-
commitTS := c.Clock().Next()
32-
if req.IsTxn && commitTS <= req.StartTS {
33-
c.Clock().Observe(req.StartTS)
31+
commitTS, err := c.commitTSForRequest(req)
32+
if err != nil {
33+
return nil, err
34+
}
35+
if err := c.applyElems(ctx, req.Elems, commitTS); err != nil {
36+
return nil, err
37+
}
38+
return &kv.CoordinateResponse{}, nil
39+
}
40+
41+
func (c *localAdapterCoordinator) commitTSForRequest(req *kv.OperationGroup[kv.OP]) (uint64, error) {
42+
if req == nil {
43+
return 0, nil
44+
}
45+
commitTS := req.CommitTS
46+
if commitTS == 0 {
3447
commitTS = c.Clock().Next()
48+
if req.IsTxn && commitTS <= req.StartTS {
49+
c.Clock().Observe(req.StartTS)
50+
commitTS = c.Clock().Next()
51+
}
52+
} else {
53+
c.Clock().Observe(commitTS)
3554
}
36-
for _, elem := range req.Elems {
37-
switch elem.Op {
38-
case kv.Put:
39-
if err := c.store.PutAt(ctx, elem.Key, elem.Value, commitTS, 0); err != nil {
40-
return nil, err
41-
}
42-
case kv.Del:
43-
if err := c.store.DeleteAt(ctx, elem.Key, commitTS); err != nil {
44-
return nil, err
45-
}
55+
if req.IsTxn && commitTS <= req.StartTS {
56+
return 0, kv.ErrTxnCommitTSRequired
57+
}
58+
return commitTS, nil
59+
}
60+
61+
func (c *localAdapterCoordinator) applyElems(ctx context.Context, elems []*kv.Elem[kv.OP], commitTS uint64) error {
62+
for _, elem := range elems {
63+
if err := c.applyElem(ctx, elem, commitTS); err != nil {
64+
return err
4665
}
4766
}
48-
return &kv.CoordinateResponse{}, nil
67+
return nil
68+
}
69+
70+
func (c *localAdapterCoordinator) applyElem(ctx context.Context, elem *kv.Elem[kv.OP], commitTS uint64) error {
71+
if elem == nil {
72+
return nil
73+
}
74+
switch elem.Op {
75+
case kv.Put:
76+
return c.store.PutAt(ctx, elem.Key, elem.Value, commitTS, 0)
77+
case kv.Del:
78+
return c.store.DeleteAt(ctx, elem.Key, commitTS)
79+
default:
80+
return nil
81+
}
4982
}
5083

5184
func newLegacyMigrationTestServer(

0 commit comments

Comments
 (0)