@@ -13,7 +13,11 @@
 from pydantic import validate_email
 from tenacity import retry, stop_after_attempt, wait_fixed
 
-from crowdgit.database.crud import batch_insert_activities, save_service_execution
+from crowdgit.database.crud import (
+    batch_check_parent_activities,
+    batch_insert_activities,
+    save_service_execution,
+)
 from crowdgit.enums import (
     DataSinkWorkerQueueMessageType,
     ErrorCode,
@@ -115,6 +119,7 @@ async def process_single_batch_commits(
             "total_commits": 0,
             "processed_commits": 0,
             "bad_commits": 0,
+            "skipped_activities": 0,
             "total_activities": 0,
         }
 
@@ -139,6 +144,7 @@ async def process_single_batch_commits(
             batch_info.remote,
             repository.segment_id,
             repository.integration_id,
+            repository.parent_repo,
         )
 
         batch_end_time = time.time()
@@ -164,6 +170,7 @@ async def process_single_batch_commits(
                 "total_commits": self._metrics_context["total_commits"],
                 "processed_commits": self._metrics_context["processed_commits"],
                 "bad_commits": self._metrics_context["bad_commits"],
+                "skipped_activities": self._metrics_context["skipped_activities"],
                 "total_activities": self._metrics_context["total_activities"],
             },
         )
@@ -200,6 +207,7 @@ async def process_single_batch_commits(
                 "total_commits": self._metrics_context["total_commits"],
                 "processed_commits": self._metrics_context["processed_commits"],
                 "bad_commits": self._metrics_context["bad_commits"],
+                "skipped_activities": self._metrics_context["skipped_activities"],
                 "total_activities": self._metrics_context["total_activities"],
             },
         )
@@ -609,6 +617,66 @@ def create_activities_from_commit(
 
         return activities_db, activities_queue
 
+    async def _filter_parent_repo_activities(
+        self,
+        activities_db: list[tuple],
+        activities_queue: list[dict],
+        parent_repo: Repository,
+    ) -> tuple[list[tuple], list[dict], int]:
+        """
+        Filter out activities that exist in the parent repo (for fork detection).
+        Uses the full dedup key (timestamp, platform, type, sourceId, channel, segmentId) for optimal index usage.
+
+        Args:
+            activities_db: List of activity tuples for the database
+            activities_queue: List of activity dicts for the Kafka queue
+            parent_repo: Parent repository information
+
+        Returns:
+            Tuple of (filtered_activities_db, filtered_activities_queue, skipped_activities_count)
+        """
+        if not activities_db:
+            return activities_db, activities_queue, 0
+
+        # Extract (timestamp, type, sourceId) for each activity to use the full dedup index
+        activity_keys = []
+        for act in activities_db:
+            data = orjson.loads(act[2])["data"]
+            activity_keys.append((data["timestamp"], data["type"], data["sourceId"]))
+
+        # Batch check which activities exist in the parent repo
+        parent_source_ids = await batch_check_parent_activities(
+            activity_keys,
+            parent_repo.url,
+            parent_repo.segment_id,
+        )
+
+        if not parent_source_ids:
+            return activities_db, activities_queue, 0
+
+        filtered_activities_db = []
+        filtered_activities_queue = []
+        skipped_activities_count = 0
+
+        for i, activity_tuple in enumerate(activities_db):
+            activity_data = orjson.loads(activity_tuple[2])
+            source_id = activity_data["data"]["sourceId"]
+
+            if source_id not in parent_source_ids:
+                # Activity doesn't exist in the parent repo, keep it
+                filtered_activities_db.append(activity_tuple)
+                filtered_activities_queue.append(activities_queue[i])
+            else:
+                # Activity exists in the parent repo, skip it
+                skipped_activities_count += 1
+
+        if skipped_activities_count > 0:
+            self.logger.info(
+                f"Filtered out {skipped_activities_count} activities from parent repo {parent_repo.url}"
+            )
+
+        return filtered_activities_db, filtered_activities_queue, skipped_activities_count
+
     async def process_commits_chunk(
         self,
         commit_texts_chunk: list[str | None],
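Note: `batch_check_parent_activities` is only imported in this diff; its body in `crowdgit/database/crud.py` is not shown. A minimal sketch of the shape such a batch lookup could take, assuming asyncpg, a Postgres `activities` table, and the dedup columns named in the docstring (table and column names here are assumptions, not the actual crowdgit schema):

# Hypothetical sketch -- not the actual crowdgit.database.crud implementation.
import asyncpg

async def batch_check_parent_activities(
    activity_keys: list[tuple[str, str, str]],  # (timestamp, type, sourceId) per activity
    parent_url: str,  # matched against the activity channel
    parent_segment_id: str,
) -> set[str]:
    """Return the sourceIds from activity_keys that already exist in the parent repo."""
    conn = await asyncpg.connect()  # assumes connection settings come from the environment
    try:
        rows = await conn.fetch(
            """
            SELECT a."sourceId"
            FROM activities a
            WHERE a.platform = 'git'
              AND a.channel = $4
              AND a."segmentId" = $5
              -- timestamps compared as text only for sketch simplicity
              AND (a.timestamp::text, a.type, a."sourceId") IN (
                  SELECT * FROM unnest($1::text[], $2::text[], $3::text[])
              )
            """,
            [k[0] for k in activity_keys],
            [k[1] for k in activity_keys],
            [k[2] for k in activity_keys],
            parent_url,
            parent_segment_id,
        )
        return {r["sourceId"] for r in rows}
    finally:
        await conn.close()

Passing the keys as three parallel arrays keeps the check to a single round trip, which is what makes batching worthwhile against the dedup index.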
@@ -617,6 +685,7 @@ async def process_commits_chunk(
         remote: str,
         segment_id: str,
         integration_id: str,
+        parent_repo: Repository | None,
     ) -> None:
         """
         Process a chunk of raw commit texts into activities and write them to DB and Kafka.
@@ -674,15 +743,31 @@ async def process_commits_chunk(
         del commit_lines
         del numstats_text
 
-        self.logger.info(
-            f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits in {repo_path}"
-        )
+        # Filter out activities from parent repo (for forks)
+        skipped_activities = 0
+        if parent_repo:
+            (
+                activities_db,
+                activities_queue,
+                skipped_activities,
+            ) = await self._filter_parent_repo_activities(
+                activities_db, activities_queue, parent_repo
+            )
 
+        if skipped_activities > 0:
+            self.logger.info(
+                f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits, filtered {skipped_activities} activities from parent repo in {repo_path}"
+            )
+        else:
+            self.logger.info(
+                f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits in {repo_path}"
+            )
         # Update metrics context
         if self._metrics_context:
             self._metrics_context["processed_commits"] += processed_commits
             self._metrics_context["bad_commits"] += bad_commits
             self._metrics_context["total_activities"] += len(activities_db)
+            self._metrics_context["skipped_activities"] += skipped_activities
 
         # Write activities to database and queue
         if activities_db:
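For a concrete feel of the filtering semantics wired in above, here is a tiny self-contained walk-through with made-up data, mirroring the tuple layout the diff relies on (index 2 holds the orjson-encoded payload carrying data.sourceId):

import orjson

# Made-up activity: only index 2 (the orjson payload) matters for filtering.
payload = orjson.dumps(
    {"data": {"timestamp": "2024-01-01T00:00:00Z", "type": "authored-commit", "sourceId": "abc123"}}
)
activities_db = [(None, None, payload)]
activities_queue = [{"sourceId": "abc123"}]

# Pretend the parent repo already contains commit abc123.
parent_source_ids = {"abc123"}

kept_db, kept_queue, skipped = [], [], 0
for i, act in enumerate(activities_db):
    if orjson.loads(act[2])["data"]["sourceId"] in parent_source_ids:
        skipped += 1  # the fork inherited this commit from its parent
    else:
        kept_db.append(act)
        kept_queue.append(activities_queue[i])

assert (kept_db, kept_queue, skipped) == ([], [], 1)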
@@ -701,6 +786,7 @@ async def _process_activities_from_commits(
         remote: str,
         segment_id: str,
         integration_id: str,
+        parent_repo: Repository | None = None,
     ):
         """
         Parse raw git log output, process commits into activities, and save to database.
@@ -747,6 +833,7 @@ async def process_single_chunk(chunk_start_idx: int, chunk_end_idx: int):
                 remote,
                 segment_id,
                 integration_id,
+                parent_repo,
             )
             completed_chunks += 1
             self.logger.info(f"Progress: {completed_chunks}/{total_chunks} chunks")
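Backward-compatibility note: `_process_activities_from_commits` accepts `parent_repo` with a default of `None`, so existing call sites are unaffected. Only repositories with `parent_repo` set (i.e., detected forks) take the new filtering path; everywhere else the `skipped_activities` counter in the execution metrics simply stays 0.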
0 commit comments