Skip to content

Commit e0d5153

Browse files
committed
kv: address review follow-ups
1 parent 0686c74 commit e0d5153

6 files changed

Lines changed: 51 additions & 27 deletions

File tree

adapter/dynamodb.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ const (
6262
batchWriteItemMaxItems = 25
6363
dynamoMaxRequestBodyBytes = 1 << 20
6464

65-
dynamoTableMetaPrefix = "!ddb|meta|table|"
66-
dynamoTableGenerationPrefix = "!ddb|meta|gen|"
67-
dynamoItemPrefix = "!ddb|item|"
68-
dynamoGSIPrefix = "!ddb|gsi|"
65+
dynamoTableMetaPrefix = kv.DynamoTableMetaPrefix
66+
dynamoTableGenerationPrefix = kv.DynamoTableGenerationPrefix
67+
dynamoItemPrefix = kv.DynamoItemPrefix
68+
dynamoGSIPrefix = kv.DynamoGSIPrefix
6969
dynamoScanPageLimit = 1024
7070
dynamoKeyEscapeByte = byte(0x00)
7171
dynamoKeyTerminatorByte = byte(0x01)

kv/shard_key.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,24 @@ const redisInternalRoutePrefix = "!redis|"
1212
var redisInternalRoutePrefixBytes = []byte(redisInternalRoutePrefix)
1313

1414
const (
15-
dynamoRoutePrefix = "!ddb|route|table|"
16-
dynamoTableMetaPrefix = "!ddb|meta|table|"
17-
dynamoTableGenerationPrefix = "!ddb|meta|gen|"
18-
dynamoItemPrefix = "!ddb|item|"
19-
dynamoGSIPrefix = "!ddb|gsi|"
15+
dynamoRoutePrefix = "!ddb|route|table|"
16+
17+
// DynamoTableMetaPrefix prefixes DynamoDB table metadata keys.
18+
DynamoTableMetaPrefix = "!ddb|meta|table|"
19+
// DynamoTableGenerationPrefix prefixes DynamoDB table generation keys.
20+
DynamoTableGenerationPrefix = "!ddb|meta|gen|"
21+
// DynamoItemPrefix prefixes DynamoDB item storage keys.
22+
DynamoItemPrefix = "!ddb|item|"
23+
// DynamoGSIPrefix prefixes DynamoDB GSI storage keys.
24+
DynamoGSIPrefix = "!ddb|gsi|"
2025
)
2126

2227
var (
2328
dynamoRoutePrefixBytes = []byte(dynamoRoutePrefix)
24-
dynamoTableMetaPrefixBytes = []byte(dynamoTableMetaPrefix)
25-
dynamoTableGenerationPrefixBytes = []byte(dynamoTableGenerationPrefix)
26-
dynamoItemPrefixBytes = []byte(dynamoItemPrefix)
27-
dynamoGSIPrefixBytes = []byte(dynamoGSIPrefix)
29+
dynamoTableMetaPrefixBytes = []byte(DynamoTableMetaPrefix)
30+
dynamoTableGenerationPrefixBytes = []byte(DynamoTableGenerationPrefix)
31+
dynamoItemPrefixBytes = []byte(DynamoItemPrefix)
32+
dynamoGSIPrefixBytes = []byte(DynamoGSIPrefix)
2833
)
2934

3035
// routeKey normalizes internal keys (e.g., list metadata/items) to the logical

kv/shard_key_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ func TestRouteKey_NormalizesDynamoKeysToTable(t *testing.T) {
3636
indexSegment := base64.RawURLEncoding.EncodeToString([]byte("status-index"))
3737
want := dynamoRouteTableKey(tableSegment)
3838

39-
metaKey := append([]byte(dynamoTableMetaPrefix), tableSegment...)
40-
generationKey := append([]byte(dynamoTableGenerationPrefix), tableSegment...)
41-
itemKey := append([]byte(dynamoItemPrefix+string(tableSegment)+"|7|"), []byte("pk\x00\x01")...)
42-
gsiKey := append([]byte(dynamoGSIPrefix+string(tableSegment)+"|7|"+indexSegment+"|"), []byte("idx\x00\x01pk\x00\x01")...)
39+
metaKey := append([]byte(DynamoTableMetaPrefix), tableSegment...)
40+
generationKey := append([]byte(DynamoTableGenerationPrefix), tableSegment...)
41+
itemKey := append([]byte(DynamoItemPrefix+string(tableSegment)+"|7|"), []byte("pk\x00\x01")...)
42+
gsiKey := append([]byte(DynamoGSIPrefix+string(tableSegment)+"|7|"+indexSegment+"|"), []byte("idx\x00\x01pk\x00\x01")...)
4343

4444
require.Equal(t, want, routeKey(metaKey))
4545
require.Equal(t, want, routeKey(generationKey))

kv/sharded_coordinator.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -516,16 +516,9 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e
516516
if len(gids) != 1 {
517517
return nil, errors.WithStack(ErrInvalidRequest)
518518
}
519-
commitTS := reqs.CommitTS
520-
if commitTS == 0 {
521-
commitTS = c.nextTxnTSAfter(reqs.StartTS)
522-
} else {
523-
// Observe caller-provided commitTS to keep the HLC monotonic; without
524-
// this the clock could later issue timestamps smaller than commitTS.
525-
c.clock.Observe(commitTS)
526-
}
527-
if commitTS == 0 || commitTS <= reqs.StartTS {
528-
return nil, errors.WithStack(ErrTxnCommitTSRequired)
519+
commitTS, err := c.resolveTxnCommitTS(reqs.StartTS, reqs.CommitTS)
520+
if err != nil {
521+
return nil, err
529522
}
530523
return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids)
531524
}

kv/txn_codec.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const txnLockFlagPrimary byte = 0x01
2525
const (
2626
txnMetaFlagLockTTL byte = 0x01
2727
txnMetaFlagCommitTS byte = 0x02
28+
txnMetaKnownFlags byte = txnMetaFlagLockTTL | txnMetaFlagCommitTS
2829
)
2930

3031
const txnMetaHeaderSize = 2
@@ -139,6 +140,9 @@ func decodeTxnMetaV2(b []byte) (TxnMeta, error) {
139140
return TxnMeta{}, errors.New("txn meta: truncated flags")
140141
}
141142
flags := b[1]
143+
if flags&^txnMetaKnownFlags != 0 {
144+
return TxnMeta{}, errors.WithStack(errors.Newf("txn meta: unsupported flags 0x%02x", flags))
145+
}
142146
r := bytes.NewReader(b[txnMetaHeaderSize:])
143147
primaryLen, err := readTxnUint64(r, "txn meta: primary key length truncated")
144148
if err != nil {
@@ -162,6 +166,9 @@ func decodeTxnMetaV2(b []byte) (TxnMeta, error) {
162166
return TxnMeta{}, err
163167
}
164168
}
169+
if r.Len() != 0 {
170+
return TxnMeta{}, errors.WithStack(errors.Newf("txn meta: unexpected trailing bytes %d", r.Len()))
171+
}
165172
return meta, nil
166173
}
167174

kv/txn_codec_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,22 @@ func TestDecodeTxnMetaV1Compatibility(t *testing.T) {
7474
require.NoError(t, err)
7575
require.Equal(t, meta, decoded)
7676
}
77+
78+
func TestDecodeTxnMetaV2RejectsUnknownFlags(t *testing.T) {
79+
t.Parallel()
80+
81+
encoded := encodeTxnMetaV2(TxnMeta{PrimaryKey: []byte("pk")})
82+
encoded[1] |= 0x80
83+
84+
_, err := DecodeTxnMeta(encoded)
85+
require.ErrorContains(t, err, "unsupported flags")
86+
}
87+
88+
func TestDecodeTxnMetaV2RejectsTrailingBytes(t *testing.T) {
89+
t.Parallel()
90+
91+
encoded := append(encodeTxnMetaV2(TxnMeta{PrimaryKey: []byte("pk")}), 0x01)
92+
93+
_, err := DecodeTxnMeta(encoded)
94+
require.ErrorContains(t, err, "unexpected trailing bytes")
95+
}

0 commit comments

Comments
 (0)