diff --git a/src/index.ts b/src/index.ts index af23c19..8959a5b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -45,5 +45,6 @@ export type { ServerVersion, TlsOptions, } from "./types.js"; +export type { ConfigWatcherEvents } from "./watcher.js"; // Watcher export { ConfigWatcher, WatchedField } from "./watcher.js"; diff --git a/src/watcher.ts b/src/watcher.ts index c0add05..0b4bccf 100644 --- a/src/watcher.ts +++ b/src/watcher.ts @@ -223,6 +223,27 @@ export class WatchedField extends EventEmitter { } } +/** + * Typed event map for ConfigWatcher. + * + * @example + * ```ts + * watcher.on('subscriptionError', (err) => { + * console.warn('subscription error:', err.message); + * }); + * ``` + */ +export interface ConfigWatcherEvents { + /** + * Emitted when a subscription error occurs. + * + * For retryable errors (UNAVAILABLE, INTERNAL) the watcher reconnects automatically. + * For non-retryable errors the watcher stops after emitting this event. + * The `err` argument is a typed `DecreeError` (e.g. `UnavailableError`). + */ + subscriptionError: [err: DecreeError]; +} + /** * ConfigWatcher subscribes to live configuration changes for a tenant. * @@ -231,6 +252,10 @@ export class WatchedField extends EventEmitter { * opens a Subscribe stream for real-time updates. On transient errors * (UNAVAILABLE, INTERNAL), it automatically reconnects with exponential backoff. * + * Subscription errors (both retryable and fatal) are emitted as `'subscriptionError'` + * events. Retryable errors trigger automatic reconnection; fatal errors cause the + * watcher to stop. + * * @example * ```ts * const client = new ConfigClient('localhost:9090', { subject: 'myapp' }); @@ -239,6 +264,10 @@ export class WatchedField extends EventEmitter { * const fee = watcher.field('payments.fee', Number, { default: 0.01 }); * const enabled = watcher.field('payments.enabled', Boolean, { default: false }); * + * watcher.on('subscriptionError', (err) => { + * console.warn('watcher error:', err.message); + * }); + * * await watcher.start(); * console.log(fee.value); // current value from server * @@ -251,7 +280,7 @@ export class WatchedField extends EventEmitter { * client.close(); * ``` */ -export class ConfigWatcher { +export class ConfigWatcher extends EventEmitter { private readonly configStub: InstanceType; private readonly metadata: Metadata; private readonly timeout: number; @@ -269,12 +298,37 @@ export class ConfigWatcher { timeout: number, tenantId: string, ) { + super(); this.configStub = configStub; this.metadata = metadata; this.timeout = timeout; this.tenantId = tenantId; } + on( + event: K, + listener: (...args: ConfigWatcherEvents[K]) => void, + ): this; + on(event: string | symbol, listener: (...args: unknown[]) => void): this; + on(event: string | symbol, listener: (...args: unknown[]) => void): this { + return super.on(event, listener); + } + + once( + event: K, + listener: (...args: ConfigWatcherEvents[K]) => void, + ): this; + once(event: string | symbol, listener: (...args: unknown[]) => void): this; + once(event: string | symbol, listener: (...args: unknown[]) => void): this { + return super.once(event, listener); + } + + emit(event: K, ...args: ConfigWatcherEvents[K]): boolean; + emit(event: string | symbol, ...args: unknown[]): boolean; + emit(event: string | symbol, ...args: unknown[]): boolean { + return super.emit(event, ...args); + } + /** * Register a field to watch. * @@ -414,10 +468,11 @@ export class ConfigWatcher { return; } + this.emit("subscriptionError", mapGrpcError(err)); + if (isRetryableError(err)) { this.scheduleReconnect(backoff); } else { - // Non-retryable error: stop the watcher. void this.stop(); } }); diff --git a/test/watcher.test.ts b/test/watcher.test.ts index eb34dd6..b705862 100644 --- a/test/watcher.test.ts +++ b/test/watcher.test.ts @@ -695,6 +695,94 @@ describe("ConfigWatcher", () => { expect(configStub.subscribe).toHaveBeenCalledTimes(1); }); + it("emits subscriptionError for retryable errors", async () => { + vi.useFakeTimers(); + + const watcher = createWatcher(); + watcher.field("payments.fee", Number, { default: 0.01 }); + mockGetConfigSuccess([]); + + const errors: Error[] = []; + watcher.on("subscriptionError", (err) => errors.push(err)); + + await watcher.start(); + + const newStream = createMockStream(); + configStub.subscribe.mockReturnValue(newStream); + + mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "server unavailable")); + + expect(errors).toHaveLength(1); + expect(errors[0]?.message).toBe("server unavailable"); + + await vi.advanceTimersByTimeAsync(60_000); + + newStream.cancel = vi.fn(); + await watcher.stop(); + vi.useRealTimers(); + }); + + it("emits subscriptionError for non-retryable errors", async () => { + const watcher = createWatcher(); + watcher.field("payments.fee", Number, { default: 0.01 }); + mockGetConfigSuccess([]); + + const errors: Error[] = []; + watcher.on("subscriptionError", (err) => errors.push(err)); + + await watcher.start(); + + mockStream.emit("error", makeServiceError(status.PERMISSION_DENIED, "access denied")); + + await new Promise((r) => setTimeout(r, 10)); + + expect(errors).toHaveLength(1); + expect(errors[0]?.message).toBe("access denied"); + }); + + it("once listener fires only for the first subscriptionError", async () => { + vi.useFakeTimers(); + + const watcher = createWatcher(); + watcher.field("payments.fee", Number, { default: 0.01 }); + mockGetConfigSuccess([]); + + const errors: Error[] = []; + watcher.once("subscriptionError", (err) => errors.push(err)); + + await watcher.start(); + + const newStream = createMockStream(); + configStub.subscribe.mockReturnValue(newStream); + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } }); + }, + ); + + // First error fires the once listener. + mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "first error")); + await vi.advanceTimersByTimeAsync(60_000); + + // Second error on the new stream should NOT fire the once listener again. + const newStream2 = createMockStream(); + configStub.subscribe.mockReturnValue(newStream2); + configStub.getConfig.mockImplementationOnce( + (_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => { + cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } }); + }, + ); + newStream.emit("error", makeServiceError(status.UNAVAILABLE, "second error")); + await vi.advanceTimersByTimeAsync(60_000); + + expect(errors).toHaveLength(1); + expect(errors[0]?.message).toBe("first error"); + + newStream2.cancel = vi.fn(); + await watcher.stop(); + vi.useRealTimers(); + }); + it("reconnects on stream end", async () => { vi.useFakeTimers();