Skip to content

Commit 46ba2e6

Browse files
committed
feat: add WebSocket support via API Gateway Management API
- Add WebSocketConnector wrapping PostToConnectionCommand with retry and timeout support - Add publishToConnections sink with silent 410 (stale connection) handling - Add broadcastToWebSocket flavor: filter by event type → format message → fan out to connections - Add @aws-sdk/client-apigatewaymanagementapi dev dependency - Unit tests - Document WebSocket support in README
1 parent a09ac14 commit 46ba2e6

12 files changed

Lines changed: 5577 additions & 3216 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,12 @@ These features are intended for implementing intra-service logic. They are frequ
462462
* `Connector` - connector for the SQS SDK
463463
* `sendToSqs` - stream steps to send a message to a queue
464464

465+
## WebSocket Support
466+
These features are intended for broadcasting events to connected WebSocket clients via API Gateway WebSocket API. Useful for lightweight real-time notifications (e.g. job completion, status updates) as an alternative to polling or SQS-based live data.
467+
* `Connector` - connector for the API Gateway Management API SDK (`PostToConnectionCommand`)
468+
* `publishToConnections` - stream steps to post a message to connected WebSocket clients
469+
* `broadcastToWebSocket` - a flavor that composes the full pipeline: filter on event type → format message via `rule.toMessage` → resolve target connections via `rule.toConnections` → fan out and publish to each connection
470+
465471
## Encryption Support
466472
* https://github.com/jgilbert01/aws-lambda-stream/issues/20
467473

package-lock.json

Lines changed: 5125 additions & 3215 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "aws-lambda-stream",
3-
"version": "1.1.20",
3+
"version": "1.1.21",
44
"description": "Create stream processors with AWS Lambda functions.",
55
"keywords": [
66
"aws",
@@ -64,6 +64,7 @@
6464
"@aws-sdk/client-kinesis": "^3.450.0",
6565
"@aws-sdk/client-kms": "^3.450.0",
6666
"@aws-sdk/client-lambda": "^3.450.0",
67+
"@aws-sdk/client-apigatewaymanagementapi": "^3.450.0",
6768
"@aws-sdk/client-s3": "^3.450.0",
6869
"@aws-sdk/client-secrets-manager": "^3.450.0",
6970
"@aws-sdk/client-scheduler": "^3.450.0",

src/connectors/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ export { default as SecretsMgrConnector } from './secretsmgr';
88
export { default as S3Connector } from './s3';
99
export { default as SnsConnector } from './sns';
1010
export { default as SqsConnector } from './sqs';
11+
export { default as WebSocketConnector } from './websocket';

src/connectors/websocket.js

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/* eslint import/no-extraneous-dependencies: ["error", {"devDependencies": true}] */
2+
3+
import Promise from 'bluebird';
4+
5+
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from '@aws-sdk/client-apigatewaymanagementapi';
6+
import { NodeHttpHandler } from '@smithy/node-http-handler';
7+
import { ConfiguredRetryStrategy } from '@smithy/util-retry';
8+
import { omit, pick } from 'lodash';
9+
import { defaultBackoffDelay } from '../utils/retry';
10+
import { defaultDebugLogger } from '../utils/log';
11+
12+
class Connector {
13+
constructor({
14+
debug,
15+
pipelineId,
16+
endpoint = process.env.WEBSOCKET_ENDPOINT,
17+
timeout = Number(process.env.WS_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
18+
additionalClientOpts = {},
19+
...opt
20+
}) {
21+
this.debug = (msg) => debug('%j', msg);
22+
this.endpoint = endpoint || 'undefined';
23+
this.client = Connector.getClient(pipelineId, debug, timeout, endpoint, additionalClientOpts);
24+
this.opt = opt;
25+
}
26+
27+
static clients = {};
28+
29+
static getClient(pipelineId, debug, timeout, endpoint, additionalClientOpts) {
30+
const addlRequestHandlerOpts = pick(additionalClientOpts, ['requestHandler']);
31+
const addlClientOpts = omit(additionalClientOpts, ['requestHandler']);
32+
33+
if (!this.clients[pipelineId]) {
34+
this.clients[pipelineId] = new ApiGatewayManagementApiClient({
35+
endpoint,
36+
requestHandler: new NodeHttpHandler({
37+
requestTimeout: timeout,
38+
connectionTimeout: timeout,
39+
...addlRequestHandlerOpts,
40+
}),
41+
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
42+
logger: defaultDebugLogger(debug),
43+
...addlClientOpts,
44+
});
45+
}
46+
return this.clients[pipelineId];
47+
}
48+
49+
postToConnection(connectionId, data, ctx) {
50+
const params = {
51+
ConnectionId: connectionId,
52+
Data: typeof data === 'string' ? data : JSON.stringify(data),
53+
};
54+
55+
return this._sendCommand(new PostToConnectionCommand(params), ctx);
56+
}
57+
58+
_sendCommand(command, ctx) {
59+
this.opt.metrics?.capture(this.client, command, 'ws', this.opt, ctx);
60+
return Promise.resolve(this.client.send(command))
61+
.tap(this.debug)
62+
.tapCatch(this.debug);
63+
}
64+
}
65+
66+
export default Connector;

src/flavors/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ export * from './materialize';
99
export * from './materializeS3';
1010
export * from './sendMessages';
1111
export * from './update';
12+
export * from './websocket';

src/flavors/websocket.js

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import _ from 'highland';
2+
3+
import {
4+
printStartPipeline, printEndPipeline,
5+
faulty, faultyAsyncStream, faultify,
6+
} from '../utils';
7+
8+
import { publishToConnections } from '../sinks/websocket';
9+
import { filterOnEventType, filterOnContent } from '../filters';
10+
11+
export const broadcastToWebSocket = (rule) => (s) => s // eslint-disable-line import/prefer-default-export
12+
.filter(onEventType(rule))
13+
.tap(printStartPipeline)
14+
15+
.filter(onContent(rule))
16+
17+
.map(toMessage(rule))
18+
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)
19+
20+
.flatMap(toConnections(rule))
21+
22+
.through(publishToConnections(rule))
23+
24+
.tap(printEndPipeline);
25+
26+
const onEventType = (rule) => faulty((uow) => filterOnEventType(rule, uow));
27+
const onContent = (rule) => faulty((uow) => filterOnContent(rule, uow));
28+
29+
const toMessage = (rule) => faultyAsyncStream(async (uow) => ({
30+
...uow,
31+
message: await faultify(rule.toMessage)(uow, rule),
32+
}));
33+
34+
// rule.toConnections returns a promise resolving to an array of { connectionId }
35+
// flatMap fans out the uow into one per connection
36+
const toConnections = (rule) => (uow) => {
37+
if (!rule.toConnections) return _([uow]);
38+
const p = faultify(rule.toConnections)(uow, rule)
39+
.then((connections) => connections.map((conn) => ({
40+
...uow,
41+
connectionId: conn.connectionId,
42+
})));
43+
return _(p).flatten();
44+
};

src/sinks/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ export * from './eventbridge';
77
export * from './s3';
88
export * from './sns';
99
export * from './sqs';
10+
export * from './websocket';

src/sinks/websocket.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import _ from 'highland';
2+
3+
import Connector from '../connectors/websocket';
4+
5+
import { rejectWithFault } from '../utils/faults';
6+
import { debug as d } from '../utils/print';
7+
import { ratelimit } from '../utils/ratelimit';
8+
9+
export const publishToConnections = ({ // eslint-disable-line import/prefer-default-export
10+
id: pipelineId,
11+
debug = d('ws'),
12+
endpoint = process.env.WEBSOCKET_ENDPOINT,
13+
messageField = 'message',
14+
parallel = Number(process.env.WS_PARALLEL) || Number(process.env.PARALLEL) || 8,
15+
step = 'postToConnection',
16+
...opt
17+
} = {}) => {
18+
const connector = new Connector({
19+
pipelineId, debug, endpoint, ...opt,
20+
});
21+
22+
const post = (uow) => {
23+
const p = () => connector.postToConnection(uow.connectionId, uow[messageField], uow)
24+
.then((postResponse) => ({ ...uow, postResponse }))
25+
.catch((err) => {
26+
// 410 = connection is gone, clean up silently
27+
if (err.statusCode === 410 || err.$metadata?.httpStatusCode === 410) {
28+
return { ...uow, postResponse: { statusCode: 410, connectionId: uow.connectionId } };
29+
}
30+
return rejectWithFault(uow)(err);
31+
});
32+
33+
return _(uow.metrics?.w(p, step) || /* istanbul ignore next */ p()); // wrap promise in a stream
34+
};
35+
36+
return (s) => s
37+
.through(ratelimit(opt))
38+
.map(post)
39+
.parallel(parallel);
40+
};
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import 'mocha';
2+
import { expect } from 'chai';
3+
import sinon from 'sinon';
4+
import { mockClient } from 'aws-sdk-client-mock';
5+
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from '@aws-sdk/client-apigatewaymanagementapi';
6+
7+
import Connector from '../../../src/connectors/websocket';
8+
9+
import { debug } from '../../../src/utils';
10+
11+
describe('connectors/websocket.js', () => {
12+
let mockWs = mockClient(ApiGatewayManagementApiClient);
13+
14+
beforeEach(() => {
15+
mockWs = mockClient(ApiGatewayManagementApiClient);
16+
});
17+
18+
afterEach(() => {
19+
mockWs.restore();
20+
});
21+
22+
it('should reuse client per pipeline', () => {
23+
const client1 = Connector.getClient('ws-test1', debug('test'), 1000, 'https://test');
24+
const client2 = Connector.getClient('ws-test1', debug('test'), 1000, 'https://test');
25+
const client3 = Connector.getClient('ws-test2', debug('test'), 1000, 'https://test');
26+
27+
expect(client1).to.eq(client2);
28+
expect(client2).to.not.eq(client3);
29+
});
30+
31+
it('should post to connection with object data', async () => {
32+
const spy = sinon.spy((_) => ({}));
33+
mockWs.on(PostToConnectionCommand).callsFake(spy);
34+
35+
const data = await new Connector({
36+
debug: debug('ws'),
37+
endpoint: 'https://test.execute-api.us-west-2.amazonaws.com/dev',
38+
}).postToConnection('conn-1', { type: 'thing-updated', data: { id: '1' } });
39+
40+
expect(spy).to.have.been.calledWith({
41+
ConnectionId: 'conn-1',
42+
Data: JSON.stringify({ type: 'thing-updated', data: { id: '1' } }),
43+
});
44+
expect(data).to.deep.equal({});
45+
});
46+
47+
it('should post to connection with string data', async () => {
48+
const spy = sinon.spy((_) => ({}));
49+
mockWs.on(PostToConnectionCommand).callsFake(spy);
50+
51+
const data = await new Connector({
52+
debug: debug('ws'),
53+
endpoint: 'https://test.execute-api.us-west-2.amazonaws.com/dev',
54+
}).postToConnection('conn-1', 'raw string');
55+
56+
expect(spy).to.have.been.calledWith({
57+
ConnectionId: 'conn-1',
58+
Data: 'raw string',
59+
});
60+
expect(data).to.deep.equal({});
61+
});
62+
});

0 commit comments

Comments
 (0)