Skip to content

Commit 4deb1f4

Browse files
fix: parse Claude stream-json events for cleaner output
Instead of passing raw JSON lines, now parses Claude Code's stream-json events and extracts meaningful content (text, tool use, results). Logs cleaner events to logfire with content previews.
1 parent 7339b46 commit 4deb1f4

1 file changed

Lines changed: 86 additions & 37 deletions

File tree

src/policyengine_api/api/agent.py

Lines changed: 86 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -85,108 +85,157 @@ async def _stream_claude_code(question: str, api_base_url: str):
8585
yield f"data: {json.dumps({'type': 'done', 'returncode': process.returncode})}\n\n"
8686

8787

88+
def _parse_claude_stream_event(line: str) -> dict | None:
89+
"""Parse a Claude Code stream-json event and extract useful content.
90+
91+
Returns a dict with 'type' and 'content' for streaming to client,
92+
or None if the event should be skipped.
93+
"""
94+
if not line or not line.strip():
95+
return None
96+
97+
try:
98+
event = json.loads(line)
99+
except json.JSONDecodeError:
100+
# Not JSON, pass through as raw output
101+
return {"type": "raw", "content": line}
102+
103+
event_type = event.get("type")
104+
105+
# Assistant text output (the main response)
106+
if event_type == "assistant":
107+
message = event.get("message", {})
108+
content_blocks = message.get("content", [])
109+
text_parts = []
110+
for block in content_blocks:
111+
if block.get("type") == "text":
112+
text_parts.append(block.get("text", ""))
113+
elif block.get("type") == "tool_use":
114+
tool_name = block.get("name", "unknown")
115+
text_parts.append(f"[Using tool: {tool_name}]")
116+
if text_parts:
117+
return {"type": "assistant", "content": "".join(text_parts)}
118+
119+
# Content block delta (streaming text chunks)
120+
elif event_type == "content_block_delta":
121+
delta = event.get("delta", {})
122+
if delta.get("type") == "text_delta":
123+
text = delta.get("text", "")
124+
if text:
125+
return {"type": "text", "content": text}
126+
127+
# Tool use events
128+
elif event_type == "tool_use":
129+
tool_name = event.get("name", "unknown")
130+
return {"type": "tool", "content": f"Using tool: {tool_name}"}
131+
132+
# Tool result
133+
elif event_type == "tool_result":
134+
content = event.get("content", "")
135+
if isinstance(content, str) and content:
136+
# Truncate long tool results
137+
preview = content[:500] + "..." if len(content) > 500 else content
138+
return {"type": "tool_result", "content": preview}
139+
140+
# Result/completion
141+
elif event_type == "result":
142+
result_text = event.get("result", "")
143+
if result_text:
144+
return {"type": "result", "content": result_text}
145+
146+
# System messages
147+
elif event_type == "system":
148+
msg = event.get("message", "")
149+
if msg:
150+
return {"type": "system", "content": msg}
151+
152+
return None
153+
154+
88155
async def _stream_modal_sandbox(question: str, api_base_url: str):
89156
"""Stream output from Claude Code running in Modal Sandbox."""
157+
import queue
158+
import threading
90159
from concurrent.futures import ThreadPoolExecutor
91160

92-
# Immediate log
93-
print("[AGENT] _stream_modal_sandbox started", flush=True)
94-
logfire.info("_stream_modal_sandbox: started", question=question[:100])
161+
logfire.info("stream_modal_sandbox: starting", question=question[:100])
95162

96163
sb = None
97164
executor = ThreadPoolExecutor(max_workers=1)
98165
try:
99166
from policyengine_api.agent_sandbox import run_claude_code_in_sandbox
100167

101-
print("[AGENT] creating sandbox", flush=True)
102168
logfire.info(
103-
"_stream_modal_sandbox: creating sandbox", api_base_url=api_base_url
169+
"stream_modal_sandbox: creating sandbox", api_base_url=api_base_url
104170
)
105171

106-
# Run blocking Modal SDK calls in thread pool to avoid blocking event loop
107172
loop = asyncio.get_event_loop()
108173
sb, process = await loop.run_in_executor(
109174
executor, run_claude_code_in_sandbox, question, api_base_url
110175
)
111-
print("[AGENT] sandbox created", flush=True)
112-
logfire.info("_stream_modal_sandbox: sandbox created")
113-
114-
# Poll for lines with timeout to allow other async tasks
115-
import queue
116-
import threading
176+
logfire.info("stream_modal_sandbox: sandbox created")
117177

118178
line_queue = queue.Queue()
119179

120180
def stream_reader():
121181
try:
122-
logfire.info("stream_reader: starting to read stdout")
123-
line_count = 0
124182
for line in process.stdout:
125-
line_count += 1
126-
logfire.info(
127-
"stream_reader: got line",
128-
line_num=line_count,
129-
line_preview=line[:200] if line else None,
130-
)
131183
line_queue.put(("line", line))
132-
logfire.info("stream_reader: stdout exhausted, waiting for process")
133184
process.wait()
134-
logfire.info(
135-
"stream_reader: process finished", returncode=process.returncode
136-
)
137185
if process.returncode != 0:
138186
stderr = process.stderr.read()
139187
logfire.error(
140-
"stream_reader: process failed",
188+
"claude_code_failed",
141189
returncode=process.returncode,
142190
stderr=stderr[:500] if stderr else None,
143191
)
144192
line_queue.put(("error", (process.returncode, stderr)))
145193
else:
146194
line_queue.put(("done", process.returncode))
147195
except Exception as e:
148-
logfire.exception("stream_reader: exception", error=str(e))
196+
logfire.exception("stream_reader_error", error=str(e))
149197
line_queue.put(("exception", str(e)))
150198

151-
logfire.info("_stream_modal_sandbox: starting reader thread")
152199
reader_thread = threading.Thread(target=stream_reader, daemon=True)
153200
reader_thread.start()
154-
logfire.info("_stream_modal_sandbox: reader thread started, entering main loop")
155201

156202
while True:
157203
try:
158-
# Non-blocking check with short timeout
159204
item = await loop.run_in_executor(
160205
executor, lambda: line_queue.get(timeout=0.1)
161206
)
162207
event_type, data = item
163208

164209
if event_type == "line":
165-
yield f"data: {json.dumps({'type': 'output', 'content': data})}\n\n"
210+
parsed = _parse_claude_stream_event(data)
211+
if parsed:
212+
logfire.info(
213+
"stream_event",
214+
event_type=parsed["type"],
215+
content_preview=parsed["content"][:100]
216+
if parsed["content"]
217+
else None,
218+
)
219+
yield f"data: {json.dumps(parsed)}\n\n"
166220
elif event_type == "error":
167221
returncode, stderr = data
168-
logfire.error(
169-
"Claude Code failed in sandbox",
170-
returncode=returncode,
171-
stderr=stderr[:500],
172-
)
173222
yield f"data: {json.dumps({'type': 'error', 'content': stderr})}\n\n"
174223
yield f"data: {json.dumps({'type': 'done', 'returncode': returncode})}\n\n"
175224
break
176225
elif event_type == "done":
226+
logfire.info("stream_complete", returncode=data)
177227
yield f"data: {json.dumps({'type': 'done', 'returncode': data})}\n\n"
178228
break
179229
elif event_type == "exception":
180230
raise Exception(data)
181231
except Exception as e:
182232
if "Empty" in type(e).__name__:
183-
# Queue timeout, continue polling
184233
await asyncio.sleep(0)
185234
continue
186235
raise
187236

188237
except Exception as e:
189-
logfire.exception("_stream_modal_sandbox: failed", error=str(e))
238+
logfire.exception("stream_modal_sandbox_failed", error=str(e))
190239
yield f"data: {json.dumps({'type': 'error', 'content': f'Sandbox error: {str(e)}'})}\n\n"
191240
yield f"data: {json.dumps({'type': 'done', 'returncode': 1})}\n\n"
192241
finally:

0 commit comments

Comments
 (0)