@@ -783,4 +783,308 @@ describe("RunsReplicationService (part 2/2)", () => {
783783 await runsReplicationService . stop ( ) ;
784784 }
785785 ) ;
786+
787+ containerTest (
788+ "should merge duplicate event+run.id combinations keeping the latest version" ,
789+ async ( { clickhouseContainer, redisOptions, postgresContainer, prisma } ) => {
790+ await prisma . $executeRawUnsafe ( `ALTER TABLE public.\"TaskRun\" REPLICA IDENTITY FULL;` ) ;
791+
792+ const clickhouse = new ClickHouse ( {
793+ url : clickhouseContainer . getConnectionUrl ( ) ,
794+ name : "runs-replication-merge-batch" ,
795+ } ) ;
796+
797+ const runsReplicationService = new RunsReplicationService ( {
798+ clickhouse,
799+ pgConnectionUrl : postgresContainer . getConnectionUri ( ) ,
800+ serviceName : "runs-replication-merge-batch" ,
801+ slotName : "task_runs_to_clickhouse_v1" ,
802+ publicationName : "task_runs_to_clickhouse_v1_publication" ,
803+ redisOptions,
804+ maxFlushConcurrency : 1 ,
805+ flushIntervalMs : 100 ,
806+ flushBatchSize : 10 , // Higher batch size to test merging
807+ leaderLockTimeoutMs : 5000 ,
808+ leaderLockExtendIntervalMs : 1000 ,
809+ ackIntervalSeconds : 5 ,
810+ logger : new Logger ( "runs-replication-merge-batch" , "info" ) ,
811+ } ) ;
812+
813+ await runsReplicationService . start ( ) ;
814+
815+ const organization = await prisma . organization . create ( {
816+ data : {
817+ title : "test-merge-batch" ,
818+ slug : "test-merge-batch" ,
819+ } ,
820+ } ) ;
821+
822+ const project = await prisma . project . create ( {
823+ data : {
824+ name : "test-merge-batch" ,
825+ slug : "test-merge-batch" ,
826+ organizationId : organization . id ,
827+ externalRef : "test-merge-batch" ,
828+ } ,
829+ } ) ;
830+
831+ const runtimeEnvironment = await prisma . runtimeEnvironment . create ( {
832+ data : {
833+ slug : "test-merge-batch" ,
834+ type : "DEVELOPMENT" ,
835+ projectId : project . id ,
836+ organizationId : organization . id ,
837+ apiKey : "test-merge-batch" ,
838+ pkApiKey : "test-merge-batch" ,
839+ shortcode : "test-merge-batch" ,
840+ } ,
841+ } ) ;
842+
843+ // Create a run and rapidly update it multiple times in a transaction
844+ // This should create multiple events for the same run that get merged
845+ const [ taskRun ] = await prisma . $transaction ( async ( tx ) => {
846+ const run = await tx . taskRun . create ( {
847+ data : {
848+ friendlyId : `run_merge_${ Date . now ( ) } ` ,
849+ taskIdentifier : "my-task-merge" ,
850+ payload : JSON . stringify ( { version : 1 } ) ,
851+ payloadType : "application/json" ,
852+ traceId : `merge-${ Date . now ( ) } ` ,
853+ spanId : `merge-${ Date . now ( ) } ` ,
854+ queue : "test-merge-batch" ,
855+ runtimeEnvironmentId : runtimeEnvironment . id ,
856+ projectId : project . id ,
857+ organizationId : organization . id ,
858+ environmentType : "DEVELOPMENT" ,
859+ engine : "V2" ,
860+ status : "PENDING" ,
861+ } ,
862+ } ) ;
863+
864+ await tx . taskRun . update ( {
865+ where : { id : run . id } ,
866+ data : { status : "EXECUTING" , payload : JSON . stringify ( { version : 2 } ) } ,
867+ } ) ;
868+
869+ await tx . taskRun . update ( {
870+ where : { id : run . id } ,
871+ data : { status : "COMPLETED_SUCCESSFULLY" , payload : JSON . stringify ( { version : 3 } ) } ,
872+ } ) ;
873+
874+ return [ run ] ;
875+ } ) ;
876+
877+ // Wait for replication
878+ await setTimeout ( 2000 ) ;
879+
880+ // Query ClickHouse for the run using FINAL
881+ const queryRuns = clickhouse . reader . query ( {
882+ name : "runs-replication-merge-batch" ,
883+ query : "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}" ,
884+ schema : z . any ( ) ,
885+ params : z . object ( { run_id : z . string ( ) } ) ,
886+ } ) ;
887+
888+ const [ queryError , result ] = await queryRuns ( { run_id : taskRun . id } ) ;
889+
890+ expect ( queryError ) . toBeNull ( ) ;
891+ expect ( result ?. length ) . toBe ( 1 ) ;
892+
893+ // Should have the final status from the last update
894+ expect ( result ?. [ 0 ] ) . toEqual (
895+ expect . objectContaining ( {
896+ run_id : taskRun . id ,
897+ status : "COMPLETED_SUCCESSFULLY" ,
898+ } )
899+ ) ;
900+
901+ // Check payload was also updated to latest version
902+ const queryPayloads = clickhouse . reader . query ( {
903+ name : "runs-replication-merge-batch" ,
904+ query : "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}" ,
905+ schema : z . any ( ) ,
906+ params : z . object ( { run_id : z . string ( ) } ) ,
907+ } ) ;
908+
909+ const [ payloadError , payloadResult ] = await queryPayloads ( { run_id : taskRun . id } ) ;
910+
911+ expect ( payloadError ) . toBeNull ( ) ;
912+ expect ( payloadResult ?. length ) . toBe ( 1 ) ;
913+ expect ( payloadResult ?. [ 0 ] ) . toEqual (
914+ expect . objectContaining ( {
915+ run_id : taskRun . id ,
916+ payload : expect . objectContaining ( {
917+ data : { version : 3 } ,
918+ } ) ,
919+ } )
920+ ) ;
921+
922+ await runsReplicationService . stop ( ) ;
923+ }
924+ ) ;
925+
926+ containerTest (
927+ "should sort batch inserts according to table schema ordering for optimal performance" ,
928+ async ( { clickhouseContainer, redisOptions, postgresContainer, prisma } ) => {
929+ await prisma . $executeRawUnsafe ( `ALTER TABLE public.\"TaskRun\" REPLICA IDENTITY FULL;` ) ;
930+
931+ const clickhouse = new ClickHouse ( {
932+ url : clickhouseContainer . getConnectionUrl ( ) ,
933+ name : "runs-replication-sorting" ,
934+ } ) ;
935+
936+ const runsReplicationService = new RunsReplicationService ( {
937+ clickhouse,
938+ pgConnectionUrl : postgresContainer . getConnectionUri ( ) ,
939+ serviceName : "runs-replication-sorting" ,
940+ slotName : "task_runs_to_clickhouse_v1" ,
941+ publicationName : "task_runs_to_clickhouse_v1_publication" ,
942+ redisOptions,
943+ maxFlushConcurrency : 1 ,
944+ flushIntervalMs : 100 ,
945+ flushBatchSize : 10 ,
946+ leaderLockTimeoutMs : 5000 ,
947+ leaderLockExtendIntervalMs : 1000 ,
948+ ackIntervalSeconds : 5 ,
949+ logger : new Logger ( "runs-replication-sorting" , "info" ) ,
950+ } ) ;
951+
952+ await runsReplicationService . start ( ) ;
953+
954+ // Create two organizations to test sorting by organization_id
955+ const org1 = await prisma . organization . create ( {
956+ data : { title : "org-z" , slug : "org-z" } ,
957+ } ) ;
958+
959+ const org2 = await prisma . organization . create ( {
960+ data : { title : "org-a" , slug : "org-a" } ,
961+ } ) ;
962+
963+ const project1 = await prisma . project . create ( {
964+ data : {
965+ name : "test-sorting-z" ,
966+ slug : "test-sorting-z" ,
967+ organizationId : org1 . id ,
968+ externalRef : "test-sorting-z" ,
969+ } ,
970+ } ) ;
971+
972+ const project2 = await prisma . project . create ( {
973+ data : {
974+ name : "test-sorting-a" ,
975+ slug : "test-sorting-a" ,
976+ organizationId : org2 . id ,
977+ externalRef : "test-sorting-a" ,
978+ } ,
979+ } ) ;
980+
981+ const env1 = await prisma . runtimeEnvironment . create ( {
982+ data : {
983+ slug : "test-sorting-z" ,
984+ type : "DEVELOPMENT" ,
985+ projectId : project1 . id ,
986+ organizationId : org1 . id ,
987+ apiKey : "test-sorting-z" ,
988+ pkApiKey : "test-sorting-z" ,
989+ shortcode : "test-sorting-z" ,
990+ } ,
991+ } ) ;
992+
993+ const env2 = await prisma . runtimeEnvironment . create ( {
994+ data : {
995+ slug : "test-sorting-a" ,
996+ type : "DEVELOPMENT" ,
997+ projectId : project2 . id ,
998+ organizationId : org2 . id ,
999+ apiKey : "test-sorting-a" ,
1000+ pkApiKey : "test-sorting-a" ,
1001+ shortcode : "test-sorting-a" ,
1002+ } ,
1003+ } ) ;
1004+
1005+ const now = Date . now ( ) ;
1006+
1007+ // Create runs in reverse alphabetical order by organization
1008+ // The sorting should put org2 (org-a) before org1 (org-z)
1009+ const [ run1 , run2 ] = await prisma . $transaction ( async ( tx ) => {
1010+ const run1 = await tx . taskRun . create ( {
1011+ data : {
1012+ friendlyId : `run_sort_org_z_${ now } ` ,
1013+ taskIdentifier : "my-task-sort" ,
1014+ payload : JSON . stringify ( { org : "z" } ) ,
1015+ payloadType : "application/json" ,
1016+ traceId : `sort-z-${ now } ` ,
1017+ spanId : `sort-z-${ now } ` ,
1018+ queue : "test-sorting" ,
1019+ runtimeEnvironmentId : env1 . id ,
1020+ projectId : project1 . id ,
1021+ organizationId : org1 . id ,
1022+ environmentType : "DEVELOPMENT" ,
1023+ engine : "V2" ,
1024+ status : "PENDING" ,
1025+ createdAt : new Date ( now + 100 ) , // Later timestamp
1026+ } ,
1027+ } ) ;
1028+
1029+ const run2 = await tx . taskRun . create ( {
1030+ data : {
1031+ friendlyId : `run_sort_org_a_${ now } ` ,
1032+ taskIdentifier : "my-task-sort" ,
1033+ payload : JSON . stringify ( { org : "a" } ) ,
1034+ payloadType : "application/json" ,
1035+ traceId : `sort-a-${ now } ` ,
1036+ spanId : `sort-a-${ now } ` ,
1037+ queue : "test-sorting" ,
1038+ runtimeEnvironmentId : env2 . id ,
1039+ projectId : project2 . id ,
1040+ organizationId : org2 . id ,
1041+ environmentType : "DEVELOPMENT" ,
1042+ engine : "V2" ,
1043+ status : "PENDING" ,
1044+ createdAt : new Date ( now ) , // Earlier timestamp
1045+ } ,
1046+ } ) ;
1047+
1048+ return [ run1 , run2 ] ;
1049+ } ) ;
1050+
1051+ // Wait for replication
1052+ await setTimeout ( 2000 ) ;
1053+
1054+ // Query ClickHouse for both runs
1055+ const queryRuns = clickhouse . reader . query ( {
1056+ name : "runs-replication-sorting" ,
1057+ query : `SELECT run_id, organization_id, project_id, environment_id, created_at, friendly_id
1058+ FROM trigger_dev.task_runs_v2 FINAL
1059+ WHERE run_id IN ({run_id_1:String}, {run_id_2:String})
1060+ ORDER BY organization_id, project_id, environment_id, created_at, run_id` ,
1061+ schema : z . any ( ) ,
1062+ params : z . object ( { run_id_1 : z . string ( ) , run_id_2 : z . string ( ) } ) ,
1063+ } ) ;
1064+
1065+ const [ queryError , result ] = await queryRuns ( { run_id_1 : run1 . id , run_id_2 : run2 . id } ) ;
1066+
1067+ expect ( queryError ) . toBeNull ( ) ;
1068+ expect ( result ?. length ) . toBe ( 2 ) ;
1069+
1070+ // Due to sorting, org2 (org-a) should come first even though it was created second
1071+ expect ( result ?. [ 0 ] ) . toEqual (
1072+ expect . objectContaining ( {
1073+ run_id : run2 . id ,
1074+ organization_id : org2 . id ,
1075+ friendly_id : `run_sort_org_a_${ now } ` ,
1076+ } )
1077+ ) ;
1078+
1079+ expect ( result ?. [ 1 ] ) . toEqual (
1080+ expect . objectContaining ( {
1081+ run_id : run1 . id ,
1082+ organization_id : org1 . id ,
1083+ friendly_id : `run_sort_org_z_${ now } ` ,
1084+ } )
1085+ ) ;
1086+
1087+ await runsReplicationService . stop ( ) ;
1088+ }
1089+ ) ;
7861090} ) ;
0 commit comments