Skip to content

Commit c07ef5f

Browse files
committed
Add per-repo concurrency limits via MAX_CONCURRENT_PER_REPO
In-memory counter keyed by repo_full_name. Returns 429 when limit is reached. Uses try/finally to ensure counter decrements on all exit paths. None or 0 (default) means unlimited.
1 parent eb1161b commit c07ef5f

1 file changed

Lines changed: 116 additions & 65 deletions

File tree

app.py

Lines changed: 116 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,14 @@ def format(self, record: logging.LogRecord) -> str:
8484
# HTTP client timeout
8585
HTTP_TIMEOUT_SECONDS = 30.0
8686

87+
# Per-repo concurrency limiting (None or 0 = unlimited)
88+
_concurrent_jobs: dict[str, int] = {}
89+
MAX_CONCURRENT_PER_REPO = os.environ.get("MAX_CONCURRENT_PER_REPO")
90+
if MAX_CONCURRENT_PER_REPO is not None:
91+
MAX_CONCURRENT_PER_REPO = int(MAX_CONCURRENT_PER_REPO)
92+
if MAX_CONCURRENT_PER_REPO <= 0:
93+
MAX_CONCURRENT_PER_REPO = None
94+
8795
# =============================================================================
8896
# TRUST MODEL
8997
# =============================================================================
@@ -496,6 +504,34 @@ async def github_webhook(request: Request):
496504
_cleanup_job_cache()
497505
_processed_jobs[str(job_id)] = current_time
498506

507+
if MAX_CONCURRENT_PER_REPO is not None:
508+
current_concurrent = _concurrent_jobs.get(repo_full_name, 0)
509+
if current_concurrent >= MAX_CONCURRENT_PER_REPO:
510+
logger.warning(
511+
"Per-repo concurrency limit reached",
512+
extra={
513+
"job_id": job_id,
514+
"repo": repo_full_name,
515+
"current": current_concurrent,
516+
"limit": MAX_CONCURRENT_PER_REPO,
517+
"error_code": "concurrency_limit",
518+
},
519+
)
520+
raise HTTPException(
521+
status_code=429,
522+
detail="Too many concurrent jobs for this repository",
523+
)
524+
_concurrent_jobs[repo_full_name] = current_concurrent + 1
525+
logger.info(
526+
"Incremented per-repo concurrency counter",
527+
extra={
528+
"job_id": job_id,
529+
"repo": repo_full_name,
530+
"current": current_concurrent + 1,
531+
"limit": MAX_CONCURRENT_PER_REPO,
532+
},
533+
)
534+
499535
# Fetch configuration from environment with defaults
500536
runner_group_id = int(os.environ.get("RUNNER_GROUP_ID", 1))
501537

@@ -516,72 +552,72 @@ async def github_webhook(request: Request):
516552
"work_directory": "_work",
517553
}
518554

519-
logger.info(
520-
"Requesting JIT config",
521-
extra={"job_id": job_id, "repo": repo_full_name},
522-
)
555+
try:
556+
logger.info(
557+
"Requesting JIT config",
558+
extra={"job_id": job_id, "repo": repo_full_name},
559+
)
523560

524-
api_start = time.monotonic()
525-
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS) as client:
526-
try:
527-
response = await _call_github_api(
528-
client,
529-
"POST",
530-
f"{repo_url}/actions/runners/generate-jitconfig",
531-
headers=headers,
532-
json=data,
533-
)
534-
duration = time.monotonic() - api_start
535-
jit_config = response.json()["encoded_jit_config"]
536-
logger.info(
537-
"Received JIT config",
538-
extra={
539-
"job_id": job_id,
540-
"repo": repo_full_name,
541-
"status": response.status_code,
542-
"duration": duration,
543-
},
544-
)
545-
except httpx.HTTPStatusError as e:
546-
duration = time.monotonic() - api_start
547-
sanitized_error = _sanitize_error_message(e.response.text)
548-
logger.error(
549-
"GitHub API error",
550-
extra={
551-
"job_id": job_id,
552-
"repo": repo_full_name,
553-
"status": e.response.status_code,
554-
"duration": duration,
555-
"error_code": sanitized_error,
556-
},
557-
)
558-
raise HTTPException(
559-
status_code=e.response.status_code,
560-
detail="Failed to generate JIT config",
561-
)
562-
except httpx.TimeoutException:
563-
duration = time.monotonic() - api_start
564-
logger.error(
565-
"GitHub API timeout",
566-
extra={"job_id": job_id, "repo": repo_full_name, "duration": duration, "error_code": "timeout"},
567-
)
568-
raise HTTPException(status_code=504, detail="GitHub API timeout")
569-
except Exception as e:
570-
duration = time.monotonic() - api_start
571-
logger.error(
572-
"Unexpected error calling GitHub API",
573-
extra={
574-
"job_id": job_id,
575-
"repo": repo_full_name,
576-
"duration": duration,
577-
"error_code": type(e).__name__,
578-
},
579-
)
580-
raise HTTPException(status_code=500, detail="Internal server error")
561+
api_start = time.monotonic()
562+
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS) as client:
563+
try:
564+
response = await _call_github_api(
565+
client,
566+
"POST",
567+
f"{repo_url}/actions/runners/generate-jitconfig",
568+
headers=headers,
569+
json=data,
570+
)
571+
duration = time.monotonic() - api_start
572+
jit_config = response.json()["encoded_jit_config"]
573+
logger.info(
574+
"Received JIT config",
575+
extra={
576+
"job_id": job_id,
577+
"repo": repo_full_name,
578+
"status": response.status_code,
579+
"duration": duration,
580+
},
581+
)
582+
except httpx.HTTPStatusError as e:
583+
duration = time.monotonic() - api_start
584+
sanitized_error = _sanitize_error_message(e.response.text)
585+
logger.error(
586+
"GitHub API error",
587+
extra={
588+
"job_id": job_id,
589+
"repo": repo_full_name,
590+
"status": e.response.status_code,
591+
"duration": duration,
592+
"error_code": sanitized_error,
593+
},
594+
)
595+
raise HTTPException(
596+
status_code=e.response.status_code,
597+
detail="Failed to generate JIT config",
598+
)
599+
except httpx.TimeoutException:
600+
duration = time.monotonic() - api_start
601+
logger.error(
602+
"GitHub API timeout",
603+
extra={"job_id": job_id, "repo": repo_full_name, "duration": duration, "error_code": "timeout"},
604+
)
605+
raise HTTPException(status_code=504, detail="GitHub API timeout")
606+
except Exception as e:
607+
duration = time.monotonic() - api_start
608+
logger.error(
609+
"Unexpected error calling GitHub API",
610+
extra={
611+
"job_id": job_id,
612+
"repo": repo_full_name,
613+
"duration": duration,
614+
"error_code": type(e).__name__,
615+
},
616+
)
617+
raise HTTPException(status_code=500, detail="Internal server error")
581618

582-
logger.info("Spawning sandbox", extra={"job_id": job_id, "repo": repo_full_name})
619+
logger.info("Spawning sandbox", extra={"job_id": job_id, "repo": repo_full_name})
583620

584-
try:
585621
cmd = (
586622
"bash -c '/start-dockerd.sh &'"
587623
" && sleep 5"
@@ -609,11 +645,26 @@ async def github_webhook(request: Request):
609645
extra={"job_id": job_id, "repo": repo_full_name},
610646
)
611647

648+
return {"status": "provisioned", "job_id": job_id}
649+
650+
except HTTPException:
651+
raise
612652
except Exception as e:
613653
logger.error(
614654
"Failed to spawn sandbox",
615655
extra={"job_id": job_id, "error_code": type(e).__name__},
616656
)
617657
raise HTTPException(status_code=500, detail="Failed to spawn runner sandbox")
618-
619-
return {"status": "provisioned", "job_id": job_id}
658+
finally:
659+
if MAX_CONCURRENT_PER_REPO is not None:
660+
_concurrent_jobs[repo_full_name] = max(
661+
0, _concurrent_jobs.get(repo_full_name, 0) - 1
662+
)
663+
logger.info(
664+
"Decremented per-repo concurrency counter",
665+
extra={
666+
"repo": repo_full_name,
667+
"current": _concurrent_jobs.get(repo_full_name, 0),
668+
"limit": MAX_CONCURRENT_PER_REPO,
669+
},
670+
)

0 commit comments

Comments
 (0)