Skip to content

Commit bfb8182

Browse files
improve backoff function context
1 parent acf2260 commit bfb8182

3 files changed

Lines changed: 123 additions & 21 deletions

File tree

README.md

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -454,9 +454,11 @@ new EventProcessor({
454454
client,
455455
handlerMap: handlers,
456456
maxErrors: 5, // Retry up to 5 times
457-
backoff: (errorCount) => {
457+
backoff: ({ attempt, error, event, errors, maxErrors }) => {
458458
// Custom backoff strategy
459-
const delayMs = 1000 * 2 ** errorCount; // Exponential: 1s, 2s, 4s, 8s, 16s
459+
// error is the latest handler error, errors contains all handler errors for this attempt
460+
// event is the event snapshot after this attempt's handlers finished
461+
const delayMs = 1000 * 2 ** attempt; // Exponential: 1s, 2s, 4s, 8s, 16s
460462
return new Date(Date.now() + delayMs);
461463
},
462464
});
@@ -747,10 +749,10 @@ new EventProcessor({
747749
maxErrors: 5,
748750

749751
// Backoff calculation function (default: exponential backoff capped at 60s)
750-
backoff: (errorCount: number): Date => {
752+
backoff: ({ attempt, error, event, errors, maxErrors }): Date => {
751753
const baseDelayMs = 1000;
752754
const maxDelayMs = 60000;
753-
const backoffMs = Math.min(baseDelayMs * 2 ** errorCount, maxDelayMs);
755+
const backoffMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
754756
return new Date(Date.now() + backoffMs);
755757
},
756758

@@ -1005,8 +1007,8 @@ const processor = new EventProcessor({
10051007

10061008
```typescript
10071009
// Linear backoff: 5s, 10s, 15s, 20s, 25s
1008-
const linearBackoff = (errorCount: number): Date => {
1009-
const delayMs = 5000 * errorCount;
1010+
const linearBackoff = ({ attempt }): Date => {
1011+
const delayMs = 5000 * attempt;
10101012
return new Date(Date.now() + delayMs);
10111013
};
10121014

@@ -1018,8 +1020,8 @@ const fixedBackoff = (): Date => {
10181020
// Fibonacci backoff: 1s, 1s, 2s, 3s, 5s, 8s, 13s...
10191021
const fibonacciBackoff = (() => {
10201022
const fib = (n: number): number => (n <= 1 ? 1 : fib(n - 1) + fib(n - 2));
1021-
return (errorCount: number): Date => {
1022-
const delayMs = fib(errorCount) * 1000;
1023+
return ({ attempt }): Date => {
1024+
const delayMs = fib(attempt) * 1000;
10231025
return new Date(Date.now() + delayMs);
10241026
};
10251027
})();

src/processor.test.ts

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,18 @@ describe("EventProcessor - processEvents", () => {
220220
);
221221

222222
expect(opts.backoff).toHaveBeenCalledOnce();
223-
expect(opts.backoff).toHaveBeenCalledWith(5); // evt.errors + 1
223+
expect(opts.backoff).toHaveBeenCalledWith(
224+
expect.objectContaining({
225+
attempt: 5,
226+
error: err,
227+
errors: expect.arrayContaining([err]),
228+
event: expect.objectContaining({
229+
id: "1",
230+
errors: 5,
231+
}),
232+
maxErrors: opts.maxErrors,
233+
}),
234+
); // evt.errors + 1
224235

225236
expect(mockTxClient.updateEvent).toHaveBeenCalledTimes(1);
226237
expect(mockTxClient.updateEvent).toHaveBeenCalledWith({
@@ -525,7 +536,12 @@ describe("EventProcessor - processEvents", () => {
525536

526537
// Backoff is called even when reaching maxErrors (then backoff_until is nulled)
527538
expect(opts.backoff).toHaveBeenCalledOnce();
528-
expect(opts.backoff).toHaveBeenCalledWith(opts.maxErrors);
539+
expect(opts.backoff).toHaveBeenCalledWith(
540+
expect.objectContaining({
541+
attempt: opts.maxErrors,
542+
maxErrors: opts.maxErrors,
543+
}),
544+
);
529545

530546
// Should call the maxErrors callback
531547
expect(opts.onEventMaxErrorsReached).toHaveBeenCalledOnce();
@@ -623,7 +639,12 @@ describe("EventProcessor - processEvents", () => {
623639
expect(handlerMap.evtType1.handler2).toHaveBeenCalledOnce();
624640

625641
// Should call backoff even when jumping to maxErrors
626-
expect(opts.backoff).toHaveBeenCalledWith(opts.maxErrors);
642+
expect(opts.backoff).toHaveBeenCalledWith(
643+
expect.objectContaining({
644+
attempt: opts.maxErrors,
645+
maxErrors: opts.maxErrors,
646+
}),
647+
);
627648

628649
// Should call the maxErrors callback since all remaining handlers are unprocessable
629650
expect(opts.onEventMaxErrorsReached).toHaveBeenCalledOnce();
@@ -711,7 +732,13 @@ describe("EventProcessor - processEvents", () => {
711732
expect(handlerMap.evtType1.handler3).toHaveBeenCalledOnce();
712733

713734
// Should call backoff normally since handler3 has retryable error
714-
expect(opts.backoff).toHaveBeenCalledWith(1);
735+
expect(opts.backoff).toHaveBeenCalledWith(
736+
expect.objectContaining({
737+
attempt: 1,
738+
error: retryableError,
739+
errors: expect.arrayContaining([retryableError]),
740+
}),
741+
);
715742

716743
// Should NOT call the maxErrors callback since errors < maxErrors
717744
expect(opts.onEventMaxErrorsReached).not.toHaveBeenCalled();
@@ -888,7 +915,12 @@ describe("EventProcessor - processEvents", () => {
888915
expect(handlerMap.evtType1.handler3).toHaveBeenCalledOnce();
889916

890917
// Default backoff should also be called
891-
expect(opts.backoff).toHaveBeenCalledWith(1);
918+
expect(opts.backoff).toHaveBeenCalledWith(
919+
expect.objectContaining({
920+
attempt: 1,
921+
maxErrors: opts.maxErrors,
922+
}),
923+
);
892924

893925
// The latest backoff (backoff2 = 20 seconds) should be used
894926
expect(mockTxClient.updateEvent).toHaveBeenCalledTimes(1);
@@ -950,7 +982,13 @@ describe("EventProcessor - processEvents", () => {
950982
await processor.stop();
951983

952984
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
953-
expect(opts.backoff).toHaveBeenCalledWith(1);
985+
expect(opts.backoff).toHaveBeenCalledWith(
986+
expect.objectContaining({
987+
attempt: 1,
988+
error,
989+
errors: expect.arrayContaining([error]),
990+
}),
991+
);
954992

955993
// The latest backoff (laterBackoff = 30 seconds) should be used, not the default (5 seconds)
956994
const updateCall = mockTxClient.updateEvent.mock.calls[0][0];
@@ -1013,7 +1051,13 @@ describe("EventProcessor - processEvents", () => {
10131051
await processor.stop();
10141052

10151053
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
1016-
expect(opts.backoff).toHaveBeenCalledWith(1);
1054+
expect(opts.backoff).toHaveBeenCalledWith(
1055+
expect.objectContaining({
1056+
attempt: 1,
1057+
error,
1058+
errors: expect.arrayContaining([error]),
1059+
}),
1060+
);
10171061

10181062
// The latest backoff (defaultBackoffTime = 5 seconds) should be used, not the earlier one (2 seconds)
10191063
const updateCall = mockTxClient.updateEvent.mock.calls[0][0];
@@ -1026,7 +1070,21 @@ describe("EventProcessor - processEvents", () => {
10261070

10271071
describe("defaultBackoff", () => {
10281072
it("should calculate a backoff", () => {
1029-
const backoff = defaultBackoff(3);
1073+
const backoff = defaultBackoff({
1074+
attempt: 3,
1075+
error: new Error("test"),
1076+
errors: [],
1077+
event: {
1078+
id: "event-1",
1079+
type: "evtType1",
1080+
timestamp: now,
1081+
data: {},
1082+
correlation_id: "abc123",
1083+
handler_results: {},
1084+
errors: 3,
1085+
},
1086+
maxErrors: 5,
1087+
});
10301088
const actual = backoff.getTime();
10311089
const expected = Date.now() + 1000 * 2 ** 3;
10321090
const diff = Math.abs(actual - expected);
@@ -1036,7 +1094,21 @@ describe("defaultBackoff", () => {
10361094

10371095
it("should cap backoff at maxDelayMs for large error counts", () => {
10381096
const maxDelayMs = 1000 * 60; // 60 seconds
1039-
const backoff = defaultBackoff(20); // Large error count that would exceed max
1097+
const backoff = defaultBackoff({
1098+
attempt: 20,
1099+
error: new Error("test"),
1100+
errors: [],
1101+
event: {
1102+
id: "event-1",
1103+
type: "evtType1",
1104+
timestamp: now,
1105+
data: {},
1106+
correlation_id: "abc123",
1107+
handler_results: {},
1108+
errors: 20,
1109+
},
1110+
maxErrors: 30,
1111+
}); // Large error count that would exceed max
10401112
const actual = backoff.getTime();
10411113
const expected = Date.now() + maxDelayMs;
10421114
const diff = Math.abs(actual - expected);

src/processor.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,23 @@ export interface TxOBTransactionProcessorClient<
169169
): Promise<void>;
170170
}
171171

172-
export const defaultBackoff = (errorCount: number): Date => {
172+
export type TxOBBackoffContext<
173+
TxOBEventType extends string,
174+
TEventDataMap extends TxOBEventDataMap<TxOBEventType> = TxOBEventDataMap<TxOBEventType>,
175+
> = {
176+
attempt: number;
177+
error?: unknown;
178+
errors: readonly unknown[];
179+
event: Readonly<TxOBEventByType<TxOBEventType, TEventDataMap>>;
180+
maxErrors: number;
181+
};
182+
183+
export const defaultBackoff = ({
184+
attempt,
185+
}: TxOBBackoffContext<string>): Date => {
173186
const baseDelayMs = 1000;
174187
const maxDelayMs = 1000 * 60;
175-
const backoffMs = Math.min(baseDelayMs * 2 ** errorCount, maxDelayMs);
188+
const backoffMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
176189
const retryTimestamp = new Date(Date.now() + backoffMs);
177190

178191
return retryTimestamp;
@@ -190,7 +203,7 @@ type TxOBProcessEventsOpts<
190203
TEventDataMap extends TxOBEventDataMap<TxOBEventType>,
191204
> = {
192205
maxErrors: number;
193-
backoff: (count: number) => Date;
206+
backoff: (context: TxOBBackoffContext<TxOBEventType, TEventDataMap>) => Date;
194207
signal?: AbortSignal;
195208
logger?: Logger;
196209
maxEventConcurrency?: number;
@@ -353,6 +366,8 @@ const processEvent = async <
353366
);
354367

355368
const backoffs: Date[] = [];
369+
const backoffErrors: unknown[] = [];
370+
let latestBackoffError: unknown;
356371

357372
const handlerLimit = pLimit(maxHandlerConcurrency);
358373
await Promise.allSettled(
@@ -430,6 +445,8 @@ const processEvent = async <
430445
);
431446
} catch (error) {
432447
handlerError = error;
448+
latestBackoffError = error;
449+
backoffErrors.push(error);
433450
logger?.error(
434451
{
435452
eventId: lockedEvent.id,
@@ -508,7 +525,18 @@ const processEvent = async <
508525

509526
if (errored) {
510527
lockedEvent.errors = Math.min(lockedEvent.errors + 1, maxErrors);
511-
backoffs.push(backoff(lockedEvent.errors));
528+
const backoffContext: TxOBBackoffContext<TxOBEventType, TEventDataMap> = {
529+
attempt: lockedEvent.errors,
530+
error: latestBackoffError,
531+
errors: backoffErrors,
532+
event: deepClone(lockedEvent) as Readonly<
533+
TxOBEventByType<TxOBEventType, TEventDataMap>
534+
>,
535+
maxErrors,
536+
};
537+
backoffs.push(
538+
backoff(backoffContext),
539+
);
512540
const latestBackoff = backoffs.sort(
513541
(a, b) => b.getTime() - a.getTime(),
514542
)[0];

0 commit comments

Comments
 (0)