Skip to content

Commit 55a5877

Browse files
update processor to guarantee max errors is called even if unprocessable errors are used
1 parent 8dbee71 commit 55a5877

2 files changed

Lines changed: 281 additions & 1 deletion

File tree

src/processor.test.ts

Lines changed: 259 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ describe("processEvents", () => {
279279
const opts = {
280280
maxErrors: 5,
281281
backoff: vi.fn(),
282+
onEventMaxErrorsReached: vi.fn(),
282283
};
283284
const err = new Error("some error");
284285
const errUnprocessable = new ErrorUnprocessableEventHandler(
@@ -342,12 +343,23 @@ describe("processEvents", () => {
342343
1,
343344
);
344345

346+
// Should call onEventMaxErrorsReached since all remaining handlers are unprocessable
347+
expect(opts.onEventMaxErrorsReached).toHaveBeenCalledOnce();
348+
expect(opts.onEventMaxErrorsReached).toHaveBeenCalledWith({
349+
event: expect.objectContaining({
350+
id: "1",
351+
errors: opts.maxErrors,
352+
}),
353+
txClient: mockTxClient,
354+
signal: undefined,
355+
});
356+
345357
expect(mockTxClient.updateEvent).toHaveBeenCalledTimes(1);
346358
expect(mockTxClient.updateEvent).toHaveBeenCalledWith({
347359
backoff_until: null,
348360
correlation_id: "abc123",
349361
data: {},
350-
errors: 1,
362+
errors: opts.maxErrors,
351363
handler_results: {
352364
handler1: {
353365
unprocessable_at: now,
@@ -375,6 +387,252 @@ describe("processEvents", () => {
375387
processed_at: now,
376388
});
377389
});
390+
391+
it("sets errors to maxErrors when all handlers are unprocessable and calls onEventMaxErrorsReached", async () => {
392+
const opts = {
393+
maxErrors: 5,
394+
backoff: vi.fn(),
395+
onEventMaxErrorsReached: vi.fn(),
396+
};
397+
const errUnprocessable1 = new ErrorUnprocessableEventHandler(
398+
new Error("err1"),
399+
);
400+
const errUnprocessable2 = new ErrorUnprocessableEventHandler(
401+
new Error("err2"),
402+
);
403+
const handlerMap = {
404+
evtType1: {
405+
handler1: vi.fn(() => Promise.reject(errUnprocessable1)),
406+
handler2: vi.fn(() => Promise.reject(errUnprocessable2)),
407+
},
408+
};
409+
const evt1: TxOBEvent<keyof typeof handlerMap> = {
410+
type: "evtType1",
411+
id: "1",
412+
timestamp: now,
413+
data: {},
414+
correlation_id: "abc123",
415+
handler_results: {},
416+
errors: 0,
417+
};
418+
const events = [evt1];
419+
mockClient.getEventsToProcess.mockImplementation(() => events);
420+
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
421+
return events.find((e) => e.id === id);
422+
});
423+
mockTxClient.updateEvent.mockImplementation(() => {
424+
return Promise.resolve();
425+
});
426+
427+
await processEvents(mockClient, handlerMap, opts);
428+
429+
expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
430+
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
431+
expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce();
432+
expect(handlerMap.evtType1.handler2).toHaveBeenCalledOnce();
433+
434+
// Backoff is called even when reaching maxErrors (then backoff_until is nulled)
435+
expect(opts.backoff).toHaveBeenCalledOnce();
436+
expect(opts.backoff).toHaveBeenCalledWith(opts.maxErrors);
437+
438+
// Should call the maxErrors callback
439+
expect(opts.onEventMaxErrorsReached).toHaveBeenCalledOnce();
440+
expect(opts.onEventMaxErrorsReached).toHaveBeenCalledWith({
441+
event: expect.objectContaining({
442+
id: "1",
443+
errors: opts.maxErrors,
444+
}),
445+
txClient: mockTxClient,
446+
signal: undefined,
447+
});
448+
449+
expect(mockTxClient.updateEvent).toHaveBeenCalledWith({
450+
backoff_until: null,
451+
correlation_id: "abc123",
452+
data: {},
453+
errors: opts.maxErrors,
454+
handler_results: {
455+
handler1: {
456+
unprocessable_at: now,
457+
errors: [
458+
{
459+
error: errUnprocessable1.message,
460+
timestamp: now,
461+
},
462+
],
463+
},
464+
handler2: {
465+
unprocessable_at: now,
466+
errors: [
467+
{
468+
error: errUnprocessable2.message,
469+
timestamp: now,
470+
},
471+
],
472+
},
473+
},
474+
id: "1",
475+
timestamp: now,
476+
type: "evtType1",
477+
processed_at: now,
478+
});
479+
});
480+
481+
it("sets errors to maxErrors when some handlers succeed and remaining are unprocessable", async () => {
482+
const opts = {
483+
maxErrors: 5,
484+
backoff: vi.fn(),
485+
onEventMaxErrorsReached: vi.fn(),
486+
};
487+
const errUnprocessable = new ErrorUnprocessableEventHandler(
488+
new Error("err1"),
489+
);
490+
const handlerMap = {
491+
evtType1: {
492+
handler1: vi.fn(() => Promise.resolve()),
493+
handler2: vi.fn(() => Promise.reject(errUnprocessable)),
494+
},
495+
};
496+
const evt1: TxOBEvent<keyof typeof handlerMap> = {
497+
type: "evtType1",
498+
id: "1",
499+
timestamp: now,
500+
data: {},
501+
correlation_id: "abc123",
502+
handler_results: {},
503+
errors: 0,
504+
};
505+
const events = [evt1];
506+
mockClient.getEventsToProcess.mockImplementation(() => events);
507+
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
508+
return events.find((e) => e.id === id);
509+
});
510+
mockTxClient.updateEvent.mockImplementation(() => {
511+
return Promise.resolve();
512+
});
513+
514+
await processEvents(mockClient, handlerMap, opts);
515+
516+
expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
517+
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
518+
expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce();
519+
expect(handlerMap.evtType1.handler2).toHaveBeenCalledOnce();
520+
521+
// Should call backoff even when jumping to maxErrors
522+
expect(opts.backoff).toHaveBeenCalledWith(opts.maxErrors);
523+
524+
// Should call the maxErrors callback since all remaining handlers are unprocessable
525+
expect(opts.onEventMaxErrorsReached).toHaveBeenCalledOnce();
526+
527+
expect(mockTxClient.updateEvent).toHaveBeenCalledWith({
528+
backoff_until: null,
529+
correlation_id: "abc123",
530+
data: {},
531+
errors: opts.maxErrors,
532+
handler_results: {
533+
handler1: {
534+
errors: [],
535+
processed_at: now,
536+
},
537+
handler2: {
538+
unprocessable_at: now,
539+
errors: [
540+
{
541+
error: errUnprocessable.message,
542+
timestamp: now,
543+
},
544+
],
545+
},
546+
},
547+
id: "1",
548+
timestamp: now,
549+
type: "evtType1",
550+
processed_at: now,
551+
});
552+
});
553+
554+
it("does not set errors to maxErrors when remaining handlers have retryable errors", async () => {
555+
const opts = {
556+
maxErrors: 5,
557+
backoff: vi.fn(() => now),
558+
onEventMaxErrorsReached: vi.fn(),
559+
};
560+
const retryableError = new Error("temporary failure");
561+
const errUnprocessable = new ErrorUnprocessableEventHandler(
562+
new Error("err1"),
563+
);
564+
const handlerMap = {
565+
evtType1: {
566+
handler1: vi.fn(() => Promise.resolve()),
567+
handler2: vi.fn(() => Promise.reject(errUnprocessable)),
568+
handler3: vi.fn(() => Promise.reject(retryableError)),
569+
},
570+
};
571+
const evt1: TxOBEvent<keyof typeof handlerMap> = {
572+
type: "evtType1",
573+
id: "1",
574+
timestamp: now,
575+
data: {},
576+
correlation_id: "abc123",
577+
handler_results: {},
578+
errors: 0,
579+
};
580+
const events = [evt1];
581+
mockClient.getEventsToProcess.mockImplementation(() => events);
582+
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
583+
return events.find((e) => e.id === id);
584+
});
585+
mockTxClient.updateEvent.mockImplementation(() => {
586+
return Promise.resolve();
587+
});
588+
589+
await processEvents(mockClient, handlerMap, opts);
590+
591+
expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
592+
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
593+
expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce();
594+
expect(handlerMap.evtType1.handler2).toHaveBeenCalledOnce();
595+
expect(handlerMap.evtType1.handler3).toHaveBeenCalledOnce();
596+
597+
// Should call backoff normally since handler3 has retryable error
598+
expect(opts.backoff).toHaveBeenCalledWith(1);
599+
600+
// Should NOT call the maxErrors callback since errors < maxErrors
601+
expect(opts.onEventMaxErrorsReached).not.toHaveBeenCalled();
602+
603+
expect(mockTxClient.updateEvent).toHaveBeenCalledWith({
604+
backoff_until: expect.any(Date),
605+
correlation_id: "abc123",
606+
data: {},
607+
errors: 1,
608+
handler_results: {
609+
handler1: {
610+
errors: [],
611+
processed_at: now,
612+
},
613+
handler2: {
614+
unprocessable_at: now,
615+
errors: [
616+
{
617+
error: errUnprocessable.message,
618+
timestamp: now,
619+
},
620+
],
621+
},
622+
handler3: {
623+
errors: [
624+
{
625+
error: retryableError.message,
626+
timestamp: now,
627+
},
628+
],
629+
},
630+
},
631+
id: "1",
632+
timestamp: now,
633+
type: "evtType1",
634+
});
635+
});
378636
});
379637

380638
describe("defaultBackoff", () => {

src/processor.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ export const processEvents = async <TxOBEventType extends string>(
265265
error: error.message ?? error,
266266
timestamp: getDate(),
267267
});
268+
errored = true;
268269
} else {
269270
errored = true;
270271
handlerResults.errors?.push({
@@ -279,6 +280,27 @@ export const processEvents = async <TxOBEventType extends string>(
279280
),
280281
);
281282

283+
// Check if all remaining handlers (those that haven't succeeded) are unprocessable
284+
// If so, there's nothing left to retry, so set errors to maxErrors to stop processing
285+
const remainingHandlers = Object.entries(eventHandlerMap).filter(
286+
([handlerName, _]) => {
287+
const result = lockedEvent.handler_results[handlerName];
288+
return !result?.processed_at;
289+
},
290+
);
291+
292+
const allRemainingHandlersUnprocessable =
293+
remainingHandlers.length > 0 &&
294+
remainingHandlers.every(([handlerName, _]) => {
295+
const result = lockedEvent.handler_results[handlerName];
296+
return result?.unprocessable_at;
297+
});
298+
299+
if (allRemainingHandlersUnprocessable) {
300+
lockedEvent.errors = maxErrors;
301+
errored = true;
302+
}
303+
282304
if (errored) {
283305
lockedEvent.errors = Math.min(lockedEvent.errors + 1, maxErrors);
284306
lockedEvent.backoff_until = backoff(lockedEvent.errors);

0 commit comments

Comments
 (0)