Skip to content

Commit 17009a9

Browse files
committed
fix(teams): prevent webhook deadlock when no chat task is scheduled
Address CodeRabbit Critical on PR #88. ``Chat.process_message`` invokes ``wait_until`` synchronously only if a chat task is actually scheduled. If the message is deduped, dropped by the concurrency strategy, or otherwise short-circuited, no task is created and ``wait_until`` is never called — leaving ``processing_done`` unresolved and ``await processing_done`` hanging the webhook indefinitely. After ``process_message`` returns, check whether ``processing_done`` was resolved (which can only happen via ``_chained_wait_until``, which is only invoked when a task is scheduled). If not, resolve immediately — there's no in-flight handler to wait on, no streaming will happen, and ``await processing_done`` should fall through to the ``finally`` cleanup. Also addresses Codex P2 (line 471): caller-supplied ``WebhookOptions.wait_until`` raising synchronously now logs and continues instead of escaping through ``Chat.process_message``, preventing the same-class deadlock-via-finally issue. Both fixes together close the two paths where the streaming session could be torn down while the chat task was still scheduled (or never scheduled): a synchronous wait_until exception, and a deduped/dropped message. Tests: - New ``test_caller_wait_until_raise_does_not_kill_native_streaming`` verifies the wait_until-raise path keeps the session alive long enough for the chat task to call ``thread.stream()`` natively. - Existing 23 native-streaming tests still pass. Skipped CodeRabbit's other comment (``_active_streams`` thread-id keying) — same as Codex P2 #2, matches upstream TS behavior and would be a heavy-lift Python-only divergence. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
1 parent 07df4b7 commit 17009a9

2 files changed

Lines changed: 96 additions & 2 deletions

File tree

src/chat_sdk/adapters/teams/adapter.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,12 +468,34 @@ def _chained_wait_until(task: Awaitable[Any]) -> None:
468468
# cannot starve due to a misbehaving caller-supplied hook.
469469
_resolve_processing(task)
470470
if upstream_wait_until is not None:
471-
upstream_wait_until(task)
471+
# Catch synchronous failures in the caller's hook. If we
472+
# let it escape, ``Chat.process_message`` propagates the
473+
# exception, the outer ``try`` skips ``await processing_done``,
474+
# and the ``finally`` tears down the session while the
475+
# underlying chat task is still scheduled — handlers that
476+
# later call ``thread.stream()`` would then miss native
477+
# streaming and fall back to a normal post. Logging keeps
478+
# the failure visible without breaking the streaming path.
479+
try:
480+
upstream_wait_until(task)
481+
except Exception as exc:
482+
self._logger.warn(
483+
"Caller-supplied WebhookOptions.wait_until raised",
484+
{"threadId": thread_id, "error": str(exc)},
485+
)
472486

473487
chained_options = WebhookOptions(wait_until=_chained_wait_until)
474488

475489
try:
476490
self._chat.process_message(self, thread_id, message, chained_options)
491+
# If ``process_message`` returned without invoking
492+
# ``wait_until`` synchronously, no chat task was scheduled
493+
# (deduped, dropped by the concurrency strategy, or the
494+
# message wasn't admitted for handling). Resolve the gate
495+
# immediately so ``await processing_done`` doesn't hang
496+
# forever — there is no in-flight handler to wait on.
497+
if not processing_done.done():
498+
processing_done.set_result(None)
477499
try:
478500
await processing_done
479501
except asyncio.CancelledError:

tests/test_teams_native_streaming.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,79 @@ class TestHandleMessageActivityLifecycle:
540540
"""Verify the message-activity → process_message → stream → close flow."""
541541

542542
@pytest.mark.asyncio
543-
async def test_dm_message_registers_session_and_closes_after_processing(self):
543+
async def test_caller_wait_until_raise_does_not_kill_native_streaming(self):
544+
"""A caller-supplied ``WebhookOptions.wait_until`` that raises must
545+
NOT tear down the DM streaming session before the chat task runs.
546+
547+
What to fix if this fails: in
548+
``src/chat_sdk/adapters/teams/adapter.py`` ``_chained_wait_until``,
549+
the call to the upstream ``wait_until`` must be wrapped in
550+
``try/except`` (and logged). Otherwise the synchronous raise
551+
escapes through ``Chat.process_message``, the outer ``try`` skips
552+
``await processing_done``, and the ``finally`` removes the session
553+
while the chat task is still scheduled. The handler's later
554+
``thread.stream()`` call would then miss native streaming and
555+
fall back to a normal post.
556+
"""
557+
adapter = _make_adapter()
558+
adapter._teams_send = AsyncMock(return_value={"id": "id-1"})
559+
560+
tid = _dm_thread_id(adapter)
561+
562+
# Build a chat that schedules the streaming task AND invokes
563+
# a deliberately-raising upstream wait_until.
564+
chat = MagicMock()
565+
chat.get_state = MagicMock(return_value=None)
566+
stream_calls: list[str] = []
567+
568+
def process_message(adapter_arg, thread_id, message, options):
569+
async def _do_stream():
570+
async def gen():
571+
yield "hi"
572+
573+
# Snapshot whether native streaming is still wired up at
574+
# the moment the chat task runs.
575+
stream_calls.append(
576+
"native" if thread_id in adapter_arg._active_streams else "fallback"
577+
)
578+
await adapter_arg.stream(thread_id, gen())
579+
580+
task = asyncio.get_running_loop().create_task(_do_stream())
581+
# Caller-supplied wait_until raises synchronously. The chained
582+
# wrapper must swallow this so processing_done still resolves.
583+
options.wait_until(task)
584+
585+
chat.process_message = process_message
586+
adapter._chat = chat
587+
588+
# Inject a raising upstream wait_until via WebhookOptions.
589+
from chat_sdk.types import WebhookOptions
590+
591+
def raising_wait_until(_task: Any) -> None:
592+
raise RuntimeError("caller wait_until exploded")
593+
594+
upstream_options = WebhookOptions(wait_until=raising_wait_until)
595+
596+
activity = {
597+
"type": "message",
598+
"id": "incoming-1",
599+
"text": "user said something",
600+
"from": {"id": "user-1", "name": "User One"},
601+
"conversation": {"id": "a:1Abc-DM-conversation-id"},
602+
"serviceUrl": "https://smba.trafficmanager.net/teams/",
603+
}
604+
605+
# Should NOT raise — the chained wrapper logs and continues.
606+
await adapter._handle_message_activity(activity, upstream_options)
607+
608+
# The streaming task ran while the session was still registered.
609+
assert stream_calls == ["native"], (
610+
"Caller wait_until raise tore down the session before the chat "
611+
"task ran; the handler fell back to a normal post instead of "
612+
"native Teams streaming"
613+
)
614+
# Session was cleaned up after the task finished.
615+
assert tid not in adapter._active_streams
544616
"""A DM message activity registers a session, awaits processing, then drops it."""
545617
adapter = _make_adapter()
546618
adapter._teams_send = AsyncMock(return_value={"id": "id-1"})

0 commit comments

Comments
 (0)