From 2412f5ad7cc89c83fc020141c4020c8e4033966b Mon Sep 17 00:00:00 2001 From: Mark Turansky Date: Tue, 16 Jun 2026 00:30:09 +0000 Subject: [PATCH] fix(runner): always emit MESSAGES_SNAPSHOT to prevent compaction failures Three fixes to ensure MESSAGES_SNAPSHOT is always emitted, which is required for compactFinishedRun to succeed. Without it, sessions are marked corrupted and chat history is lost after the JSONL tail-read optimization (ffe4a213). - adapter.py: Remove `if run_messages:` guard around MESSAGES_SNAPSHOT emission. Runs that produce no assistant output (interrupted, halted on frontend tool, state-tool-only) still need a snapshot so compaction can succeed. When run_messages is empty the snapshot is the stamped input history, which is sufficient for compaction to find MESSAGES_SNAPSHOT and atomically replace the JSONL. Note that the new code still guards the snapshot block with `if all_messages:`, so a run with neither input messages nor run messages still appears to produce no snapshot. - grpc_transport.py: Skip gRPC messages whose payload is empty or whitespace-only instead of building a message with empty content. An empty-content message causes process_messages to return "" which triggers an early RunFinishedEvent before _stream_claude_sdk runs, so run_messages stays empty and MESSAGES_SNAPSHOT is never emitted. Affects both Operator-managed and control-plane-reconciled runner sessions. - run.py: Assign UUIDs to message dicts whose id is missing or empty in to_run_agent_input. Without ids, upsert_message falls back to append for every message, creating duplicates in the MESSAGES_SNAPSHOT for multi-turn sessions going through the gRPC dict message path. 
Co-Authored-By: Claude --- .../ag_ui_claude_sdk/adapter.py | 89 ++++++++++--------- .../bridges/claude/grpc_transport.py | 6 ++ .../ambient_runner/endpoints/run.py | 10 ++- 3 files changed, 63 insertions(+), 42 deletions(-) diff --git a/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py b/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py index 6eb633abd..81ce97cfe 100644 --- a/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py +++ b/components/runners/ambient-runner/ag_ui_claude_sdk/adapter.py @@ -1310,55 +1310,62 @@ def flush_pending_msg(): # Emit MESSAGES_SNAPSHOT with input messages + new messages from this run. # Enrich tool result messages with tool names so the frontend can # reconstruct parent-child hierarchy with proper display names. - if run_messages: - enriched: list[Any] = [] - for msg in run_messages: - # Check if this is a tool result message that needs a name - msg_role = getattr(msg, "role", None) - msg_tcid = getattr(msg, "tool_call_id", None) - if msg_role == "tool" and msg_tcid and msg_tcid in tool_name_by_id: - # Convert to dict so we can add the name field - if hasattr(msg, "model_dump"): - d = msg.model_dump(exclude_none=True) - elif hasattr(msg, "dict"): - d = msg.dict(exclude_none=True) - else: - d = { - "id": getattr(msg, "id", ""), - "role": msg_role, - "content": getattr(msg, "content", ""), - "tool_call_id": msg_tcid, - } - d["name"] = tool_name_by_id[msg_tcid] - enriched.append(d) - else: - enriched.append(msg) - - # Stamp input messages with the run-start timestamp so they - # survive a page refresh (the frontend's local timestamp is - # lost when reconnecting to the SSE stream). 
- run_start_iso = ( - datetime.fromtimestamp(run_start_ts / 1000, tz=timezone.utc).isoformat() - if run_start_ts - else None - ) - stamped_inputs: list[Any] = [] - for msg in (input_data.messages if input_data else None) or []: + # + # Always emit MESSAGES_SNAPSHOT regardless of whether run_messages is + # populated — compactFinishedRun requires it to succeed. Runs that + # produce no assistant output (interrupted, state-tool-only, halted) + # still need a snapshot so the JSONL can be compacted and prior + # history is not lost. When run_messages is empty the snapshot is + # just the stamped input history. + enriched: list[Any] = [] + for msg in run_messages: + # Check if this is a tool result message that needs a name + msg_role = getattr(msg, "role", None) + msg_tcid = getattr(msg, "tool_call_id", None) + if msg_role == "tool" and msg_tcid and msg_tcid in tool_name_by_id: + # Convert to dict so we can add the name field if hasattr(msg, "model_dump"): d = msg.model_dump(exclude_none=True) - elif isinstance(msg, dict): - d = dict(msg) + elif hasattr(msg, "dict"): + d = msg.dict(exclude_none=True) else: d = { "id": getattr(msg, "id", ""), - "role": getattr(msg, "role", ""), + "role": msg_role, "content": getattr(msg, "content", ""), + "tool_call_id": msg_tcid, } - if "timestamp" not in d and run_start_iso: - d["timestamp"] = run_start_iso - stamped_inputs.append(d) + d["name"] = tool_name_by_id[msg_tcid] + enriched.append(d) + else: + enriched.append(msg) + + # Stamp input messages with the run-start timestamp so they + # survive a page refresh (the frontend's local timestamp is + # lost when reconnecting to the SSE stream). 
+ run_start_iso = ( + datetime.fromtimestamp(run_start_ts / 1000, tz=timezone.utc).isoformat() + if run_start_ts + else None + ) + stamped_inputs: list[Any] = [] + for msg in (input_data.messages if input_data else None) or []: + if hasattr(msg, "model_dump"): + d = msg.model_dump(exclude_none=True) + elif isinstance(msg, dict): + d = dict(msg) + else: + d = { + "id": getattr(msg, "id", ""), + "role": getattr(msg, "role", ""), + "content": getattr(msg, "content", ""), + } + if "timestamp" not in d and run_start_iso: + d["timestamp"] = run_start_iso + stamped_inputs.append(d) - all_messages = stamped_inputs + enriched + all_messages = stamped_inputs + enriched + if all_messages: logger.debug( f"MESSAGES_SNAPSHOT: {len(all_messages)} msgs ({message_count} SDK messages processed)" ) diff --git a/components/runners/ambient-runner/ambient_runner/bridges/claude/grpc_transport.py b/components/runners/ambient-runner/ambient_runner/bridges/claude/grpc_transport.py index 2c5413b2a..6c1912518 100644 --- a/components/runners/ambient-runner/ambient_runner/bridges/claude/grpc_transport.py +++ b/components/runners/ambient-runner/ambient_runner/bridges/claude/grpc_transport.py @@ -294,6 +294,12 @@ async def _handle_user_message(self, msg: Any) -> None: try: runner_input = RunnerInput.model_validate_json(msg.payload) except Exception: + if not msg.payload or not msg.payload.strip(): + logger.warning( + "[GRPC LISTENER] Empty payload for seq=%d, skipping to avoid empty-run with no MESSAGES_SNAPSHOT", + msg.seq, + ) + return runner_input = RunnerInput( messages=[ {"id": str(uuid.uuid4()), "role": "user", "content": msg.payload} diff --git a/components/runners/ambient-runner/ambient_runner/endpoints/run.py b/components/runners/ambient-runner/ambient_runner/endpoints/run.py index aba850bae..666a670e9 100644 --- a/components/runners/ambient-runner/ambient_runner/endpoints/run.py +++ b/components/runners/ambient-runner/ambient_runner/endpoints/run.py @@ -41,11 +41,19 @@ def 
to_run_agent_input(self) -> RunAgentInput: parent_run_id = self.parentRunId or self.parent_run_id context_list = self.context if isinstance(self.context, list) else [] + # Ensure every message dict has an id so upsert_message deduplication + # works correctly in the adapter for multi-turn sessions. + messages = [] + for m in self.messages: + if isinstance(m, dict) and not m.get("id"): + m = {**m, "id": str(uuid.uuid4())} + messages.append(m) + return RunAgentInput( thread_id=thread_id, run_id=run_id, parent_run_id=parent_run_id, - messages=self.messages, + messages=messages, state=self.state or {}, tools=self.tools or [], context=context_list,