Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { EventEmitter } from "node:events";
import { type ClientReadableStream, type Metadata, type ServiceError, status } from "@grpc/grpc-js";
import type { Converter } from "./convert.js";
import { convertValue, typedValueToString } from "./convert.js";
import { DecreeError, mapGrpcError } from "./errors.js";
import { DecreeError, mapGrpcError, TypeMismatchError } from "./errors.js";
import type {
GetConfigRequest,
GetConfigResponse,
Expand Down Expand Up @@ -159,14 +159,30 @@ export class WatchedField<T> extends EventEmitter {
/**
* Load the initial value from a GetConfig snapshot.
*
* If `convertValue` throws (e.g. type mismatch or unsupported converter),
* the field falls back to its default value and emits a `'conversionError'` event.
* The error is non-fatal — the stream continues.
*
* @param rawValue - The raw string value from the snapshot, or null if absent.
* @internal
*/
_loadInitial(rawValue: string | null): void {
if (rawValue === null) {
this.currentValue = this.defaultValue;
} else {
this.currentValue = convertValue(rawValue, this.converter) as T;
try {
this.currentValue = convertValue(rawValue, this.converter) as T;
} catch (err) {
console.warn(
`[decree] convertValue failed for field "${this.path}" (value=${JSON.stringify(rawValue)}): ${err instanceof Error ? err.message : String(err)}`,
);
this.currentValue = this.defaultValue;
const decreeErr =
err instanceof DecreeError
? err
: new TypeMismatchError(err instanceof Error ? err.message : String(err));
this.emit("conversionError", decreeErr, rawValue);
}
}
}

Expand All @@ -176,6 +192,10 @@ export class WatchedField<T> extends EventEmitter {
* Emits a `'change'` event if the new value differs from the current value.
* Enqueues a Change for async iteration.
*
* If `convertValue` throws (e.g. type mismatch or unsupported converter),
* the field retains its current value, emits a `'conversionError'` event,
* and returns without updating. The stream continues processing other fields.
*
* @param rawValue - The new raw string value, or null if set to null.
* @param change - The Change object describing this update.
* @internal
Expand All @@ -185,7 +205,19 @@ export class WatchedField<T> extends EventEmitter {
if (rawValue === null) {
this.currentValue = this.defaultValue;
} else {
this.currentValue = convertValue(rawValue, this.converter) as T;
try {
this.currentValue = convertValue(rawValue, this.converter) as T;
} catch (err) {
console.warn(
`[decree] convertValue failed for field "${this.path}" (value=${JSON.stringify(rawValue)}): ${err instanceof Error ? err.message : String(err)}`,
);
const decreeErr =
err instanceof DecreeError
? err
: new TypeMismatchError(err instanceof Error ? err.message : String(err));
this.emit("conversionError", decreeErr, rawValue);
return;
}
}

// Only emit if the value actually changed.
Expand Down Expand Up @@ -223,6 +255,25 @@ export class WatchedField<T> extends EventEmitter {
}
}

/**
* Typed event map for WatchedField.
*/
export interface WatchedFieldEvents {
/**
* Emitted when the field value changes.
*
* Arguments are the old value and the new value.
*/
change: [oldValue: unknown, newValue: unknown];
/**
* Emitted when `convertValue` throws during `_loadInitial` or `_update`.
*
* The field retains its previous value (or default for `_loadInitial`).
* The gRPC stream continues — this error is non-fatal.
*/
conversionError: [err: DecreeError, rawValue: string];
}

/**
* Typed event map for ConfigWatcher.
*
Expand Down
160 changes: 159 additions & 1 deletion test/watcher.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventEmitter } from "node:events";
import { Metadata, type ServiceError, status } from "@grpc/grpc-js";
import { afterEach, beforeEach, describe, expect, it, type MockInstance, vi } from "vitest";
import { DecreeError } from "../src/errors.js";
import { DecreeError, TypeMismatchError } from "../src/errors.js";
import type { Change } from "../src/types.js";
import { ConfigWatcher, WatchedField } from "../src/watcher.js";

Expand Down Expand Up @@ -283,6 +283,72 @@ describe("WatchedField", () => {
expect(field.droppedChanges).toBe(0);
});
});

describe("conversionError handling", () => {
it("falls back to default and emits conversionError when _loadInitial value is unconvertible", () => {
const field = new WatchedField("payments.fee", Number, { default: 0.01 });

const errors: Array<{ err: DecreeError; raw: string }> = [];
field.on("conversionError", (err, raw) => errors.push({ err, raw }));

field._loadInitial("not-a-number");

expect(field.value).toBe(0.01);
expect(errors).toHaveLength(1);
expect(errors[0]?.err).toBeInstanceOf(TypeMismatchError);
expect(errors[0]?.raw).toBe("not-a-number");
});

it("retains current value and emits conversionError when _update value is unconvertible (type-flip)", () => {
const field = new WatchedField("payments.fee", Number, { default: 0.01 });
field._loadInitial("0.05");
expect(field.value).toBe(0.05);

const errors: Array<{ err: DecreeError; raw: string }> = [];
field.on("conversionError", (err, raw) => errors.push({ err, raw }));

const changeHandler = vi.fn();
field.on("change", changeHandler);

const change: Change = {
fieldPath: "payments.fee",
oldValue: "0.05",
newValue: "not-a-number",
version: 2,
changedBy: "admin",
};
field._update("not-a-number", change);

// Value must be retained, not overwritten.
expect(field.value).toBe(0.05);
// No change event should fire.
expect(changeHandler).not.toHaveBeenCalled();
// conversionError must be emitted.
expect(errors).toHaveLength(1);
expect(errors[0]?.err).toBeInstanceOf(TypeMismatchError);
expect(errors[0]?.raw).toBe("not-a-number");
});

it("does not fire change event after a failed conversion in _update", () => {
const field = new WatchedField("feature.enabled", Boolean, { default: false });
field._loadInitial("true");

const changeHandler = vi.fn();
field.on("change", changeHandler);

const change: Change = {
fieldPath: "feature.enabled",
oldValue: "true",
newValue: "maybe",
version: 2,
changedBy: "admin",
};
field._update("maybe", change);

expect(field.value).toBe(true);
expect(changeHandler).not.toHaveBeenCalled();
});
});
});

describe("ConfigWatcher", () => {
Expand Down Expand Up @@ -616,6 +682,98 @@ describe("ConfigWatcher", () => {

await watcher.stop();
});

it("does not crash the stream when convertValue throws (type-flip mid-stream)", async () => {
const watcher = createWatcher();
const fee = watcher.field("payments.fee", Number, { default: 0.01 });

mockGetConfigSuccess([{ fieldPath: "payments.fee", value: { numberValue: 0.05 } }]);

const conversionErrors: Array<DecreeError> = [];
fee.on("conversionError", (err) => conversionErrors.push(err));

await watcher.start();

// Simulate server flipping the field type to a non-numeric string.
mockStream.emit("data", {
change: {
tenantId: "tenant-1",
version: 2,
fieldPath: "payments.fee",
oldValue: { numberValue: 0.05 },
newValue: { stringValue: "not-a-number" },
changedBy: "admin",
changedAt: new Date(),
},
});

// Value must remain at last good value (0.05).
expect(fee.value).toBe(0.05);
// conversionError must have been emitted.
expect(conversionErrors).toHaveLength(1);
expect(conversionErrors[0]).toBeInstanceOf(TypeMismatchError);

// Stream must still be alive — subsequent valid update must apply.
mockStream.emit("data", {
change: {
tenantId: "tenant-1",
version: 3,
fieldPath: "payments.fee",
oldValue: { numberValue: 0.05 },
newValue: { numberValue: 0.99 },
changedBy: "admin",
changedAt: new Date(),
},
});

expect(fee.value).toBe(0.99);

await watcher.stop();
});

it("continues processing other fields after one field has a conversion error", async () => {
const watcher = createWatcher();
const fee = watcher.field("payments.fee", Number, { default: 0.01 });
const label = watcher.field("payments.label", String, { default: "default" });

mockGetConfigSuccess([
{ fieldPath: "payments.fee", value: { numberValue: 0.05 } },
{ fieldPath: "payments.label", value: { stringValue: "original" } },
]);

await watcher.start();

// Bad update for fee (type-flip).
mockStream.emit("data", {
change: {
tenantId: "tenant-1",
version: 2,
fieldPath: "payments.fee",
oldValue: { numberValue: 0.05 },
newValue: { stringValue: "bad" },
changedBy: "admin",
changedAt: new Date(),
},
});

// Good update for label — must still apply.
mockStream.emit("data", {
change: {
tenantId: "tenant-1",
version: 3,
fieldPath: "payments.label",
oldValue: { stringValue: "original" },
newValue: { stringValue: "updated" },
changedBy: "admin",
changedAt: new Date(),
},
});

expect(fee.value).toBe(0.05); // unchanged due to conversion error
expect(label.value).toBe("updated"); // updated successfully

await watcher.stop();
});
});

describe("reconnection", () => {
Expand Down