Skip to content

Commit 9cc83a6

Browse files
committed
Update materialize flavor.
1 parent 01a928e commit 9cc83a6

3 files changed

Lines changed: 118 additions & 16 deletions

File tree

src/flavors/materialize.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ export const materialize = (rule) => (s) => s // eslint-disable-line import/pref
2626
.map(toUpdateRequest(rule))
2727
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
2828

29+
.map(toFallbackUpdateRequest(rule))
30+
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
31+
2932
.through(updateDynamoDB(rule))
3033

3134
.tap(printEndPipeline);
@@ -37,3 +40,8 @@ const toUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
3740
...uow,
3841
updateRequest: await faultify(rule.toUpdateRequest)(uow, rule),
3942
}));
43+
44+
const toFallbackUpdateRequest = (rule) => faultyAsyncStream(async (uow) => ({
45+
...uow,
46+
fallbackUpdateRequest: rule.toFallbackUpdateRequest ? await faultify(rule.toFallbackUpdateRequest)(uow, rule) : undefined,
47+
}));

test/unit/flavors/materialize.test.js

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'mocha';
22
import { expect } from 'chai';
33
import sinon from 'sinon';
44

5+
import { cloneDeep } from 'lodash';
56
import {
67
initialize, initializeFrom,
78
ttl,
@@ -92,6 +93,99 @@ describe('flavors/materialize.js', () => {
9293
})
9394
.done(done);
9495
});
96+
97+
it('should optionally call the fallback update request', (done) => {
98+
const events = toKinesisRecords([
99+
{
100+
type: 'm1',
101+
timestamp: 1548967022000,
102+
thing: {
103+
id: '1',
104+
name: 'Thing One',
105+
description: 'This is thing one',
106+
},
107+
},
108+
{
109+
type: 'split',
110+
timestamp: 1548967022000,
111+
root: {
112+
things: [{
113+
id: '2',
114+
name: 'Thing One',
115+
description: 'This is thing one',
116+
}, {
117+
id: '3',
118+
name: 'Thing One',
119+
description: 'This is thing one',
120+
}],
121+
},
122+
},
123+
]);
124+
125+
const ruleWithFallbackUpdateRequest = cloneDeep(rules[0]);
126+
ruleWithFallbackUpdateRequest.toFallbackUpdateRequest = (uow) => ({
127+
Key: {
128+
pk: uow.event.thing.id,
129+
sk: 'thing',
130+
},
131+
...updateExpression({
132+
fallbackUpdate: true,
133+
}),
134+
});
135+
136+
initialize({
137+
...initializeFrom([ruleWithFallbackUpdateRequest]),
138+
})
139+
.assemble(fromKinesis(events), false)
140+
.collect()
141+
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
142+
.tap((collected) => {
143+
expect(collected.length).to.equal(1);
144+
expect(collected[0].pipeline).to.equal('mv1');
145+
expect(collected[0].event.type).to.equal('m1');
146+
expect(collected[0].updateRequest).to.deep.equal({
147+
Key: {
148+
pk: '1',
149+
sk: 'thing',
150+
},
151+
ExpressionAttributeNames: {
152+
'#id': 'id',
153+
'#name': 'name',
154+
'#description': 'description',
155+
156+
'#discriminator': 'discriminator',
157+
'#ttl': 'ttl',
158+
'#timestamp': 'timestamp',
159+
},
160+
ExpressionAttributeValues: {
161+
':id': '1',
162+
':name': 'Thing One',
163+
':description': 'This is thing one',
164+
':discriminator': 'thing',
165+
':ttl': 1549053422,
166+
':timestamp': 1548967022000,
167+
},
168+
UpdateExpression: 'SET #id = :id, #name = :name, #description = :description, #discriminator = :discriminator, #ttl = :ttl, #timestamp = :timestamp',
169+
ReturnValues: 'ALL_NEW',
170+
ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp',
171+
});
172+
expect(collected[0].fallbackUpdateRequest).to.deep.equal({
173+
Key: {
174+
pk: '1',
175+
sk: 'thing',
176+
},
177+
ExpressionAttributeNames: {
178+
'#fallbackUpdate': 'fallbackUpdate',
179+
},
180+
ExpressionAttributeValues: {
181+
':fallbackUpdate': true,
182+
},
183+
UpdateExpression: 'SET #fallbackUpdate = :fallbackUpdate',
184+
ReturnValues: 'ALL_NEW',
185+
});
186+
})
187+
.done(done);
188+
});
95189
});
96190

97191
const toUpdateRequest = (uow) => ({

test/unit/metrics/index.test.js

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -187,17 +187,17 @@ describe('metrics/index.js', () => {
187187
count: 3,
188188
},
189189
'p1|save|stream.pipeline.io.wait.time': {
190-
average: 20,
191-
min: 14,
192-
max: 26,
193-
sum: 60,
190+
average: 22,
191+
min: 16,
192+
max: 28,
193+
sum: 66,
194194
count: 3,
195195
},
196196
'p1|save|stream.pipeline.io.time': {
197-
average: 8,
198-
min: 8,
199-
max: 8,
200-
sum: 24,
197+
average: 6,
198+
min: 6,
199+
max: 6,
200+
sum: 18,
201201
count: 3,
202202
},
203203
'p1|stream.pipeline.time': {
@@ -229,17 +229,17 @@ describe('metrics/index.js', () => {
229229
count: 1,
230230
},
231231
'p2|get|stream.pipeline.io.wait.time': {
232-
average: 8,
233-
min: 8,
234-
max: 8,
235-
sum: 8,
232+
average: 2,
233+
min: 2,
234+
max: 2,
235+
sum: 2,
236236
count: 1,
237237
},
238238
'p2|get|stream.pipeline.io.time': {
239-
average: 14,
240-
min: 14,
241-
max: 14,
242-
sum: 14,
239+
average: 20,
240+
min: 20,
241+
max: 20,
242+
sum: 20,
243243
count: 1,
244244
},
245245
'p2|publish|stream.pipeline.io.wait.time': {

0 commit comments

Comments
 (0)