Skip to content

Commit 5708303

Browse files
committed
Optionally provide a fallback update request to the updateDynamoDB sink.
1 parent 828e10c commit 5708303

5 files changed

Lines changed: 450 additions & 141 deletions

File tree

src/flavors/update.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ export const update = (rule) => (s) => s // eslint-disable-line import/prefer-de
4141
.map(toUpdateRequest(rule))
4242
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
4343

44+
.map(toFallbackUpdateRequest(rule))
45+
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
46+
4447
.through(updateDynamoDB(rule))
4548

4649
.tap(printEndPipeline);
@@ -68,3 +71,8 @@ const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
6871
...uow,
6972
updateRequest: await faultify(rule.toUpdateRequest)(uow, rule),
7073
}));
74+
75+
const toFallbackUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
76+
...uow,
77+
fallbackUpdateRequest: rule.toFallbackUpdateRequest ? await faultify(rule.toFallbackUpdateRequest)(uow, rule) : undefined,
78+
}));

src/sinks/dynamodb.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import _ from 'highland';
22

3+
import { isEmpty } from 'lodash';
34
import Connector from '../connectors/dynamodb';
45

56
import { rejectWithFault } from '../utils/faults';
@@ -81,6 +82,7 @@ export const updateDynamoDB = ({
8182
tableName = process.env.ENTITY_TABLE_NAME || process.env.EVENT_TABLE_NAME,
8283
updateRequestField = 'updateRequest',
8384
updateResponseField = 'updateResponse',
85+
fallbackUpdateRequestField = 'fallbackUpdateRequest',
8486
parallel = Number(process.env.UPDATE_PARALLEL) || Number(process.env.PARALLEL) || 4,
8587
timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
8688
removeUndefinedValues = true,
@@ -95,11 +97,19 @@ export const updateDynamoDB = ({
9597
const invoke = (uow) => {
9698
if (!uow[updateRequestField]) return _(Promise.resolve(uow));
9799

98-
const p = () => connector.update(uow[updateRequestField], uow)
99-
.then((updateResponse) => ({ ...uow, [updateResponseField]: updateResponse }))
100+
const p = (updateRequest, isFallback) => () => connector.update(updateRequest, uow)
101+
.then((updateResponse) => {
102+
if (isEmpty(updateResponse) && uow[fallbackUpdateRequestField] && !isFallback) {
103+
// If its empty, that indicates a conditional write failure, in that case we want to run the fallback
104+
// update, if present.
105+
return p(uow[fallbackUpdateRequestField], true)();
106+
} else {
107+
return { ...uow, [updateResponseField]: updateResponse };
108+
}
109+
})
100110
.catch(rejectWithFault(uow));
101111

102-
return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
112+
return _(uow.metrics?.w(p(uow[updateRequestField], false), step) || p(uow[updateRequestField], false)()); // wrap promise in a stream
103113
};
104114

105115
return (s) => s

test/unit/flavors/materializeTimestream.test.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import sinon from 'sinon';
44

55
import {
66
initialize, initializeFrom,
7-
ttl,
87
} from '../../../src';
98

109
import { toKinesisRecords, fromKinesis } from '../../../src/from/kinesis';

test/unit/flavors/update.test.js

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';
77
import { ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb';
88
import { mockClient } from 'aws-sdk-client-mock';
99

10+
import { cloneDeep } from 'lodash';
1011
import {
1112
initialize, initializeFrom,
1213
} from '../../../src';
@@ -28,7 +29,10 @@ describe('flavors/update.js', () => {
2829
let mockDdb;
2930

3031
beforeEach(() => {
31-
sinon.stub(DynamoDBConnector.prototype, 'update').resolves({});
32+
sinon.stub(DynamoDBConnector.prototype, 'update')
33+
.onFirstCall().resolves({})
34+
.onSecondCall()
35+
.resolves({});
3236
});
3337

3438
afterEach(() => {
@@ -214,6 +218,158 @@ describe('flavors/update.js', () => {
214218
.done(done);
215219
});
216220

221+
it('should optionally run fallback update request', (done) => {
222+
sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]);
223+
sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({
224+
Responses: {
225+
undefined: [{
226+
pk: '2',
227+
sk: 'thing',
228+
discriminator: 'thing',
229+
name: 'thing2',
230+
}],
231+
},
232+
UnprocessedKeys: {},
233+
});
234+
235+
sinon.stub(KmsConnector.prototype, 'generateDataKey').resolves(MOCK_GEN_DK_RESPONSE);
236+
237+
const events = toDynamodbRecords([
238+
{
239+
timestamp: 1572832690,
240+
keys: {
241+
pk: '1',
242+
sk: 'thing',
243+
},
244+
newImage: {
245+
pk: '1',
246+
sk: 'thing',
247+
discriminator: 'thing',
248+
name: 'Thing One',
249+
description: 'This is thing one',
250+
otherThing: 'thing|2',
251+
ttl: 1549053422,
252+
timestamp: 1548967022000,
253+
},
254+
},
255+
{
256+
timestamp: 1572832690,
257+
keys: {
258+
pk: '1',
259+
sk: 'other',
260+
},
261+
newImage: {
262+
pk: '1',
263+
sk: 'other',
264+
discriminator: 'other',
265+
name: 'Other One',
266+
description: 'This is other one',
267+
ttl: 1549053422,
268+
timestamp: 1548967022000,
269+
},
270+
},
271+
]);
272+
const rulesWithFallbackUpdateRequest = cloneDeep([rules[0]]);
273+
rulesWithFallbackUpdateRequest[0].toFallbackUpdateRequest = (uow) => ({
274+
Key: {
275+
pk: uow.event.raw.new.pk,
276+
sk: uow.event.raw.new.sk,
277+
},
278+
...updateExpression({
279+
fallbackUpdate: true,
280+
}),
281+
});
282+
283+
initialize({
284+
...initializeFrom(rulesWithFallbackUpdateRequest),
285+
}, { ...defaultOptions, AES: false })
286+
.assemble(fromDynamodb(events), false)
287+
.collect()
288+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
289+
.tap((collected) => {
290+
expect(collected.length).to.equal(1);
291+
expect(collected[0].pipeline).to.equal('update1');
292+
expect(collected[0].event.type).to.equal('thing-created');
293+
expect(collected[0].batchGetRequest).to.deep.equal({
294+
RequestItems: {
295+
undefined: {
296+
Keys: [{
297+
pk: '2',
298+
sk: 'thing',
299+
}],
300+
},
301+
},
302+
});
303+
expect(collected[0].batchGetResponse).to.deep.equal({
304+
Responses: {
305+
undefined: [
306+
{
307+
pk: '2',
308+
sk: 'thing',
309+
discriminator: 'thing',
310+
name: 'thing2',
311+
},
312+
],
313+
},
314+
UnprocessedKeys: {},
315+
});
316+
expect(collected[0].updateRequest).to.deep.equal({
317+
Key: {
318+
pk: '1',
319+
sk: 'thing',
320+
},
321+
ExpressionAttributeNames: {
322+
'#pk': 'pk',
323+
'#sk': 'sk',
324+
'#discriminator': 'discriminator',
325+
'#name': 'name',
326+
'#description': 'description',
327+
'#otherThing': 'otherThing',
328+
'#ttl': 'ttl',
329+
'#timestamp': 'timestamp',
330+
},
331+
ExpressionAttributeValues: {
332+
':pk': '1',
333+
':sk': 'thing',
334+
':discriminator': 'thing',
335+
':name': 'Thing One',
336+
':description': 'This is thing one',
337+
':ttl': 1549053422,
338+
':timestamp': 1548967022000,
339+
':otherThing': {
340+
pk: '2',
341+
sk: 'thing',
342+
discriminator: 'thing',
343+
name: 'thing2',
344+
},
345+
},
346+
UpdateExpression: 'SET #pk = :pk, #sk = :sk, #discriminator = :discriminator, #name = :name, #description = :description, #otherThing = :otherThing, #ttl = :ttl, #timestamp = :timestamp',
347+
ReturnValues: 'ALL_NEW',
348+
ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp',
349+
});
350+
351+
expect(collected[0].fallbackUpdateRequest).to.deep.equal({
352+
Key: {
353+
pk: '1',
354+
sk: 'thing',
355+
},
356+
ExpressionAttributeNames: {
357+
'#fallbackUpdate': 'fallbackUpdate',
358+
},
359+
ExpressionAttributeValues: {
360+
':fallbackUpdate': true,
361+
},
362+
UpdateExpression: 'SET #fallbackUpdate = :fallbackUpdate',
363+
ReturnValues: 'ALL_NEW',
364+
});
365+
366+
expect(collected[0].updateResponse).to.deep.equal({});
367+
expect(collected[0].queryRequest).to.be.undefined;
368+
expect(collected[0].getRequest).to.be.undefined;
369+
})
370+
.done(done);
371+
});
372+
217373
it('should fault on error', (done) => {
218374
sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]);
219375
sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({

0 commit comments

Comments
 (0)