Skip to content

Commit 69a38bf

Browse files
zeevdrclaude
andauthored
fix(stream): guard convertValue to prevent stream crash on unknown field type (#100)
* fix(stream): guard convertValue to prevent stream crash on unknown field type Wrap convertValue calls in _loadInitial() and _update() with try/catch. On conversion error the field falls back to its default (loadInitial) or retains its last good value (_update), emits a non-fatal conversionError event on the WatchedField, logs a warning, and returns — the gRPC stream continues processing remaining fields. Add WatchedFieldEvents type map with change and conversionError events. Closes #92 Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 7e6c4e0 commit 69a38bf

2 files changed

Lines changed: 213 additions & 4 deletions

File tree

src/watcher.ts

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { EventEmitter } from "node:events";
1111
import { type ClientReadableStream, type Metadata, type ServiceError, status } from "@grpc/grpc-js";
1212
import type { Converter } from "./convert.js";
1313
import { convertValue, typedValueToString } from "./convert.js";
14-
import { DecreeError, mapGrpcError } from "./errors.js";
14+
import { DecreeError, mapGrpcError, TypeMismatchError } from "./errors.js";
1515
import type {
1616
GetConfigRequest,
1717
GetConfigResponse,
@@ -159,14 +159,30 @@ export class WatchedField<T> extends EventEmitter {
159159
/**
160160
* Load the initial value from a GetConfig snapshot.
161161
*
162+
* If `convertValue` throws (e.g. type mismatch or unsupported converter),
163+
* the field falls back to its default value and emits a `'conversionError'` event.
164+
* The error is non-fatal — the stream continues.
165+
*
162166
* @param rawValue - The raw string value from the snapshot, or null if absent.
163167
* @internal
164168
*/
165169
_loadInitial(rawValue: string | null): void {
166170
if (rawValue === null) {
167171
this.currentValue = this.defaultValue;
168172
} else {
169-
this.currentValue = convertValue(rawValue, this.converter) as T;
173+
try {
174+
this.currentValue = convertValue(rawValue, this.converter) as T;
175+
} catch (err) {
176+
console.warn(
177+
`[decree] convertValue failed for field "${this.path}" (value=${JSON.stringify(rawValue)}): ${err instanceof Error ? err.message : String(err)}`,
178+
);
179+
this.currentValue = this.defaultValue;
180+
const decreeErr =
181+
err instanceof DecreeError
182+
? err
183+
: new TypeMismatchError(err instanceof Error ? err.message : String(err));
184+
this.emit("conversionError", decreeErr, rawValue);
185+
}
170186
}
171187
}
172188

@@ -176,6 +192,10 @@ export class WatchedField<T> extends EventEmitter {
176192
* Emits a `'change'` event if the new value differs from the current value.
177193
* Enqueues a Change for async iteration.
178194
*
195+
* If `convertValue` throws (e.g. type mismatch or unsupported converter),
196+
* the field retains its current value, emits a `'conversionError'` event,
197+
* and returns without updating. The stream continues processing other fields.
198+
*
179199
* @param rawValue - The new raw string value, or null if set to null.
180200
* @param change - The Change object describing this update.
181201
* @internal
@@ -185,7 +205,19 @@ export class WatchedField<T> extends EventEmitter {
185205
if (rawValue === null) {
186206
this.currentValue = this.defaultValue;
187207
} else {
188-
this.currentValue = convertValue(rawValue, this.converter) as T;
208+
try {
209+
this.currentValue = convertValue(rawValue, this.converter) as T;
210+
} catch (err) {
211+
console.warn(
212+
`[decree] convertValue failed for field "${this.path}" (value=${JSON.stringify(rawValue)}): ${err instanceof Error ? err.message : String(err)}`,
213+
);
214+
const decreeErr =
215+
err instanceof DecreeError
216+
? err
217+
: new TypeMismatchError(err instanceof Error ? err.message : String(err));
218+
this.emit("conversionError", decreeErr, rawValue);
219+
return;
220+
}
189221
}
190222

191223
// Only emit if the value actually changed.
@@ -223,6 +255,25 @@ export class WatchedField<T> extends EventEmitter {
223255
}
224256
}
225257

258+
/**
259+
* Typed event map for WatchedField.
260+
*/
261+
export interface WatchedFieldEvents {
262+
/**
263+
* Emitted when the field value changes.
264+
*
265+
* Arguments are the old value and the new value.
266+
*/
267+
change: [oldValue: unknown, newValue: unknown];
268+
/**
269+
* Emitted when `convertValue` throws during `_loadInitial` or `_update`.
270+
*
271+
* The field retains its previous value (or default for `_loadInitial`).
272+
* The gRPC stream continues — this error is non-fatal.
273+
*/
274+
conversionError: [err: DecreeError, rawValue: string];
275+
}
276+
226277
/**
227278
* Typed event map for ConfigWatcher.
228279
*

test/watcher.test.ts

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { EventEmitter } from "node:events";
22
import { Metadata, type ServiceError, status } from "@grpc/grpc-js";
33
import { afterEach, beforeEach, describe, expect, it, type MockInstance, vi } from "vitest";
4-
import { DecreeError } from "../src/errors.js";
4+
import { DecreeError, TypeMismatchError } from "../src/errors.js";
55
import type { Change } from "../src/types.js";
66
import { ConfigWatcher, WatchedField } from "../src/watcher.js";
77

@@ -283,6 +283,72 @@ describe("WatchedField", () => {
283283
expect(field.droppedChanges).toBe(0);
284284
});
285285
});
286+
287+
describe("conversionError handling", () => {
288+
it("falls back to default and emits conversionError when _loadInitial value is unconvertible", () => {
289+
const field = new WatchedField("payments.fee", Number, { default: 0.01 });
290+
291+
const errors: Array<{ err: DecreeError; raw: string }> = [];
292+
field.on("conversionError", (err, raw) => errors.push({ err, raw }));
293+
294+
field._loadInitial("not-a-number");
295+
296+
expect(field.value).toBe(0.01);
297+
expect(errors).toHaveLength(1);
298+
expect(errors[0]?.err).toBeInstanceOf(TypeMismatchError);
299+
expect(errors[0]?.raw).toBe("not-a-number");
300+
});
301+
302+
it("retains current value and emits conversionError when _update value is unconvertible (type-flip)", () => {
303+
const field = new WatchedField("payments.fee", Number, { default: 0.01 });
304+
field._loadInitial("0.05");
305+
expect(field.value).toBe(0.05);
306+
307+
const errors: Array<{ err: DecreeError; raw: string }> = [];
308+
field.on("conversionError", (err, raw) => errors.push({ err, raw }));
309+
310+
const changeHandler = vi.fn();
311+
field.on("change", changeHandler);
312+
313+
const change: Change = {
314+
fieldPath: "payments.fee",
315+
oldValue: "0.05",
316+
newValue: "not-a-number",
317+
version: 2,
318+
changedBy: "admin",
319+
};
320+
field._update("not-a-number", change);
321+
322+
// Value must be retained, not overwritten.
323+
expect(field.value).toBe(0.05);
324+
// No change event should fire.
325+
expect(changeHandler).not.toHaveBeenCalled();
326+
// conversionError must be emitted.
327+
expect(errors).toHaveLength(1);
328+
expect(errors[0]?.err).toBeInstanceOf(TypeMismatchError);
329+
expect(errors[0]?.raw).toBe("not-a-number");
330+
});
331+
332+
it("does not fire change event after a failed conversion in _update", () => {
333+
const field = new WatchedField("feature.enabled", Boolean, { default: false });
334+
field._loadInitial("true");
335+
336+
const changeHandler = vi.fn();
337+
field.on("change", changeHandler);
338+
339+
const change: Change = {
340+
fieldPath: "feature.enabled",
341+
oldValue: "true",
342+
newValue: "maybe",
343+
version: 2,
344+
changedBy: "admin",
345+
};
346+
field._update("maybe", change);
347+
348+
expect(field.value).toBe(true);
349+
expect(changeHandler).not.toHaveBeenCalled();
350+
});
351+
});
286352
});
287353

288354
describe("ConfigWatcher", () => {
@@ -616,6 +682,98 @@ describe("ConfigWatcher", () => {
616682

617683
await watcher.stop();
618684
});
685+
686+
it("does not crash the stream when convertValue throws (type-flip mid-stream)", async () => {
687+
const watcher = createWatcher();
688+
const fee = watcher.field("payments.fee", Number, { default: 0.01 });
689+
690+
mockGetConfigSuccess([{ fieldPath: "payments.fee", value: { numberValue: 0.05 } }]);
691+
692+
const conversionErrors: Array<DecreeError> = [];
693+
fee.on("conversionError", (err) => conversionErrors.push(err));
694+
695+
await watcher.start();
696+
697+
// Simulate server flipping the field type to a non-numeric string.
698+
mockStream.emit("data", {
699+
change: {
700+
tenantId: "tenant-1",
701+
version: 2,
702+
fieldPath: "payments.fee",
703+
oldValue: { numberValue: 0.05 },
704+
newValue: { stringValue: "not-a-number" },
705+
changedBy: "admin",
706+
changedAt: new Date(),
707+
},
708+
});
709+
710+
// Value must remain at last good value (0.05).
711+
expect(fee.value).toBe(0.05);
712+
// conversionError must have been emitted.
713+
expect(conversionErrors).toHaveLength(1);
714+
expect(conversionErrors[0]).toBeInstanceOf(TypeMismatchError);
715+
716+
// Stream must still be alive — subsequent valid update must apply.
717+
mockStream.emit("data", {
718+
change: {
719+
tenantId: "tenant-1",
720+
version: 3,
721+
fieldPath: "payments.fee",
722+
oldValue: { numberValue: 0.05 },
723+
newValue: { numberValue: 0.99 },
724+
changedBy: "admin",
725+
changedAt: new Date(),
726+
},
727+
});
728+
729+
expect(fee.value).toBe(0.99);
730+
731+
await watcher.stop();
732+
});
733+
734+
it("continues processing other fields after one field has a conversion error", async () => {
735+
const watcher = createWatcher();
736+
const fee = watcher.field("payments.fee", Number, { default: 0.01 });
737+
const label = watcher.field("payments.label", String, { default: "default" });
738+
739+
mockGetConfigSuccess([
740+
{ fieldPath: "payments.fee", value: { numberValue: 0.05 } },
741+
{ fieldPath: "payments.label", value: { stringValue: "original" } },
742+
]);
743+
744+
await watcher.start();
745+
746+
// Bad update for fee (type-flip).
747+
mockStream.emit("data", {
748+
change: {
749+
tenantId: "tenant-1",
750+
version: 2,
751+
fieldPath: "payments.fee",
752+
oldValue: { numberValue: 0.05 },
753+
newValue: { stringValue: "bad" },
754+
changedBy: "admin",
755+
changedAt: new Date(),
756+
},
757+
});
758+
759+
// Good update for label — must still apply.
760+
mockStream.emit("data", {
761+
change: {
762+
tenantId: "tenant-1",
763+
version: 3,
764+
fieldPath: "payments.label",
765+
oldValue: { stringValue: "original" },
766+
newValue: { stringValue: "updated" },
767+
changedBy: "admin",
768+
changedAt: new Date(),
769+
},
770+
});
771+
772+
expect(fee.value).toBe(0.05); // unchanged due to conversion error
773+
expect(label.value).toBe("updated"); // updated successfully
774+
775+
await watcher.stop();
776+
});
619777
});
620778

621779
describe("reconnection", () => {

0 commit comments

Comments
 (0)