Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7ba1e7a
Add DynamoDB Streams event source emulator to Lambda Test Tool
normj May 6, 2026
36df4ef
Add DevConfig file
normj May 6, 2026
340bc67
Add DynamoDB Streams event source emulator to Lambda Test Tool
normj May 6, 2026
787e1dc
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
ecac169
Support stream ARN as direct input to DynamoDB Streams event source
normj May 6, 2026
b876946
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
db4fd0b
Make ShardIteratorType and PollingIntervalMs configurable, fix shard …
normj May 6, 2026
640a47b
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
2a50358
Use DescribeTable instead of ListStreams to get stream ARN
normj May 6, 2026
f3f69cf
Add logging to DynamoDB Streams background service
normj May 6, 2026
bc2eab6
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
b38b02a
Reduce shard re-discovery frequency for quiet tables
normj May 6, 2026
e644494
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
7eb6740
Fix stream polling: filter to open shards only, re-discover on shard …
normj May 6, 2026
592106d
Change default ShardIteratorType from LATEST to TRIM_HORIZON
normj May 6, 2026
5ff8065
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
5be6cd6
Fix DynamoDB Streams checkpointing - preserve iterator position acros…
normj May 6, 2026
bc88dd7
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 6, 2026
fd43290
Rework DynamoDB Streams polling to only process records created after…
normj May 7, 2026
0799ed0
Add high-level strategy comment to PollStream explaining shard manage…
normj May 7, 2026
0a7f339
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj May 7, 2026
b32a71a
Logging fixes
normj May 7, 2026
381450b
Merge branch 'dev' into normj/lambda-ddbstreams
normj May 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
<PackageReference Include="AWSSDK.Extensions.NETCore.Setup" Version="4.0.3.22" />
<PackageReference Include="AWSSDK.Lambda" Version="4.0.13.1" />
<PackageReference Include="AWSSDK.SQS" Version="4.0.2.14" />
<PackageReference Include="AWSSDK.DynamoDBv2" Version="4.0.15.2" />
<PackageReference Include="AWSSDK.DynamoDBStreams" Version="4.0.4.17" />
<PackageReference Include="Amazon.Lambda.DynamoDBEvents" Version="3.1.2" />
<PackageReference Include="AWSSDK.SSO" Version="4.0.2.13" />
<PackageReference Include="AWSSDK.SSOOIDC" Version="4.0.3.14" />
<PackageReference Include="Spectre.Console" Version="0.49.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Amazon.Lambda.TestTool.Models;
using Amazon.Lambda.TestTool.Processes;
using Amazon.Lambda.TestTool.Processes.SQSEventSource;
using Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;
using Amazon.Lambda.TestTool.Services;
using Amazon.Lambda.TestTool.Services.IO;
using Spectre.Console.Cli;
Expand Down Expand Up @@ -39,10 +40,10 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunCommandS
{
EvaluateEnvironmentVariables(settings);

if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig))
if (!settings.LambdaEmulatorPort.HasValue && !settings.ApiGatewayEmulatorPort.HasValue && !settings.ApiGatewayEmulatorHttpsPort.HasValue && string.IsNullOrEmpty(settings.SQSEventSourceConfig) && string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig))
{
throw new ArgumentException("At least one of the following parameters must be set: " +
"--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port or --sqs-eventsource-config");
"--lambda-emulator-port, --api-gateway-emulator-port, --api-gateway-emulator-https-port, --sqs-eventsource-config or --dynamodbstreams-eventsource-config");
}

var tasks = new List<Task>();
Expand Down Expand Up @@ -87,6 +88,12 @@ public override async Task<int> ExecuteAsync(CommandContext context, RunCommandS
{
var sqsEventSourceProcess = SQSEventSourceProcess.Startup(settings, cancellationTokenSource.Token);
tasks.Add(sqsEventSourceProcess.RunningTask);
}

if (!string.IsNullOrEmpty(settings.DynamoDBStreamsEventSourceConfig))
{
var dynamoDBStreamsProcess = DynamoDBStreamsEventSourceProcess.Startup(settings, cancellationTokenSource.Token);
tasks.Add(dynamoDBStreamsProcess.RunningTask);
}

await Task.Run(() => Task.WaitAny(tasks.ToArray(), cancellationTokenSource.Token));
Expand Down Expand Up @@ -184,6 +191,16 @@ private void EvaluateEnvironmentVariables(RunCommandSettings settings)
throw new InvalidOperationException($"Environment variable {envVariable} for the SQS event source config was empty");
}
settings.SQSEventSourceConfig = environmentVariables[envVariable]?.ToString();
}

if (settings.DynamoDBStreamsEventSourceConfig != null && settings.DynamoDBStreamsEventSourceConfig.StartsWith(Constants.ArgumentEnvironmentVariablePrefix, StringComparison.CurrentCultureIgnoreCase))
{
var envVariable = settings.DynamoDBStreamsEventSourceConfig.Substring(Constants.ArgumentEnvironmentVariablePrefix.Length);
if (!environmentVariables.Contains(envVariable))
{
throw new InvalidOperationException($"Environment variable {envVariable} for the DynamoDB Streams event source config was empty");
}
settings.DynamoDBStreamsEventSourceConfig = environmentVariables[envVariable]?.ToString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ public sealed class RunCommandSettings : CommandSettings
[Description("The configuration for the SQS event source. The format of the config is a comma delimited key pairs. For example \"QueueUrl=<queue-url>,FunctionName=<function-name>,VisibilityTimeout=100\". Possible keys are: BatchSize, DisableMessageDelete, FunctionName, LambdaRuntimeApi, Profile, QueueUrl, Region, VisibilityTimeout")]
public string? SQSEventSourceConfig { get; set; }


/// <summary>
/// The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example "TableName=my-table,FunctionName=function-name,BatchSize=100".
/// Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName
/// </summary>
[CommandOption("--dynamodbstreams-eventsource-config <CONFIG>")]
[Description("The configuration for the DynamoDB Streams event source. The format of the config is a comma delimited key pairs. For example \"TableName=<table-name>,FunctionName=<function-name>,BatchSize=100\". Possible keys are: BatchSize, FunctionName, LambdaRuntimeApi, Profile, Region, TableName")]
public string? DynamoDBStreamsEventSourceConfig { get; set; }
/// <summary>
/// The absolute path used to save global settings and saved requests. You will need to specify a path in order to enable saving global settings and requests.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using Amazon.DynamoDBStreams;
using Amazon.DynamoDBStreams.Model;
using Amazon.Lambda.DynamoDBEvents;
using Amazon.Lambda.Model;
using Amazon.Lambda.TestTool.Services;
using Amazon.Runtime;
using System.Text.Json;

namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource;

/// <summary>
/// IHostedService that will run continually polling a DynamoDB Stream for records and invoking the connected
/// Lambda function with the polled records.
/// </summary>
public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService
{
private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};

private readonly ILogger<DynamoDBStreamsEventSourceProcess> _logger;
private readonly IAmazonDynamoDBStreams _streamsClient;
private readonly ILambdaClient _lambdaClient;
private readonly DynamoDBStreamsEventSourceBackgroundServiceConfig _config;

/// <summary>
/// Constructs instance of <see cref="DynamoDBStreamsEventSourceBackgroundService"/>.
/// </summary>
public DynamoDBStreamsEventSourceBackgroundService(
ILogger<DynamoDBStreamsEventSourceProcess> logger,
IAmazonDynamoDBStreams streamsClient,
DynamoDBStreamsEventSourceBackgroundServiceConfig config,
ILambdaClient lambdaClient)
{
_logger = logger;
_streamsClient = streamsClient;
_config = config;
_lambdaClient = lambdaClient;
}

/// <summary>
/// Execute the DynamoDBStreamsEventSourceBackgroundService.
/// </summary>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting DynamoDB Streams poller for table: {tableName}", _config.TableName);

while (!stoppingToken.IsCancellationRequested)
{
try
{
var streamArn = await GetStreamArnForTable(stoppingToken);
if (streamArn == null)
{
_logger.LogWarning("No stream found for table {tableName}. Retrying in 5 seconds.", _config.TableName);
await Task.Delay(5000, stoppingToken);
continue;
}

await PollStream(streamArn, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested)
{
return;
}
catch (Exception e)
{
_logger.LogWarning(e, "Exception occurred in DynamoDB Streams poller for {tableName}: {message}", _config.TableName, e.Message);
await Task.Delay(3000, stoppingToken);
}
}
}

private async Task<string?> GetStreamArnForTable(CancellationToken stoppingToken)
{
var response = await _streamsClient.ListStreamsAsync(new ListStreamsRequest
{
TableName = _config.TableName
}, stoppingToken);

// Use the first active stream for the table
return response.Streams.FirstOrDefault()?.StreamArn;
}

private async Task PollStream(string streamArn, CancellationToken stoppingToken)
{
var shardIterators = await GetShardIterators(streamArn, stoppingToken);

while (!stoppingToken.IsCancellationRequested)
{
var hasRecords = false;

for (int i = 0; i < shardIterators.Count; i++)
{
if (stoppingToken.IsCancellationRequested)
return;

var iterator = shardIterators[i];
Comment thread
normj marked this conversation as resolved.
Outdated
if (iterator == null)
continue;

var getRecordsResponse = await _streamsClient.GetRecordsAsync(new GetRecordsRequest
{
ShardIterator = iterator,
Limit = _config.BatchSize
}, stoppingToken);

shardIterators[i] = getRecordsResponse.NextShardIterator;

if (getRecordsResponse.Records.Count == 0)
continue;

hasRecords = true;
var lambdaRecords = ConvertToLambdaRecords(getRecordsResponse.Records, streamArn);

var lambdaPayload = new DynamoDBEvent
{
Records = lambdaRecords
};

var invokeRequest = new InvokeRequest
{
InvocationType = InvocationType.RequestResponse,
FunctionName = _config.FunctionName,
Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions)
};

_logger.LogInformation("Invoking Lambda function {functionName} with {recordCount} DynamoDB stream records",
_config.FunctionName, lambdaRecords.Count);

var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi);

if (lambdaResponse.FunctionError != null)
{
_logger.LogError("Invoking Lambda {function} with {recordCount} DynamoDB stream records failed with error {errorMessage}",
_config.FunctionName, lambdaRecords.Count, lambdaResponse.FunctionError);
}
}

// Check for new shards periodically
if (shardIterators.All(s => s == null))
{
shardIterators = await GetShardIterators(streamArn, stoppingToken);
if (shardIterators.Count == 0)
{
await Task.Delay(1000, stoppingToken);
}
}
Comment thread
normj marked this conversation as resolved.
Outdated
else if (!hasRecords)
{
// No records found, wait before polling again
await Task.Delay(1000, stoppingToken);
}
}
}

private async Task<List<string?>> GetShardIterators(string streamArn, CancellationToken stoppingToken)
{
var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest
{
StreamArn = streamArn
}, stoppingToken);

var iterators = new List<string?>();

foreach (var shard in describeResponse.StreamDescription.Shards)
Comment thread
normj marked this conversation as resolved.
Outdated
{
var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest
{
StreamArn = streamArn,
ShardId = shard.ShardId,
ShardIteratorType = ShardIteratorType.LATEST
}, stoppingToken);

iterators.Add(iteratorResponse.ShardIterator);
}

return iterators;
}

/// <summary>
/// Convert from the SDK's DynamoDB Streams records to the Lambda event's DynamoDB record type.
/// </summary>
internal static IList<DynamoDBEvent.DynamodbStreamRecord> ConvertToLambdaRecords(List<Record> records, string streamArn)
{
return records.Select(r => ConvertToLambdaRecord(r, streamArn)).ToList();
}

/// <summary>
/// Convert a single SDK stream record to the Lambda event record type.
/// </summary>
internal static DynamoDBEvent.DynamodbStreamRecord ConvertToLambdaRecord(Record record, string streamArn)
{
var lambdaRecord = new DynamoDBEvent.DynamodbStreamRecord
{
EventID = record.EventID,
EventName = record.EventName?.Value,
EventSource = "aws:dynamodb",
EventSourceArn = streamArn,
EventVersion = record.EventVersion,
AwsRegion = Arn.Parse(streamArn).Region
};

if (record.Dynamodb != null)
{
lambdaRecord.Dynamodb = new DynamoDBEvent.StreamRecord
{
ApproximateCreationDateTime = record.Dynamodb.ApproximateCreationDateTime ?? DateTime.MinValue,
SequenceNumber = record.Dynamodb.SequenceNumber,
SizeBytes = record.Dynamodb.SizeBytes ?? 0,
StreamViewType = record.Dynamodb.StreamViewType?.Value
};

if (record.Dynamodb.Keys != null)
{
lambdaRecord.Dynamodb.Keys = ConvertAttributeMap(record.Dynamodb.Keys);
}

if (record.Dynamodb.NewImage != null)
{
lambdaRecord.Dynamodb.NewImage = ConvertAttributeMap(record.Dynamodb.NewImage);
}

if (record.Dynamodb.OldImage != null)
{
lambdaRecord.Dynamodb.OldImage = ConvertAttributeMap(record.Dynamodb.OldImage);
}
}

if (record.UserIdentity != null)
{
lambdaRecord.UserIdentity = new DynamoDBEvent.Identity
{
PrincipalId = record.UserIdentity.PrincipalId,
Type = record.UserIdentity.Type
};
}

return lambdaRecord;
}

/// <summary>
/// Convert SDK AttributeValue dictionary to Lambda event AttributeValue dictionary.
/// </summary>
internal static Dictionary<string, DynamoDBEvent.AttributeValue> ConvertAttributeMap(Dictionary<string, AttributeValue> sdkMap)
{
var result = new Dictionary<string, DynamoDBEvent.AttributeValue>();
foreach (var kvp in sdkMap)
{
result[kvp.Key] = ConvertAttributeValue(kvp.Value);
}
return result;
}

/// <summary>
/// Convert a single SDK AttributeValue to the Lambda event AttributeValue.
/// </summary>
internal static DynamoDBEvent.AttributeValue ConvertAttributeValue(AttributeValue sdkValue)
{
var lambdaValue = new DynamoDBEvent.AttributeValue();

if (sdkValue.S != null)
lambdaValue.S = sdkValue.S;
if (sdkValue.N != null)
lambdaValue.N = sdkValue.N;
if (sdkValue.B != null)
lambdaValue.B = sdkValue.B;
if (sdkValue.BOOL != null)
lambdaValue.BOOL = sdkValue.BOOL;
if (sdkValue.NULL != null)
lambdaValue.NULL = sdkValue.NULL;
if (sdkValue.SS?.Count > 0)
lambdaValue.SS = sdkValue.SS;
if (sdkValue.NS?.Count > 0)
lambdaValue.NS = sdkValue.NS;
if (sdkValue.BS?.Count > 0)
lambdaValue.BS = sdkValue.BS;
if (sdkValue.L?.Count > 0)
lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList();
if (sdkValue.M?.Count > 0)
Comment thread
normj marked this conversation as resolved.
Outdated
Comment thread
normj marked this conversation as resolved.
Outdated
lambdaValue.M = ConvertAttributeMap(sdkValue.M);

return lambdaValue;
}
}
Loading
Loading