-
Notifications
You must be signed in to change notification settings - Fork 498
Add DynamoDB Streams event source emulator #2356
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
normj
wants to merge
23
commits into
dev
Choose a base branch
from
normj/lambda-ddbstreams
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
7ba1e7a
Add DynamoDB Streams event source emulator to Lambda Test Tool
normj 36df4ef
Add DevConfig file
normj 340bc67
Add DynamoDB Streams event source emulator to Lambda Test Tool
normj 787e1dc
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj ecac169
Support stream ARN as direct input to DynamoDB Streams event source
normj b876946
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj db4fd0b
Make ShardIteratorType and PollingIntervalMs configurable, fix shard …
normj 640a47b
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj 2a50358
Use DescribeTable instead of ListStreams to get stream ARN
normj f3f69cf
Add logging to DynamoDB Streams background service
normj bc2eab6
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj b38b02a
Reduce shard re-discovery frequency for quiet tables
normj e644494
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj 7eb6740
Fix stream polling: filter to open shards only, re-discover on shard …
normj 592106d
Change default ShardIteratorType from LATEST to TRIM_HORIZON
normj 5ff8065
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj 5be6cd6
Fix DynamoDB Streams checkpointing - preserve iterator position acros…
normj bc88dd7
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj fd43290
Rework DynamoDB Streams polling to only process records created after…
normj 0799ed0
Add high-level strategy comment to PollStream explaining shard manage…
normj 0a7f339
Merge branch 'normj/lambda-ddbstreams' of https://github.com/aws/aws-…
normj b32a71a
Logging fixes
normj 381450b
Merge branch 'dev' into normj/lambda-ddbstreams
normj File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
293 changes: 293 additions & 0 deletions
293
...tTool/Processes/DynamoDBStreamsEventSource/DynamoDBStreamsEventSourceBackgroundService.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,293 @@ | ||
| // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| using Amazon.DynamoDBStreams; | ||
| using Amazon.DynamoDBStreams.Model; | ||
| using Amazon.Lambda.DynamoDBEvents; | ||
| using Amazon.Lambda.Model; | ||
| using Amazon.Lambda.TestTool.Services; | ||
| using Amazon.Runtime; | ||
| using System.Text.Json; | ||
|
|
||
| namespace Amazon.Lambda.TestTool.Processes.DynamoDBStreamsEventSource; | ||
|
|
||
| /// <summary> | ||
| /// IHostedService that will run continually polling a DynamoDB Stream for records and invoking the connected | ||
| /// Lambda function with the polled records. | ||
| /// </summary> | ||
| public class DynamoDBStreamsEventSourceBackgroundService : BackgroundService | ||
| { | ||
| private static readonly JsonSerializerOptions _jsonOptions = new JsonSerializerOptions | ||
| { | ||
| PropertyNamingPolicy = JsonNamingPolicy.CamelCase | ||
| }; | ||
|
|
||
| private readonly ILogger<DynamoDBStreamsEventSourceProcess> _logger; | ||
| private readonly IAmazonDynamoDBStreams _streamsClient; | ||
| private readonly ILambdaClient _lambdaClient; | ||
| private readonly DynamoDBStreamsEventSourceBackgroundServiceConfig _config; | ||
|
|
||
| /// <summary> | ||
| /// Constructs instance of <see cref="DynamoDBStreamsEventSourceBackgroundService"/>. | ||
| /// </summary> | ||
| public DynamoDBStreamsEventSourceBackgroundService( | ||
| ILogger<DynamoDBStreamsEventSourceProcess> logger, | ||
| IAmazonDynamoDBStreams streamsClient, | ||
| DynamoDBStreamsEventSourceBackgroundServiceConfig config, | ||
| ILambdaClient lambdaClient) | ||
| { | ||
| _logger = logger; | ||
| _streamsClient = streamsClient; | ||
| _config = config; | ||
| _lambdaClient = lambdaClient; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Execute the DynamoDBStreamsEventSourceBackgroundService. | ||
| /// </summary> | ||
| protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
| { | ||
| _logger.LogInformation("Starting DynamoDB Streams poller for table: {tableName}", _config.TableName); | ||
|
|
||
| while (!stoppingToken.IsCancellationRequested) | ||
| { | ||
| try | ||
| { | ||
| var streamArn = await GetStreamArnForTable(stoppingToken); | ||
| if (streamArn == null) | ||
| { | ||
| _logger.LogWarning("No stream found for table {tableName}. Retrying in 5 seconds.", _config.TableName); | ||
| await Task.Delay(5000, stoppingToken); | ||
| continue; | ||
| } | ||
|
|
||
| await PollStream(streamArn, stoppingToken); | ||
| } | ||
| catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) | ||
| { | ||
| return; | ||
| } | ||
| catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) | ||
| { | ||
| return; | ||
| } | ||
| catch (Exception e) | ||
| { | ||
| _logger.LogWarning(e, "Exception occurred in DynamoDB Streams poller for {tableName}: {message}", _config.TableName, e.Message); | ||
| await Task.Delay(3000, stoppingToken); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private async Task<string?> GetStreamArnForTable(CancellationToken stoppingToken) | ||
| { | ||
| var response = await _streamsClient.ListStreamsAsync(new ListStreamsRequest | ||
| { | ||
| TableName = _config.TableName | ||
| }, stoppingToken); | ||
|
|
||
| // Use the first active stream for the table | ||
| return response.Streams.FirstOrDefault()?.StreamArn; | ||
| } | ||
|
|
||
| private async Task PollStream(string streamArn, CancellationToken stoppingToken) | ||
| { | ||
| var shardIterators = await GetShardIterators(streamArn, stoppingToken); | ||
|
|
||
| while (!stoppingToken.IsCancellationRequested) | ||
| { | ||
| var hasRecords = false; | ||
|
|
||
| for (int i = 0; i < shardIterators.Count; i++) | ||
| { | ||
| if (stoppingToken.IsCancellationRequested) | ||
| return; | ||
|
|
||
| var iterator = shardIterators[i]; | ||
| if (iterator == null) | ||
| continue; | ||
|
|
||
| var getRecordsResponse = await _streamsClient.GetRecordsAsync(new GetRecordsRequest | ||
| { | ||
| ShardIterator = iterator, | ||
| Limit = _config.BatchSize | ||
| }, stoppingToken); | ||
|
|
||
| shardIterators[i] = getRecordsResponse.NextShardIterator; | ||
|
|
||
| if (getRecordsResponse.Records.Count == 0) | ||
| continue; | ||
|
|
||
| hasRecords = true; | ||
| var lambdaRecords = ConvertToLambdaRecords(getRecordsResponse.Records, streamArn); | ||
|
|
||
| var lambdaPayload = new DynamoDBEvent | ||
| { | ||
| Records = lambdaRecords | ||
| }; | ||
|
|
||
| var invokeRequest = new InvokeRequest | ||
| { | ||
| InvocationType = InvocationType.RequestResponse, | ||
| FunctionName = _config.FunctionName, | ||
| Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions) | ||
| }; | ||
|
|
||
| _logger.LogInformation("Invoking Lambda function {functionName} with {recordCount} DynamoDB stream records", | ||
| _config.FunctionName, lambdaRecords.Count); | ||
|
|
||
| var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi); | ||
|
|
||
| if (lambdaResponse.FunctionError != null) | ||
| { | ||
| _logger.LogError("Invoking Lambda {function} with {recordCount} DynamoDB stream records failed with error {errorMessage}", | ||
| _config.FunctionName, lambdaRecords.Count, lambdaResponse.FunctionError); | ||
| } | ||
| } | ||
|
|
||
| // Check for new shards periodically | ||
| if (shardIterators.All(s => s == null)) | ||
| { | ||
| shardIterators = await GetShardIterators(streamArn, stoppingToken); | ||
| if (shardIterators.Count == 0) | ||
| { | ||
| await Task.Delay(1000, stoppingToken); | ||
| } | ||
| } | ||
|
normj marked this conversation as resolved.
Outdated
|
||
| else if (!hasRecords) | ||
| { | ||
| // No records found, wait before polling again | ||
| await Task.Delay(1000, stoppingToken); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private async Task<List<string?>> GetShardIterators(string streamArn, CancellationToken stoppingToken) | ||
| { | ||
| var describeResponse = await _streamsClient.DescribeStreamAsync(new DescribeStreamRequest | ||
| { | ||
| StreamArn = streamArn | ||
| }, stoppingToken); | ||
|
|
||
| var iterators = new List<string?>(); | ||
|
|
||
| foreach (var shard in describeResponse.StreamDescription.Shards) | ||
|
normj marked this conversation as resolved.
Outdated
|
||
| { | ||
| var iteratorResponse = await _streamsClient.GetShardIteratorAsync(new GetShardIteratorRequest | ||
| { | ||
| StreamArn = streamArn, | ||
| ShardId = shard.ShardId, | ||
| ShardIteratorType = ShardIteratorType.LATEST | ||
| }, stoppingToken); | ||
|
|
||
| iterators.Add(iteratorResponse.ShardIterator); | ||
| } | ||
|
|
||
| return iterators; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Convert from the SDK's DynamoDB Streams records to the Lambda event's DynamoDB record type. | ||
| /// </summary> | ||
| internal static IList<DynamoDBEvent.DynamodbStreamRecord> ConvertToLambdaRecords(List<Record> records, string streamArn) | ||
| { | ||
| return records.Select(r => ConvertToLambdaRecord(r, streamArn)).ToList(); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Convert a single SDK stream record to the Lambda event record type. | ||
| /// </summary> | ||
| internal static DynamoDBEvent.DynamodbStreamRecord ConvertToLambdaRecord(Record record, string streamArn) | ||
| { | ||
| var lambdaRecord = new DynamoDBEvent.DynamodbStreamRecord | ||
| { | ||
| EventID = record.EventID, | ||
| EventName = record.EventName?.Value, | ||
| EventSource = "aws:dynamodb", | ||
| EventSourceArn = streamArn, | ||
| EventVersion = record.EventVersion, | ||
| AwsRegion = Arn.Parse(streamArn).Region | ||
| }; | ||
|
|
||
| if (record.Dynamodb != null) | ||
| { | ||
| lambdaRecord.Dynamodb = new DynamoDBEvent.StreamRecord | ||
| { | ||
| ApproximateCreationDateTime = record.Dynamodb.ApproximateCreationDateTime ?? DateTime.MinValue, | ||
| SequenceNumber = record.Dynamodb.SequenceNumber, | ||
| SizeBytes = record.Dynamodb.SizeBytes ?? 0, | ||
| StreamViewType = record.Dynamodb.StreamViewType?.Value | ||
| }; | ||
|
|
||
| if (record.Dynamodb.Keys != null) | ||
| { | ||
| lambdaRecord.Dynamodb.Keys = ConvertAttributeMap(record.Dynamodb.Keys); | ||
| } | ||
|
|
||
| if (record.Dynamodb.NewImage != null) | ||
| { | ||
| lambdaRecord.Dynamodb.NewImage = ConvertAttributeMap(record.Dynamodb.NewImage); | ||
| } | ||
|
|
||
| if (record.Dynamodb.OldImage != null) | ||
| { | ||
| lambdaRecord.Dynamodb.OldImage = ConvertAttributeMap(record.Dynamodb.OldImage); | ||
| } | ||
| } | ||
|
|
||
| if (record.UserIdentity != null) | ||
| { | ||
| lambdaRecord.UserIdentity = new DynamoDBEvent.Identity | ||
| { | ||
| PrincipalId = record.UserIdentity.PrincipalId, | ||
| Type = record.UserIdentity.Type | ||
| }; | ||
| } | ||
|
|
||
| return lambdaRecord; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Convert SDK AttributeValue dictionary to Lambda event AttributeValue dictionary. | ||
| /// </summary> | ||
| internal static Dictionary<string, DynamoDBEvent.AttributeValue> ConvertAttributeMap(Dictionary<string, AttributeValue> sdkMap) | ||
| { | ||
| var result = new Dictionary<string, DynamoDBEvent.AttributeValue>(); | ||
| foreach (var kvp in sdkMap) | ||
| { | ||
| result[kvp.Key] = ConvertAttributeValue(kvp.Value); | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Convert a single SDK AttributeValue to the Lambda event AttributeValue. | ||
| /// </summary> | ||
| internal static DynamoDBEvent.AttributeValue ConvertAttributeValue(AttributeValue sdkValue) | ||
| { | ||
| var lambdaValue = new DynamoDBEvent.AttributeValue(); | ||
|
|
||
| if (sdkValue.S != null) | ||
| lambdaValue.S = sdkValue.S; | ||
| if (sdkValue.N != null) | ||
| lambdaValue.N = sdkValue.N; | ||
| if (sdkValue.B != null) | ||
| lambdaValue.B = sdkValue.B; | ||
| if (sdkValue.BOOL != null) | ||
| lambdaValue.BOOL = sdkValue.BOOL; | ||
| if (sdkValue.NULL != null) | ||
| lambdaValue.NULL = sdkValue.NULL; | ||
| if (sdkValue.SS?.Count > 0) | ||
| lambdaValue.SS = sdkValue.SS; | ||
| if (sdkValue.NS?.Count > 0) | ||
| lambdaValue.NS = sdkValue.NS; | ||
| if (sdkValue.BS?.Count > 0) | ||
| lambdaValue.BS = sdkValue.BS; | ||
| if (sdkValue.L?.Count > 0) | ||
| lambdaValue.L = sdkValue.L.Select(ConvertAttributeValue).ToList(); | ||
| if (sdkValue.M?.Count > 0) | ||
|
normj marked this conversation as resolved.
Outdated
normj marked this conversation as resolved.
Outdated
|
||
| lambdaValue.M = ConvertAttributeMap(sdkValue.M); | ||
|
|
||
| return lambdaValue; | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.