-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathmaterialize.js
More file actions
47 lines (35 loc) · 1.29 KB
/
materialize.js
File metadata and controls
47 lines (35 loc) · 1.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import {
printStartPipeline, printEndPipeline,
faulty, faultyAsyncStream, faultify,
splitObject, compact,
} from '../utils';
import {
updateDynamoDB,
} from '../sinks/dynamodb';
import {
filterOnEventType, filterOnContent,
outSourceIsSelf,
} from '../filters';
/**
 * Builds the "materialize" pipeline for a rule.
 *
 * Returns a function that takes a stream `s` and chains, in order: the
 * `outSourceIsSelf`, event-type and content filters; `compact` and
 * `splitObject`; creation of the update request and of the optional fallback
 * update request; and finally the `updateDynamoDB` sink. Start and end of the
 * pipeline are tapped with `printStartPipeline` / `printEndPipeline`.
 *
 * @param {object} rule - Rule configuration. `rule.parallel` (optional)
 *   overrides the parallelism; otherwise the `PARALLEL` environment variable
 *   is used, and 4 when neither yields a truthy number.
 * @returns {Function} `(s) => stream` - the stream returned by the last
 *   chained operator.
 */
export const materialize = (rule) => (s) => { // eslint-disable-line import/prefer-default-export
  // `||` is intentional: undefined, 0 and NaN (unset or non-numeric PARALLEL)
  // must all fall through to the next candidate.
  const concurrency = () => rule.parallel || Number(process.env.PARALLEL) || 4;

  // Stage 1: keep only the units of work this rule accepts.
  const accepted = s
    .filter(outSourceIsSelf)
    .filter(onEventType(rule))
    .tap(printStartPipeline)
    .filter(onContent(rule));

  // Stage 2: reshape the accepted units of work.
  const shaped = accepted
    .through(compact(rule))
    .through(splitObject(rule));

  // Stage 3: attach the update request, then the fallback update request.
  // Each `.map` is followed by `.parallel`, presumably because the async
  // mappers emit streams that must be flattened - confirm against `faultyAsyncStream`.
  const withRequests = shaped
    .map(toUpdateRequest(rule))
    .parallel(concurrency())
    .map(toFallbackUpdateRequest(rule))
    .parallel(concurrency());

  // Stage 4: hand over to the DynamoDB sink.
  return withRequests
    .through(updateDynamoDB(rule))
    .tap(printEndPipeline);
};
/**
 * Creates the event-type predicate used by the pipeline's `.filter` step.
 *
 * The predicate calls `filterOnEventType(rule, uow)` and is wrapped with
 * `faulty` (presumably to convert thrown errors into faults - confirm in utils).
 *
 * @param {object} rule - Rule passed through to `filterOnEventType`.
 * @returns {Function} The wrapped predicate taking a unit of work.
 */
const onEventType = (rule) => {
  const matchesEventType = (uow) => filterOnEventType(rule, uow);
  return faulty(matchesEventType);
};
/**
 * Creates the content predicate used by the pipeline's `.filter` step.
 *
 * The predicate calls `filterOnContent(rule, uow)` and is wrapped with
 * `faulty` (presumably to convert thrown errors into faults - confirm in utils).
 *
 * @param {object} rule - Rule passed through to `filterOnContent`.
 * @returns {Function} The wrapped predicate taking a unit of work.
 */
const onContent = (rule) => {
  const matchesContent = (uow) => filterOnContent(rule, uow);
  return faulty(matchesContent);
};
/**
 * Creates the mapper that attaches `updateRequest` to each unit of work.
 *
 * The mapper returns a shallow copy of `uow` whose `updateRequest` is the
 * awaited result of `rule.toUpdateRequest(uow, rule)`, invoked through
 * `faultify`. The whole mapper is wrapped with `faultyAsyncStream`.
 *
 * @param {object} rule - Rule providing the `toUpdateRequest` function.
 * @returns {*} Whatever `faultyAsyncStream` returns for the async mapper.
 */
const toUpdateRequest = (rule) => {
  const attachUpdateRequest = async (uow) => {
    // Copy first, await second: properties added to `uow` while the request
    // is being built must not leak into the result.
    const enriched = { ...uow };
    enriched.updateRequest = await faultify(rule.toUpdateRequest)(uow, rule);
    return enriched;
  };

  return faultyAsyncStream(attachUpdateRequest);
};
/**
 * Creates the mapper that attaches `fallbackUpdateRequest` to each unit of work.
 *
 * The mapper returns a shallow copy of `uow`. When the rule defines
 * `toFallbackUpdateRequest`, the copy's `fallbackUpdateRequest` is the awaited
 * result of calling it with `(uow, rule)` through `faultify`; otherwise the
 * property is present with the value `undefined`. The whole mapper is wrapped
 * with `faultyAsyncStream`.
 *
 * @param {object} rule - Rule optionally providing `toFallbackUpdateRequest`.
 * @returns {*} Whatever `faultyAsyncStream` returns for the async mapper.
 */
const toFallbackUpdateRequest = (rule) => {
  const attachFallbackUpdateRequest = async (uow) => {
    // Copy before any await so later changes to `uow` are not picked up.
    const enriched = { ...uow };

    if (rule.toFallbackUpdateRequest) {
      enriched.fallbackUpdateRequest = await faultify(rule.toFallbackUpdateRequest)(uow, rule);
    } else {
      // Keep the key present even when no fallback is configured.
      enriched.fallbackUpdateRequest = undefined;
    }

    return enriched;
  };

  return faultyAsyncStream(attachFallbackUpdateRequest);
};