Skip to content

Commit db4fd0b

Browse files
committed
Make ShardIteratorType and PollingIntervalMs configurable, fix shard re-discovery
- Add ShardIteratorType config (default: LATEST) propagated from Aspire options - Add PollingIntervalMs config (default: 1000) for tunable poll frequency - Fix shard re-discovery to only replace exhausted iterators, preserving active ones - Add key-value parsing for new config options
1 parent ecac169 commit db4fd0b

4 files changed

Lines changed: 51 additions & 9 deletions

File tree

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -169,19 +169,26 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
169169
if (shardIterators.Any(s => s == null))
170170
{
171171
var newShards = await GetShardIterators(streamArn, stoppingToken);
172-
// Merge: keep active iterators, add any new ones
173-
var activeIterators = shardIterators.Where(s => s != null).ToList();
174-
foreach (var newIter in newShards)
172+
// Replace only null entries with new iterators, keep active ones
173+
for (int i = 0; i < shardIterators.Count; i++)
175174
{
176-
if (newIter != null)
177-
activeIterators.Add(newIter);
175+
if (shardIterators[i] == null && i < newShards.Count)
176+
{
177+
shardIterators[i] = newShards[i];
178+
}
178179
}
179-
shardIterators = activeIterators;
180+
// Append any additional new shards discovered
181+
for (int i = shardIterators.Count; i < newShards.Count; i++)
182+
{
183+
shardIterators.Add(newShards[i]);
184+
}
185+
// Remove remaining null entries (fully exhausted shards with no replacement)
186+
shardIterators = shardIterators.Where(s => s != null).ToList();
180187
}
181188

182189
if (!hasRecords)
183190
{
184-
await Task.Delay(1000, stoppingToken);
191+
await Task.Delay(_config.PollingIntervalMs, stoppingToken);
185192
}
186193
}
187194
}
@@ -223,7 +230,7 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
223230
{
224231
StreamArn = streamArn,
225232
ShardId = shard.ShardId,
226-
ShardIteratorType = ShardIteratorType.LATEST
233+
ShardIteratorType = new ShardIteratorType(_config.ShardIteratorType)
227234
}, stoppingToken);
228235

229236
iterators.Add(iteratorResponse.ShardIterator);

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundServiceConfig.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,14 @@ public class DynamoDBStreamsEventSourceBackgroundServiceConfig
2727
/// The DynamoDB table name to read streams from.
2828
/// </summary>
2929
public required string TableName { get; init; }
30+
31+
/// <summary>
32+
/// The shard iterator type to use when reading from the stream.
33+
/// </summary>
34+
public required string ShardIteratorType { get; init; } = "LATEST";
35+
36+
/// <summary>
37+
/// The polling interval in milliseconds between stream reads when no records are found.
38+
/// </summary>
39+
public required int PollingIntervalMs { get; init; } = DynamoDBStreamsEventSourceProcess.DefaultPollingIntervalMs;
3040
}

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceConfig.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,16 @@ internal class DynamoDBStreamsEventSourceConfig
3939
/// The DynamoDB table name to read streams from.
4040
/// </summary>
4141
public string? TableName { get; set; }
42+
43+
/// <summary>
44+
/// The shard iterator type to use when reading from the stream.
45+
/// Valid values: LATEST, TRIM_HORIZON. Default is LATEST.
46+
/// </summary>
47+
public string? ShardIteratorType { get; set; }
48+
49+
/// <summary>
50+
/// The polling interval in milliseconds between stream reads when no records are found.
51+
/// Default is 1000.
52+
/// </summary>
53+
public int? PollingIntervalMs { get; set; }
4254
}

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceProcess.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
1414
public class DynamoDBStreamsEventSourceProcess
1515
{
1616
internal const int DefaultBatchSize = 100;
17+
internal const int DefaultPollingIntervalMs = 1000;
1718

1819
/// <summary>
1920
/// The Parent task for all the tasks started for each DynamoDB Streams event source.
@@ -76,7 +77,9 @@ public static DynamoDBStreamsEventSourceProcess Startup(RunCommandSettings setti
7677
BatchSize = config.BatchSize ?? DefaultBatchSize,
7778
FunctionName = config.FunctionName ?? LambdaRuntimeApi.DefaultFunctionName,
7879
LambdaRuntimeApi = lambdaRuntimeApi,
79-
TableName = tableName
80+
TableName = tableName,
81+
ShardIteratorType = config.ShardIteratorType ?? "LATEST",
82+
PollingIntervalMs = config.PollingIntervalMs ?? DefaultPollingIntervalMs
8083
};
8184

8285
builder.Services.AddSingleton(backgroundServiceConfig);
@@ -184,6 +187,16 @@ internal static List<DynamoDBStreamsEventSourceConfig> LoadDynamoDBStreamsEventS
184187
case "tablename":
185188
config.TableName = keyValuePair[1].Trim();
186189
break;
190+
case "sharditeratortype":
191+
config.ShardIteratorType = keyValuePair[1].Trim();
192+
break;
193+
case "pollingintervalms":
194+
if (!int.TryParse(keyValuePair[1].Trim(), out var pollingInterval))
195+
{
196+
throw new InvalidOperationException("Value for polling interval is not a formatted integer");
197+
}
198+
config.PollingIntervalMs = pollingInterval;
199+
break;
187200
}
188201
}
189202

0 commit comments

Comments
 (0)