Skip to content

Commit ddcfa65

Browse files
zeevdrclaude
andauthored
feat(watcher): add addField() for dynamic field registration after start (#101)
ConfigWatcher.field() was one-shot: calling it after start() threw. addField() fills that gap — it works both before and after start(). When called post-start it fetches a targeted GetConfig snapshot to seed the new field's initial value, cancels the current Subscribe stream, and re-opens it with the full updated field list. Internally, loadSnapshot() is refactored into a private loadSnapshotForFields(paths) helper so both the full reconnect path and the targeted addField path share the same logic. Closes #70 Co-authored-by: Claude <noreply@anthropic.com>
1 parent 69a38bf commit ddcfa65

2 files changed

Lines changed: 184 additions & 9 deletions

File tree

src/watcher.ts

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -384,11 +384,9 @@ export class ConfigWatcher extends EventEmitter {
384384
}
385385

386386
/**
387-
* Register a field to watch.
387+
* Register a field to watch before `start()`.
388388
*
389-
* Must be called before `start()`. Returns a WatchedField that will be
390-
* populated with the initial value from the snapshot and updated in
391-
* real-time from the Subscribe stream.
389+
* Must be called before `start()`. For post-start registration use `addField()`.
392390
*
393391
* @param path - Dot-separated field path (e.g. "payments.fee").
394392
* @param converter - Type converter: String, Number, or Boolean.
@@ -403,13 +401,59 @@ export class ConfigWatcher extends EventEmitter {
403401
*/
404402
field<T>(path: string, converter: Converter, options: FieldOptions<T>): WatchedField<T> {
405403
if (this.started) {
406-
throw new DecreeError("cannot register fields after start()");
404+
throw new DecreeError("cannot register fields after start(); use addField() instead");
407405
}
408406
const wf = new WatchedField<T>(path, converter, options);
409407
this.fields.set(path, wf as WatchedField<unknown>);
410408
return wf;
411409
}
412410

411+
/**
412+
* Register a field to watch dynamically — works both before and after `start()`.
413+
*
414+
* When called after `start()`, loads the field's initial value from a GetConfig
415+
* snapshot and re-opens the Subscribe stream with the updated field list.
416+
*
417+
* @param path - Dot-separated field path (e.g. "payments.fee").
418+
* @param converter - Type converter: String, Number, or Boolean.
419+
* @param options - Options including the default value.
420+
* @returns A WatchedField instance for this path.
421+
* @throws DecreeError if called after stop().
422+
*
423+
* @example
424+
* ```ts
425+
* await watcher.start();
426+
* const label = await watcher.addField('payments.label', String, { default: '' });
427+
* console.log(label.value);
428+
* ```
429+
*/
430+
async addField<T>(
431+
path: string,
432+
converter: Converter,
433+
options: FieldOptions<T>,
434+
): Promise<WatchedField<T>> {
435+
if (this.stopped) {
436+
throw new DecreeError("cannot add fields after stop()");
437+
}
438+
const wf = new WatchedField<T>(path, converter, options);
439+
this.fields.set(path, wf as WatchedField<unknown>);
440+
441+
if (this.started) {
442+
await this.loadSnapshotForFields([path]);
443+
if (this.reconnectTimer !== null) {
444+
clearTimeout(this.reconnectTimer);
445+
this.reconnectTimer = null;
446+
}
447+
if (this.stream) {
448+
this.stream.cancel();
449+
this.stream = null;
450+
}
451+
this.subscribe();
452+
}
453+
454+
return wf;
455+
}
456+
413457
/**
414458
* Load the initial snapshot and start the Subscribe stream.
415459
*
@@ -480,6 +524,10 @@ export class ConfigWatcher extends EventEmitter {
480524
}
481525

482526
private async loadSnapshot(): Promise<void> {
527+
await this.loadSnapshotForFields([...this.fields.keys()]);
528+
}
529+
530+
private async loadSnapshotForFields(paths: string[]): Promise<void> {
483531
const resp = await this.callGetConfig({
484532
tenantId: this.tenantId,
485533
includeDescriptions: false,
@@ -492,9 +540,11 @@ export class ConfigWatcher extends EventEmitter {
492540
}
493541
}
494542

495-
for (const [path, field] of this.fields) {
496-
const raw = valueMap.get(path);
497-
field._loadInitial(raw ?? null);
543+
for (const path of paths) {
544+
const field = this.fields.get(path);
545+
if (field) {
546+
field._loadInitial(valueMap.get(path) ?? null);
547+
}
498548
}
499549
}
500550

test/watcher.test.ts

Lines changed: 126 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,138 @@ describe("ConfigWatcher", () => {
416416
await watcher.start();
417417

418418
expect(() => watcher.field("other.field", String, { default: "" })).toThrow(
419-
"cannot register fields after start()",
419+
"cannot register fields after start(); use addField() instead",
420420
);
421421

422422
await watcher.stop();
423423
});
424424
});
425425

426+
describe("addField()", () => {
427+
it("works before start — returns WatchedField at default value", async () => {
428+
const watcher = createWatcher();
429+
const field = await watcher.addField("payments.fee", Number, { default: 0.01 });
430+
expect(field).toBeInstanceOf(WatchedField);
431+
expect(field.value).toBe(0.01);
432+
});
433+
434+
it("before start — field is included when start() runs", async () => {
435+
const watcher = createWatcher();
436+
const fee = await watcher.addField("payments.fee", Number, { default: 0.01 });
437+
438+
mockGetConfigSuccess([{ fieldPath: "payments.fee", value: { numberValue: 0.99 } }]);
439+
await watcher.start();
440+
441+
expect(fee.value).toBe(0.99);
442+
await watcher.stop();
443+
});
444+
445+
it("after start — loads initial value and re-subscribes", async () => {
446+
const watcher = createWatcher();
447+
watcher.field("payments.fee", Number, { default: 0.01 });
448+
mockGetConfigSuccess([{ fieldPath: "payments.fee", value: { numberValue: 0.05 } }]);
449+
await watcher.start();
450+
451+
// New mock stream for the re-subscribe triggered by addField.
452+
const newStream = createMockStream();
453+
configStub.subscribe.mockReturnValue(newStream);
454+
configStub.getConfig.mockImplementationOnce(
455+
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
456+
cb(null, {
457+
config: {
458+
tenantId: "tenant-1",
459+
version: 2,
460+
values: [
461+
{ fieldPath: "payments.label", value: { stringValue: "hello" }, checksum: "x" },
462+
],
463+
},
464+
});
465+
},
466+
);
467+
468+
const label = await watcher.addField("payments.label", String, { default: "" });
469+
470+
expect(label.value).toBe("hello");
471+
// Original stream cancelled, new one opened.
472+
expect(mockStream.cancel).toHaveBeenCalledOnce();
473+
expect(configStub.subscribe).toHaveBeenCalledTimes(2);
474+
475+
newStream.cancel = vi.fn();
476+
await watcher.stop();
477+
});
478+
479+
it("after start — new subscribe call includes added field path", async () => {
480+
const watcher = createWatcher();
481+
watcher.field("payments.fee", Number, { default: 0.01 });
482+
mockGetConfigSuccess([]);
483+
await watcher.start();
484+
485+
const newStream = createMockStream();
486+
configStub.subscribe.mockReturnValue(newStream);
487+
configStub.getConfig.mockImplementationOnce(
488+
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
489+
cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } });
490+
},
491+
);
492+
493+
await watcher.addField("payments.label", String, { default: "" });
494+
495+
const subscribeArgs = configStub.subscribe.mock.calls[1];
496+
expect(subscribeArgs?.[0]).toMatchObject({
497+
tenantId: "tenant-1",
498+
fieldPaths: expect.arrayContaining(["payments.fee", "payments.label"]),
499+
});
500+
501+
newStream.cancel = vi.fn();
502+
await watcher.stop();
503+
});
504+
505+
it("after start — added field receives live changes", async () => {
506+
const watcher = createWatcher();
507+
watcher.field("payments.fee", Number, { default: 0.01 });
508+
mockGetConfigSuccess([]);
509+
await watcher.start();
510+
511+
const newStream = createMockStream();
512+
configStub.subscribe.mockReturnValue(newStream);
513+
configStub.getConfig.mockImplementationOnce(
514+
(_req: unknown, _meta: unknown, _opts: unknown, cb: (...args: unknown[]) => void) => {
515+
cb(null, { config: { tenantId: "tenant-1", version: 2, values: [] } });
516+
},
517+
);
518+
519+
const label = await watcher.addField("payments.label", String, { default: "" });
520+
521+
newStream.emit("data", {
522+
change: {
523+
tenantId: "tenant-1",
524+
version: 3,
525+
fieldPath: "payments.label",
526+
oldValue: { stringValue: "" },
527+
newValue: { stringValue: "updated" },
528+
changedBy: "admin",
529+
changedAt: new Date(),
530+
},
531+
});
532+
533+
expect(label.value).toBe("updated");
534+
535+
newStream.cancel = vi.fn();
536+
await watcher.stop();
537+
});
538+
539+
it("throws after stop()", async () => {
540+
const watcher = createWatcher();
541+
mockGetConfigSuccess([]);
542+
await watcher.start();
543+
await watcher.stop();
544+
545+
await expect(watcher.addField("payments.fee", Number, { default: 0.01 })).rejects.toThrow(
546+
"cannot add fields after stop()",
547+
);
548+
});
549+
});
550+
426551
describe("start()", () => {
427552
it("loads initial snapshot into registered fields", async () => {
428553
const watcher = createWatcher();

0 commit comments

Comments
 (0)