Commit bede0e2

fix(ddp-client): never queue non-method frames behind a wait block (RocketChat#40307)
1 parent 8a23778 commit bede0e2

7 files changed

Lines changed: 313 additions & 14 deletions

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@rocket.chat/ddp-client": patch
+---
+
+Make `Connection.connect()` and `Connection.reconnect()` idempotent. Previously they rejected with `Error('Connection in progress')` when called while a connection was already in flight or established. Because the internal retry timer (`ws.onclose` → `setTimeout(() => void this.reconnect(), …)`) fires with no `.catch`, that rejection surfaced as an unhandled rejection at the page level whenever an external caller (e.g. an SDK consumer's bootstrap path) won the race against the timer. While `status === 'connecting'`, both methods now return the in-flight handshake promise so a later `failed` payload still propagates to every caller instead of being masked by a synthesized success; while `status === 'connected'` they resolve with `true`. The timer also no-ops when the connection has already been re-established, and a stale `ws.onclose` from a replaced socket no longer clobbers the new socket's status or schedules a redundant retry.
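
The promise-sharing behavior described in this changeset can be illustrated with a minimal sketch. It is not the package's `ConnectionImpl`: the class name `SketchConnection`, the `handshake` callback, and the `attempts` counter are hypothetical stand-ins chosen for illustration, and the real handshake over a WebSocket is replaced by an injected function.

```typescript
type SketchStatus = 'idle' | 'connecting' | 'connected' | 'failed';

// Hypothetical stand-in for a connection; the handshake is injected so the
// sketch needs no real WebSocket.
class SketchConnection {
	status: SketchStatus = 'idle';

	attempts = 0;

	private inFlight?: Promise<boolean>;

	private handshake: () => Promise<boolean>;

	constructor(handshake: () => Promise<boolean>) {
		this.handshake = handshake;
	}

	connect(): Promise<boolean> {
		// Already connected: resolve with the current state instead of rejecting.
		if (this.status === 'connected') {
			return Promise.resolve(true);
		}
		// Handshake in flight: hand back the same promise so every caller
		// observes the real outcome, including a later failure.
		if (this.status === 'connecting' && this.inFlight) {
			return this.inFlight;
		}

		this.status = 'connecting';
		this.attempts += 1;
		this.inFlight = this.handshake().then(
			(ok) => {
				this.status = 'connected';
				return ok;
			},
			(err) => {
				this.status = 'failed';
				throw err;
			},
		);
		return this.inFlight;
	}
}

const sketch = new SketchConnection(() => Promise.resolve(true));
const firstCall = sketch.connect();
const secondCall = sketch.connect();
console.log(firstCall === secondCall, sketch.attempts); // true 1
```

Returning the stored promise, rather than a fresh `Promise.resolve(true)`, is what lets a second caller see a rejection that arrives later in the same attempt.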
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@rocket.chat/ddp-client": patch
+---
+
+Reset `Connection.retryCount` to zero on a successful (re)connection. The counter was only ever incremented (in `ws.onclose`), never zeroed, so the retry budget was monotonically consumed across the connection's lifetime. With the default budget of `1`, any chain of two close events — for example the SDK reconnecting after a server force-logout, then the client running a follow-up `Meteor.logout()` whose server handler closes the WS again — drained the budget; the second close handler bailed at `retryCount >= retryOptions.retryCount` and the SDK stayed permanently disconnected. Method frames already in the dispatcher queue (e.g. a fresh `login` retry from the consumer) stayed queued forever.
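
The budget arithmetic above can be walked through with a small sketch. `RetryBudget`, `limit`, `resetOnConnect`, and `secondCloseRetries` are hypothetical names used only to contrast the old and new behavior; the real logic lives inside `ws.onclose` and the `connected` handler.

```typescript
// Hypothetical model of the retry counter; `limit` plays the role of
// `retryOptions.retryCount`.
class RetryBudget {
	retryCount = 0;

	readonly limit: number;

	readonly resetOnConnect: boolean;

	constructor(limit: number, resetOnConnect: boolean) {
		this.limit = limit;
		this.resetOnConnect = resetOnConnect;
	}

	// Called on a close event. Returns true when a retry would be scheduled.
	onClose(): boolean {
		if (this.retryCount >= this.limit) {
			return false; // budget spent: stay disconnected
		}
		this.retryCount += 1;
		return true;
	}

	// Called when the server confirms the connection.
	onConnected(): void {
		if (this.resetOnConnect) {
			this.retryCount = 0;
		}
	}
}

// Simulates: close -> successful reconnect -> close again, with a budget of 1.
function secondCloseRetries(resetOnConnect: boolean): boolean {
	const budget = new RetryBudget(1, resetOnConnect);
	budget.onClose(); // first close consumes the budget (0 -> 1)
	budget.onConnected(); // reconnect succeeds
	return budget.onClose(); // does the second close still retry?
}

console.log(secondCloseRetries(false), secondCloseRetries(true)); // false true
```

Without the reset, the second close finds `retryCount >= limit` and gives up; with it, every recovered connection starts from a full budget.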
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@rocket.chat/ddp-client": patch
+---
+
+Fix `DDPDispatcher` dropping non-method frames (connect, sub, unsub, ping, pong) when a `wait` block is at the head of the queue. Previously every payload flowed through the same wait-serialization path: a `connect` frame dispatched after a `wait: true` method (e.g. `login`) would be queued in a new non-wait block but never actually sent, wedging the DDP handshake — the socket stayed open, the server never replied `connected`, and any caller awaiting the connection hung. Non-method payloads now bypass the queue and emit immediately; wait-method serialization between methods is unchanged.
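
A rough sketch of the bypass follows. `MiniDispatcher` is a hypothetical, simplified model and not the package's `DDPDispatcher`: it records sent frames in an array instead of emitting a `send` event, and it omits what happens when a block completes. It only assumes the block shape `{ wait, items }` that the test in this commit asserts on.

```typescript
type SketchPayload = { msg: string; id?: string };
type SketchBlock = { wait: boolean; items: SketchPayload[] };

// Hypothetical simplified dispatcher: only the head block is outstanding.
class MiniDispatcher {
	queue: SketchBlock[] = [];

	sent: SketchPayload[] = [];

	dispatch(payload: SketchPayload, options: { wait?: boolean } = {}): void {
		// Non-method frames (connect, sub, unsub, ping, pong) skip the queue.
		if (payload.msg !== 'method') {
			this.sent.push(payload);
			return;
		}

		const wait = options.wait === true;
		const last = this.queue[this.queue.length - 1];
		if (wait || !last || last.wait) {
			// Wait methods get their own block; so does anything after one.
			this.queue.push({ wait, items: [payload] });
		} else {
			last.items.push(payload);
		}

		// Send only if the payload landed in the head (outstanding) block.
		const head = this.queue[0];
		if (head && head.items.indexOf(payload) !== -1) {
			this.sent.push(payload);
		}
	}
}

const dispatcher = new MiniDispatcher();
dispatcher.dispatch({ msg: 'method', id: '1' }, { wait: true }); // sent, blocks the queue
dispatcher.dispatch({ msg: 'method', id: '2' }); // held behind the wait block
dispatcher.dispatch({ msg: 'connect' }); // bypasses the queue
console.log(dispatcher.sent.map((p) => p.msg + (p.id ?? ''))); // [ 'method1', 'connect' ]
```

In this model the `connect` frame reaches the wire even though a wait block is pending, while the second method stays queued.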

packages/ddp-client/__tests__/Connection.spec.ts

Lines changed: 182 additions & 4 deletions
@@ -139,20 +139,198 @@ it('should queue messages if the connection is not ready', async () => {
 	await handleMethod(server, 'method', ['arg1', 'arg2'], '1');
 });

-it('should throw an error if a reconnect is called while a connection is in progress', async () => {
+it('should be idempotent if reconnect is called while already connected', async () => {
 	const client = new MinimalDDPClient();
 	const connection = ConnectionImpl.create('ws://localhost:1234', globalThis.WebSocket, client, { retryCount: 0, retryTime: 0 });

 	await handleConnection(server, connection.connect());

-	await expect(connection.reconnect()).rejects.toThrow('Connection in progress');
+	// Previous behavior was to throw "Connection in progress" — the consumer's
+	// `void this.reconnect()` paths (notably the ws.onclose retry timer)
+	// surfaced that as an unhandled rejection / pageError. Now a redundant
+	// reconnect is just a no-op resolving with the current state.
+	await expect(connection.reconnect()).resolves.toBe(true);
+	expect(connection.status).toBe('connected');
 });

-it('should throw an error if a connect is called while a connection is in progress', async () => {
+it('should be idempotent if connect is called while already connected', async () => {
 	const client = new MinimalDDPClient();
 	const connection = ConnectionImpl.create('ws://localhost:1234', globalThis.WebSocket, client, { retryCount: 0, retryTime: 0 });

 	await handleConnection(server, connection.connect());

-	await expect(connection.connect()).rejects.toThrow('Connection in progress');
+	await expect(connection.connect()).resolves.toBe(true);
+	expect(connection.status).toBe('connected');
+});
+
+it('should share the in-flight connect promise with a concurrent connect() caller', async () => {
+	// Regression: while status === 'connecting', a second connect() used to
+	// receive Promise.resolve(true) immediately, hiding any subsequent
+	// 'failed' payload from the in-flight handshake. Both callers must now
+	// observe the real outcome.
+	const client = new MinimalDDPClient();
+	const connection = new ConnectionImpl('ws://localhost:1234', WebSocket as any, client, { retryCount: 0, retryTime: 0 });
+
+	const first = connection.connect();
+	expect(connection.status).toBe('connecting');
+
+	const second = connection.connect();
+	expect(second).toBe(first);
+
+	await expect(handleConnectionAndRejects(server, first, second)).rejects.toBe('1');
+	expect(connection.status).toBe('failed');
+});
+
+it('should share the in-flight connect promise with a concurrent reconnect() caller', async () => {
+	// Same as above for the reconnect() entry point — the ws.onclose retry
+	// timer fires `reconnect()` and must piggyback on any handshake the
+	// consumer's bootstrap path already started.
+	const client = new MinimalDDPClient();
+	const connection = new ConnectionImpl('ws://localhost:1234', WebSocket as any, client, { retryCount: 0, retryTime: 0 });
+
+	const first = connection.connect();
+	expect(connection.status).toBe('connecting');
+
+	const second = connection.reconnect();
+	expect(second).toBe(first);
+
+	await handleConnection(server, first, second);
+	expect(connection.status).toBe('connected');
+});
+
+it('should not surface the retry timer rejection when an external connect won the race', async () => {
+	// Regression: ws.onclose schedules a `void this.reconnect()` timer; if the
+	// consumer (e.g. ddpSdk.ts startConnect) opens a fresh socket before that
+	// timer fires, the timer used to reject with "Connection in progress" and,
+	// because of the leading `void`, the rejection became an unhandled
+	// rejection on the page. The timer must now no-op silently when the
+	// connection has already been re-established.
+	const client = new MinimalDDPClient();
+	const connection = ConnectionImpl.create('ws://localhost:1234', WebSocket, client, { retryCount: 1, retryTime: 100 });
+
+	await handleConnection(server, connection.connect());
+	expect(connection.status).toBe('connected');
+
+	jest.useFakeTimers();
+
+	server.close();
+	WS.clean();
+	server = new WS('ws://localhost:1234/websocket');
+
+	expect(connection.status).toBe('disconnected');
+
+	// Track unhandled rejections on the timer's promise.
+	const unhandled = jest.fn();
+	process.on('unhandledRejection', unhandled);
+
+	// External code opens a new connection BEFORE the retry timer fires.
+	const externalConnect = handleConnection(server, connection.connect());
+
+	// Run the timer.
+	await jest.advanceTimersByTimeAsync(200);
+	await externalConnect;
+
+	// Drain any microtasks the timer might have queued.
+	await Promise.resolve();
+	await Promise.resolve();
+
+	expect(connection.status).toBe('connected');
+	expect(unhandled).not.toHaveBeenCalled();
+	process.off('unhandledRejection', unhandled);
+	jest.useRealTimers();
+});
+
+it('should reset retryCount on a successful connection so subsequent drops can retry', async () => {
+	// Regression: retryCount was only ever incremented on disconnect, never
+	// zeroed on successful (re)connect. With default retryCount=1 budget, a
+	// single force-logout cycle (server close → SDK reconnects → app calls
+	// `Meteor.logout()` → server's logout handler closes WS again) drained the
+	// budget, and the second close left the SDK permanently disconnected.
+	// Method frames queued on the SDK during that window stayed queued
+	// forever — observed in e2e-encryption/e2ee-passphrase-management as the
+	// next loginByUserState login frame never being delivered.
+	const client = new MinimalDDPClient();
+	const connection = ConnectionImpl.create('ws://localhost:1234', WebSocket, client, { retryCount: 1, retryTime: 100 });
+
+	await handleConnection(server, connection.connect());
+	expect(connection.status).toBe('connected');
+	expect((connection as unknown as { retryCount: number }).retryCount).toBe(0);
+
+	// First disconnect → schedules retry (retryCount: 0 → 1).
+	jest.useFakeTimers();
+	server.close();
+	WS.clean();
+	server = new WS('ws://localhost:1234/websocket');
+
+	expect(connection.status).toBe('disconnected');
+
+	await handleConnection(
+		server,
+		jest.advanceTimersByTimeAsync(200),
+		new Promise((resolve) => connection.once('reconnecting', () => resolve(undefined))),
+		new Promise((resolve) => connection.once('connection', (data) => resolve(data))),
+	);
+	jest.useRealTimers();
+	expect(connection.status).toBe('connected');
+
+	// Successful reconnect must zero the retry budget.
+	expect((connection as unknown as { retryCount: number }).retryCount).toBe(0);
+
+	// Second disconnect should still schedule a retry now that the budget reset.
+	jest.useFakeTimers();
+	server.close();
+	WS.clean();
+	server = new WS('ws://localhost:1234/websocket');
+
+	await handleConnection(
+		server,
+		jest.advanceTimersByTimeAsync(200),
+		new Promise((resolve) => connection.once('reconnecting', () => resolve(undefined))),
+		new Promise((resolve) => connection.once('connection', (data) => resolve(data))),
+	);
+	jest.useRealTimers();
+	expect(connection.status).toBe('connected');
+});
+
+it('should ignore a stale ws.onclose that fires after the socket has been replaced', async () => {
+	// Regression: ws.onclose handlers were closed over the original ws but
+	// mutated `this.status`/`this.retryCount` unconditionally. If a late close
+	// event from an old socket arrives after a new socket is connected, the
+	// handler would flip status back to 'disconnected' and schedule another
+	// retry timer.
+	const client = new MinimalDDPClient();
+	const connection = ConnectionImpl.create('ws://localhost:1234', WebSocket, client, { retryCount: 1, retryTime: 100 });
+
+	await handleConnection(server, connection.connect());
+	const firstWs = (connection as unknown as { ws: WebSocket }).ws;
+	expect(connection.status).toBe('connected');
+
+	jest.useFakeTimers();
+	server.close();
+	WS.clean();
+	server = new WS('ws://localhost:1234/websocket');
+
+	expect(connection.status).toBe('disconnected');
+
+	await handleConnection(
+		server,
+		jest.advanceTimersByTimeAsync(200),
+		new Promise((resolve) => connection.once('reconnecting', () => resolve(undefined))),
+		new Promise((resolve) => connection.once('connection', (data) => resolve(data))),
+	);
+
+	expect(connection.status).toBe('connected');
+	jest.useRealTimers();
+	const secondWs = (connection as unknown as { ws: WebSocket }).ws;
+	expect(secondWs).not.toBe(firstWs);
+
+	const statusBefore = connection.status;
+	const retryBefore = (connection as unknown as { retryCount: number }).retryCount;
+
+	// Synthesize a late `close` event on the original socket — the handler
+	// must short-circuit because `this.ws !== ws` for the closed-over ws.
+	(firstWs as unknown as { onclose?: () => void }).onclose?.();
+
+	expect(connection.status).toBe(statusBefore);
+	expect((connection as unknown as { retryCount: number }).retryCount).toBe(retryBefore);
 });

packages/ddp-client/__tests__/DDPDispatcher.spec.ts

Lines changed: 28 additions & 0 deletions
@@ -72,6 +72,34 @@ it('should send outstanding blocks if there is no block waiting and item is adde
 	expect(fn).toHaveBeenCalledTimes(1);
 });

+it('emits non-method payloads immediately, even when a wait block is at the head', () => {
+	// Regression: a connect frame dispatched while a wait `login` method is
+	// queued must still reach the server. Otherwise the DDP handshake never
+	// completes and the socket wedges open but unconnected.
+	const fn = jest.fn();
+	const ddpDispatcher = new DDPDispatcher();
+	ddpDispatcher.on('send', fn);
+
+	const login = ddp.call('login');
+	ddpDispatcher.dispatch(login, { wait: true });
+	expect(fn).toHaveBeenCalledTimes(1);
+	expect(fn).toHaveBeenNthCalledWith(1, login);
+
+	const connectPayload = { msg: 'connect' as const, version: '1', support: ['1'] };
+	ddpDispatcher.dispatch(connectPayload);
+	expect(fn).toHaveBeenCalledTimes(2);
+	expect(fn).toHaveBeenNthCalledWith(2, connectPayload);
+
+	const subPayload = { msg: 'sub' as const, id: 'a', name: 'foo', params: [] };
+	ddpDispatcher.dispatch(subPayload);
+	expect(fn).toHaveBeenCalledTimes(3);
+	expect(fn).toHaveBeenNthCalledWith(3, subPayload);
+
+	// Wait block remains pending — only the wait method is queued, the
+	// non-method frames bypassed it.
+	expect(ddpDispatcher.queue).toEqual([{ wait: true, items: [login] }]);
+});
+
 it('should send the next blocks if the outstanding block was completed', () => {
 	const fn = jest.fn();

packages/ddp-client/src/Connection.ts

Lines changed: 71 additions & 7 deletions
@@ -74,6 +74,8 @@ export class ConnectionImpl

 	retryCount = 0;

+	private connectPromise?: Promise<boolean>;
+
 	public queue = new Set<string>();

 	constructor(
@@ -109,8 +111,23 @@ export class ConnectionImpl
 	}

 	reconnect(): Promise<boolean> {
-		if (this.status === 'connecting' || this.status === 'connected') {
-			return Promise.reject(new Error('Connection in progress'));
+		// Idempotent — if another caller already started (or finished) a connection
+		// since this reconnect was scheduled, we don't need to do anything. The
+		// retry timer enqueued by `ws.onclose` runs with no awareness of any
+		// concurrent `connect()` (e.g. the consumer's own bootstrap or
+		// resume-on-userId-change path), so without this guard a late timer
+		// rejected with "Connection in progress" — and because the timer fires
+		// from `void this.reconnect()` the rejection became an unhandled
+		// rejection at the page level.
+		if (this.status === 'connected') {
+			clearTimeout(this.retryOptions.retryTimer);
+			return Promise.resolve(true);
+		}
+		if (this.status === 'connecting') {
+			// Share the in-flight handshake promise so a `failed` payload
+			// later in the same attempt isn't masked by a synthesized success.
+			clearTimeout(this.retryOptions.retryTimer);
+			return this.connectPromise as Promise<boolean>;
 		}

 		clearTimeout(this.retryOptions.retryTimer);
@@ -123,19 +140,31 @@ export class ConnectionImpl
 	}

 	connect() {
-		if (this.status === 'connecting' || this.status === 'connected') {
-			return Promise.reject(new Error('Connection in progress'));
+		// Same idempotency guard as `reconnect()` — multiple call sites
+		// (`reconnect()`, ws.onclose retry timer, external `startConnect`) can
+		// race; rejecting forced every caller to wrap in `.catch(() => {})`
+		// just to silence noise, and the internal timer's `void this.reconnect()`
+		// path didn't have a catch at all.
+		if (this.status === 'connected') {
+			clearTimeout(this.retryOptions.retryTimer);
+			return Promise.resolve(true);
+		}
+		if (this.status === 'connecting') {
+			clearTimeout(this.retryOptions.retryTimer);
+			return this.connectPromise as Promise<boolean>;
 		}

 		this.status = 'connecting';
-		this.emit('connecting');
-		this.emitStatus();

 		const ws = new this.WS(`${this.ssl ? 'wss://' : 'ws://'}${this.url}/websocket`);

 		this.ws = ws;

-		return new Promise<boolean>((resolve, reject) => {
+		// Build the in-flight promise and publish it on `this.connectPromise`
+		// before emitting any status change, so a synchronous re-entrant
+		// caller (an event listener that calls `connect()`/`reconnect()`)
+		// hits the `'connecting'` guard and gets this same promise.
+		const connectPromise = new Promise<boolean>((resolve, reject) => {
 			ws.onopen = () => {
 				ws.onmessage = (event) => {
 					this.client.handleMessage(String(event.data));
@@ -166,6 +195,15 @@ export class ConnectionImpl
 			this.client.onConnection((payload) => {
 				if (payload.msg === 'connected') {
 					this.status = 'connected';
+					// Reset the retry budget on successful connection so a future
+					// disconnect can schedule reconnects again. Without this,
+					// long-lived connections that recover once would burn through
+					// `retryCount` permanently and stop reconnecting on subsequent
+					// drops — observed when a server-side ws.close (logout, force-
+					// logout) chained with a reconnect cycle saturated the
+					// budget; the next disconnect left frames stuck in the
+					// dispatcher queue forever because the socket never came back.
+					this.retryCount = 0;
 					this.emitStatus();
 					this.emit('connected', payload.session);
 					this.session = payload.session;
@@ -183,6 +221,15 @@ export class ConnectionImpl
 			};

 			ws.onclose = () => {
+				// If a newer ws has already taken over (this socket was closed
+				// after `connect()` opened a replacement), ignore the late
+				// onclose. Otherwise its handler would clobber `this.status` and
+				// `retryCount`, and could even schedule a redundant retry timer
+				// that fires while the new socket is healthy — observed as the
+				// "Connection in progress" pageError racing on every reconnect.
+				if (this.ws !== ws) {
+					return;
+				}
 				clearTimeout(this.retryOptions.retryTimer);
 				if (this.status === 'closed') {
 					return;
@@ -198,10 +245,27 @@ export class ConnectionImpl
 				this.retryCount += 1;

 				this.retryOptions.retryTimer = setTimeout(() => {
+					// Re-check the status when the timer actually fires. If the
+					// consumer bootstrapped a fresh `connect()` in the meantime
+					// (status flipped from 'disconnected' to 'connecting' or
+					// 'connected'), there's nothing for us to do. Without this
+					// the timer would call `this.reconnect()`, which (pre-this
+					// patch) rejected with "Connection in progress" and surfaced
+					// as an unhandled rejection.
+					if (this.status === 'connecting' || this.status === 'connected' || this.status === 'closed') {
+						return;
+					}
 					void this.reconnect();
 				}, this.retryOptions.retryTime * this.retryCount);
 			};
 		});
+
+		this.connectPromise = connectPromise;
+
+		this.emit('connecting');
+		this.emitStatus();
+
+		return connectPromise;
 	}

 	close() {
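
The stale-socket guard added to `ws.onclose` relies on each handler closing over the socket it was attached to and comparing it with the currently active one. A minimal sketch of that pattern, using hypothetical names (`FakeSocket`, `SocketHolder`, `open`) and a plain object in place of a real WebSocket:

```typescript
type FakeSocket = { onclose?: () => void };

// Hypothetical holder that replaces its socket on every open().
class SocketHolder {
	ws?: FakeSocket;

	status: 'connected' | 'disconnected' = 'disconnected';

	open(): FakeSocket {
		const ws: FakeSocket = {};
		this.ws = ws;
		this.status = 'connected';
		ws.onclose = () => {
			// A newer socket has taken over: ignore this late close event.
			if (this.ws !== ws) {
				return;
			}
			this.status = 'disconnected';
		};
		return ws;
	}
}

const holder = new SocketHolder();
const oldSocket = holder.open();
holder.open(); // a replacement socket becomes the active one
oldSocket.onclose?.(); // late close from the replaced socket
console.log(holder.status); // connected
```

Because the comparison uses object identity, no extra generation counter is needed; a close event from the active socket still flips the status as before.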
