Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
class CommitService(BaseService):
"""Service for processing repository commits"""

COMMIT_END_SPLITTER = "--CROWD-END-OF-COMMIT--"
COMMIT_START_SPLITTER = "---CROWD_COMMIT_START---"
NUMSTAT_SPLITTER = "---CROWD_NUMSTAT_START---"
MIN_COMMIT_FIELDS = 8
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
FUTURE_DATE_THRESHOLD_DAYS = 1
Expand Down Expand Up @@ -76,7 +77,7 @@ def cleanup_process_pool(self):
@property
def git_log_format(self) -> str:
"""Git log format string with commit splitter"""
return f"%n%H%n%aI%n%an%n%ae%n%cI%n%cn%n%ce%n%P%n%d%n%B%n{self.COMMIT_END_SPLITTER}"
return f"{self.COMMIT_START_SPLITTER}%n%H%n%aI%n%an%n%ae%n%cI%n%cn%n%ce%n%P%n%d%n%B%n{self.NUMSTAT_SPLITTER}"

@staticmethod
def is_valid_commit_hash(commit_hash: str) -> bool:
Expand Down Expand Up @@ -119,13 +120,8 @@ async def process_single_batch_commits(
Process commits from a cloned batch.

Args:
repo_path: Path to the Git repository.
edge_commit: The edge commit for the current batch. It should be excluded from processing because its data may be incomplete or inaccurate.
prev_batch_edge_commit: The edge commit from the previous batch. Which is used as the starting point (included) for the current batch processing.
remote: Remote repository URL
segment_id: Segment identifier
integration_id: Integration identifier
is_final_batch: Whether this is the final batch (triggers metrics saving)
repository: Repository object containing segment and integration info
batch_info: Clone batch information with paths and commit boundaries
"""
# Initialize metrics context on first call
if self._metrics_context is None:
Expand All @@ -143,7 +139,7 @@ async def process_single_batch_commits(
self.logger.info(
f"Starting commits processing for new batch having commits older than {batch_info.prev_batch_edge_commit}"
)
raw_commits, raw_numstats = await self._execute_git_log(
raw_commits = await self._execute_git_log(
batch_info.repo_path,
batch_info.clone_with_batches,
batch_info.prev_batch_edge_commit,
Expand All @@ -152,7 +148,6 @@ async def process_single_batch_commits(

await self._process_activities_from_commits(
raw_commits,
raw_numstats,
batch_info.repo_path,
batch_info.edge_commit,
batch_info.remote,
Expand Down Expand Up @@ -222,71 +217,41 @@ async def _get_commit_reference(self, repo_path: str) -> str:
return "HEAD"
return f"origin/{default_branch}"

def _build_git_log_commands(
self, repo_path: str, commit_range: str
) -> tuple[list[str], list[str]]:
def _build_git_log_command(self, repo_path: str, commit_range: str) -> list[str]:
"""Build git log commands for commits and numstats."""
raw_commits_cmd = [
"git",
"-C",
repo_path,
"log",
commit_range,
f"--pretty=format:{self.git_log_format}",
]
raw_insertions_deletions_cmd = [
return [
"git",
"-C",
repo_path,
"log",
commit_range,
"--pretty=format:%H",
"--cc",
"--numstat",
f"--pretty=format:{self.git_log_format}",
]
return raw_commits_cmd, raw_insertions_deletions_cmd

@staticmethod
def _parse_numstats(raw_numstats: str) -> dict[str, tuple[int, int]]:
def _parse_numstats(raw_numstats: str) -> tuple[int, int]:
"""
Parse raw numstats into commit_hash -> (insertions, deletions) mapping.
Parse raw numstats into -> (insertions, deletions).

Args:
raw_numstats: Raw output from git log --numstat --pretty=format:%H

Returns:
Dictionary mapping commit hashes to (insertions, deletions) tuples
(insertions, deletions) tuple
"""
if not raw_numstats.strip():
return {}

changes = {}
commits_texts = raw_numstats.split("\n\n")

for commit_text in commits_texts:
commit_lines = commit_text.strip().splitlines()
insertions_deletions = raw_numstats.strip().splitlines()
insertions = 0
deletions = 0

if len(commit_lines) < 2:
continue

commit_hash = commit_lines[0]
if not CommitService.is_valid_commit_hash(commit_hash):
logger.error("Invalid insertions/deletions hash found: hash=%s", commit_hash)
continue

insertions_deletions = commit_lines[1:]
insertions = 0
deletions = 0

for line in insertions_deletions:
match = re.match(r"^(\d+)\s+(\d+)", line)
if match:
insertions += int(match.group(1))
deletions += int(match.group(2))

changes[commit_hash] = (insertions, deletions)
for line in insertions_deletions:
match = re.match(r"^(\d+)\s+(\d+)", line)
if match:
insertions += int(match.group(1))
deletions += int(match.group(2))

return changes
return (insertions, deletions)

@retry(
stop=stop_after_attempt(5),
Expand All @@ -299,7 +264,7 @@ async def _execute_git_log(
clone_with_batches: bool,
prev_batch_edge_commit: str | None = None,
edge_commit: str | None = None,
) -> tuple[str, str]:
) -> str:
"""Execute git log command and return raw output."""
# Ensure abbreviated commits are disabled
await run_shell_command(
Expand All @@ -313,16 +278,11 @@ async def _execute_git_log(
self.logger.info(
f"Full repo cloned in single batch, getting all commits in {commit_reference}"
)
raw_commits_cmd, raw_insertions_deletions_cmd = self._build_git_log_commands(
repo_path, commit_reference
)
return await asyncio.gather(
run_shell_command(raw_commits_cmd),
run_shell_command(raw_insertions_deletions_cmd),
)
raw_commits_cmd = self._build_git_log_command(repo_path, commit_reference)
return await run_shell_command(raw_commits_cmd)

if not prev_batch_edge_commit:
return "", ""
return ""

if edge_commit:
# Middle batches: Get the slice of history between the last batch's edge and this one's.
Expand All @@ -335,14 +295,10 @@ async def _execute_git_log(
commit_range = prev_batch_edge_commit
self.logger.info(f"Processing final batch from: {prev_batch_edge_commit} to root")

raw_commits_cmd, raw_insertions_deletions_cmd = self._build_git_log_commands(
repo_path, commit_range
)
raw_commits_cmd = self._build_git_log_command(repo_path, commit_range)

self.logger.info(f"Executing git log for range: {commit_range}")
return await asyncio.gather(
run_shell_command(raw_commits_cmd), run_shell_command(raw_insertions_deletions_cmd)
)
return await run_shell_command(raw_commits_cmd)

@staticmethod
def should_skip_commit(raw_commit: str | None, edge_commit: str | None) -> bool:
Expand Down Expand Up @@ -376,6 +332,7 @@ def create_activity(
activity_type: Type of activity
member: Member information dictionary
source_id: Source ID for the activity
segment_id: Segment identifier
source_parent_id: Parent source ID (optional)

Returns:
Expand Down Expand Up @@ -623,13 +580,12 @@ def create_activities_from_commit(
@staticmethod
def process_commits_chunk(
commit_texts_chunk: list[str | None],
numstats_map: dict[str, tuple[int, int]],
repo_path: str,
edge_commit_hash: str | None,
remote: str,
segment_id: str,
integration_id: str,
) -> list[tuple]:
) -> tuple[list[tuple], list[dict]]:
"""
Process a chunk of raw commit texts into activities.

Expand All @@ -639,35 +595,39 @@ def process_commits_chunk(

Args:
commit_texts_chunk: List of commit text strings to process
numstats_map: Pre-parsed mapping of commit_hash -> (insertions, deletions)
repo_path: Path to the repository
edge_commit_hash: Edge commit hash for filtering
remote: Remote repository URL
segment_id: Segment identifier
integration_id: Integration identifier

Returns:
List of activity dictionaries for database insertion
Tuple of (activities_db, activities_queue) for database and queue
"""
activities_db = []
activities_queue = []
bad_commits = 0
processed_commits = 0
commit = None

for commit_text in commit_texts_chunk:
if CommitService.should_skip_commit(commit_text, edge_commit_hash):
for full_commit_text in commit_texts_chunk:
if CommitService.should_skip_commit(full_commit_text, edge_commit_hash):
continue

commit_text, numstats_text = full_commit_text.split(CommitService.NUMSTAT_SPLITTER)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Commit Splitting Fails on Incorrect NUMSTAT Splitter Usage

The split() operation on full_commit_text expects exactly two parts: commit metadata and numstat lines, separated by NUMSTAT_SPLITTER. If the splitter appears zero or multiple times (e.g., within a commit message), unpacking the result will raise a ValueError, causing commit processing to fail.

Fix in Cursor Fix in Web

commit_lines = commit_text.strip().splitlines()
del full_commit_text
del commit_text
if not CommitService._validate_commit_structure(commit_lines):
logger.warning(
f"Invalid commit structure in {repo_path}: {len(commit_lines)} fields"
)
bad_commits += 1
del commit_lines
del numstats_text
continue

try:
commit = CommitService._construct_commit_dict(commit_lines, numstats_map)
commit = CommitService._construct_commit_dict(commit_lines, numstats_text)
if CommitService._validate_commit_data(commit):
activity_db_records, activity_kafka = (
CommitService.create_activities_from_commit(
Expand All @@ -676,6 +636,8 @@ def process_commits_chunk(
)
activities_db.extend(activity_db_records)
activities_queue.extend(activity_kafka)
del activity_db_records
del activity_kafka
processed_commits += 1
else:
bad_commits += 1
Expand All @@ -684,6 +646,10 @@ def process_commits_chunk(
logger.warning(f"Failed to parse commit in {repo_path}: {e}")
bad_commits += 1
continue
finally:
del commit
del commit_lines
del numstats_text

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Unconditional Deletion in finally Block

The finally block in process_commits_chunk attempts to del variables like commit, commit_lines, and numstats_text unconditionally. This can cause a NameError if an exception occurs before these variables are defined within an iteration, or if commit was already deleted in a previous loop iteration.

Fix in Cursor Fix in Web


logger.info(
f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits in {repo_path}"
Expand All @@ -694,36 +660,32 @@ def process_commits_chunk(
async def _process_activities_from_commits(
self,
raw_commits: str,
raw_numstats: str,
repo_path: str,
edge_commit_hash: str | None,
remote: str,
segment_id: str,
integration_id: str,
):
"""
Parse raw git log output into commit dictionaries.
Parse raw git log output, process commits into activities, and save to database.
"""
commit_texts = [
c.strip() for c in raw_commits.split(self.COMMIT_END_SPLITTER) if c.strip()
c.strip() for c in raw_commits.split(self.COMMIT_START_SPLITTER) if c.strip()
]
del raw_commits
logger.info(f"Actual number of commits to be processed: {len(commit_texts)}")
if len(commit_texts) == 0:
self.logger.info("No commits to be processed")
return

# Pre-parse numstats for efficient lookup during commit processing
self.logger.info("Pre-parsing numstats data...")
numstats_map = CommitService._parse_numstats(raw_numstats)
self.logger.info(f"Parsed numstats for {len(numstats_map)} commits")

chunk_size = min(max(20, len(commit_texts) // MAX_WORKER_PROCESSES), self.MAX_CHUNK_SIZE)

self.logger.info(f"Spliting commits into chunks of {chunk_size}")
chunks = [
commit_texts[i : i + chunk_size] for i in range(0, len(commit_texts), chunk_size)
]
self.logger.info(f"Total commits {len(commit_texts)} chunks {len(chunks)}")
del commit_texts # to release memory
loop = asyncio.get_event_loop()

executor = self._get_or_create_pool()
Expand All @@ -734,7 +696,6 @@ async def _process_activities_from_commits(
executor,
CommitService.process_commits_chunk,
chunk,
numstats_map,
repo_path,
edge_commit_hash,
remote,
Expand Down Expand Up @@ -762,6 +723,8 @@ async def _process_activities_from_commits(
batch_insert_activities(chunk_activities_db),
self.queue_service.send_batch_activities(chunk_activities_queue),
)
del chunk_activities_db
del chunk_activities_queue
except Exception as e:
if isinstance(e, BrokenProcessPool):
self.logger.warning(
Expand All @@ -772,7 +735,7 @@ async def _process_activities_from_commits(

@staticmethod
def _construct_commit_dict(
commit_metadata_lines: list[str], numstats_map: dict[str, tuple[int, int]]
commit_metadata_lines: list[str], numstats_text: str
) -> dict[str, Any]:
"""Create commit dictionary from parsed lines."""
commit_hash = commit_metadata_lines[0]
Expand Down Expand Up @@ -806,8 +769,11 @@ def _construct_commit_dict(
commit_datetime, author_datetime
)

# Get insertions/deletions from pre-parsed numstats map
insertions, deletions = numstats_map.get(commit_hash, (0, 0))
# Parse numstats to get insertions/deletions
insertions, deletions = CommitService._parse_numstats(numstats_text)
# release memory
del numstats_text
del commit_metadata_lines

return {
"hash": commit_hash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ async def send_batch_activities(self, activities_kafka: list[dict[str, str]]):
futures = [
self.kafka_producer.send(
topic=self.kafka_topic,
key=activity["message_id"].encode("utf-8"),
value=activity["payload"].encode("utf-8"),
key=activity["message_id"].encode("utf-8", errors="replace"),
value=activity["payload"].encode("utf-8", errors="replace"),
)
for activity in activities_kafka
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _safe_decode(data: bytes) -> str:

# CP1252 is common for Windows-generated content and has specific byte mappings
# ISO-8859-1 is a common legacy encoding for Western European languages
for encoding in ("cp1252", "iso-8859-1"):
for encoding in ("iso-8859-1", "cp1252"):
logger.info(f"Trying {encoding} decoding")
try:
return data.decode(encoding)
Expand Down
Loading