Skip to content

Commit 340bc67

Browse files
committed
Add DynamoDB Streams event source emulator to Lambda Test Tool
Address PR review comments: - Poll shards concurrently using Task.WhenAll instead of sequential loop - Re-discover shards when any iterator becomes null (not just when all are null) - Paginate DescribeStream to handle streams with many shards - Preserve empty List/Map attribute values (non-null but empty collections) - Add unit tests for Binary (B), Binary Set (BS), and empty List/Map conversion
1 parent 7ba1e7a commit 340bc67

3 files changed

Lines changed: 152 additions & 30 deletions

File tree

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"Projects": [
3+
{
4+
"Name": "Amazon.Lambda.TestTool",
5+
"Type": "Minor",
6+
"ChangelogMessages": [
7+
"Add support emulating Lambda DynamoDB Stream event source"
8+
]
9+
}
10+
]
11+
}

Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs

Lines changed: 70 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -96,30 +96,44 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
9696

9797
while (!stoppingToken.IsCancellationRequested)
9898
{
99-
var hasRecords = false;
100-
99+
// Poll all shards concurrently
100+
var tasks = new List<Task<(int Index, GetRecordsResponse? Response)>>();
101101
for (int i = 0; i < shardIterators.Count; i++)
102102
{
103-
if (stoppingToken.IsCancellationRequested)
104-
return;
105-
106-
var iterator = shardIterators[i];
107-
if (iterator == null)
103+
if (shardIterators[i] == null)
108104
continue;
109105

110-
var getRecordsResponse = await _streamsClient.GetRecordsAsync(new GetRecordsRequest
106+
var index = i;
107+
var iterator = shardIterators[i]!;
108+
tasks.Add(PollShard(index, iterator, stoppingToken));
109+
}
110+
111+
if (tasks.Count == 0)
112+
{
113+
// All iterators exhausted — re-discover shards
114+
shardIterators = await GetShardIterators(streamArn, stoppingToken);
115+
if (shardIterators.Count == 0)
111116
{
112-
ShardIterator = iterator,
113-
Limit = _config.BatchSize
114-
}, stoppingToken);
117+
await Task.Delay(1000, stoppingToken);
118+
}
119+
continue;
120+
}
121+
122+
var results = await Task.WhenAll(tasks);
123+
124+
var hasRecords = false;
125+
foreach (var (index, response) in results)
126+
{
127+
if (response == null)
128+
continue;
115129

116-
shardIterators[i] = getRecordsResponse.NextShardIterator;
130+
shardIterators[index] = response.NextShardIterator;
117131

118-
if (getRecordsResponse.Records.Count == 0)
132+
if (response.Records.Count == 0)
119133
continue;
120134

121135
hasRecords = true;
122-
var lambdaRecords = ConvertToLambdaRecords(getRecordsResponse.Records, streamArn);
136+
var lambdaRecords = ConvertToLambdaRecords(response.Records, streamArn);
123137

124138
var lambdaPayload = new DynamoDBEvent
125139
{
@@ -145,33 +159,59 @@ private async Task PollStream(string streamArn, CancellationToken stoppingToken)
145159
}
146160
}
147161

148-
// Check for new shards periodically
149-
if (shardIterators.All(s => s == null))
162+
// Re-discover shards when any iterator becomes null (shard closed/split)
163+
if (shardIterators.Any(s => s == null))
150164
{
151-
shardIterators = await GetShardIterators(streamArn, stoppingToken);
152-
if (shardIterators.Count == 0)
165+
var newShards = await GetShardIterators(streamArn, stoppingToken);
166+
// Merge: keep active iterators, add any new ones
167+
var activeIterators = shardIterators.Where(s => s != null).ToList();
168+
foreach (var newIter in newShards)
153169
{
154-
await Task.Delay(1000, stoppingToken);
170+
if (newIter != null)
171+
activeIterators.Add(newIter);
155172
}
173+
shardIterators = activeIterators;
156174
}
157-
else if (!hasRecords)
175+
176+
if (!hasRecords)
158177
{
159-
// No records found, wait before polling again
160178
await Task.Delay(1000, stoppingToken);
161179
}
162180
}
163181
}
164182

165-
private async Task<List<string?>> GetShardIterators(string streamArn, CancellationToken stoppingToken)
183+
private async Task<(int Index, GetRecordsResponse? Response)> PollShard(int index, string iterator, CancellationToken stoppingToken)
166184
{
167-
var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest
185+
var response = await _streamsClient.GetRecordsAsync(new GetRecordsRequest
168186
{
169-
StreamArn = streamArn
187+
ShardIterator = iterator,
188+
Limit = _config.BatchSize
170189
}, stoppingToken);
171190

191+
return (index, response);
192+
}
193+
194+
private async Task<List<string?>> GetShardIterators(string streamArn, CancellationToken stoppingToken)
195+
{
196+
var shards = new List<Shard>();
197+
string? lastEvaluatedShardId = null;
198+
199+
// Paginate through all shards
200+
do
201+
{
202+
var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest
203+
{
204+
StreamArn = streamArn,
205+
ExclusiveStartShardId = lastEvaluatedShardId
206+
}, stoppingToken);
207+
208+
shards.AddRange(describeResponse.StreamDescription.Shards);
209+
lastEvaluatedShardId = describeResponse.StreamDescription.LastEvaluatedShardId;
210+
} while (lastEvaluatedShardId != null);
211+
172212
var iterators = new List<string?>();
173213

174-
foreach (var shard in describeResponse.StreamDescription.Shards)
214+
foreach (var shard in shards)
175215
{
176216
var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest
177217
{
@@ -277,15 +317,15 @@ internal static DynamoDBEvent.AttributeValue ConvertAttributeValue(AttributeValu
277317
lambdaValue.BOOL = sdkValue.BOOL;
278318
if (sdkValue.NULL != null)
279319
lambdaValue.NULL = sdkValue.NULL;
280-
if (sdkValue.SS?.Count > 0)
320+
if (sdkValue.SS != null)
281321
lambdaValue.SS = sdkValue.SS;
282-
if (sdkValue.NS?.Count > 0)
322+
if (sdkValue.NS != null)
283323
lambdaValue.NS = sdkValue.NS;
284-
if (sdkValue.BS?.Count > 0)
324+
if (sdkValue.BS != null)
285325
lambdaValue.BS = sdkValue.BS;
286-
if (sdkValue.L?.Count > 0)
326+
if (sdkValue.L != null)
287327
lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList();
288-
if (sdkValue.M?.Count > 0)
328+
if (sdkValue.M != null)
289329
lambdaValue.M = ConvertAttributeMap(sdkValue.M);
290330

291331
return lambdaValue;

Tools/LambdaTestTool-v2/tests/Amazon.Lambda.TestTool.UnitTests/DynamoDBStreamsEventSource/ConvertDynamoDBStreamsRecordTests.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,77 @@ public void ConvertRecordWithUserIdentity()
143143
Assert.Equal("Service", result.UserIdentity.Type);
144144
}
145145

146+
[Fact]
147+
public void ConvertRecordWithBinaryAttributes()
148+
{
149+
var binaryData = new byte[] { 0x01, 0x02, 0x03, 0xFF };
150+
var binarySet = new List<MemoryStream>
151+
{
152+
new MemoryStream(new byte[] { 0xAA, 0xBB }),
153+
new MemoryStream(new byte[] { 0xCC, 0xDD })
154+
};
155+
156+
var record = new Record
157+
{
158+
EventID = "event-bin",
159+
EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"),
160+
EventVersion = "1.1",
161+
Dynamodb = new StreamRecord
162+
{
163+
Keys = new Dictionary<string, AttributeValue>
164+
{
165+
["Id"] = new AttributeValue { S = "key-1" }
166+
},
167+
NewImage = new Dictionary<string, AttributeValue>
168+
{
169+
["BinaryAttr"] = new AttributeValue { B = new MemoryStream(binaryData) },
170+
["BinarySetAttr"] = new AttributeValue { BS = binarySet }
171+
}
172+
}
173+
};
174+
175+
var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn);
176+
177+
var newImage = result.Dynamodb.NewImage;
178+
Assert.NotNull(newImage["BinaryAttr"].B);
179+
Assert.Equal(binaryData, newImage["BinaryAttr"].B.ToArray());
180+
Assert.NotNull(newImage["BinarySetAttr"].BS);
181+
Assert.Equal(2, newImage["BinarySetAttr"].BS.Count);
182+
Assert.Equal(new byte[] { 0xAA, 0xBB }, newImage["BinarySetAttr"].BS[0].ToArray());
183+
Assert.Equal(new byte[] { 0xCC, 0xDD }, newImage["BinarySetAttr"].BS[1].ToArray());
184+
}
185+
186+
[Fact]
187+
public void ConvertRecordWithEmptyListAndMap()
188+
{
189+
var record = new Record
190+
{
191+
EventID = "event-empty",
192+
EventName = new Amazon.DynamoDBStreams.OperationType("INSERT"),
193+
EventVersion = "1.1",
194+
Dynamodb = new StreamRecord
195+
{
196+
Keys = new Dictionary<string, AttributeValue>
197+
{
198+
["Id"] = new AttributeValue { S = "key-1" }
199+
},
200+
NewImage = new Dictionary<string, AttributeValue>
201+
{
202+
["EmptyList"] = new AttributeValue { L = new List<AttributeValue>() },
203+
["EmptyMap"] = new AttributeValue { M = new Dictionary<string, AttributeValue>() }
204+
}
205+
}
206+
};
207+
208+
var result = DynamoDBStreamsEventSourceBackgroundService.ConvertToLambdaRecord(record, TestStreamArn);
209+
210+
var newImage = result.Dynamodb.NewImage;
211+
Assert.NotNull(newImage["EmptyList"].L);
212+
Assert.Empty(newImage["EmptyList"].L);
213+
Assert.NotNull(newImage["EmptyMap"].M);
214+
Assert.Empty(newImage["EmptyMap"].M);
215+
}
216+
146217
[Fact]
147218
public void ConvertMultipleRecords()
148219
{

0 commit comments

Comments
 (0)