diff --git a/src/watcher.ts b/src/watcher.ts index 0b872b7..fa76986 100644 --- a/src/watcher.ts +++ b/src/watcher.ts @@ -382,7 +382,7 @@ export class ConfigWatcher { } } - private subscribe(backoff = INITIAL_RECONNECT_BACKOFF): void { + private subscribe(initialBackoff = INITIAL_RECONNECT_BACKOFF): void { if (this.stopped) { return; } @@ -395,7 +395,10 @@ export class ConfigWatcher { this.stream = this.configStub.subscribe(request, this.metadata); + let backoff = initialBackoff; + this.stream.on("data", (resp: SubscribeResponse) => { + backoff = INITIAL_RECONNECT_BACKOFF; this.processChange(resp); }); @@ -432,10 +435,23 @@ export class ConfigWatcher { this.reconnectTimer = setTimeout(() => { this.reconnectTimer = null; - this.subscribe(nextBackoff); + void this.reloadAndSubscribe(nextBackoff); }, delay); } + private async reloadAndSubscribe(backoff: number): Promise { + if (this.stopped) { + return; + } + try { + await this.loadSnapshot(); + } catch { + this.scheduleReconnect(backoff); + return; + } + this.subscribe(INITIAL_RECONNECT_BACKOFF); + } + private processChange(resp: SubscribeResponse): void { if (!resp.change) { return; diff --git a/test/watcher.test.ts b/test/watcher.test.ts index 2a82b12..297e352 100644 --- a/test/watcher.test.ts +++ b/test/watcher.test.ts @@ -693,6 +693,180 @@ describe("ConfigWatcher", () => { vi.useRealTimers(); }); + + it("reloads snapshot on reconnect and applies updated values", async () => { + vi.useFakeTimers(); + + const watcher = createWatcher(); + const fee = watcher.field("payments.fee", Number, { default: 0.01 }); + + // Initial snapshot: fee = 0.05 + mockGetConfigSuccess([{ fieldPath: "payments.fee", value: { numberValue: 0.05 } }]); + await watcher.start(); + expect(fee.value).toBe(0.05); + + // On reconnect, snapshot returns fee = 0.99 (updated while disconnected) + const newStream = createMockStream(); + configStub.subscribe.mockReturnValue(newStream); + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(null, { + config: { + tenantId: "tenant-1", + version: 2, + values: [ + { fieldPath: "payments.fee", value: { numberValue: 0.99 }, checksum: "xyz" }, + ], + }, + }); + }, + ); + + mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "server unavailable")); + await vi.advanceTimersByTimeAsync(60_000); + + // loadSnapshot called again (total 2 getConfig calls) + expect(configStub.getConfig).toHaveBeenCalledTimes(2); + // Value reflects the reconnect snapshot + expect(fee.value).toBe(0.99); + + newStream.cancel = vi.fn(); + await watcher.stop(); + vi.useRealTimers(); + }); + + it("applies stream updates after reconnect snapshot", async () => { + vi.useFakeTimers(); + + const watcher = createWatcher(); + const fee = watcher.field("payments.fee", Number, { default: 0.01 }); + + mockGetConfigSuccess([{ fieldPath: "payments.fee", value: { numberValue: 0.05 } }]); + await watcher.start(); + + const newStream = createMockStream(); + configStub.subscribe.mockReturnValue(newStream); + // Reconnect snapshot returns same value + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(null, { + config: { + tenantId: "tenant-1", + version: 2, + values: [ + { fieldPath: "payments.fee", value: { numberValue: 0.05 }, checksum: "xyz" }, + ], + }, + }); + }, + ); + + mockStream.emit("end"); + await vi.advanceTimersByTimeAsync(60_000); + + // Now emit a live update on the new stream + newStream.emit("data", { + change: { + tenantId: "tenant-1", + version: 3, + fieldPath: "payments.fee", + oldValue: { numberValue: 0.05 }, + newValue: { numberValue: 0.77 }, + changedBy: "admin", + changedAt: new Date(), + }, + }); + + expect(fee.value).toBe(0.77); + + newStream.cancel = vi.fn(); + await watcher.stop(); + vi.useRealTimers(); + }); + + it("retries with backoff when snapshot fails during reconnect", async () => { + vi.useFakeTimers(); + + const watcher = createWatcher(); + watcher.field("payments.fee", Number, { default: 0.01 }); + + mockGetConfigSuccess([]); + await watcher.start(); + + const newStream = createMockStream(); + configStub.subscribe.mockReturnValue(newStream); + + // First reconnect snapshot fails + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(makeServiceError(status.UNAVAILABLE, "snapshot failed")); + }, + ); + // Second reconnect snapshot succeeds + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(null, { + config: { tenantId: "tenant-1", version: 2, values: [] }, + }); + }, + ); + + mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "server unavailable")); + // First reconnect attempt fires, snapshot fails, schedules another + await vi.advanceTimersByTimeAsync(60_000); + // Second reconnect attempt fires, snapshot succeeds + await vi.advanceTimersByTimeAsync(60_000); + + // subscribe called twice (start + successful reconnect) + expect(configStub.subscribe).toHaveBeenCalledTimes(2); + + newStream.cancel = vi.fn(); + await watcher.stop(); + vi.useRealTimers(); + }); + + it("resets backoff after first successful data event", async () => { + vi.useFakeTimers(); + + const watcher = createWatcher(); + watcher.field("payments.fee", Number, { default: 0.01 }); + + mockGetConfigSuccess([]); + await watcher.start(); + + // Receive data — backoff should reset + mockStream.emit("data", { + change: { + tenantId: "tenant-1", + version: 2, + fieldPath: "payments.fee", + oldValue: { numberValue: 0.01 }, + newValue: { numberValue: 0.02 }, + changedBy: "admin", + changedAt: new Date(), + }, + }); + + const newStream = createMockStream(); + configStub.subscribe.mockReturnValue(newStream); + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } }); + }, + ); + + mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "disconnect")); + + // INITIAL_RECONNECT_BACKOFF is 500ms; advance just past it + await vi.advanceTimersByTimeAsync(2_000); + + // subscribe called a second time, confirming backoff was short (reset to initial) + expect(configStub.subscribe).toHaveBeenCalledTimes(2); + + newStream.cancel = vi.fn(); + await watcher.stop(); + vi.useRealTimers(); + }); }); describe("stopped guards", () => {