Skip to content

Commit 84ae58a

Browse files
committed
feat: process fork repos only if their parent repo were already processed
1 parent 5b7c49b commit 84ae58a

4 files changed

Lines changed: 58 additions & 3 deletions

File tree

services/apps/git_integration/src/crowdgit/database/crud.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,28 @@ async def get_repository_by_url(url: str) -> dict[str, Any] | None:
3636
return dict(result) if result else None
3737

3838

39+
async def get_recently_processed_repository_by_url(url: str) -> Repository | None:
    """
    Look up a repository by URL, but only if it was processed recently.

    A row qualifies only when all of the following hold:
      * its url equals the given one,
      * its "deletedAt" column is NULL,
      * its "lastProcessedAt" lies within the last
        REPOSITORY_UPDATE_INTERVAL_HOURS hours,
      * its state is RepositoryState.COMPLETED.

    Returns the matching row converted through Repository.from_db, or None
    when no row qualifies.
    """
    query = """
    SELECT id, url, state, priority, "lastProcessedAt", "lockedAt", "createdAt", "updatedAt", "maintainerFile", "forkedFrom"
    FROM git.repositories
    WHERE url = $1
    AND "deletedAt" IS NULL
    AND "lastProcessedAt" > NOW() - INTERVAL '1 hour' * $2
    AND state = $3
    """
    # Positional parameters, in the same order as $1, $2, $3 above.
    params = (url, REPOSITORY_UPDATE_INTERVAL_HOURS, RepositoryState.COMPLETED)
    row = await fetchrow(query, params)
    if not row:
        return None
    return Repository.from_db(dict(row))
59+
60+
3961
async def acquire_onboarding_repo() -> Repository | None:
4062
onboarding_repo_sql_query = """
4163
WITH current_onboarding_count AS (

services/apps/git_integration/src/crowdgit/enums.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class RepositoryState(str, Enum):
2828
PROCESSING = "processing"
2929
COMPLETED = "completed"
3030
FAILED = "failed"
31+
REQUIRES_PARENT = "requires_parent"  # fork repo without a valid parent repo in our system
3132

3233

3334
class RepositoryPriority(int):

services/apps/git_integration/src/crowdgit/models/repository.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import uuid
24
from datetime import datetime
35
from typing import Any
@@ -36,11 +38,14 @@ class Repository(BaseModel):
3638
None,
3739
description="The source repository URL if this repository is a fork",
3840
)
41+
parent_repo: Repository | None = Field(
42+
None, description="The parent repository (in case of fork) object from our database"
43+
)
3944
created_at: datetime = Field(..., description="Creation timestamp")
4045
updated_at: datetime = Field(..., description="Last update timestamp")
4146

4247
@classmethod
43-
def from_db(cls, db_data: dict[str, Any]) -> "Repository":
48+
def from_db(cls, db_data: dict[str, Any]) -> Repository:
4449
"""Create Repository instance from database data"""
4550
# Convert database field names to model field names
4651
repo_data = db_data.copy()

services/apps/git_integration/src/crowdgit/worker/repository_worker.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from crowdgit.database.crud import (
44
acquire_repo_for_processing,
5+
get_recently_processed_repository_by_url,
56
mark_repo_as_processed,
67
release_repo,
78
update_last_processed_commit,
@@ -135,14 +136,41 @@ def _reset_all_contexts(self) -> None:
135136
for service in services:
136137
service.reset_logger_context()
137138

139+
async def _check_parent_repo_validity(self, repository: Repository) -> bool:
    """
    Decide whether `repository` may be processed with respect to its fork parent.

    A fork must not re-process activities that belong to its parent and
    attribute them to itself, so a fork is only allowed through when its
    parent (looked up by `repository.forked_from`) is returned by
    get_recently_processed_repository_by_url.

    Returns True when the repository is not a fork, or when a qualifying
    parent was found; in the latter case the parent is also stored on
    `repository.parent_repo`. Returns False otherwise.
    """
    if not repository.forked_from:
        # Not a fork: there is no parent to validate.
        return True

    logger.info(
        f"Repo forked from {repository.forked_from}, checking parent repo validity in our system"
    )
    # NOTE(review): fixed 10 s pause before the lookup; its purpose is not
    # evident from this code - confirm it is intentional and not a leftover.
    await asyncio.sleep(10)

    parent = await get_recently_processed_repository_by_url(repository.forked_from)
    if parent:
        logger.info("Parent repo is valid and already processed, proceeding with fork processing")
        repository.parent_repo = parent
        return True

    logger.warning(
        f"Parent repo {repository.forked_from} is not found/valid - Aborting processing"
    )
    return False
161+
138162
async def _process_single_repository(self, repository: Repository):
139163
"""Process a single repository through services with full clone for new repos, incremental for existing"""
140164
logger.info("Processing repository: {}", repository.url)
141-
processing_state = RepositoryState.PENDING
165+
processing_state = RepositoryState.FAILED
142166

143167
try:
144168
repo_name = get_repo_name(repository.url)
145169
self._bind_repository_context(repository, repo_name)
170+
valid_parent = await self._check_parent_repo_validity(repository)
171+
if not valid_parent:
172+
processing_state = RepositoryState.REQUIRES_PARENT
173+
return
146174
async for batch_info in self.clone_service.clone_batches_generator(
147175
repository,
148176
working_dir_cleanup=True,
@@ -166,7 +194,6 @@ async def _process_single_repository(self, repository: Repository):
166194
logger.info("Incremental processing completed successfully")
167195
processing_state = RepositoryState.COMPLETED
168196
except Exception as e:
169-
processing_state = RepositoryState.FAILED
170197
logger.error(f"Processing failed with error: {repr(e)}")
171198
finally:
172199
# Reset logger context for all services

0 commit comments

Comments
 (0)