Skip to content

Commit 024d4e6

Browse files
authored
Merge branch 'main' into feature/s3-doc
2 parents d60d4be + b96599f commit 024d4e6

7 files changed

Lines changed: 377 additions & 33 deletions

File tree

adapter/dynamodb.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ const (
6262
batchWriteItemMaxItems = 25
6363
dynamoMaxRequestBodyBytes = 1 << 20
6464

65-
dynamoTableMetaPrefix = "!ddb|meta|table|"
66-
dynamoTableGenerationPrefix = "!ddb|meta|gen|"
67-
dynamoItemPrefix = "!ddb|item|"
68-
dynamoGSIPrefix = "!ddb|gsi|"
65+
dynamoTableMetaPrefix = kv.DynamoTableMetaPrefix
66+
dynamoTableGenerationPrefix = kv.DynamoTableGenerationPrefix
67+
dynamoItemPrefix = kv.DynamoItemPrefix
68+
dynamoGSIPrefix = kv.DynamoGSIPrefix
6969
dynamoScanPageLimit = 1024
7070
dynamoKeyEscapeByte = byte(0x00)
7171
dynamoKeyTerminatorByte = byte(0x01)

kv/shard_key.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,45 @@ const redisInternalRoutePrefix = "!redis|"
1111

1212
var redisInternalRoutePrefixBytes = []byte(redisInternalRoutePrefix)
1313

14+
const (
15+
dynamoRoutePrefix = "!ddb|route|table|"
16+
17+
// DynamoTableMetaPrefix prefixes DynamoDB table metadata keys.
18+
DynamoTableMetaPrefix = "!ddb|meta|table|"
19+
// DynamoTableGenerationPrefix prefixes DynamoDB table generation keys.
20+
DynamoTableGenerationPrefix = "!ddb|meta|gen|"
21+
// DynamoItemPrefix prefixes DynamoDB item storage keys.
22+
DynamoItemPrefix = "!ddb|item|"
23+
// DynamoGSIPrefix prefixes DynamoDB GSI storage keys.
24+
DynamoGSIPrefix = "!ddb|gsi|"
25+
)
26+
27+
var (
28+
dynamoRoutePrefixBytes = []byte(dynamoRoutePrefix)
29+
dynamoTableMetaPrefixBytes = []byte(DynamoTableMetaPrefix)
30+
dynamoTableGenerationPrefixBytes = []byte(DynamoTableGenerationPrefix)
31+
dynamoItemPrefixBytes = []byte(DynamoItemPrefix)
32+
dynamoGSIPrefixBytes = []byte(DynamoGSIPrefix)
33+
)
34+
1435
// routeKey normalizes internal keys (e.g., list metadata/items) to the logical
1536
// user key used for shard routing.
1637
func routeKey(key []byte) []byte {
1738
if key == nil {
1839
return nil
1940
}
41+
if embedded, ok := txnRouteKey(key); ok {
42+
return normalizeRouteKey(embedded)
43+
}
44+
return normalizeRouteKey(key)
45+
}
46+
47+
func normalizeRouteKey(key []byte) []byte {
2048
if user := redisRouteKey(key); user != nil {
2149
return user
2250
}
23-
if embedded, ok := txnRouteKey(key); ok {
24-
if user := s3keys.ExtractRouteKey(embedded); user != nil {
25-
return user
26-
}
27-
// Transaction internal keys embed the logical key after the prefix.
28-
if user := store.ExtractListUserKey(embedded); user != nil {
29-
return user
30-
}
31-
return embedded
51+
if table := dynamoRouteKey(key); table != nil {
52+
return table
3253
}
3354
if user := s3keys.ExtractRouteKey(key); user != nil {
3455
return user
@@ -50,3 +71,36 @@ func redisRouteKey(key []byte) []byte {
5071
}
5172
return rest[sep+1:]
5273
}
74+
75+
func dynamoRouteKey(key []byte) []byte {
76+
switch {
77+
case bytes.HasPrefix(key, dynamoTableMetaPrefixBytes):
78+
return dynamoRouteTableKey(key[len(dynamoTableMetaPrefixBytes):])
79+
case bytes.HasPrefix(key, dynamoTableGenerationPrefixBytes):
80+
return dynamoRouteTableKey(key[len(dynamoTableGenerationPrefixBytes):])
81+
case bytes.HasPrefix(key, dynamoItemPrefixBytes):
82+
return dynamoRouteFromTablePrefixedKey(key[len(dynamoItemPrefixBytes):])
83+
case bytes.HasPrefix(key, dynamoGSIPrefixBytes):
84+
return dynamoRouteFromTablePrefixedKey(key[len(dynamoGSIPrefixBytes):])
85+
default:
86+
return nil
87+
}
88+
}
89+
90+
func dynamoRouteFromTablePrefixedKey(rest []byte) []byte {
91+
sep := bytes.IndexByte(rest, '|')
92+
if sep <= 0 {
93+
return nil
94+
}
95+
return dynamoRouteTableKey(rest[:sep])
96+
}
97+
98+
func dynamoRouteTableKey(tableSegment []byte) []byte {
99+
if len(tableSegment) == 0 {
100+
return nil
101+
}
102+
out := make([]byte, 0, len(dynamoRoutePrefixBytes)+len(tableSegment))
103+
out = append(out, dynamoRoutePrefixBytes...)
104+
out = append(out, tableSegment...)
105+
return out
106+
}

kv/shard_key_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kv
22

33
import (
4+
"encoding/base64"
45
"testing"
56

67
"github.com/bootjp/elastickv/internal/s3keys"
@@ -27,3 +28,22 @@ func TestRouteKey_NormalizesTxnWrappedS3Key(t *testing.T) {
2728
embedded := s3keys.UploadPartKey("bucket-a", 7, "path/to/object", "upload-1", 3)
2829
require.Equal(t, s3keys.RouteKey("bucket-a", 7, "path/to/object"), routeKey(txnLockKey(embedded)))
2930
}
31+
32+
func TestRouteKey_NormalizesDynamoKeysToTable(t *testing.T) {
33+
t.Parallel()
34+
35+
tableSegment := []byte(base64.RawURLEncoding.EncodeToString([]byte("users")))
36+
indexSegment := base64.RawURLEncoding.EncodeToString([]byte("status-index"))
37+
want := dynamoRouteTableKey(tableSegment)
38+
39+
metaKey := append([]byte(DynamoTableMetaPrefix), tableSegment...)
40+
generationKey := append([]byte(DynamoTableGenerationPrefix), tableSegment...)
41+
itemKey := append([]byte(DynamoItemPrefix+string(tableSegment)+"|7|"), []byte("pk\x00\x01")...)
42+
gsiKey := append([]byte(DynamoGSIPrefix+string(tableSegment)+"|7|"+indexSegment+"|"), []byte("idx\x00\x01pk\x00\x01")...)
43+
44+
require.Equal(t, want, routeKey(metaKey))
45+
require.Equal(t, want, routeKey(generationKey))
46+
require.Equal(t, want, routeKey(itemKey))
47+
require.Equal(t, want, routeKey(gsiKey))
48+
require.Equal(t, want, routeKey(txnLockKey(itemKey)))
49+
}

kv/sharded_coordinator.go

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,13 @@ func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems
105105
return nil, errors.WithStack(ErrTxnPrimaryKeyRequired)
106106
}
107107

108-
if commitTS == 0 {
109-
commitTS = c.nextTxnTSAfter(startTS)
110-
} else {
111-
// Observe caller-provided commitTS to keep the HLC monotonic; without
112-
// this the clock could later issue timestamps smaller than commitTS.
113-
c.clock.Observe(commitTS)
108+
commitTS, err = c.resolveTxnCommitTS(startTS, commitTS)
109+
if err != nil {
110+
return nil, err
114111
}
115-
if commitTS == 0 || commitTS <= startTS {
116-
return nil, errors.WithStack(ErrTxnCommitTSRequired)
112+
113+
if len(gids) == 1 {
114+
return c.dispatchSingleShardTxn(startTS, commitTS, primaryKey, gids[0], elems)
117115
}
118116

119117
prepared, err := c.prewriteTxn(startTS, commitTS, primaryKey, grouped, gids)
@@ -131,6 +129,37 @@ func (c *ShardedCoordinator) dispatchTxn(startTS uint64, commitTS uint64, elems
131129
return &CoordinateResponse{CommitIndex: maxIndex}, nil
132130
}
133131

132+
func (c *ShardedCoordinator) resolveTxnCommitTS(startTS, commitTS uint64) (uint64, error) {
133+
if commitTS == 0 {
134+
commitTS = c.nextTxnTSAfter(startTS)
135+
} else {
136+
// Observe caller-provided commitTS to keep the HLC monotonic; without
137+
// this the clock could later issue timestamps smaller than commitTS.
138+
c.clock.Observe(commitTS)
139+
}
140+
if commitTS == 0 || commitTS <= startTS {
141+
return 0, errors.WithStack(ErrTxnCommitTSRequired)
142+
}
143+
return commitTS, nil
144+
}
145+
146+
func (c *ShardedCoordinator) dispatchSingleShardTxn(startTS, commitTS uint64, primaryKey []byte, gid uint64, elems []*Elem[OP]) (*CoordinateResponse, error) {
147+
g, err := c.txnGroupForID(gid)
148+
if err != nil {
149+
return nil, err
150+
}
151+
resp, err := g.Txn.Commit([]*pb.Request{
152+
onePhaseTxnRequest(startTS, commitTS, primaryKey, elems),
153+
})
154+
if err != nil {
155+
return nil, errors.WithStack(err)
156+
}
157+
if resp == nil {
158+
return &CoordinateResponse{}, nil
159+
}
160+
return &CoordinateResponse{CommitIndex: resp.CommitIndex}, nil
161+
}
162+
134163
type preparedGroup struct {
135164
gid uint64
136165
keys []*pb.Mutation
@@ -487,16 +516,9 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e
487516
if len(gids) != 1 {
488517
return nil, errors.WithStack(ErrInvalidRequest)
489518
}
490-
commitTS := reqs.CommitTS
491-
if commitTS == 0 {
492-
commitTS = c.nextTxnTSAfter(reqs.StartTS)
493-
} else {
494-
// Observe caller-provided commitTS to keep the HLC monotonic; without
495-
// this the clock could later issue timestamps smaller than commitTS.
496-
c.clock.Observe(commitTS)
497-
}
498-
if commitTS == 0 || commitTS <= reqs.StartTS {
499-
return nil, errors.WithStack(ErrTxnCommitTSRequired)
519+
commitTS, err := c.resolveTxnCommitTS(reqs.StartTS, reqs.CommitTS)
520+
if err != nil {
521+
return nil, err
500522
}
501523
return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids)
502524
}

kv/sharded_coordinator_txn_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,49 @@ func TestShardedCoordinatorDispatchTxn_CrossShardPhasesAndCommitIndex(t *testing
166166
require.Equal(t, commitMeta1.CommitTS, commitMeta2.CommitTS)
167167
}
168168

169+
func TestShardedCoordinatorDispatchTxn_SingleShardUsesOnePhase(t *testing.T) {
170+
t.Parallel()
171+
172+
engine := distribution.NewEngine()
173+
engine.UpdateRoute([]byte(""), nil, 1)
174+
175+
g1Txn := &recordingTransactional{
176+
responses: []*TransactionResponse{
177+
{CommitIndex: 17},
178+
},
179+
}
180+
181+
coord := NewShardedCoordinator(engine, map[uint64]*ShardGroup{
182+
1: {Txn: g1Txn},
183+
}, 1, NewHLC(), nil)
184+
185+
startTS := uint64(10)
186+
resp, err := coord.Dispatch(context.Background(), &OperationGroup[OP]{
187+
IsTxn: true,
188+
StartTS: startTS,
189+
Elems: []*Elem[OP]{
190+
{Op: Put, Key: []byte("b"), Value: []byte("v1")},
191+
{Op: Put, Key: []byte("c"), Value: []byte("v2")},
192+
},
193+
})
194+
require.NoError(t, err)
195+
require.NotNil(t, resp)
196+
require.Equal(t, uint64(17), resp.CommitIndex)
197+
198+
require.Len(t, g1Txn.requests, 1)
199+
req := g1Txn.requests[0]
200+
require.Equal(t, pb.Phase_NONE, req.Phase)
201+
require.Equal(t, startTS, req.Ts)
202+
require.Len(t, req.Mutations, 3)
203+
require.Equal(t, []byte("b"), req.Mutations[1].Key)
204+
require.Equal(t, []byte("c"), req.Mutations[2].Key)
205+
206+
meta := requestTxnMeta(t, req)
207+
require.Equal(t, []byte("b"), meta.PrimaryKey)
208+
require.Zero(t, meta.LockTTLms)
209+
require.Greater(t, meta.CommitTS, startTS)
210+
}
211+
169212
func TestShardedCoordinatorDispatchTxn_UsesProvidedCommitTS(t *testing.T) {
170213
t.Parallel()
171214

0 commit comments

Comments
 (0)