Skip to content

Commit 2bc6d95

Browse files
committed
Add batching util for SQS sink.
1 parent 17ead99 commit 2bc6d95

5 files changed

Lines changed: 418 additions & 4 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.2",
3+
"version": "1.1.3",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",

src/sinks/sqs.js

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import _ from 'highland';
22

33
import Connector from '../connectors/sqs';
44

5-
import { toBatchUow, unBatchUow } from '../utils/batch';
5+
import { batchWithPayloadSizeOrCount, toBatchUow, unBatchUow } from '../utils/batch';
66
import { ratelimit } from '../utils/ratelimit';
77
import { rejectWithFault } from '../utils/faults';
88
import { debug as d } from '../utils/print';
@@ -13,6 +13,7 @@ export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export
1313
queueUrl = process.env.QUEUE_URL,
1414
messageField = 'message',
1515
batchSize = Number(process.env.SQS_BATCH_SIZE) || Number(process.env.BATCH_SIZE) || 10,
16+
maxPayloadSize = Number(process.env.SQS_MAX_PAYLOAD_SIZE) || Number(process.env.MAX_PAYLOAD_SIZE) || 256 * 1024,
1617
parallel = Number(process.env.SQS_PARALLEL) || Number(process.env.PARALLEL) || 8,
1718
step = 'send',
1819
...opt
@@ -46,7 +47,12 @@ export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export
4647
return (s) => s
4748
.through(ratelimit(opt))
4849

49-
.batch(batchSize)
50+
.consume(batchWithPayloadSizeOrCount({
51+
batchSize,
52+
maxPayloadSize,
53+
payloadField: messageField,
54+
...opt,
55+
}))
5056
.map(toBatchUow)
5157

5258
.map(toInputParams)

src/utils/batch.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import _ from 'highland';
22
import isFunction from 'lodash/isFunction';
3+
import { get } from 'lodash';
34
import { toClaimcheckEvent, toPutClaimcheckRequest } from '../sinks/claimcheck';
45

56
// used after highland batch step
@@ -48,6 +49,10 @@ export const compact = (rule) => {
4849
})));
4950
};
5051

52+
/**
53+
* Batch EB request entries by size to avoid writing a batch that's too large to
54+
* EB.
55+
*/
5156
export const batchWithSize = ({
5257
claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME,
5358
putClaimcheckRequest = 'putClaimcheckRequest',
@@ -116,3 +121,58 @@ const logMetrics = (batch, sizes, opt) => {
116121
batch[0].metrics?.gauge('publish|stream.pipeline.eventSize.bytes', sizes);
117122
}
118123
};
124+
125+
/**
126+
* Batches by aggregate payload size with a cap on payload count.
127+
*/
128+
export const batchWithPayloadSizeOrCount = ({
129+
batchSize,
130+
maxPayloadSize,
131+
payloadField,
132+
...opt
133+
}) => {
134+
let batched = [];
135+
let sizes = [];
136+
137+
return (err, x, push, next) => {
138+
/* istanbul ignore if */
139+
if (err) {
140+
push(err);
141+
next();
142+
} else if (x === _.nil) {
143+
if (batched.length > 0) {
144+
logMetrics(batched, sizes, opt);
145+
push(null, batched);
146+
}
147+
148+
push(null, _.nil);
149+
} else {
150+
if (!get(x, payloadField)) {
151+
push(null, [x]);
152+
} else {
153+
const size = Buffer.byteLength(JSON.stringify(get(x, payloadField)));
154+
if (size > maxPayloadSize) {
155+
logMetrics([x], [size], opt);
156+
const error = new Error(`Payload size: ${size}, exceeded max: ${maxPayloadSize}`);
157+
error.uow = x;
158+
push(error);
159+
next();
160+
return;
161+
}
162+
163+
const totalSize = sizes.reduce((a, c) => a + c, size);
164+
if (totalSize <= maxPayloadSize && batched.length + 1 <= batchSize) {
165+
batched.push(x);
166+
sizes.push(size);
167+
} else {
168+
logMetrics(batched, sizes, opt);
169+
push(null, batched);
170+
batched = [x];
171+
sizes = [size];
172+
}
173+
}
174+
175+
next();
176+
}
177+
};
178+
};

test/unit/sinks/sqs.test.js

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ describe('sinks/sqs.js', () => {
1818
Id: '1',
1919
MessageBody: JSON.stringify({ f1: 'v1' }),
2020
},
21+
}, {
22+
message: {
23+
Id: '2',
24+
MessageBody: JSON.stringify({ f1: 'v2' }),
25+
},
2126
}];
2227

2328
_(uows)
@@ -26,7 +31,68 @@ describe('sinks/sqs.js', () => {
2631
.tap((collected) => {
2732
// console.log(JSON.stringify(collected, null, 2));
2833

29-
expect(collected.length).to.equal(1);
34+
expect(collected.length).to.equal(2);
35+
expect(collected[0]).to.deep.equal({
36+
message: {
37+
Id: '1',
38+
MessageBody: JSON.stringify({ f1: 'v1' }),
39+
},
40+
inputParams: {
41+
Entries: [{
42+
Id: '1',
43+
MessageBody: JSON.stringify({ f1: 'v1' }),
44+
}, {
45+
Id: '2',
46+
MessageBody: JSON.stringify({ f1: 'v2' }),
47+
}],
48+
},
49+
sendMessageBatchResponse: {},
50+
});
51+
52+
expect(collected[1]).to.deep.equal({
53+
message: {
54+
Id: '2',
55+
MessageBody: JSON.stringify({ f1: 'v2' }),
56+
},
57+
inputParams: {
58+
Entries: [{
59+
Id: '1',
60+
MessageBody: JSON.stringify({ f1: 'v1' }),
61+
}, {
62+
Id: '2',
63+
MessageBody: JSON.stringify({ f1: 'v2' }),
64+
}],
65+
},
66+
sendMessageBatchResponse: {},
67+
});
68+
})
69+
.done(done);
70+
});
71+
72+
it('should split a batch due to batch size', (done) => {
73+
sinon.stub(Connector.prototype, 'sendMessageBatch').resolves({});
74+
75+
const uows = [{
76+
message: {
77+
Id: '1',
78+
MessageBody: JSON.stringify({ f1: 'v1' }),
79+
},
80+
}, {
81+
message: {
82+
Id: '2',
83+
MessageBody: JSON.stringify({ f1: 'v2' }),
84+
},
85+
}];
86+
87+
_(uows)
88+
.through(sendToSqs({
89+
batchSize: 1,
90+
}))
91+
.collect()
92+
.tap((collected) => {
93+
// console.log(JSON.stringify(collected, null, 2));
94+
95+
expect(collected.length).to.equal(2);
3096
expect(collected[0]).to.deep.equal({
3197
message: {
3298
Id: '1',
@@ -40,6 +106,73 @@ describe('sinks/sqs.js', () => {
40106
},
41107
sendMessageBatchResponse: {},
42108
});
109+
expect(collected[1]).to.deep.equal({
110+
message: {
111+
Id: '2',
112+
MessageBody: JSON.stringify({ f1: 'v2' }),
113+
},
114+
inputParams: {
115+
Entries: [{
116+
Id: '2',
117+
MessageBody: JSON.stringify({ f1: 'v2' }),
118+
}],
119+
},
120+
sendMessageBatchResponse: {},
121+
});
122+
})
123+
.done(done);
124+
});
125+
126+
it('should split a batch due to payload size', (done) => {
127+
sinon.stub(Connector.prototype, 'sendMessageBatch').resolves({});
128+
129+
const uows = [{
130+
message: {
131+
Id: '1',
132+
MessageBody: JSON.stringify({ f1: 'v1' }),
133+
},
134+
}, {
135+
message: {
136+
Id: '2',
137+
MessageBody: JSON.stringify({ f1: 'v2' }),
138+
},
139+
}];
140+
141+
_(uows)
142+
.through(sendToSqs({
143+
maxPayloadSize: 50,
144+
}))
145+
.collect()
146+
.tap((collected) => {
147+
// console.log(JSON.stringify(collected, null, 2));
148+
149+
expect(collected.length).to.equal(2);
150+
expect(collected[0]).to.deep.equal({
151+
message: {
152+
Id: '1',
153+
MessageBody: JSON.stringify({ f1: 'v1' }),
154+
},
155+
inputParams: {
156+
Entries: [{
157+
Id: '1',
158+
MessageBody: JSON.stringify({ f1: 'v1' }),
159+
}],
160+
},
161+
sendMessageBatchResponse: {},
162+
});
163+
expect(collected[1]).to.deep.equal({
164+
message: {
165+
Id: '2',
166+
MessageBody: JSON.stringify({ f1: 'v2' }),
167+
},
168+
inputParams: {
169+
Entries: [{
170+
Id: '2',
171+
MessageBody: JSON.stringify({ f1: 'v2' }),
172+
}],
173+
},
174+
sendMessageBatchResponse: {},
175+
});
43176
})
44177
.done(done);
45178
});

0 commit comments

Comments
 (0)