From 8e1e525cf53641e76c97d86aecbf5610b3ecbbdf Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Sun, 12 Oct 2025 19:16:40 +0100 Subject: [PATCH 1/4] fix: numstats memory consumption --- .../services/commit/commit_service.py | 113 ++++++------------ 1 file changed, 35 insertions(+), 78 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 4009ad8460..a76e5495e6 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -35,7 +35,8 @@ class CommitService(BaseService): """Service for processing repository commits""" - COMMIT_END_SPLITTER = "--CROWD-END-OF-COMMIT--" + COMMIT_START_SPLITTER = "---CROWD_COMMIT_START---" + NUMSTAT_SPLITTER = "---CROWD_NUMSTAT_START---" MIN_COMMIT_FIELDS = 8 DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z" FUTURE_DATE_THRESHOLD_DAYS = 1 @@ -48,7 +49,7 @@ class CommitService(BaseService): _GIT_PLATFORM = "git" _USERNAME_TYPE = "username" _EMAIL_TYPE = "email" - _COMMITTED_COMMIT_SUFFIX = "commited-commit" + _COMMITTED_COMMIT_SUFFIX = "committed-commit" MAX_CHUNK_SIZE = 250 @@ -76,7 +77,7 @@ def cleanup_process_pool(self): @property def git_log_format(self) -> str: """Git log format string with commit splitter""" - return f"%n%H%n%aI%n%an%n%ae%n%cI%n%cn%n%ce%n%P%n%d%n%B%n{self.COMMIT_END_SPLITTER}" + return f"{self.COMMIT_START_SPLITTER}%n%H%n%aI%n%an%n%ae%n%cI%n%cn%n%ce%n%P%n%d%n%B%n{self.NUMSTAT_SPLITTER}" @staticmethod def is_valid_commit_hash(commit_hash: str) -> bool: @@ -143,7 +144,7 @@ async def process_single_batch_commits( self.logger.info( f"Starting commits processing for new batch having commits older than {batch_info.prev_batch_edge_commit}" ) - raw_commits, raw_numstats = await self._execute_git_log( + raw_commits = await self._execute_git_log( batch_info.repo_path, batch_info.clone_with_batches, batch_info.prev_batch_edge_commit, @@ -152,7 +153,6 @@ async def process_single_batch_commits( await self._process_activities_from_commits( raw_commits, - raw_numstats, batch_info.repo_path, batch_info.edge_commit, batch_info.remote, @@ -222,71 +222,41 @@ async def _get_commit_reference(self, repo_path: str) -> str: return "HEAD" return f"origin/{default_branch}" - def _build_git_log_commands( - self, repo_path: str, commit_range: str - ) -> tuple[list[str], list[str]]: + def _build_git_log_command(self, repo_path: str, commit_range: str) -> list[str]: """Build git log commands for commits and numstats.""" - raw_commits_cmd = [ + return [ "git", "-C", repo_path, "log", commit_range, - f"--pretty=format:{self.git_log_format}", - ] - raw_insertions_deletions_cmd = [ - "git", - "-C", - repo_path, - "log", - commit_range, - "--pretty=format:%H", "--cc", "--numstat", + f"--pretty=format:{self.git_log_format}", ] - return raw_commits_cmd, raw_insertions_deletions_cmd @staticmethod - def _parse_numstats(raw_numstats: str) -> dict[str, tuple[int, int]]: + def _parse_numstats(raw_numstats: str) -> tuple[int, int]: """ - Parse raw numstats into commit_hash -> (insertions, deletions) mapping. + Parse raw numstats into -> (insertions, deletions). Args: raw_numstats: Raw output from git log --numstat --pretty=format:%H Returns: - Dictionary mapping commit hashes to (insertions, deletions) tuples + (insertions, deletions) tuple """ - if not raw_numstats.strip(): - return {} - - changes = {} - commits_texts = raw_numstats.split("\n\n") - - for commit_text in commits_texts: - commit_lines = commit_text.strip().splitlines() - - if len(commit_lines) < 2: - continue - - commit_hash = commit_lines[0] - if not CommitService.is_valid_commit_hash(commit_hash): - logger.error("Invalid insertions/deletions hash found: hash=%s", commit_hash) - continue + insertions_deletions = raw_numstats.strip().splitlines() + insertions = 0 + deletions = 0 - insertions_deletions = commit_lines[1:] - insertions = 0 - deletions = 0 - - for line in insertions_deletions: - match = re.match(r"^(\d+)\s+(\d+)", line) - if match: - insertions += int(match.group(1)) - deletions += int(match.group(2)) - - changes[commit_hash] = (insertions, deletions) + for line in insertions_deletions: + match = re.match(r"^(\d+)\s+(\d+)", line) + if match: + insertions += int(match.group(1)) + deletions += int(match.group(2)) - return changes + return (insertions, deletions) @retry( stop=stop_after_attempt(5), @@ -299,7 +269,7 @@ async def _execute_git_log( clone_with_batches: bool, prev_batch_edge_commit: str | None = None, edge_commit: str | None = None, - ) -> tuple[str, str]: + ) -> str: """Execute git log command and return raw output.""" # Ensure abbreviated commits are disabled await run_shell_command( @@ -313,16 +283,11 @@ async def _execute_git_log( self.logger.info( f"Full repo cloned in single batch, getting all commits in {commit_reference}" ) - raw_commits_cmd, raw_insertions_deletions_cmd = self._build_git_log_commands( - repo_path, commit_reference - ) - return await asyncio.gather( - run_shell_command(raw_commits_cmd), - run_shell_command(raw_insertions_deletions_cmd), - ) + raw_commits_cmd = self._build_git_log_command(repo_path, commit_reference) + return await run_shell_command(raw_commits_cmd) if not prev_batch_edge_commit: - return "", "" + return "" if edge_commit: # Middle batches: Get the slice of history between the last batch's edge and this one's. @@ -335,14 +300,10 @@ async def _execute_git_log( commit_range = prev_batch_edge_commit self.logger.info(f"Processing final batch from: {prev_batch_edge_commit} to root") - raw_commits_cmd, raw_insertions_deletions_cmd = self._build_git_log_commands( - repo_path, commit_range - ) + raw_commits_cmd = self._build_git_log_command(repo_path, commit_range) self.logger.info(f"Executing git log for range: {commit_range}") - return await asyncio.gather( - run_shell_command(raw_commits_cmd), run_shell_command(raw_insertions_deletions_cmd) - ) + return await run_shell_command(raw_commits_cmd) @staticmethod def should_skip_commit(raw_commit: str | None, edge_commit: str | None) -> bool: @@ -623,7 +584,6 @@ def create_activities_from_commit( @staticmethod def process_commits_chunk( commit_texts_chunk: list[str | None], - numstats_map: dict[str, tuple[int, int]], repo_path: str, edge_commit_hash: str | None, remote: str, @@ -657,7 +617,7 @@ def process_commits_chunk( for commit_text in commit_texts_chunk: if CommitService.should_skip_commit(commit_text, edge_commit_hash): continue - + commit_text, numstats_text = commit_text.split(CommitService.NUMSTAT_SPLITTER) commit_lines = commit_text.strip().splitlines() if not CommitService._validate_commit_structure(commit_lines): logger.warning( @@ -667,7 +627,7 @@ def process_commits_chunk( continue try: - commit = CommitService._construct_commit_dict(commit_lines, numstats_map) + commit = CommitService._construct_commit_dict(commit_lines, numstats_text) if CommitService._validate_commit_data(commit): activity_db_records, activity_kafka = ( CommitService.create_activities_from_commit( @@ -694,7 +654,6 @@ def process_commits_chunk( async def _process_activities_from_commits( self, raw_commits: str, - raw_numstats: str, repo_path: str, edge_commit_hash: str | None, remote: str, @@ -705,18 +664,13 @@ async def _process_activities_from_commits( Parse raw git log output into commit dictionaries. """ commit_texts = [ - c.strip() for c in raw_commits.split(self.COMMIT_END_SPLITTER) if c.strip() + c.strip() for c in raw_commits.split(self.COMMIT_START_SPLITTER) if c.strip() ] logger.info(f"Actual number of commits to be processed: {len(commit_texts)}") if len(commit_texts) == 0: self.logger.info("No commits to be processed") return - # Pre-parse numstats for efficient lookup during commit processing - self.logger.info("Pre-parsing numstats data...") - numstats_map = CommitService._parse_numstats(raw_numstats) - self.logger.info(f"Parsed numstats for {len(numstats_map)} commits") - chunk_size = min(max(20, len(commit_texts) // MAX_WORKER_PROCESSES), self.MAX_CHUNK_SIZE) self.logger.info(f"Spliting commits into chunks of {chunk_size}") @@ -724,6 +678,7 @@ async def _process_activities_from_commits( commit_texts[i : i + chunk_size] for i in range(0, len(commit_texts), chunk_size) ] self.logger.info(f"Total commits {len(commit_texts)} chunks {len(chunks)}") + del commit_texts # to release memory loop = asyncio.get_event_loop() executor = self._get_or_create_pool() @@ -734,7 +689,6 @@ async def _process_activities_from_commits( executor, CommitService.process_commits_chunk, chunk, - numstats_map, repo_path, edge_commit_hash, remote, @@ -772,7 +726,7 @@ async def _process_activities_from_commits( @staticmethod def _construct_commit_dict( - commit_metadata_lines: list[str], numstats_map: dict[str, tuple[int, int]] + commit_metadata_lines: list[str], numstats_text: str ) -> dict[str, Any]: """Create commit dictionary from parsed lines.""" commit_hash = commit_metadata_lines[0] @@ -807,7 +761,10 @@ def _construct_commit_dict( ) # Get insertions/deletions from pre-parsed numstats map - insertions, deletions = numstats_map.get(commit_hash, (0, 0)) + insertions, deletions = CommitService._parse_numstats(numstats_text) + # release memory + del numstats_text + del commit_metadata_lines return { "hash": commit_hash, From 64f1347029fb0e3cfce000856a251da1dc0aa9ff Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Mon, 13 Oct 2025 11:23:11 +0100 Subject: [PATCH 2/4] fix: not updating typo in sourceId hash --- .../src/crowdgit/services/commit/commit_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index a76e5495e6..94aee87ca4 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -49,7 +49,7 @@ class CommitService(BaseService): _GIT_PLATFORM = "git" _USERNAME_TYPE = "username" _EMAIL_TYPE = "email" - _COMMITTED_COMMIT_SUFFIX = "committed-commit" + _COMMITTED_COMMIT_SUFFIX = "commited-commit" MAX_CHUNK_SIZE = 250 From d233d19472f3a4f46223caea2abb5062f3bd9103 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Mon, 13 Oct 2025 12:25:03 +0100 Subject: [PATCH 3/4] feat: optimize encoding --- .../src/crowdgit/services/queue/queue_service.py | 4 ++-- services/apps/git_integration/src/crowdgit/services/utils.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py b/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py index eef00b4675..6bcb0e2331 100644 --- a/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py +++ b/services/apps/git_integration/src/crowdgit/services/queue/queue_service.py @@ -153,8 +153,8 @@ async def send_batch_activities(self, activities_kafka: list[dict[str, str]]): futures = [ self.kafka_producer.send( topic=self.kafka_topic, - key=activity["message_id"].encode("utf-8"), - value=activity["payload"].encode("utf-8"), + key=activity["message_id"].encode("utf-8", errors="replace"), + value=activity["payload"].encode("utf-8", errors="replace"), ) for activity in activities_kafka ] diff --git a/services/apps/git_integration/src/crowdgit/services/utils.py b/services/apps/git_integration/src/crowdgit/services/utils.py index feeed61536..44b3a34f70 100644 --- a/services/apps/git_integration/src/crowdgit/services/utils.py +++ b/services/apps/git_integration/src/crowdgit/services/utils.py @@ -39,7 +39,7 @@ def _safe_decode(data: bytes) -> str: # CP1252 is common for Windows-generated content and has specific byte mappings # ISO-8859-1 is a common legacy encoding for Western European languages - for encoding in ("cp1252", "iso-8859-1"): + for encoding in ("iso-8859-1", "cp1252"): logger.info(f"Trying {encoding} decoding") try: return data.decode(encoding) From bf9d7380400f00293924d2a9de43e7134d5d8c06 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Mon, 13 Oct 2025 14:37:13 +0100 Subject: [PATCH 4/4] feat: free memory after use & improve docs --- .../services/commit/commit_service.py | 39 ++++++++++++------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 94aee87ca4..647147feeb 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -120,13 +120,8 @@ async def process_single_batch_commits( Process commits from a cloned batch. Args: - repo_path: Path to the Git repository. - edge_commit: The edge commit for the current batch. It should be excluded from processing because its data may be incomplete or inaccurate. - prev_batch_edge_commit: The edge commit from the previous batch. Which is used as the starting point (included) for the current batch processing. - remote: Remote repository URL - segment_id: Segment identifier - integration_id: Integration identifier - is_final_batch: Whether this is the final batch (triggers metrics saving) + repository: Repository object containing segment and integration info + batch_info: Clone batch information with paths and commit boundaries """ # Initialize metrics context on first call if self._metrics_context is None: @@ -337,6 +332,7 @@ def create_activity( activity_type: Type of activity member: Member information dictionary source_id: Source ID for the activity + segment_id: Segment identifier source_parent_id: Parent source ID (optional) Returns: @@ -589,7 +585,7 @@ def process_commits_chunk( remote: str, segment_id: str, integration_id: str, - ) -> list[tuple]: + ) -> tuple[list[tuple], list[dict]]: """ Process a chunk of raw commit texts into activities. @@ -599,7 +595,6 @@ def process_commits_chunk( Args: commit_texts_chunk: List of commit text strings to process - numstats_map: Pre-parsed mapping of commit_hash -> (insertions, deletions) repo_path: Path to the repository edge_commit_hash: Edge commit hash for filtering remote: Remote repository URL @@ -607,23 +602,28 @@ def process_commits_chunk( integration_id: Integration identifier Returns: - List of activity dictionaries for database insertion + Tuple of (activities_db, activities_queue) for database and queue """ activities_db = [] activities_queue = [] bad_commits = 0 processed_commits = 0 + commit = None - for commit_text in commit_texts_chunk: - if CommitService.should_skip_commit(commit_text, edge_commit_hash): + for full_commit_text in commit_texts_chunk: + if CommitService.should_skip_commit(full_commit_text, edge_commit_hash): continue - commit_text, numstats_text = commit_text.split(CommitService.NUMSTAT_SPLITTER) + commit_text, numstats_text = full_commit_text.split(CommitService.NUMSTAT_SPLITTER) commit_lines = commit_text.strip().splitlines() + del full_commit_text + del commit_text if not CommitService._validate_commit_structure(commit_lines): logger.warning( f"Invalid commit structure in {repo_path}: {len(commit_lines)} fields" ) bad_commits += 1 + del commit_lines + del numstats_text continue try: @@ -636,6 +636,8 @@ def process_commits_chunk( ) activities_db.extend(activity_db_records) activities_queue.extend(activity_kafka) + del activity_db_records + del activity_kafka processed_commits += 1 else: bad_commits += 1 @@ -644,6 +646,10 @@ def process_commits_chunk( logger.warning(f"Failed to parse commit in {repo_path}: {e}") bad_commits += 1 continue + finally: + del commit + del commit_lines + del numstats_text logger.info( f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits in {repo_path}" @@ -661,11 +667,12 @@ async def _process_activities_from_commits( integration_id: str, ): """ - Parse raw git log output into commit dictionaries. + Parse raw git log output, process commits into activities, and save to database. """ commit_texts = [ c.strip() for c in raw_commits.split(self.COMMIT_START_SPLITTER) if c.strip() ] + del raw_commits logger.info(f"Actual number of commits to be processed: {len(commit_texts)}") if len(commit_texts) == 0: self.logger.info("No commits to be processed") @@ -716,6 +723,8 @@ async def _process_activities_from_commits( batch_insert_activities(chunk_activities_db), self.queue_service.send_batch_activities(chunk_activities_queue), ) + del chunk_activities_db + del chunk_activities_queue except Exception as e: if isinstance(e, BrokenProcessPool): self.logger.warning( @@ -760,7 +769,7 @@ def _construct_commit_dict( commit_datetime, author_datetime ) - # Get insertions/deletions from pre-parsed numstats map + # Parse numstats to get insertions/deletions insertions, deletions = CommitService._parse_numstats(numstats_text) # release memory del numstats_text