Skip to content

Commit e8c5957

Browse files
fix: add stdbuf for unbuffered output, improve logging
- Use stdbuf -oL to force line-buffered stdout from Claude Code - Add coreutils to sandbox image for stdbuf - Wrap streaming in logfire span for grouped traces - Add debug logging for raw lines and events
1 parent 4deb1f4 commit e8c5957

2 files changed

Lines changed: 107 additions & 87 deletions

File tree

src/policyengine_api/agent_sandbox.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88

99
import modal
1010

11-
# Sandbox image with Bun and Claude Code CLI (v2 - with ToS pre-accept)
11+
# Sandbox image with Bun and Claude Code CLI (v3 - with stdbuf for unbuffered output)
1212
sandbox_image = (
1313
modal.Image.debian_slim(python_version="3.12")
14-
.apt_install("curl", "git", "unzip")
14+
.apt_install("curl", "git", "unzip", "coreutils") # coreutils provides stdbuf
1515
.pip_install("logfire")
1616
.run_commands(
1717
# Install Bun
@@ -97,13 +97,15 @@ def run_claude_code_in_sandbox(
9797
# Escape the question and config for shell
9898
escaped_question = question.replace("'", "'\"'\"'")
9999
escaped_mcp_config = mcp_config_json.replace("'", "'\"'\"'")
100+
# Use stdbuf -oL to force line-buffered stdout (prevents buffering issues)
100101
cmd = (
101-
f"claude -p '{escaped_question}' "
102+
f"stdbuf -oL claude -p '{escaped_question}' "
102103
f"--mcp-config '{escaped_mcp_config}' "
103104
"--output-format stream-json --verbose --max-turns 10 "
104105
"--allowedTools 'mcp__policyengine__*,Bash,Read,Grep,Glob,Write,Edit' "
105106
"< /dev/null 2>&1"
106107
)
108+
logfire.info("run_claude_code_in_sandbox: executing", cmd=cmd[:200])
107109
process = sb.exec("sh", "-c", cmd)
108110
print("[SANDBOX] claude CLI process started", flush=True)
109111
logfire.info("run_claude_code_in_sandbox: claude CLI process started, returning")

src/policyengine_api/api/agent.py

Lines changed: 102 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -158,93 +158,111 @@ async def _stream_modal_sandbox(question: str, api_base_url: str):
158158
import threading
159159
from concurrent.futures import ThreadPoolExecutor
160160

161-
logfire.info("stream_modal_sandbox: starting", question=question[:100])
162-
163-
sb = None
164-
executor = ThreadPoolExecutor(max_workers=1)
165-
try:
166-
from policyengine_api.agent_sandbox import run_claude_code_in_sandbox
167-
168-
logfire.info(
169-
"stream_modal_sandbox: creating sandbox", api_base_url=api_base_url
170-
)
171-
172-
loop = asyncio.get_event_loop()
173-
sb, process = await loop.run_in_executor(
174-
executor, run_claude_code_in_sandbox, question, api_base_url
175-
)
176-
logfire.info("stream_modal_sandbox: sandbox created")
177-
178-
line_queue = queue.Queue()
179-
180-
def stream_reader():
181-
try:
182-
for line in process.stdout:
183-
line_queue.put(("line", line))
184-
process.wait()
185-
if process.returncode != 0:
186-
stderr = process.stderr.read()
187-
logfire.error(
188-
"claude_code_failed",
189-
returncode=process.returncode,
190-
stderr=stderr[:500] if stderr else None,
161+
with logfire.span(
162+
"agent_stream", question=question[:100], api_base_url=api_base_url
163+
):
164+
sb = None
165+
executor = ThreadPoolExecutor(max_workers=1)
166+
try:
167+
from policyengine_api.agent_sandbox import run_claude_code_in_sandbox
168+
169+
logfire.info("creating_sandbox")
170+
171+
loop = asyncio.get_event_loop()
172+
sb, process = await loop.run_in_executor(
173+
executor, run_claude_code_in_sandbox, question, api_base_url
174+
)
175+
logfire.info("sandbox_created")
176+
177+
line_queue = queue.Queue()
178+
lines_received = 0
179+
180+
def stream_reader():
181+
nonlocal lines_received
182+
try:
183+
logfire.info("reader_started")
184+
for line in process.stdout:
185+
lines_received += 1
186+
logfire.debug(
187+
"raw_line",
188+
line_num=lines_received,
189+
line=line[:300] if line else None,
190+
)
191+
line_queue.put(("line", line))
192+
logfire.info("stdout_exhausted", total_lines=lines_received)
193+
process.wait()
194+
logfire.info("process_exited", returncode=process.returncode)
195+
if process.returncode != 0:
196+
stderr = process.stderr.read()
197+
logfire.error(
198+
"process_failed",
199+
returncode=process.returncode,
200+
stderr=stderr[:500] if stderr else None,
201+
)
202+
line_queue.put(("error", (process.returncode, stderr)))
203+
else:
204+
line_queue.put(("done", process.returncode))
205+
except Exception as e:
206+
logfire.exception("reader_error", error=str(e))
207+
line_queue.put(("exception", str(e)))
208+
209+
reader_thread = threading.Thread(target=stream_reader, daemon=True)
210+
reader_thread.start()
211+
212+
events_sent = 0
213+
while True:
214+
try:
215+
item = await loop.run_in_executor(
216+
executor, lambda: line_queue.get(timeout=0.1)
191217
)
192-
line_queue.put(("error", (process.returncode, stderr)))
193-
else:
194-
line_queue.put(("done", process.returncode))
195-
except Exception as e:
196-
logfire.exception("stream_reader_error", error=str(e))
197-
line_queue.put(("exception", str(e)))
198-
199-
reader_thread = threading.Thread(target=stream_reader, daemon=True)
200-
reader_thread.start()
201-
202-
while True:
203-
try:
204-
item = await loop.run_in_executor(
205-
executor, lambda: line_queue.get(timeout=0.1)
206-
)
207-
event_type, data = item
208-
209-
if event_type == "line":
210-
parsed = _parse_claude_stream_event(data)
211-
if parsed:
218+
event_type, data = item
219+
220+
if event_type == "line":
221+
parsed = _parse_claude_stream_event(data)
222+
if parsed:
223+
events_sent += 1
224+
logfire.info(
225+
"event",
226+
num=events_sent,
227+
type=parsed["type"],
228+
content=parsed["content"][:200]
229+
if parsed["content"]
230+
else None,
231+
)
232+
yield f"data: {json.dumps(parsed)}\n\n"
233+
elif event_type == "error":
234+
returncode, stderr = data
235+
yield f"data: {json.dumps({'type': 'error', 'content': stderr})}\n\n"
236+
yield f"data: {json.dumps({'type': 'done', 'returncode': returncode})}\n\n"
237+
break
238+
elif event_type == "done":
212239
logfire.info(
213-
"stream_event",
214-
event_type=parsed["type"],
215-
content_preview=parsed["content"][:100]
216-
if parsed["content"]
217-
else None,
240+
"complete",
241+
returncode=data,
242+
events_sent=events_sent,
243+
lines_received=lines_received,
218244
)
219-
yield f"data: {json.dumps(parsed)}\n\n"
220-
elif event_type == "error":
221-
returncode, stderr = data
222-
yield f"data: {json.dumps({'type': 'error', 'content': stderr})}\n\n"
223-
yield f"data: {json.dumps({'type': 'done', 'returncode': returncode})}\n\n"
224-
break
225-
elif event_type == "done":
226-
logfire.info("stream_complete", returncode=data)
227-
yield f"data: {json.dumps({'type': 'done', 'returncode': data})}\n\n"
228-
break
229-
elif event_type == "exception":
230-
raise Exception(data)
231-
except Exception as e:
232-
if "Empty" in type(e).__name__:
233-
await asyncio.sleep(0)
234-
continue
235-
raise
236-
237-
except Exception as e:
238-
logfire.exception("stream_modal_sandbox_failed", error=str(e))
239-
yield f"data: {json.dumps({'type': 'error', 'content': f'Sandbox error: {str(e)}'})}\n\n"
240-
yield f"data: {json.dumps({'type': 'done', 'returncode': 1})}\n\n"
241-
finally:
242-
if sb is not None:
243-
try:
244-
await loop.run_in_executor(executor, sb.terminate)
245-
except Exception:
246-
pass
247-
executor.shutdown(wait=False)
245+
yield f"data: {json.dumps({'type': 'done', 'returncode': data})}\n\n"
246+
break
247+
elif event_type == "exception":
248+
raise Exception(data)
249+
except Exception as e:
250+
if "Empty" in type(e).__name__:
251+
await asyncio.sleep(0)
252+
continue
253+
raise
254+
255+
except Exception as e:
256+
logfire.exception("failed", error=str(e))
257+
yield f"data: {json.dumps({'type': 'error', 'content': f'Sandbox error: {str(e)}'})}\n\n"
258+
yield f"data: {json.dumps({'type': 'done', 'returncode': 1})}\n\n"
259+
finally:
260+
if sb is not None:
261+
try:
262+
await loop.run_in_executor(executor, sb.terminate)
263+
except Exception:
264+
pass
265+
executor.shutdown(wait=False)
248266

249267

250268
@router.post("/stream")

0 commit comments

Comments
 (0)