From 5c32eebd5bd262ef53b7ac8eba7ad09fc74c388f Mon Sep 17 00:00:00 2001 From: zeevdr Date: Mon, 25 May 2026 12:43:03 +0300 Subject: [PATCH 1/2] feat(watcher): surface subscription errors via subscriptionError event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ConfigWatcher now extends EventEmitter and emits a typed 'subscriptionError' event whenever the Subscribe stream encounters an error. Both retryable errors (UNAVAILABLE, INTERNAL — which trigger reconnection) and fatal errors (all others — which stop the watcher) are surfaced, letting callers log or monitor connection health without polling. The ConfigWatcherEvents interface is exported for consumers that want typed listener signatures. The 'subscriptionError' name is intentional: using 'error' would throw on unhandled listeners per Node.js convention. Closes #61 Co-Authored-By: Claude --- src/index.ts | 1 + src/watcher.ts | 59 ++++++++++++++++++++++++++++++++++++++++++-- test/watcher.test.ts | 45 +++++++++++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 2 deletions(-) diff --git a/src/index.ts b/src/index.ts index af23c19..8959a5b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -45,5 +45,6 @@ export type { ServerVersion, TlsOptions, } from "./types.js"; +export type { ConfigWatcherEvents } from "./watcher.js"; // Watcher export { ConfigWatcher, WatchedField } from "./watcher.js"; diff --git a/src/watcher.ts b/src/watcher.ts index c0add05..0b4bccf 100644 --- a/src/watcher.ts +++ b/src/watcher.ts @@ -223,6 +223,27 @@ export class WatchedField extends EventEmitter { } } +/** + * Typed event map for ConfigWatcher. + * + * @example + * ```ts + * watcher.on('subscriptionError', (err) => { + * console.warn('subscription error:', err.message); + * }); + * ``` + */ +export interface ConfigWatcherEvents { + /** + * Emitted when a subscription error occurs. + * + * For retryable errors (UNAVAILABLE, INTERNAL) the watcher reconnects automatically. + * For non-retryable errors the watcher stops after emitting this event. + * The `err` argument is a typed `DecreeError` (e.g. `UnavailableError`). + */ + subscriptionError: [err: DecreeError]; +} + /** * ConfigWatcher subscribes to live configuration changes for a tenant. * @@ -231,6 +252,10 @@ export class WatchedField extends EventEmitter { * opens a Subscribe stream for real-time updates. On transient errors * (UNAVAILABLE, INTERNAL), it automatically reconnects with exponential backoff. * + * Subscription errors (both retryable and fatal) are emitted as `'subscriptionError'` + * events. Retryable errors trigger automatic reconnection; fatal errors cause the + * watcher to stop. + * * @example * ```ts * const client = new ConfigClient('localhost:9090', { subject: 'myapp' }); @@ -239,6 +264,10 @@ export class WatchedField extends EventEmitter { * const fee = watcher.field('payments.fee', Number, { default: 0.01 }); * const enabled = watcher.field('payments.enabled', Boolean, { default: false }); * + * watcher.on('subscriptionError', (err) => { + * console.warn('watcher error:', err.message); + * }); + * * await watcher.start(); * console.log(fee.value); // current value from server * @@ -251,7 +280,7 @@ export class WatchedField extends EventEmitter { * client.close(); * ``` */ -export class ConfigWatcher { +export class ConfigWatcher extends EventEmitter { private readonly configStub: InstanceType; private readonly metadata: Metadata; private readonly timeout: number; @@ -269,12 +298,37 @@ export class ConfigWatcher { timeout: number, tenantId: string, ) { + super(); this.configStub = configStub; this.metadata = metadata; this.timeout = timeout; this.tenantId = tenantId; } + on( + event: K, + listener: (...args: ConfigWatcherEvents[K]) => void, + ): this; + on(event: string | symbol, listener: (...args: unknown[]) => void): this; + on(event: string | symbol, listener: (...args: unknown[]) => void): this { + return super.on(event, listener); + } + + once( + event: K, + listener: (...args: ConfigWatcherEvents[K]) => void, + ): this; + once(event: string | symbol, listener: (...args: unknown[]) => void): this; + once(event: string | symbol, listener: (...args: unknown[]) => void): this { + return super.once(event, listener); + } + + emit(event: K, ...args: ConfigWatcherEvents[K]): boolean; + emit(event: string | symbol, ...args: unknown[]): boolean; + emit(event: string | symbol, ...args: unknown[]): boolean { + return super.emit(event, ...args); + } + /** * Register a field to watch. * @@ -414,10 +468,11 @@ export class ConfigWatcher { return; } + this.emit("subscriptionError", mapGrpcError(err)); + if (isRetryableError(err)) { this.scheduleReconnect(backoff); } else { - // Non-retryable error: stop the watcher. void this.stop(); } }); diff --git a/test/watcher.test.ts b/test/watcher.test.ts index eb34dd6..277a2b3 100644 --- a/test/watcher.test.ts +++ b/test/watcher.test.ts @@ -695,6 +695,51 @@ describe("ConfigWatcher", () => { expect(configStub.subscribe).toHaveBeenCalledTimes(1); }); + it("emits subscriptionError for retryable errors", async () => { + vi.useFakeTimers(); + + const watcher = createWatcher(); + watcher.field("payments.fee", Number, { default: 0.01 }); + mockGetConfigSuccess([]); + + const errors: Error[] = []; + watcher.on("subscriptionError", (err) => errors.push(err)); + + await watcher.start(); + + const newStream = createMockStream(); + configStub.subscribe.mockReturnValue(newStream); + + mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "server unavailable")); + + expect(errors).toHaveLength(1); + expect(errors[0]?.message).toBe("server unavailable"); + + await vi.advanceTimersByTimeAsync(60_000); + + newStream.cancel = vi.fn(); + await watcher.stop(); + vi.useRealTimers(); + }); + + it("emits subscriptionError for non-retryable errors", async () => { + const watcher = createWatcher(); + watcher.field("payments.fee", Number, { default: 0.01 }); + mockGetConfigSuccess([]); + + const errors: Error[] = []; + watcher.on("subscriptionError", (err) => errors.push(err)); + + await watcher.start(); + + mockStream.emit("error", makeServiceError(status.PERMISSION_DENIED, "access denied")); + + await new Promise((r) => setTimeout(r, 10)); + + expect(errors).toHaveLength(1); + expect(errors[0]?.message).toBe("access denied"); + }); + it("reconnects on stream end", async () => { vi.useFakeTimers(); From 8edbcad50a45519e2d638a47badcd529316f0776 Mon Sep 17 00:00:00 2001 From: zeevdr Date: Mon, 25 May 2026 12:53:18 +0300 Subject: [PATCH 2/2] test(watcher): cover once() override with subscriptionError once-listener test Co-Authored-By: Claude --- test/watcher.test.ts | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/test/watcher.test.ts b/test/watcher.test.ts index 277a2b3..b705862 100644 --- a/test/watcher.test.ts +++ b/test/watcher.test.ts @@ -740,6 +740,49 @@ describe("ConfigWatcher", () => { expect(errors[0]?.message).toBe("access denied"); }); + it("once listener fires only for the first subscriptionError", async () => { + vi.useFakeTimers(); + + const watcher = createWatcher(); + watcher.field("payments.fee", Number, { default: 0.01 }); + mockGetConfigSuccess([]); + + const errors: Error[] = []; + watcher.once("subscriptionError", (err) => errors.push(err)); + + await watcher.start(); + + const newStream = createMockStream(); + configStub.subscribe.mockReturnValue(newStream); + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } }); + }, + ); + + // First error fires the once listener. + mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "first error")); + await vi.advanceTimersByTimeAsync(60_000); + + // Second error on the new stream should NOT fire the once listener again. + const newStream2 = createMockStream(); + configStub.subscribe.mockReturnValue(newStream2); + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } }); + }, + ); + newStream.emit("error", makeServiceError(status.UNAVAILABLE, "second error")); + await vi.advanceTimersByTimeAsync(60_000); + + expect(errors).toHaveLength(1); + expect(errors[0]?.message).toBe("first error"); + + newStream2.cancel = vi.fn(); + await watcher.stop(); + vi.useRealTimers(); + }); + it("reconnects on stream end", async () => { vi.useFakeTimers();