@@ -6,8 +6,6 @@ import { DDPCommon } from 'meteor/ddp-common';
66import { Meteor } from 'meteor/meteor' ;
77
88import { APIClient } from './RestApiClient' ;
9- import { ensureConnectedAndAuthenticated , getDdpSdk } from '../../../../client/lib/sdk/ddpSdk' ;
10- import { isSdkTransportEnabled } from '../../../../client/lib/sdk/sdkTransportEnabled' ;
119
1210declare module '@rocket.chat/ddp-client' {
1311 // eslint-disable-next-line @typescript-eslint/naming-convention
@@ -21,8 +19,6 @@ declare module '@rocket.chat/ddp-client' {
2119 }
2220}
2321
24- const sdkTransportEnabled = isSdkTransportEnabled ( ) ;
25-
2622const isChangedCollectionPayload = (
2723 msg : any ,
2824) : msg is { msg : 'changed' ; collection : string ; fields : { eventName : string ; args : unknown [ ] } } => {
@@ -146,113 +142,13 @@ const createNewMeteorStream = (streamName: StreamNames, key: StreamKeys<StreamNa
146142 } ;
147143} ;
148144
149- const createNewDdpSdkStream = (
150- streamProxy : Emitter < EventMap > ,
151- streamName : StreamNames ,
152- key : StreamKeys < StreamNames > ,
153- args : unknown [ ] ,
154- ) : StreamMapValue => {
155- const ee = new Emitter < {
156- ready : [ error : any ] | [ undefined , any ] ;
157- error : [ error : any ] ;
158- stop : undefined ;
159- } > ( ) ;
160- const meta = { ready : false } ;
161-
162- // Defer the actual `subscribe` until DDPSDK is authenticated. Without this,
163- // stream subscriptions fired immediately after re-login (e.g. the
164- // SubscriptionsCachedStore's `notify-user/<uid>/subscriptions-changed`
165- // listener that re-arms via onLoggedIn) hit the SDK socket while it's
166- // still anonymous — server rejects with `not-allowed`/`nosub`, the
167- // stream's `ready` promise emits an error, and the cached store never
168- // receives subsequent server events. The visible failure: an agent that
169- // just took a livechat chat post-relogin sees the chat work but the
170- // "Move to the queue" button never appears, because the new subscription
171- // the server creates for that agent is never replicated to the client's
172- // Subscriptions store, and pseudoRoom (= {...sub, ...room}) ends up with
173- // no `u` for the canMoveQueue check.
174- let subscription : ReturnType < ReturnType < typeof getDdpSdk > [ 'client' ] [ 'subscribe' ] > | undefined ;
175- let offCollection : ( ( ) => void ) | undefined ;
176- let stopped = false ;
177-
178- void ensureConnectedAndAuthenticated ( )
179- . catch ( ( ) => undefined )
180- . then ( ( ) => {
181- if ( stopped ) return ;
182- const sdk = getDdpSdk ( ) ;
183- subscription = sdk . client . subscribe ( `stream-${ streamName } ` , key , { useCollection : false , args } ) ;
184-
185- subscription
186- . ready ( )
187- . then ( ( ) => {
188- if ( stopped ) return ;
189- meta . ready = true ;
190- ee . emit ( 'ready' , [ undefined , { msg : 'ready' , subs : [ subscription ! . id ] } ] ) ;
191- } )
192- . catch ( ( err ) => {
193- if ( stopped ) return ;
194- ee . emit ( 'ready' , [ err ] ) ;
195- ee . emit ( 'error' , err ) ;
196- } ) ;
197-
198- offCollection = sdk . client . onCollection ( `stream-${ streamName } ` , ( data : any ) => {
199- if ( data ?. msg !== 'changed' ) return ;
200- if ( data . collection !== `stream-${ streamName } ` ) return ;
201- if ( data . fields ?. eventName !== key ) return ;
202- streamProxy . emit ( `stream-${ streamName } /${ key } ` as keyof EventMap , data . fields . args ) ;
203- } ) ;
204- } ) ;
205-
206- const onChange : ReturnType < ClientStream [ 'subscribe' ] > [ 'onChange' ] = ( cb ) => {
207- if ( meta . ready ) {
208- cb ( { msg : 'ready' , subs : [ ] } ) ;
209- return ;
210- }
211- ee . once ( 'ready' , ( [ error , result ] ) => {
212- if ( error ) {
213- cb ( { msg : 'nosub' , id : '' , error } ) ;
214- return ;
215- }
216- cb ( result ) ;
217- } ) ;
218- } ;
219-
220- return {
221- stop : ( ) => {
222- // Mirror Meteor's subscription semantics: explicit stop() does not fire the
223- // 'stop' event (onStop is reserved for server-initiated closures).
224- // Emitting it here would recurse through the onStop handler that
225- // createStreamManager registers, which itself iterates the unsubList.
226- stopped = true ;
227- offCollection ?.( ) ;
228- subscription ?. stop ( ) ;
229- } ,
230- onChange,
231- ready : ( ) => {
232- if ( meta . ready ) return Promise . resolve ( ) ;
233- return new Promise < void > ( ( resolve , reject ) => {
234- ee . once ( 'ready' , ( [ err ] ) => {
235- if ( err ) {
236- reject ( err ) ;
237- return ;
238- }
239- resolve ( ) ;
240- } ) ;
241- } ) ;
242- } ,
243- onError : ( cb : ( ...args : any [ ] ) => void ) => ee . once ( 'error' , ( error ) => cb ( error ) ) ,
244- onStop : ( cb : ( ) => void ) => ee . once ( 'stop' , cb ) ,
245- get isReady ( ) {
246- return meta . ready ;
247- } ,
248- unsubList : new Set ( ) ,
249- } ;
250- } ;
251-
252145const createStreamManager = ( ) => {
253146 // Emitter that replicates stream messages to registered callbacks
254147 const streamProxy = new Emitter < EventMap > ( ) ;
255148
149+ // Collection of unsubscribe callbacks for each stream.
150+ // const proxyUnsubLists = new Map<string, Set<() => void>>();
151+
256152 const streams = new Map < string , StreamMapValue > ( ) ;
257153
258154 Accounts . onLogout ( ( ) => {
@@ -261,20 +157,13 @@ const createStreamManager = () => {
261157 } ) ;
262158 } ) ;
263159
264- if ( ! sdkTransportEnabled ) {
265- // In legacy Meteor mode, stream frames arrive on Meteor.connection._stream
266- // as `changed` collection messages — bridge them into streamProxy so the
267- // per-stream callbacks fire. With SDK transport on, the frames arrive on
268- // the SDK socket and createNewDdpSdkStream registers its own onCollection
269- // listener instead.
270- Meteor . connection . _stream . on ( 'message' , ( rawMsg : string ) => {
271- const msg = DDPCommon . parseDDP ( rawMsg ) ;
272- if ( ! isChangedCollectionPayload ( msg ) ) {
273- return ;
274- }
275- streamProxy . emit ( `${ msg . collection } /${ msg . fields . eventName } ` as any , msg . fields . args as any ) ;
276- } ) ;
277- }
160+ Meteor . connection . _stream . on ( 'message' , ( rawMsg : string ) => {
161+ const msg = DDPCommon . parseDDP ( rawMsg ) ;
162+ if ( ! isChangedCollectionPayload ( msg ) ) {
163+ return ;
164+ }
165+ streamProxy . emit ( `${ msg . collection } /${ msg . fields . eventName } ` as any , msg . fields . args as any ) ;
166+ } ) ;
278167
279168 const stream : SDK [ 'stream' ] = < N extends StreamNames , K extends StreamKeys < N > > (
280169 name : N ,
@@ -297,11 +186,7 @@ const createStreamManager = () => {
297186
298187 streamProxy . on ( eventLiteral , proxyCallback ) ;
299188
300- const stream =
301- streams . get ( eventLiteral ) ||
302- ( sdkTransportEnabled
303- ? createNewDdpSdkStream ( streamProxy , name as StreamNames , key as StreamKeys < StreamNames > , args )
304- : createNewMeteorStream ( name as StreamNames , key as StreamKeys < StreamNames > , args ) ) ;
189+ const stream = streams . get ( eventLiteral ) || createNewMeteorStream ( name , key , args ) ;
305190
306191 const stop = ( ) : void => {
307192 streamProxy . off ( eventLiteral , proxyCallback ) ;
@@ -356,26 +241,30 @@ const createStreamManager = () => {
356241export const createSDK = ( rest : RestClientInterface ) => {
357242 const { stream, stopAll } = createStreamManager ( ) ;
358243
359- const publish = sdkTransportEnabled
360- ? ( name : string , args : unknown [ ] ) => {
361- // DDPSDK queues outbound frames until the WebSocket handshake completes,
362- // so there's no need to gate on an isReady flag here.
363- void getDdpSdk ( ) . client . callAsync ( `stream-${ name } ` , ...args ) ;
364- }
365- : ( name : string , args : unknown [ ] ) => {
366- Meteor . call ( `stream-${ name } ` , ...args ) ;
367- } ;
244+ const publish = ( name : string , args : unknown [ ] ) => {
245+ Meteor . call ( `stream-${ name } ` , ...args ) ;
246+ } ;
368247
369248 const call = < T extends keyof ServerMethods > ( method : T , ...args : Parameters < ServerMethods [ T ] > ) : Promise < ReturnType < ServerMethods [ T ] > > => {
370249 return Meteor . callAsync ( method , ...args ) ;
371250 } ;
372251
252+ const disconnect = ( ) => {
253+ Meteor . disconnect ( ) ;
254+ } ;
255+
256+ const reconnect = ( ) => {
257+ Meteor . reconnect ( ) ;
258+ } ;
259+
373260 return {
374261 rest,
375262 stop : stopAll ,
376263 stream,
377264 publish,
378265 call,
266+ disconnect,
267+ reconnect,
379268 } ;
380269} ;
381270
0 commit comments