Skip to content

Commit f09547c

Browse files
task: Prohibit EB batch size to be greater than 10
1 parent a09ac14 commit f09547c

3 files changed

Lines changed: 36 additions & 2 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.20",
3+
"version": "1.1.21",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",

src/sinks/eventbridge.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ export const publishToEventBridge = ({ // eslint-disable-line import/prefer-defa
2828
step = 'publish',
2929
...opt
3030
} = {}) => {
31+
// EB Batchsize can not exceed 10 - since BATCH_SIZE is used in many places, we should reduce it to 10 here.
32+
if (batchSize > 10) {
33+
batchSize = 10;
34+
}
3135
if (endpointId) import('@aws-sdk/signature-v4-crt');
3236

3337
const connector = new Connector({

test/unit/sinks/eventbridge.test.js

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import 'mocha';
22
import { expect } from 'chai';
33
import sinon from 'sinon';
44
import _ from 'highland';
5-
5+
import { v4 } from 'uuid';
66
import { publishToEventBridge as publish } from '../../../src/sinks/eventbridge';
77

88
import Connector from '../../../src/connectors/eventbridge';
@@ -89,6 +89,36 @@ describe('sinks/eventbridge.js', () => {
8989
.done(done);
9090
});
9191

92+
it('should batch and publish, multiple', (done) => {
93+
process.env.BATCH_SIZE = 100;
94+
sinon.stub(Connector.prototype, 'putEvents').resolves({ FailedEntryCount: 0 });
95+
96+
const uows = [];
97+
for (let i = 1; i <= 15; i += 1) {
98+
const id = v4();
99+
uows.push({
100+
event: {
101+
id,
102+
type: `p${i}`,
103+
partitionKey: id,
104+
},
105+
});
106+
}
107+
108+
_(uows)
109+
.through(publish({ busName: 'b1', debug: (msg, v) => console.log(msg, v), metricsEnabled: true }))
110+
.collect()
111+
.tap((collected) => {
112+
console.log('C', JSON.stringify(collected, null, 2));
113+
114+
expect(collected.length).to.equal(15);
115+
collected.forEach((c) => {
116+
expect(c.publishRequest.Entries.length < 11).to.be.true;
117+
});
118+
})
119+
.done(done);
120+
});
121+
92122
it('should not publish', (done) => {
93123
const uows = [{
94124
}];

0 commit comments

Comments
 (0)