Skip to content

Commit 5ffa660

Browse files
zeevdrclaude
andauthored
fix(watcher): cap changeQueue to prevent OOM on slow consumer (#77)
Add a configurable queue size limit to WatchedField (default 1024). When the queue is full, the oldest entry is evicted (drop-oldest policy) and a droppedChanges counter is incremented for observability. Config changes are idempotent — the latest value is always current — so dropping stale queued entries is safe. - Add optional queueSize to FieldOptions (default DEFAULT_QUEUE_SIZE = 1024) - Expose droppedChanges read-only getter on WatchedField - Add tests: cap enforced, counter increments on drop, no drops when consumer keeps up Closes #47 Co-authored-by: Claude <noreply@anthropic.com>
1 parent 6b6bbe1 commit 5ffa660

2 files changed

Lines changed: 82 additions & 0 deletions

File tree

src/watcher.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ const RETRYABLE_CODES = new Set([status.UNAVAILABLE, status.INTERNAL]);
2727
/** Maximum reconnect backoff in milliseconds. */
2828
const MAX_RECONNECT_BACKOFF = 30_000;
2929

30+
/** Default maximum number of unread changes buffered per WatchedField before oldest are dropped. */
31+
const DEFAULT_QUEUE_SIZE = 1024;
32+
3033
/** Initial reconnect backoff in milliseconds. */
3134
const INITIAL_RECONNECT_BACKOFF = 500;
3235

@@ -39,6 +42,12 @@ const RECONNECT_MULTIPLIER = 2;
3942
interface FieldOptions<T> {
4043
/** Default value returned when the field has no value on the server. */
4144
readonly default: T;
45+
/**
46+
* Maximum number of unread changes buffered for async iteration.
47+
* When the queue is full, the oldest entry is dropped and `droppedChanges` is incremented.
48+
* Default: 1024.
49+
*/
50+
readonly queueSize?: number;
4251
}
4352

4453
/**
@@ -78,6 +87,8 @@ export class WatchedField<T> extends EventEmitter {
7887
private stopped = false;
7988
private pendingResolve: ((value: IteratorResult<Change>) => void) | null = null;
8089
private readonly changeQueue: Change[] = [];
90+
private readonly maxQueueSize: number;
91+
private _droppedChanges = 0;
8192

8293
/** @internal */
8394
constructor(path: string, converter: Converter, options: FieldOptions<T>) {
@@ -86,6 +97,17 @@ export class WatchedField<T> extends EventEmitter {
8697
this.converter = converter;
8798
this.defaultValue = options.default;
8899
this.currentValue = options.default;
100+
this.maxQueueSize = options.queueSize ?? DEFAULT_QUEUE_SIZE;
101+
}
102+
103+
/**
104+
* Number of changes dropped because the queue was full.
105+
*
106+
* Increments whenever a slow consumer causes a buffered change to be evicted.
107+
* Reset to zero only by creating a new WatchedField instance.
108+
*/
109+
get droppedChanges(): number {
110+
return this._droppedChanges;
89111
}
90112

91113
/**
@@ -178,6 +200,10 @@ export class WatchedField<T> extends EventEmitter {
178200
this.pendingResolve = null;
179201
resolve({ done: false, value: change });
180202
} else {
203+
if (this.changeQueue.length >= this.maxQueueSize) {
204+
this.changeQueue.shift();
205+
this._droppedChanges++;
206+
}
181207
this.changeQueue.push(change);
182208
}
183209
}

test/watcher.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,62 @@ describe("WatchedField", () => {
226226
await iterPromise;
227227
expect(changes).toHaveLength(2);
228228
});
229+
230+
it("drops oldest change when queue is full", () => {
231+
const field = new WatchedField("payments.fee", Number, { default: 0.01, queueSize: 2 });
232+
field._loadInitial("0.01");
233+
234+
const makeChange = (from: string, to: string, v: number): Change => ({
235+
fieldPath: "payments.fee",
236+
oldValue: from,
237+
newValue: to,
238+
version: v,
239+
changedBy: "admin",
240+
});
241+
242+
field._update("0.02", makeChange("0.01", "0.02", 2));
243+
field._update("0.03", makeChange("0.02", "0.03", 3));
244+
// Queue is now full (size 2): [v2, v3].
245+
expect(field.droppedChanges).toBe(0);
246+
247+
field._update("0.04", makeChange("0.03", "0.04", 4));
248+
// v2 dropped; queue: [v3, v4].
249+
expect(field.droppedChanges).toBe(1);
250+
251+
field._update("0.05", makeChange("0.04", "0.05", 5));
252+
// v3 dropped; queue: [v4, v5].
253+
expect(field.droppedChanges).toBe(2);
254+
});
255+
256+
it("droppedChanges is zero when consumer keeps up", async () => {
257+
const field = new WatchedField("payments.fee", Number, { default: 0.01, queueSize: 4 });
258+
field._loadInitial("0.01");
259+
260+
const changes: Change[] = [];
261+
const iterPromise = (async () => {
262+
for await (const change of field) {
263+
changes.push(change);
264+
if (changes.length === 3) break;
265+
}
266+
})();
267+
268+
await new Promise((r) => setTimeout(r, 10));
269+
270+
const makeChange = (from: string, to: string, v: number): Change => ({
271+
fieldPath: "payments.fee",
272+
oldValue: from,
273+
newValue: to,
274+
version: v,
275+
changedBy: "admin",
276+
});
277+
field._update("0.02", makeChange("0.01", "0.02", 2));
278+
field._update("0.03", makeChange("0.02", "0.03", 3));
279+
field._update("0.04", makeChange("0.03", "0.04", 4));
280+
281+
await iterPromise;
282+
expect(changes).toHaveLength(3);
283+
expect(field.droppedChanges).toBe(0);
284+
});
229285
});
230286
});
231287

0 commit comments

Comments
 (0)