Skip to content

Commit ab7df8e

Browse files
committed
Initial commit
1 parent d7c7642 commit ab7df8e

7 files changed

Lines changed: 376 additions & 0 deletions

File tree

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# SQS to DynamoDB using EventBridge Pipes with API Gateway and CDK/Python
2+
3+
This pattern will send messages from an SQS queue to a DynamoDB table via API Gateway using EventBridge Pipes.
4+
5+
Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/eventbridge-pipes-sqs-to-dynamodb
6+
7+
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.
8+
9+
## Requirements
10+
11+
- [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
12+
- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
13+
- [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
14+
- [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) installed and configured
15+
16+
## Deployment Instructions
17+
18+
1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
19+
```bash
20+
git clone https://github.com/aws-samples/serverless-patterns
21+
```
22+
2. Change directory to the pattern directory:
23+
```bash
24+
cd serverless-patterns/eventbridge-pipes-sqs-to-dynamodb
25+
```
26+
3. To manually create a virtualenv on MacOS and Linux:
27+
```bash
28+
$ python3 -m venv .venv
29+
```
30+
4. After the init process completes and the virtualenv is created, you can use the following
31+
step to activate your virtualenv.
32+
```bash
33+
$ source .venv/bin/activate
34+
```
35+
5. If you are a Windows platform, you would activate the virtualenv like this:
36+
```bash
37+
% .venv\Scripts\activate.bat
38+
```
39+
6. Once the virtualenv is activated, you can install the required dependencies.
40+
```bash
41+
$ pip install -r requirements.txt
42+
```
43+
7. To deploy the application:
44+
```bash
45+
$ cdk deploy
46+
```
47+
48+
## How it works
49+
50+
This template will create an SQS queue, EventBridge Pipe, API Gateway and a DynamoDB table.
51+
52+
Messages sent to the SQS queue are polled by EventBridge Pipe. EventBridge Pipe processes the messages and sends them to API Gateway endpoint. API Gateway transforms the message and writes the data to DynamoDB table using direct integration.
53+
54+
## Testing
55+
56+
Once this stack is deployed in your AWS account, copy the SQS queue name value from the output.
57+
58+
Then, send a message to the SQS queue as follows:
59+
```sh
60+
aws sqs send-message \
61+
--queue-url "https://sqs.<region-id>.amazonaws.com/<account-id>/<queue-name>" \
62+
--message-body '{"Message": "{\"content\":\"Test message\",\"params\":{\"name\":\"Mario\",\"surname\":\"Rossi\"}}"}'
63+
```
64+
65+
When you check the DynamoDB table, you can see the entry with all the attributes parsed by API Gateway.
66+
67+
## Cleanup
68+
69+
1. Delete the stack
70+
71+
```bash
72+
cdk destroy
73+
```
74+
75+
---
76+
77+
Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
78+
79+
SPDX-License-Identifier: MIT-0
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#!/usr/bin/env python3
2+
import os
3+
import aws_cdk as cdk
4+
from eventbridge_pipes_sqs_to_dynamodb.eventbridge_pipes_sqs_to_dynamodb import EventbridgePipesSqsToDynamodb
5+
6+
app = cdk.App()
7+
stack = EventbridgePipesSqsToDynamodb(app, "EventbridgePipesSqsToDynamodb")
8+
9+
app.synth()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"app": "python3 app.py"
3+
}

eventbridge-pipes-sqs-to-dynamodb/eventbridge_pipes_sqs_to_dynamodb/__init__.py

Whitespace-only changes.
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
from aws_cdk import (
2+
Stack,
3+
aws_apigateway as apigateway,
4+
aws_dynamodb as dynamodb,
5+
aws_logs as logs,
6+
aws_pipes as pipes,
7+
aws_sqs as sqs,
8+
aws_iam as iam,
9+
RemovalPolicy,
10+
CfnOutput
11+
)
12+
from constructs import Construct
13+
import aws_cdk as cdk
14+
15+
class EventbridgePipesSqsToDynamodb(Stack):
16+
17+
def __init__(self, scope: Construct, construct_id: str, env: cdk.Environment, **kwargs) -> None:
18+
super().__init__(scope, construct_id, env=env, **kwargs)
19+
20+
# SQS queue for decoupling and buffering events for destination_lambda_b
21+
source_queue = sqs.Queue(
22+
self, "EntryPointToEventbridgePipe",
23+
visibility_timeout=cdk.Duration.seconds(60),
24+
retention_period=cdk.Duration.days(4),
25+
enforce_ssl=True
26+
)
27+
28+
# DynamoDB table
29+
self.table = dynamodb.Table(
30+
self, "EventTableNew",
31+
table_name="Audit-Table",
32+
partition_key=dynamodb.Attribute(
33+
name="id",
34+
type=dynamodb.AttributeType.STRING
35+
),
36+
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
37+
removal_policy=RemovalPolicy.DESTROY,
38+
point_in_time_recovery_specification=dynamodb.PointInTimeRecoverySpecification(
39+
point_in_time_recovery_enabled=True
40+
)
41+
)
42+
43+
# IAM role for API Gateway to access DynamoDB
44+
api_gateway_role = iam.Role(
45+
self, "ApiGatewayDynamoDBRole",
46+
assumed_by=iam.ServicePrincipal("apigateway.amazonaws.com"),
47+
inline_policies={
48+
"DynamoDBAccess": iam.PolicyDocument(
49+
statements=[
50+
iam.PolicyStatement(
51+
actions=[
52+
"dynamodb:PutItem",
53+
"dynamodb:GetItem",
54+
"dynamodb:UpdateItem",
55+
"dynamodb:DeleteItem",
56+
"dynamodb:Query",
57+
"dynamodb:Scan"
58+
],
59+
resources=[self.table.table_arn]
60+
)
61+
]
62+
)
63+
}
64+
)
65+
66+
# CloudWatch Log Group for API Gateway
67+
api_log_group = logs.LogGroup(
68+
self, "ApiGatewayLogGroup",
69+
removal_policy=RemovalPolicy.DESTROY
70+
)
71+
72+
stage_name = "test"
73+
74+
# API Gateway with Resource policy to be invoked only by Eventbridge Pipes
75+
self.api = apigateway.RestApi(
76+
self, "RegionalApi",
77+
rest_api_name="EventBridge-DynamoDB-API",
78+
endpoint_configuration=apigateway.EndpointConfiguration(
79+
types=[apigateway.EndpointType.REGIONAL]
80+
),
81+
policy=iam.PolicyDocument(
82+
statements=[
83+
iam.PolicyStatement(
84+
effect=iam.Effect.ALLOW,
85+
principals=[iam.ServicePrincipal("pipes.amazonaws.com")],
86+
actions=["execute-api:Invoke"],
87+
resources=[f"execute-api:/{stage_name}/POST/events"]
88+
)
89+
]
90+
),
91+
deploy_options=apigateway.StageOptions(
92+
stage_name=stage_name,
93+
access_log_destination=apigateway.LogGroupLogDestination(api_log_group),
94+
access_log_format=apigateway.AccessLogFormat.clf(),
95+
logging_level=apigateway.MethodLoggingLevel.INFO,
96+
data_trace_enabled=True
97+
)
98+
)
99+
100+
# API Gateway integration with DynamoDB
101+
dynamodb_integration = apigateway.AwsIntegration(
102+
service="dynamodb",
103+
action="PutItem",
104+
options=apigateway.IntegrationOptions(
105+
credentials_role=api_gateway_role,
106+
request_templates={
107+
"application/json": f'''#set($inputRoot = $input.path('$'))
108+
#set($body = $util.parseJson($inputRoot.body))
109+
#set($message = $util.parseJson($body.Message))
110+
{{
111+
"TableName": "{self.table.table_name}",
112+
"Item": {{
113+
"id": {{
114+
"S": "$context.requestId"
115+
}},
116+
"createdAt": {{
117+
"S": "$context.requestTime"
118+
}},
119+
"name": {{
120+
"S": "$message.params.name"
121+
}},
122+
"surname": {{
123+
"S": "$message.params.surname"
124+
}},
125+
"content": {{
126+
"S": "$util.escapeJavaScript($message.content)"
127+
}}
128+
}}
129+
}}'''
130+
},
131+
integration_responses=[
132+
apigateway.IntegrationResponse(
133+
status_code="200",
134+
response_templates={
135+
"application/json": '{"status": "success", "id": "$context.requestId"}'
136+
}
137+
)
138+
]
139+
)
140+
)
141+
142+
# API Gateway resource and method with IAM authentication
143+
events_resource = self.api.root.add_resource("events")
144+
events_resource.add_method(
145+
"POST",
146+
dynamodb_integration,
147+
authorization_type=apigateway.AuthorizationType.IAM,
148+
method_responses=[
149+
apigateway.MethodResponse(
150+
status_code="200",
151+
response_models={
152+
"application/json": apigateway.Model.EMPTY_MODEL
153+
}
154+
)
155+
]
156+
)
157+
158+
# IAM role for EventBridge Pipe
159+
pipe_role = iam.Role(
160+
self, "EventBridgePipeRole",
161+
assumed_by=iam.ServicePrincipal("pipes.amazonaws.com").with_conditions({
162+
"StringEquals": {
163+
"aws:SourceAccount": cdk.Stack.of(self).account,
164+
"aws:SourceArn": f"arn:aws:pipes:{cdk.Stack.of(self).region}:{cdk.Stack.of(self).account}:pipe/EventBridgePipe"
165+
}
166+
}),
167+
inline_policies={
168+
"SqsPipeSourceAccess": iam.PolicyDocument(
169+
statements=[
170+
iam.PolicyStatement(
171+
actions=[
172+
"sqs:ReceiveMessage",
173+
"sqs:DeleteMessage",
174+
"sqs:GetQueueAttributes"
175+
],
176+
resources=[source_queue.queue_arn]
177+
)
178+
]
179+
),
180+
"ApiGatewayPipeTargetAccess": iam.PolicyDocument(
181+
statements=[
182+
iam.PolicyStatement(
183+
actions=[
184+
"execute-api:Invoke",
185+
"execute-api:ManageConnections"
186+
],
187+
resources=[f"arn:aws:execute-api:{cdk.Stack.of(self).region}:{cdk.Stack.of(self).account}:{self.api.rest_api_id}/{stage_name}/*"]
188+
)
189+
]
190+
)
191+
}
192+
)
193+
194+
# EventBridge Pipe: SQS → API Gateway
195+
self.pipe = pipes.CfnPipe(
196+
self, "SqsToApiGatewayPipe",
197+
role_arn=pipe_role.role_arn,
198+
name="EventBridgePipe",
199+
desired_state="RUNNING",
200+
source=source_queue.queue_arn,
201+
source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
202+
sqs_queue_parameters=pipes.CfnPipe.PipeSourceSqsQueueParametersProperty(
203+
batch_size=1
204+
)
205+
),
206+
target=f"arn:aws:execute-api:{cdk.Stack.of(self).region}:{cdk.Stack.of(self).account}:{self.api.rest_api_id}/{stage_name}/POST/events",
207+
target_parameters=pipes.CfnPipe.PipeTargetParametersProperty(
208+
http_parameters=pipes.CfnPipe.PipeTargetHttpParametersProperty(
209+
header_parameters={
210+
"Content-Type": "application/json"
211+
}
212+
)
213+
)
214+
)
215+
216+
# Output
217+
CfnOutput(self, "QueueName", value=source_queue.queue_name)
218+
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
{
2+
"title": "SQS to DynamoDB using EventBridge Pipes with API Gateway and CDK Python",
3+
"description": "Pattern that sends messages from SQS queue to DynamoDB table via API Gateway using EventBridge Pipes. Implemented with CDK using Python.",
4+
"language": "Python",
5+
"level": "300",
6+
"framework": "AWS CDK",
7+
"introBox": {
8+
"headline": "How it works",
9+
"text": [
10+
"Messages sent to the SQS queue are polled by EventBridge Pipe.",
11+
"EventBridge Pipe processes the messages and sends them to API Gateway endpoint.",
12+
"API Gateway transforms the message and writes the data to DynamoDB table using direct integration."
13+
]
14+
},
15+
"gitHub": {
16+
"template": {
17+
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/eventbridge-pipes-sqs-to-dynamodb",
18+
"templateURL": "serverless-patterns/eventbridge-pipes-sqs-to-dynamodb",
19+
"projectFolder": "eventbridge-pipes-sqs-to-dynamodb",
20+
"templateFile": "eventbridge-pipes-sqs-to-dynamodb/eventbridge_pipes_sqs_to_dynamodb/eventbridge_pipes_sqs_to_dynamodb.py"
21+
}
22+
},
23+
"resources": {
24+
"bullets": [
25+
{
26+
"text": "Amazon Simple Queue Service",
27+
"link": "https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html"
28+
},
29+
{
30+
"text": "EventBridge Pipes Documentation",
31+
"link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html"
32+
},
33+
{
34+
"text": "Cloudformation API for Pipes",
35+
"link": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html"
36+
},
37+
{
38+
"text": "Pipes Documentation for CDK v2 Python",
39+
"link": "https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_pipes/CfnPipe.html"
40+
},
41+
{
42+
"text": "API Gateway DynamoDB Integration",
43+
"link": "https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-overview-developer-experience.html"
44+
}
45+
]
46+
},
47+
"deploy": {
48+
"text": ["cdk deploy"]
49+
},
50+
"testing": {
51+
"text": ["See the README in the GitHub repo for detailed testing instructions."]
52+
},
53+
"cleanup": {
54+
"text": ["Delete the stack: <code>cdk destroy</code>."]
55+
},
56+
"authors": [
57+
{
58+
"name": "Michele Scarimbolo",
59+
"image": "https://avatars.githubusercontent.com/u/229997073",
60+
"bio": "Technical Account Manager at AWS. Serverless specialist and enthusiast.",
61+
"linkedin": "michele-scarimbolo",
62+
"twitter": ""
63+
}
64+
]
65+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
aws-cdk-lib==2.181.1
2+
constructs>=10.0.0,<11.0.0

0 commit comments

Comments
 (0)