1- import { ClusterAdapterWithHeartbeat , MessageType } from "socket.io-adapter" ;
1+ import { ClusterAdapter , MessageType } from "socket.io-adapter" ;
22import type {
33 ClusterAdapterOptions ,
44 ClusterMessage ,
@@ -13,11 +13,16 @@ import {
1313 hasBinary ,
1414 GETDEL ,
1515 SET ,
16+ SUBSCRIBE ,
1617 XADD ,
1718 XRANGE ,
1819 XREAD ,
1920 hashCode ,
2021 duplicateClient ,
22+ SPUBLISH ,
23+ PUBLISH ,
24+ PUBSUB ,
25+ SSUBSCRIBE ,
2126} from "./util" ;
2227
2328const debug = debugModule ( "socket.io-redis-streams-adapter" ) ;
@@ -42,6 +47,17 @@ export interface RedisStreamsAdapterOptions {
4247 * @default 1
4348 */
4449 streamCount ?: number ;
50+ /**
51+ * The prefix of the Redis PUB/SUB channels used to communicate between the nodes.
52+ * @default "socket.io"
53+ */
54+ channelPrefix ?: string ;
55+ /**
56+ * Whether to use sharded PUB/SUB (added in Redis 7.0) to communicate between the nodes.
57+ * @default false
58+ * @see https://redis.io/docs/latest/develop/pubsub/#sharded-pubsub
59+ */
60+ useShardedPubSub ?: boolean ;
4561 /**
4662 * The maximum size of the stream. Almost exact trimming (~) is used.
4763 * @default 10_000
@@ -168,6 +184,8 @@ export function createAdapter(
168184 {
169185 streamName : "socket.io" ,
170186 streamCount : 1 ,
187+ channelPrefix : "socket.io" ,
188+ useShardedPubSub : false ,
171189 maxLen : 10_000 ,
172190 readCount : 100 ,
173191 blockTimeInMs : 5_000 ,
@@ -197,8 +215,19 @@ export function createAdapter(
197215 }
198216 } ) ;
199217
218+ const subClientPromise = duplicateClient ( redisClient ) ;
219+
220+ controller . signal . addEventListener ( "abort" , ( ) => {
221+ subClientPromise . then ( ( subClient ) => subClient . disconnect ( ) ) ;
222+ } ) ;
223+
200224 return function ( nsp ) {
201- const adapter = new RedisStreamsAdapter ( nsp , redisClient , options ) ;
225+ const adapter = new RedisStreamsAdapter (
226+ nsp ,
227+ redisClient ,
228+ subClientPromise ,
229+ options
230+ ) ;
202231 namespaceToAdapters . set ( nsp . name , adapter ) ;
203232
204233 const defaultClose = adapter . close ;
@@ -229,28 +258,71 @@ function computeStreamName(
229258 }
230259}
231260
232- class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
261+ function isEphemeral ( message : ClusterMessage ) {
262+ const isBroadcastWithAck =
263+ message . type === MessageType . BROADCAST &&
264+ message . data . requestId !== undefined ;
265+
266+ return (
267+ isBroadcastWithAck ||
268+ [ MessageType . SERVER_SIDE_EMIT , MessageType . FETCH_SOCKETS ] . includes (
269+ message . type
270+ )
271+ ) ;
272+ }
273+
274+ class RedisStreamsAdapter extends ClusterAdapter {
233275 readonly #redisClient: any ;
234276 readonly #opts: Required < RedisStreamsAdapterOptions > ;
235277 readonly #streamName: string ;
278+ readonly #publicChannel: string ;
279+ readonly #privateChannel: string ;
236280
237281 constructor (
238- nsp ,
239- redisClient ,
282+ nsp : any ,
283+ redisClient : any ,
284+ subClientPromise : Promise < any > ,
240285 opts : Required < RedisStreamsAdapterOptions > & ClusterAdapterOptions
241286 ) {
242- super ( nsp , opts ) ;
287+ super ( nsp ) ;
243288 this . #redisClient = redisClient ;
244289 this . #opts = opts ;
245290 // each namespace is routed to a specific stream to ensure the ordering of messages
246291 this . #streamName = computeStreamName ( nsp . name , opts ) ;
247292
248- this . init ( ) ;
293+ this . #publicChannel = `${ opts . channelPrefix } #${ nsp . name } #` ;
294+ this . #privateChannel = `${ opts . channelPrefix } #${ nsp . name } #${ this . uid } #` ;
295+
296+ subClientPromise . then ( ( subClient ) => {
297+ ( this . #opts. useShardedPubSub ? SSUBSCRIBE : SUBSCRIBE ) (
298+ subClient ,
299+ [ this . #publicChannel, this . #privateChannel] ,
300+ ( payload : Buffer ) => {
301+ try {
302+ const message = decode ( payload ) as ClusterMessage ;
303+ this . onMessage ( message ) ;
304+ } catch ( e ) {
305+ return debug ( "invalid format: %s" , e . message ) ;
306+ }
307+ }
308+ ) ;
309+ } ) ;
249310 }
250311
251312 override doPublish ( message : ClusterMessage ) {
252313 debug ( "publishing %o" , message ) ;
253314
315+ if ( isEphemeral ( message ) ) {
316+ // ephemeral messages are sent with Redis PUB/SUB
317+ const payload = Buffer . from ( encode ( message ) ) ;
318+ ( this . #opts. useShardedPubSub ? SPUBLISH : PUBLISH ) (
319+ this . #redisClient,
320+ this . #publicChannel,
321+ payload
322+ ) ;
323+ return Promise . resolve ( "" ) ;
324+ }
325+
254326 return XADD (
255327 this . #redisClient,
256328 this . #streamName,
@@ -263,8 +335,15 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
263335 requesterUid : ServerId ,
264336 response : ClusterResponse
265337 ) : Promise < void > {
266- // @ts -ignore
267- return this . doPublish ( response ) ;
338+ const responseChannel = `${ this . #opts. channelPrefix } #${
339+ this . nsp . name
340+ } #${ requesterUid } #`;
341+ const payload = Buffer . from ( encode ( response ) ) ;
342+ return ( this . #opts. useShardedPubSub ? SPUBLISH : PUBLISH ) (
343+ this . #redisClient,
344+ responseChannel ,
345+ payload
346+ ) . then ( ) ;
268347 }
269348
270349 private encode ( message : ClusterMessage ) : RawClusterMessage {
@@ -335,6 +414,14 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
335414 return message ;
336415 }
337416
417+ override serverCount ( ) : Promise < number > {
418+ return PUBSUB (
419+ this . #redisClient,
420+ this . #opts. useShardedPubSub ? "SHARDNUMSUB" : "NUMSUB" ,
421+ this . #publicChannel
422+ ) ;
423+ }
424+
338425 override persistSession ( session ) {
339426 debug ( "persisting session %o" , session ) ;
340427 const sessionKey = this . #opts. sessionKeyPrefix + session . pid ;
0 commit comments