Skip to content

Commit a09ac14

Browse files
authored
Merge pull request #438 from jgilbert01/conditional-update-fallback
Conditional update fallback
2 parents 828e10c + dc961c9 commit a09ac14

10 files changed

Lines changed: 571 additions & 160 deletions

File tree

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.19",
3+
"version": "1.1.20",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",

src/flavors/materialize.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ export const materialize = (rule) => (s) => s // eslint-disable-line import/pref
2626
.map(toUpdateRequest(rule))
2727
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
2828

29+
.map(toFallbackUpdateRequest(rule))
30+
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
31+
2932
.through(updateDynamoDB(rule))
3033

3134
.tap(printEndPipeline);
@@ -37,3 +40,8 @@ const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
3740
...uow,
3841
updateRequest: await faultify(rule.toUpdateRequest)(uow, rule),
3942
}));
43+
44+
const toFallbackUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
45+
...uow,
46+
fallbackUpdateRequest: rule.toFallbackUpdateRequest ? await faultify(rule.toFallbackUpdateRequest)(uow, rule) : undefined,
47+
}));

src/flavors/update.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ export const update = (rule) => (s) => s // eslint-disable-line import/prefer-de
4141
.map(toUpdateRequest(rule))
4242
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
4343

44+
.map(toFallbackUpdateRequest(rule))
45+
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
46+
4447
.through(updateDynamoDB(rule))
4548

4649
.tap(printEndPipeline);
@@ -68,3 +71,8 @@ const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
6871
...uow,
6972
updateRequest: await faultify(rule.toUpdateRequest)(uow, rule),
7073
}));
74+
75+
const toFallbackUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
76+
...uow,
77+
fallbackUpdateRequest: rule.toFallbackUpdateRequest ? await faultify(rule.toFallbackUpdateRequest)(uow, rule) : undefined,
78+
}));

src/sinks/dynamodb.js

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import _ from 'highland';
22

3+
import { isEmpty } from 'lodash';
34
import Connector from '../connectors/dynamodb';
45

56
import { rejectWithFault } from '../utils/faults';
@@ -81,6 +82,7 @@ export const updateDynamoDB = ({
8182
tableName = process.env.ENTITY_TABLE_NAME || process.env.EVENT_TABLE_NAME,
8283
updateRequestField = 'updateRequest',
8384
updateResponseField = 'updateResponse',
85+
fallbackUpdateRequestField = 'fallbackUpdateRequest',
8486
parallel = Number(process.env.UPDATE_PARALLEL) || Number(process.env.PARALLEL) || 4,
8587
timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
8688
removeUndefinedValues = true,
@@ -95,11 +97,19 @@ export const updateDynamoDB = ({
9597
const invoke = (uow) => {
9698
if (!uow[updateRequestField]) return _(Promise.resolve(uow));
9799

98-
const p = () => connector.update(uow[updateRequestField], uow)
99-
.then((updateResponse) => ({ ...uow, [updateResponseField]: updateResponse }))
100+
const p = (updateRequest, isFallback) => () => connector.update(updateRequest, uow)
101+
.then((updateResponse) => {
102+
if (isEmpty(updateResponse) && uow[fallbackUpdateRequestField] && !isFallback) {
103+
// If its empty, that indicates a conditional write failure, in that case we want to run the fallback
104+
// update, if present.
105+
return uow.metrics?.w(p(uow[fallbackUpdateRequestField], true)) || p(uow[fallbackUpdateRequestField], true)();
106+
} else {
107+
return { ...uow, [updateResponseField]: updateResponse };
108+
}
109+
})
100110
.catch(rejectWithFault(uow));
101111

102-
return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
112+
return _(uow.metrics?.w(p(uow[updateRequestField], false), step) || p(uow[updateRequestField], false)()); // wrap promise in a stream
103113
};
104114

105115
return (s) => s

test/unit/flavors/materialize.test.js

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'mocha';
22
import { expect } from 'chai';
33
import sinon from 'sinon';
44

5+
import { cloneDeep } from 'lodash';
56
import {
67
initialize, initializeFrom,
78
ttl,
@@ -92,6 +93,99 @@ describe('flavors/materialize.js', () => {
9293
})
9394
.done(done);
9495
});
96+
97+
it('should optionally call the fallback update request', (done) => {
98+
const events = toKinesisRecords([
99+
{
100+
type: 'm1',
101+
timestamp: 1548967022000,
102+
thing: {
103+
id: '1',
104+
name: 'Thing One',
105+
description: 'This is thing one',
106+
},
107+
},
108+
{
109+
type: 'split',
110+
timestamp: 1548967022000,
111+
root: {
112+
things: [{
113+
id: '2',
114+
name: 'Thing One',
115+
description: 'This is thing one',
116+
}, {
117+
id: '3',
118+
name: 'Thing One',
119+
description: 'This is thing one',
120+
}],
121+
},
122+
},
123+
]);
124+
125+
const ruleWithFallbackUpdateRequest = cloneDeep(rules[0]);
126+
ruleWithFallbackUpdateRequest.toFallbackUpdateRequest = (uow) => ({
127+
Key: {
128+
pk: uow.event.thing.id,
129+
sk: 'thing',
130+
},
131+
...updateExpression({
132+
fallbackUpdate: true,
133+
}),
134+
});
135+
136+
initialize({
137+
...initializeFrom([ruleWithFallbackUpdateRequest]),
138+
})
139+
.assemble(fromKinesis(events), false)
140+
.collect()
141+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
142+
.tap((collected) => {
143+
expect(collected.length).to.equal(1);
144+
expect(collected[0].pipeline).to.equal('mv1');
145+
expect(collected[0].event.type).to.equal('m1');
146+
expect(collected[0].updateRequest).to.deep.equal({
147+
Key: {
148+
pk: '1',
149+
sk: 'thing',
150+
},
151+
ExpressionAttributeNames: {
152+
'#id': 'id',
153+
'#name': 'name',
154+
'#description': 'description',
155+
156+
'#discriminator': 'discriminator',
157+
'#ttl': 'ttl',
158+
'#timestamp': 'timestamp',
159+
},
160+
ExpressionAttributeValues: {
161+
':id': '1',
162+
':name': 'Thing One',
163+
':description': 'This is thing one',
164+
':discriminator': 'thing',
165+
':ttl': 1549053422,
166+
':timestamp': 1548967022000,
167+
},
168+
UpdateExpression: 'SET #id = :id, #name = :name, #description = :description, #discriminator = :discriminator, #ttl = :ttl, #timestamp = :timestamp',
169+
ReturnValues: 'ALL_NEW',
170+
ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp',
171+
});
172+
expect(collected[0].fallbackUpdateRequest).to.deep.equal({
173+
Key: {
174+
pk: '1',
175+
sk: 'thing',
176+
},
177+
ExpressionAttributeNames: {
178+
'#fallbackUpdate': 'fallbackUpdate',
179+
},
180+
ExpressionAttributeValues: {
181+
':fallbackUpdate': true,
182+
},
183+
UpdateExpression: 'SET #fallbackUpdate = :fallbackUpdate',
184+
ReturnValues: 'ALL_NEW',
185+
});
186+
})
187+
.done(done);
188+
});
95189
});
96190

97191
const toUpdateRequest = (uow) => ({

test/unit/flavors/materializeTimestream.test.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import sinon from 'sinon';
44

55
import {
66
initialize, initializeFrom,
7-
ttl,
87
} from '../../../src';
98

109
import { toKinesisRecords, fromKinesis } from '../../../src/from/kinesis';

test/unit/flavors/update.test.js

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';
77
import { ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb';
88
import { mockClient } from 'aws-sdk-client-mock';
99

10+
import { cloneDeep } from 'lodash';
1011
import {
1112
initialize, initializeFrom,
1213
} from '../../../src';
@@ -28,7 +29,10 @@ describe('flavors/update.js', () => {
2829
let mockDdb;
2930

3031
beforeEach(() => {
31-
sinon.stub(DynamoDBConnector.prototype, 'update').resolves({});
32+
sinon.stub(DynamoDBConnector.prototype, 'update')
33+
.onFirstCall().resolves({})
34+
.onSecondCall()
35+
.resolves({});
3236
});
3337

3438
afterEach(() => {
@@ -214,6 +218,158 @@ describe('flavors/update.js', () => {
214218
.done(done);
215219
});
216220

221+
it('should optionally run fallback update request', (done) => {
222+
sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]);
223+
sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({
224+
Responses: {
225+
undefined: [{
226+
pk: '2',
227+
sk: 'thing',
228+
discriminator: 'thing',
229+
name: 'thing2',
230+
}],
231+
},
232+
UnprocessedKeys: {},
233+
});
234+
235+
sinon.stub(KmsConnector.prototype, 'generateDataKey').resolves(MOCK_GEN_DK_RESPONSE);
236+
237+
const events = toDynamodbRecords([
238+
{
239+
timestamp: 1572832690,
240+
keys: {
241+
pk: '1',
242+
sk: 'thing',
243+
},
244+
newImage: {
245+
pk: '1',
246+
sk: 'thing',
247+
discriminator: 'thing',
248+
name: 'Thing One',
249+
description: 'This is thing one',
250+
otherThing: 'thing|2',
251+
ttl: 1549053422,
252+
timestamp: 1548967022000,
253+
},
254+
},
255+
{
256+
timestamp: 1572832690,
257+
keys: {
258+
pk: '1',
259+
sk: 'other',
260+
},
261+
newImage: {
262+
pk: '1',
263+
sk: 'other',
264+
discriminator: 'other',
265+
name: 'Other One',
266+
description: 'This is other one',
267+
ttl: 1549053422,
268+
timestamp: 1548967022000,
269+
},
270+
},
271+
]);
272+
const rulesWithFallbackUpdateRequest = cloneDeep([rules[0]]);
273+
rulesWithFallbackUpdateRequest[0].toFallbackUpdateRequest = (uow) => ({
274+
Key: {
275+
pk: uow.event.raw.new.pk,
276+
sk: uow.event.raw.new.sk,
277+
},
278+
...updateExpression({
279+
fallbackUpdate: true,
280+
}),
281+
});
282+
283+
initialize({
284+
...initializeFrom(rulesWithFallbackUpdateRequest),
285+
}, { ...defaultOptions, AES: false })
286+
.assemble(fromDynamodb(events), false)
287+
.collect()
288+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
289+
.tap((collected) => {
290+
expect(collected.length).to.equal(1);
291+
expect(collected[0].pipeline).to.equal('update1');
292+
expect(collected[0].event.type).to.equal('thing-created');
293+
expect(collected[0].batchGetRequest).to.deep.equal({
294+
RequestItems: {
295+
undefined: {
296+
Keys: [{
297+
pk: '2',
298+
sk: 'thing',
299+
}],
300+
},
301+
},
302+
});
303+
expect(collected[0].batchGetResponse).to.deep.equal({
304+
Responses: {
305+
undefined: [
306+
{
307+
pk: '2',
308+
sk: 'thing',
309+
discriminator: 'thing',
310+
name: 'thing2',
311+
},
312+
],
313+
},
314+
UnprocessedKeys: {},
315+
});
316+
expect(collected[0].updateRequest).to.deep.equal({
317+
Key: {
318+
pk: '1',
319+
sk: 'thing',
320+
},
321+
ExpressionAttributeNames: {
322+
'#pk': 'pk',
323+
'#sk': 'sk',
324+
'#discriminator': 'discriminator',
325+
'#name': 'name',
326+
'#description': 'description',
327+
'#otherThing': 'otherThing',
328+
'#ttl': 'ttl',
329+
'#timestamp': 'timestamp',
330+
},
331+
ExpressionAttributeValues: {
332+
':pk': '1',
333+
':sk': 'thing',
334+
':discriminator': 'thing',
335+
':name': 'Thing One',
336+
':description': 'This is thing one',
337+
':ttl': 1549053422,
338+
':timestamp': 1548967022000,
339+
':otherThing': {
340+
pk: '2',
341+
sk: 'thing',
342+
discriminator: 'thing',
343+
name: 'thing2',
344+
},
345+
},
346+
UpdateExpression: 'SET #pk = :pk, #sk = :sk, #discriminator = :discriminator, #name = :name, #description = :description, #otherThing = :otherThing, #ttl = :ttl, #timestamp = :timestamp',
347+
ReturnValues: 'ALL_NEW',
348+
ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp',
349+
});
350+
351+
expect(collected[0].fallbackUpdateRequest).to.deep.equal({
352+
Key: {
353+
pk: '1',
354+
sk: 'thing',
355+
},
356+
ExpressionAttributeNames: {
357+
'#fallbackUpdate': 'fallbackUpdate',
358+
},
359+
ExpressionAttributeValues: {
360+
':fallbackUpdate': true,
361+
},
362+
UpdateExpression: 'SET #fallbackUpdate = :fallbackUpdate',
363+
ReturnValues: 'ALL_NEW',
364+
});
365+
366+
expect(collected[0].updateResponse).to.deep.equal({});
367+
expect(collected[0].queryRequest).to.be.undefined;
368+
expect(collected[0].getRequest).to.be.undefined;
369+
})
370+
.done(done);
371+
});
372+
217373
it('should fault on error', (done) => {
218374
sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]);
219375
sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({

0 commit comments

Comments
 (0)