Skip to content

Commit dacb88d

Browse files
feat: use PUB/SUB for fetchSockets() and serverSideEmit() requests
Two new options: - channelPrefix: the prefix of the Redis PUB/SUB channels (defaults to "socket.io") - useShardedPubSub: whether to use sharded PUB/SUB (added in Redis 7.0) (defaults to false) Each server will use two PUB/SUB channels: - one common for receiving requests - one private for receving responses Related: - #6 - #29 - #31 - #32
1 parent e7653c4 commit dacb88d

8 files changed

Lines changed: 230 additions & 23 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121

2222
services:
2323
redis:
24-
image: redis
24+
image: redis:7
2525
options: >-
2626
--health-cmd "redis-cli ping"
2727
--health-interval 10s

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ io.listen(3000);
131131
|---------------------|-----------------------------------------------------------------------------------------------------------------------|----------------|
132132
| `streamName` | The name of the Redis stream. | `socket.io` |
133133
| `streamCount` | The number of streams to use to scale horizontally. | `1` |
134+
| `channelPrefix` | The prefix of the Redis PUB/SUB channels used to communicate between the nodes. | `socket.io` |
135+
| `useShardedPubSub` | Whether to use sharded PUB/SUB (added in Redis 7.0) to communicate between the nodes. | `false` |
134136
| `maxLen` | The maximum size of the stream. Almost exact trimming (~) is used. | `10_000` |
135137
| `readCount` | The number of elements to fetch per XREAD call. | `100` |
136138
| `blockTimeInMs` | The number of ms before the XREAD call times out. | `5_000` |

compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
services:
22
redis:
3-
image: redis:5
3+
image: redis:7
44
ports:
55
- "6379:6379"
66

lib/adapter.ts

Lines changed: 95 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ClusterAdapterWithHeartbeat, MessageType } from "socket.io-adapter";
1+
import { ClusterAdapter, MessageType } from "socket.io-adapter";
22
import type {
33
ClusterAdapterOptions,
44
ClusterMessage,
@@ -13,11 +13,16 @@ import {
1313
hasBinary,
1414
GETDEL,
1515
SET,
16+
SUBSCRIBE,
1617
XADD,
1718
XRANGE,
1819
XREAD,
1920
hashCode,
2021
duplicateClient,
22+
SPUBLISH,
23+
PUBLISH,
24+
PUBSUB,
25+
SSUBSCRIBE,
2126
} from "./util";
2227

2328
const debug = debugModule("socket.io-redis-streams-adapter");
@@ -42,6 +47,17 @@ export interface RedisStreamsAdapterOptions {
4247
* @default 1
4348
*/
4449
streamCount?: number;
50+
/**
51+
* The prefix of the Redis PUB/SUB channels used to communicate between the nodes.
52+
* @default "socket.io"
53+
*/
54+
channelPrefix?: string;
55+
/**
56+
* Whether to use sharded PUB/SUB (added in Redis 7.0) to communicate between the nodes.
57+
* @default false
58+
* @see https://redis.io/docs/latest/develop/pubsub/#sharded-pubsub
59+
*/
60+
useShardedPubSub?: boolean;
4561
/**
4662
* The maximum size of the stream. Almost exact trimming (~) is used.
4763
* @default 10_000
@@ -168,6 +184,8 @@ export function createAdapter(
168184
{
169185
streamName: "socket.io",
170186
streamCount: 1,
187+
channelPrefix: "socket.io",
188+
useShardedPubSub: false,
171189
maxLen: 10_000,
172190
readCount: 100,
173191
blockTimeInMs: 5_000,
@@ -197,8 +215,19 @@ export function createAdapter(
197215
}
198216
});
199217

218+
const subClientPromise = duplicateClient(redisClient);
219+
220+
controller.signal.addEventListener("abort", () => {
221+
subClientPromise.then((subClient) => subClient.disconnect());
222+
});
223+
200224
return function (nsp) {
201-
const adapter = new RedisStreamsAdapter(nsp, redisClient, options);
225+
const adapter = new RedisStreamsAdapter(
226+
nsp,
227+
redisClient,
228+
subClientPromise,
229+
options
230+
);
202231
namespaceToAdapters.set(nsp.name, adapter);
203232

204233
const defaultClose = adapter.close;
@@ -229,28 +258,70 @@ function computeStreamName(
229258
}
230259
}
231260

232-
class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
261+
function isEphemeral(message: ClusterMessage) {
262+
const isBroadcastWithAck =
263+
message.type === MessageType.BROADCAST &&
264+
message.data.requestId !== undefined;
265+
266+
return (
267+
isBroadcastWithAck ||
268+
[MessageType.SERVER_SIDE_EMIT, MessageType.FETCH_SOCKETS].includes(
269+
message.type
270+
)
271+
);
272+
}
273+
274+
class RedisStreamsAdapter extends ClusterAdapter {
233275
readonly #redisClient: any;
234276
readonly #opts: Required<RedisStreamsAdapterOptions>;
235277
readonly #streamName: string;
278+
readonly #publicChannel: string;
236279

237280
constructor(
238-
nsp,
239-
redisClient,
281+
nsp: any,
282+
redisClient: any,
283+
subClientPromise: Promise<any>,
240284
opts: Required<RedisStreamsAdapterOptions> & ClusterAdapterOptions
241285
) {
242-
super(nsp, opts);
286+
super(nsp);
243287
this.#redisClient = redisClient;
244288
this.#opts = opts;
245289
// each namespace is routed to a specific stream to ensure the ordering of messages
246290
this.#streamName = computeStreamName(nsp.name, opts);
247291

248-
this.init();
292+
this.#publicChannel = `${opts.channelPrefix}#${nsp.name}#`;
293+
const privateChannel = `${opts.channelPrefix}#${nsp.name}#${this.uid}#`;
294+
295+
subClientPromise.then((subClient) => {
296+
(this.#opts.useShardedPubSub ? SSUBSCRIBE : SUBSCRIBE)(
297+
subClient,
298+
[this.#publicChannel, privateChannel],
299+
(payload: Buffer) => {
300+
try {
301+
const message = decode(payload) as ClusterMessage;
302+
this.onMessage(message);
303+
} catch (e) {
304+
return debug("invalid format: %s", e.message);
305+
}
306+
}
307+
);
308+
});
249309
}
250310

251311
override doPublish(message: ClusterMessage) {
252312
debug("publishing %o", message);
253313

314+
if (isEphemeral(message)) {
315+
// ephemeral messages are sent with Redis PUB/SUB
316+
const payload = Buffer.from(encode(message));
317+
(this.#opts.useShardedPubSub ? SPUBLISH : PUBLISH)(
318+
this.#redisClient,
319+
this.#publicChannel,
320+
payload
321+
);
322+
return Promise.resolve("");
323+
}
324+
254325
return XADD(
255326
this.#redisClient,
256327
this.#streamName,
@@ -263,8 +334,15 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
263334
requesterUid: ServerId,
264335
response: ClusterResponse
265336
): Promise<void> {
266-
// @ts-ignore
267-
return this.doPublish(response);
337+
const responseChannel = `${this.#opts.channelPrefix}#${
338+
this.nsp.name
339+
}#${requesterUid}#`;
340+
const payload = Buffer.from(encode(response));
341+
return (this.#opts.useShardedPubSub ? SPUBLISH : PUBLISH)(
342+
this.#redisClient,
343+
responseChannel,
344+
payload
345+
).then();
268346
}
269347

270348
private encode(message: ClusterMessage): RawClusterMessage {
@@ -335,6 +413,14 @@ class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
335413
return message;
336414
}
337415

416+
override serverCount(): Promise<number> {
417+
return PUBSUB(
418+
this.#redisClient,
419+
this.#opts.useShardedPubSub ? "SHARDNUMSUB" : "NUMSUB",
420+
this.#publicChannel
421+
);
422+
}
423+
338424
override persistSession(session) {
339425
debug("persisting session %o", session);
340426
const sessionKey = this.#opts.sessionKeyPrefix + session.pid;

lib/util.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ function isRedisV4Client(redisClient: any) {
4242

4343
export async function duplicateClient(redisClient: any) {
4444
const newClient = redisClient.duplicate();
45+
46+
newClient.on("error", (err) => {
47+
// ignore errors
48+
});
49+
4550
if (isRedisV4Client(redisClient)) {
4651
await newClient.connect();
4752
}
@@ -203,3 +208,107 @@ export function hashCode(str: string) {
203208
}
204209
return hash;
205210
}
211+
212+
export function PUBLISH(redisClient: any, channel: string, payload: Buffer) {
213+
return redisClient.publish(channel, payload);
214+
}
215+
216+
export function SPUBLISH(redisClient: any, channel: string, payload: Buffer) {
217+
if (isRedisV4Client(redisClient)) {
218+
return redisClient.sPublish(channel, payload);
219+
} else {
220+
return redisClient.spublish(channel, payload);
221+
}
222+
}
223+
224+
const RETURN_BUFFERS = true;
225+
226+
export function SUBSCRIBE(
227+
subClient: any,
228+
channels: string[],
229+
listener: (payload: Buffer) => void
230+
) {
231+
if (isRedisV4Client(subClient)) {
232+
subClient.subscribe(channels, listener, RETURN_BUFFERS);
233+
} else {
234+
subClient.subscribe(channels);
235+
subClient.on("messageBuffer", (channel: Buffer, payload: Buffer) => {
236+
if (channels.includes(channel.toString())) {
237+
listener(payload);
238+
}
239+
});
240+
}
241+
}
242+
243+
export function SSUBSCRIBE(
244+
subClient: any,
245+
channels: string[],
246+
listener: (payload: Buffer) => void
247+
) {
248+
if (isRedisV4Client(subClient)) {
249+
// note: we could also have used a hash tag ({...}) to ensure the channels are mapped to the same slot
250+
for (const channel of channels) {
251+
subClient.sSubscribe(channel, listener, RETURN_BUFFERS);
252+
}
253+
} else {
254+
for (const channel of channels) {
255+
subClient.ssubscribe(channel);
256+
}
257+
subClient.on("smessageBuffer", (channel: Buffer, payload: Buffer) => {
258+
if (channels.includes(channel.toString())) {
259+
listener(payload);
260+
}
261+
});
262+
}
263+
}
264+
265+
function parseNumSubResponse(res: string[]) {
266+
return parseInt(res[1], 10);
267+
}
268+
269+
function sumValues(values) {
270+
return values.reduce((acc, val) => acc + val, 0);
271+
}
272+
273+
export function PUBSUB(
274+
redisClient: any,
275+
arg: "NUMSUB" | "SHARDNUMSUB",
276+
channel: string
277+
) {
278+
if (redisClient.constructor.name === "Cluster" || redisClient.isCluster) {
279+
// ioredis cluster
280+
return Promise.all(
281+
redisClient.nodes().map((node) => {
282+
return node
283+
.send_command("PUBSUB", [arg, channel])
284+
.then(parseNumSubResponse);
285+
})
286+
).then(sumValues);
287+
} else if (isRedisV4Client(redisClient)) {
288+
const isCluster = Array.isArray(redisClient.masters);
289+
if (isCluster) {
290+
// redis@4 cluster
291+
const nodes = redisClient.masters;
292+
return Promise.all(
293+
nodes.map((node) => {
294+
return node.client
295+
.sendCommand(["PUBSUB", arg, channel])
296+
.then(parseNumSubResponse);
297+
})
298+
).then(sumValues);
299+
} else {
300+
// redis@4 standalone
301+
return redisClient
302+
.sendCommand(["PUBSUB", arg, channel])
303+
.then(parseNumSubResponse);
304+
}
305+
} else {
306+
// ioredis / redis@3 standalone
307+
return new Promise((resolve, reject) => {
308+
redisClient.send_command("PUBSUB", [arg, channel], (err, numSub) => {
309+
if (err) return reject(err);
310+
resolve(parseNumSubResponse(numSub));
311+
});
312+
});
313+
}
314+
}

test/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,9 @@ export function testSuite(
349349
this.timeout(6000);
350350

351351
servers[0].serverSideEmit("hello", (err: Error, response: any) => {
352-
expect(err.message).to.be("timeout reached: missing 1 responses");
352+
expect(err.message).to.be(
353+
"timeout reached: only 1 responses received out of 2"
354+
);
353355
expect(response).to.be.an(Array);
354356
expect(response).to.contain(2);
355357
done();
@@ -362,7 +364,7 @@ export function testSuite(
362364
});
363365
});
364366

365-
it("succeeds even if an instance leaves the cluster", (done) => {
367+
it.skip("succeeds even if an instance leaves the cluster", (done) => {
366368
servers[0].on("hello", shouldNotHappen(done));
367369
servers[1].on("hello", (cb) => cb(2));
368370
servers[2].on("hello", () => {

test/test-runner.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,23 @@ describe("@socket.io/redis-streams-adapter", () => {
7171
);
7272
});
7373

74+
describe("redis with Redis cluster and sharded PUB/SUB", () => {
75+
testSuites(
76+
async () => {
77+
const redisClient = createCluster({
78+
rootNodes: CLUSTER_ROOT_NODES,
79+
});
80+
81+
await redisClient.connect();
82+
83+
return redisClient;
84+
},
85+
{
86+
useShardedPubSub: true,
87+
}
88+
);
89+
});
90+
7491
describe("ioredis with single Redis node", () => {
7592
testSuites(() => {
7693
return new Redis();

test/util.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,20 +74,11 @@ export function setup(
7474

7575
function isReady() {
7676
return (
77-
servers.length === NODES_COUNT &&
78-
clientSockets.length === NODES_COUNT &&
79-
servers.every((server) => {
80-
const serverCount = server.of("/").adapter.nodesMap.size;
81-
return serverCount === NODES_COUNT - 1;
82-
})
77+
servers.length === NODES_COUNT && clientSockets.length === NODES_COUNT
8378
);
8479
}
8580

8681
while (!isReady()) {
87-
if (servers.length > 0) {
88-
// notify other servers in the cluster
89-
servers[0]?.of("/").adapter.init();
90-
}
9182
await sleep(100);
9283
}
9384

0 commit comments

Comments
 (0)