Skip to content

Commit 7ccc764

Browse files
authored
Merge pull request #342 from bootjp/feature/dynamodb
Store DynamoDB schemas and items as protobuf while preserving legacy …
2 parents c427b79 + f4e491b commit 7ccc764

35 files changed

Lines changed: 1903 additions & 197 deletions

adapter/add_voter_join_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,13 +296,11 @@ func (w *serverWorkers) Go(run func() error) {
296296
if w == nil || run == nil {
297297
return
298298
}
299-
w.wg.Add(1)
300-
go func() {
301-
defer w.wg.Done()
299+
w.wg.Go(func() {
302300
if err := run(); err != nil {
303301
w.errCh <- err
304302
}
305-
}()
303+
})
306304
}
307305

308306
func (w *serverWorkers) AwaitNoError(t *testing.T, timeout time.Duration) {

adapter/dynamodb.go

Lines changed: 48 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"hash/fnv"
1010
"io"
1111
"log/slog"
12+
"maps"
1213
"math/big"
1314
"net"
1415
"net/http"
16+
"slices"
1517
"sort"
1618
"strconv"
1719
"strings"
@@ -391,9 +393,7 @@ func (s *dynamoRequestMetricsState) tableMetrics() map[string]monitoring.DynamoD
391393
return nil
392394
}
393395
out := make(map[string]monitoring.DynamoDBTableMetrics, len(s.tables))
394-
for table, metrics := range s.tables {
395-
out[table] = metrics
396-
}
396+
maps.Copy(out, s.tables)
397397
return out
398398
}
399399

@@ -738,7 +738,7 @@ func buildCreateTableProjection(in createTableProjection) (dynamoGSIProjection,
738738
func (d *DynamoDBServer) createTableWithRetry(ctx context.Context, tableName string, baseSchema *dynamoTableSchema) error {
739739
backoff := transactRetryInitialBackoff
740740
deadline := time.Now().Add(transactRetryMaxDuration)
741-
for attempt := 0; attempt < transactRetryMaxAttempts; attempt++ {
741+
for range transactRetryMaxAttempts {
742742
readTS := d.nextTxnReadTS()
743743
exists, err := d.tableExistsAt(ctx, tableName, readTS)
744744
if err != nil {
@@ -795,7 +795,7 @@ func makeCreateTableRequest(baseSchema *dynamoTableSchema, nextGeneration uint64
795795
MigratingFromGeneration: baseSchema.MigratingFromGeneration,
796796
Generation: nextGeneration,
797797
}
798-
schemaBytes, err := json.Marshal(schema)
798+
schemaBytes, err := encodeStoredDynamoTableSchema(schema)
799799
if err != nil {
800800
return nil, errors.WithStack(err)
801801
}
@@ -848,7 +848,7 @@ func decodeDeleteTableInput(bodyReader io.Reader) (deleteTableInput, error) {
848848
func (d *DynamoDBServer) deleteTableWithRetry(ctx context.Context, tableName string) error {
849849
backoff := transactRetryInitialBackoff
850850
deadline := time.Now().Add(transactRetryMaxDuration)
851-
for attempt := 0; attempt < transactRetryMaxAttempts; attempt++ {
851+
for range transactRetryMaxAttempts {
852852
readTS := d.nextTxnReadTS()
853853
schema, exists, err := d.loadTableSchemaAt(ctx, tableName, readTS)
854854
if err != nil {
@@ -947,10 +947,7 @@ func (d *DynamoDBServer) scanDeleteCandidateKeys(ctx context.Context, prefix []b
947947

948948
func (d *DynamoDBServer) deleteKeysByBatches(ctx context.Context, keys [][]byte) error {
949949
for start := 0; start < len(keys); start += tableCleanupDeleteBatchSize {
950-
end := start + tableCleanupDeleteBatchSize
951-
if end > len(keys) {
952-
end = len(keys)
953-
}
950+
end := min(start+tableCleanupDeleteBatchSize, len(keys))
954951
if err := d.dispatchDeleteBatch(ctx, keys[start:end]); err != nil {
955952
return err
956953
}
@@ -1043,10 +1040,7 @@ func decodeListTablesInput(bodyReader io.Reader) (listTablesInput, error) {
10431040
func paginateTableNames(names []string, in listTablesInput) ([]string, bool) {
10441041
start := findExclusiveStartIndex(names, in.ExclusiveStartTableName)
10451042
limit := resolveTableListLimit(in.Limit, len(names))
1046-
end := start + limit
1047-
if end > len(names) {
1048-
end = len(names)
1049-
}
1043+
end := min(start+limit, len(names))
10501044
return names[start:end], end < len(names)
10511045
}
10521046

@@ -1163,7 +1157,7 @@ func (d *DynamoDBServer) retryItemWriteWithGeneration(
11631157
) (*itemWritePlan, error) {
11641158
backoff := transactRetryInitialBackoff
11651159
deadline := time.Now().Add(transactRetryMaxDuration)
1166-
for attempt := 0; attempt < transactRetryMaxAttempts; attempt++ {
1160+
for range transactRetryMaxAttempts {
11671161
readTS := d.nextTxnReadTS()
11681162
plan, err := prepare(readTS)
11691163
if err != nil {
@@ -1547,9 +1541,7 @@ func buildUpdatedItem(schema *dynamoTableSchema, in updateItemInput, current map
15471541
return nil, newDynamoAPIError(http.StatusBadRequest, dynamoErrConditionalFailed, err.Error())
15481542
}
15491543
nextItem := cloneAttributeValueMap(current)
1550-
for k, v := range in.Key {
1551-
nextItem[k] = v
1552-
}
1544+
maps.Copy(nextItem, in.Key)
15531545
if err := applyUpdateExpression(in.UpdateExpression, in.ExpressionAttributeNames, in.ExpressionAttributeValues, nextItem); err != nil {
15541546
return nil, newDynamoAPIError(http.StatusBadRequest, dynamoErrValidation, err.Error())
15551547
}
@@ -1598,7 +1590,7 @@ func buildItemWriteRequestWithSource(
15981590
nextItem map[string]attributeValue,
15991591
current *dynamoItemLocation,
16001592
) (*kv.OperationGroup[kv.OP], [][]byte, error) {
1601-
payload, err := json.Marshal(nextItem)
1593+
payload, err := encodeStoredDynamoItem(nextItem)
16021594
if err != nil {
16031595
return nil, nil, errors.WithStack(err)
16041596
}
@@ -2155,9 +2147,9 @@ func resolveQueryCondition(in queryInput, schema *dynamoTableSchema) (dynamoKeyS
21552147
func filterQueryItems(kvs []*store.KVPair, cond queryCondition) ([]map[string]attributeValue, error) {
21562148
items := make([]map[string]attributeValue, 0, len(kvs))
21572149
for _, kvp := range kvs {
2158-
item := map[string]attributeValue{}
2159-
if err := json.Unmarshal(kvp.Value, &item); err != nil {
2160-
return nil, errors.WithStack(err)
2150+
item, err := decodeStoredDynamoItem(kvp.Value)
2151+
if err != nil {
2152+
return nil, err
21612153
}
21622154
if !matchesQueryCondition(item, cond) {
21632155
continue
@@ -2884,9 +2876,9 @@ func (it *tableReadIterator) Next(ctx context.Context) (map[string]attributeValu
28842876
if err != nil || !ok {
28852877
return nil, ok, err
28862878
}
2887-
item := map[string]attributeValue{}
2888-
if err := json.Unmarshal(kvp.Value, &item); err != nil {
2889-
return nil, false, errors.WithStack(err)
2879+
item, err := decodeStoredDynamoItem(kvp.Value)
2880+
if err != nil {
2881+
return nil, false, err
28902882
}
28912883
item, err = it.projector(item)
28922884
if err != nil {
@@ -3008,9 +3000,9 @@ func projectItemByAttributes(item map[string]attributeValue, attrs []string) map
30083000
func decodeItemsFromKVPairs(kvs []*store.KVPair) ([]map[string]attributeValue, error) {
30093001
items := make([]map[string]attributeValue, 0, len(kvs))
30103002
for _, kvp := range kvs {
3011-
item := map[string]attributeValue{}
3012-
if err := json.Unmarshal(kvp.Value, &item); err != nil {
3013-
return nil, errors.WithStack(err)
3003+
item, err := decodeStoredDynamoItem(kvp.Value)
3004+
if err != nil {
3005+
return nil, err
30143006
}
30153007
items = append(items, item)
30163008
}
@@ -3188,12 +3180,10 @@ func (d *DynamoDBServer) startGSIReadWorkers(
31883180
results chan<- gsiReadResult,
31893181
cancel context.CancelFunc,
31903182
) {
3191-
for i := 0; i < workerCount; i++ {
3192-
wg.Add(1)
3193-
go func() {
3194-
defer wg.Done()
3183+
for range workerCount {
3184+
wg.Go(func() {
31953185
d.gsiReadWorker(ctx, readTS, filter, jobs, results, cancel)
3196-
}()
3186+
})
31973187
}
31983188
}
31993189

@@ -3222,7 +3212,7 @@ func collectOrderedGSIReadResults(
32223212
}
32233213
}
32243214
items := make([]map[string]attributeValue, 0, len(indexed))
3225-
for i := 0; i < len(itemKeys); i++ {
3215+
for i := range itemKeys {
32263216
if item := indexed[i]; item != nil {
32273217
items = append(items, item)
32283218
}
@@ -3899,7 +3889,7 @@ func (d *DynamoDBServer) transactWriteItemsWithRetry(ctx context.Context, in tra
38993889
backoff := transactRetryInitialBackoff
39003890
deadline := time.Now().Add(transactRetryMaxDuration)
39013891
var lastErr error
3902-
for attempt := 0; attempt < transactRetryMaxAttempts; attempt++ {
3892+
for range transactRetryMaxAttempts {
39033893
reqs, generations, cleanupKeys, err := d.buildTransactWriteItemsRequest(ctx, in)
39043894
if err != nil {
39053895
return err
@@ -4232,7 +4222,7 @@ func buildConditionCheckLockRequest(
42324222
) (*kv.OperationGroup[kv.OP], [][]byte, error) {
42334223
elems := make([]*kv.Elem[kv.OP], 0, 1)
42344224
if found {
4235-
payload, err := json.Marshal(current)
4225+
payload, err := encodeStoredDynamoItem(current)
42364226
if err != nil {
42374227
return nil, nil, errors.WithStack(err)
42384228
}
@@ -4285,10 +4275,7 @@ func waitRetryWithDeadline(ctx context.Context, deadline time.Time, backoff time
42854275
if remaining <= 0 {
42864276
return errors.New("retry timeout")
42874277
}
4288-
delay := backoff
4289-
if delay > remaining {
4290-
delay = remaining
4291-
}
4278+
delay := min(backoff, remaining)
42924279
return waitTransactRetryBackoff(ctx, delay)
42934280
}
42944281

@@ -5061,9 +5048,7 @@ func removeDocumentPathAttribute(current attributeValue, token documentPathToken
50615048

50625049
func replaceAttributeValueMap(dst map[string]attributeValue, src map[string]attributeValue) {
50635050
clear(dst)
5064-
for key, value := range src {
5065-
dst[key] = value
5066-
}
5051+
maps.Copy(dst, src)
50675052
}
50685053

50695054
func deleteAttributeValueElements(current attributeValue, deleteValue attributeValue) (attributeValue, bool, error) {
@@ -5739,12 +5724,7 @@ func listContainsAttributeValue(values []attributeValue, needle attributeValue)
57395724
}
57405725

57415726
func stringSetContains(values []string, needle string) bool {
5742-
for _, value := range values {
5743-
if value == needle {
5744-
return true
5745-
}
5746-
}
5747-
return false
5727+
return slices.Contains(values, needle)
57485728
}
57495729

57505730
func numberSetContains(values []string, needle string) bool {
@@ -6236,12 +6216,12 @@ func parseComparisonKeyConditionTerm(term string) (parsedKeyConditionTerm, error
62366216

62376217
func splitComparisonTerm(term string, op queryRangeOperator) (parsedKeyConditionTerm, bool) {
62386218
opStr := string(op)
6239-
idx := strings.Index(term, opStr)
6240-
if idx < 0 {
6219+
before, after, ok := strings.Cut(term, opStr)
6220+
if !ok {
62416221
return parsedKeyConditionTerm{}, false
62426222
}
6243-
left := strings.TrimSpace(term[:idx])
6244-
right := strings.TrimSpace(term[idx+len(opStr):])
6223+
left := strings.TrimSpace(before)
6224+
right := strings.TrimSpace(after)
62456225
if left == "" || !strings.HasPrefix(right, ":") {
62466226
return parsedKeyConditionTerm{}, false
62476227
}
@@ -6905,9 +6885,9 @@ func splitNumericMantissa(trimmed string) (string, string, error) {
69056885
}
69066886
intPart := trimmed
69076887
fracPart := ""
6908-
if dot := strings.IndexByte(trimmed, '.'); dot >= 0 {
6909-
intPart = trimmed[:dot]
6910-
fracPart = trimmed[dot+1:]
6888+
if before, after, ok := strings.Cut(trimmed, "."); ok {
6889+
intPart = before
6890+
fracPart = after
69116891
}
69126892
if intPart == "" && fracPart == "" {
69136893
return "", "", errors.New("unsupported key attribute type")
@@ -7399,9 +7379,9 @@ func (d *DynamoDBServer) loadTableSchemaAt(ctx context.Context, tableName string
73997379
}
74007380
return nil, false, errors.WithStack(err)
74017381
}
7402-
schema := &dynamoTableSchema{}
7403-
if err := json.Unmarshal(b, schema); err != nil {
7404-
return nil, false, errors.WithStack(err)
7382+
schema, err := decodeStoredDynamoTableSchema(b)
7383+
if err != nil {
7384+
return nil, false, err
74057385
}
74067386
d.observeTables(ctx, schema.TableName)
74077387
return schema, true, nil
@@ -7430,9 +7410,9 @@ func (d *DynamoDBServer) readItemAtKeyAt(ctx context.Context, key []byte, ts uin
74307410
}
74317411
return nil, false, errors.WithStack(err)
74327412
}
7433-
item := map[string]attributeValue{}
7434-
if err := json.Unmarshal(b, &item); err != nil {
7435-
return nil, false, errors.WithStack(err)
7413+
item, err := decodeStoredDynamoItem(b)
7414+
if err != nil {
7415+
return nil, false, err
74367416
}
74377417
return item, true, nil
74387418
}
@@ -7481,7 +7461,7 @@ func (d *DynamoDBServer) ensureLegacyTableMigration(ctx context.Context, tableNa
74817461
func (d *DynamoDBServer) ensureLegacyTableMigrationLocked(ctx context.Context, tableName string) error {
74827462
backoff := transactRetryInitialBackoff
74837463
deadline := time.Now().Add(transactRetryMaxDuration)
7484-
for attempt := 0; attempt < transactRetryMaxAttempts; attempt++ {
7464+
for range transactRetryMaxAttempts {
74857465
readTS := d.nextTxnReadTS()
74867466
schema, exists, err := d.loadTableSchemaAt(ctx, tableName, readTS)
74877467
if err != nil {
@@ -7599,7 +7579,7 @@ func (d *DynamoDBServer) migrateLegacyItem(
75997579

76007580
backoff := transactRetryInitialBackoff
76017581
deadline := time.Now().Add(transactRetryMaxDuration)
7602-
for attempt := 0; attempt < transactRetryMaxAttempts; attempt++ {
7582+
for range transactRetryMaxAttempts {
76037583
readTS := d.nextTxnReadTS()
76047584
req, done, err := d.buildLegacyMigrationRequest(ctx, targetSchema, sourceSchema, targetKey, sourceKey, readTS)
76057585
if err != nil {
@@ -7650,9 +7630,9 @@ func (d *DynamoDBServer) migrateLegacySourcePage(
76507630
if !bytes.HasPrefix(kvp.Key, prefix) {
76517631
return nil, true, nil
76527632
}
7653-
item := map[string]attributeValue{}
7654-
if err := json.Unmarshal(kvp.Value, &item); err != nil {
7655-
return nil, false, errors.WithStack(err)
7633+
item, err := decodeStoredDynamoItem(kvp.Value)
7634+
if err != nil {
7635+
return nil, false, err
76567636
}
76577637
if err := d.migrateLegacyItem(ctx, targetSchema, sourceSchema, kvp.Key, item); err != nil {
76587638
return nil, false, err
@@ -7737,7 +7717,7 @@ func (d *DynamoDBServer) finalizeLegacyTableMigration(ctx context.Context, schem
77377717
oldGeneration := schema.MigratingFromGeneration
77387718
finalized := *schema
77397719
finalized.MigratingFromGeneration = 0
7740-
body, err := json.Marshal(&finalized)
7720+
body, err := encodeStoredDynamoTableSchema(&finalized)
77417721
if err != nil {
77427722
return errors.WithStack(err)
77437723
}

adapter/dynamodb_failure_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func createCompositeKeyTable(t *testing.T, ctx context.Context, client *dynamodb
101101
func putCompositeItems(t *testing.T, ctx context.Context, client *dynamodb.Client, tableName string, total int) {
102102
t.Helper()
103103

104-
for i := 0; i < total; i++ {
104+
for i := range total {
105105
_, err := client.PutItem(ctx, &dynamodb.PutItemInput{
106106
TableName: aws.String(tableName),
107107
Item: map[string]ddbTypes.AttributeValue{

adapter/dynamodb_migration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func TestDynamoDB_EnsureLegacyTableMigration_NormalizesLegacyGSIJSONFormat(t *te
175175
})
176176
require.NoError(t, err)
177177
writer.put(dynamoTableMetaKey(legacySchema.TableName), legacyBody)
178-
writer.put(dynamoTableGenerationKey(legacySchema.TableName), []byte(fmt.Sprintf("%d", legacySchema.Generation)))
178+
writer.put(dynamoTableGenerationKey(legacySchema.TableName), fmt.Appendf(nil, "%d", legacySchema.Generation))
179179

180180
writer.writeItem(legacySchema, map[string]attributeValue{
181181
"pk": newStringAttributeValue("tenant"),

0 commit comments

Comments
 (0)