Skip to content

Commit 5be6cd6

Browse files
committed
Fix DynamoDB Streams checkpointing - preserve iterator position across shard re-discoveries
Track shard iterators by shard ID in a dictionary instead of a positional list. On shard re-discovery, only create new iterators for newly discovered shards - existing shards keep their current NextShardIterator position. This prevents re-processing of already-invoked records when shards are re-discovered after a shard closes or after empty poll thresholds.
1 parent 592106d commit 5be6cd6

1 file changed

Lines changed: 41 additions & 36 deletions

File tree

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -100,31 +100,32 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
100100

101101
private async Task PollStream(string streamArn, CancellationToken stoppingToken)
102102
{
103-
var shardIterators = await GetShardIterators(streamArn, stoppingToken);
103+
// Track iterators by shard ID to preserve position across re-discoveries
104+
var shardIterators = await GetShardIterators(streamArn, null, stoppingToken);
104105
var emptyPollCount = 0;
105106

106107
while (!stoppingToken.IsCancellationRequested)
107108
{
108109
// Poll all shards concurrently
109-
var tasks = new List<Task<(int Index, GetRecordsResponse? Response)>>();
110-
for (int i = 0; i < shardIterators.Count; i++)
110+
var shardIds = shardIterators.Keys.ToList();
111+
var tasks = new List<Task<(string ShardId, GetRecordsResponse? Response)>>();
112+
foreach (var shardId in shardIds)
111113
{
112-
if (shardIterators[i] == null)
114+
var iterator = shardIterators[shardId];
115+
if (iterator == null)
113116
continue;
114117

115-
var index = i;
116-
var iterator = shardIterators[i]!;
117-
tasks.Add(PollShard(index, iterator, stoppingToken));
118+
tasks.Add(PollShard(shardId, iterator, stoppingToken));
118119
}
119120

120121
var activeCount = tasks.Count;
121122
_logger.LogInformation("Polling {activeShardCount} active shard(s) out of {totalCount} total", activeCount, shardIterators.Count);
122123

123124
if (activeCount == 0)
124125
{
125-
// All iterators exhausted — re-discover shards
126+
// All iterators exhausted — re-discover shards but preserve existing positions
126127
_logger.LogInformation("All shard iterators exhausted, re-discovering shards");
127-
shardIterators = await GetShardIterators(streamArn, stoppingToken);
128+
shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken);
128129
emptyPollCount = 0;
129130
if (shardIterators.Count == 0)
130131
{
@@ -137,27 +138,27 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
137138
var results = await Task.WhenAll(tasks);
138139

139140
var hasRecords = false;
140-
var exhaustedInThisPoll = 0;
141-
foreach (var (index, response) in results)
141+
foreach (var (shardId, response) in results)
142142
{
143143
if (response == null)
144144
continue;
145145

146-
// Log when a shard iterator becomes null (shard closed)
147146
if (response.NextShardIterator == null)
148147
{
149-
exhaustedInThisPoll++;
150-
_logger.LogInformation("Shard at index {index} has been closed (NextShardIterator is null), records in final batch: {count}",
151-
index, response.Records.Count);
148+
_logger.LogInformation("Shard {shardId} has been closed (NextShardIterator is null), records in final batch: {count}",
149+
shardId, response.Records.Count);
150+
shardIterators.Remove(shardId);
151+
}
152+
else
153+
{
154+
shardIterators[shardId] = response.NextShardIterator;
152155
}
153-
154-
shardIterators[index] = response.NextShardIterator;
155156

156157
if (response.Records.Count == 0)
157158
continue;
158159

159160
hasRecords = true;
160-
_logger.LogInformation("Retrieved {recordCount} record(s) from shard index {index}", response.Records.Count, index);
161+
_logger.LogInformation("Retrieved {recordCount} record(s) from shard {shardId}", response.Records.Count, shardId);
161162
var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn);
162163

163164
var lambdaPayload = new DynamoDBEvent
@@ -184,14 +185,11 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
184185
}
185186
}
186187

187-
// Remove exhausted shards (null iterators) and re-discover to pick up child shards
188-
var exhaustedCount = shardIterators.Count(s => s == null);
189-
if (exhaustedCount > 0)
188+
// If any shards were removed (closed), re-discover to pick up child shards
189+
if (results.Any(r => r.Response?.NextShardIterator == null))
190190
{
191-
_logger.LogInformation("Removing {exhaustedCount} exhausted shard(s), re-discovering to find child shards", exhaustedCount);
192-
// Re-discover shards immediately when any shard closes, since new records
193-
// will be on child shards that we don't have iterators for yet.
194-
shardIterators = await GetShardIterators(streamArn, stoppingToken);
191+
_logger.LogInformation("Closed shard(s) detected, re-discovering to find child shards");
192+
shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken);
195193
emptyPollCount = 0;
196194
continue;
197195
}
@@ -203,11 +201,10 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
203201
else
204202
{
205203
emptyPollCount++;
206-
// After many empty polls, re-discover shards in case the stream topology changed
207204
if (emptyPollCount >= 30)
208205
{
209206
_logger.LogInformation("No records after {count} consecutive polls, re-discovering shards", emptyPollCount);
210-
shardIterators = await GetShardIterators(streamArn, stoppingToken);
207+
shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken);
211208
emptyPollCount = 0;
212209
continue;
213210
}
@@ -218,24 +215,27 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
218215
}
219216
}
220217

221-
private async Task<(int Index, GetRecordsResponse? Response)> PollShard(int index, string iterator, CancellationToken stoppingToken)
218+
private async Task<(string ShardId, GetRecordsResponse? Response)> PollShard(string shardId, string iterator, CancellationToken stoppingToken)
222219
{
223220
var response = await _streamsClient.GetRecordsAsync(new GetRecordsRequest
224221
{
225222
ShardIterator = iterator,
226223
Limit = _config.BatchSize
227224
}, stoppingToken);
228225

229-
return (index, response);
226+
return (shardId, response);
230227
}
231228

232-
private async Task<List<string?>> GetShardIterators(string streamArn, CancellationToken stoppingToken)
229+
/// <summary>
230+
/// Discover open shards and get iterators. If existingIterators is provided, only creates new iterators
231+
/// for shards not already tracked — preserving the stream position for known shards.
232+
/// </summary>
233+
private async Task<Dictionary<string, string?>> GetShardIterators(string streamArn, Dictionary<string, string?>? existingIterators, CancellationToken stoppingToken)
233234
{
234235
_logger.LogInformation("Discovering shards for stream {streamArn}", streamArn);
235236
var shards = new List<Shard>();
236237
string? lastEvaluatedShardId = null;
237238

238-
// Paginate through all shards
239239
do
240240
{
241241
var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest
@@ -250,24 +250,29 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
250250

251251
_logger.LogInformation("Discovered {shardCount} total shard(s) for stream", shards.Count);
252252

253-
// Only get iterators for open (leaf) shards — shards without an EndingSequenceNumber.
254-
// Closed shards with LATEST iterator type will never return new records.
255253
var openShards = shards.Where(s => s.SequenceNumberRange?.EndingSequenceNumber == null).ToList();
256254
_logger.LogInformation("Filtered to {openCount} open (leaf) shard(s) out of {totalCount} total", openShards.Count, shards.Count);
257255

258-
var iterators = new List<string?>();
256+
var iterators = new Dictionary<string, string?>();
259257

260258
foreach (var shard in openShards)
261259
{
260+
// Preserve existing iterator position for shards we're already tracking
261+
if (existingIterators != null && existingIterators.TryGetValue(shard.ShardId, out var existingIterator) && existingIterator != null)
262+
{
263+
iterators[shard.ShardId] = existingIterator;
264+
continue;
265+
}
266+
262267
var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest
263268
{
264269
StreamArn = streamArn,
265270
ShardId = shard.ShardId,
266271
ShardIteratorType = new ShardIteratorType(_config.ShardIteratorType)
267272
}, stoppingToken);
268273

269-
_logger.LogInformation("Got iterator for shard {shardId}", shard.ShardId);
270-
iterators.Add(iteratorResponse.ShardIterator);
274+
_logger.LogInformation("Got new iterator for shard {shardId}", shard.ShardId);
275+
iterators[shard.ShardId] = iteratorResponse.ShardIterator;
271276
}
272277

273278
return iterators;

0 commit comments

Comments
 (0)