Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/ddp-client-idempotent-reconnect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@rocket.chat/ddp-client": patch
---

Make `Connection.connect()` and `Connection.reconnect()` idempotent. Previously they rejected with `Error('Connection in progress')` when called while a connection was already in flight or established. Because the internal retry timer (`ws.onclose` → `setTimeout(() => void this.reconnect(), …)`) fires with no `.catch`, that rejection surfaced as an unhandled rejection at the page level whenever an external caller (e.g. an SDK consumer's bootstrap path) won the race against the timer. While `status === 'connecting'`, both methods now return the in-flight handshake promise so a later `failed` payload still propagates to every caller instead of being masked by a synthesized success; while `status === 'connected'` they resolve with `true`. The timer also no-ops when the connection has already been re-established, and a stale `ws.onclose` from a replaced socket no longer clobbers the new socket's status or schedules a redundant retry.
5 changes: 5 additions & 0 deletions .changeset/ddp-client-reset-retry-count.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@rocket.chat/ddp-client": patch
---

Reset `Connection.retryCount` to zero on a successful (re)connection. The counter was only ever incremented (in `ws.onclose`), never zeroed, so the retry budget was monotonically consumed across the connection's lifetime. With the default budget of `1`, any chain of two close events — for example the SDK reconnecting after a server force-logout, then the client running a follow-up `Meteor.logout()` whose server handler closes the WS again — drained the budget; the second close handler bailed at `retryCount >= retryOptions.retryCount` and the SDK stayed permanently disconnected. Method frames already in the dispatcher queue (e.g. a fresh `login` retry from the consumer) stayed queued forever.
5 changes: 5 additions & 0 deletions .changeset/ddp-dispatcher-non-method-frames.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@rocket.chat/ddp-client": patch
---

Fix `DDPDispatcher` dropping non-method frames (connect, sub, unsub, ping, pong) when a `wait` block is at the head of the queue. Previously every payload flowed through the same wait-serialization path: a `connect` frame dispatched after a `wait: true` method (e.g. `login`) would be queued in a new non-wait block but never actually sent, wedging the DDP handshake — the socket stayed open, the server never replied `connected`, and any caller awaiting the connection hung. Non-method payloads now bypass the queue and emit immediately; wait-method serialization between methods is unchanged.
5 changes: 5 additions & 0 deletions .changeset/use-rc-sdk-transport-setting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rocket.chat/meteor': minor
---

Adds a new admin setting `Use_RC_SDK` (General → Use Rocket.Chat SDK) that opts the workspace into the experimental SDK-over-DDP transport. When enabled, the client routes Meteor DDP traffic through `@rocket.chat/ddp-client` over a single WebSocket instead of the legacy Meteor stream. The flag is dormant by default; the server surfaces the value via a `<meta name="rc-sdk-transport-enabled">` tag, and the client also honors a per-tab `?sdk_transport=on|off` URL parameter and a `rc-config-sdk_transport` localStorage key (URL > localStorage > meta tag).
11 changes: 11 additions & 0 deletions apps/meteor/app/notifications/client/lib/Presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ import { UserStatus } from '@rocket.chat/core-typings';
import { Meteor } from 'meteor/meteor';

import { Presence } from '../../../../client/lib/presence';
import { getDdpSdk } from '../../../../client/lib/sdk/ddpSdk';
import { isSdkTransportEnabled } from '../../../../client/lib/sdk/sdkTransportEnabled';
import { createDdpSdkStreamerAdapter } from '../../../../client/lib/sdk/streamerAdapter';
import { streamerCentral } from '../../../../client/lib/streamer';

// TODO implement API on Streamer to be able to listen to all streamed data
// this is a hacky way to listen to all streamed data from user-presence Streamer

// Register the presence streamer on Meteor's connection. With the SDK transport
// flag on, *also* register on the SDK socket so presence messages arriving on
// either WS feed the same streamerCentral. With the flag off, only Meteor's
// connection is used — duplicating the registration via the meteor-backed sdk
// proxy would re-feed every frame back through streamerCentral via two paths.
streamerCentral.getStreamer('user-presence', { ddpConnection: Meteor.connection });
if (isSdkTransportEnabled()) {
streamerCentral.setupDdpConnection('user-presence', createDdpSdkStreamerAdapter(getDdpSdk()));
}

type args = [username: string, statusChanged?: UserStatus, statusText?: string];

Expand Down
4 changes: 4 additions & 0 deletions apps/meteor/app/ui-master/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ Meteor.startup(() => {
injectIntoHead('noreferrer', `<meta name="referrer" content="${value}" />`);
});

settings.watch<boolean>('Use_RC_SDK', (value) => {
injectIntoHead('Use_RC_SDK', `<meta name="rc-sdk-transport-enabled" content="${value ? 'on' : 'off'}" />`);
});

if (process.env.DISABLE_ANIMATION) {
injectIntoHead(
'disable-animation',
Expand Down
159 changes: 135 additions & 24 deletions apps/meteor/app/utils/client/lib/SDKClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { DDPCommon } from 'meteor/ddp-common';
import { Meteor } from 'meteor/meteor';

import { APIClient } from './RestApiClient';
import { ensureConnectedAndAuthenticated, getDdpSdk } from '../../../../client/lib/sdk/ddpSdk';
import { isSdkTransportEnabled } from '../../../../client/lib/sdk/sdkTransportEnabled';

declare module '@rocket.chat/ddp-client' {
// eslint-disable-next-line @typescript-eslint/naming-convention
Expand All @@ -19,6 +21,8 @@ declare module '@rocket.chat/ddp-client' {
}
}

const sdkTransportEnabled = isSdkTransportEnabled();

const isChangedCollectionPayload = (
msg: any,
): msg is { msg: 'changed'; collection: string; fields: { eventName: string; args: unknown[] } } => {
Expand Down Expand Up @@ -142,13 +146,113 @@ const createNewMeteorStream = (streamName: StreamNames, key: StreamKeys<StreamNa
};
};

const createNewDdpSdkStream = (
streamProxy: Emitter<EventMap>,
streamName: StreamNames,
key: StreamKeys<StreamNames>,
args: unknown[],
): StreamMapValue => {
const ee = new Emitter<{
ready: [error: any] | [undefined, any];
error: [error: any];
stop: undefined;
}>();
const meta = { ready: false };

// Defer the actual `subscribe` until DDPSDK is authenticated. Without this,
// stream subscriptions fired immediately after re-login (e.g. the
// SubscriptionsCachedStore's `notify-user/<uid>/subscriptions-changed`
// listener that re-arms via onLoggedIn) hit the SDK socket while it's
// still anonymous — server rejects with `not-allowed`/`nosub`, the
// stream's `ready` promise emits an error, and the cached store never
// receives subsequent server events. The visible failure: an agent that
// just took a livechat chat post-relogin sees the chat work but the
// "Move to the queue" button never appears, because the new subscription
// the server creates for that agent is never replicated to the client's
// Subscriptions store, and pseudoRoom (= {...sub, ...room}) ends up with
// no `u` for the canMoveQueue check.
let subscription: ReturnType<ReturnType<typeof getDdpSdk>['client']['subscribe']> | undefined;
let offCollection: (() => void) | undefined;
let stopped = false;

void ensureConnectedAndAuthenticated()
.catch(() => undefined)
.then(() => {
if (stopped) return;
const sdk = getDdpSdk();
subscription = sdk.client.subscribe(`stream-${streamName}`, key, { useCollection: false, args });

subscription
.ready()
.then(() => {
if (stopped) return;
meta.ready = true;
ee.emit('ready', [undefined, { msg: 'ready', subs: [subscription!.id] }]);
})
.catch((err) => {
if (stopped) return;
ee.emit('ready', [err]);
ee.emit('error', err);
});

offCollection = sdk.client.onCollection(`stream-${streamName}`, (data: any) => {
if (data?.msg !== 'changed') return;
if (data.collection !== `stream-${streamName}`) return;
if (data.fields?.eventName !== key) return;
streamProxy.emit(`stream-${streamName}/${key}` as keyof EventMap, data.fields.args);
});
});

const onChange: ReturnType<ClientStream['subscribe']>['onChange'] = (cb) => {
if (meta.ready) {
cb({ msg: 'ready', subs: [] });
return;
}
ee.once('ready', ([error, result]) => {
if (error) {
cb({ msg: 'nosub', id: '', error });
return;
}
cb(result);
});
};

return {
stop: () => {
// Mirror Meteor's subscription semantics: explicit stop() does not fire the
// 'stop' event (onStop is reserved for server-initiated closures).
// Emitting it here would recurse through the onStop handler that
// createStreamManager registers, which itself iterates the unsubList.
stopped = true;
offCollection?.();
subscription?.stop();
},
onChange,
ready: () => {
if (meta.ready) return Promise.resolve();
return new Promise<void>((resolve, reject) => {
ee.once('ready', ([err]) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
},
onError: (cb: (...args: any[]) => void) => ee.once('error', (error) => cb(error)),
onStop: (cb: () => void) => ee.once('stop', cb),
get isReady() {
return meta.ready;
},
unsubList: new Set(),
};
};

const createStreamManager = () => {
// Emitter that replicates stream messages to registered callbacks
const streamProxy = new Emitter<EventMap>();

// Collection of unsubscribe callbacks for each stream.
// const proxyUnsubLists = new Map<string, Set<() => void>>();

const streams = new Map<string, StreamMapValue>();

Accounts.onLogout(() => {
Expand All @@ -157,13 +261,20 @@ const createStreamManager = () => {
});
});

Meteor.connection._stream.on('message', (rawMsg: string) => {
const msg = DDPCommon.parseDDP(rawMsg);
if (!isChangedCollectionPayload(msg)) {
return;
}
streamProxy.emit(`${msg.collection}/${msg.fields.eventName}` as any, msg.fields.args as any);
});
if (!sdkTransportEnabled) {
// In legacy Meteor mode, stream frames arrive on Meteor.connection._stream
// as `changed` collection messages — bridge them into streamProxy so the
// per-stream callbacks fire. With SDK transport on, the frames arrive on
// the SDK socket and createNewDdpSdkStream registers its own onCollection
// listener instead.
Meteor.connection._stream.on('message', (rawMsg: string) => {
const msg = DDPCommon.parseDDP(rawMsg);
if (!isChangedCollectionPayload(msg)) {
return;
}
streamProxy.emit(`${msg.collection}/${msg.fields.eventName}` as any, msg.fields.args as any);
});
}

const stream: SDK['stream'] = <N extends StreamNames, K extends StreamKeys<N>>(
name: N,
Expand All @@ -186,7 +297,11 @@ const createStreamManager = () => {

streamProxy.on(eventLiteral, proxyCallback);

const stream = streams.get(eventLiteral) || createNewMeteorStream(name, key, args);
const stream =
streams.get(eventLiteral) ||
(sdkTransportEnabled
? createNewDdpSdkStream(streamProxy, name as StreamNames, key as StreamKeys<StreamNames>, args)
: createNewMeteorStream(name as StreamNames, key as StreamKeys<StreamNames>, args));

const stop = (): void => {
streamProxy.off(eventLiteral, proxyCallback);
Expand Down Expand Up @@ -241,30 +356,26 @@ const createStreamManager = () => {
export const createSDK = (rest: RestClientInterface) => {
const { stream, stopAll } = createStreamManager();

const publish = (name: string, args: unknown[]) => {
Meteor.call(`stream-${name}`, ...args);
};
const publish = sdkTransportEnabled
? (name: string, args: unknown[]) => {
// DDPSDK queues outbound frames until the WebSocket handshake completes,
// so there's no need to gate on an isReady flag here.
void getDdpSdk().client.callAsync(`stream-${name}`, ...args);
}
: (name: string, args: unknown[]) => {
Meteor.call(`stream-${name}`, ...args);
};

const call = <T extends keyof ServerMethods>(method: T, ...args: Parameters<ServerMethods[T]>): Promise<ReturnType<ServerMethods[T]>> => {
return Meteor.callAsync(method, ...args);
};

const disconnect = () => {
Meteor.disconnect();
};

const reconnect = () => {
Meteor.reconnect();
};

return {
rest,
stop: stopAll,
stream,
publish,
call,
disconnect,
reconnect,
};
};

Expand Down
6 changes: 5 additions & 1 deletion apps/meteor/client/lib/cachedStores/CachedStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ export abstract class CachedStore<T extends IRocketChatRecord, U = T> implements

protected eventType: StreamNames;

private readonly version = 18;
// Bumped from 18 → 19 to invalidate caches populated before the DDPSDK
// wire encoding was switched from JSON to EJSON. Entries written by the
// JSON window stored dates as ISO strings instead of Date instances, so
// fields like subscription.ls would fail `.getTime()` when read back.
private readonly version = 19;

private updatedAt = new Date(0);

Expand Down
58 changes: 53 additions & 5 deletions apps/meteor/client/lib/loggedIn.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,61 @@
import { Accounts } from 'meteor/accounts-base';

import { getUserId } from './user';
import { isSdkTransportEnabled } from './sdk/sdkTransportEnabled';
import { getUserId, userIdStore } from './user';

const sdkTransportEnabled = isSdkTransportEnabled();

const isLoggedIn = () => {
const uid = getUserId();
return !!uid;
};

/**
* Fire `cb` whenever the local userId transitions from absent → present.
*
* `Accounts.onLogin` would normally cover this, but Meteor only invokes
* the onLogin hook from inside a Tracker.autorun that waits for
* `Meteor.userAsync()` to resolve to a real user doc. When a login goes
* through our REST fallback (e.g. logout → fresh login while DDPSDK is
* reconnecting), the user document never lands in Meteor.users — it
* normally arrives as a DDP collection frame, but the REST endpoint
* only returns the method result. The autorun then sees a null user
* forever, and onLogin never fires. By piggybacking on userIdStore (which
* is updated synchronously the moment Accounts.connection.userId() is
* set), we get a reliable login signal regardless of how the user doc
* eventually arrives.
*/
const subscribeToLogin = (handler: () => void): (() => void) => {
let lastSeen = userIdStore.getState();
return userIdStore.subscribe((next) => {
if (next === lastSeen) return;
const wasLoggedOut = !lastSeen;
lastSeen = next;
if (next && wasLoggedOut) {
handler();
}
});
};

export const whenLoggedIn = () => {
if (isLoggedIn()) {
return Promise.resolve();
}

if (!sdkTransportEnabled) {
// Flag off: develop's exact implementation — wait on Accounts.onLogin only,
// no userIdStore bridge.
return new Promise<void>((resolve) => {
const subscription = Accounts.onLogin(() => {
subscription.stop();
resolve();
});
});
}

return new Promise<void>((resolve) => {
const subscription = Accounts.onLogin(() => {
subscription.stop();
const stop = subscribeToLogin(() => {
stop();
resolve();
});
});
Expand All @@ -30,11 +71,18 @@ export const onLoggedIn = (cb: (() => () => void) | (() => Promise<() => void>)
}
};

const subscription = Accounts.onLogin(handler);
// With the SDK transport on, login can land via REST (ddpOverREST) without
// filling Meteor.users — Accounts.onLogin's autorun would never fire.
// Bridge off userIdStore as belt-and-braces. With the flag off, the legacy
// DDP path populates Meteor.users and Accounts.onLogin fires reliably; the
// extra userIdStore subscription would just double-fire callbacks.
const accountsSubscription = Accounts.onLogin(handler);
const stopUserIdSubscription = sdkTransportEnabled ? subscribeToLogin(handler) : undefined;
if (isLoggedIn()) handler();

return () => {
subscription.stop();
accountsSubscription.stop();
stopUserIdSubscription?.();
cleanup?.();
};
};
Loading
Loading