@@ -847,4 +847,250 @@ nodeTest(
847847 } ,
848848) ;
849849
850+ // Regression test for unhandled Temporal.Duration.from() error in NOTIFY
851+ // callback crashing the process.
852+ //
853+ // When the NOTIFY payload is malformed (e.g., empty string or invalid duration
854+ // format), Temporal.Duration.from() throws a RangeError. Without a try-catch,
855+ // this error propagates as an unhandled promise rejection through the postgres
856+ // driver's NotificationResponse handler, crashing the entire process.
857+ //
858+ // This test sends a malformed NOTIFY payload directly via SQL, then verifies
859+ // that the listener survives and continues to process subsequent messages.
860+ //
861+ // See: https://github.com/fedify-dev/fedify/issues/594
862+ nodeTest (
863+ "PostgresMessageQueue survives malformed NOTIFY payload" ,
864+ { skip : dbUrl == null } ,
865+ async ( ) => {
866+ if ( dbUrl == null ) return ; // Bun does not support skip option
867+
868+ const tableName = getRandomKey ( "message" ) ;
869+ const channelName = getRandomKey ( "channel" ) ;
870+
871+ const sql = postgres ( dbUrl ! ) ;
872+ const mq = new PostgresMessageQueue ( sql , {
873+ tableName,
874+ channelName,
875+ pollInterval : { milliseconds : 100 } ,
876+ } ) ;
877+
878+ try {
879+ await mq . initialize ( ) ;
880+
881+ const messages : string [ ] = [ ] ;
882+ const controller = new AbortController ( ) ;
883+
884+ const listening = mq . listen (
885+ ( message : string ) => {
886+ messages . push ( message ) ;
887+ } ,
888+ { signal : controller . signal } ,
889+ ) ;
890+
891+ // Wait for the LISTEN subscription to become active
892+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
893+
894+ // Send malformed NOTIFY payloads that will cause
895+ // Temporal.Duration.from() to throw a RangeError
896+ await sql `SELECT pg_notify(${ channelName } , '')` ;
897+ await sql `SELECT pg_notify(${ channelName } , 'not-a-duration')` ;
898+ await sql `SELECT pg_notify(${ channelName } , '!!!')` ;
899+
900+ // Give the listener time to process (and survive) the bad payloads
901+ await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
902+
903+ // Now enqueue a real message and verify the listener is still alive
904+ await mq . enqueue ( "after-malformed" ) ;
905+
906+ const start = Date . now ( ) ;
907+ while ( messages . length < 1 && Date . now ( ) - start < 10_000 ) {
908+ await new Promise ( ( resolve ) => setTimeout ( resolve , 100 ) ) ;
909+ }
910+
911+ deepStrictEqual (
912+ messages ,
913+ [ "after-malformed" ] ,
914+ "Listener should survive malformed NOTIFY payloads and continue " +
915+ "processing subsequent messages" ,
916+ ) ;
917+
918+ controller . abort ( ) ;
919+ await listening ;
920+ } finally {
921+ await mq . drop ( ) ;
922+ await sql . end ( ) ;
923+ }
924+ } ,
925+ ) ;
926+
927+ // Regression test for serializedPoll permanently stalling when handler hangs.
928+ //
929+ // In PostgresMessageQueue.listen(), the serializedPoll mechanism chains every
930+ // poll() invocation onto a single promise (pollLock). If a poll() call never
931+ // resolves—because the message handler hangs indefinitely on a network request
932+ // or other I/O—then all subsequent poll() invocations are chained onto the
933+ // pending promise and also never execute, permanently halting all message
934+ // processing.
935+ //
936+ // This test verifies that with handlerTimeout configured, a hung handler is
937+ // aborted after the timeout, allowing the poll loop to recover and process
938+ // subsequent messages.
939+ //
940+ // See: https://github.com/fedify-dev/fedify/issues/595
941+ nodeTest (
942+ "PostgresMessageQueue continues processing when handler hangs (no ordering key)" ,
943+ { skip : dbUrl == null } ,
944+ async ( ) => {
945+ if ( dbUrl == null ) return ; // Bun does not support skip option
946+
947+ const tableName = getRandomKey ( "message" ) ;
948+ const channelName = getRandomKey ( "channel" ) ;
949+
950+ const sql = postgres ( dbUrl ! ) ;
951+ const mq = new PostgresMessageQueue ( sql , {
952+ tableName,
953+ channelName,
954+ pollInterval : { milliseconds : 100 } ,
955+ handlerTimeout : { seconds : 1 } ,
956+ } ) ;
957+
958+ try {
959+ await mq . initialize ( ) ;
960+
961+ // Enqueue two messages — the handler will hang on the first one
962+ await mq . enqueue ( "hang" ) ;
963+ await mq . enqueue ( "normal" ) ;
964+
965+ const processed : string [ ] = [ ] ;
966+ const controller = new AbortController ( ) ;
967+
968+ const listening = mq . listen (
969+ async ( message : string ) => {
970+ if ( message === "hang" ) {
971+ // Simulate a handler that hangs forever (e.g., unresponsive
972+ // remote server)
973+ await new Promise ( ( ) => { } ) ; // never resolves
974+ }
975+ processed . push ( message ) ;
976+ } ,
977+ { signal : controller . signal } ,
978+ ) ;
979+
980+ // Wait for the second message to be processed.
981+ // Without the timeout fix, this would hang forever because the first
982+ // handler never completes and serializedPoll blocks all subsequent
983+ // polls.
984+ const start = Date . now ( ) ;
985+ while ( ! processed . includes ( "normal" ) && Date . now ( ) - start < 15_000 ) {
986+ await new Promise ( ( resolve ) => setTimeout ( resolve , 100 ) ) ;
987+ }
988+
989+ deepStrictEqual (
990+ processed . includes ( "normal" ) ,
991+ true ,
992+ "Second message should be processed despite first handler hanging" ,
993+ ) ;
994+
995+ controller . abort ( ) ;
996+ await listening ;
997+ } finally {
998+ await mq . drop ( ) ;
999+ await sql . end ( ) ;
1000+ }
1001+ } ,
1002+ ) ;
1003+
1004+ // Regression test for serializedPoll stalling with ordering key messages.
1005+ //
1006+ // Same issue as above, but for messages with ordering keys. This also verifies
1007+ // that the advisory lock and reserved connection are properly released when the
1008+ // handler times out, so subsequent messages with the same ordering key can be
1009+ // processed.
1010+ //
1011+ // See: https://github.com/fedify-dev/fedify/issues/595
1012+ nodeTest (
1013+ "PostgresMessageQueue continues processing when handler hangs (ordering key)" ,
1014+ { skip : dbUrl == null } ,
1015+ async ( ) => {
1016+ if ( dbUrl == null ) return ; // Bun does not support skip option
1017+
1018+ const tableName = getRandomKey ( "message" ) ;
1019+ const channelName = getRandomKey ( "channel" ) ;
1020+
1021+ const sql = postgres ( dbUrl ! ) ;
1022+ const sqlCheck = postgres ( dbUrl ! ) ;
1023+ const mq = new PostgresMessageQueue ( sql , {
1024+ tableName,
1025+ channelName,
1026+ pollInterval : { milliseconds : 100 } ,
1027+ handlerTimeout : { seconds : 1 } ,
1028+ } ) ;
1029+
1030+ const orderingKey = "hang-test-key" ;
1031+
1032+ try {
1033+ await mq . initialize ( ) ;
1034+
1035+ // Enqueue two messages with the same ordering key
1036+ await mq . enqueue ( "hang" , { orderingKey } ) ;
1037+ await mq . enqueue ( "normal" , { orderingKey } ) ;
1038+
1039+ const processed : string [ ] = [ ] ;
1040+ const controller = new AbortController ( ) ;
1041+
1042+ const listening = mq . listen (
1043+ async ( message : string ) => {
1044+ if ( message === "hang" ) {
1045+ await new Promise ( ( ) => { } ) ; // never resolves
1046+ }
1047+ processed . push ( message ) ;
1048+ } ,
1049+ { signal : controller . signal } ,
1050+ ) ;
1051+
1052+ // Wait for the second message to be processed
1053+ const start = Date . now ( ) ;
1054+ while ( ! processed . includes ( "normal" ) && Date . now ( ) - start < 15_000 ) {
1055+ await new Promise ( ( resolve ) => setTimeout ( resolve , 100 ) ) ;
1056+ }
1057+
1058+ deepStrictEqual (
1059+ processed . includes ( "normal" ) ,
1060+ true ,
1061+ "Second message should be processed despite first handler hanging " +
1062+ "(ordering key)" ,
1063+ ) ;
1064+
1065+ controller . abort ( ) ;
1066+ await listening ;
1067+
1068+ // Verify advisory lock was released after timeout
1069+ const lockResult = await sqlCheck `
1070+ SELECT pg_try_advisory_lock(
1071+ hashtext(${ tableName } ),
1072+ hashtext(${ orderingKey } )
1073+ ) AS acquired
1074+ ` ;
1075+ deepStrictEqual (
1076+ lockResult [ 0 ] . acquired ,
1077+ true ,
1078+ "Advisory lock should be released after handler timeout" ,
1079+ ) ;
1080+ if ( lockResult [ 0 ] . acquired ) {
1081+ await sqlCheck `
1082+ SELECT pg_advisory_unlock(
1083+ hashtext(${ tableName } ),
1084+ hashtext(${ orderingKey } )
1085+ )
1086+ ` ;
1087+ }
1088+ } finally {
1089+ await mq . drop ( ) ;
1090+ await sql . end ( ) ;
1091+ await sqlCheck . end ( ) ;
1092+ }
1093+ } ,
1094+ ) ;
1095+
8501096// cspell: ignore sqls
0 commit comments