-
Notifications
You must be signed in to change notification settings - Fork 729
fix(git-integration): insertions deletions memory consumption [CM-724] #3505
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,7 +35,8 @@ | |
| class CommitService(BaseService): | ||
| """Service for processing repository commits""" | ||
|
|
||
| COMMIT_END_SPLITTER = "--CROWD-END-OF-COMMIT--" | ||
| COMMIT_START_SPLITTER = "---CROWD_COMMIT_START---" | ||
| NUMSTAT_SPLITTER = "---CROWD_NUMSTAT_START---" | ||
| MIN_COMMIT_FIELDS = 8 | ||
| DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z" | ||
| FUTURE_DATE_THRESHOLD_DAYS = 1 | ||
|
|
@@ -76,7 +77,7 @@ def cleanup_process_pool(self): | |
| @property | ||
| def git_log_format(self) -> str: | ||
| """Git log format string with commit splitter""" | ||
| return f"%n%H%n%aI%n%an%n%ae%n%cI%n%cn%n%ce%n%P%n%d%n%B%n{self.COMMIT_END_SPLITTER}" | ||
| return f"{self.COMMIT_START_SPLITTER}%n%H%n%aI%n%an%n%ae%n%cI%n%cn%n%ce%n%P%n%d%n%B%n{self.NUMSTAT_SPLITTER}" | ||
|
|
||
| @staticmethod | ||
| def is_valid_commit_hash(commit_hash: str) -> bool: | ||
|
|
@@ -119,13 +120,8 @@ async def process_single_batch_commits( | |
| Process commits from a cloned batch. | ||
|
|
||
| Args: | ||
| repo_path: Path to the Git repository. | ||
| edge_commit: The edge commit for the current batch. It should be excluded from processing because its data may be incomplete or inaccurate. | ||
| prev_batch_edge_commit: The edge commit from the previous batch. Which is used as the starting point (included) for the current batch processing. | ||
| remote: Remote repository URL | ||
| segment_id: Segment identifier | ||
| integration_id: Integration identifier | ||
| is_final_batch: Whether this is the final batch (triggers metrics saving) | ||
| repository: Repository object containing segment and integration info | ||
| batch_info: Clone batch information with paths and commit boundaries | ||
| """ | ||
| # Initialize metrics context on first call | ||
| if self._metrics_context is None: | ||
|
|
@@ -143,7 +139,7 @@ async def process_single_batch_commits( | |
| self.logger.info( | ||
| f"Starting commits processing for new batch having commits older than {batch_info.prev_batch_edge_commit}" | ||
| ) | ||
| raw_commits, raw_numstats = await self._execute_git_log( | ||
| raw_commits = await self._execute_git_log( | ||
| batch_info.repo_path, | ||
| batch_info.clone_with_batches, | ||
| batch_info.prev_batch_edge_commit, | ||
|
|
@@ -152,7 +148,6 @@ async def process_single_batch_commits( | |
|
|
||
| await self._process_activities_from_commits( | ||
| raw_commits, | ||
| raw_numstats, | ||
| batch_info.repo_path, | ||
| batch_info.edge_commit, | ||
| batch_info.remote, | ||
|
|
@@ -222,71 +217,41 @@ async def _get_commit_reference(self, repo_path: str) -> str: | |
| return "HEAD" | ||
| return f"origin/{default_branch}" | ||
|
|
||
| def _build_git_log_commands( | ||
| self, repo_path: str, commit_range: str | ||
| ) -> tuple[list[str], list[str]]: | ||
| def _build_git_log_command(self, repo_path: str, commit_range: str) -> list[str]: | ||
| """Build git log commands for commits and numstats.""" | ||
| raw_commits_cmd = [ | ||
| "git", | ||
| "-C", | ||
| repo_path, | ||
| "log", | ||
| commit_range, | ||
| f"--pretty=format:{self.git_log_format}", | ||
| ] | ||
| raw_insertions_deletions_cmd = [ | ||
| return [ | ||
| "git", | ||
| "-C", | ||
| repo_path, | ||
| "log", | ||
| commit_range, | ||
| "--pretty=format:%H", | ||
| "--cc", | ||
| "--numstat", | ||
| f"--pretty=format:{self.git_log_format}", | ||
| ] | ||
| return raw_commits_cmd, raw_insertions_deletions_cmd | ||
|
|
||
| @staticmethod | ||
| def _parse_numstats(raw_numstats: str) -> dict[str, tuple[int, int]]: | ||
| def _parse_numstats(raw_numstats: str) -> tuple[int, int]: | ||
| """ | ||
| Parse raw numstats into commit_hash -> (insertions, deletions) mapping. | ||
| Parse raw numstats into -> (insertions, deletions). | ||
|
|
||
| Args: | ||
| raw_numstats: Raw output from git log --numstat --pretty=format:%H | ||
|
|
||
| Returns: | ||
| Dictionary mapping commit hashes to (insertions, deletions) tuples | ||
| (insertions, deletions) tuple | ||
| """ | ||
| if not raw_numstats.strip(): | ||
| return {} | ||
|
|
||
| changes = {} | ||
| commits_texts = raw_numstats.split("\n\n") | ||
|
|
||
| for commit_text in commits_texts: | ||
| commit_lines = commit_text.strip().splitlines() | ||
| insertions_deletions = raw_numstats.strip().splitlines() | ||
| insertions = 0 | ||
| deletions = 0 | ||
|
|
||
| if len(commit_lines) < 2: | ||
| continue | ||
|
|
||
| commit_hash = commit_lines[0] | ||
| if not CommitService.is_valid_commit_hash(commit_hash): | ||
| logger.error("Invalid insertions/deletions hash found: hash=%s", commit_hash) | ||
| continue | ||
|
|
||
| insertions_deletions = commit_lines[1:] | ||
| insertions = 0 | ||
| deletions = 0 | ||
|
|
||
| for line in insertions_deletions: | ||
| match = re.match(r"^(\d+)\s+(\d+)", line) | ||
| if match: | ||
| insertions += int(match.group(1)) | ||
| deletions += int(match.group(2)) | ||
|
|
||
| changes[commit_hash] = (insertions, deletions) | ||
| for line in insertions_deletions: | ||
| match = re.match(r"^(\d+)\s+(\d+)", line) | ||
| if match: | ||
| insertions += int(match.group(1)) | ||
| deletions += int(match.group(2)) | ||
|
|
||
| return changes | ||
| return (insertions, deletions) | ||
|
|
||
| @retry( | ||
| stop=stop_after_attempt(5), | ||
|
|
@@ -299,7 +264,7 @@ async def _execute_git_log( | |
| clone_with_batches: bool, | ||
| prev_batch_edge_commit: str | None = None, | ||
| edge_commit: str | None = None, | ||
| ) -> tuple[str, str]: | ||
| ) -> str: | ||
| """Execute git log command and return raw output.""" | ||
| # Ensure abbreviated commits are disabled | ||
| await run_shell_command( | ||
|
|
@@ -313,16 +278,11 @@ async def _execute_git_log( | |
| self.logger.info( | ||
| f"Full repo cloned in single batch, getting all commits in {commit_reference}" | ||
| ) | ||
| raw_commits_cmd, raw_insertions_deletions_cmd = self._build_git_log_commands( | ||
| repo_path, commit_reference | ||
| ) | ||
| return await asyncio.gather( | ||
| run_shell_command(raw_commits_cmd), | ||
| run_shell_command(raw_insertions_deletions_cmd), | ||
| ) | ||
| raw_commits_cmd = self._build_git_log_command(repo_path, commit_reference) | ||
| return await run_shell_command(raw_commits_cmd) | ||
|
|
||
| if not prev_batch_edge_commit: | ||
| return "", "" | ||
| return "" | ||
|
|
||
| if edge_commit: | ||
| # Middle batches: Get the slice of history between the last batch's edge and this one's. | ||
|
|
@@ -335,14 +295,10 @@ async def _execute_git_log( | |
| commit_range = prev_batch_edge_commit | ||
| self.logger.info(f"Processing final batch from: {prev_batch_edge_commit} to root") | ||
|
|
||
| raw_commits_cmd, raw_insertions_deletions_cmd = self._build_git_log_commands( | ||
| repo_path, commit_range | ||
| ) | ||
| raw_commits_cmd = self._build_git_log_command(repo_path, commit_range) | ||
|
|
||
| self.logger.info(f"Executing git log for range: {commit_range}") | ||
| return await asyncio.gather( | ||
| run_shell_command(raw_commits_cmd), run_shell_command(raw_insertions_deletions_cmd) | ||
| ) | ||
| return await run_shell_command(raw_commits_cmd) | ||
|
|
||
| @staticmethod | ||
| def should_skip_commit(raw_commit: str | None, edge_commit: str | None) -> bool: | ||
|
|
@@ -376,6 +332,7 @@ def create_activity( | |
| activity_type: Type of activity | ||
| member: Member information dictionary | ||
| source_id: Source ID for the activity | ||
| segment_id: Segment identifier | ||
| source_parent_id: Parent source ID (optional) | ||
|
|
||
| Returns: | ||
|
|
@@ -623,13 +580,12 @@ def create_activities_from_commit( | |
| @staticmethod | ||
| def process_commits_chunk( | ||
| commit_texts_chunk: list[str | None], | ||
| numstats_map: dict[str, tuple[int, int]], | ||
| repo_path: str, | ||
| edge_commit_hash: str | None, | ||
| remote: str, | ||
| segment_id: str, | ||
| integration_id: str, | ||
| ) -> list[tuple]: | ||
| ) -> tuple[list[tuple], list[dict]]: | ||
| """ | ||
| Process a chunk of raw commit texts into activities. | ||
|
|
||
|
|
@@ -639,35 +595,39 @@ def process_commits_chunk( | |
|
|
||
| Args: | ||
| commit_texts_chunk: List of commit text strings to process | ||
| numstats_map: Pre-parsed mapping of commit_hash -> (insertions, deletions) | ||
| repo_path: Path to the repository | ||
| edge_commit_hash: Edge commit hash for filtering | ||
| remote: Remote repository URL | ||
| segment_id: Segment identifier | ||
| integration_id: Integration identifier | ||
|
|
||
| Returns: | ||
| List of activity dictionaries for database insertion | ||
| Tuple of (activities_db, activities_queue) for database and queue | ||
| """ | ||
| activities_db = [] | ||
| activities_queue = [] | ||
| bad_commits = 0 | ||
| processed_commits = 0 | ||
| commit = None | ||
|
|
||
| for commit_text in commit_texts_chunk: | ||
| if CommitService.should_skip_commit(commit_text, edge_commit_hash): | ||
| for full_commit_text in commit_texts_chunk: | ||
| if CommitService.should_skip_commit(full_commit_text, edge_commit_hash): | ||
| continue | ||
|
|
||
| commit_text, numstats_text = full_commit_text.split(CommitService.NUMSTAT_SPLITTER) | ||
| commit_lines = commit_text.strip().splitlines() | ||
| del full_commit_text | ||
| del commit_text | ||
| if not CommitService._validate_commit_structure(commit_lines): | ||
| logger.warning( | ||
| f"Invalid commit structure in {repo_path}: {len(commit_lines)} fields" | ||
| ) | ||
| bad_commits += 1 | ||
| del commit_lines | ||
| del numstats_text | ||
| continue | ||
|
|
||
| try: | ||
| commit = CommitService._construct_commit_dict(commit_lines, numstats_map) | ||
| commit = CommitService._construct_commit_dict(commit_lines, numstats_text) | ||
| if CommitService._validate_commit_data(commit): | ||
| activity_db_records, activity_kafka = ( | ||
| CommitService.create_activities_from_commit( | ||
|
|
@@ -676,6 +636,8 @@ def process_commits_chunk( | |
| ) | ||
| activities_db.extend(activity_db_records) | ||
| activities_queue.extend(activity_kafka) | ||
| del activity_db_records | ||
| del activity_kafka | ||
| processed_commits += 1 | ||
| else: | ||
| bad_commits += 1 | ||
|
|
@@ -684,6 +646,10 @@ def process_commits_chunk( | |
| logger.warning(f"Failed to parse commit in {repo_path}: {e}") | ||
| bad_commits += 1 | ||
| continue | ||
| finally: | ||
| del commit | ||
| del commit_lines | ||
| del numstats_text | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: Unconditional Deletion in `finally` Block
|
||
|
|
||
| logger.info( | ||
| f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits in {repo_path}" | ||
|
|
@@ -694,36 +660,32 @@ def process_commits_chunk( | |
| async def _process_activities_from_commits( | ||
| self, | ||
| raw_commits: str, | ||
| raw_numstats: str, | ||
| repo_path: str, | ||
| edge_commit_hash: str | None, | ||
| remote: str, | ||
| segment_id: str, | ||
| integration_id: str, | ||
| ): | ||
| """ | ||
| Parse raw git log output into commit dictionaries. | ||
| Parse raw git log output, process commits into activities, and save to database. | ||
| """ | ||
| commit_texts = [ | ||
| c.strip() for c in raw_commits.split(self.COMMIT_END_SPLITTER) if c.strip() | ||
| c.strip() for c in raw_commits.split(self.COMMIT_START_SPLITTER) if c.strip() | ||
| ] | ||
| del raw_commits | ||
| logger.info(f"Actual number of commits to be processed: {len(commit_texts)}") | ||
| if len(commit_texts) == 0: | ||
| self.logger.info("No commits to be processed") | ||
| return | ||
|
|
||
| # Pre-parse numstats for efficient lookup during commit processing | ||
| self.logger.info("Pre-parsing numstats data...") | ||
| numstats_map = CommitService._parse_numstats(raw_numstats) | ||
| self.logger.info(f"Parsed numstats for {len(numstats_map)} commits") | ||
|
|
||
| chunk_size = min(max(20, len(commit_texts) // MAX_WORKER_PROCESSES), self.MAX_CHUNK_SIZE) | ||
|
|
||
| self.logger.info(f"Spliting commits into chunks of {chunk_size}") | ||
| chunks = [ | ||
| commit_texts[i : i + chunk_size] for i in range(0, len(commit_texts), chunk_size) | ||
| ] | ||
| self.logger.info(f"Total commits {len(commit_texts)} chunks {len(chunks)}") | ||
| del commit_texts # to release memory | ||
| loop = asyncio.get_event_loop() | ||
|
|
||
| executor = self._get_or_create_pool() | ||
|
|
@@ -734,7 +696,6 @@ async def _process_activities_from_commits( | |
| executor, | ||
| CommitService.process_commits_chunk, | ||
| chunk, | ||
| numstats_map, | ||
| repo_path, | ||
| edge_commit_hash, | ||
| remote, | ||
|
|
@@ -762,6 +723,8 @@ async def _process_activities_from_commits( | |
| batch_insert_activities(chunk_activities_db), | ||
| self.queue_service.send_batch_activities(chunk_activities_queue), | ||
| ) | ||
| del chunk_activities_db | ||
| del chunk_activities_queue | ||
| except Exception as e: | ||
| if isinstance(e, BrokenProcessPool): | ||
| self.logger.warning( | ||
|
|
@@ -772,7 +735,7 @@ async def _process_activities_from_commits( | |
|
|
||
| @staticmethod | ||
| def _construct_commit_dict( | ||
| commit_metadata_lines: list[str], numstats_map: dict[str, tuple[int, int]] | ||
| commit_metadata_lines: list[str], numstats_text: str | ||
| ) -> dict[str, Any]: | ||
| """Create commit dictionary from parsed lines.""" | ||
| commit_hash = commit_metadata_lines[0] | ||
|
|
@@ -806,8 +769,11 @@ def _construct_commit_dict( | |
| commit_datetime, author_datetime | ||
| ) | ||
|
|
||
| # Get insertions/deletions from pre-parsed numstats map | ||
| insertions, deletions = numstats_map.get(commit_hash, (0, 0)) | ||
| # Parse numstats to get insertions/deletions | ||
| insertions, deletions = CommitService._parse_numstats(numstats_text) | ||
| # release memory | ||
| del numstats_text | ||
| del commit_metadata_lines | ||
|
|
||
| return { | ||
| "hash": commit_hash, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Commit Splitting Fails on Incorrect NUMSTAT Splitter Usage
The `split()` operation on `full_commit_text` expects exactly two parts: commit metadata and numstat lines, separated by `NUMSTAT_SPLITTER`. If the splitter appears zero or multiple times (e.g., within a commit message), unpacking the result will raise a `ValueError`, causing commit processing to fail.