Skip to content

Commit 4fd622d

Browse files
committed
Fix concurrency: Use Queue + Semaphore pattern
1 parent 49b37cc commit 4fd622d

1 file changed

Lines changed: 49 additions & 16 deletions

File tree

app.py

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import json
88
import time
99
import re
10+
import asyncio
1011
from urllib.parse import urlparse
1112
from fastapi import Request, HTTPException
1213

@@ -189,20 +190,12 @@ async def verify_signature(request: Request, body: bytes) -> str | None:
189190
return delivery_id
190191

191192

192-
@app.function(image=runner_image, secrets=[github_secret])
193-
@modal.concurrent(max_inputs=DEFAULT_MAX_PARALLEL)
194-
async def spawn_runner_sandbox(job_data: dict):
195-
"""Worker that spawns sandboxes with controlled concurrency using @modal.concurrent.
196-
197-
This runs with max_inputs=DEFAULT_MAX_PARALLEL to match GitHub's max-parallel,
198-
ensuring we don't spawn more sandboxes than GitHub can use.
199-
"""
193+
async def _spawn_single_sandbox(job_data: dict):
194+
"""Helper to spawn a single sandbox and wait for completion."""
200195
job_id = job_data["job_id"]
201-
repo_url = job_data["repo_url"]
202-
job_labels = job_data.get("job_labels", [])
203196
jit_config = job_data["jit_config"]
204197

205-
logger.info(f"Worker spawning sandbox for job {job_id}")
198+
logger.info(f"Spawning sandbox for job {job_id}")
206199

207200
try:
208201
cmd = "cd /actions-runner && export RUNNER_ALLOW_RUNASROOT=1 && ./run.sh --jitconfig $GHA_JIT_CONFIG"
@@ -219,7 +212,7 @@ async def spawn_runner_sandbox(job_data: dict):
219212

220213
sandbox.set_tags({"job_id": str(job_id)})
221214

222-
# Wait for sandbox to complete (blocks this worker slot)
215+
# Wait for completion
223216
sandbox.wait()
224217

225218
logger.info(f"Sandbox for job {job_id} completed")
@@ -229,6 +222,47 @@ async def spawn_runner_sandbox(job_data: dict):
229222
raise
230223

231224

225+
@app.function(image=runner_image, secrets=[github_secret])
async def process_job_batch():
    """Process jobs from the queue with bounded concurrency.

    Pulls job dicts from ``job_queue`` until a ``None`` sentinel is read.
    Each job is handed to ``_process_with_semaphore`` in a background task,
    and an ``asyncio.Semaphore`` sized by ``DEFAULT_MAX_PARALLEL`` caps how
    many of those tasks are in flight at once.

    After the sentinel is seen, the function waits for every task that is
    still running before returning, so in-flight work is not abandoned.
    """
    semaphore = asyncio.Semaphore(DEFAULT_MAX_PARALLEL)

    # The event loop only keeps weak references to tasks, so a task created
    # with asyncio.create_task() and then dropped may be garbage-collected
    # before it finishes. Hold strong references here until each one is done.
    in_flight: set[asyncio.Task] = set()

    def _on_task_done(task: asyncio.Task) -> None:
        """Drop the finished task and surface its failure, if any."""
        in_flight.discard(task)
        if task.cancelled():
            return
        # Reading the exception marks it as retrieved, which avoids the
        # "Task exception was never retrieved" warning at shutdown.
        error = task.exception()
        if error is not None:
            logger.error(f"Job task failed: {type(error).__name__}: {error}")

    logger.info(
        f"Starting job processor with max {DEFAULT_MAX_PARALLEL} concurrent sandboxes"
    )

    while True:
        try:
            # Get job from queue (blocks until available)
            job_data = await job_queue.get.aio()

            if job_data is None:
                # Sentinel value to stop processing
                break

            # Acquire a slot before spawning; the spawned task releases it.
            await semaphore.acquire()

            # Run the job in the background so the loop can fetch the next one.
            task = asyncio.create_task(_process_with_semaphore(semaphore, job_data))
            in_flight.add(task)
            task.add_done_callback(_on_task_done)

        except Exception as e:
            logger.error(f"Error in job processor: {type(e).__name__}: {e}")
            # Brief pause so a persistent failure does not become a hot loop.
            await asyncio.sleep(1)

    if in_flight:
        logger.info(f"Waiting for {len(in_flight)} in-flight jobs to finish")
        # return_exceptions=True: one failed job must not cancel the wait for
        # the others; failures are already reported by _on_task_done.
        await asyncio.gather(*in_flight, return_exceptions=True)
256+
257+
258+
async def _process_with_semaphore(semaphore: asyncio.Semaphore, job_data: dict):
    """Run one job, then hand its concurrency slot back.

    The semaphore is not acquired here; this coroutine only releases it once
    ``_spawn_single_sandbox`` has finished, whether it returned or raised.
    Any exception from ``_spawn_single_sandbox`` propagates to the caller.
    """
    try:
        await _spawn_single_sandbox(job_data)
    finally:
        # Always free the slot, including on failure.
        semaphore.release()
264+
265+
232266
@app.function(image=runner_image, secrets=[github_secret])
233267
@modal.fastapi_endpoint(method="POST")
234268
async def github_webhook(request: Request):
@@ -403,15 +437,14 @@ async def github_webhook(request: Request):
403437
"jit_config": jit_config,
404438
}
405439

406-
# Spawn worker function to process this job
407-
# @modal.concurrent controls concurrent execution
408-
await spawn_runner_sandbox.spawn.aio(job_data)
440+
# Put job in queue for processor
441+
await job_queue.put.aio(job_data)
409442

410443
logger.info(f"Successfully queued job {job_id}")
411444

412445
except Exception as e:
413446
logger.error(f"Failed to queue job {job_id}: {type(e).__name__}")
414-
raise HTTPException(status_code=500, detail="Failed to spawn runner sandbox")
447+
raise HTTPException(status_code=500, detail="Failed to queue job")
415448

416449
logger.info(f"Successfully queued runner for job {job_id}")
417450
return {"status": "queued", "job_id": job_id}

0 commit comments

Comments
 (0)