Skip to content

Commit 7aa296b

Browse files
fix: use Modal async API for proper streaming
- Use modal.Sandbox.create.aio() and sb.exec.aio() - Use async for iteration over stdout - Use process.wait.aio() and sb.terminate.aio() This ensures proper async streaming instead of mixing sync/async.
1 parent e75623a commit 7aa296b

2 files changed

Lines changed: 16 additions & 53 deletions

File tree

src/policyengine_api/agent_sandbox.py

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,17 @@
4141
logfire_secret = modal.Secret.from_name("logfire-token")
4242

4343

44-
def run_claude_code_in_sandbox(
44+
async def run_claude_code_in_sandbox_async(
4545
question: str,
4646
api_base_url: str = "https://v2.api.policyengine.org",
4747
) -> tuple[modal.Sandbox, any]:
4848
"""Create a sandbox running Claude Code with MCP server configured.
4949
5050
Returns the sandbox and process handle for streaming output.
51+
Uses Modal's async API for proper streaming support.
5152
"""
5253
import logfire
5354

54-
print("[SANDBOX] run_claude_code_in_sandbox starting", flush=True)
5555
logfire.info(
5656
"run_claude_code_in_sandbox: starting",
5757
question=question[:100],
@@ -65,35 +65,20 @@ def run_claude_code_in_sandbox(
6565
mcp_config_json = json.dumps(mcp_config)
6666

6767
# Get reference to deployed app (required when calling from outside Modal)
68-
print("[SANDBOX] looking up Modal app", flush=True)
6968
logfire.info("run_claude_code_in_sandbox: looking up Modal app")
7069
sandbox_app = modal.App.lookup("policyengine-sandbox", create_if_missing=True)
71-
print("[SANDBOX] Modal app found", flush=True)
7270
logfire.info("run_claude_code_in_sandbox: Modal app found")
7371

74-
print("[SANDBOX] creating sandbox", flush=True)
7572
logfire.info("run_claude_code_in_sandbox: creating sandbox")
76-
sb = modal.Sandbox.create(
73+
sb = await modal.Sandbox.create.aio(
7774
app=sandbox_app,
7875
image=sandbox_image,
7976
secrets=[anthropic_secret, logfire_secret],
8077
timeout=600,
8178
workdir="/tmp",
8279
)
83-
print("[SANDBOX] sandbox created", flush=True)
8480
logfire.info("run_claude_code_in_sandbox: sandbox created")
8581

86-
# Run Claude Code with the question
87-
# Note: Can't use --dangerously-skip-permissions as root (Modal runs as root)
88-
# Use shell wrapper with </dev/null to properly close stdin (prevents hanging)
89-
# --max-turns: limit execution to prevent runaway
90-
# Use --mcp-config to pass MCP config directly (more reliable than config file)
91-
print("[SANDBOX] Starting claude CLI with question", flush=True)
92-
logfire.info(
93-
"run_claude_code_in_sandbox: starting claude CLI",
94-
mcp_url=f"{api_base_url}/mcp",
95-
)
96-
9782
# Escape the question and config for shell
9883
escaped_question = question.replace("'", "'\"'\"'")
9984
escaped_mcp_config = mcp_config_json.replace("'", "'\"'\"'")
@@ -112,9 +97,8 @@ def run_claude_code_in_sandbox(
11297
question_len=len(question),
11398
escaped_question_len=len(escaped_question),
11499
)
115-
# text=True, bufsize=1 enables line-buffered streaming
116-
process = sb.exec("sh", "-c", cmd, text=True, bufsize=1)
117-
print("[SANDBOX] claude CLI process started", flush=True)
100+
# Use async exec for proper streaming
101+
process = await sb.exec.aio("sh", "-c", cmd, text=True, bufsize=1)
118102
logfire.info("run_claude_code_in_sandbox: claude CLI process started, returning")
119103

120104
return sb, process

src/policyengine_api/api/agent.py

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -154,58 +154,38 @@ def _parse_claude_stream_event(line: str) -> dict | None:
154154

155155
async def _stream_modal_sandbox(question: str, api_base_url: str):
156156
"""Stream output from Claude Code running in Modal Sandbox."""
157-
from concurrent.futures import ThreadPoolExecutor
158-
159157
with logfire.span(
160158
"agent_stream", question=question[:100], api_base_url=api_base_url
161159
):
162160
sb = None
163-
executor = ThreadPoolExecutor(max_workers=2)
164161
try:
165-
from policyengine_api.agent_sandbox import run_claude_code_in_sandbox
162+
from policyengine_api.agent_sandbox import run_claude_code_in_sandbox_async
166163

167164
logfire.info("creating_sandbox")
168-
169-
loop = asyncio.get_event_loop()
170-
sb, process = await loop.run_in_executor(
171-
executor, run_claude_code_in_sandbox, question, api_base_url
172-
)
165+
sb, process = await run_claude_code_in_sandbox_async(question, api_base_url)
173166
logfire.info("sandbox_created")
174167

175-
# Read stdout synchronously in executor, yield lines as we get them
176-
def read_next_line(stdout_iter):
177-
try:
178-
return next(stdout_iter)
179-
except StopIteration:
180-
return None
181-
182-
stdout_iter = iter(process.stdout)
168+
# Use Modal's async iteration for stdout
183169
lines_received = 0
184170
events_sent = 0
185171

186-
while True:
187-
line = await loop.run_in_executor(executor, read_next_line, stdout_iter)
188-
189-
if line is None:
190-
# stdout exhausted
191-
logfire.info("stdout_exhausted", total_lines=lines_received)
192-
break
193-
172+
async for line in process.stdout:
194173
lines_received += 1
195174
logfire.info(
196175
"raw_line",
197176
line_num=lines_received,
198-
line_len=len(line),
199-
line_preview=line[:300].replace("session", "sess1on"),
177+
line_len=len(line) if line else 0,
178+
line_preview=line[:300].replace("session", "sess1on")
179+
if line
180+
else None,
200181
)
201-
202182
parsed = _parse_claude_stream_event(line)
203183
if parsed:
204184
events_sent += 1
205185
yield f"data: {json.dumps(parsed)}\n\n"
206186

207-
# Wait for process to finish
208-
returncode = await loop.run_in_executor(executor, process.wait)
187+
# Wait for process using async API
188+
returncode = await process.wait.aio()
209189
logfire.info(
210190
"complete",
211191
returncode=returncode,
@@ -221,10 +201,9 @@ def read_next_line(stdout_iter):
221201
finally:
222202
if sb is not None:
223203
try:
224-
await loop.run_in_executor(executor, sb.terminate)
204+
await sb.terminate.aio()
225205
except Exception:
226206
pass
227-
executor.shutdown(wait=False)
228207

229208

230209
@router.post("/stream")

0 commit comments

Comments
 (0)