Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Down migration: Remove metrics column and index from serviceExecutions table.
-- Reverses the corresponding up migration that added JSONB metrics storage.

-- Drop index first, then the column; IF EXISTS on both keeps the
-- migration safe to re-run (idempotent) if it was already applied.
DROP INDEX IF EXISTS git."idx_serviceExecutions_metrics";

-- Drop column
ALTER TABLE git."serviceExecutions"
DROP COLUMN IF EXISTS metrics;

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Add metrics column to serviceExecutions table for storing service-specific execution metrics
-- Examples: ai_cost/maintainers_found for maintainer service,
-- total_commits/processed_commits/bad_commits/total_activities for commit service

-- IF NOT EXISTS makes the column addition idempotent, consistent with the
-- CREATE INDEX IF NOT EXISTS below and with the IF EXISTS guards in the
-- matching down migration (supported by PostgreSQL 9.6+).
ALTER TABLE git."serviceExecutions"
ADD COLUMN IF NOT EXISTS metrics JSONB DEFAULT '{}'::jsonb;

-- Create GIN index for efficient querying within JSONB data
CREATE INDEX IF NOT EXISTS "idx_serviceExecutions_metrics"
ON git."serviceExecutions" USING gin (metrics);

5 changes: 3 additions & 2 deletions services/apps/git_integration/src/crowdgit/database/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ async def save_service_execution(service_execution: ServiceExecution) -> None:
sql_query = """
INSERT INTO git."serviceExecutions" (
"repoId", "operationType", "status", "errorCode",
"errorMessage", "executionTimeSec"
"errorMessage", "executionTimeSec", "metrics"
)
VALUES ($1, $2, $3, $4, $5, $6)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"""

db_data = service_execution.to_db_dict()
Expand All @@ -306,6 +306,7 @@ async def save_service_execution(service_execution: ServiceExecution) -> None:
db_data["errorCode"],
db_data["errorMessage"],
db_data["executionTimeSec"],
db_data["metrics"],
),
)
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from decimal import Decimal
from typing import Any

import orjson
from pydantic import BaseModel, Field

from crowdgit.enums import ExecutionStatus, OperationType
Expand All @@ -15,6 +16,9 @@ class ServiceExecution(BaseModel):
error_code: str | None = Field(None, description="Custom error code")
error_message: str | None = Field(None, description="Detailed error message")
execution_time_sec: Decimal = Field(..., description="Execution time in seconds")
metrics: dict[str, Any] = Field(
default_factory=dict, description="Service-specific execution metrics"
)

def to_db_dict(self) -> dict[str, Any]:
"""Convert create model to database dictionary"""
Expand All @@ -25,4 +29,5 @@ def to_db_dict(self) -> dict[str, Any]:
"errorCode": self.error_code,
"errorMessage": self.error_message,
"executionTimeSec": self.execution_time_sec,
"metrics": orjson.dumps(self.metrics).decode(),
Copy link

Copilot AI Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The orjson.dumps() call will return bytes, but PostgreSQL JSONB columns expect JSON text or can accept the dict directly. Since the metrics parameter in the INSERT query ($7) is passed as a string after .decode(), PostgreSQL will need to parse it. Consider passing self.metrics directly as a dict instead of serializing it, which allows the database driver to handle JSON serialization natively and is more efficient.

Suggested change
"metrics": orjson.dumps(self.metrics).decode(),
"metrics": self.metrics,

Copilot uses AI. Check for mistakes.
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ async def process_single_batch_commits(
"execution_status": ExecutionStatus.SUCCESS,
"error_code": None,
"error_message": None,
"total_commits": 0,
"processed_commits": 0,
"bad_commits": 0,
"total_activities": 0,
}

batch_start_time = time.time()
Expand Down Expand Up @@ -155,6 +159,12 @@ async def process_single_batch_commits(
execution_time_sec=Decimal(
str(round(self._metrics_context["total_execution_time"], 2))
),
metrics={
"total_commits": self._metrics_context["total_commits"],
"processed_commits": self._metrics_context["processed_commits"],
"bad_commits": self._metrics_context["bad_commits"],
"total_activities": self._metrics_context["total_activities"],
},
)
await save_service_execution(service_execution)
# Reset metrics context after saving
Expand Down Expand Up @@ -185,6 +195,12 @@ async def process_single_batch_commits(
execution_time_sec=Decimal(
str(round(self._metrics_context["total_execution_time"], 2))
),
metrics={
"total_commits": self._metrics_context["total_commits"],
"processed_commits": self._metrics_context["processed_commits"],
"bad_commits": self._metrics_context["bad_commits"],
"total_activities": self._metrics_context["total_activities"],
},
)
await save_service_execution(service_execution)
# Reset metrics context after saving
Expand Down Expand Up @@ -623,6 +639,12 @@ async def process_commits_chunk(
f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits in {repo_path}"
)

# Update metrics context
if self._metrics_context:
self._metrics_context["processed_commits"] += processed_commits
self._metrics_context["bad_commits"] += bad_commits
self._metrics_context["total_activities"] += len(activities_db)

# Write activities to database and queue
if activities_db:
await asyncio.gather(
Expand Down Expand Up @@ -651,6 +673,11 @@ async def _process_activities_from_commits(
gc.collect()

logger.info(f"Actual number of commits to be processed: {len(commit_texts)}")

# Update total_commits metric
if self._metrics_context:
self._metrics_context["total_commits"] += len(commit_texts)

if len(commit_texts) == 0:
self.logger.info("No commits to be processed")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,8 @@ async def process_maintainers(
error_code = None
error_message = None
latest_maintainer_file = None
ai_cost = 0.0
maintainers_found = 0

try:
owner, repo_name = parse_repo_url(batch_info.remote)
Expand All @@ -454,8 +456,10 @@ async def process_maintainers(
self.logger.info(f"Starting maintainers processing for repo: {batch_info.remote}")
maintainers = await self.extract_maintainers(batch_info.repo_path, owner, repo_name)
latest_maintainer_file = maintainers.maintainer_file
ai_cost = maintainers.total_cost
maintainers_found = len(maintainers.maintainer_info)
self.logger.info(
f"Extracted {len(maintainers.maintainer_info)} maintainers from {latest_maintainer_file} file"
f"Extracted {maintainers_found} maintainers from {latest_maintainer_file} file"
)
await self.save_maintainers(
repository.id,
Expand All @@ -469,6 +473,9 @@ async def process_maintainers(
error_code = (
e.error_code.value if isinstance(e, CrowdGitError) else ErrorCode.UNKNOWN.value
)
# Capture AI cost even on error if it's a CrowdGitError with ai_cost
if isinstance(e, CrowdGitError) and hasattr(e, "ai_cost"):
Comment on lines +476 to +477
Copy link

Copilot AI Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The comment references capturing AI cost from CrowdGitError with ai_cost attribute, but this pattern assumes a specific error structure that may not be consistently available. Consider documenting which specific CrowdGitError subclasses provide the ai_cost attribute, or implement a more explicit protocol (e.g., a mixin or base class method) to make this contract clearer and more maintainable.

Suggested change
# Capture AI cost even on error if it's a CrowdGitError with ai_cost
if isinstance(e, CrowdGitError) and hasattr(e, "ai_cost"):
# Capture AI cost even on error if it's a CrowdGitError that implements the AICostCarrier protocol.
# See: crowdgit.errors.AICostCarrier
if isinstance(e, CrowdGitError) and isinstance(e, getattr(__import__("crowdgit.errors", fromlist=["AICostCarrier"]), "AICostCarrier", ())):

Copilot uses AI. Check for mistakes.
ai_cost = e.ai_cost

self.logger.error(f"Maintainer processing failed: {error_message}")
finally:
Expand All @@ -484,5 +491,9 @@ async def process_maintainers(
error_code=error_code,
error_message=error_message,
execution_time_sec=execution_time,
metrics={
"ai_cost": ai_cost,
"maintainers_found": maintainers_found,
},
)
await save_service_execution(service_execution)
Loading