diff --git a/ai_platform_engineering/integrations/slack_bot/tests/test_ai_plan_streaming.py b/ai_platform_engineering/integrations/slack_bot/tests/test_ai_plan_streaming.py index 4d97d6431..6f6a3f9f3 100644 --- a/ai_platform_engineering/integrations/slack_bot/tests/test_ai_plan_streaming.py +++ b/ai_platform_engineering/integrations/slack_bot/tests/test_ai_plan_streaming.py @@ -500,6 +500,68 @@ def test_interleaved_text_between_tools_only_final_shown(self): assert "Searching..." not in combined assert "Analyzing data..." not in combined + def test_narration_while_tool_active_does_not_leak(self): + """Regression (thinking-text leak): with the stream already open (todo mode), + narration that arrives *while a tool is still active* must not be streamed + into the message body. + + This reproduces the ChatAnthropicBedrock regression: the LLM client emits + inter-step text deltas that arrive between TOOL_CALL_START and the matching + TOOL_CALL_END (active_tools is non-empty). The old code streamed that text + live via appendStream, leaking every "Let me…" segment into the response. + Only the final answer, emitted after the last tool completes, should show. 
+ """ + events = [ + # write_todos opens the stream (todo/plan mode) + _tool_start_event("write_todos", "tc-wt"), + _tool_args_event("tc-wt", '{"todos": [{"content": "Look it up", "status": "in_progress"}]}'), + _tool_end_event("tc-wt"), + # A tool starts and, before it ends, the agent streams narration + _tool_start_event("search", "tc-1"), + _content_event("Now let me search for the answer..."), + _text_message_end_event(), + _tool_end_event("tc-1"), + # Another active-tool narration burst + _tool_start_event("search", "tc-2"), + _content_event("I notice the pagination is returning the same data..."), + _text_message_end_event(), + _tool_end_event("tc-2"), + # Final answer after the last tool completes + _content_event("The answer is 42."), + _text_message_end_event(), + _done_event(), + ] + mock_slack = _mock_slack() + + stream_response( + sse_client=_mock_sse_client(events), + slack_client=mock_slack, + channel_id="C1", + thread_ts="t1", + message_text="hi", + team_id="T1", + user_id="U123", + agent_id="test-agent", + conversation_id="conv-1", + ) + + all_text = [] + for c in mock_slack.chat_appendStream.call_args_list: + for chunk in c.kwargs.get("chunks", []): + if chunk.get("type") == "markdown_text": + all_text.append(chunk["text"]) + stop_call = mock_slack.chat_stopStream.call_args + for chunk in stop_call.kwargs.get("chunks") or []: + if chunk.get("type") == "markdown_text": + all_text.append(chunk["text"]) + combined = "".join(all_text) + + # Only the final answer should be rendered + assert "The answer is 42." in combined + # Narration emitted while a tool was active must NOT leak + assert "Now let me search for the answer..." not in combined + assert "I notice the pagination is returning the same data..." 
not in combined + class TestToolThoughtExtraction: """Tests for _extract_tool_thought and TOOL_CALL_ARGS thought display.""" diff --git a/ai_platform_engineering/integrations/slack_bot/utils/ai.py b/ai_platform_engineering/integrations/slack_bot/utils/ai.py index 791e9bce9..42c3471c9 100644 --- a/ai_platform_engineering/integrations/slack_bot/utils/ai.py +++ b/ai_platform_engineering/integrations/slack_bot/utils/ai.py @@ -328,14 +328,16 @@ def stream_response( current_tool = None active_tools = {} # tool_call_id -> tool_name - # Content accumulation - accumulated_text = [] # TEXT_MESSAGE_CONTENT deltas + # Separator state: insert a blank line between any streamed tool output + # and the final answer when both are present. needs_separator = False # Thinking buffer: text between tool calls is treated as reasoning - # for the next tool call, shown as details on the tool's checklist item. - # Text after the last tool call is the final answer (streamed normally). - pending_thinking = [] # accumulated text chunks before next tool call + # for the next tool call and discarded when that tool starts. Text after + # the last tool call is the final answer (streamed at finalization). + # Only text received while no tool is active lands here; mid-tool narration + # is dropped and nothing is written to the body mid-run, so it can never leak. + pending_thinking = [] # text chunks since the last tool call started tool_args_buffer = {} # tool_call_id -> accumulated JSON args string typing_text_buf = [] # accumulated text for typing indicator (reset on TEXT_MESSAGE_END / TOOL_CALL_ARGS) @@ -609,38 +611,28 @@ def _start_stream_if_needed(): typing_text_buf.append(event.delta) continue if event.delta: - accumulated_text.append(event.delta) - - # Before any tool has been seen, buffer text as potential "thinking". - # Between tool calls (active_tools empty but seen_any_tool is True), - # also buffer as thinking for the *next* tool call. 
- # Once all tools are done and RUN_FINISHED comes, pending_thinking - # is flushed as the final answer in the finalization block. + # Never stream text to the message body mid-run — only the final + # answer is rendered, at finalization. # - # While a tool IS active and stream is open (todos), stream live. - # Otherwise buffer — the final answer will pick it up. - if active_tools and stream_buf: - # Mid-tool text — stream it live (only if stream already open) - text = event.delta - if needs_separator and stream_buf.has_flushed: - text = "\n\n" + text - needs_separator = False - stream_buf.append(text) - else: - # No tool currently running — buffer as thinking + # Text that arrives while a tool is active (active_tools non-empty) + # is mid-step narration ("Now let me search…") — it is discarded + # from the response and only feeds the typing indicator. Text that + # arrives when no tool is active is a final-answer candidate: it is + # buffered in pending_thinking, which is cleared on the next + # TOOL_CALL_START. Whatever remains after the last tool call is the + # real answer. (The genuine final answer is always emitted after + # all tools complete, i.e. when active_tools is empty.) + # + # Streaming mid-tool text live — the old behavior — leaked every + # narration segment into the body once the LLM client began emitting + # inter-step text deltas (the ChatAnthropicBedrock regression). + if not active_tools: pending_thinking.append(event.delta) - if has_todos and active_todo_id is not None: - # Todo-aware mode: thinking text is NOT sent to the plan card. - # It only feeds the typing indicator (pre-stream). - pass - - # Accumulate text for typing indicator — status is updated - # on TEXT_MESSAGE_END (not on every chunk) to avoid flicker. - if not stream_ts: - typing_text_buf.append(event.delta) - # Don't stream yet — might be thinking for the next tool call. - # If RUN_FINISHED arrives, we'll stream it as the final answer. 
+ # Accumulate text for typing indicator — status is updated + # on TEXT_MESSAGE_END (not on every chunk) to avoid flicker. + if not stream_ts: + typing_text_buf.append(event.delta) # --- TEXT_MESSAGE_END --- elif event.type == SSEEventType.TEXT_MESSAGE_END: @@ -825,8 +817,10 @@ def _start_stream_if_needed(): _set_typing_status("") return {"skipped": True, "reason": "error"} - # If we got any text content, show it despite the error - final_text = "".join(accumulated_text).strip() + # If we got real answer text (the text since the last tool call), show + # it despite the error. Use pending_thinking — the same buffer + # finalization renders — so we never resurrect intermediate narration. + final_text = "".join(pending_thinking).strip() if final_text and final_text != "I've completed your request.": logger.info(f"[{thread_ts}] Recovered from error — showing {len(final_text)} chars of content") # Fall through to finalization @@ -864,15 +858,11 @@ def _start_stream_if_needed(): # --- Finalization --- _cancel_keepalive() _cancel_pending_status() - # When tool calls interleaved with text messages, accumulated_text - # contains ALL text (thinking + final answer concatenated). Only the - # last text segment — still sitting in pending_thinking — is the real - # answer. Use it when available; fall back to accumulated_text only - # when there was no tool-interleaved thinking. - if pending_thinking: - final_text = "".join(pending_thinking).strip() - else: - final_text = "".join(accumulated_text).strip() + # pending_thinking holds only the text emitted since the last tool call — + # i.e. the real answer. Any earlier narration was discarded on each + # TOOL_CALL_START, so this can never include intermediate "Let me…" text, + # regardless of how many text/tool-call segments the LLM client emits. + final_text = "".join(pending_thinking).strip() if overthink_mode and final_text: skip_markers = overthink_config.skip_markers if overthink_config else None