Skip to content

Commit b733a01

Browse files
committed
feat: expand API Gateway Management connector and add disconnect/query support
- Rename connector websocket.js → apigatewayclient.js to match SDK client naming convention - Add GetConnectionCommand and DeleteConnectionCommand to apigatewayclient connector - Add disconnectConnections sink (with silent 410 handling) to sinks/websocket.js - Add queryConnection step in new queries/websocket.js - Add guard clauses to skip uow missing connectionId or messageField across all three steps - Add tests - Fix flavors/websocket.test.js assertion for no-connections case - Update README WebSocket Support section
1 parent 1e5115b commit b733a01

10 files changed

Lines changed: 328 additions & 14 deletions

File tree

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,6 @@ These features are intended for implementing intra-service logic.
447447
* `Connector` - connector for the S3 SDK
448448
* `putObjectToS3` - stream steps to put an object in a bucket
449449

450-
451450
## SNS Support
452451
These features are intended for implementing intra-service logic.
453452
* `fromSns` - creates a stream from an SNS topic
@@ -464,9 +463,11 @@ These features are intended for implementing intra-service logic. They are frequ
464463

465464
## WebSocket Support
466465
These features are intended for broadcasting events to connected WebSocket clients via API Gateway WebSocket API. Useful for lightweight real-time notifications (e.g. job completion, status updates) as an alternative to polling or SQS-based live data.
467-
* `Connector` - connector for the API Gateway Management API SDK (`PostToConnectionCommand`)
466+
* `Connector` - connector for the API Gateway Management API SDK (`PostToConnectionCommand`, `GetConnectionCommand`, `DeleteConnectionCommand`)
468467
* `publishToConnections` - stream steps to post a message to connected WebSocket clients
469-
* `broadcastToWebSocket` - a flavor that composes the full pipeline: filter on event type → format message via `rule.toMessage` → resolve target connections via `rule.toConnections` → fan out and publish to each connection
468+
* `disconnectConnections` - stream steps to forcibly disconnect WebSocket clients
469+
* `queryConnection` - query step to fetch connection metadata (identity, last active time) and adorn it to the `uow` as `getConnectionResponse`
470+
* `broadcastToWebSocket` - a flavor that composes the full pipeline: filter on event type, format message via `rule.toMessage`, resolve target connections via `rule.toConnections`, fan out and publish to each connection
470471

471472
## Encryption Support
472473
* https://github.com/jgilbert01/aws-lambda-stream/issues/20
Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22

33
import Promise from 'bluebird';
44

5-
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from '@aws-sdk/client-apigatewaymanagementapi';
5+
import {
6+
ApiGatewayManagementApiClient,
7+
DeleteConnectionCommand,
8+
GetConnectionCommand,
9+
PostToConnectionCommand,
10+
} from '@aws-sdk/client-apigatewaymanagementapi';
611
import { NodeHttpHandler } from '@smithy/node-http-handler';
712
import { ConfiguredRetryStrategy } from '@smithy/util-retry';
813
import { omit, pick } from 'lodash';
@@ -55,6 +60,14 @@ class Connector {
5560
return this._sendCommand(new PostToConnectionCommand(params), ctx);
5661
}
5762

63+
getConnection(connectionId, ctx) {
64+
return this._sendCommand(new GetConnectionCommand({ ConnectionId: connectionId }), ctx);
65+
}
66+
67+
deleteConnection(connectionId, ctx) {
68+
return this._sendCommand(new DeleteConnectionCommand({ ConnectionId: connectionId }), ctx);
69+
}
70+
5871
_sendCommand(command, ctx) {
5972
this.opt.metrics?.capture(this.client, command, 'ws', this.opt, ctx);
6073
return Promise.resolve(this.client.send(command))

src/connectors/index.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ export { default as SecretsMgrConnector } from './secretsmgr';
88
export { default as S3Connector } from './s3';
99
export { default as SnsConnector } from './sns';
1010
export { default as SqsConnector } from './sqs';
11-
export { default as WebSocketConnector } from './websocket';
11+
export { default as ApiGatewayClientConnector } from './apigatewayclient';

src/queries/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ export * from './claimcheck';
22
export * from './dynamodb';
33
export * from './secretsmgr';
44
export * from './s3';
5+
export * from './websocket';

src/queries/websocket.js

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import _ from 'highland';
2+
3+
import Connector from '../connectors/apigatewayclient';
4+
5+
import { rejectWithFault } from '../utils/faults';
6+
import { debug as d } from '../utils/print';
7+
import { ratelimit } from '../utils/ratelimit';
8+
9+
export const queryConnection = ({ // eslint-disable-line import/prefer-default-export
10+
id: pipelineId,
11+
debug = d('ws'),
12+
endpoint = process.env.WEBSOCKET_ENDPOINT,
13+
getConnectionResponseField = 'getConnectionResponse',
14+
parallel = Number(process.env.WS_PARALLEL) || Number(process.env.PARALLEL) || 8,
15+
step = 'getConnection',
16+
...opt
17+
} = {}) => {
18+
const connector = new Connector({
19+
pipelineId, debug, endpoint, ...opt,
20+
});
21+
22+
const get = (uow) => {
23+
if (!uow.connectionId) return _(Promise.resolve(uow));
24+
25+
const p = () => connector.getConnection(uow.connectionId, uow)
26+
.then((getConnectionResponse) => ({ ...uow, [getConnectionResponseField]: getConnectionResponse }))
27+
.catch(rejectWithFault(uow));
28+
29+
return _(uow.metrics?.w(p, step) || /* istanbul ignore next */ p()); // wrap promise in a stream
30+
};
31+
32+
return (s) => s
33+
.through(ratelimit(opt))
34+
.map(get)
35+
.parallel(parallel);
36+
};

src/sinks/websocket.js

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import _ from 'highland';
22

3-
import Connector from '../connectors/websocket';
3+
import Connector from '../connectors/apigatewayclient';
44

55
import { rejectWithFault } from '../utils/faults';
66
import { debug as d } from '../utils/print';
77
import { ratelimit } from '../utils/ratelimit';
88

9-
export const publishToConnections = ({ // eslint-disable-line import/prefer-default-export
9+
export const publishToConnections = ({
1010
id: pipelineId,
1111
debug = d('ws'),
1212
endpoint = process.env.WEBSOCKET_ENDPOINT,
@@ -20,6 +20,8 @@ export const publishToConnections = ({ // eslint-disable-line import/prefer-defa
2020
});
2121

2222
const post = (uow) => {
23+
if (!uow.connectionId || !uow[messageField]) return _(Promise.resolve(uow));
24+
2325
const p = () => connector.postToConnection(uow.connectionId, uow[messageField], uow)
2426
.then((postResponse) => ({ ...uow, postResponse }))
2527
.catch((err) => {
@@ -38,3 +40,37 @@ export const publishToConnections = ({ // eslint-disable-line import/prefer-defa
3840
.map(post)
3941
.parallel(parallel);
4042
};
43+
44+
export const disconnectConnections = ({
45+
id: pipelineId,
46+
debug = d('ws'),
47+
endpoint = process.env.WEBSOCKET_ENDPOINT,
48+
parallel = Number(process.env.WS_PARALLEL) || Number(process.env.PARALLEL) || 8,
49+
step = 'deleteConnection',
50+
...opt
51+
} = {}) => {
52+
const connector = new Connector({
53+
pipelineId, debug, endpoint, ...opt,
54+
});
55+
56+
const disconnect = (uow) => {
57+
if (!uow.connectionId) return _(Promise.resolve(uow));
58+
59+
const p = () => connector.deleteConnection(uow.connectionId, uow)
60+
.then((deleteResponse) => ({ ...uow, deleteResponse }))
61+
.catch((err) => {
62+
// 410 = connection already gone, clean up silently
63+
if (err.statusCode === 410 || err.$metadata?.httpStatusCode === 410) {
64+
return { ...uow, deleteResponse: { statusCode: 410, connectionId: uow.connectionId } };
65+
}
66+
return rejectWithFault(uow)(err);
67+
});
68+
69+
return _(uow.metrics?.w(p, step) || /* istanbul ignore next */ p()); // wrap promise in a stream
70+
};
71+
72+
return (s) => s
73+
.through(ratelimit(opt))
74+
.map(disconnect)
75+
.parallel(parallel);
76+
};

test/unit/connectors/websocket.test.js renamed to test/unit/connectors/apigatewayclient.test.js

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@ import 'mocha';
22
import { expect } from 'chai';
33
import sinon from 'sinon';
44
import { mockClient } from 'aws-sdk-client-mock';
5-
import { ApiGatewayManagementApiClient, PostToConnectionCommand } from '@aws-sdk/client-apigatewaymanagementapi';
5+
import {
6+
ApiGatewayManagementApiClient,
7+
DeleteConnectionCommand,
8+
GetConnectionCommand,
9+
PostToConnectionCommand,
10+
} from '@aws-sdk/client-apigatewaymanagementapi';
611

7-
import Connector from '../../../src/connectors/websocket';
12+
import Connector from '../../../src/connectors/apigatewayclient';
813

914
import { debug } from '../../../src/utils';
1015

@@ -59,4 +64,30 @@ describe('connectors/websocket.js', () => {
5964
});
6065
expect(data).to.deep.equal({});
6166
});
67+
68+
it('should get connection', async () => {
69+
const spy = sinon.spy((_) => ({ ConnectedAt: '2024-01-01T00:00:00Z', LastActiveAt: '2024-01-01T01:00:00Z' }));
70+
mockWs.on(GetConnectionCommand).callsFake(spy);
71+
72+
const data = await new Connector({
73+
debug: debug('ws'),
74+
endpoint: 'https://test.execute-api.us-west-2.amazonaws.com/dev',
75+
}).getConnection('conn-1');
76+
77+
expect(spy).to.have.been.calledWith({ ConnectionId: 'conn-1' });
78+
expect(data).to.deep.equal({ ConnectedAt: '2024-01-01T00:00:00Z', LastActiveAt: '2024-01-01T01:00:00Z' });
79+
});
80+
81+
it('should delete connection', async () => {
82+
const spy = sinon.spy((_) => ({}));
83+
mockWs.on(DeleteConnectionCommand).callsFake(spy);
84+
85+
const data = await new Connector({
86+
debug: debug('ws'),
87+
endpoint: 'https://test.execute-api.us-west-2.amazonaws.com/dev',
88+
}).deleteConnection('conn-1');
89+
90+
expect(spy).to.have.been.calledWith({ ConnectionId: 'conn-1' });
91+
expect(data).to.deep.equal({});
92+
});
6293
});

test/unit/flavors/websocket.test.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ import { initialize, initializeFrom } from '../../../src';
77
import { toDynamodbRecords, fromDynamodb } from '../../../src/from/dynamodb';
88

99
import { defaultOptions } from '../../../src/utils/opt';
10-
import { WebSocketConnector } from '../../../src/connectors';
10+
import { ApiGatewayClientConnector } from '../../../src/connectors';
1111

1212
import { broadcastToWebSocket } from '../../../src/flavors/websocket';
1313

1414
describe('flavors/websocket.js', () => {
1515
beforeEach(() => {
16-
sinon.stub(WebSocketConnector.prototype, 'postToConnection').resolves({});
16+
sinon.stub(ApiGatewayClientConnector.prototype, 'postToConnection').resolves({});
1717
});
1818

1919
afterEach(sinon.restore);
@@ -121,7 +121,7 @@ describe('flavors/websocket.js', () => {
121121
.tap((collected) => {
122122
expect(collected.length).to.equal(1);
123123
expect(collected[0].connectionId).to.be.undefined;
124-
expect(collected[0].postResponse).to.deep.equal({});
124+
expect(collected[0].postResponse).to.be.undefined;
125125
})
126126
.done(done);
127127
});
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import 'mocha';
2+
import { expect } from 'chai';
3+
import sinon from 'sinon';
4+
import _ from 'highland';
5+
6+
import { queryConnection } from '../../../src/queries/websocket';
7+
8+
import Connector from '../../../src/connectors/apigatewayclient';
9+
10+
describe('queries/websocket.js', () => {
11+
afterEach(sinon.restore);
12+
13+
it('should skip uow without connectionId', (done) => {
14+
const stub = sinon.stub(Connector.prototype, 'getConnection');
15+
16+
const uows = [{ someField: 'value' }];
17+
18+
_(uows)
19+
.through(queryConnection())
20+
.collect()
21+
.tap((collected) => {
22+
expect(collected.length).to.equal(1);
23+
expect(collected[0].getConnectionResponse).to.be.undefined;
24+
expect(stub).to.not.have.been.called;
25+
})
26+
.done(done);
27+
});
28+
29+
it('should get connection info', (done) => {
30+
const connectionInfo = { ConnectedAt: '2024-01-01T00:00:00Z', LastActiveAt: '2024-01-01T01:00:00Z' };
31+
sinon.stub(Connector.prototype, 'getConnection').resolves(connectionInfo);
32+
33+
const uows = [{ connectionId: 'conn-1' }];
34+
35+
_(uows)
36+
.through(queryConnection())
37+
.collect()
38+
.tap((collected) => {
39+
expect(collected.length).to.equal(1);
40+
expect(collected[0].connectionId).to.equal('conn-1');
41+
expect(collected[0].getConnectionResponse).to.deep.equal(connectionInfo);
42+
})
43+
.done(done);
44+
});
45+
46+
it('should support custom response field name', (done) => {
47+
sinon.stub(Connector.prototype, 'getConnection').resolves({ ConnectedAt: '2024-01-01T00:00:00Z' });
48+
49+
const uows = [{ connectionId: 'conn-1' }];
50+
51+
_(uows)
52+
.through(queryConnection({ getConnectionResponseField: 'wsInfo' }))
53+
.collect()
54+
.tap((collected) => {
55+
expect(collected.length).to.equal(1);
56+
expect(collected[0].wsInfo).to.deep.equal({ ConnectedAt: '2024-01-01T00:00:00Z' });
57+
})
58+
.done(done);
59+
});
60+
61+
it('should fault on error', (done) => {
62+
const err = new Error('Forbidden');
63+
sinon.stub(Connector.prototype, 'getConnection').rejects(err);
64+
65+
const uows = [{ connectionId: 'conn-1' }];
66+
67+
_(uows)
68+
.through(queryConnection())
69+
.errors((e) => {
70+
expect(e.message).to.equal('Forbidden');
71+
expect(e.uow).to.not.be.undefined;
72+
done();
73+
})
74+
.resume();
75+
});
76+
});

0 commit comments

Comments
 (0)