Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ export type {
ServerVersion,
TlsOptions,
} from "./types.js";
export type { ConfigWatcherEvents } from "./watcher.js";
// Watcher
export { ConfigWatcher, WatchedField } from "./watcher.js";
59 changes: 57 additions & 2 deletions src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,27 @@ export class WatchedField<T> extends EventEmitter {
}
}

/**
* Typed event map for ConfigWatcher.
*
* @example
* ```ts
* watcher.on('subscriptionError', (err) => {
* console.warn('subscription error:', err.message);
* });
* ```
*/
export interface ConfigWatcherEvents {
/**
* Emitted when a subscription error occurs.
*
* For retryable errors (UNAVAILABLE, INTERNAL) the watcher reconnects automatically.
* For non-retryable errors the watcher stops after emitting this event.
* The `err` argument is a typed `DecreeError` (e.g. `UnavailableError`).
*/
subscriptionError: [err: DecreeError];
}

/**
* ConfigWatcher subscribes to live configuration changes for a tenant.
*
Expand All @@ -231,6 +252,10 @@ export class WatchedField<T> extends EventEmitter {
* opens a Subscribe stream for real-time updates. On transient errors
* (UNAVAILABLE, INTERNAL), it automatically reconnects with exponential backoff.
*
* Subscription errors (both retryable and fatal) are emitted as `'subscriptionError'`
* events. Retryable errors trigger automatic reconnection; fatal errors cause the
* watcher to stop.
*
* @example
* ```ts
* const client = new ConfigClient('localhost:9090', { subject: 'myapp' });
Expand All @@ -239,6 +264,10 @@ export class WatchedField<T> extends EventEmitter {
* const fee = watcher.field('payments.fee', Number, { default: 0.01 });
* const enabled = watcher.field('payments.enabled', Boolean, { default: false });
*
* watcher.on('subscriptionError', (err) => {
* console.warn('watcher error:', err.message);
* });
*
* await watcher.start();
* console.log(fee.value); // current value from server
*
Expand All @@ -251,7 +280,7 @@ export class WatchedField<T> extends EventEmitter {
* client.close();
* ```
*/
export class ConfigWatcher {
export class ConfigWatcher extends EventEmitter {
private readonly configStub: InstanceType<typeof GrpcConfigServiceClient>;
private readonly metadata: Metadata;
private readonly timeout: number;
Expand All @@ -269,12 +298,37 @@ export class ConfigWatcher {
timeout: number,
tenantId: string,
) {
super();
this.configStub = configStub;
this.metadata = metadata;
this.timeout = timeout;
this.tenantId = tenantId;
}

on<K extends keyof ConfigWatcherEvents>(
event: K,
listener: (...args: ConfigWatcherEvents[K]) => void,
): this;
on(event: string | symbol, listener: (...args: unknown[]) => void): this;
on(event: string | symbol, listener: (...args: unknown[]) => void): this {
return super.on(event, listener);
}

once<K extends keyof ConfigWatcherEvents>(
event: K,
listener: (...args: ConfigWatcherEvents[K]) => void,
): this;
once(event: string | symbol, listener: (...args: unknown[]) => void): this;
once(event: string | symbol, listener: (...args: unknown[]) => void): this {
return super.once(event, listener);
}

emit<K extends keyof ConfigWatcherEvents>(event: K, ...args: ConfigWatcherEvents[K]): boolean;
emit(event: string | symbol, ...args: unknown[]): boolean;
emit(event: string | symbol, ...args: unknown[]): boolean {
return super.emit(event, ...args);
}

/**
* Register a field to watch.
*
Expand Down Expand Up @@ -414,10 +468,11 @@ export class ConfigWatcher {
return;
}

this.emit("subscriptionError", mapGrpcError(err));

if (isRetryableError(err)) {
this.scheduleReconnect(backoff);
} else {
// Non-retryable error: stop the watcher.
void this.stop();
}
});
Expand Down
88 changes: 88 additions & 0 deletions test/watcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,94 @@ describe("ConfigWatcher", () => {
expect(configStub.subscribe).toHaveBeenCalledTimes(1);
});

it("emits subscriptionError for retryable errors", async () => {
vi.useFakeTimers();

const watcher = createWatcher();
watcher.field("payments.fee", Number, { default: 0.01 });
mockGetConfigSuccess([]);

const errors: Error[] = [];
watcher.on("subscriptionError", (err) => errors.push(err));

await watcher.start();

const newStream = createMockStream();
configStub.subscribe.mockReturnValue(newStream);

mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "server unavailable"));

expect(errors).toHaveLength(1);
expect(errors[0]?.message).toBe("server unavailable");

await vi.advanceTimersByTimeAsync(60_000);

newStream.cancel = vi.fn();
await watcher.stop();
vi.useRealTimers();
});

it("emits subscriptionError for non-retryable errors", async () => {
const watcher = createWatcher();
watcher.field("payments.fee", Number, { default: 0.01 });
mockGetConfigSuccess([]);

const errors: Error[] = [];
watcher.on("subscriptionError", (err) => errors.push(err));

await watcher.start();

mockStream.emit("error", makeServiceError(status.PERMISSION_DENIED, "access denied"));

await new Promise((r) => setTimeout(r, 10));

expect(errors).toHaveLength(1);
expect(errors[0]?.message).toBe("access denied");
});

it("once listener fires only for the first subscriptionError", async () => {
vi.useFakeTimers();

const watcher = createWatcher();
watcher.field("payments.fee", Number, { default: 0.01 });
mockGetConfigSuccess([]);

const errors: Error[] = [];
watcher.once("subscriptionError", (err) => errors.push(err));

await watcher.start();

const newStream = createMockStream();
configStub.subscribe.mockReturnValue(newStream);
configStub.getConfig.mockImplementationOnce(
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } });
},
);

// First error fires the once listener.
mockStream.emit("error", makeServiceError(status.UNAVAILABLE, "first error"));
await vi.advanceTimersByTimeAsync(60_000);

// Second error on the new stream should NOT fire the once listener again.
const newStream2 = createMockStream();
configStub.subscribe.mockReturnValue(newStream2);
configStub.getConfig.mockImplementationOnce(
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } });
},
);
newStream.emit("error", makeServiceError(status.UNAVAILABLE, "second error"));
await vi.advanceTimersByTimeAsync(60_000);

expect(errors).toHaveLength(1);
expect(errors[0]?.message).toBe("first error");

newStream2.cancel = vi.fn();
await watcher.stop();
vi.useRealTimers();
});

it("reconnects on stream end", async () => {
vi.useFakeTimers();

Expand Down