Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ export class ConfigWatcher {
}
}

private subscribe(backoff = INITIAL_RECONNECT_BACKOFF): void {
private subscribe(initialBackoff = INITIAL_RECONNECT_BACKOFF): void {
if (this.stopped) {
return;
}
Expand All @@ -395,7 +395,10 @@ export class ConfigWatcher {

this.stream = this.configStub.subscribe(request, this.metadata);

let backoff = initialBackoff;

this.stream.on("data", (resp: SubscribeResponse) => {
backoff = INITIAL_RECONNECT_BACKOFF;
this.processChange(resp);
});

Expand Down Expand Up @@ -432,10 +435,23 @@ export class ConfigWatcher {

this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.subscribe(nextBackoff);
void this.reloadAndSubscribe(nextBackoff);
}, delay);
}

private async reloadAndSubscribe(backoff: number): Promise<void> {
if (this.stopped) {
return;
}
try {
await this.loadSnapshot();
} catch {
this.scheduleReconnect(backoff);
return;
}
this.subscribe(INITIAL_RECONNECT_BACKOFF);
}

private processChange(resp: SubscribeResponse): void {
if (!resp.change) {
return;
Expand Down
174 changes: 174 additions & 0 deletions test/watcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,180 @@ describe("ConfigWatcher", () => {

vi.useRealTimers();
});

it("reloads snapshot on reconnect and applies updated values", async () => {
vi.useFakeTimers();

const watcher = createWatcher();
const fee = watcher.field("payments.fee", Number, { default: 0.01 });

// Initial snapshot: fee = 0.05
mockGetConfigSuccess([{ fieldPath: "payments.fee", value: { numberValue: 0.05 } }]);
await watcher.start();
expect(fee.value).toBe(0.05);

// On reconnect, snapshot returns fee = 0.99 (updated while disconnected)
const newStream = createMockStream();
configStub.subscribe.mockReturnValue(newStream);
configStub.getConfig.mockImplementationOnce(
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
cb(null, {
config: {
tenantId: "tenant-1",
version: 2,
values: [
{ fieldPath: "payments.fee", value: { numberValue: 0.99 }, checksum: "xyz" },
],
},
});
},
);

mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "server unavailable"));
await vi.advanceTimersByTimeAsync(60_000);

// loadSnapshot called again (total 2 getConfig calls)
expect(configStub.getConfig).toHaveBeenCalledTimes(2);
// Value reflects the reconnect snapshot
expect(fee.value).toBe(0.99);

newStream.cancel = vi.fn();
await watcher.stop();
vi.useRealTimers();
});

it("applies stream updates after reconnect snapshot", async () => {
vi.useFakeTimers();

const watcher = createWatcher();
const fee = watcher.field("payments.fee", Number, { default: 0.01 });

mockGetConfigSuccess([{ fieldPath: "payments.fee", value: { numberValue: 0.05 } }]);
await watcher.start();

const newStream = createMockStream();
configStub.subscribe.mockReturnValue(newStream);
// Reconnect snapshot returns same value
configStub.getConfig.mockImplementationOnce(
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
cb(null, {
config: {
tenantId: "tenant-1",
version: 2,
values: [
{ fieldPath: "payments.fee", value: { numberValue: 0.05 }, checksum: "xyz" },
],
},
});
},
);

mockStream.emit("end");
await vi.advanceTimersByTimeAsync(60_000);

// Now emit a live update on the new stream
newStream.emit("data", {
change: {
tenantId: "tenant-1",
version: 3,
fieldPath: "payments.fee",
oldValue: { numberValue: 0.05 },
newValue: { numberValue: 0.77 },
changedBy: "admin",
changedAt: new Date(),
},
});

expect(fee.value).toBe(0.77);

newStream.cancel = vi.fn();
await watcher.stop();
vi.useRealTimers();
});

it("retries with backoff when snapshot fails during reconnect", async () => {
vi.useFakeTimers();

const watcher = createWatcher();
watcher.field("payments.fee", Number, { default: 0.01 });

mockGetConfigSuccess([]);
await watcher.start();

const newStream = createMockStream();
configStub.subscribe.mockReturnValue(newStream);

// First reconnect snapshot fails
configStub.getConfig.mockImplementationOnce(
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
cb(makeServiceError(status.UNAVAILABLE, "snapshot failed"));
},
);
// Second reconnect snapshot succeeds
configStub.getConfig.mockImplementationOnce(
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
cb(null, {
config: { tenantId: "tenant-1", version: 2, values: [] },
});
},
);

mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "server unavailable"));
// First reconnect attempt fires, snapshot fails, schedules another
await vi.advanceTimersByTimeAsync(60_000);
// Second reconnect attempt fires, snapshot succeeds
await vi.advanceTimersByTimeAsync(60_000);

// subscribe called twice (start + successful reconnect)
expect(configStub.subscribe).toHaveBeenCalledTimes(2);

newStream.cancel = vi.fn();
await watcher.stop();
vi.useRealTimers();
});

it("resets backoff after first successful data event", async () => {
vi.useFakeTimers();

const watcher = createWatcher();
watcher.field("payments.fee", Number, { default: 0.01 });

mockGetConfigSuccess([]);
await watcher.start();

// Receive data — backoff should reset
mockStream.emit("data", {
change: {
tenantId: "tenant-1",
version: 2,
fieldPath: "payments.fee",
oldValue: { numberValue: 0.01 },
newValue: { numberValue: 0.02 },
changedBy: "admin",
changedAt: new Date(),
},
});

const newStream = createMockStream();
configStub.subscribe.mockReturnValue(newStream);
configStub.getConfig.mockImplementationOnce(
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } });
},
);

mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "disconnect"));

// INITIAL_RECONNECT_BACKOFF is 500ms; advance just past it
await vi.advanceTimersByTimeAsync(2_000);

// subscribe called a second time, confirming backoff was short (reset to initial)
expect(configStub.subscribe).toHaveBeenCalledTimes(2);

newStream.cancel = vi.fn();
await watcher.stop();
vi.useRealTimers();
});
});

describe("stopped guards", () => {
Expand Down