Skip to content

Commit 9bd8d03

Browse files
custom txob error to specify backoff from within handler
1 parent e44f18c commit 9bd8d03

6 files changed

Lines changed: 275 additions & 28 deletions

File tree

package.json

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@
2525
"dist",
2626
"README.md"
2727
],
28-
"main": "./dist/cjs/processor.js",
29-
"types": "dist/processor.d.ts",
28+
"main": "./dist/cjs/index.js",
29+
"types": "dist/index.d.ts",
3030
"exports": {
3131
".": {
32-
"types": "./dist/processor.d.ts",
33-
"import": "./dist/processor.js",
34-
"require": "./dist/cjs/processor.js"
32+
"types": "./dist/index.d.ts",
33+
"import": "./dist/index.js",
34+
"require": "./dist/cjs/index.js"
3535
},
3636
"./pg": {
3737
"types": "./dist/pg/client.d.ts",

src/error.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* TxobError can be thrown by an event handler to indicate that the event processing should be retried.
3+
* It allows handlers to specify a custom backoff time via the `backoffUntil` property.
4+
*
5+
* When multiple handlers throw TxobError with different `backoffUntil` dates, the processor will use
6+
* the latest (maximum) backoff time among all handlers and the default backoff calculation.
7+
*
8+
* @example
9+
* ```typescript
10+
* // Throw an error with a custom backoff time
11+
* throw new TxobError("Rate limit exceeded", {
12+
* backoffUntil: new Date(Date.now() + 60000) // Retry after 1 minute
13+
* });
14+
*
15+
* // Throw an error with a cause
16+
* throw new TxobError("Processing failed", {
17+
* cause: originalError,
18+
* backoffUntil: new Date(Date.now() + 30000) // Retry after 30 seconds
19+
* });
20+
* ```
21+
*/
22+
export class TxobError extends Error {
23+
/**
24+
* Optional date indicating when the event should be retried.
25+
* If provided, this backoff time will be considered along with the default backoff calculation,
26+
* and the latest (maximum) backoff time will be used.
27+
*/
28+
backoffUntil?: Date;
29+
30+
constructor(
31+
message: string,
32+
options: { cause?: Error; backoffUntil?: Date } = {},
33+
) {
34+
super(message);
35+
this.cause = options.cause;
36+
this.backoffUntil = options.backoffUntil;
37+
}
38+
}
39+
40+
/**
41+
* ErrorUnprocessableEventHandler can be thrown by an event handler to indicate that the event handler is unprocessable.
42+
* It wraps the original error that caused the handler to be unprocessable.
43+
* This error will signal the processor to stop processing the event handler and mark the event handler as unprocessable.
44+
*/
45+
export class ErrorUnprocessableEventHandler extends Error {
46+
error: Error;
47+
48+
constructor(error: Error) {
49+
const message = `unprocessable event handler: ${error.message}`;
50+
super(message);
51+
this.error = error;
52+
}
53+
}

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from "./processor.js";
2+
export * from "./error.js";

src/mongodb/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export const createProcessorClient = <EventType extends string>(
3737
.find(filter)
3838
.project({ id: 1, errors: 1 })
3939
.limit(limit)
40-
.sort('timestamp', 'asc')
40+
.sort("timestamp", "asc")
4141
.toArray()) as Pick<TxOBEvent<EventType>, "id" | "errors">[];
4242

4343
return events;

src/processor.test.ts

Lines changed: 202 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
import { describe, it, expect, vi, afterEach } from "vitest";
2-
import {
3-
EventProcessor,
4-
TxOBEvent,
5-
ErrorUnprocessableEventHandler,
6-
defaultBackoff,
7-
} from "./processor.js";
2+
import { EventProcessor, TxOBEvent, defaultBackoff } from "./processor.js";
3+
import { TxobError, ErrorUnprocessableEventHandler } from "./error.js";
84
import { sleep } from "./sleep.js";
95

106
const mockTxClient = {
@@ -763,6 +759,206 @@ describe("EventProcessor - processEvents", () => {
763759
"error processing event",
764760
);
765761
});
762+
763+
it("should use the latest backoff when multiple TxobErrors have different backoffUntil dates", async () => {
764+
const opts = {
765+
maxErrors: 5,
766+
backoff: vi.fn(() => new Date(now.getTime() + 5000)), // Default backoff: 5 seconds
767+
pollingIntervalMs: 10,
768+
};
769+
770+
// Create different backoff times
771+
const backoff1 = new Date(now.getTime() + 10000); // 10 seconds
772+
const backoff2 = new Date(now.getTime() + 20000); // 20 seconds (latest)
773+
const backoff3 = new Date(now.getTime() + 15000); // 15 seconds
774+
775+
const error1 = new TxobError("error 1", { backoffUntil: backoff1 });
776+
const error2 = new TxobError("error 2", { backoffUntil: backoff2 });
777+
const error3 = new TxobError("error 3", { backoffUntil: backoff3 });
778+
779+
const handlerMap = {
780+
evtType1: {
781+
handler1: vi.fn(() => Promise.reject(error1)),
782+
handler2: vi.fn(() => Promise.reject(error2)),
783+
handler3: vi.fn(() => Promise.reject(error3)),
784+
},
785+
};
786+
787+
const evt1: TxOBEvent<keyof typeof handlerMap> = {
788+
type: "evtType1",
789+
id: "1",
790+
timestamp: now,
791+
data: {},
792+
correlation_id: "abc123",
793+
handler_results: {},
794+
errors: 0,
795+
};
796+
797+
const events = [evt1];
798+
let callCount = 0;
799+
mockClient.getEventsToProcess.mockImplementation(() => {
800+
callCount++;
801+
return Promise.resolve(callCount === 1 ? events : []);
802+
});
803+
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
804+
return Promise.resolve(events.find((e) => e.id === id) ?? null);
805+
});
806+
mockTxClient.updateEvent.mockImplementation(() => {
807+
return Promise.resolve();
808+
});
809+
810+
const processor = new EventProcessor({
811+
client: mockClient,
812+
handlerMap,
813+
...opts,
814+
});
815+
processor.start();
816+
await sleep(50); // Wait for processing
817+
await processor.stop();
818+
819+
expect(mockClient.getEventsToProcess).toHaveBeenCalled();
820+
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
821+
822+
// All handlers should have been called
823+
expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce();
824+
expect(handlerMap.evtType1.handler2).toHaveBeenCalledOnce();
825+
expect(handlerMap.evtType1.handler3).toHaveBeenCalledOnce();
826+
827+
// Default backoff should also be called
828+
expect(opts.backoff).toHaveBeenCalledWith(1);
829+
830+
// The latest backoff (backoff2 = 20 seconds) should be used
831+
expect(mockTxClient.updateEvent).toHaveBeenCalledTimes(1);
832+
const updateCall = mockTxClient.updateEvent.mock.calls[0][0];
833+
expect(updateCall.backoff_until).toEqual(backoff2);
834+
expect(updateCall.errors).toBe(1);
835+
});
836+
837+
it("should use the latest backoff when TxobError backoff is later than default backoff", async () => {
838+
const laterBackoff = new Date(now.getTime() + 30000); // 30 seconds
839+
const defaultBackoffTime = new Date(now.getTime() + 5000); // 5 seconds
840+
841+
const opts = {
842+
maxErrors: 5,
843+
backoff: vi.fn(() => defaultBackoffTime),
844+
pollingIntervalMs: 10,
845+
};
846+
847+
const error = new TxobError("error with backoff", {
848+
backoffUntil: laterBackoff,
849+
});
850+
851+
const handlerMap = {
852+
evtType1: {
853+
handler1: vi.fn(() => Promise.reject(error)),
854+
},
855+
};
856+
857+
const evt1: TxOBEvent<keyof typeof handlerMap> = {
858+
type: "evtType1",
859+
id: "1",
860+
timestamp: now,
861+
data: {},
862+
correlation_id: "abc123",
863+
handler_results: {},
864+
errors: 0,
865+
};
866+
867+
const events = [evt1];
868+
let callCount = 0;
869+
mockClient.getEventsToProcess.mockImplementation(() => {
870+
callCount++;
871+
return Promise.resolve(callCount === 1 ? events : []);
872+
});
873+
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
874+
return Promise.resolve(events.find((e) => e.id === id) ?? null);
875+
});
876+
mockTxClient.updateEvent.mockImplementation(() => {
877+
return Promise.resolve();
878+
});
879+
880+
const processor = new EventProcessor({
881+
client: mockClient,
882+
handlerMap,
883+
...opts,
884+
});
885+
processor.start();
886+
await sleep(50); // Wait for processing
887+
await processor.stop();
888+
889+
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
890+
expect(opts.backoff).toHaveBeenCalledWith(1);
891+
892+
// The latest backoff (laterBackoff = 30 seconds) should be used, not the default (5 seconds)
893+
const updateCall = mockTxClient.updateEvent.mock.calls[0][0];
894+
expect(updateCall.backoff_until).toEqual(laterBackoff);
895+
expect(updateCall.backoff_until?.getTime()).toBeGreaterThan(
896+
defaultBackoffTime.getTime(),
897+
);
898+
});
899+
900+
it("should use default backoff when TxobError backoff is earlier than default backoff", async () => {
901+
const earlierBackoff = new Date(now.getTime() + 2000); // 2 seconds
902+
const defaultBackoffTime = new Date(now.getTime() + 5000); // 5 seconds
903+
904+
const opts = {
905+
maxErrors: 5,
906+
backoff: vi.fn(() => defaultBackoffTime),
907+
pollingIntervalMs: 10,
908+
};
909+
910+
const error = new TxobError("error with backoff", {
911+
backoffUntil: earlierBackoff,
912+
});
913+
914+
const handlerMap = {
915+
evtType1: {
916+
handler1: vi.fn(() => Promise.reject(error)),
917+
},
918+
};
919+
920+
const evt1: TxOBEvent<keyof typeof handlerMap> = {
921+
type: "evtType1",
922+
id: "1",
923+
timestamp: now,
924+
data: {},
925+
correlation_id: "abc123",
926+
handler_results: {},
927+
errors: 0,
928+
};
929+
930+
const events = [evt1];
931+
let callCount = 0;
932+
mockClient.getEventsToProcess.mockImplementation(() => {
933+
callCount++;
934+
return Promise.resolve(callCount === 1 ? events : []);
935+
});
936+
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
937+
return Promise.resolve(events.find((e) => e.id === id) ?? null);
938+
});
939+
mockTxClient.updateEvent.mockImplementation(() => {
940+
return Promise.resolve();
941+
});
942+
943+
const processor = new EventProcessor({
944+
client: mockClient,
945+
handlerMap,
946+
...opts,
947+
});
948+
processor.start();
949+
await sleep(50); // Wait for processing
950+
await processor.stop();
951+
952+
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
953+
expect(opts.backoff).toHaveBeenCalledWith(1);
954+
955+
// The latest backoff (defaultBackoffTime = 5 seconds) should be used, not the earlier one (2 seconds)
956+
const updateCall = mockTxClient.updateEvent.mock.calls[0][0];
957+
expect(updateCall.backoff_until).toEqual(defaultBackoffTime);
958+
expect(updateCall.backoff_until?.getTime()).toBeGreaterThan(
959+
earlierBackoff.getTime(),
960+
);
961+
});
766962
});
767963

768964
describe("defaultBackoff", () => {

src/processor.ts

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { sleep } from "./sleep.js";
33
import pLimit from "p-limit";
44
import { deepClone } from "./clone.js";
55
import PQueue from "p-queue";
6+
import { ErrorUnprocessableEventHandler, TxobError } from "./error.js";
67

78
type TxOBEventHandlerResult = {
89
processed_at?: Date;
@@ -199,6 +200,8 @@ const processEvent = async <TxOBEventType extends string>({
199200
`processing event`,
200201
);
201202

203+
let backoffs: Date[] = [];
204+
202205
const handlerLimit = pLimit(maxHandlerConcurrency);
203206
await Promise.allSettled(
204207
Object.entries(eventHandlerMap).map(([handlerName, handler]) =>
@@ -263,6 +266,10 @@ const processEvent = async <TxOBEventType extends string>({
263266
});
264267
errored = true;
265268
} else {
269+
if (error instanceof TxobError && error.backoffUntil) {
270+
backoffs.push(error.backoffUntil);
271+
}
272+
266273
errored = true;
267274
handlerResults.errors?.push({
268275
error: (error as Error)?.message ?? error,
@@ -299,7 +306,11 @@ const processEvent = async <TxOBEventType extends string>({
299306

300307
if (errored) {
301308
lockedEvent.errors = Math.min(lockedEvent.errors + 1, maxErrors);
302-
lockedEvent.backoff_until = backoff(lockedEvent.errors);
309+
backoffs.push(backoff(lockedEvent.errors));
310+
const latestBackoff = backoffs.sort(
311+
(a, b) => b.getTime() - a.getTime(),
312+
)[0];
313+
lockedEvent.backoff_until = latestBackoff;
303314
if (lockedEvent.errors === maxErrors) {
304315
lockedEvent.backoff_until = null;
305316
lockedEvent.processed_at = getDate();
@@ -333,21 +344,6 @@ const processEvent = async <TxOBEventType extends string>({
333344
});
334345
};
335346

336-
/**
337-
* ErrorUnprocessableEventHandler can be thrown by an event handler to indicate that the event handler is unprocessable.
338-
* It wraps the original error that caused the handler to be unprocessable.
339-
* This error will signal the processor to stop processing the event handler and mark the event handler as unprocessable.
340-
*/
341-
export class ErrorUnprocessableEventHandler extends Error {
342-
error: Error;
343-
344-
constructor(error: Error) {
345-
const message = `unprocessable event handler: ${error.message}`;
346-
super(message);
347-
this.error = error;
348-
}
349-
}
350-
351347
export interface Logger {
352348
debug(message?: unknown, ...optionalParams: unknown[]): void;
353349
info(message?: unknown, ...optionalParams: unknown[]): void;

0 commit comments

Comments
 (0)