feat: experimental API#3
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 908ee1aa38
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
|
@ds-review |
There was a problem hiding this comment.
PR Review
PR: feat: experimental API
Important
Verdict: Request changes - 13 actionable findings, highest severity P0.
Findings (13)
No authentication or authorization
src/blackgeorge/api/main.py:11
The API has no authentication mechanism, allowing anyone with network access to create workers, run jobs, and access all endpoints. This is a critical vulnerability for production deployment.
Resume discards paused report state
src/blackgeorge/api/routes/runs.py:88
The resume endpoint constructs a new Report with empty lists for pending_action, errors, messages, tool_calls, and events, losing the suspended state needed for resumption. Use the original paused report instead of reconstructing it from the run record.
src/blackgeorge/api/routes/workers.py:33
worker.tools is a list attribute, not a method. Calling it with parentheses will raise TypeError. Use worker.tools instead of worker.tools().
Default CORS allows all origins
src/blackgeorge/api/config.py:9
The default cors_origins = ['*'] is permissive and a security risk. Restrict to a safe default (e.g., localhost) and require explicit configuration for production.
Blocking I/O in async route handler
src/blackgeorge/api/routes/runs.py:38
desk.run is likely a synchronous blocking call. In an async FastAPI route, it will block the event loop, degrading performance. Offload it with await run_in_threadpool(desk.run, worker, job, stream=request.stream).
list_runs fetches all records before filtering
src/blackgeorge/api/routes/status.py:38
Fetching all runs and then filtering in Python is inefficient and memory-heavy for large datasets. The store's list_runs should support filtering and pagination at the query level.
Missing pagination on run events endpoint
src/blackgeorge/api/routes/status.py:66
The run events endpoint returns all events for a run without limit or offset. Runs with many events can produce large responses and impact client and server performance.
Direct access to private Desk._workers
src/blackgeorge/api/routes/workers.py:20
The route directly accesses desk._workers, a private attribute. Use a public accessor method (e.g., desk.get_worker(name)) to avoid coupling and future breakage.
Missing pagination on list workers endpoint
src/blackgeorge/api/routes/workers.py:39
The list workers endpoint returns all registered workers without any limit or offset. In large deployments this can lead to large response payloads and excessive memory usage.
Missing pagination on list workforces endpoint
src/blackgeorge/api/routes/workforces.py:56
The list workforces endpoint returns all workforces without pagination, risking large responses and memory pressure when many workforces exist.
src/blackgeorge/api/dependencies.py:13
dependencies.py defines a get_config function that mirrors the one in config.py. Remove the duplicate and import from config.py to avoid confusion.
Hot-path logging may degrade performance
src/blackgeorge/api/middleware.py:14
Logging every incoming request and completed request at INFO level introduces overhead on every HTTP call. In high-traffic scenarios this can become a bottleneck.
src/blackgeorge/api/routes/runs.py:72
The resume endpoint throws a generic HTTPException for non-paused runs. The custom InvalidResumeError is better suited and ensures consistent error responses.
How To Recheck
Reply @ds-review recheck under the relevant inline finding after pushing a fix.
|
|
||
|
|
||
| def create_app() -> FastAPI: | ||
| config = get_config() |
There was a problem hiding this comment.
No authentication or authorization
The API has no authentication mechanism, allowing anyone with network access to create workers, run jobs, and access all endpoints. This is a critical vulnerability for production deployment.
| config = get_config() | |
| Integrate FastAPI's security utilities to add authentication (e.g., API key, OAuth2) and enforce access control. |
| name=worker.name, | ||
| model=worker.model, | ||
| instructions=worker.instructions, | ||
| tools=[t.name for t in worker.tools()], |
There was a problem hiding this comment.
| report = Report( | ||
| run_id=run_id, | ||
| status=record.status, | ||
| content=record.output, | ||
| data=record.output_json, | ||
| pending_action=None, | ||
| metrics={}, | ||
| errors=[], | ||
| messages=[], | ||
| tool_calls=[], | ||
| events=[], | ||
| ) |
There was a problem hiding this comment.
There was a problem hiding this comment.
Can you explain this further ? @ds-review
There was a problem hiding this comment.
In the resume endpoint (line 77), you're creating a new Report with pending_action=None, errors=[], messages=[], tool_calls=[], events=[]—all blank. The paused run’s report had accumulated these fields to track progress (e.g., pending actions, errors, tool calls). Reconstructing it from limited record fields loses that state, so the desk.resume call can't properly pick up where it left off. Instead, the original Report object from the run should be stored and reused.
| metadata=request.metadata, | ||
| ) | ||
|
|
||
| report = desk.run(worker, job, stream=request.stream) |
There was a problem hiding this comment.
Blocking I/O in async route handler
desk.run is likely a synchronous blocking call. In an async FastAPI route, it will block the event loop, degrading performance. Offload it with await run_in_threadpool(desk.run, worker, job, stream=request.stream).
| report = desk.run(worker, job, stream=request.stream) | |
| report = await run_in_threadpool(desk.run, worker, job, stream=request.stream) |
|
|
||
| title: str = "Blackgeorge API" | ||
| version: str = "0.1.0" | ||
| cors_origins: list[str] = ["*"] |
There was a problem hiding this comment.
| manager=workforce.manager.name if workforce.manager else None, | ||
| ) | ||
|
|
||
|
|
There was a problem hiding this comment.
Missing pagination on list workforces endpoint
The list workforces endpoint returns all workforces without pagination, risking large responses and memory pressure when many workforces exist.
| async def list_workforces(desk: Desk = Depends(get_desk), limit: int = Query(100, ge=1, le=1000), offset: int = Query(0, ge=0)) -> list[WorkforceResponse]: | |
| workforces = list(desk._workforces.values()) | |
| return [ | |
| WorkforceResponse( | |
| name=w.name, | |
| workers=[worker.name for worker in w.workers], | |
| mode=w.mode, | |
| manager=w.manager.name if w.manager else None, | |
| ) | |
| for w in workforces[offset : offset + limit] | |
| ] |
| async def get_run_events(run_id: str, desk: Desk = Depends(get_desk)) -> list[EventResponse]: | ||
| record = desk.run_store.get_run(run_id) | ||
| if not record: | ||
| raise RunNotFoundError(run_id) |
There was a problem hiding this comment.
Missing pagination on run events endpoint
The run events endpoint returns all events for a run without limit or offset. Runs with many events can produce large responses and impact client and server performance.
| raise RunNotFoundError(run_id) | |
| async def get_run_events(run_id: str, desk: Desk = Depends(get_desk), limit: int = Query(100, ge=1, le=1000), offset: int = Query(0, ge=0)) -> list[EventResponse]: | |
| record = desk.run_store.get_run(run_id) | |
| if not record: | |
| raise RunNotFoundError(run_id) | |
| events = desk.run_store.get_events(run_id) | |
| page = events[offset : offset + limit] | |
| return [ | |
| EventResponse( | |
| event_id=e.event_id, | |
| type=e.type, | |
| timestamp=e.timestamp, | |
| source=e.source, | |
| payload=e.payload, | |
| ) | |
| for e in page | |
| ] |
|
|
||
| def get_config() -> APIConfig: | ||
| global _config | ||
| if _config is None: | ||
| _config = APIConfig() |
There was a problem hiding this comment.
dependencies.py defines a get_config function that mirrors the one in config.py. Remove the duplicate and import from config.py to avoid confusion.
| def get_config() -> APIConfig: | |
| global _config | |
| if _config is None: | |
| _config = APIConfig() | |
| from blackgeorge.api.config import get_config |
| def register_middleware(app: Any) -> None: | ||
| @app.middleware("http") | ||
| async def log_requests( | ||
| request: Request, call_next: Callable[[Request], Awaitable[Response]] |
There was a problem hiding this comment.
Hot-path logging may degrade performance
Logging every incoming request and completed request at INFO level introduces overhead on every HTTP call. In high-traffic scenarios this can become a bottleneck.
| request: Request, call_next: Callable[[Request], Awaitable[Response]] | |
| logger.debug( | |
| "Incoming request", | |
| method=request.method, | |
| path=request.url.path, | |
| client=str(request.client), | |
| ) |
|
|
||
| if record.status != "paused": | ||
| raise HTTPException( | ||
| status_code=400, detail=f"Run '{run_id}' is not paused (status: {record.status})" |
There was a problem hiding this comment.
The resume endpoint throws a generic HTTPException for non-paused runs. The custom InvalidResumeError is better suited and ensures consistent error responses.
| status_code=400, detail=f"Run '{run_id}' is not paused (status: {record.status})" | |
| raise InvalidResumeError(f"Run '{run_id}' is not paused (status: {record.status})") |
Summary of Changes
Added
Changed
Documentation