Skip to content

Commit 279db19

Browse files
committed
fix(metrics-pipeline): drop metric emits while the metrics Redis is not ready
Without a readiness guard, every fire-and-forget emit during a metrics Redis outage queued a command in ioredis's in-memory offline queue until rejection. Metrics are loss-tolerant by design, so drop instead; waitUntilReady() lets embedders await the initial connect.
1 parent bbd3eaa commit 279db19

2 files changed

Lines changed: 12 additions & 0 deletions

File tree

internal-packages/metrics-pipeline/src/consumer.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ redisTest(
212212
flag: { enabled: () => true },
213213
});
214214

215+
// Emits before the connection is ready are dropped by design (loss-tolerant).
216+
await emitter.waitUntilReady();
215217
emitter.emitGauge("q1", {
216218
op: "gauge",
217219
q: "q1",

internal-packages/metrics-pipeline/src/emitter.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ export class MetricsStreamEmitter {
114114
// the caller. Shares the counter stream (one stream family on the metrics Redis).
115115
emitGauge(shardKey: string, fields: MetricFields): void {
116116
if (!this.flag.enabled()) return;
117+
// Drop rather than queue while the metrics Redis is unreachable: ioredis would hold
118+
// every command in its offline queue until rejection, and metrics are loss-tolerant.
119+
if (this.redis.status !== "ready") return;
117120
const op = String(fields.op ?? "gauge");
118121
const stream = streamKey(this.def, shardFor(shardKey, this.def.shardCount));
119122
const args: string[] = [];
@@ -134,6 +137,7 @@ export class MetricsStreamEmitter {
134137
// lost XADD self-heals (the next reading restates the total); the INCR is never sampled.
135138
emit(shardKey: string, fields: MetricFields): void {
136139
if (!this.flag.enabled()) return;
140+
if (this.redis.status !== "ready") return;
137141
const op = String(fields.op ?? "unknown");
138142
const q = String(fields.q ?? "");
139143
const odometerKey = `${this.def.name}_cum:${op}:${q}`;
@@ -161,6 +165,12 @@ export class MetricsStreamEmitter {
161165
});
162166
}
163167

168+
// Resolves once the metrics Redis connection is ready (emits before that are dropped).
169+
waitUntilReady(): Promise<void> {
170+
if (this.redis.status === "ready") return Promise.resolve();
171+
return new Promise((resolve) => this.redis.once("ready", () => resolve()));
172+
}
173+
164174
async close(): Promise<void> {
165175
await this.redis.quit();
166176
}

0 commit comments

Comments
 (0)