Skip to content
Open
3 changes: 3 additions & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ export default [
argsIgnorePattern: '^_',
caughtErrorsIgnorePattern: '^_',
varsIgnorePattern: '^_'
}],
'@typescript-eslint/no-empty-object-type': ['error', {
allowObjectTypes: 'always'
}]
}
},
Expand Down
156 changes: 156 additions & 0 deletions packages/client/lib/cluster/cluster-reconnection-tracker.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import { strict as assert } from "node:assert";
import ClusterReconnectionTracker from "./cluster-reconnection-tracker";

describe("ClusterReconnectionTracker", () => {
describe("validation", () => {
for (const strategy of [-1, 1.5, Number.NaN, true, null, "1000", {}]) {
it(`should throw when strategy is ${strategy}`, () => {
assert.throws(
() => new ClusterReconnectionTracker(strategy as never),
new TypeError(
"topologyRefreshOnReconnectionAttempt must be undefined, false, a non-negative integer, or a function",
),
);
});
}

it("should allow the default, false, 0, positive integer, and function strategies", () => {
assert.doesNotThrow(() => new ClusterReconnectionTracker());
assert.doesNotThrow(() => new ClusterReconnectionTracker(false));
assert.doesNotThrow(() => new ClusterReconnectionTracker(0));
assert.doesNotThrow(() => new ClusterReconnectionTracker(1));
assert.doesNotThrow(
() => new ClusterReconnectionTracker(() => undefined),
);
});
});

it("should not track anything when disabled", () => {
for (const strategy of [false, 0] as const) {
const state = new ClusterReconnectionTracker(strategy);

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100),
false,
);
assert.deepEqual([...state.reconnectingAddresses], []);
assert.equal(state.firstReconnectionAt, undefined);
}
});

it("should default to refreshing after five seconds", () => {
const state = new ClusterReconnectionTracker();

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100),
false,
);
assert.equal(state.firstReconnectionAt, 100);

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 5_099),
false,
);
assert.equal(state.firstReconnectionAt, 100);

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 5_100),
true,
);
assert.equal(state.firstReconnectionAt, 5_100);
});

it("should track reconnecting clients by client id and remove them independently", () => {
const state = new ClusterReconnectionTracker(() => undefined);

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100),
false,
);
assert.deepEqual([...state.reconnectingAddresses], ["127.0.0.1:1"]);
assert.equal(state.firstReconnectionAt, 100);

assert.equal(
state.onReconnectionAttempt("client-2", "127.0.0.1:2", 150),
false,
);
assert.deepEqual([...state.reconnectingAddresses].sort(), [
"127.0.0.1:1",
"127.0.0.1:2",
]);
assert.equal(state.firstReconnectionAt, 100);

state.removeClient("client-1");
assert.deepEqual([...state.reconnectingAddresses], ["127.0.0.1:2"]);
assert.equal(state.firstReconnectionAt, 100);

state.removeClient("client-2");
assert.deepEqual([...state.reconnectingAddresses], []);
assert.equal(state.firstReconnectionAt, undefined);
});

it("should clear all reconnecting state", () => {
const state = new ClusterReconnectionTracker(() => undefined);

state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100);
state.onReconnectionAttempt("client-2", "127.0.0.1:2", 150);
state.clear();

assert.deepEqual([...state.reconnectingAddresses], []);
assert.equal(state.firstReconnectionAt, undefined);
});

it("should return true when enough time has elapsed and reset the timestamp", () => {
const state = new ClusterReconnectionTracker(50);

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100),
false,
);
assert.equal(state.firstReconnectionAt, 100);

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 149),
false,
);
assert.equal(state.firstReconnectionAt, 100);

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 150),
true,
);
assert.equal(state.firstReconnectionAt, 150);
});

it("should skip refresh when the function strategy returns false", () => {
const state = new ClusterReconnectionTracker(() => false);

assert.equal(
state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100),
false,
);
assert.deepEqual([...state.reconnectingAddresses], ["127.0.0.1:1"]);
assert.equal(state.firstReconnectionAt, 100);
});

it("should throw when the function strategy throws", () => {
const error = new Error("strategy failed");
const state = new ClusterReconnectionTracker(() => {
throw error;
});

assert.throws(
() => state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100),
error,
);
});

it("should throw when the function strategy returns an invalid value", () => {
const state = new ClusterReconnectionTracker(() => -1);

assert.throws(
() => state.onReconnectionAttempt("client-1", "127.0.0.1:1", 100),
/topologyRefreshOnReconnectionAttempt should return/,
);
});
});
130 changes: 130 additions & 0 deletions packages/client/lib/cluster/cluster-reconnection-tracker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import type { ClusterTopologyRefreshOnReconnectionAttemptStrategy } from './index';

/**
* Tracks which cluster node clients are currently reconnecting and decides when
* to trigger a cluster topology refresh based on a configurable strategy.
*
* The strategy can be:
* - `undefined` - uses the default delay (5 seconds)
* - `false` or `0` - disables topology refresh on reconnection
* - a positive integer - delay in ms after the first reconnection attempt before refreshing
* - a function - custom logic receiving the timestamp of the first reconnection attempt,
* returning a delay or `false`/`undefined` to skip
*
* After the delay elapses, {@link onReconnectionAttempt} returns `true` once to signal
* that a refresh should be scheduled, then resets the timer.
*/
export default class ClusterReconnectionTracker {
/** Default delay (ms) before triggering a topology refresh after reconnection starts */
static #DEFAULT_TOPOLOGY_REFRESH_ON_RECONNECTION_ATTEMPT = 5_000;

readonly #strategy?: ClusterTopologyRefreshOnReconnectionAttemptStrategy;
/** Maps client ID to its node address for clients currently in a reconnecting state */
readonly #reconnectingClients = new Map<string, string>();
/** Timestamp of the first reconnection attempt in the current reconnection cycle */
#firstReconnectionAt?: number;

/**
* Validates that a strategy value is acceptable before use.
* @throws If the strategy is not supported
*/
#validate(strategy?: ClusterTopologyRefreshOnReconnectionAttemptStrategy) {
if (
strategy === undefined ||
strategy === false ||
typeof strategy === 'function' ||
(
typeof strategy === 'number' &&
Number.isInteger(strategy) &&
strategy >= 0
)
) {
return;
}

throw new TypeError('topologyRefreshOnReconnectionAttempt must be undefined, false, a non-negative integer, or a function');
}

constructor(strategy?: ClusterTopologyRefreshOnReconnectionAttemptStrategy) {
this.#validate(strategy);
this.#strategy = strategy;
}

get reconnectingAddresses() {
return new Set(this.#reconnectingClients.values());
}

get firstReconnectionAt() {
return this.#firstReconnectionAt;
}

/**
* Records a reconnection attempt for the given client and evaluates whether
* the configured delay has elapsed since the first attempt in this cycle.
*
* @returns `true` if a topology refresh should be triggered, `false` otherwise
* @throws If a user-supplied strategy function returns an invalid value
*/
onReconnectionAttempt(clientId: string, address: string, now = Date.now()) {
if (this.#strategy === false || this.#strategy === 0) {
return false;
}

this.#reconnectingClients.set(clientId, address);
this.#firstReconnectionAt ??= now;

const delay = this.#getDelay(this.#firstReconnectionAt);
if (delay === undefined || now - this.#firstReconnectionAt < delay) {
return false;
}

this.#firstReconnectionAt = now;
return true;
}

/** Removes a client from tracking (e.g. when it reconnects successfully or disconnects) */
removeClient(clientId: string) {
if (!this.#reconnectingClients.delete(clientId)) return;

this.#clearTimestampIfClean();
}

/** Resets all tracking state (e.g. on cluster disconnect or destroy) */
clear() {
this.#reconnectingClients.clear();
this.#firstReconnectionAt = undefined;
}

/**
* Evaluates the configured strategy to determine the delay before a topology refresh.
* @returns The delay in ms, or `undefined` if no refresh should occur
*/
#getDelay(firstReconnectionAt: number) {
if (this.#strategy === undefined) {
return ClusterReconnectionTracker.#DEFAULT_TOPOLOGY_REFRESH_ON_RECONNECTION_ATTEMPT;
}

if (this.#strategy === false) {
return;
}

if (typeof this.#strategy === 'number') {
return this.#strategy;
}

const delay = this.#strategy(firstReconnectionAt);
if (delay === false || delay === undefined || delay === 0) return;

if (!Number.isInteger(delay) || delay < 0) {
throw new TypeError(`topologyRefreshOnReconnectionAttempt should return \`false | undefined | number\`, got ${delay} instead`);
}

return delay;
}

#clearTimestampIfClean() {
if (this.#reconnectingClients.size === 0) {
this.#firstReconnectionAt = undefined;
}
}
}
2 changes: 1 addition & 1 deletion packages/client/lib/cluster/cluster-slots.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import RedisClusterSlots from './cluster-slots';
describe('RedisClusterSlots', () => {
describe('initialization', () => {
describe('clientSideCache validation', () => {
const mockEmit = ((_event: string | symbol, ..._args: any[]): boolean => true) as EventEmitter['emit'];
const mockEmit: EventEmitter['emit'] = () => true;
const clientSideCacheConfig = { ttl: 0, maxEntries: 0 };
const rootNodes: Array<RedisClusterClientOptions> = [
{ socket: { host: 'localhost', port: 30001 } }
Expand Down
Loading
Loading