diff --git a/src/watcher.ts b/src/watcher.ts index ad18cb9..0b872b7 100644 --- a/src/watcher.ts +++ b/src/watcher.ts @@ -27,6 +27,9 @@ const RETRYABLE_CODES = new Set([status.UNAVAILABLE, status.INTERNAL]); /** Maximum reconnect backoff in milliseconds. */ const MAX_RECONNECT_BACKOFF = 30_000; +/** Default maximum number of unread changes buffered per WatchedField before oldest are dropped. */ +const DEFAULT_QUEUE_SIZE = 1024; + /** Initial reconnect backoff in milliseconds. */ const INITIAL_RECONNECT_BACKOFF = 500; @@ -39,6 +42,12 @@ const RECONNECT_MULTIPLIER = 2; interface FieldOptions { /** Default value returned when the field has no value on the server. */ readonly default: T; + /** + * Maximum number of unread changes buffered for async iteration. + * When the queue is full, the oldest entry is dropped and `droppedChanges` is incremented. + * Default: 1024. + */ + readonly queueSize?: number; } /** @@ -78,6 +87,8 @@ export class WatchedField extends EventEmitter { private stopped = false; private pendingResolve: ((value: IteratorResult) => void) | null = null; private readonly changeQueue: Change[] = []; + private readonly maxQueueSize: number; + private _droppedChanges = 0; /** @internal */ constructor(path: string, converter: Converter, options: FieldOptions) { @@ -86,6 +97,17 @@ export class WatchedField extends EventEmitter { this.converter = converter; this.defaultValue = options.default; this.currentValue = options.default; + this.maxQueueSize = options.queueSize ?? DEFAULT_QUEUE_SIZE; + } + + /** + * Number of changes dropped because the queue was full. + * + * Increments whenever a slow consumer causes a buffered change to be evicted. + * Reset to zero only by creating a new WatchedField instance. + */ + get droppedChanges(): number { + return this._droppedChanges; } /** @@ -178,6 +200,10 @@ export class WatchedField extends EventEmitter { this.pendingResolve = null; resolve({ done: false, value: change }); } else { + if (this.changeQueue.length >= this.maxQueueSize) { + this.changeQueue.shift(); + this._droppedChanges++; + } this.changeQueue.push(change); } } diff --git a/test/watcher.test.ts b/test/watcher.test.ts index e519b45..2a82b12 100644 --- a/test/watcher.test.ts +++ b/test/watcher.test.ts @@ -226,6 +226,62 @@ describe("WatchedField", () => { await iterPromise; expect(changes).toHaveLength(2); }); + + it("drops oldest change when queue is full", () => { + const field = new WatchedField("payments.fee", Number, { default: 0.01, queueSize: 2 }); + field._loadInitial("0.01"); + + const makeChange = (from: string, to: string, v: number): Change => ({ + fieldPath: "payments.fee", + oldValue: from, + newValue: to, + version: v, + changedBy: "admin", + }); + + field._update("0.02", makeChange("0.01", "0.02", 2)); + field._update("0.03", makeChange("0.02", "0.03", 3)); + // Queue is now full (size 2): [v2, v3]. + expect(field.droppedChanges).toBe(0); + + field._update("0.04", makeChange("0.03", "0.04", 4)); + // v2 dropped; queue: [v3, v4]. + expect(field.droppedChanges).toBe(1); + + field._update("0.05", makeChange("0.04", "0.05", 5)); + // v3 dropped; queue: [v4, v5]. + expect(field.droppedChanges).toBe(2); + }); + + it("droppedChanges is zero when consumer keeps up", async () => { + const field = new WatchedField("payments.fee", Number, { default: 0.01, queueSize: 4 }); + field._loadInitial("0.01"); + + const changes: Change[] = []; + const iterPromise = (async () => { + for await (const change of field) { + changes.push(change); + if (changes.length === 3) break; + } + })(); + + await new Promise((r) => setTimeout(r, 10)); + + const makeChange = (from: string, to: string, v: number): Change => ({ + fieldPath: "payments.fee", + oldValue: from, + newValue: to, + version: v, + changedBy: "admin", + }); + field._update("0.02", makeChange("0.01", "0.02", 2)); + field._update("0.03", makeChange("0.02", "0.03", 3)); + field._update("0.04", makeChange("0.03", "0.04", 4)); + + await iterPromise; + expect(changes).toHaveLength(3); + expect(field.droppedChanges).toBe(0); + }); }); });