Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 48 additions & 41 deletions components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1310,55 +1310,62 @@ def flush_pending_msg():
# Emit MESSAGES_SNAPSHOT with input messages + new messages from this run.
# Enrich tool result messages with tool names so the frontend can
# reconstruct parent-child hierarchy with proper display names.
if run_messages:
enriched: list[Any] = []
for msg in run_messages:
# Check if this is a tool result message that needs a name
msg_role = getattr(msg, "role", None)
msg_tcid = getattr(msg, "tool_call_id", None)
if msg_role == "tool" and msg_tcid and msg_tcid in tool_name_by_id:
# Convert to dict so we can add the name field
if hasattr(msg, "model_dump"):
d = msg.model_dump(exclude_none=True)
elif hasattr(msg, "dict"):
d = msg.dict(exclude_none=True)
else:
d = {
"id": getattr(msg, "id", ""),
"role": msg_role,
"content": getattr(msg, "content", ""),
"tool_call_id": msg_tcid,
}
d["name"] = tool_name_by_id[msg_tcid]
enriched.append(d)
else:
enriched.append(msg)

# Stamp input messages with the run-start timestamp so they
# survive a page refresh (the frontend's local timestamp is
# lost when reconnecting to the SSE stream).
run_start_iso = (
datetime.fromtimestamp(run_start_ts / 1000, tz=timezone.utc).isoformat()
if run_start_ts
else None
)
stamped_inputs: list[Any] = []
for msg in (input_data.messages if input_data else None) or []:
#
# Always emit MESSAGES_SNAPSHOT regardless of whether run_messages is
# populated — compactFinishedRun requires it to succeed. Runs that
# produce no assistant output (interrupted, state-tool-only, halted)
# still need a snapshot so the JSONL can be compacted and prior
# history is not lost. When run_messages is empty the snapshot is
# just the stamped input history.
enriched: list[Any] = []
for msg in run_messages:
# Check if this is a tool result message that needs a name
msg_role = getattr(msg, "role", None)
msg_tcid = getattr(msg, "tool_call_id", None)
if msg_role == "tool" and msg_tcid and msg_tcid in tool_name_by_id:
# Convert to dict so we can add the name field
if hasattr(msg, "model_dump"):
d = msg.model_dump(exclude_none=True)
elif isinstance(msg, dict):
d = dict(msg)
elif hasattr(msg, "dict"):
d = msg.dict(exclude_none=True)
else:
d = {
"id": getattr(msg, "id", ""),
"role": getattr(msg, "role", ""),
"role": msg_role,
"content": getattr(msg, "content", ""),
"tool_call_id": msg_tcid,
}
if "timestamp" not in d and run_start_iso:
d["timestamp"] = run_start_iso
stamped_inputs.append(d)
d["name"] = tool_name_by_id[msg_tcid]
enriched.append(d)
else:
enriched.append(msg)

# Stamp input messages with the run-start timestamp so they
# survive a page refresh (the frontend's local timestamp is
# lost when reconnecting to the SSE stream).
run_start_iso = (
datetime.fromtimestamp(run_start_ts / 1000, tz=timezone.utc).isoformat()
if run_start_ts
else None
)
stamped_inputs: list[Any] = []
for msg in (input_data.messages if input_data else None) or []:
if hasattr(msg, "model_dump"):
d = msg.model_dump(exclude_none=True)
elif isinstance(msg, dict):
d = dict(msg)
else:
d = {
"id": getattr(msg, "id", ""),
"role": getattr(msg, "role", ""),
"content": getattr(msg, "content", ""),
}
if "timestamp" not in d and run_start_iso:
d["timestamp"] = run_start_iso
stamped_inputs.append(d)

all_messages = stamped_inputs + enriched
all_messages = stamped_inputs + enriched
if all_messages:
logger.debug(
f"MESSAGES_SNAPSHOT: {len(all_messages)} msgs ({message_count} SDK messages processed)"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ async def _handle_user_message(self, msg: Any) -> None:
try:
runner_input = RunnerInput.model_validate_json(msg.payload)
except Exception:
if not msg.payload or not msg.payload.strip():
logger.warning(
"[GRPC LISTENER] Empty payload for seq=%d, skipping to avoid empty-run with no MESSAGES_SNAPSHOT",
msg.seq,
)
return
runner_input = RunnerInput(
messages=[
{"id": str(uuid.uuid4()), "role": "user", "content": msg.payload}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ def to_run_agent_input(self) -> RunAgentInput:
parent_run_id = self.parentRunId or self.parent_run_id
context_list = self.context if isinstance(self.context, list) else []

# Ensure every message dict has an id so upsert_message deduplication
# works correctly in the adapter for multi-turn sessions.
messages = []
for m in self.messages:
if isinstance(m, dict) and not m.get("id"):
m = {**m, "id": str(uuid.uuid4())}
messages.append(m)

return RunAgentInput(
thread_id=thread_id,
run_id=run_id,
parent_run_id=parent_run_id,
messages=self.messages,
messages=messages,
state=self.state or {},
tools=self.tools or [],
context=context_list,
Expand Down
Loading