|
8 | 8 | from crowdgit.errors import RepoLockingError |
9 | 9 | from crowdgit.models.repository import Repository |
10 | 10 | from crowdgit.models.service_execution import ServiceExecution |
11 | | -from crowdgit.settings import REPOSITORY_UPDATE_INTERVAL_HOURS |
| 11 | +from crowdgit.settings import MAX_CONCURRENT_ONBOARDINGS, REPOSITORY_UPDATE_INTERVAL_HOURS |
12 | 12 |
|
13 | 13 | from .connection import get_db_connection |
14 | 14 | from .registry import execute, executemany, fetchrow, fetchval, query |
@@ -38,24 +38,35 @@ async def get_repository_by_url(url: str) -> dict[str, Any] | None: |
38 | 38 |
|
39 | 39 | async def acquire_onboarding_repo() -> Repository | None: |
40 | 40 | onboarding_repo_sql_query = """ |
| 41 | + WITH current_onboarding_count AS ( |
| 42 | + -- Count repositories currently being onboarded (processing + never processed before) |
| 43 | + SELECT COUNT(*) as count |
| 44 | + FROM git.repositories |
| 45 | + WHERE state = $1 |
| 46 | + AND "lastProcessedCommit" IS NULL |
| 47 | + AND "deletedAt" IS NULL |
| 48 | + ) |
41 | 49 | UPDATE git.repositories |
42 | 50 | SET "lockedAt" = NOW(), |
43 | 51 | state = $1, |
44 | 52 | "updatedAt" = NOW() |
45 | 53 | WHERE id = ( |
46 | | - SELECT id |
47 | | - FROM git.repositories |
48 | | - WHERE state = $2 |
49 | | - AND "lockedAt" IS NULL |
50 | | - AND "deletedAt" IS NULL |
51 | | - ORDER BY priority ASC, "createdAt" ASC |
| 54 | + SELECT r.id |
| 55 | + FROM git.repositories r |
| 56 | + CROSS JOIN current_onboarding_count c |
| 57 | + WHERE r.state = $2 |
| 58 | + AND r."lockedAt" IS NULL |
| 59 | + AND r."deletedAt" IS NULL |
| 60 | + AND c.count < $3 -- Only proceed if under the limit |
| 61 | + ORDER BY r.priority ASC, r."createdAt" ASC |
52 | 62 | LIMIT 1 |
53 | 63 | FOR UPDATE SKIP LOCKED |
54 | 64 | ) |
55 | 65 | RETURNING id, url, state, priority, "lastProcessedAt", "lastProcessedCommit", "lockedAt", "createdAt", "updatedAt", "segmentId", "integrationId", "maintainerFile", "lastMaintainerRunAt", "branch" |
56 | 66 | """ |
57 | 67 | return await acquire_repository( |
58 | | - onboarding_repo_sql_query, (RepositoryState.PROCESSING, RepositoryState.PENDING) |
| 68 | + onboarding_repo_sql_query, |
| 69 | + (RepositoryState.PROCESSING, RepositoryState.PENDING, MAX_CONCURRENT_ONBOARDINGS), |
59 | 70 | ) |
60 | 71 |
|
61 | 72 |
|
|
0 commit comments