Skip to content

Commit caa949e

Browse files
committed
feat(sdk): bridge user-presence streamer onto DDPSDK listener
The client/lib/presence.ts subscribe already routes through DDPSDK when the SDK is authenticated, but app/notifications/client/lib/Presence.ts was still registering streamerCentral against Meteor.connection only. That meant user-presence stream-changed frames arrived on the DDPSDK socket while the listener was wired to Meteor — the UI would miss status updates whenever the subscribe path picked DDPSDK. Add a thin DDPSDK → StreamerDDPConnection adapter (createDdpSdkStreamerAdapter) that exposes a compatible _stream.on('message', ...) surface by re-serialising DDPSDK's parsed payloads with EJSON so StreamerCentral's JSON-parse path round-trips dates identically to the raw Meteor frames. Call streamerCentral.setupDdpConnection a second time with the adapter: StreamerCentral tracks registration per connection object via hasMeteorStreamerEventListeners, so both transports get a listener and the streamerCentral emitter fires once per frame regardless of which socket delivered it.
1 parent eeeb957 commit caa949e

2 files changed

Lines changed: 58 additions & 0 deletions

File tree

apps/meteor/app/notifications/client/lib/Presence.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,21 @@ import { UserStatus } from '@rocket.chat/core-typings';
22
import { Meteor } from 'meteor/meteor';
33

44
import { Presence } from '../../../../client/lib/presence';
5+
import { getDdpSdk } from '../../../../client/lib/sdk/ddpSdk';
6+
import { createDdpSdkStreamerAdapter } from '../../../../client/lib/sdk/streamerAdapter';
57
import { streamerCentral } from '../../../../client/lib/streamer';
68

79
// TODO implement API on Streamer to be able to listen to all streamed data
810
// this is a hacky way to listen to all streamed data from user-presence Streamer
911

12+
// Register the presence streamer on BOTH transports. The subscribe call in
13+
// client/lib/presence.ts routes through DDPSDK when it's ready and falls back
14+
// to Meteor otherwise, so the corresponding messages can arrive on either WS.
15+
// StreamerCentral uses a per-connection `hasMeteorStreamerEventListeners` flag,
16+
// so calling setupDdpConnection twice with distinct connection objects
17+
// installs both listeners without duplicating within the same transport.
1018
streamerCentral.getStreamer('user-presence', { ddpConnection: Meteor.connection });
19+
streamerCentral.setupDdpConnection('user-presence', createDdpSdkStreamerAdapter(getDdpSdk()));
1120

1221
type args = [username: string, statusChanged?: UserStatus, statusText?: string];
1322

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import type { DDPSDK } from '@rocket.chat/ddp-client';
2+
import EJSON from 'ejson';
3+
4+
type StreamerDDPConnection = {
5+
_stream: {
6+
on: {
7+
(key: 'message', callback: (data: string) => void): void;
8+
(key: 'reset', callback: () => void): void;
9+
};
10+
};
11+
subscribe(name: string, ...args: unknown[]): { stop: () => void };
12+
call(methodName: string, ...args: unknown[]): void;
13+
hasMeteorStreamerEventListeners?: boolean;
14+
};
15+
16+
/**
17+
* Presents a DDPSDK instance with the shape `StreamerCentral` expects
18+
* (the subset of Meteor.connection exposed as `StreamerDDPConnection`).
19+
* Lets us run the existing streamer infrastructure against our SDK's
20+
* WebSocket without rewriting StreamerCentral.
21+
*/
22+
export const createDdpSdkStreamerAdapter = (sdk: DDPSDK): StreamerDDPConnection => {
23+
const ddp = (
24+
sdk.client as unknown as { ddp: { onMessage: (cb: (payload: unknown) => void) => () => void } }
25+
).ddp;
26+
27+
return {
28+
_stream: {
29+
on: ((key: 'message' | 'reset', callback: ((data: string) => void) | (() => void)): void => {
30+
if (key === 'message') {
31+
ddp.onMessage((payload) => {
32+
// StreamerCentral re-parses the string with JSON.parse; hand it
33+
// an EJSON-serialised payload so Dates/undefined round-trip the
34+
// same way Meteor.connection's raw WS frames did.
35+
(callback as (data: string) => void)(EJSON.stringify(payload));
36+
});
37+
return;
38+
}
39+
if (key === 'reset') {
40+
sdk.connection.on('disconnected', callback as () => void);
41+
}
42+
}) as StreamerDDPConnection['_stream']['on'],
43+
},
44+
subscribe: (name: string, ...args: unknown[]) => sdk.client.subscribe(name, ...args),
45+
call: (methodName: string, ...args: unknown[]): void => {
46+
void sdk.client.callAsync(methodName, ...args);
47+
},
48+
};
49+
};

0 commit comments

Comments
 (0)