Skip to content

Commit 20d2978

Browse files
twmbclaude
andcommitted
output/aws_dynamodb: mark unattempted chunks as failed on error
When a chunk fails and individual retries also fail, mark all items in subsequent unattempted chunks as failed in the BatchError. This prevents silent data loss where unattempted items would be incorrectly treated as successful. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8ab0499 commit 20d2978

1 file changed

Lines changed: 6 additions & 4 deletions

File tree

internal/impl/aws/dynamodb/output.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -394,10 +394,7 @@ func (d *dynamoDBWriter) WriteBatch(ctx context.Context, b service.MessageBatch)
394394
// DynamoDB BatchWriteItem API limit.
395395
for chunkStart := 0; chunkStart < len(writeReqs); chunkStart += dynamoDBMaxBatchItems {
396396
boff.Reset()
397-
chunkEnd := chunkStart + dynamoDBMaxBatchItems
398-
if chunkEnd > len(writeReqs) {
399-
chunkEnd = len(writeReqs)
400-
}
397+
chunkEnd := min(chunkStart+dynamoDBMaxBatchItems, len(writeReqs))
401398
chunk := writeReqs[chunkStart:chunkEnd]
402399

403400
batchResult, err := d.client.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
@@ -438,6 +435,11 @@ func (d *dynamoDBWriter) WriteBatch(ctx context.Context, b service.MessageBatch)
438435
if batchErr.IndexedErrors() == 0 {
439436
err = nil
440437
} else {
438+
// Mark all items in subsequent unattempted chunks as
439+
// failed to prevent silent data loss.
440+
for k := chunkEnd; k < len(writeReqs); k++ {
441+
batchErr.Failed(k, headlineErr)
442+
}
441443
err = batchErr
442444
}
443445
}

0 commit comments

Comments
 (0)