Skip to content

Commit dbce434

Browse files
authored
chore(@rocket.chat/ddp-client): introduce experimental Meteor independent DDP client (re-add) (RocketChat#40430)
1 parent 281c22a commit dbce434

27 files changed

Lines changed: 1688 additions & 102 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@rocket.chat/meteor': minor
3+
---
4+
5+
Adds a new admin setting `Use_RC_SDK` (General → Use Rocket.Chat SDK) that opts the workspace into the experimental SDK-over-DDP transport. When enabled, the client routes Meteor DDP traffic through `@rocket.chat/ddp-client` over a single WebSocket instead of the legacy Meteor stream. The flag is dormant by default; the server surfaces the value via a `<meta name="rc-sdk-transport-enabled">` tag, and the client also honors a per-tab `?sdk_transport=on|off` URL parameter and a `rc-config-sdk_transport` localStorage key (URL > localStorage > meta tag).

apps/meteor/app/notifications/client/lib/Presence.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,23 @@ import { UserStatus } from '@rocket.chat/core-typings';
22
import { Meteor } from 'meteor/meteor';
33

44
import { Presence } from '../../../../client/lib/presence';
5+
import { getDdpSdk } from '../../../../client/lib/sdk/ddpSdk';
6+
import { isSdkTransportEnabled } from '../../../../client/lib/sdk/sdkTransportEnabled';
7+
import { createDdpSdkStreamerAdapter } from '../../../../client/lib/sdk/streamerAdapter';
58
import { streamerCentral } from '../../../../client/lib/streamer';
69

710
// TODO implement API on Streamer to be able to listen to all streamed data
811
// this is a hacky way to listen to all streamed data from user-presence Streamer
912

13+
// Register the presence streamer on Meteor's connection. With the SDK transport
14+
// flag on, *also* register on the SDK socket so presence messages arriving on
15+
// either WS feed the same streamerCentral. With the flag off, only Meteor's
16+
// connection is used — duplicating the registration via the meteor-backed sdk
17+
// proxy would re-feed every frame back through streamerCentral via two paths.
1018
streamerCentral.getStreamer('user-presence', { ddpConnection: Meteor.connection });
19+
if (isSdkTransportEnabled()) {
20+
streamerCentral.setupDdpConnection('user-presence', createDdpSdkStreamerAdapter(getDdpSdk()));
21+
}
1122

1223
type args = [username: string, statusChanged?: UserStatus, statusText?: string];
1324

apps/meteor/app/ui-master/server/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ Meteor.startup(() => {
2828
injectIntoHead('noreferrer', `<meta name="referrer" content="${value}" />`);
2929
});
3030

31+
settings.watch<boolean>('Use_RC_SDK', (value) => {
32+
injectIntoHead('Use_RC_SDK', `<meta name="rc-sdk-transport-enabled" content="${value ? 'on' : 'off'}" />`);
33+
});
34+
3135
if (process.env.DISABLE_ANIMATION) {
3236
injectIntoHead(
3337
'disable-animation',

apps/meteor/app/utils/client/lib/SDKClient.ts

Lines changed: 135 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { DDPCommon } from 'meteor/ddp-common';
66
import { Meteor } from 'meteor/meteor';
77

88
import { APIClient } from './RestApiClient';
9+
import { ensureConnectedAndAuthenticated, getDdpSdk } from '../../../../client/lib/sdk/ddpSdk';
10+
import { isSdkTransportEnabled } from '../../../../client/lib/sdk/sdkTransportEnabled';
911

1012
declare module '@rocket.chat/ddp-client' {
1113
// eslint-disable-next-line @typescript-eslint/naming-convention
@@ -19,6 +21,8 @@ declare module '@rocket.chat/ddp-client' {
1921
}
2022
}
2123

24+
const sdkTransportEnabled = isSdkTransportEnabled();
25+
2226
const isChangedCollectionPayload = (
2327
msg: any,
2428
): msg is { msg: 'changed'; collection: string; fields: { eventName: string; args: unknown[] } } => {
@@ -142,13 +146,113 @@ const createNewMeteorStream = (streamName: StreamNames, key: StreamKeys<StreamNa
142146
};
143147
};
144148

149+
const createNewDdpSdkStream = (
150+
streamProxy: Emitter<EventMap>,
151+
streamName: StreamNames,
152+
key: StreamKeys<StreamNames>,
153+
args: unknown[],
154+
): StreamMapValue => {
155+
const ee = new Emitter<{
156+
ready: [error: any] | [undefined, any];
157+
error: [error: any];
158+
stop: undefined;
159+
}>();
160+
const meta = { ready: false };
161+
162+
// Defer the actual `subscribe` until DDPSDK is authenticated. Without this,
163+
// stream subscriptions fired immediately after re-login (e.g. the
164+
// SubscriptionsCachedStore's `notify-user/<uid>/subscriptions-changed`
165+
// listener that re-arms via onLoggedIn) hit the SDK socket while it's
166+
// still anonymous — server rejects with `not-allowed`/`nosub`, the
167+
// stream's `ready` promise emits an error, and the cached store never
168+
// receives subsequent server events. The visible failure: an agent that
169+
// just took a livechat chat post-relogin sees the chat work but the
170+
// "Move to the queue" button never appears, because the new subscription
171+
// the server creates for that agent is never replicated to the client's
172+
// Subscriptions store, and pseudoRoom (= {...sub, ...room}) ends up with
173+
// no `u` for the canMoveQueue check.
174+
let subscription: ReturnType<ReturnType<typeof getDdpSdk>['client']['subscribe']> | undefined;
175+
let offCollection: (() => void) | undefined;
176+
let stopped = false;
177+
178+
void ensureConnectedAndAuthenticated()
179+
.catch(() => undefined)
180+
.then(() => {
181+
if (stopped) return;
182+
const sdk = getDdpSdk();
183+
subscription = sdk.client.subscribe(`stream-${streamName}`, key, { useCollection: false, args });
184+
185+
subscription
186+
.ready()
187+
.then(() => {
188+
if (stopped) return;
189+
meta.ready = true;
190+
ee.emit('ready', [undefined, { msg: 'ready', subs: [subscription!.id] }]);
191+
})
192+
.catch((err) => {
193+
if (stopped) return;
194+
ee.emit('ready', [err]);
195+
ee.emit('error', err);
196+
});
197+
198+
offCollection = sdk.client.onCollection(`stream-${streamName}`, (data: any) => {
199+
if (data?.msg !== 'changed') return;
200+
if (data.collection !== `stream-${streamName}`) return;
201+
if (data.fields?.eventName !== key) return;
202+
streamProxy.emit(`stream-${streamName}/${key}` as keyof EventMap, data.fields.args);
203+
});
204+
});
205+
206+
const onChange: ReturnType<ClientStream['subscribe']>['onChange'] = (cb) => {
207+
if (meta.ready) {
208+
cb({ msg: 'ready', subs: [] });
209+
return;
210+
}
211+
ee.once('ready', ([error, result]) => {
212+
if (error) {
213+
cb({ msg: 'nosub', id: '', error });
214+
return;
215+
}
216+
cb(result);
217+
});
218+
};
219+
220+
return {
221+
stop: () => {
222+
// Mirror Meteor's subscription semantics: explicit stop() does not fire the
223+
// 'stop' event (onStop is reserved for server-initiated closures).
224+
// Emitting it here would recurse through the onStop handler that
225+
// createStreamManager registers, which itself iterates the unsubList.
226+
stopped = true;
227+
offCollection?.();
228+
subscription?.stop();
229+
},
230+
onChange,
231+
ready: () => {
232+
if (meta.ready) return Promise.resolve();
233+
return new Promise<void>((resolve, reject) => {
234+
ee.once('ready', ([err]) => {
235+
if (err) {
236+
reject(err);
237+
return;
238+
}
239+
resolve();
240+
});
241+
});
242+
},
243+
onError: (cb: (...args: any[]) => void) => ee.once('error', (error) => cb(error)),
244+
onStop: (cb: () => void) => ee.once('stop', cb),
245+
get isReady() {
246+
return meta.ready;
247+
},
248+
unsubList: new Set(),
249+
};
250+
};
251+
145252
const createStreamManager = () => {
146253
// Emitter that replicates stream messages to registered callbacks
147254
const streamProxy = new Emitter<EventMap>();
148255

149-
// Collection of unsubscribe callbacks for each stream.
150-
// const proxyUnsubLists = new Map<string, Set<() => void>>();
151-
152256
const streams = new Map<string, StreamMapValue>();
153257

154258
Accounts.onLogout(() => {
@@ -157,13 +261,20 @@ const createStreamManager = () => {
157261
});
158262
});
159263

160-
Meteor.connection._stream.on('message', (rawMsg: string) => {
161-
const msg = DDPCommon.parseDDP(rawMsg);
162-
if (!isChangedCollectionPayload(msg)) {
163-
return;
164-
}
165-
streamProxy.emit(`${msg.collection}/${msg.fields.eventName}` as any, msg.fields.args as any);
166-
});
264+
if (!sdkTransportEnabled) {
265+
// In legacy Meteor mode, stream frames arrive on Meteor.connection._stream
266+
// as `changed` collection messages — bridge them into streamProxy so the
267+
// per-stream callbacks fire. With SDK transport on, the frames arrive on
268+
// the SDK socket and createNewDdpSdkStream registers its own onCollection
269+
// listener instead.
270+
Meteor.connection._stream.on('message', (rawMsg: string) => {
271+
const msg = DDPCommon.parseDDP(rawMsg);
272+
if (!isChangedCollectionPayload(msg)) {
273+
return;
274+
}
275+
streamProxy.emit(`${msg.collection}/${msg.fields.eventName}` as any, msg.fields.args as any);
276+
});
277+
}
167278

168279
const stream: SDK['stream'] = <N extends StreamNames, K extends StreamKeys<N>>(
169280
name: N,
@@ -186,7 +297,11 @@ const createStreamManager = () => {
186297

187298
streamProxy.on(eventLiteral, proxyCallback);
188299

189-
const stream = streams.get(eventLiteral) || createNewMeteorStream(name, key, args);
300+
const stream =
301+
streams.get(eventLiteral) ||
302+
(sdkTransportEnabled
303+
? createNewDdpSdkStream(streamProxy, name as StreamNames, key as StreamKeys<StreamNames>, args)
304+
: createNewMeteorStream(name as StreamNames, key as StreamKeys<StreamNames>, args));
190305

191306
const stop = (): void => {
192307
streamProxy.off(eventLiteral, proxyCallback);
@@ -241,30 +356,26 @@ const createStreamManager = () => {
241356
export const createSDK = (rest: RestClientInterface) => {
242357
const { stream, stopAll } = createStreamManager();
243358

244-
const publish = (name: string, args: unknown[]) => {
245-
Meteor.call(`stream-${name}`, ...args);
246-
};
359+
const publish = sdkTransportEnabled
360+
? (name: string, args: unknown[]) => {
361+
// DDPSDK queues outbound frames until the WebSocket handshake completes,
362+
// so there's no need to gate on an isReady flag here.
363+
void getDdpSdk().client.callAsync(`stream-${name}`, ...args);
364+
}
365+
: (name: string, args: unknown[]) => {
366+
Meteor.call(`stream-${name}`, ...args);
367+
};
247368

248369
const call = <T extends keyof ServerMethods>(method: T, ...args: Parameters<ServerMethods[T]>): Promise<ReturnType<ServerMethods[T]>> => {
249370
return Meteor.callAsync(method, ...args);
250371
};
251372

252-
const disconnect = () => {
253-
Meteor.disconnect();
254-
};
255-
256-
const reconnect = () => {
257-
Meteor.reconnect();
258-
};
259-
260373
return {
261374
rest,
262375
stop: stopAll,
263376
stream,
264377
publish,
265378
call,
266-
disconnect,
267-
reconnect,
268379
};
269380
};
270381

apps/meteor/client/lib/cachedStores/CachedStore.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,11 @@ export abstract class CachedStore<T extends IRocketChatRecord, U = T> implements
5050

5151
protected eventType: StreamNames;
5252

53-
private readonly version = 18;
53+
// Bumped from 18 → 19 to invalidate caches populated before the DDPSDK
54+
// wire encoding was switched from JSON to EJSON. Entries written by the
55+
// JSON window stored dates as ISO strings instead of Date instances, so
56+
// fields like subscription.ls would fail `.getTime()` when read back.
57+
private readonly version = 19;
5458

5559
private updatedAt = new Date(0);
5660

apps/meteor/client/lib/loggedIn.ts

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,61 @@
11
import { Accounts } from 'meteor/accounts-base';
22

3-
import { getUserId } from './user';
3+
import { isSdkTransportEnabled } from './sdk/sdkTransportEnabled';
4+
import { getUserId, userIdStore } from './user';
5+
6+
const sdkTransportEnabled = isSdkTransportEnabled();
47

58
const isLoggedIn = () => {
69
const uid = getUserId();
710
return !!uid;
811
};
912

13+
/**
14+
* Fire `cb` whenever the local userId transitions from absent → present.
15+
*
16+
* `Accounts.onLogin` would normally cover this, but Meteor only invokes
17+
* the onLogin hook from inside a Tracker.autorun that waits for
18+
* `Meteor.userAsync()` to resolve to a real user doc. When a login goes
19+
* through our REST fallback (e.g. logout → fresh login while DDPSDK is
20+
* reconnecting), the user document never lands in Meteor.users — it
21+
* normally arrives as a DDP collection frame, but the REST endpoint
22+
* only returns the method result. The autorun then sees a null user
23+
* forever, and onLogin never fires. By piggybacking on userIdStore (which
24+
* is updated synchronously the moment Accounts.connection.userId() is
25+
* set), we get a reliable login signal regardless of how the user doc
26+
* eventually arrives.
27+
*/
28+
const subscribeToLogin = (handler: () => void): (() => void) => {
29+
let lastSeen = userIdStore.getState();
30+
return userIdStore.subscribe((next) => {
31+
if (next === lastSeen) return;
32+
const wasLoggedOut = !lastSeen;
33+
lastSeen = next;
34+
if (next && wasLoggedOut) {
35+
handler();
36+
}
37+
});
38+
};
39+
1040
export const whenLoggedIn = () => {
1141
if (isLoggedIn()) {
1242
return Promise.resolve();
1343
}
1444

45+
if (!sdkTransportEnabled) {
46+
// Flag off: develop's exact implementation — wait on Accounts.onLogin only,
47+
// no userIdStore bridge.
48+
return new Promise<void>((resolve) => {
49+
const subscription = Accounts.onLogin(() => {
50+
subscription.stop();
51+
resolve();
52+
});
53+
});
54+
}
55+
1556
return new Promise<void>((resolve) => {
16-
const subscription = Accounts.onLogin(() => {
17-
subscription.stop();
57+
const stop = subscribeToLogin(() => {
58+
stop();
1859
resolve();
1960
});
2061
});
@@ -30,11 +71,18 @@ export const onLoggedIn = (cb: (() => () => void) | (() => Promise<() => void>)
3071
}
3172
};
3273

33-
const subscription = Accounts.onLogin(handler);
74+
// With the SDK transport on, login can land via REST (ddpOverREST) without
75+
// filling Meteor.users — Accounts.onLogin's autorun would never fire.
76+
// Bridge off userIdStore as belt-and-braces. With the flag off, the legacy
77+
// DDP path populates Meteor.users and Accounts.onLogin fires reliably; the
78+
// extra userIdStore subscription would just double-fire callbacks.
79+
const accountsSubscription = Accounts.onLogin(handler);
80+
const stopUserIdSubscription = sdkTransportEnabled ? subscribeToLogin(handler) : undefined;
3481
if (isLoggedIn()) handler();
3582

3683
return () => {
37-
subscription.stop();
84+
accountsSubscription.stop();
85+
stopUserIdSubscription?.();
3886
cleanup?.();
3987
};
4088
};

apps/meteor/client/lib/presence.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,30 @@ import type { EventHandlerOf } from '@rocket.chat/emitter';
44
import { Emitter } from '@rocket.chat/emitter';
55
import { Meteor } from 'meteor/meteor';
66

7+
import { getDdpSdk } from './sdk/ddpSdk';
8+
import { isSdkTransportEnabled } from './sdk/sdkTransportEnabled';
79
import { sdk } from '../../app/utils/client/lib/SDKClient';
810

11+
const sdkTransportEnabled = isSdkTransportEnabled();
12+
13+
const subscribeUserPresence = (payload: { added?: string[]; removed?: string[] }): void => {
14+
if (!sdkTransportEnabled) {
15+
// Flag off: route directly through Meteor.subscribe — bit-for-bit develop
16+
// behaviour, no DDPSDK socket created, no proxy in the call path.
17+
Meteor.subscribe('stream-user-presence', '', payload);
18+
return;
19+
}
20+
const ddp = getDdpSdk();
21+
if (ddp.connection.status === 'connected' && ddp.account.uid) {
22+
// Fire the command-style subscription over our SDK; it has no lifecycle
23+
// (the server registers the added/removed uids and moves on), matching
24+
// Meteor.subscribe's behaviour here.
25+
ddp.client.subscribe('stream-user-presence', '', payload);
26+
return;
27+
}
28+
Meteor.subscribe('stream-user-presence', '', payload);
29+
};
30+
931
type InternalEvents = {
1032
remove: IUser['_id'];
1133
reset: undefined;
@@ -55,7 +77,7 @@ const getPresence = ((): ((uid: UserPresence['_id']) => void) => {
5577
const ids = Array.from(currentUids);
5678
const removed = Array.from(deletedUids);
5779

58-
Meteor.subscribe('stream-user-presence', '', {
80+
subscribeUserPresence({
5981
...(ids.length > 0 && { added: Array.from(currentUids) }),
6082
...(removed.length && { removed: Array.from(deletedUids) }),
6183
});

0 commit comments

Comments
 (0)