@@ -1044,3 +1044,336 @@ suite.test('pipeline mode - pg-cursor is rejected', (done) => {
10441044 client . query ( cursor )
10451045 } )
10461046} )
1047+
1048+ suite . test ( 'pipeline mode - stress test with 200+ concurrent queries' , ( done ) => {
1049+ const client = new Client ( { pipelineMode : true } )
1050+ client . connect ( ( err ) => {
1051+ if ( err ) return done ( err )
1052+
1053+ const numQueries = 200
1054+ const promises = [ ]
1055+
1056+ for ( let i = 0 ; i < numQueries ; i ++ ) {
1057+ promises . push (
1058+ client . query ( 'SELECT $1::int as num, $2::text as txt' , [ i , `query-${ i } ` ] ) . then ( ( r ) => ( {
1059+ expected : i ,
1060+ actual : parseInt ( r . rows [ 0 ] . num ) ,
1061+ txt : r . rows [ 0 ] . txt ,
1062+ } ) )
1063+ )
1064+ }
1065+
1066+ Promise . all ( promises )
1067+ . then ( ( results ) => {
1068+ assert . equal ( results . length , numQueries , `Should have ${ numQueries } results` )
1069+ results . forEach ( ( r ) => {
1070+ assert . equal ( r . actual , r . expected , `Query ${ r . expected } should return correct num` )
1071+ assert . equal ( r . txt , `query-${ r . expected } ` , `Query ${ r . expected } should return correct txt` )
1072+ } )
1073+ client . end ( done )
1074+ } )
1075+ . catch ( ( err ) => {
1076+ client . end ( ( ) => done ( err ) )
1077+ } )
1078+ } )
1079+ } )
1080+
1081+ suite . test ( 'pipeline mode - mixed query types (INSERT, UPDATE, DELETE, SELECT)' , ( done ) => {
1082+ const client = new Client ( { pipelineMode : true } )
1083+ client . connect ( ( err ) => {
1084+ if ( err ) return done ( err )
1085+
1086+ // Setup table
1087+ client
1088+ . query ( 'CREATE TEMP TABLE mixed_ops (id serial PRIMARY KEY, value int, name text)' )
1089+ . then ( ( ) => {
1090+ // Send all operations in rapid succession
1091+ const p1 = client . query ( "INSERT INTO mixed_ops (value, name) VALUES (1, 'one') RETURNING id" )
1092+ const p2 = client . query ( "INSERT INTO mixed_ops (value, name) VALUES (2, 'two') RETURNING id" )
1093+ const p3 = client . query ( "INSERT INTO mixed_ops (value, name) VALUES (3, 'three') RETURNING id" )
1094+ const p4 = client . query ( 'UPDATE mixed_ops SET value = value * 10 WHERE value = 2 RETURNING value' )
1095+ const p5 = client . query ( 'DELETE FROM mixed_ops WHERE value = 1 RETURNING id' )
1096+ const p6 = client . query ( 'SELECT * FROM mixed_ops ORDER BY id' )
1097+
1098+ return Promise . all ( [ p1 , p2 , p3 , p4 , p5 , p6 ] )
1099+ } )
1100+ . then ( ( results ) => {
1101+ // Verify INSERT results
1102+ assert . ok ( results [ 0 ] . rows [ 0 ] . id , 'First INSERT should return id' )
1103+ assert . ok ( results [ 1 ] . rows [ 0 ] . id , 'Second INSERT should return id' )
1104+ assert . ok ( results [ 2 ] . rows [ 0 ] . id , 'Third INSERT should return id' )
1105+
1106+ // Verify UPDATE result
1107+ assert . equal ( results [ 3 ] . rows [ 0 ] . value , 20 , 'UPDATE should multiply value by 10' )
1108+ assert . equal ( results [ 3 ] . rowCount , 1 , 'UPDATE should affect 1 row' )
1109+
1110+ // Verify DELETE result
1111+ assert . equal ( results [ 4 ] . rowCount , 1 , 'DELETE should affect 1 row' )
1112+
1113+ // Verify final SELECT - should have 2 rows (one deleted)
1114+ assert . equal ( results [ 5 ] . rows . length , 2 , 'Should have 2 rows after DELETE' )
1115+ const values = results [ 5 ] . rows . map ( ( r ) => parseInt ( r . value ) ) . sort ( ( a , b ) => a - b )
1116+ assert . deepEqual ( values , [ 3 , 20 ] , 'Should have correct values after all operations' )
1117+
1118+ client . end ( done )
1119+ } )
1120+ . catch ( ( err ) => {
1121+ client . end ( ( ) => done ( err ) )
1122+ } )
1123+ } )
1124+ } )
1125+
1126+ suite . test ( 'pipeline mode - SAVEPOINT handling (nested transactions)' , ( done ) => {
1127+ const client = new Client ( { pipelineMode : true } )
1128+ client . connect ( ( err ) => {
1129+ if ( err ) return done ( err )
1130+
1131+ client
1132+ . query ( 'CREATE TEMP TABLE savepoint_test (id serial, value int)' )
1133+ . then ( ( ) => client . query ( 'BEGIN' ) )
1134+ . then ( ( ) => client . query ( 'INSERT INTO savepoint_test (value) VALUES (1)' ) )
1135+ . then ( ( ) => client . query ( 'SAVEPOINT sp1' ) )
1136+ . then ( ( ) => client . query ( 'INSERT INTO savepoint_test (value) VALUES (2)' ) )
1137+ . then ( ( ) => client . query ( 'SAVEPOINT sp2' ) )
1138+ . then ( ( ) => client . query ( 'INSERT INTO savepoint_test (value) VALUES (3)' ) )
1139+ . then ( ( ) => client . query ( 'ROLLBACK TO SAVEPOINT sp2' ) ) // Undo value=3
1140+ . then ( ( ) => client . query ( 'INSERT INTO savepoint_test (value) VALUES (4)' ) )
1141+ . then ( ( ) => client . query ( 'ROLLBACK TO SAVEPOINT sp1' ) ) // Undo value=2 and value=4
1142+ . then ( ( ) => client . query ( 'INSERT INTO savepoint_test (value) VALUES (5)' ) )
1143+ . then ( ( ) => client . query ( 'COMMIT' ) )
1144+ . then ( ( ) => client . query ( 'SELECT value FROM savepoint_test ORDER BY value' ) )
1145+ . then ( ( r ) => {
1146+ // Should only have values 1 and 5 (2, 3, 4 were rolled back)
1147+ const values = r . rows . map ( ( row ) => parseInt ( row . value ) )
1148+ assert . deepEqual ( values , [ 1 , 5 ] , 'Should only have values 1 and 5 after savepoint rollbacks' )
1149+ client . end ( done )
1150+ } )
1151+ . catch ( ( err ) => {
1152+ client . query ( 'ROLLBACK' ) . finally ( ( ) => client . end ( ( ) => done ( err ) ) )
1153+ } )
1154+ } )
1155+ } )
1156+
1157+ suite . test ( 'pipeline mode - rapid SAVEPOINT operations' , ( done ) => {
1158+ const client = new Client ( { pipelineMode : true } )
1159+ client . connect ( ( err ) => {
1160+ if ( err ) return done ( err )
1161+
1162+ client
1163+ . query ( 'CREATE TEMP TABLE rapid_sp (id serial, value int)' )
1164+ . then ( ( ) => {
1165+ // Send all savepoint operations in rapid succession
1166+ const ops = [
1167+ client . query ( 'BEGIN' ) ,
1168+ client . query ( 'INSERT INTO rapid_sp (value) VALUES (1)' ) ,
1169+ client . query ( 'SAVEPOINT a' ) ,
1170+ client . query ( 'INSERT INTO rapid_sp (value) VALUES (2)' ) ,
1171+ client . query ( 'SAVEPOINT b' ) ,
1172+ client . query ( 'INSERT INTO rapid_sp (value) VALUES (3)' ) ,
1173+ client . query ( 'RELEASE SAVEPOINT b' ) , // Keep value=3
1174+ client . query ( 'ROLLBACK TO SAVEPOINT a' ) , // Undo value=2 and value=3
1175+ client . query ( 'INSERT INTO rapid_sp (value) VALUES (4)' ) ,
1176+ client . query ( 'COMMIT' ) ,
1177+ ]
1178+ return Promise . all ( ops )
1179+ } )
1180+ . then ( ( ) => client . query ( 'SELECT value FROM rapid_sp ORDER BY value' ) )
1181+ . then ( ( r ) => {
1182+ const values = r . rows . map ( ( row ) => parseInt ( row . value ) )
1183+ assert . deepEqual ( values , [ 1 , 4 ] , 'Should only have values 1 and 4' )
1184+ client . end ( done )
1185+ } )
1186+ . catch ( ( err ) => {
1187+ client . query ( 'ROLLBACK' ) . finally ( ( ) => client . end ( ( ) => done ( err ) ) )
1188+ } )
1189+ } )
1190+ } )
1191+
1192+ suite . test ( 'pipeline mode - connection interruption handling' , ( done ) => {
1193+ const client = new Client ( { pipelineMode : true } )
1194+
1195+ let errorReceived = false
1196+ let endReceived = false
1197+
1198+ client . on ( 'error' , ( err ) => {
1199+ errorReceived = true
1200+ } )
1201+
1202+ client . on ( 'end' , ( ) => {
1203+ endReceived = true
1204+ } )
1205+
1206+ client . connect ( ( err ) => {
1207+ if ( err ) return done ( err )
1208+
1209+ // Start some queries
1210+ const p1 = client . query ( 'SELECT pg_sleep(0.5), 1 as num' )
1211+ const p2 = client . query ( 'SELECT 2 as num' )
1212+ const p3 = client . query ( 'SELECT 3 as num' )
1213+
1214+ // Destroy the connection after a short delay
1215+ setTimeout ( ( ) => {
1216+ client . connection . stream . destroy ( )
1217+ } , 50 )
1218+
1219+ // All queries should fail
1220+ Promise . allSettled ( [ p1 , p2 , p3 ] ) . then ( ( results ) => {
1221+ // At least some queries should have failed
1222+ const failedCount = results . filter ( ( r ) => r . status === 'rejected' ) . length
1223+ assert . ok ( failedCount > 0 , 'At least some queries should fail when connection is destroyed' )
1224+
1225+ // Error event should have been emitted
1226+ assert . ok ( errorReceived || endReceived , 'Should receive error or end event' )
1227+
1228+ done ( )
1229+ } )
1230+ } )
1231+ } )
1232+
1233+ suite . test ( 'pipeline mode - stress test with mixed success/failure' , ( done ) => {
1234+ const client = new Client ( { pipelineMode : true } )
1235+ client . connect ( ( err ) => {
1236+ if ( err ) return done ( err )
1237+
1238+ const promises = [ ]
1239+
1240+ // Mix of successful and failing queries
1241+ for ( let i = 0 ; i < 50 ; i ++ ) {
1242+ if ( i % 10 === 5 ) {
1243+ // Every 10th query (at position 5, 15, 25, 35, 45) will fail
1244+ promises . push ( client . query ( 'SELECT * FROM nonexistent_table_' + i ) )
1245+ } else {
1246+ promises . push ( client . query ( 'SELECT $1::int as num' , [ i ] ) )
1247+ }
1248+ }
1249+
1250+ Promise . allSettled ( promises )
1251+ . then ( ( results ) => {
1252+ let successCount = 0
1253+ let failCount = 0
1254+
1255+ results . forEach ( ( r , idx ) => {
1256+ if ( r . status === 'fulfilled' ) {
1257+ successCount ++
1258+ if ( idx % 10 !== 5 ) {
1259+ assert . equal ( r . value . rows [ 0 ] . num , idx , `Query ${ idx } should return correct value` )
1260+ }
1261+ } else {
1262+ failCount ++
1263+ assert . equal ( idx % 10 , 5 , `Only queries at position 5, 15, 25, 35, 45 should fail` )
1264+ }
1265+ } )
1266+
1267+ assert . equal ( successCount , 45 , 'Should have 45 successful queries' )
1268+ assert . equal ( failCount , 5 , 'Should have 5 failed queries' )
1269+
1270+ client . end ( done )
1271+ } )
1272+ . catch ( ( err ) => {
1273+ client . end ( ( ) => done ( err ) )
1274+ } )
1275+ } )
1276+ } )
1277+
1278+ suite . test ( 'pipeline mode - Pool stress test with concurrent connections' , ( done ) => {
1279+ const Pool = pg . Pool
1280+ const pool = new Pool ( { pipelineMode : true , max : 10 } )
1281+
1282+ const numUsers = 20
1283+ const queriesPerUser = 10
1284+ const userTasks = [ ]
1285+
1286+ for ( let userId = 0 ; userId < numUsers ; userId ++ ) {
1287+ const task = pool . connect ( ) . then ( ( client ) => {
1288+ const queries = [ ]
1289+ for ( let q = 0 ; q < queriesPerUser ; q ++ ) {
1290+ queries . push (
1291+ client . query ( 'SELECT $1::int as user_id, $2::int as query_num, pg_sleep(0.01)' , [ userId , q ] )
1292+ )
1293+ }
1294+ return Promise . all ( queries ) . then ( ( results ) => {
1295+ client . release ( )
1296+ return { userId, results }
1297+ } )
1298+ } )
1299+ userTasks . push ( task )
1300+ }
1301+
1302+ Promise . all ( userTasks )
1303+ . then ( ( allResults ) => {
1304+ assert . equal ( allResults . length , numUsers , `Should have results from ${ numUsers } users` )
1305+
1306+ allResults . forEach ( ( { userId, results } ) => {
1307+ assert . equal ( results . length , queriesPerUser , `User ${ userId } should have ${ queriesPerUser } results` )
1308+ results . forEach ( ( r , idx ) => {
1309+ assert . equal ( parseInt ( r . rows [ 0 ] . user_id ) , userId , `User ${ userId } query ${ idx } should have correct user_id` )
1310+ assert . equal ( parseInt ( r . rows [ 0 ] . query_num ) , idx , `User ${ userId } query ${ idx } should have correct query_num` )
1311+ } )
1312+ } )
1313+
1314+ return pool . end ( )
1315+ } )
1316+ . then ( ( ) => done ( ) )
1317+ . catch ( ( err ) => {
1318+ pool . end ( ) . then ( ( ) => done ( err ) )
1319+ } )
1320+ } )
1321+
1322+
1323+ suite . test ( 'pipeline mode - query cancellation removes from queue' , ( done ) => {
1324+ const client = new Client ( { pipelineMode : true } )
1325+
1326+ // Queue queries before connecting
1327+ const p1 = client . query ( 'SELECT 1 as num' )
1328+ const p2 = client . query ( 'SELECT 2 as num' )
1329+ const p3 = client . query ( 'SELECT 3 as num' )
1330+
1331+ // Remove p2 from queue before it's sent
1332+ const idx = client . _queryQueue . indexOf ( p2 . _result ? p2 : client . _queryQueue [ 1 ] )
1333+ if ( idx > - 1 ) {
1334+ client . _queryQueue . splice ( idx , 1 )
1335+ }
1336+
1337+ client . connect ( ( err ) => {
1338+ if ( err ) return done ( err )
1339+
1340+ Promise . allSettled ( [ p1 , p3 ] )
1341+ . then ( ( results ) => {
1342+ assert . equal ( results [ 0 ] . status , 'fulfilled' , 'Query 1 should succeed' )
1343+ assert . equal ( results [ 1 ] . status , 'fulfilled' , 'Query 3 should succeed' )
1344+ assert . equal ( results [ 0 ] . value . rows [ 0 ] . num , '1' )
1345+ assert . equal ( results [ 1 ] . value . rows [ 0 ] . num , '3' )
1346+ client . end ( done )
1347+ } )
1348+ . catch ( ( err ) => {
1349+ client . end ( ( ) => done ( err ) )
1350+ } )
1351+ } )
1352+ } )
1353+
1354+ suite . test ( 'pipeline mode - very large batch (500 queries)' , ( done ) => {
1355+ const client = new Client ( { pipelineMode : true } )
1356+ client . connect ( ( err ) => {
1357+ if ( err ) return done ( err )
1358+
1359+ const numQueries = 500
1360+ const promises = [ ]
1361+
1362+ for ( let i = 0 ; i < numQueries ; i ++ ) {
1363+ promises . push ( client . query ( 'SELECT $1::int as i' , [ i ] ) )
1364+ }
1365+
1366+ Promise . all ( promises )
1367+ . then ( ( results ) => {
1368+ assert . equal ( results . length , numQueries )
1369+ // Verify first, middle, and last
1370+ assert . equal ( results [ 0 ] . rows [ 0 ] . i , '0' )
1371+ assert . equal ( results [ 250 ] . rows [ 0 ] . i , '250' )
1372+ assert . equal ( results [ 499 ] . rows [ 0 ] . i , '499' )
1373+ client . end ( done )
1374+ } )
1375+ . catch ( ( err ) => {
1376+ client . end ( ( ) => done ( err ) )
1377+ } )
1378+ } )
1379+ } )
0 commit comments