Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const RETRYABLE_CODES = new Set([status.UNAVAILABLE, status.INTERNAL]);
/** Maximum reconnect backoff in milliseconds. */
const MAX_RECONNECT_BACKOFF = 30_000;

/** Default maximum number of unread changes buffered per WatchedField before oldest are dropped. */
const DEFAULT_QUEUE_SIZE = 1024;

/** Initial reconnect backoff in milliseconds. */
const INITIAL_RECONNECT_BACKOFF = 500;

Expand All @@ -39,6 +42,12 @@ const RECONNECT_MULTIPLIER = 2;
interface FieldOptions<T> {
/** Default value returned when the field has no value on the server. */
readonly default: T;
/**
* Maximum number of unread changes buffered for async iteration.
* When the queue is full, the oldest entry is dropped and `droppedChanges` is incremented.
* Default: 1024.
*/
readonly queueSize?: number;
}

/**
Expand Down Expand Up @@ -78,6 +87,8 @@ export class WatchedField<T> extends EventEmitter {
private stopped = false;
private pendingResolve: ((value: IteratorResult<Change>) => void) | null = null;
private readonly changeQueue: Change[] = [];
private readonly maxQueueSize: number;
private _droppedChanges = 0;

/** @internal */
constructor(path: string, converter: Converter, options: FieldOptions<T>) {
Expand All @@ -86,6 +97,17 @@ export class WatchedField<T> extends EventEmitter {
this.converter = converter;
this.defaultValue = options.default;
this.currentValue = options.default;
this.maxQueueSize = options.queueSize ?? DEFAULT_QUEUE_SIZE;
}

/**
* Number of changes dropped because the queue was full.
*
* Increments whenever a slow consumer causes a buffered change to be evicted.
* Reset to zero only by creating a new WatchedField instance.
*/
get droppedChanges(): number {
return this._droppedChanges;
}

/**
Expand Down Expand Up @@ -178,6 +200,10 @@ export class WatchedField<T> extends EventEmitter {
this.pendingResolve = null;
resolve({ done: false, value: change });
} else {
if (this.changeQueue.length >= this.maxQueueSize) {
this.changeQueue.shift();
this._droppedChanges++;
}
this.changeQueue.push(change);
}
}
Expand Down
56 changes: 56 additions & 0 deletions test/watcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,62 @@ describe("WatchedField", () => {
await iterPromise;
expect(changes).toHaveLength(2);
});

it("drops oldest change when queue is full", () => {
const field = new WatchedField("payments.fee", Number, { default: 0.01, queueSize: 2 });
field._loadInitial("0.01");

const makeChange = (from: string, to: string, v: number): Change => ({
fieldPath: "payments.fee",
oldValue: from,
newValue: to,
version: v,
changedBy: "admin",
});

field._update("0.02", makeChange("0.01", "0.02", 2));
field._update("0.03", makeChange("0.02", "0.03", 3));
// Queue is now full (size 2): [v2, v3].
expect(field.droppedChanges).toBe(0);

field._update("0.04", makeChange("0.03", "0.04", 4));
// v2 dropped; queue: [v3, v4].
expect(field.droppedChanges).toBe(1);

field._update("0.05", makeChange("0.04", "0.05", 5));
// v3 dropped; queue: [v4, v5].
expect(field.droppedChanges).toBe(2);
});

it("droppedChanges is zero when consumer keeps up", async () => {
const field = new WatchedField("payments.fee", Number, { default: 0.01, queueSize: 4 });
field._loadInitial("0.01");

const changes: Change[] = [];
const iterPromise = (async () => {
for await (const change of field) {
changes.push(change);
if (changes.length === 3) break;
}
})();

await new Promise((r) => setTimeout(r, 10));

const makeChange = (from: string, to: string, v: number): Change => ({
fieldPath: "payments.fee",
oldValue: from,
newValue: to,
version: v,
changedBy: "admin",
});
field._update("0.02", makeChange("0.01", "0.02", 2));
field._update("0.03", makeChange("0.02", "0.03", 3));
field._update("0.04", makeChange("0.03", "0.04", 4));

await iterPromise;
expect(changes).toHaveLength(3);
expect(field.droppedChanges).toBe(0);
});
});
});

Expand Down