Skip to content

Commit dfdea8c

Browse files
authored
fix(git-integration): insertions deletions memory consumption [CM-724] (#3505)
1 parent 4746190 commit dfdea8c

3 files changed

Lines changed: 60 additions & 94 deletions

File tree

services/apps/git_integration/src/crowdgit/services/commit/commit_service.py

Lines changed: 57 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
class CommitService(BaseService):
3636
"""Service for processing repository commits"""
3737

38-
COMMIT_END_SPLITTER = "--CROWD-END-OF-COMMIT--"
38+
COMMIT_START_SPLITTER = "---CROWD_COMMIT_START---"
39+
NUMSTAT_SPLITTER = "---CROWD_NUMSTAT_START---"
3940
MIN_COMMIT_FIELDS = 8
4041
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
4142
FUTURE_DATE_THRESHOLD_DAYS = 1
@@ -76,7 +77,7 @@ def cleanup_process_pool(self):
7677
@property
7778
def git_log_format(self) -> str:
7879
"""Git log format string with commit splitter"""
79-
return f"%n%H%n%aI%n%an%n%ae%n%cI%n%cn%n%ce%n%P%n%d%n%B%n{self.COMMIT_END_SPLITTER}"
80+
return f"{self.COMMIT_START_SPLITTER}%n%H%n%aI%n%an%n%ae%n%cI%n%cn%n%ce%n%P%n%d%n%B%n{self.NUMSTAT_SPLITTER}"
8081

8182
@staticmethod
8283
def is_valid_commit_hash(commit_hash: str) -> bool:
@@ -119,13 +120,8 @@ async def process_single_batch_commits(
119120
Process commits from a cloned batch.
120121
121122
Args:
122-
repo_path: Path to the Git repository.
123-
edge_commit: The edge commit for the current batch. It should be excluded from processing because its data may be incomplete or inaccurate.
124-
prev_batch_edge_commit: The edge commit from the previous batch. Which is used as the starting point (included) for the current batch processing.
125-
remote: Remote repository URL
126-
segment_id: Segment identifier
127-
integration_id: Integration identifier
128-
is_final_batch: Whether this is the final batch (triggers metrics saving)
123+
repository: Repository object containing segment and integration info
124+
batch_info: Clone batch information with paths and commit boundaries
129125
"""
130126
# Initialize metrics context on first call
131127
if self._metrics_context is None:
@@ -143,7 +139,7 @@ async def process_single_batch_commits(
143139
self.logger.info(
144140
f"Starting commits processing for new batch having commits older than {batch_info.prev_batch_edge_commit}"
145141
)
146-
raw_commits, raw_numstats = await self._execute_git_log(
142+
raw_commits = await self._execute_git_log(
147143
batch_info.repo_path,
148144
batch_info.clone_with_batches,
149145
batch_info.prev_batch_edge_commit,
@@ -152,7 +148,6 @@ async def process_single_batch_commits(
152148

153149
await self._process_activities_from_commits(
154150
raw_commits,
155-
raw_numstats,
156151
batch_info.repo_path,
157152
batch_info.edge_commit,
158153
batch_info.remote,
@@ -222,71 +217,41 @@ async def _get_commit_reference(self, repo_path: str) -> str:
222217
return "HEAD"
223218
return f"origin/{default_branch}"
224219

225-
def _build_git_log_commands(
226-
self, repo_path: str, commit_range: str
227-
) -> tuple[list[str], list[str]]:
220+
def _build_git_log_command(self, repo_path: str, commit_range: str) -> list[str]:
228221
"""Build git log commands for commits and numstats."""
229-
raw_commits_cmd = [
230-
"git",
231-
"-C",
232-
repo_path,
233-
"log",
234-
commit_range,
235-
f"--pretty=format:{self.git_log_format}",
236-
]
237-
raw_insertions_deletions_cmd = [
222+
return [
238223
"git",
239224
"-C",
240225
repo_path,
241226
"log",
242227
commit_range,
243-
"--pretty=format:%H",
244228
"--cc",
245229
"--numstat",
230+
f"--pretty=format:{self.git_log_format}",
246231
]
247-
return raw_commits_cmd, raw_insertions_deletions_cmd
248232

249233
@staticmethod
250-
def _parse_numstats(raw_numstats: str) -> dict[str, tuple[int, int]]:
234+
def _parse_numstats(raw_numstats: str) -> tuple[int, int]:
251235
"""
252-
Parse raw numstats into commit_hash -> (insertions, deletions) mapping.
236+
Parse raw numstats into -> (insertions, deletions).
253237
254238
Args:
255239
raw_numstats: Raw output from git log --numstat --pretty=format:%H
256240
257241
Returns:
258-
Dictionary mapping commit hashes to (insertions, deletions) tuples
242+
(insertions, deletions) tuple
259243
"""
260-
if not raw_numstats.strip():
261-
return {}
262-
263-
changes = {}
264-
commits_texts = raw_numstats.split("\n\n")
265-
266-
for commit_text in commits_texts:
267-
commit_lines = commit_text.strip().splitlines()
244+
insertions_deletions = raw_numstats.strip().splitlines()
245+
insertions = 0
246+
deletions = 0
268247

269-
if len(commit_lines) < 2:
270-
continue
271-
272-
commit_hash = commit_lines[0]
273-
if not CommitService.is_valid_commit_hash(commit_hash):
274-
logger.error("Invalid insertions/deletions hash found: hash=%s", commit_hash)
275-
continue
276-
277-
insertions_deletions = commit_lines[1:]
278-
insertions = 0
279-
deletions = 0
280-
281-
for line in insertions_deletions:
282-
match = re.match(r"^(\d+)\s+(\d+)", line)
283-
if match:
284-
insertions += int(match.group(1))
285-
deletions += int(match.group(2))
286-
287-
changes[commit_hash] = (insertions, deletions)
248+
for line in insertions_deletions:
249+
match = re.match(r"^(\d+)\s+(\d+)", line)
250+
if match:
251+
insertions += int(match.group(1))
252+
deletions += int(match.group(2))
288253

289-
return changes
254+
return (insertions, deletions)
290255

291256
@retry(
292257
stop=stop_after_attempt(5),
@@ -299,7 +264,7 @@ async def _execute_git_log(
299264
clone_with_batches: bool,
300265
prev_batch_edge_commit: str | None = None,
301266
edge_commit: str | None = None,
302-
) -> tuple[str, str]:
267+
) -> str:
303268
"""Execute git log command and return raw output."""
304269
# Ensure abbreviated commits are disabled
305270
await run_shell_command(
@@ -313,16 +278,11 @@ async def _execute_git_log(
313278
self.logger.info(
314279
f"Full repo cloned in single batch, getting all commits in {commit_reference}"
315280
)
316-
raw_commits_cmd, raw_insertions_deletions_cmd = self._build_git_log_commands(
317-
repo_path, commit_reference
318-
)
319-
return await asyncio.gather(
320-
run_shell_command(raw_commits_cmd),
321-
run_shell_command(raw_insertions_deletions_cmd),
322-
)
281+
raw_commits_cmd = self._build_git_log_command(repo_path, commit_reference)
282+
return await run_shell_command(raw_commits_cmd)
323283

324284
if not prev_batch_edge_commit:
325-
return "", ""
285+
return ""
326286

327287
if edge_commit:
328288
# Middle batches: Get the slice of history between the last batch's edge and this one's.
@@ -335,14 +295,10 @@ async def _execute_git_log(
335295
commit_range = prev_batch_edge_commit
336296
self.logger.info(f"Processing final batch from: {prev_batch_edge_commit} to root")
337297

338-
raw_commits_cmd, raw_insertions_deletions_cmd = self._build_git_log_commands(
339-
repo_path, commit_range
340-
)
298+
raw_commits_cmd = self._build_git_log_command(repo_path, commit_range)
341299

342300
self.logger.info(f"Executing git log for range: {commit_range}")
343-
return await asyncio.gather(
344-
run_shell_command(raw_commits_cmd), run_shell_command(raw_insertions_deletions_cmd)
345-
)
301+
return await run_shell_command(raw_commits_cmd)
346302

347303
@staticmethod
348304
def should_skip_commit(raw_commit: str | None, edge_commit: str | None) -> bool:
@@ -376,6 +332,7 @@ def create_activity(
376332
activity_type: Type of activity
377333
member: Member information dictionary
378334
source_id: Source ID for the activity
335+
segment_id: Segment identifier
379336
source_parent_id: Parent source ID (optional)
380337
381338
Returns:
@@ -623,13 +580,12 @@ def create_activities_from_commit(
623580
@staticmethod
624581
def process_commits_chunk(
625582
commit_texts_chunk: list[str | None],
626-
numstats_map: dict[str, tuple[int, int]],
627583
repo_path: str,
628584
edge_commit_hash: str | None,
629585
remote: str,
630586
segment_id: str,
631587
integration_id: str,
632-
) -> list[tuple]:
588+
) -> tuple[list[tuple], list[dict]]:
633589
"""
634590
Process a chunk of raw commit texts into activities.
635591
@@ -639,35 +595,39 @@ def process_commits_chunk(
639595
640596
Args:
641597
commit_texts_chunk: List of commit text strings to process
642-
numstats_map: Pre-parsed mapping of commit_hash -> (insertions, deletions)
643598
repo_path: Path to the repository
644599
edge_commit_hash: Edge commit hash for filtering
645600
remote: Remote repository URL
646601
segment_id: Segment identifier
647602
integration_id: Integration identifier
648603
649604
Returns:
650-
List of activity dictionaries for database insertion
605+
Tuple of (activities_db, activities_queue) for database and queue
651606
"""
652607
activities_db = []
653608
activities_queue = []
654609
bad_commits = 0
655610
processed_commits = 0
611+
commit = None
656612

657-
for commit_text in commit_texts_chunk:
658-
if CommitService.should_skip_commit(commit_text, edge_commit_hash):
613+
for full_commit_text in commit_texts_chunk:
614+
if CommitService.should_skip_commit(full_commit_text, edge_commit_hash):
659615
continue
660-
616+
commit_text, numstats_text = full_commit_text.split(CommitService.NUMSTAT_SPLITTER)
661617
commit_lines = commit_text.strip().splitlines()
618+
del full_commit_text
619+
del commit_text
662620
if not CommitService._validate_commit_structure(commit_lines):
663621
logger.warning(
664622
f"Invalid commit structure in {repo_path}: {len(commit_lines)} fields"
665623
)
666624
bad_commits += 1
625+
del commit_lines
626+
del numstats_text
667627
continue
668628

669629
try:
670-
commit = CommitService._construct_commit_dict(commit_lines, numstats_map)
630+
commit = CommitService._construct_commit_dict(commit_lines, numstats_text)
671631
if CommitService._validate_commit_data(commit):
672632
activity_db_records, activity_kafka = (
673633
CommitService.create_activities_from_commit(
@@ -676,6 +636,8 @@ def process_commits_chunk(
676636
)
677637
activities_db.extend(activity_db_records)
678638
activities_queue.extend(activity_kafka)
639+
del activity_db_records
640+
del activity_kafka
679641
processed_commits += 1
680642
else:
681643
bad_commits += 1
@@ -684,6 +646,10 @@ def process_commits_chunk(
684646
logger.warning(f"Failed to parse commit in {repo_path}: {e}")
685647
bad_commits += 1
686648
continue
649+
finally:
650+
del commit
651+
del commit_lines
652+
del numstats_text
687653

688654
logger.info(
689655
f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits in {repo_path}"
@@ -694,36 +660,32 @@ def process_commits_chunk(
694660
async def _process_activities_from_commits(
695661
self,
696662
raw_commits: str,
697-
raw_numstats: str,
698663
repo_path: str,
699664
edge_commit_hash: str | None,
700665
remote: str,
701666
segment_id: str,
702667
integration_id: str,
703668
):
704669
"""
705-
Parse raw git log output into commit dictionaries.
670+
Parse raw git log output, process commits into activities, and save to database.
706671
"""
707672
commit_texts = [
708-
c.strip() for c in raw_commits.split(self.COMMIT_END_SPLITTER) if c.strip()
673+
c.strip() for c in raw_commits.split(self.COMMIT_START_SPLITTER) if c.strip()
709674
]
675+
del raw_commits
710676
logger.info(f"Actual number of commits to be processed: {len(commit_texts)}")
711677
if len(commit_texts) == 0:
712678
self.logger.info("No commits to be processed")
713679
return
714680

715-
# Pre-parse numstats for efficient lookup during commit processing
716-
self.logger.info("Pre-parsing numstats data...")
717-
numstats_map = CommitService._parse_numstats(raw_numstats)
718-
self.logger.info(f"Parsed numstats for {len(numstats_map)} commits")
719-
720681
chunk_size = min(max(20, len(commit_texts) // MAX_WORKER_PROCESSES), self.MAX_CHUNK_SIZE)
721682

722683
self.logger.info(f"Spliting commits into chunks of {chunk_size}")
723684
chunks = [
724685
commit_texts[i : i + chunk_size] for i in range(0, len(commit_texts), chunk_size)
725686
]
726687
self.logger.info(f"Total commits {len(commit_texts)} chunks {len(chunks)}")
688+
del commit_texts # to release memory
727689
loop = asyncio.get_event_loop()
728690

729691
executor = self._get_or_create_pool()
@@ -734,7 +696,6 @@ async def _process_activities_from_commits(
734696
executor,
735697
CommitService.process_commits_chunk,
736698
chunk,
737-
numstats_map,
738699
repo_path,
739700
edge_commit_hash,
740701
remote,
@@ -762,6 +723,8 @@ async def _process_activities_from_commits(
762723
batch_insert_activities(chunk_activities_db),
763724
self.queue_service.send_batch_activities(chunk_activities_queue),
764725
)
726+
del chunk_activities_db
727+
del chunk_activities_queue
765728
except Exception as e:
766729
if isinstance(e, BrokenProcessPool):
767730
self.logger.warning(
@@ -772,7 +735,7 @@ async def _process_activities_from_commits(
772735

773736
@staticmethod
774737
def _construct_commit_dict(
775-
commit_metadata_lines: list[str], numstats_map: dict[str, tuple[int, int]]
738+
commit_metadata_lines: list[str], numstats_text: str
776739
) -> dict[str, Any]:
777740
"""Create commit dictionary from parsed lines."""
778741
commit_hash = commit_metadata_lines[0]
@@ -806,8 +769,11 @@ def _construct_commit_dict(
806769
commit_datetime, author_datetime
807770
)
808771

809-
# Get insertions/deletions from pre-parsed numstats map
810-
insertions, deletions = numstats_map.get(commit_hash, (0, 0))
772+
# Parse numstats to get insertions/deletions
773+
insertions, deletions = CommitService._parse_numstats(numstats_text)
774+
# release memory
775+
del numstats_text
776+
del commit_metadata_lines
811777

812778
return {
813779
"hash": commit_hash,

services/apps/git_integration/src/crowdgit/services/queue/queue_service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,8 @@ async def send_batch_activities(self, activities_kafka: list[dict[str, str]]):
153153
futures = [
154154
self.kafka_producer.send(
155155
topic=self.kafka_topic,
156-
key=activity["message_id"].encode("utf-8"),
157-
value=activity["payload"].encode("utf-8"),
156+
key=activity["message_id"].encode("utf-8", errors="replace"),
157+
value=activity["payload"].encode("utf-8", errors="replace"),
158158
)
159159
for activity in activities_kafka
160160
]

services/apps/git_integration/src/crowdgit/services/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def _safe_decode(data: bytes) -> str:
3939

4040
# CP1252 is common for Windows-generated content and has specific byte mappings
4141
# ISO-8859-1 is a common legacy encoding for Western European languages
42-
for encoding in ("cp1252", "iso-8859-1"):
42+
for encoding in ("iso-8859-1", "cp1252"):
4343
logger.info(f"Trying {encoding} decoding")
4444
try:
4545
return data.decode(encoding)

0 commit comments

Comments (0)