Skip to content

Commit 0a7f339

Browse files
committed
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-lambda-dotnet into normj/lambda-ddbstreams
2 parents bc88dd7 + 0799ed0 commit 0a7f339

4 files changed

Lines changed: 101 additions & 80 deletions

File tree

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs

Lines changed: 101 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -100,36 +100,50 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
100100

101101
private async Task PollStream(string streamArn, CancellationToken stoppingToken)
102102
{
103-
// Track iterators by shard ID to preserve position across re-discoveries
104-
var shardIterators = await GetShardIterators(streamArn, null, stoppingToken);
105-
var emptyPollCount = 0;
103+
// Shard polling strategy:
104+
//
105+
// Goal: Only deliver records to Lambda that were written AFTER the test tool started.
106+
//
107+
// 1. At startup, discover all shards. Open shards get a LATEST iterator (future records only).
108+
// Closed shards are recorded in a "closed at startup" set and never polled — they contain
109+
// only historical data from before the tool started.
110+
//
111+
// 2. Every 30 seconds (or immediately when a shard is exhausted), re-discover shards:
112+
// - Shards already being polled: leave their iterator alone (preserves position).
113+
// - Shards in the "closed at startup" set: skip (pre-existing historical data).
114+
// - Any other shard (new since startup): poll with TRIM_HORIZON to read all its records,
115+
// since the shard was created after the tool started and all its data is relevant.
116+
117+
var closedAtStartup = new HashSet<string>();
118+
var shardIterators = await DiscoverInitialShards(streamArn, closedAtStartup, stoppingToken);
119+
120+
_logger.LogInformation("Initial discovery: {openCount} open shard(s), {closedCount} closed shard(s) at startup",
121+
shardIterators.Count, closedAtStartup.Count);
122+
123+
var lastDiscoveryTime = DateTime.UtcNow;
124+
const int ShardRediscoveryIntervalSeconds = 30;
106125

107126
while (!stoppingToken.IsCancellationRequested)
108127
{
109-
// Poll all shards concurrently
110-
var shardIds = shardIterators.Keys.ToList();
128+
// Poll all active shards concurrently
111129
var tasks = new List<Task<(string ShardId, GetRecordsResponse? Response)>>();
112-
foreach (var shardId in shardIds)
130+
foreach (var (shardId, iterator) in shardIterators)
113131
{
114-
var iterator = shardIterators[shardId];
115132
if (iterator == null)
116133
continue;
117-
118134
tasks.Add(PollShard(shardId, iterator, stoppingToken));
119135
}
120136

121137
var activeCount = tasks.Count;
122-
_logger.LogInformation("Polling {activeShardCount} active shard(s) out of {totalCount} total", activeCount, shardIterators.Count);
138+
_logger.LogInformation("Polling {activeShardCount} active shard(s)", activeCount);
123139

124140
if (activeCount == 0)
125141
{
126-
// All iterators exhausted — re-discover shards but preserve existing positions
127-
_logger.LogInformation("All shard iterators exhausted, re-discovering shards");
128-
shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken);
129-
emptyPollCount = 0;
142+
// No active shards — re-discover
143+
shardIterators = await DiscoverNewShards(streamArn, shardIterators, closedAtStartup, stoppingToken);
144+
lastDiscoveryTime = DateTime.UtcNow;
130145
if (shardIterators.Count == 0)
131146
{
132-
_logger.LogInformation("No shards found, sleeping 1000ms before retry");
133147
await Task.Delay(1000, stoppingToken);
134148
}
135149
continue;
@@ -138,16 +152,18 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
138152
var results = await Task.WhenAll(tasks);
139153

140154
var hasRecords = false;
155+
var shardExhausted = false;
141156
foreach (var (shardId, response) in results)
142157
{
143158
if (response == null)
144159
continue;
145160

146161
if (response.NextShardIterator == null)
147162
{
148-
_logger.LogInformation("Shard {shardId} has been closed (NextShardIterator is null), records in final batch: {count}",
163+
_logger.LogInformation("Shard {shardId} exhausted (closed), records in final batch: {count}",
149164
shardId, response.Records.Count);
150165
shardIterators.Remove(shardId);
166+
shardExhausted = true;
151167
}
152168
else
153169
{
@@ -161,11 +177,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
161177
_logger.LogInformation("Retrieved {recordCount} record(s) from shard {shardId}", response.Records.Count, shardId);
162178
var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn);
163179

164-
var lambdaPayload = new DynamoDBEvent
165-
{
166-
Records = lambdaRecords
167-
};
168-
180+
var lambdaPayload = new DynamoDBEvent { Records = lambdaRecords };
169181
var invokeRequest = new InvokeRequest
170182
{
171183
InvocationType = InvocationType.RequestResponse,
@@ -185,31 +197,19 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
185197
}
186198
}
187199

188-
// If any shards were removed (closed), re-discover to pick up child shards
189-
if (results.Any(r => r.Response?.NextShardIterator == null))
200+
// Re-discover if a shard was exhausted or 30 seconds have elapsed
201+
var timeSinceDiscovery = (DateTime.UtcNow - lastDiscoveryTime).TotalSeconds;
202+
if (shardExhausted || timeSinceDiscovery >= ShardRediscoveryIntervalSeconds)
190203
{
191-
_logger.LogInformation("Closed shard(s) detected, re-discovering to find child shards");
192-
shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken);
193-
emptyPollCount = 0;
204+
_logger.LogInformation("Re-discovering shards (exhausted={shardExhausted}, elapsed={elapsed}s)",
205+
shardExhausted, (int)timeSinceDiscovery);
206+
shardIterators = await DiscoverNewShards(streamArn, shardIterators, closedAtStartup, stoppingToken);
207+
lastDiscoveryTime = DateTime.UtcNow;
194208
continue;
195209
}
196210

197-
if (hasRecords)
211+
if (!hasRecords)
198212
{
199-
emptyPollCount = 0;
200-
}
201-
else
202-
{
203-
emptyPollCount++;
204-
if (emptyPollCount >= 30)
205-
{
206-
_logger.LogInformation("No records after {count} consecutive polls, re-discovering shards", emptyPollCount);
207-
shardIterators = await GetShardIterators(streamArn, shardIterators, stoppingToken);
208-
emptyPollCount = 0;
209-
continue;
210-
}
211-
212-
_logger.LogInformation("No records found (empty poll #{count}), sleeping {pollingInterval}ms", emptyPollCount, _config.PollingIntervalMs);
213213
await Task.Delay(_config.PollingIntervalMs, stoppingToken);
214214
}
215215
}
@@ -227,57 +227,93 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
227227
}
228228

229229
/// <summary>
230-
/// Discover open shards and get iterators. If existingIterators is provided, only creates new iterators
231-
/// for shards not already tracked — preserving the stream position for known shards.
230+
/// Initial shard discovery at startup. Uses LATEST for open shards and records closed shard IDs.
232231
/// </summary>
233-
private async Task<Dictionary<string, string?>> GetShardIterators(string streamArn, Dictionary<string, string?>? existingIterators, CancellationToken stoppingToken)
232+
private async Task<Dictionary<string, string?>> DiscoverInitialShards(string streamArn, HashSet<string> closedAtStartup, CancellationToken stoppingToken)
234233
{
235-
_logger.LogInformation("Discovering shards for stream {streamArn}", streamArn);
236-
var shards = new List<Shard>();
237-
string? lastEvaluatedShardId = null;
234+
var shards = await GetAllShards(streamArn, stoppingToken);
235+
var iterators = new Dictionary<string, string?>();
238236

239-
do
237+
foreach (var shard in shards)
240238
{
241-
var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest
239+
var isClosed = shard.SequenceNumberRange?.EndingSequenceNumber != null;
240+
if (isClosed)
241+
{
242+
closedAtStartup.Add(shard.ShardId);
243+
continue;
244+
}
245+
246+
// Open shard — use LATEST to only get records created after startup
247+
var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest
242248
{
243249
StreamArn = streamArn,
244-
ExclusiveStartShardId = lastEvaluatedShardId
250+
ShardId = shard.ShardId,
251+
ShardIteratorType = ShardIteratorType.LATEST
245252
}, stoppingToken);
246253

247-
shards.AddRange(describeResponse.StreamDescription.Shards);
248-
lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId;
249-
} while (lastEvaluatedShardId != null);
250-
251-
_logger.LogInformation("Discovered {shardCount} total shard(s) for stream", shards.Count);
254+
_logger.LogInformation("Got LATEST iterator for startup shard {shardId}", shard.ShardId);
255+
iterators[shard.ShardId] = iteratorResponse.ShardIterator;
256+
}
252257

253-
var openShards = shards.Where(s => s.SequenceNumberRange?.EndingSequenceNumber == null).ToList();
254-
_logger.LogInformation("Filtered to {openCount} open (leaf) shard(s) out of {totalCount} total", openShards.Count, shards.Count);
258+
return iterators;
259+
}
255260

256-
var iterators = new Dictionary<string, string?>();
261+
/// <summary>
262+
/// Ongoing shard discovery. Preserves existing iterators, skips shards closed at startup,
263+
/// and starts TRIM_HORIZON pollers for any new shards (even if closed).
264+
/// </summary>
265+
private async Task<Dictionary<string, string?>> DiscoverNewShards(string streamArn, Dictionary<string, string?> existingIterators, HashSet<string> closedAtStartup, CancellationToken stoppingToken)
266+
{
267+
var shards = await GetAllShards(streamArn, stoppingToken);
268+
var iterators = new Dictionary<string, string?>(existingIterators);
257269

258-
foreach (var shard in openShards)
270+
foreach (var shard in shards)
259271
{
260-
// Preserve existing iterator position for shards we're already tracking
261-
if (existingIterators != null && existingIterators.TryGetValue(shard.ShardId, out var existingIterator) && existingIterator != null)
262-
{
263-
iterators[shard.ShardId] = existingIterator;
272+
// Already being polled — leave iterator alone
273+
if (iterators.ContainsKey(shard.ShardId))
264274
continue;
265-
}
266275

276+
// Was closed at startup — skip
277+
if (closedAtStartup.Contains(shard.ShardId))
278+
continue;
279+
280+
// New shard discovered after startup — use TRIM_HORIZON to read all its records
267281
var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest
268282
{
269283
StreamArn = streamArn,
270284
ShardId = shard.ShardId,
271-
ShardIteratorType = new ShardIteratorType(_config.ShardIteratorType)
285+
ShardIteratorType = ShardIteratorType.TRIM_HORIZON
272286
}, stoppingToken);
273287

274-
_logger.LogInformation("Got new iterator for shard {shardId}", shard.ShardId);
288+
_logger.LogInformation("Got TRIM_HORIZON iterator for new shard {shardId}", shard.ShardId);
275289
iterators[shard.ShardId] = iteratorResponse.ShardIterator;
276290
}
277291

278292
return iterators;
279293
}
280294

295+
private async Task<List<Shard>> GetAllShards(string streamArn, CancellationToken stoppingToken)
296+
{
297+
_logger.LogInformation("Discovering shards for stream {streamArn}", streamArn);
298+
var shards = new List<Shard>();
299+
string? lastEvaluatedShardId = null;
300+
301+
do
302+
{
303+
var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest
304+
{
305+
StreamArn = streamArn,
306+
ExclusiveStartShardId = lastEvaluatedShardId
307+
}, stoppingToken);
308+
309+
shards.AddRange(describeResponse.StreamDescription.Shards);
310+
lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId;
311+
} while (lastEvaluatedShardId != null);
312+
313+
_logger.LogInformation("Discovered {shardCount} shard(s)", shards.Count);
314+
return shards;
315+
}
316+
281317
/// <summary>
282318
/// Convert from the SDK's DynamoDB Streams records to the Lambda event's DynamoDB record type.
283319
/// </summary>

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ public class DynamoDBStreamsEventSourceBackgroundServiceConfig
2828
/// </summary>
2929
public required string TableName { get; init; }
3030

31-
/// <summary>
32-
/// The shard iterator type to use when reading from the stream.
33-
/// </summary>
34-
public required string ShardIteratorType { get; init; } = "LATEST";
35-
3631
/// <summary>
3732
/// The polling interval in milliseconds between stream reads when no records are found.
3833
/// </summary>

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,6 @@ internal class DynamoDBStreamsEventSourceConfig
4040
/// </summary>
4141
public string? TableName { get; set; }
4242

43-
/// <summary>
44-
/// The shard iterator type to use when reading from the stream.
45-
/// Valid values: LATEST, TRIM_HORIZON. Default is TRIM_HORIZON.
46-
/// </summary>
47-
public string? ShardIteratorType { get; set; }
48-
4943
/// <summary>
5044
/// The polling interval in milliseconds between stream reads when no records are found.
5145
/// Default is 1000.

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings setti
9292
FunctionName = config.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName,
9393
LambdaRuntimeApi = lambdaRuntimeApi,
9494
TableName = tableName,
95-
ShardIteratorType = config.ShardIteratorType ?? "TRIM_HORIZON",
9695
PollingIntervalMs = config.PollingIntervalMs ?? DefaultPollingIntervalMs
9796
};
9897

@@ -201,9 +200,6 @@ internal static List<DynamoDBStreamsEventSourceConfig> LoadDynamoDBStreamsEventS
201200
case "tablename":
202201
config.TableName = keyValuePair[1].Trim();
203202
break;
204-
case "sharditeratortype":
205-
config.ShardIteratorType = keyValuePair[1].Trim();
206-
break;
207203
case "pollingintervalms":
208204
if (!int.TryParse(keyValuePair[1].Trim(), out var pollingInterval))
209205
{

0 commit comments

Comments
 (0)