diff --git a/backend/src/database/migrations/U1760530860__addMetricsToServiceExecution.sql b/backend/src/database/migrations/U1760530860__addMetricsToServiceExecution.sql new file mode 100644 index 0000000000..16aa529826 --- /dev/null +++ b/backend/src/database/migrations/U1760530860__addMetricsToServiceExecution.sql @@ -0,0 +1,9 @@ +-- Down migration: Remove metrics column and index from serviceExecutions table + +-- Drop index first +DROP INDEX IF EXISTS git."idx_serviceExecutions_metrics"; + +-- Drop column +ALTER TABLE git."serviceExecutions" +DROP COLUMN IF EXISTS metrics; + diff --git a/backend/src/database/migrations/V1760530860__addMetricsToServiceExecution.sql b/backend/src/database/migrations/V1760530860__addMetricsToServiceExecution.sql new file mode 100644 index 0000000000..a174abf39a --- /dev/null +++ b/backend/src/database/migrations/V1760530860__addMetricsToServiceExecution.sql @@ -0,0 +1,10 @@ +-- Add metrics column to serviceExecutions table for storing service-specific execution metrics +-- Examples: ai_cost for maintainer service, total_commits/bad_commits/total_activities for commit service + +ALTER TABLE git."serviceExecutions" +ADD COLUMN metrics JSONB DEFAULT '{}'::jsonb; + +-- Create GIN index for efficient querying within JSONB data +CREATE INDEX IF NOT EXISTS "idx_serviceExecutions_metrics" +ON git."serviceExecutions" USING gin (metrics); + diff --git a/services/apps/git_integration/src/crowdgit/database/crud.py b/services/apps/git_integration/src/crowdgit/database/crud.py index 7f8df421aa..b9b0fb3d22 100644 --- a/services/apps/git_integration/src/crowdgit/database/crud.py +++ b/services/apps/git_integration/src/crowdgit/database/crud.py @@ -291,9 +291,9 @@ async def save_service_execution(service_execution: ServiceExecution) -> None: sql_query = """ INSERT INTO git."serviceExecutions" ( "repoId", "operationType", "status", "errorCode", - "errorMessage", "executionTimeSec" + "errorMessage", "executionTimeSec", "metrics" ) - VALUES ($1, $2, $3, $4, $5, $6) + VALUES ($1, $2, $3, $4, $5, $6, $7) """ db_data = service_execution.to_db_dict() @@ -306,6 +306,7 @@ async def save_service_execution(service_execution: ServiceExecution) -> None: db_data["errorCode"], db_data["errorMessage"], db_data["executionTimeSec"], + db_data["metrics"], ), ) logger.debug( diff --git a/services/apps/git_integration/src/crowdgit/models/service_execution.py b/services/apps/git_integration/src/crowdgit/models/service_execution.py index 34b774e049..8efd68ad60 100644 --- a/services/apps/git_integration/src/crowdgit/models/service_execution.py +++ b/services/apps/git_integration/src/crowdgit/models/service_execution.py @@ -1,6 +1,7 @@ from decimal import Decimal from typing import Any +import orjson from pydantic import BaseModel, Field from crowdgit.enums import ExecutionStatus, OperationType @@ -15,6 +16,9 @@ class ServiceExecution(BaseModel): error_code: str | None = Field(None, description="Custom error code") error_message: str | None = Field(None, description="Detailed error message") execution_time_sec: Decimal = Field(..., description="Execution time in seconds") + metrics: dict[str, Any] = Field( + default_factory=dict, description="Service-specific execution metrics" + ) def to_db_dict(self) -> dict[str, Any]: """Convert create model to database dictionary""" @@ -25,4 +29,5 @@ def to_db_dict(self) -> dict[str, Any]: "errorCode": self.error_code, "errorMessage": self.error_message, "executionTimeSec": self.execution_time_sec, + "metrics": orjson.dumps(self.metrics).decode(), } diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 488da4e889..9f7f03d055 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -112,6 +112,10 @@ async def process_single_batch_commits( "execution_status": ExecutionStatus.SUCCESS, "error_code": None, "error_message": None, + "total_commits": 0, + "processed_commits": 0, + "bad_commits": 0, + "total_activities": 0, } batch_start_time = time.time() @@ -155,6 +159,12 @@ async def process_single_batch_commits( execution_time_sec=Decimal( str(round(self._metrics_context["total_execution_time"], 2)) ), + metrics={ + "total_commits": self._metrics_context["total_commits"], + "processed_commits": self._metrics_context["processed_commits"], + "bad_commits": self._metrics_context["bad_commits"], + "total_activities": self._metrics_context["total_activities"], + }, ) await save_service_execution(service_execution) # Reset metrics context after saving @@ -185,6 +195,12 @@ async def process_single_batch_commits( execution_time_sec=Decimal( str(round(self._metrics_context["total_execution_time"], 2)) ), + metrics={ + "total_commits": self._metrics_context["total_commits"], + "processed_commits": self._metrics_context["processed_commits"], + "bad_commits": self._metrics_context["bad_commits"], + "total_activities": self._metrics_context["total_activities"], + }, ) await save_service_execution(service_execution) # Reset metrics context after saving @@ -623,6 +639,12 @@ async def process_commits_chunk( f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits in {repo_path}" ) + # Update metrics context + if self._metrics_context: + self._metrics_context["processed_commits"] += processed_commits + self._metrics_context["bad_commits"] += bad_commits + self._metrics_context["total_activities"] += len(activities_db) + # Write activities to database and queue if activities_db: await asyncio.gather( @@ -651,6 +673,11 @@ async def _process_activities_from_commits( gc.collect() logger.info(f"Actual number of commits to be processed: {len(commit_texts)}") + + # Update total_commits metric + if self._metrics_context: + self._metrics_context["total_commits"] += len(commit_texts) + if len(commit_texts) == 0: self.logger.info("No commits to be processed") return diff --git a/services/apps/git_integration/src/crowdgit/services/maintainer/maintainer_service.py b/services/apps/git_integration/src/crowdgit/services/maintainer/maintainer_service.py index 7c6975366d..1ffe97c42a 100644 --- a/services/apps/git_integration/src/crowdgit/services/maintainer/maintainer_service.py +++ b/services/apps/git_integration/src/crowdgit/services/maintainer/maintainer_service.py @@ -439,6 +439,8 @@ async def process_maintainers( error_code = None error_message = None latest_maintainer_file = None + ai_cost = 0.0 + maintainers_found = 0 try: owner, repo_name = parse_repo_url(batch_info.remote) @@ -454,8 +456,10 @@ async def process_maintainers( self.logger.info(f"Starting maintainers processing for repo: {batch_info.remote}") maintainers = await self.extract_maintainers(batch_info.repo_path, owner, repo_name) latest_maintainer_file = maintainers.maintainer_file + ai_cost = maintainers.total_cost + maintainers_found = len(maintainers.maintainer_info) self.logger.info( - f"Extracted {len(maintainers.maintainer_info)} maintainers from {latest_maintainer_file} file" + f"Extracted {maintainers_found} maintainers from {latest_maintainer_file} file" ) await self.save_maintainers( repository.id, @@ -469,6 +473,9 @@ async def process_maintainers( error_code = ( e.error_code.value if isinstance(e, CrowdGitError) else ErrorCode.UNKNOWN.value ) + # Capture AI cost even on error if it's a CrowdGitError with ai_cost + if isinstance(e, CrowdGitError) and hasattr(e, "ai_cost"): + ai_cost = e.ai_cost self.logger.error(f"Maintainer processing failed: {error_message}") finally: @@ -484,5 +491,9 @@ async def process_maintainers( error_code=error_code, error_message=error_message, execution_time_sec=execution_time, + metrics={ + "ai_cost": ai_cost, + "maintainers_found": maintainers_found, + }, ) await save_service_execution(service_execution)