Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.1.19",
"version": "1.1.20",
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
Expand Down
8 changes: 8 additions & 0 deletions src/flavors/materialize.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ export const materialize = (rule) => (s) => s // eslint-disable-line import/pref
.map(toUpdateRequest(rule))
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)

.map(toFallbackUpdateRequest(rule))
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)

.through(updateDynamoDB(rule))

.tap(printEndPipeline);
Expand All @@ -37,3 +40,8 @@ const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
...uow,
updateRequest: await faultify(rule.toUpdateRequest)(uow, rule),
}));

const toFallbackUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
...uow,
fallbackUpdateRequest: rule.toFallbackUpdateRequest ? await faultify(rule.toFallbackUpdateRequest)(uow, rule) : undefined,
}));
8 changes: 8 additions & 0 deletions src/flavors/update.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ export const update = (rule) => (s) => s // eslint-disable-line import/prefer-de
.map(toUpdateRequest(rule))
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)

.map(toFallbackUpdateRequest(rule))
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)

.through(updateDynamoDB(rule))

.tap(printEndPipeline);
Expand Down Expand Up @@ -68,3 +71,8 @@ const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
...uow,
updateRequest: await faultify(rule.toUpdateRequest)(uow, rule),
}));

const toFallbackUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
...uow,
fallbackUpdateRequest: rule.toFallbackUpdateRequest ? await faultify(rule.toFallbackUpdateRequest)(uow, rule) : undefined,
}));
16 changes: 13 additions & 3 deletions src/sinks/dynamodb.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import _ from 'highland';

import { isEmpty } from 'lodash';
import Connector from '../connectors/dynamodb';

import { rejectWithFault } from '../utils/faults';
Expand Down Expand Up @@ -81,6 +82,7 @@ export const updateDynamoDB = ({
tableName = process.env.ENTITY_TABLE_NAME || process.env.EVENT_TABLE_NAME,
updateRequestField = 'updateRequest',
updateResponseField = 'updateResponse',
fallbackUpdateRequestField = 'fallbackUpdateRequest',
parallel = Number(process.env.UPDATE_PARALLEL) || Number(process.env.PARALLEL) || 4,
timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
removeUndefinedValues = true,
Expand All @@ -95,11 +97,19 @@ export const updateDynamoDB = ({
const invoke = (uow) => {
if (!uow[updateRequestField]) return _(Promise.resolve(uow));

const p = () => connector.update(uow[updateRequestField], uow)
.then((updateResponse) => ({ ...uow, [updateResponseField]: updateResponse }))
const p = (updateRequest, isFallback) => () => connector.update(updateRequest, uow)
.then((updateResponse) => {
if (isEmpty(updateResponse) && uow[fallbackUpdateRequestField] && !isFallback) {
// If its empty, that indicates a conditional write failure, in that case we want to run the fallback
// update, if present.
return p(uow[fallbackUpdateRequestField], true)();
Comment thread
petermyers marked this conversation as resolved.
Outdated
} else {
return { ...uow, [updateResponseField]: updateResponse };
}
})
.catch(rejectWithFault(uow));

return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
return _(uow.metrics?.w(p(uow[updateRequestField], false), step) || p(uow[updateRequestField], false)()); // wrap promise in a stream
};

return (s) => s
Expand Down
94 changes: 94 additions & 0 deletions test/unit/flavors/materialize.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'mocha';
import { expect } from 'chai';
import sinon from 'sinon';

import { cloneDeep } from 'lodash';
import {
initialize, initializeFrom,
ttl,
Expand Down Expand Up @@ -92,6 +93,99 @@ describe('flavors/materialize.js', () => {
})
.done(done);
});

it('should optionally call the fallback update request', (done) => {
const events = toKinesisRecords([
{
type: 'm1',
timestamp: 1548967022000,
thing: {
id: '1',
name: 'Thing One',
description: 'This is thing one',
},
},
{
type: 'split',
timestamp: 1548967022000,
root: {
things: [{
id: '2',
name: 'Thing One',
description: 'This is thing one',
}, {
id: '3',
name: 'Thing One',
description: 'This is thing one',
}],
},
},
]);

const ruleWithFallbackUpdateRequest = cloneDeep(rules[0]);
ruleWithFallbackUpdateRequest.toFallbackUpdateRequest = (uow) => ({
Key: {
pk: uow.event.thing.id,
sk: 'thing',
},
...updateExpression({
fallbackUpdate: true,
}),
});

initialize({
...initializeFrom([ruleWithFallbackUpdateRequest]),
})
.assemble(fromKinesis(events), false)
.collect()
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
.tap((collected) => {
expect(collected.length).to.equal(1);
expect(collected[0].pipeline).to.equal('mv1');
expect(collected[0].event.type).to.equal('m1');
expect(collected[0].updateRequest).to.deep.equal({
Key: {
pk: '1',
sk: 'thing',
},
ExpressionAttributeNames: {
'#id': 'id',
'#name': 'name',
'#description': 'description',

'#discriminator': 'discriminator',
'#ttl': 'ttl',
'#timestamp': 'timestamp',
},
ExpressionAttributeValues: {
':id': '1',
':name': 'Thing One',
':description': 'This is thing one',
':discriminator': 'thing',
':ttl': 1549053422,
':timestamp': 1548967022000,
},
UpdateExpression: 'SET #id = :id, #name = :name, #description = :description, #discriminator = :discriminator, #ttl = :ttl, #timestamp = :timestamp',
ReturnValues: 'ALL_NEW',
ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp',
});
expect(collected[0].fallbackUpdateRequest).to.deep.equal({
Key: {
pk: '1',
sk: 'thing',
},
ExpressionAttributeNames: {
'#fallbackUpdate': 'fallbackUpdate',
},
ExpressionAttributeValues: {
':fallbackUpdate': true,
},
UpdateExpression: 'SET #fallbackUpdate = :fallbackUpdate',
ReturnValues: 'ALL_NEW',
});
})
.done(done);
});
});

const toUpdateRequest = (uow) => ({
Expand Down
1 change: 0 additions & 1 deletion test/unit/flavors/materializeTimestream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import sinon from 'sinon';

import {
initialize, initializeFrom,
ttl,
} from '../../../src';

import { toKinesisRecords, fromKinesis } from '../../../src/from/kinesis';
Expand Down
158 changes: 157 additions & 1 deletion test/unit/flavors/update.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';
import { ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb';
import { mockClient } from 'aws-sdk-client-mock';

import { cloneDeep } from 'lodash';
import {
initialize, initializeFrom,
} from '../../../src';
Expand All @@ -28,7 +29,10 @@ describe('flavors/update.js', () => {
let mockDdb;

beforeEach(() => {
sinon.stub(DynamoDBConnector.prototype, 'update').resolves({});
sinon.stub(DynamoDBConnector.prototype, 'update')
.onFirstCall().resolves({})
.onSecondCall()
.resolves({});
});

afterEach(() => {
Expand Down Expand Up @@ -214,6 +218,158 @@ describe('flavors/update.js', () => {
.done(done);
});

it('should optionally run fallback update request', (done) => {
sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]);
sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({
Responses: {
undefined: [{
pk: '2',
sk: 'thing',
discriminator: 'thing',
name: 'thing2',
}],
},
UnprocessedKeys: {},
});

sinon.stub(KmsConnector.prototype, 'generateDataKey').resolves(MOCK_GEN_DK_RESPONSE);

const events = toDynamodbRecords([
{
timestamp: 1572832690,
keys: {
pk: '1',
sk: 'thing',
},
newImage: {
pk: '1',
sk: 'thing',
discriminator: 'thing',
name: 'Thing One',
description: 'This is thing one',
otherThing: 'thing|2',
ttl: 1549053422,
timestamp: 1548967022000,
},
},
{
timestamp: 1572832690,
keys: {
pk: '1',
sk: 'other',
},
newImage: {
pk: '1',
sk: 'other',
discriminator: 'other',
name: 'Other One',
description: 'This is other one',
ttl: 1549053422,
timestamp: 1548967022000,
},
},
]);
const rulesWithFallbackUpdateRequest = cloneDeep([rules[0]]);
rulesWithFallbackUpdateRequest[0].toFallbackUpdateRequest = (uow) => ({
Key: {
pk: uow.event.raw.new.pk,
sk: uow.event.raw.new.sk,
},
...updateExpression({
fallbackUpdate: true,
}),
});

initialize({
...initializeFrom(rulesWithFallbackUpdateRequest),
}, { ...defaultOptions, AES: false })
.assemble(fromDynamodb(events), false)
.collect()
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
.tap((collected) => {
expect(collected.length).to.equal(1);
expect(collected[0].pipeline).to.equal('update1');
expect(collected[0].event.type).to.equal('thing-created');
expect(collected[0].batchGetRequest).to.deep.equal({
RequestItems: {
undefined: {
Keys: [{
pk: '2',
sk: 'thing',
}],
},
},
});
expect(collected[0].batchGetResponse).to.deep.equal({
Responses: {
undefined: [
{
pk: '2',
sk: 'thing',
discriminator: 'thing',
name: 'thing2',
},
],
},
UnprocessedKeys: {},
});
expect(collected[0].updateRequest).to.deep.equal({
Key: {
pk: '1',
sk: 'thing',
},
ExpressionAttributeNames: {
'#pk': 'pk',
'#sk': 'sk',
'#discriminator': 'discriminator',
'#name': 'name',
'#description': 'description',
'#otherThing': 'otherThing',
'#ttl': 'ttl',
'#timestamp': 'timestamp',
},
ExpressionAttributeValues: {
':pk': '1',
':sk': 'thing',
':discriminator': 'thing',
':name': 'Thing One',
':description': 'This is thing one',
':ttl': 1549053422,
':timestamp': 1548967022000,
':otherThing': {
pk: '2',
sk: 'thing',
discriminator: 'thing',
name: 'thing2',
},
},
UpdateExpression: 'SET #pk = :pk, #sk = :sk, #discriminator = :discriminator, #name = :name, #description = :description, #otherThing = :otherThing, #ttl = :ttl, #timestamp = :timestamp',
ReturnValues: 'ALL_NEW',
ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp',
});

expect(collected[0].fallbackUpdateRequest).to.deep.equal({
Key: {
pk: '1',
sk: 'thing',
},
ExpressionAttributeNames: {
'#fallbackUpdate': 'fallbackUpdate',
},
ExpressionAttributeValues: {
':fallbackUpdate': true,
},
UpdateExpression: 'SET #fallbackUpdate = :fallbackUpdate',
ReturnValues: 'ALL_NEW',
});

expect(collected[0].updateResponse).to.deep.equal({});
expect(collected[0].queryRequest).to.be.undefined;
expect(collected[0].getRequest).to.be.undefined;
})
.done(done);
});

it('should fault on error', (done) => {
sinon.stub(DynamoDBConnector.prototype, 'query').resolves([]);
sinon.stub(DynamoDBConnector.prototype, 'batchGet').resolves({
Expand Down
Loading