Skip to content

Commit 6d56dd6

Browse files
author
Aidan Zimmermann
committed
STREAM-1362: skip retry requests when not yet connected
1 parent e4bef71 commit 6d56dd6

2 files changed

Lines changed: 123 additions & 5 deletions

File tree

src/notifications.ts

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export class Notifications implements StreamingClientExtension {
3030

3131
private internalSubscriptions: string[];
3232
private pendingBulkSubscribeRetry?: RetryPromise<any>;
33+
private pendingConnectedWait?: { cancel: (reason?: any) => void };
3334

3435
constructor (client, options?: IClientOptions) {
3536
this.subscriptions = {};
@@ -210,16 +211,43 @@ export class Notifications implements StreamingClientExtension {
210211
return keptTopics;
211212
}
212213

213-
makeBulkSubscribeRequest (topics: string[], options): Promise<AxiosResponse<ChannelTopicsEntityListing>> {
214-
// Cancel any in-flight retry from a previous bulk subscribe. If the topic list has changed
215-
// (subscribe/unsubscribe called between retries), the stale request would send an outdated
216-
// topic list. Since bulk subscribes with replace=true use PUT (full replacement), only the
217-
// most recent request matters.
214+
async makeBulkSubscribeRequest (topics: string[], options): Promise<AxiosResponse<ChannelTopicsEntityListing>> {
215+
// Cancel any in-flight retry or connected-wait from a previous bulk subscribe.
216+
// If the topic list has changed (subscribe/unsubscribe called between retries),
217+
// the stale request would send an outdated topic list. Since bulk subscribes with
218+
// replace=true use PUT (full replacement), only the most recent request matters.
219+
if (this.pendingConnectedWait) {
220+
this.pendingConnectedWait.cancel(new Error('Superseded by newer bulk subscribe request'));
221+
this.pendingConnectedWait = undefined;
222+
}
218223
if (this.pendingBulkSubscribeRetry && !this.pendingBulkSubscribeRetry.hasCompleted()) {
219224
this.client.logger.info('Cancelling previous bulk subscribe retry — new request supersedes it');
220225
this.pendingBulkSubscribeRetry.cancel(new Error('Superseded by newer bulk subscribe request'));
221226
}
222227

228+
// If the client is not connected, wait for reconnection before attempting the request.
229+
// This prevents firing requests against a dead/stale channel ID which would result in
230+
// 400 errors (notification.unable.to.get.channel.id). After reconnection, the stanza
231+
// instance will have the fresh channel ID from the new connection.
232+
if (!this.client.connected) {
233+
this.client.logger.info('Client not connected, waiting for reconnection before bulk subscribe', {
234+
topicCount: topics.length
235+
});
236+
await new Promise<void>((resolve, reject) => {
237+
const onConnected = () => {
238+
this.pendingConnectedWait = undefined;
239+
resolve();
240+
};
241+
this.client.once('connected', onConnected);
242+
this.pendingConnectedWait = {
243+
cancel: (reason?: any) => {
244+
this.client.off('connected', onConnected);
245+
reject(reason);
246+
}
247+
};
248+
});
249+
}
250+
223251
const requestOptions: RequestApiOptions = {
224252
method: options.replace ? 'put' : 'post',
225253
host: this.client.config.apiHost,

test/unit/notifications.test.ts

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ describe('Notifications', () => {
140140
channelId,
141141
apiHost: 'example.com',
142142
});
143+
client.connected = true;
143144
bulkSubscribeUrl = `https://api.example.com/api/v2/notifications/channels/${channelId}/subscriptions`;
144145
axiosMock.onPut(bulkSubscribeUrl).reply((config) => {
145146
const topics = JSON.parse(config.data).map((topic) => topic.id);
@@ -738,6 +739,7 @@ describe('Notifications', () => {
738739
apiHost: 'example.com',
739740
channelId: 'notification-test-channel'
740741
});
742+
client.connected = true;
741743
const notification = new Notifications(client);
742744
notification.stanzaInstance = fakeStanza;
743745
const xmppSubscribeSpy = jest.fn().mockResolvedValue({});
@@ -1135,6 +1137,7 @@ describe('Notifications', () => {
11351137
apiHost: 'example.com',
11361138
channelId: 'notification-test-channel'
11371139
});
1140+
client.connected = true;
11381141
notification = new Notifications(client);
11391142
notification.stanzaInstance = getFakeStanzaClient();
11401143
});
@@ -1233,5 +1236,92 @@ describe('Notifications', () => {
12331236
await expect(notification.bulkSubscribe(['topicA'], { replace: true }))
12341237
.rejects.toBeNull();
12351238
});
1239+
1240+
it('should wait for connected event before making bulk subscribe request when disconnected', async () => {
1241+
client.connected = false;
1242+
const requestSpy = jest.spyOn(client.http, 'requestApiWithRetry').mockReturnValue({
1243+
promise: Promise.resolve({ data: { entities: [] } }),
1244+
cancel: jest.fn(),
1245+
complete: jest.fn(),
1246+
hasCompleted: () => false,
1247+
_id: 'test-id'
1248+
} as any);
1249+
1250+
// Start the request — it should not fire immediately
1251+
const promise = notification.makeBulkSubscribeRequest(['topicA'], { replace: true });
1252+
expect(requestSpy).not.toHaveBeenCalled();
1253+
1254+
// Simulate reconnection
1255+
client.connected = true;
1256+
(client as any).emit('connected');
1257+
1258+
await promise;
1259+
expect(requestSpy).toHaveBeenCalledTimes(1);
1260+
});
1261+
1262+
it('should use the fresh channel ID after waiting for reconnection', async () => {
1263+
client.connected = false;
1264+
const oldChannelId = notification.stanzaInstance!.channelId;
1265+
1266+
const requestSpy = jest.spyOn(client.http, 'requestApiWithRetry').mockReturnValue({
1267+
promise: Promise.resolve({ data: { entities: [] } }),
1268+
cancel: jest.fn(),
1269+
complete: jest.fn(),
1270+
hasCompleted: () => false,
1271+
_id: 'test-id'
1272+
} as any);
1273+
1274+
const promise = notification.makeBulkSubscribeRequest(['topicA'], { replace: true });
1275+
1276+
// Simulate reconnection with a new stanza instance (new channel)
1277+
const newStanza = getFakeStanzaClient();
1278+
newStanza.channelId = 'brand-new-channel';
1279+
notification.stanzaInstance = newStanza;
1280+
client.connected = true;
1281+
(client as any).emit('connected');
1282+
1283+
await promise;
1284+
1285+
// Verify the request used the NEW channel ID, not the old one
1286+
const calledPath = requestSpy.mock.calls[0][0];
1287+
expect(calledPath).toContain('brand-new-channel');
1288+
expect(calledPath).not.toContain(oldChannelId);
1289+
});
1290+
1291+
it('should cancel pending connected-wait when a new bulk subscribe supersedes it', async () => {
1292+
client.connected = false;
1293+
1294+
// Start first request — will wait for connected
1295+
const firstPromise = notification.makeBulkSubscribeRequest(['topicA'], { replace: true });
1296+
1297+
// Second request while first is waiting — should cancel the first
1298+
client.connected = true;
1299+
const requestSpy = jest.spyOn(client.http, 'requestApiWithRetry').mockReturnValue({
1300+
promise: Promise.resolve({ data: { entities: [] } }),
1301+
cancel: jest.fn(),
1302+
complete: jest.fn(),
1303+
hasCompleted: () => false,
1304+
_id: 'test-id'
1305+
} as any);
1306+
1307+
await notification.makeBulkSubscribeRequest(['topicA', 'topicB'], { replace: true });
1308+
1309+
// The first promise should reject with supersession error
1310+
await expect(firstPromise).rejects.toThrow('Superseded by newer bulk subscribe request');
1311+
});
1312+
1313+
it('should proceed immediately when client is already connected', async () => {
1314+
client.connected = true;
1315+
const requestSpy = jest.spyOn(client.http, 'requestApiWithRetry').mockReturnValue({
1316+
promise: Promise.resolve({ data: { entities: [] } }),
1317+
cancel: jest.fn(),
1318+
complete: jest.fn(),
1319+
hasCompleted: () => false,
1320+
_id: 'test-id'
1321+
} as any);
1322+
1323+
await notification.makeBulkSubscribeRequest(['topicA'], { replace: true });
1324+
expect(requestSpy).toHaveBeenCalledTimes(1);
1325+
});
12361326
});
12371327
});

0 commit comments

Comments
 (0)