Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.1.8",
"version": "1.1.9",
Comment thread
petermyers marked this conversation as resolved.
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
Expand Down
10 changes: 8 additions & 2 deletions src/sinks/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import _ from 'highland';

import Connector from '../connectors/sqs';

import { toBatchUow, unBatchUow } from '../utils/batch';
import { batchWithPayloadSizeOrCount, toBatchUow, unBatchUow } from '../utils/batch';
import { ratelimit } from '../utils/ratelimit';
import { rejectWithFault } from '../utils/faults';
import { debug as d } from '../utils/print';
Expand All @@ -13,6 +13,7 @@ export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export
queueUrl = process.env.QUEUE_URL,
messageField = 'message',
batchSize = Number(process.env.SQS_BATCH_SIZE) || Number(process.env.BATCH_SIZE) || 10,
maxPayloadSize = Number(process.env.SQS_MAX_PAYLOAD_SIZE) || Number(process.env.MAX_PAYLOAD_SIZE) || 256 * 1024,
Comment thread
petermyers marked this conversation as resolved.
Outdated
parallel = Number(process.env.SQS_PARALLEL) || Number(process.env.PARALLEL) || 8,
step = 'send',
...opt
Expand Down Expand Up @@ -46,7 +47,12 @@ export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export
return (s) => s
.through(ratelimit(opt))

.batch(batchSize)
.consume(batchWithPayloadSizeOrCount({
batchSize,
maxPayloadSize,
payloadField: messageField,
...opt,
}))
.map(toBatchUow)

.map(toInputParams)
Expand Down
59 changes: 59 additions & 0 deletions src/utils/batch.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import _ from 'highland';
import isFunction from 'lodash/isFunction';
import { get } from 'lodash';
import { toClaimcheckEvent, toPutClaimcheckRequest } from '../sinks/claimcheck';

// used after highland batch step
Expand Down Expand Up @@ -48,6 +49,9 @@ export const compact = (rule) => {
})));
};

/**
* Batch EB request entries by size to avoid writing a batch that's too large to EB.
*/
export const batchWithSize = ({
claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME,
putClaimcheckRequest = 'putClaimcheckRequest',
Expand Down Expand Up @@ -116,3 +120,58 @@ const logMetrics = (batch, sizes, opt) => {
batch[0].metrics?.gauge('publish|stream.pipeline.eventSize.bytes', sizes);
}
};

/**
* Batches by aggregate payload size with a cap on payload count.
*/
export const batchWithPayloadSizeOrCount = ({
batchSize,
maxPayloadSize,
payloadField,
...opt
}) => {
let batched = [];
let sizes = [];

return (err, x, push, next) => {
/* istanbul ignore if */
if (err) {
push(err);
next();
} else if (x === _.nil) {
if (batched.length > 0) {
logMetrics(batched, sizes, opt);
push(null, batched);
}

push(null, _.nil);
} else {
if (!get(x, payloadField)) {
push(null, [x]);
} else {
const size = Buffer.byteLength(JSON.stringify(get(x, payloadField)));
if (size > maxPayloadSize) {
logMetrics([x], [size], opt);
const error = new Error(`Payload size: ${size}, exceeded max: ${maxPayloadSize}`);
error.uow = x;
push(error);
next();
return;
}

const totalSize = sizes.reduce((a, c) => a + c, size);
if (totalSize <= maxPayloadSize && batched.length + 1 <= batchSize) {
batched.push(x);
sizes.push(size);
} else {
logMetrics(batched, sizes, opt);
push(null, batched);
batched = [x];
sizes = [size];
}
}

next();
}
};
};
135 changes: 134 additions & 1 deletion test/unit/sinks/sqs.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ describe('sinks/sqs.js', () => {
Id: '1',
MessageBody: JSON.stringify({ f1: 'v1' }),
},
}, {
message: {
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
},
}];

_(uows)
Expand All @@ -26,7 +31,68 @@ describe('sinks/sqs.js', () => {
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));

expect(collected.length).to.equal(1);
expect(collected.length).to.equal(2);
expect(collected[0]).to.deep.equal({
message: {
Id: '1',
MessageBody: JSON.stringify({ f1: 'v1' }),
},
inputParams: {
Entries: [{
Id: '1',
MessageBody: JSON.stringify({ f1: 'v1' }),
}, {
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
}],
},
sendMessageBatchResponse: {},
});

expect(collected[1]).to.deep.equal({
message: {
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
},
inputParams: {
Entries: [{
Id: '1',
MessageBody: JSON.stringify({ f1: 'v1' }),
}, {
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
}],
},
sendMessageBatchResponse: {},
});
})
.done(done);
});

it('should split a batch due to batch size', (done) => {
sinon.stub(Connector.prototype, 'sendMessageBatch').resolves({});

const uows = [{
message: {
Id: '1',
MessageBody: JSON.stringify({ f1: 'v1' }),
},
}, {
message: {
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
},
}];

_(uows)
.through(sendToSqs({
batchSize: 1,
}))
.collect()
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));

expect(collected.length).to.equal(2);
expect(collected[0]).to.deep.equal({
message: {
Id: '1',
Expand All @@ -40,6 +106,73 @@ describe('sinks/sqs.js', () => {
},
sendMessageBatchResponse: {},
});
expect(collected[1]).to.deep.equal({
message: {
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
},
inputParams: {
Entries: [{
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
}],
},
sendMessageBatchResponse: {},
});
})
.done(done);
});

it('should split a batch due to payload size', (done) => {
sinon.stub(Connector.prototype, 'sendMessageBatch').resolves({});

const uows = [{
message: {
Id: '1',
MessageBody: JSON.stringify({ f1: 'v1' }),
},
}, {
message: {
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
},
}];

_(uows)
.through(sendToSqs({
maxPayloadSize: 50,
}))
.collect()
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));

expect(collected.length).to.equal(2);
expect(collected[0]).to.deep.equal({
message: {
Id: '1',
MessageBody: JSON.stringify({ f1: 'v1' }),
},
inputParams: {
Entries: [{
Id: '1',
MessageBody: JSON.stringify({ f1: 'v1' }),
}],
},
sendMessageBatchResponse: {},
});
expect(collected[1]).to.deep.equal({
message: {
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
},
inputParams: {
Entries: [{
Id: '2',
MessageBody: JSON.stringify({ f1: 'v2' }),
}],
},
sendMessageBatchResponse: {},
});
})
.done(done);
});
Expand Down
Loading