Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,68 @@ def test_interleaved_text_between_tools_only_final_shown(self):
assert "Searching..." not in combined
assert "Analyzing data..." not in combined

def test_narration_while_tool_active_does_not_leak(self):
"""Regression (thinking-text leak): with the stream already open (todo mode),
narration that arrives *while a tool is still active* must not be streamed
into the message body.

This reproduces the ChatAnthropicBedrock regression: the LLM client emits
inter-step text deltas that arrive between TOOL_CALL_START and the matching
TOOL_CALL_END (active_tools is non-empty). The old code streamed that text
live via appendStream, leaking every "Let me…" segment into the response.
Only the final answer, emitted after the last tool completes, should show.
"""
events = [
# write_todos opens the stream (todo/plan mode)
_tool_start_event("write_todos", "tc-wt"),
_tool_args_event("tc-wt", '{"todos": [{"content": "Look it up", "status": "in_progress"}]}'),
_tool_end_event("tc-wt"),
# A tool starts and, before it ends, the agent streams narration
_tool_start_event("search", "tc-1"),
_content_event("Now let me search for the answer..."),
_text_message_end_event(),
_tool_end_event("tc-1"),
# Another active-tool narration burst
_tool_start_event("search", "tc-2"),
_content_event("I notice the pagination is returning the same data..."),
_text_message_end_event(),
_tool_end_event("tc-2"),
# Final answer after the last tool completes
_content_event("The answer is 42."),
_text_message_end_event(),
_done_event(),
]
mock_slack = _mock_slack()

stream_response(
sse_client=_mock_sse_client(events),
slack_client=mock_slack,
channel_id="C1",
thread_ts="t1",
message_text="hi",
team_id="T1",
user_id="U123",
agent_id="test-agent",
conversation_id="conv-1",
)

all_text = []
for c in mock_slack.chat_appendStream.call_args_list:
for chunk in c.kwargs.get("chunks", []):
if chunk.get("type") == "markdown_text":
all_text.append(chunk["text"])
stop_call = mock_slack.chat_stopStream.call_args
for chunk in stop_call.kwargs.get("chunks") or []:
if chunk.get("type") == "markdown_text":
all_text.append(chunk["text"])
combined = "".join(all_text)

# Only the final answer should be rendered
assert "The answer is 42." in combined
# Narration emitted while a tool was active must NOT leak
assert "Now let me search for the answer..." not in combined
assert "I notice the pagination is returning the same data..." not in combined


class TestToolThoughtExtraction:
"""Tests for _extract_tool_thought and TOOL_CALL_ARGS thought display."""
Expand Down
80 changes: 35 additions & 45 deletions ai_platform_engineering/integrations/slack_bot/utils/ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,16 @@ def stream_response(
current_tool = None
active_tools = {} # tool_call_id -> tool_name

# Content accumulation
accumulated_text = [] # TEXT_MESSAGE_CONTENT deltas
# Separator state: insert a blank line between any streamed tool output
# and the final answer when both are present.
needs_separator = False

# Thinking buffer: text between tool calls is treated as reasoning
# for the next tool call, shown as details on the tool's checklist item.
# Text after the last tool call is the final answer (streamed normally).
pending_thinking = [] # accumulated text chunks before next tool call
# for the next tool call and discarded when that tool starts. Text after
# the last tool call is the final answer (streamed at finalization).
# ALL streamed text lands here — nothing is written to the message body
# mid-run — so intermediate narration can never leak into the response.
pending_thinking = [] # text chunks since the last tool call started
tool_args_buffer = {} # tool_call_id -> accumulated JSON args string
typing_text_buf = [] # accumulated text for typing indicator (reset on TEXT_MESSAGE_END / TOOL_CALL_ARGS)

Expand Down Expand Up @@ -609,38 +611,28 @@ def _start_stream_if_needed():
typing_text_buf.append(event.delta)
continue
if event.delta:
accumulated_text.append(event.delta)

# Before any tool has been seen, buffer text as potential "thinking".
# Between tool calls (active_tools empty but seen_any_tool is True),
# also buffer as thinking for the *next* tool call.
# Once all tools are done and RUN_FINISHED comes, pending_thinking
# is flushed as the final answer in the finalization block.
# Never stream text to the message body mid-run — only the final
# answer is rendered, at finalization.
#
# While a tool IS active and stream is open (todos), stream live.
# Otherwise buffer — the final answer will pick it up.
if active_tools and stream_buf:
# Mid-tool text — stream it live (only if stream already open)
text = event.delta
if needs_separator and stream_buf.has_flushed:
text = "\n\n" + text
needs_separator = False
stream_buf.append(text)
else:
# No tool currently running — buffer as thinking
# Text that arrives while a tool is active (active_tools non-empty)
# is mid-step narration ("Now let me search…") — it is discarded
# from the response and only feeds the typing indicator. Text that
# arrives when no tool is active is a final-answer candidate: it is
# buffered in pending_thinking, which is cleared on the next
# TOOL_CALL_START. Whatever remains after the last tool call is the
# real answer. (The genuine final answer is always emitted after
# all tools complete, i.e. when active_tools is empty.)
#
# Streaming mid-tool text live — the old behavior — leaked every
# narration segment into the body once the LLM client began emitting
# inter-step text deltas (the ChatAnthropicBedrock regression).
if not active_tools:
pending_thinking.append(event.delta)

if has_todos and active_todo_id is not None:
# Todo-aware mode: thinking text is NOT sent to the plan card.
# It only feeds the typing indicator (pre-stream).
pass

# Accumulate text for typing indicator — status is updated
# on TEXT_MESSAGE_END (not on every chunk) to avoid flicker.
if not stream_ts:
typing_text_buf.append(event.delta)
# Don't stream yet — might be thinking for the next tool call.
# If RUN_FINISHED arrives, we'll stream it as the final answer.
# Accumulate text for typing indicator — status is updated
# on TEXT_MESSAGE_END (not on every chunk) to avoid flicker.
if not stream_ts:
typing_text_buf.append(event.delta)

# --- TEXT_MESSAGE_END ---
elif event.type == SSEEventType.TEXT_MESSAGE_END:
Expand Down Expand Up @@ -825,8 +817,10 @@ def _start_stream_if_needed():
_set_typing_status("")
return {"skipped": True, "reason": "error"}

# If we got any text content, show it despite the error
final_text = "".join(accumulated_text).strip()
# If we got real answer text (the text since the last tool call), show
# it despite the error. Use pending_thinking — the same buffer
# finalization renders — so we never resurrect intermediate narration.
final_text = "".join(pending_thinking).strip()
if final_text and final_text != "I've completed your request.":
logger.info(f"[{thread_ts}] Recovered from error — showing {len(final_text)} chars of content")
# Fall through to finalization
Expand Down Expand Up @@ -864,15 +858,11 @@ def _start_stream_if_needed():
# --- Finalization ---
_cancel_keepalive()
_cancel_pending_status()
# When tool calls interleaved with text messages, accumulated_text
# contains ALL text (thinking + final answer concatenated). Only the
# last text segment — still sitting in pending_thinking — is the real
# answer. Use it when available; fall back to accumulated_text only
# when there was no tool-interleaved thinking.
if pending_thinking:
final_text = "".join(pending_thinking).strip()
else:
final_text = "".join(accumulated_text).strip()
# pending_thinking holds only the text emitted since the last tool call —
# i.e. the real answer. Any earlier narration was discarded on each
# TOOL_CALL_START, so this can never include intermediate "Let me…" text,
# regardless of how many text/tool-call segments the LLM client emits.
final_text = "".join(pending_thinking).strip()

if overthink_mode and final_text:
skip_markers = overthink_config.skip_markers if overthink_config else None
Expand Down