Skip to content

Commit 1ad6223

Browse files
twmb and claude committed
output/aws_dynamodb: chunk batch writes to 25 item limit
DynamoDB BatchWriteItem API limits requests to 25 items. Previously, exceeding this limit caused a fallback to individual writes. Now batches are automatically chunked into groups of 25, with backoff reset between chunks. Two data-integrity paths are handled explicitly so a chunk failure cannot silently drop later items: * When a chunk's BatchWriteItem fails and the individual PutItem fallback recovers it, the loop now continues to the next chunk instead of returning nil early. * When a chunk's unprocessed-items retry budget is exhausted, the writer returns a service.BatchError that maps each still-unprocessed item back to its original batch index via reflect.DeepEqual, with a pessimistic whole-chunk-failed fallback if the SDK returns a shape we do not recognize. Items in unattempted later chunks are also marked failed so upstream retry targets only the unwritten items. Unit tests cover single-chunk happy path, multi-chunk happy path, individual-fallback continuation to the next chunk, global-index arithmetic for partial individual fallback in a non-zero chunk, the unprocessed-items BatchError path, and the pessimistic fallback when DeepEqual cannot match. An integration test against dynamodb-local exercises multi-chunk writes end to end. Closes #992 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 36090c3 commit 1ad6223

3 files changed

Lines changed: 473 additions & 64 deletions

File tree

internal/impl/aws/dynamodb/output.go

Lines changed: 123 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"fmt"
2222
"maps"
23+
"reflect"
2324
"strconv"
2425
"sync"
2526
"time"
@@ -38,6 +39,10 @@ import (
3839
)
3940

4041
const (
42+
// dynamoDBMaxBatchItems is the maximum number of items that can be sent in
43+
// a single BatchWriteItem request per the AWS API.
44+
dynamoDBMaxBatchItems = 25
45+
4146
// DynamoDB Output Fields
4247
ddboField = "namespace"
4348
ddboFieldTable = "table"
@@ -386,82 +391,137 @@ func (d *dynamoDBWriter) WriteBatch(ctx context.Context, b service.MessageBatch)
386391
return err
387392
}
388393

389-
batchResult, err := d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
390-
RequestItems: map[string][]types.WriteRequest{
391-
*d.table: writeReqs,
392-
},
393-
})
394-
if err != nil {
395-
headlineErr := err
396-
397-
// None of the messages were successful, attempt to send individually
398-
individualRequestsLoop:
399-
for err != nil {
400-
batchErr := service.NewBatchError(b, headlineErr)
401-
for i, req := range writeReqs {
402-
if req.PutRequest == nil {
403-
continue
404-
}
405-
if _, iErr := d.client.PutItem(ctx, &dynamodb.PutItemInput{
406-
TableName: d.table,
407-
Item: req.PutRequest.Item,
408-
}); iErr != nil {
409-
d.log.Errorf("Put error: %v\n", iErr)
410-
wait := boff.NextBackOff()
411-
if wait == backoff.Stop {
412-
break individualRequestsLoop
394+
// Chunk writeReqs into groups of dynamoDBMaxBatchItems (25) per the
395+
// DynamoDB BatchWriteItem API limit.
396+
for chunkStart := 0; chunkStart < len(writeReqs); chunkStart += dynamoDBMaxBatchItems {
397+
boff.Reset()
398+
chunkEnd := min(chunkStart+dynamoDBMaxBatchItems, len(writeReqs))
399+
chunk := writeReqs[chunkStart:chunkEnd]
400+
401+
batchResult, err := d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
402+
RequestItems: map[string][]types.WriteRequest{
403+
*d.table: chunk,
404+
},
405+
})
406+
if err != nil {
407+
headlineErr := err
408+
409+
// None of the messages were successful, attempt to send individually
410+
individualRequestsLoop:
411+
for err != nil {
412+
batchErr := service.NewBatchError(b, headlineErr)
413+
for j, req := range chunk {
414+
if req.PutRequest == nil {
415+
continue
413416
}
414-
select {
415-
case <-time.After(wait):
416-
case <-ctx.Done():
417-
break individualRequestsLoop
417+
if _, iErr := d.client.PutItem(ctx, &dynamodb.PutItemInput{
418+
TableName: d.table,
419+
Item: req.PutRequest.Item,
420+
}); iErr != nil {
421+
d.log.Errorf("Put error: %v\n", iErr)
422+
wait := boff.NextBackOff()
423+
if wait == backoff.Stop {
424+
break individualRequestsLoop
425+
}
426+
select {
427+
case <-time.After(wait):
428+
case <-ctx.Done():
429+
break individualRequestsLoop
430+
}
431+
batchErr.Failed(chunkStart+j, iErr)
432+
} else {
433+
chunk[j].PutRequest = nil
418434
}
419-
batchErr.Failed(i, iErr)
435+
}
436+
if batchErr.IndexedErrors() == 0 {
437+
err = nil
420438
} else {
421-
writeReqs[i].PutRequest = nil
439+
// Mark all items in subsequent unattempted chunks as
440+
// failed to prevent silent data loss.
441+
for k := chunkEnd; k < len(writeReqs); k++ {
442+
batchErr.Failed(k, headlineErr)
443+
}
444+
err = batchErr
422445
}
423446
}
424-
if batchErr.IndexedErrors() == 0 {
425-
err = nil
426-
} else {
427-
err = batchErr
447+
if err != nil {
448+
return err
428449
}
450+
// Individual fallback wrote every item in this chunk; move on.
451+
continue
429452
}
430-
return err
431-
}
432453

433-
unproc := batchResult.UnprocessedItems[*d.table]
434-
unprocessedLoop:
435-
for len(unproc) > 0 {
436-
wait := boff.NextBackOff()
437-
if wait == backoff.Stop {
438-
break unprocessedLoop
439-
}
454+
unproc := batchResult.UnprocessedItems[*d.table]
455+
unprocessedLoop:
456+
for len(unproc) > 0 {
457+
wait := boff.NextBackOff()
458+
if wait == backoff.Stop {
459+
break unprocessedLoop
460+
}
440461

441-
select {
442-
case <-time.After(wait):
443-
case <-ctx.Done():
444-
break unprocessedLoop
445-
}
446-
if batchResult, err = d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
447-
RequestItems: map[string][]types.WriteRequest{
448-
*d.table: unproc,
449-
},
450-
}); err != nil {
451-
d.log.Errorf("Write multi error: %v\n", err)
452-
} else if unproc = batchResult.UnprocessedItems[*d.table]; len(unproc) > 0 {
453-
err = fmt.Errorf("setting %v items", len(unproc))
454-
} else {
455-
unproc = nil
462+
select {
463+
case <-time.After(wait):
464+
case <-ctx.Done():
465+
break unprocessedLoop
466+
}
467+
if batchResult, err = d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
468+
RequestItems: map[string][]types.WriteRequest{
469+
*d.table: unproc,
470+
},
471+
}); err != nil {
472+
d.log.Errorf("Write multi error: %v\n", err)
473+
} else if unproc = batchResult.UnprocessedItems[*d.table]; len(unproc) > 0 {
474+
err = fmt.Errorf("setting %v items", len(unproc))
475+
} else {
476+
unproc = nil
477+
}
456478
}
457-
}
458479

459-
if len(unproc) > 0 {
460-
if err == nil {
461-
err = errors.New("ran out of request retries")
480+
if len(unproc) > 0 {
481+
if err == nil {
482+
err = errors.New("ran out of request retries")
483+
}
484+
batchErr := service.NewBatchError(b, err)
485+
// Map each unprocessed item back to its original index in
486+
// this chunk so only the genuinely-unwritten items (and
487+
// items in unattempted later chunks) are marked as failed.
488+
matched := make([]bool, len(chunk))
489+
matches := 0
490+
for _, u := range unproc {
491+
for j := range chunk {
492+
if matched[j] {
493+
continue
494+
}
495+
if reflect.DeepEqual(u, chunk[j]) {
496+
matched[j] = true
497+
matches++
498+
break
499+
}
500+
}
501+
}
502+
if matches == len(unproc) {
503+
for j, m := range matched {
504+
if m {
505+
batchErr.Failed(chunkStart+j, err)
506+
}
507+
}
508+
} else {
509+
// Some unprocessed items did not match anything in the
510+
// chunk — the SDK returned a shape we did not expect. Fall
511+
// back to marking the whole chunk failed. Writes are
512+
// upserts, so retried items that already landed are
513+
// harmless; the alternative is silent data loss.
514+
for j := range chunk {
515+
batchErr.Failed(chunkStart+j, err)
516+
}
517+
}
518+
for k := chunkEnd; k < len(writeReqs); k++ {
519+
batchErr.Failed(k, err)
520+
}
521+
return batchErr
462522
}
463523
}
464-
return err
524+
return nil
465525
}
466526

467527
func (*dynamoDBWriter) Close(context.Context) error {
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright 2024 Redpanda Data, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dynamodb
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"testing"
21+
"time"
22+
23+
"github.com/aws/aws-sdk-go-v2/aws"
24+
"github.com/aws/aws-sdk-go-v2/config"
25+
"github.com/aws/aws-sdk-go-v2/credentials"
26+
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
27+
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
28+
"github.com/stretchr/testify/assert"
29+
"github.com/stretchr/testify/require"
30+
"github.com/testcontainers/testcontainers-go"
31+
"github.com/testcontainers/testcontainers-go/wait"
32+
33+
"github.com/redpanda-data/benthos/v4/public/service"
34+
"github.com/redpanda-data/benthos/v4/public/service/integration"
35+
)
36+
37+
func TestIntegrationDynamoDBOutput(t *testing.T) {
38+
integration.CheckSkip(t)
39+
40+
ctr, err := testcontainers.Run(t.Context(), "amazon/dynamodb-local:latest",
41+
testcontainers.WithExposedPorts("8000/tcp"),
42+
testcontainers.WithWaitStrategy(
43+
wait.ForListeningPort("8000/tcp").WithStartupTimeout(30*time.Second),
44+
),
45+
)
46+
testcontainers.CleanupContainer(t, ctr)
47+
require.NoError(t, err)
48+
49+
mp, err := ctr.MappedPort(t.Context(), "8000/tcp")
50+
require.NoError(t, err)
51+
dynamoPort := mp.Port()
52+
53+
const tableName = "output_batch_table"
54+
require.Eventually(t, func() bool {
55+
return createTable(t.Context(), t, dynamoPort, tableName) == nil
56+
}, 30*time.Second, time.Second)
57+
58+
endpoint := fmt.Sprintf("http://localhost:%s", dynamoPort)
59+
60+
pConf, err := ddboOutputSpec().ParseYAML(fmt.Sprintf(`
61+
table: %s
62+
string_columns:
63+
id: ${! json("id") }
64+
content: ${! json("content") }
65+
region: us-east-1
66+
endpoint: %s
67+
credentials:
68+
id: xxxxx
69+
secret: xxxxx
70+
token: xxxxx
71+
`, tableName, endpoint), nil)
72+
require.NoError(t, err)
73+
74+
dConf, err := ddboConfigFromParsed(pConf)
75+
require.NoError(t, err)
76+
77+
writer, err := newDynamoDBWriter(dConf, service.MockResources())
78+
require.NoError(t, err)
79+
t.Cleanup(func() {
80+
require.NoError(t, writer.Close(context.Background()))
81+
})
82+
83+
require.NoError(t, writer.Connect(t.Context()))
84+
85+
const numMessages = 60
86+
batch := make(service.MessageBatch, numMessages)
87+
for i := range numMessages {
88+
batch[i] = service.NewMessage([]byte(fmt.Sprintf(`{"id":"id-%d","content":"content-%d"}`, i, i)))
89+
}
90+
91+
require.NoError(t, writer.WriteBatch(t.Context(), batch))
92+
93+
awsCfg, err := config.LoadDefaultConfig(t.Context(),
94+
config.WithRegion("us-east-1"),
95+
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("xxxxx", "xxxxx", "xxxxx")),
96+
)
97+
require.NoError(t, err)
98+
awsCfg.BaseEndpoint = &endpoint
99+
client := dynamodb.NewFromConfig(awsCfg)
100+
101+
seenIDs := make(map[string]bool, numMessages)
102+
var lastKey map[string]types.AttributeValue
103+
for {
104+
scanOut, err := client.Scan(t.Context(), &dynamodb.ScanInput{
105+
TableName: aws.String(tableName),
106+
ExclusiveStartKey: lastKey,
107+
})
108+
require.NoError(t, err)
109+
for _, item := range scanOut.Items {
110+
idAttr, ok := item["id"].(*types.AttributeValueMemberS)
111+
require.True(t, ok, "item missing string 'id' attribute")
112+
seenIDs[idAttr.Value] = true
113+
}
114+
if len(scanOut.LastEvaluatedKey) == 0 {
115+
break
116+
}
117+
lastKey = scanOut.LastEvaluatedKey
118+
}
119+
120+
assert.Len(t, seenIDs, numMessages, "expected all %d items to be written", numMessages)
121+
for i := range numMessages {
122+
expectedID := fmt.Sprintf("id-%d", i)
123+
assert.True(t, seenIDs[expectedID], "expected item %q to be present in table", expectedID)
124+
}
125+
}

0 commit comments

Comments (0)