Skip to content

Commit 0a2576e

Browse files
authored
Merge pull request #426 from jgilbert01/issue-filter-ttl-removes
Fix DDB stream TTL REMOVE event filtering.
2 parents 25990d0 + cbfbca8 commit 0a2576e

4 files changed

Lines changed: 106 additions & 21 deletions

File tree

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.12",
3+
"version": "1.1.13",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",

src/from/dynamodb.js

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -165,22 +165,20 @@ export const outGlobalTableExtraModify = (record) => {
165165
//--------------------------------------------
166166

167167
export const outTtlExpiredEvents = (ignoreTtlExpiredEvents) => (record) => {
168-
// this is not a REMOVE event
169-
if (record.eventName !== 'REMOVE') return true;
170-
171-
const { OldImage } = record.dynamodb;
172-
173-
// this record does not have ttl
174-
if (!OldImage.ttl || !OldImage.timestamp) return true;
175-
176-
// ttl has not expired
177-
if (Number(OldImage.ttl.N) * 1000 > Number(OldImage.timestamp.N)) return true;
178-
179-
// this is a ttl expired event
180-
// should we ignore it
181-
/* istanbul ignore else */
182-
if (ignoreTtlExpiredEvents) {
183-
return false;
168+
const { eventName, userIdentity, dynamodb: { OldImage, ApproximateCreationDateTime } } = record;
169+
// this is not a REMOVE event or we're not ignoring the ttl expired events anyway.
170+
if (eventName !== 'REMOVE' || !ignoreTtlExpiredEvents) return true;
171+
172+
if (userIdentity) {
173+
// See https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_Record.html
174+
// We trust dynamodb that the ttl expired if its a remove and has the ttl expiry indicating
175+
// identity attributes.
176+
return !(userIdentity?.type === 'Service' && userIdentity?.principalId === 'dynamodb.amazonaws.com');
177+
} else if (OldImage.ttl?.N) {
178+
// If no user identity attribute is present, this may be a replicated TTL delete, but we still
179+
// want to honor it because filtering out replica region events may be disabled.
180+
const ttlSec = Number(OldImage.ttl.N);
181+
return !(ttlSec <= ApproximateCreationDateTime);
184182
} else {
185183
return true;
186184
}
@@ -206,6 +204,12 @@ export const toDynamodbRecords = (events, { removeUndefinedValues = true } = {})
206204
StreamViewType: 'NEW_AND_OLD_IMAGES',
207205
},
208206
// eventSourceARN: 'arn:aws:dynamodb:us-west-2:123456789012:table/myservice-entities/stream/2016-11-16T20:42:48.104',
207+
...(e.ttlDelete && {
208+
userIdentity: {
209+
principalId: 'dynamodb.amazonaws.com',
210+
type: 'Service',
211+
},
212+
}),
209213
})),
210214
});
211215

test/unit/from/dynamodb.test.js

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -907,7 +907,49 @@ describe('from/dynamodb.js', () => {
907907
it('should ignore expired ttl', (done) => {
908908
const events = toDynamodbRecords([
909909
{
910-
timestamp: 1573005490000,
910+
timestamp: 1573005490,
911+
keys: {
912+
pk: '1',
913+
sk: 'thing',
914+
},
915+
oldImage: {
916+
pk: '1',
917+
sk: 'thing',
918+
name: 'N1',
919+
ttl: 1573005490,
920+
timestamp: 1573005490000,
921+
},
922+
ttlDelete: true,
923+
},
924+
{
925+
timestamp: 1573005490,
926+
keys: {
927+
pk: '1',
928+
sk: 'thing',
929+
},
930+
oldImage: {
931+
pk: '1',
932+
sk: 'thing',
933+
name: 'N1',
934+
ttl: 1573015491,
935+
timestamp: 1573005490000,
936+
},
937+
},
938+
]);
939+
940+
fromDynamodb(events, { ignoreTtlExpiredEvents: true })
941+
.collect()
942+
.tap((collected) => {
943+
// console.log(JSON.stringify(collected, null, 2));
944+
expect(collected.length).to.equal(1);
945+
})
946+
.done(done);
947+
});
948+
949+
it('should ignore replicated ttl', (done) => {
950+
const events = toDynamodbRecords([
951+
{
952+
timestamp: 1573005491,
911953
keys: {
912954
pk: '1',
913955
sk: 'thing',
@@ -930,7 +972,7 @@ describe('from/dynamodb.js', () => {
930972
pk: '1',
931973
sk: 'thing',
932974
name: 'N1',
933-
ttl: 1573015490, // hasn't expired yet
975+
ttl: 1573015490, // expired, has no identity attributes to indicate ttl delete
934976
timestamp: 1573005490000,
935977
},
936978
},
@@ -945,6 +987,45 @@ describe('from/dynamodb.js', () => {
945987
.done(done);
946988
});
947989

990+
it('should passes through record with no ttl if ignore ttl events is true', (done) => {
991+
const events = toDynamodbRecords([
992+
{
993+
timestamp: 1573005491,
994+
keys: {
995+
pk: '1',
996+
sk: 'thing',
997+
},
998+
oldImage: {
999+
pk: '1',
1000+
sk: 'thing',
1001+
name: 'N1',
1002+
timestamp: 1573005490000,
1003+
},
1004+
},
1005+
{
1006+
timestamp: 1573005490,
1007+
keys: {
1008+
pk: '1',
1009+
sk: 'thing',
1010+
},
1011+
oldImage: {
1012+
pk: '1',
1013+
sk: 'thing',
1014+
name: 'N1',
1015+
timestamp: 1573005490000,
1016+
},
1017+
},
1018+
]);
1019+
1020+
fromDynamodb(events, { ignoreTtlExpiredEvents: true })
1021+
.collect()
1022+
.tap((collected) => {
1023+
// console.log(JSON.stringify(collected, null, 2));
1024+
expect(collected.length).to.equal(2);
1025+
})
1026+
.done(done);
1027+
});
1028+
9481029
it('should keep replica records if ignoreReplicas is false', (done) => {
9491030
const events = toDynamodbRecords([
9501031
{

0 commit comments

Comments
 (0)