@@ -633,6 +633,67 @@ describe("processEvents", () => {
633633 type : "evtType1" ,
634634 } ) ;
635635 } ) ;
636+
637+ it ( "should throw and log when onEventMaxErrorsReached hook throws an error" , async ( ) => {
638+ const hookError = new Error ( "hook error" ) ;
639+ const logger = {
640+ debug : vi . fn ( ) ,
641+ info : vi . fn ( ) ,
642+ warn : vi . fn ( ) ,
643+ error : vi . fn ( ) ,
644+ } ;
645+ const opts = {
646+ maxErrors : 5 ,
647+ backoff : vi . fn ( ) ,
648+ onEventMaxErrorsReached : vi . fn ( ( ) => {
649+ throw hookError ;
650+ } ) ,
651+ logger,
652+ } ;
653+ const errUnprocessable = new ErrorUnprocessableEventHandler (
654+ new Error ( "err1" ) ,
655+ ) ;
656+ const handlerMap = {
657+ evtType1 : {
658+ handler1 : vi . fn ( ( ) => Promise . reject ( errUnprocessable ) ) ,
659+ } ,
660+ } ;
661+ const evt1 : TxOBEvent < keyof typeof handlerMap > = {
662+ type : "evtType1" ,
663+ id : "1" ,
664+ timestamp : now ,
665+ data : { } ,
666+ correlation_id : "abc123" ,
667+ handler_results : { } ,
668+ errors : 0 ,
669+ } ;
670+ const events = [ evt1 ] ;
671+ mockClient . getEventsToProcess . mockImplementation ( ( ) => events ) ;
672+ mockTxClient . getEventByIdForUpdateSkipLocked . mockImplementation ( ( id ) => {
673+ return events . find ( ( e ) => e . id === id ) ;
674+ } ) ;
675+ mockTxClient . updateEvent . mockImplementation ( ( ) => {
676+ return Promise . resolve ( ) ;
677+ } ) ;
678+
679+ await processEvents ( mockClient , handlerMap , opts ) ;
680+
681+ expect ( opts . onEventMaxErrorsReached ) . toHaveBeenCalledOnce ( ) ;
682+ expect ( logger . error ) . toHaveBeenCalledWith (
683+ {
684+ eventId : "1" ,
685+ error : hookError ,
686+ } ,
687+ "error in onEventMaxErrorsReached hook" ,
688+ ) ;
689+ expect ( logger . error ) . toHaveBeenCalledWith (
690+ {
691+ eventId : "1" ,
692+ error : hookError ,
693+ } ,
694+ "error processing event" ,
695+ ) ;
696+ } ) ;
636697} ) ;
637698
638699describe ( "defaultBackoff" , ( ) => {
@@ -644,6 +705,16 @@ describe("defaultBackoff", () => {
644705
645706 expect ( diff ) . lessThanOrEqual ( 1 ) ;
646707 } ) ;
708+
709+ it ( "should cap backoff at maxDelayMs for large error counts" , ( ) => {
710+ const maxDelayMs = 1000 * 60 ; // 60 seconds
711+ const backoff = defaultBackoff ( 20 ) ; // Large error count that would exceed max
712+ const actual = backoff . getTime ( ) ;
713+ const expected = Date . now ( ) + maxDelayMs ;
714+ const diff = Math . abs ( actual - expected ) ;
715+
716+ expect ( diff ) . lessThanOrEqual ( 1 ) ;
717+ } ) ;
647718} ) ;
648719
649720describe ( "Processor" , ( ) => {
@@ -682,6 +753,91 @@ describe("Processor", () => {
682753 const diff = Date . now ( ) - start ;
683754 expect ( diff ) . toBeLessThan ( 50 ) ;
684755 } ) ;
756+ it ( "should warn when stopping a processor that is not started" , async ( ) => {
757+ const logger = {
758+ debug : vi . fn ( ) ,
759+ info : vi . fn ( ) ,
760+ warn : vi . fn ( ) ,
761+ error : vi . fn ( ) ,
762+ } ;
763+ const processor = Processor ( ( ) => sleep ( 1 ) , { sleepTimeMs : 0 , logger } ) ;
764+
765+ await processor . stop ( ) ;
766+
767+ expect ( logger . warn ) . toHaveBeenCalledWith (
768+ "cannot stop processor from 'stopped'" ,
769+ ) ;
770+ } ) ;
771+ it ( "should handle shutdown when processor completes before stop is called" , async ( ) => {
772+ let resolve : ( ( ) => void ) | null = null ;
773+ const promise = new Promise < void > ( ( r ) => {
774+ resolve = r ;
775+ } ) ;
776+
777+ const processor = Processor (
778+ ( ) => {
779+ return promise ;
780+ } ,
781+ { sleepTimeMs : 0 } ,
782+ ) ;
783+ processor . start ( ) ;
784+
785+ // Complete the processor's work
786+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
787+ resolve ! ( ) ;
788+
789+ // Wait a bit for the processor to emit shutdownComplete
790+ await sleep ( 10 ) ;
791+
792+ // Now stop should handle the already-completed case
793+ await processor . stop ( ) ;
794+ } ) ;
795+ it ( "should warn when starting a processor that is already started" , ( ) => {
796+ const logger = {
797+ debug : vi . fn ( ) ,
798+ info : vi . fn ( ) ,
799+ warn : vi . fn ( ) ,
800+ error : vi . fn ( ) ,
801+ } ;
802+ const processor = Processor ( ( ) => sleep ( 1000 ) , { sleepTimeMs : 0 , logger } ) ;
803+
804+ processor . start ( ) ;
805+ processor . start ( ) ; // Try to start again
806+
807+ expect ( logger . warn ) . toHaveBeenCalledWith (
808+ "cannot start processor from 'started'" ,
809+ ) ;
810+ } ) ;
811+ it ( "should handle non-abort errors and continue processing" , async ( ) => {
812+ let calls = 0 ;
813+ const logger = {
814+ debug : vi . fn ( ) ,
815+ info : vi . fn ( ) ,
816+ warn : vi . fn ( ) ,
817+ error : vi . fn ( ) ,
818+ } ;
819+ const error = new Error ( "processing error" ) ;
820+
821+ const processor = Processor (
822+ ( ) => {
823+ calls ++ ;
824+ if ( calls === 1 ) {
825+ throw error ;
826+ }
827+ return Promise . resolve ( ) ;
828+ } ,
829+ { sleepTimeMs : 0 , logger } ,
830+ ) ;
831+ processor . start ( ) ;
832+
833+ // Wait for the error to be logged and processing to continue
834+ await sleep ( 1100 ) ;
835+
836+ await processor . stop ( ) ;
837+
838+ expect ( logger . error ) . toHaveBeenCalledWith ( error ) ;
839+ expect ( calls ) . toBeGreaterThan ( 1 ) ; // Should continue processing after error
840+ } ) ;
685841} ) ;
686842
687843describe ( "EventProcessor" , ( ) => {
0 commit comments