Skip to content

Commit 3ecdd7a

Browse files
committed
feat: added possibility to turn off server alive timeout check in udp cluster manager & added extra logging
1 parent 04c6775 commit 3ecdd7a

File tree

4 files changed

+47
-14
lines changed

4 files changed

+47
-14
lines changed

src/ClusteredRedisQueue.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ export class ClusteredRedisQueue implements IMessageQueue,
169169
}
170170

171171
if (this.options.clusterManagers?.length) {
172+
this.verbose('Initializing cluster managers...');
173+
172174
for (const manager of this.options.clusterManagers) {
173175
this.initializedClusters.push(manager.init({
174176
add: this.addServer.bind(this),
@@ -297,6 +299,14 @@ export class ClusteredRedisQueue implements IMessageQueue,
297299
return lengths.reduce((total, length) => total + length, 0);
298300
}
299301

302+
private verbose(message: string): void {
303+
if (this.options.verbose) {
304+
this.logger.info(`[IMQ-CORE][ClusteredRedisQueue][${
305+
this.name
306+
}]: ${ message }`);
307+
}
308+
}
309+
300310
/**
301311
* Batch imq action processing on all registered imqs at once
302312
*
@@ -306,7 +316,7 @@ export class ClusteredRedisQueue implements IMessageQueue,
306316
* @return {Promise<this>}
307317
*/
308318
private async batch(action: string, message: string): Promise<this> {
309-
this.logger.log(message);
319+
this.logger.info(message);
310320

311321
const promises = [];
312322

@@ -498,6 +508,8 @@ export class ClusteredRedisQueue implements IMessageQueue,
498508
* @returns {void}
499509
*/
500510
protected addServer(server: IServerInput): ClusterServer {
511+
this.verbose(`Adding new server: ${ JSON.stringify(server) }`);
512+
501513
return this.addServerWithQueueInitializing(server, true);
502514
}
503515

@@ -508,6 +520,8 @@ export class ClusteredRedisQueue implements IMessageQueue,
508520
* @returns {void}
509521
*/
510522
protected removeServer(server: IServerInput): void {
523+
this.verbose(`Removing the server: ${ JSON.stringify(server) }`);
524+
511525
const remove = this.findServer(server);
512526

513527
if (!remove) {
@@ -580,6 +594,9 @@ export class ClusteredRedisQueue implements IMessageQueue,
580594

581595
private async initializeQueue(imq: RedisQueue): Promise<void> {
582596
copyEventEmitter(this.templateEmitter, imq);
597+
this.verbose(`Initializing queue with state: ${
598+
JSON.stringify(this.state)
599+
}`);
583600

584601
if (this.state.started) {
585602
await imq.start();

src/RedisQueue.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -899,14 +899,11 @@ export class RedisQueue extends EventEmitter<EventMap>
899899
return;
900900
}
901901

902-
this.reconnecting[channel] = true;
903-
904902
const attempts = (this.reconnectAttempts[channel] || 0) + 1;
905-
this.reconnectAttempts[channel] = attempts;
903+
const delay = Math.min(30000, 1000 * Math.pow(2, attempts - 1));
906904

907-
const base = Math.min(30000, 1000 * Math.pow(2, attempts - 1));
908-
const jitter = Math.floor(base * 0.2 * Math.random());
909-
const delay = base + jitter;
905+
this.reconnecting[channel] = true;
906+
this.reconnectAttempts[channel] = attempts;
910907

911908
this.verbose(`Scheduling ${ channel } reconnect in ${
912909
delay } ms (attempt ${ attempts })`);

src/UDPClusterManager.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,28 @@ export interface UDPClusterManagerOptions {
5959
* @type {number}
6060
*/
6161
aliveTimeoutCorrection: number;
62+
63+
/**
64+
* Message queue alive-server check flag. If set to false, the server will
65+
* not be checked for liveness on each broadcast message with a timeout.
66+
* Can be specified by the environment variable if the given option is not
67+
* bypassed: IMQ_UDP_CLUSTER_MANAGER_ALIVE_CHECK
68+
*
69+
* @default true
70+
* @type {boolean}
71+
*/
72+
useAliveCheck: boolean;
6273
}
6374

75+
const IMQ_UDP_CLUSTER_MANAGER_ALIVE_CHECK = !!+(
76+
process.env.IMQ_UDP_CLUSTER_MANAGER_ALIVE_CHECK || 1
77+
);
78+
6479
export const DEFAULT_UDP_CLUSTER_MANAGER_OPTIONS: UDPClusterManagerOptions = {
6580
port: 63000,
6681
address: '255.255.255.255',
6782
aliveTimeoutCorrection: 5000,
83+
useAliveCheck: IMQ_UDP_CLUSTER_MANAGER_ALIVE_CHECK,
6884
};
6985

7086
export class UDPClusterManager extends ClusterManager {

src/UDPWorker.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ interface Message {
4949
timeout: number;
5050
}
5151

52-
class UDPClusterWorker {
52+
class UDPWorker {
5353
private readonly socket: Socket;
5454
private readonly servers = new Map<string, string>();
5555

@@ -91,16 +91,19 @@ class UDPClusterWorker {
9191
private addServer(message: Message): void {
9292
this.messagePort.postMessage({
9393
type: 'cluster:add',
94-
server: UDPClusterWorker.mapMessage(message),
94+
server: UDPWorker.mapMessage(message),
9595
});
96-
this.serverAliveWait(message);
96+
97+
if (this.options.useAliveCheck) {
98+
this.serverAliveWait(message);
99+
}
97100
}
98101

99102
private removeServer(message: Message): void {
100-
this.servers.delete(UDPClusterWorker.getServerKey(message));
103+
this.servers.delete(UDPWorker.getServerKey(message));
101104
this.messagePort.postMessage({
102105
type: 'cluster:remove',
103-
server: UDPClusterWorker.mapMessage(message),
106+
server: UDPWorker.mapMessage(message),
104107
});
105108
}
106109

@@ -119,7 +122,7 @@ class UDPClusterWorker {
119122
const stamp = uuid();
120123
const correction = this.options.aliveTimeoutCorrection ?? 0;
121124
const effectiveTimeout = message.timeout + correction + 1;
122-
const key = UDPClusterWorker.getServerKey(message);
125+
const key = UDPWorker.getServerKey(message);
123126

124127
this.servers.set(key, stamp);
125128

@@ -223,5 +226,5 @@ class UDPClusterWorker {
223226
}
224227

225228
if (!isMainThread && parentPort) {
226-
new UDPClusterWorker(workerData, parentPort);
229+
new UDPWorker(workerData, parentPort);
227230
}

0 commit comments

Comments
 (0)