DynamoDB can send change data capture (CDC) events to Amazon Kinesis Data Streams by following the instructions here. The steps are simple:
- Create a Kinesis stream
- Create a DynamoDB table
- Go to the table's **Exports and streams** tab and enable **Amazon Kinesis data stream details**.
- Provide the Kinesis stream you created.
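If you prefer to script these steps, the following is a minimal sketch of the equivalent calls in the AWS SDK for JavaScript v2 (the `aws-sdk` package installed later). It assumes a single-shard stream, on-demand billing, and a numeric `userid` partition key taken from the sample change event further down; the helper names are hypothetical. Clients are passed in rather than created, so the parameter builders can be checked without AWS access.

```javascript
// Sketch only: helper names are hypothetical; parameter shapes follow the
// AWS SDK for JavaScript v2 Kinesis and DynamoDB APIs.

// Parameters for Kinesis.createStream (one shard is an assumption).
function kinesisStreamParams(streamName, shardCount = 1) {
  return { StreamName: streamName, ShardCount: shardCount };
}

// Parameters for DynamoDB.createTable. The numeric `userid` hash key matches
// the Keys element of the sample change event; on-demand billing is assumed.
function tableParams(tableName) {
  return {
    TableName: tableName,
    AttributeDefinitions: [{ AttributeName: 'userid', AttributeType: 'N' }],
    KeySchema: [{ AttributeName: 'userid', KeyType: 'HASH' }],
    BillingMode: 'PAY_PER_REQUEST',
  };
}

// Parameters for DynamoDB.enableKinesisStreamingDestination.
function streamingDestinationParams(tableName, streamArn) {
  return { TableName: tableName, StreamArn: streamArn };
}

// Runs the console steps in order. `kinesis` and `dynamodb` are
// AWS.Kinesis and AWS.DynamoDB client instances supplied by the caller.
async function setUpCdc({ kinesis, dynamodb }, streamName, tableName) {
  await kinesis.createStream(kinesisStreamParams(streamName)).promise();
  // Wait until the stream is ACTIVE so its ARN can be attached to the table.
  await kinesis.waitFor('streamExists', { StreamName: streamName }).promise();
  const { StreamDescription } = await kinesis
    .describeStream({ StreamName: streamName })
    .promise();

  await dynamodb.createTable(tableParams(tableName)).promise();
  await dynamodb.waitFor('tableExists', { TableName: tableName }).promise();

  // Equivalent of enabling "Amazon Kinesis data stream details" in the console.
  await dynamodb
    .enableKinesisStreamingDestination(
      streamingDestinationParams(tableName, StreamDescription.StreamARN),
    )
    .promise();
  return StreamDescription.StreamARN;
}
```

For example, with `const AWS = require('aws-sdk')`, calling `setUpCdc({ kinesis: new AWS.Kinesis({ region: 'us-west-2' }), dynamodb: new AWS.DynamoDB({ region: 'us-west-2' }) }, 'hubert_dynamocdc', 'hubert_dynamocdc')` would create both resources and link them.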
This example creates a Kinesis source connection to the stream you created. The data loader is implemented in NodeJS.
Below is a diagram of the use case. This example implements everything upstream of the Cleanse step.
```mermaid
flowchart TD;
    NodeJS-->ddb[DynamoDB]-->Kinesis-->dc[Decodable Kinesis Source]-->Cleanse-->Materialize-->pg[(Materialize in a Database)]
    Other[Other Streaming Data]-->Enrichment
    Materialize-->Enrichment
```
Create a `.env` file containing your AWS settings. The Makefile reads these values to create the Decodable stream and the Kinesis source connection.

```
AWS_REGION=us-west-2
AWS_ARN=
AWS_KINESIS_STREAM=hubert_dynamocdc
```
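The Makefile consumes these values itself. If the NodeJS loader should read the same file, a minimal dependency-free parser could look like the sketch below. It is a hypothetical helper, not part of this example's code, and it assumes plain `KEY=value` lines with no quoting or variable expansion; the `dotenv` package is the usual off-the-shelf alternative. Because the template leaves `AWS_ARN` empty, the sketch also reports required keys that still have no value.

```javascript
const fs = require('fs');

// Hypothetical helper: parses KEY=value lines; blank lines and # comments are skipped.
function parseEnv(text) {
  const env = {};
  for (const rawLine of text.split(/\r?\n/)) {
    const line = rawLine.trim();
    if (line === '' || line.startsWith('#')) continue;
    const eq = line.indexOf('=');
    if (eq === -1) continue; // not a KEY=value line
    env[line.slice(0, eq).trim()] = line.slice(eq + 1).trim();
  }
  return env;
}

// Returns the names of required keys that are missing or empty.
function missingKeys(env, required = ['AWS_REGION', 'AWS_ARN', 'AWS_KINESIS_STREAM']) {
  return required.filter((key) => !env[key]);
}

// Reads and parses a .env file from disk (path defaults to ./.env).
function loadEnvFile(path = '.env') {
  return parseEnv(fs.readFileSync(path, 'utf8'));
}
```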
The data loader uses NodeJS to write records to DynamoDB. Install the required modules:

```shell
npm install aws-sdk
npm install randomstring
```
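The loader itself is not reproduced here. As a rough sketch, assuming the `aws-sdk` v2 `DynamoDB.putItem` call and the attribute names visible in the sample change event below (`userid`, `first_name`, `last_name`, `phone`), building and writing one Customer record might look like this. The function names are hypothetical.

```javascript
// Hypothetical sketch of a Customer writer. DynamoDB's low-level API expects
// every attribute wrapped in a type descriptor: S for strings, N for numbers
// (numbers are sent as strings).
function toCustomerItem({ userid, firstName, lastName, phone }) {
  return {
    userid: { N: String(userid) },
    first_name: { S: firstName },
    last_name: { S: lastName },
    phone: { N: String(phone) },
  };
}

// Writes one Customer with DynamoDB.putItem (AWS SDK for JavaScript v2).
// A new userid typically shows up as an INSERT event; overwriting an existing
// userid with different values typically shows up as a MODIFY event.
function putCustomer(dynamodb, tableName, customer) {
  return dynamodb
    .putItem({ TableName: tableName, Item: toCustomerItem(customer) })
    .promise();
}
```

For example, `putCustomer(new AWS.DynamoDB({ region: 'us-west-2' }), 'hubert_dynamocdc', { userid: 8, firstName: randomstring.generate(7), lastName: randomstring.generate(7), phone: 8 })` writes one record with random 7-character names, similar in shape to the sample event.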
Set up the Decodable components using the Makefile:

```shell
make raw      # creates the stream
make kinesis  # creates the Kinesis source connection
```

Load the data:

```shell
make load
```

The object sent to DynamoDB is a Customer record. Below is the change event that DynamoDB sends to Kinesis for it. The Decodable stream schema was created by hand to match this structure exactly.
Important fields:

- The `eventName` property indicates the operation that produced the event (`INSERT`, `MODIFY`, or `REMOVE`), and `tableName` indicates the table.
- The data lives in the `dynamodb` element:
  - `Keys` identifies the item's DynamoDB primary key.
  - `NewImage` is the state of the record after the change.
  - `OldImage` is the state of the record before the change.
  - `ApproximateCreationDateTime` is the timestamp to use (epoch milliseconds in this example).
```json
{
  "awsRegion": "us-west-2",
  "eventID": "d3ed405e-420b-4601-8aaf-bda773a16d86",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "hubert_dynamocdc",
  "dynamodb": {
    "ApproximateCreationDateTime": 1667565639982,
    "Keys": {
      "userid": {
        "N": "8"
      }
    },
    "NewImage": {
      "last_name": {
        "S": "gX8fQ2n"
      },
      "phone": {
        "N": "8"
      },
      "userid": {
        "N": "8"
      },
      "first_name": {
        "S": "gX8fQ2n"
      }
    },
    "OldImage": {
      "last_name": {
        "S": "K2zqYSK"
      },
      "phone": {
        "N": "8"
      },
      "userid": {
        "N": "8"
      },
      "first_name": {
        "S": "K2zqYSK"
      }
    },
    "SizeBytes": 104
  },
  "eventSource": "aws:dynamodb"
}
```
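To work with these events outside Decodable (for example, in a test or a custom consumer), the typed attribute values have to be unwrapped. The sketch below is a hypothetical helper, not part of this example: it handles the common DynamoDB type descriptors, converts `N` values with `Number()` (which loses precision beyond 2^53), and treats `ApproximateCreationDateTime` as epoch milliseconds, as in the sample above. In production, the SDK's own converter (`AWS.DynamoDB.Converter.unmarshall` in v2) is the safer choice.

```javascript
// Unwraps one DynamoDB attribute value such as { "S": "abc" } or { "N": "8" }.
function unwrap(attr) {
  const [type, value] = Object.entries(attr)[0];
  switch (type) {
    case 'S':
    case 'B': // binary values are returned as-is
    case 'BOOL':
      return value;
    case 'N':
      return Number(value); // precision is lost beyond Number.MAX_SAFE_INTEGER
    case 'NULL':
      return null;
    case 'SS':
    case 'BS':
      return value.slice();
    case 'NS':
      return value.map(Number);
    case 'L':
      return value.map(unwrap);
    case 'M':
      return unwrapImage(value);
    default:
      throw new Error(`Unsupported DynamoDB type: ${type}`);
  }
}

// Unwraps a whole image (Keys, NewImage or OldImage); a missing image maps to null.
function unwrapImage(image) {
  if (!image) return null;
  const result = {};
  for (const [name, attr] of Object.entries(image)) {
    result[name] = unwrap(attr);
  }
  return result;
}

// Flattens a change event into the fields listed above.
function parseChangeEvent(event) {
  const { dynamodb } = event;
  return {
    operation: event.eventName, // INSERT, MODIFY or REMOVE
    table: event.tableName,
    keys: unwrapImage(dynamodb.Keys),
    before: unwrapImage(dynamodb.OldImage), // null when OldImage is absent (INSERT)
    after: unwrapImage(dynamodb.NewImage), // null when NewImage is absent (REMOVE)
    timestamp: new Date(dynamodb.ApproximateCreationDateTime),
  };
}
```

Applied to the sample event, `parseChangeEvent` yields `keys` of `{ userid: 8 }`, a `before.last_name` of `K2zqYSK`, an `after.last_name` of `gX8fQ2n`, and a timestamp of 2022-11-04T12:40:39.982Z.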
Here is the data in the append stream in Decodable.
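Decodable's Kinesis source reads the stream for you. If you want to confirm that change events are reaching Kinesis independently of Decodable, one option is a small reader built on the AWS SDK for JavaScript v2 `describeStream`, `getShardIterator`, and `getRecords` calls. This is a hypothetical debugging aid, not part of this example: it reads only the first shard, starts at `TRIM_HORIZON`, and assumes each record's `Data` blob holds one JSON change event like the one above.

```javascript
// Decodes one Kinesis record payload (a Buffer in AWS SDK v2 responses)
// into the DynamoDB change event JSON it carries.
function decodeRecord(record) {
  return JSON.parse(Buffer.from(record.Data).toString('utf8'));
}

// Reads one batch from the first shard of the stream, starting at the oldest
// retained record. `kinesis` is an AWS.Kinesis client supplied by the caller.
async function peekStream(kinesis, streamName, limit = 10) {
  const { StreamDescription } = await kinesis
    .describeStream({ StreamName: streamName })
    .promise();
  const shardId = StreamDescription.Shards[0].ShardId;
  const { ShardIterator } = await kinesis
    .getShardIterator({
      StreamName: streamName,
      ShardId: shardId,
      ShardIteratorType: 'TRIM_HORIZON',
    })
    .promise();
  const { Records } = await kinesis
    .getRecords({ ShardIterator, Limit: limit })
    .promise();
  return Records.map(decodeRecord);
}
```

A single `getRecords` call can return an empty batch even when older records exist, so in practice you may need to keep calling it with the returned `NextShardIterator` until events appear.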
