Skip to content

Commit 4d42a7b

Browse files
zeevdrclaude
andcommitted
feat(watcher): surface subscription errors via subscriptionError event
ConfigWatcher now extends EventEmitter and emits a typed 'subscriptionError' event whenever the Subscribe stream encounters an error. Both retryable errors (UNAVAILABLE, INTERNAL — which trigger reconnection) and fatal errors (all others — which stop the watcher) are surfaced, letting callers log or monitor connection health without polling. The ConfigWatcherEvents interface is exported for consumers that want typed listener signatures. The 'subscriptionError' name is intentional: using 'error' would throw on unhandled listeners per Node.js convention. Closes #61 Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 4ad953f commit 4d42a7b

3 files changed

Lines changed: 103 additions & 2 deletions

File tree

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,6 @@ export type {
4545
ServerVersion,
4646
TlsOptions,
4747
} from "./types.js";
48+
export type { ConfigWatcherEvents } from "./watcher.js";
4849
// Watcher
4950
export { ConfigWatcher, WatchedField } from "./watcher.js";

src/watcher.ts

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,27 @@ export class WatchedField<T> extends EventEmitter {
223223
}
224224
}
225225

226+
/**
227+
* Typed event map for ConfigWatcher.
228+
*
229+
* @example
230+
* ```ts
231+
* watcher.on('subscriptionError', (err) => {
232+
* console.warn('subscription error:', err.message);
233+
* });
234+
* ```
235+
*/
236+
export interface ConfigWatcherEvents {
237+
/**
238+
* Emitted when a subscription error occurs.
239+
*
240+
* For retryable errors (UNAVAILABLE, INTERNAL) the watcher reconnects automatically.
241+
* For non-retryable errors the watcher stops after emitting this event.
242+
* The `err` argument is a typed `DecreeError` (e.g. `UnavailableError`).
243+
*/
244+
subscriptionError: [err: DecreeError];
245+
}
246+
226247
/**
227248
* ConfigWatcher subscribes to live configuration changes for a tenant.
228249
*
@@ -231,6 +252,10 @@ export class WatchedField<T> extends EventEmitter {
231252
* opens a Subscribe stream for real-time updates. On transient errors
232253
* (UNAVAILABLE, INTERNAL), it automatically reconnects with exponential backoff.
233254
*
255+
* Subscription errors (both retryable and fatal) are emitted as `'subscriptionError'`
256+
* events. Retryable errors trigger automatic reconnection; fatal errors cause the
257+
* watcher to stop.
258+
*
234259
* @example
235260
* ```ts
236261
* const client = new ConfigClient('localhost:9090', { subject: 'myapp' });
@@ -239,6 +264,10 @@ export class WatchedField<T> extends EventEmitter {
239264
* const fee = watcher.field('payments.fee', Number, { default: 0.01 });
240265
* const enabled = watcher.field('payments.enabled', Boolean, { default: false });
241266
*
267+
* watcher.on('subscriptionError', (err) => {
268+
* console.warn('watcher error:', err.message);
269+
* });
270+
*
242271
* await watcher.start();
243272
* console.log(fee.value); // current value from server
244273
*
@@ -251,7 +280,7 @@ export class WatchedField<T> extends EventEmitter {
251280
* client.close();
252281
* ```
253282
*/
254-
export class ConfigWatcher {
283+
export class ConfigWatcher extends EventEmitter {
255284
private readonly configStub: InstanceType<typeof GrpcConfigServiceClient>;
256285
private readonly metadata: Metadata;
257286
private readonly timeout: number;
@@ -269,12 +298,37 @@ export class ConfigWatcher {
269298
timeout: number,
270299
tenantId: string,
271300
) {
301+
super();
272302
this.configStub = configStub;
273303
this.metadata = metadata;
274304
this.timeout = timeout;
275305
this.tenantId = tenantId;
276306
}
277307

308+
on<K extends keyof ConfigWatcherEvents>(
309+
event: K,
310+
listener: (...args: ConfigWatcherEvents[K]) => void,
311+
): this;
312+
on(event: string | symbol, listener: (...args: unknown[]) => void): this;
313+
on(event: string | symbol, listener: (...args: unknown[]) => void): this {
314+
return super.on(event, listener);
315+
}
316+
317+
once<K extends keyof ConfigWatcherEvents>(
318+
event: K,
319+
listener: (...args: ConfigWatcherEvents[K]) => void,
320+
): this;
321+
once(event: string | symbol, listener: (...args: unknown[]) => void): this;
322+
once(event: string | symbol, listener: (...args: unknown[]) => void): this {
323+
return super.once(event, listener);
324+
}
325+
326+
emit<K extends keyof ConfigWatcherEvents>(event: K, ...args: ConfigWatcherEvents[K]): boolean;
327+
emit(event: string | symbol, ...args: unknown[]): boolean;
328+
emit(event: string | symbol, ...args: unknown[]): boolean {
329+
return super.emit(event, ...args);
330+
}
331+
278332
/**
279333
* Register a field to watch.
280334
*
@@ -414,10 +468,11 @@ export class ConfigWatcher {
414468
return;
415469
}
416470

471+
this.emit("subscriptionError", mapGrpcError(err));
472+
417473
if (isRetryableError(err)) {
418474
this.scheduleReconnect(backoff);
419475
} else {
420-
// Non-retryable error: stop the watcher.
421476
void this.stop();
422477
}
423478
});

test/watcher.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,51 @@ describe("ConfigWatcher", () => {
695695
expect(configStub.subscribe).toHaveBeenCalledTimes(1);
696696
});
697697

698+
it("emits subscriptionError for retryable errors", async () => {
699+
vi.useFakeTimers();
700+
701+
const watcher = createWatcher();
702+
watcher.field("payments.fee", Number, { default: 0.01 });
703+
mockGetConfigSuccess([]);
704+
705+
const errors: Error[] = [];
706+
watcher.on("subscriptionError", (err) => errors.push(err));
707+
708+
await watcher.start();
709+
710+
const newStream = createMockStream();
711+
configStub.subscribe.mockReturnValue(newStream);
712+
713+
mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "server unavailable"));
714+
715+
expect(errors).toHaveLength(1);
716+
expect(errors[0]?.message).toBe("server unavailable");
717+
718+
await vi.advanceTimersByTimeAsync(60_000);
719+
720+
newStream.cancel = vi.fn();
721+
await watcher.stop();
722+
vi.useRealTimers();
723+
});
724+
725+
it("emits subscriptionError for non-retryable errors", async () => {
726+
const watcher = createWatcher();
727+
watcher.field("payments.fee", Number, { default: 0.01 });
728+
mockGetConfigSuccess([]);
729+
730+
const errors: Error[] = [];
731+
watcher.on("subscriptionError", (err) => errors.push(err));
732+
733+
await watcher.start();
734+
735+
mockStream.emit("error", makeServiceError(status.PERMISSION_DENIED, "access denied"));
736+
737+
await new Promise((r) => setTimeout(r, 10));
738+
739+
expect(errors).toHaveLength(1);
740+
expect(errors[0]?.message).toBe("access denied");
741+
});
742+
698743
it("reconnects on stream end", async () => {
699744
vi.useFakeTimers();
700745

0 commit comments

Comments
 (0)