-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathsqs.js
More file actions
63 lines (52 loc) · 1.88 KB
/
sqs.js
File metadata and controls
63 lines (52 loc) · 1.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import _ from 'highland';
import Connector from '../connectors/sqs';
import { batchWithPayloadSizeOrCount, toBatchUow, unBatchUow } from '../utils/batch';
import { ratelimit } from '../utils/ratelimit';
import { rejectWithFault } from '../utils/faults';
import { debug as d } from '../utils/print';
/**
 * Creates a highland pipeline step that sends the messages carried by units
 * of work (uow) to SQS in batches.
 *
 * @param {object} [options]
 * @param {string} [options.id] - handed to the connector as `pipelineId`.
 * @param {Function} [options.debug] - debug logger handed to the connector; defaults to `d('sqs')`.
 * @param {string} [options.queueUrl] - handed to the connector; defaults to `process.env.QUEUE_URL`.
 * @param {string} [options.messageField] - uow property holding the batch entry to send;
 *   uow without a truthy value in this field contribute no entry.
 * @param {number} [options.batchSize] - forwarded to the batcher; defaults to `SQS_BATCH_SIZE`,
 *   then `BATCH_SIZE`, then 10.
 * @param {number} [options.maxPayloadSize] - forwarded to the batcher; defaults to
 *   `SQS_MAX_PAYLOAD_SIZE`, then `MAX_PAYLOAD_SIZE`, then 256 * 1024.
 * @param {number} [options.parallel] - argument to the stream's `parallel()`; defaults to
 *   `SQS_PARALLEL`, then `PARALLEL`, then 8.
 * @param {string} [options.step] - label passed to `metrics.w` when the first uow of a batch
 *   carries a `metrics` object.
 * @param {...*} [options.opt] - remaining options, forwarded as-is to the connector,
 *   `ratelimit` and the batcher.
 * @returns {Function} function that takes a stream and returns the stream with the send step applied.
 */
export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export
  id: pipelineId,
  debug = d('sqs'),
  queueUrl = process.env.QUEUE_URL,
  messageField = 'message',
  batchSize = Number(process.env.SQS_BATCH_SIZE) || Number(process.env.BATCH_SIZE) || 10,
  maxPayloadSize = Number(process.env.SQS_MAX_PAYLOAD_SIZE) || Number(process.env.MAX_PAYLOAD_SIZE) || 256 * 1024,
  parallel = Number(process.env.SQS_PARALLEL) || Number(process.env.PARALLEL) || 8,
  step = 'send',
  ...opt
} = {}) => {
  const connector = new Connector({
    pipelineId, debug, queueUrl, ...opt,
  });

  // Attach the request params: one entry per uow that has a truthy `messageField`.
  const withInputParams = (group) => {
    const Entries = group.batch
      .map((uow) => uow[messageField])
      .filter(Boolean);

    return { ...group, inputParams: { Entries } };
  };

  // Send one batch through the connector and expose the outcome as a stream.
  const send = (group) => {
    const { inputParams, batch } = group;

    /* istanbul ignore next */
    if (inputParams.Entries.length === 0) {
      // nothing to send, so pass the batch along untouched
      return _(Promise.resolve(group));
    }

    const attachResponse = (sendMessageBatchResponse) => ({ ...group, sendMessageBatchResponse });
    const invoke = () => connector.sendMessageBatch(inputParams, group)
      .then(attachResponse)
      .catch(rejectWithFault(group));

    // let the metrics of the first uow wrap the call when present, otherwise call directly
    const { metrics } = batch[0];
    const pending = metrics?.w(invoke, step) || invoke();

    return _(pending); // wrap promise in a stream
  };

  return (s) => {
    const limited = s.through(ratelimit(opt));

    const batched = limited.consume(batchWithPayloadSizeOrCount({
      batchSize,
      maxPayloadSize,
      payloadField: messageField,
      ...opt,
    }));

    return batched
      .map(toBatchUow)
      .map(withInputParams)
      .map(send)
      .parallel(parallel)
      .flatMap(unBatchUow); // for cleaner logging and testing
  };
};