feat(teams): native streaming for DMs via emit (vercel/chat#416)#88
patrick-chinchill wants to merge 8 commits into main from
Conversation
Port upstream `feat(adapter-teams): native streaming for DMs via emit` (commit ed46bae). DM threads now dispatch chunks through the Bot Framework streaming protocol (typing activities with `channelData.streamType=streaming` + `streamSequence`, then a final `message` activity with `streamType=final`) instead of routing through `Thread._fallback_stream`. Group chats / channels accumulate the stream and post a single message — matching upstream's post-#416 behavior of avoiding the post+edit flicker where Teams doesn't support native streaming.

Lifecycle:
- `_handle_message_activity` registers a `_TeamsStreamSession` for DMs and `await`s the chat handler so the session stays alive through `stream()`. The handler is fire-and-forget for non-DM threads.
- `_stream_via_emit` emits cumulative text per chunk; `streamId` is captured from the first send and threaded through subsequent chunks (Hazard #7 — the first chunk omits the key entirely).
- `_close_stream_session` posts the final activity. Skipped on cancellation or zero-chunk streams (no orphan finalizers).
- Iterator exceptions cancel the session and re-raise; transient send failures (e.g. 429 mid-stream) cancel the session and return the partial RawMessage instead of bubbling up.

Tests: `tests/test_teams_native_streaming.py` covers wire-format invariants, dispatch decisions, cancellation paths, error mid-stream, very-short streams, two concurrent DMs, and the end-to-end webhook → process_message → stream → close lifecycle. Existing `TestStream` cases in `test_teams_coverage.py` and `test_teams_extended.py` updated to reflect the new accumulate-and-post semantics for group chats.

Non-parity: updated existing "Fallback streaming" rows in `docs/UPSTREAM_SYNC.md` to scope the divergence to non-Teams adapters, and added two new rows documenting Teams-specific divergences (group chats now accumulate-and-post; native streaming soft-cancels on send failure).

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
No actionable comments were generated in the recent review. 🎉
📝 Walkthrough: This PR adds native Bot Framework streaming support for Teams direct messages. When a DM message arrives, a stream session is registered and kept alive during handler execution.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Code Review
This pull request implements native streaming for Microsoft Teams DMs using the Bot Framework protocol, while transitioning group chats and channels to an accumulate-and-post strategy to eliminate UI flicker. It introduces a session management system for tracking in-flight DM streams and includes comprehensive test coverage for the new streaming lifecycle. Feedback suggests simplifying the stream finalization logic by removing a redundant check for empty content, as the presence of a stream ID already implies that content was sent.
| """ | ||
| if session.canceled: | ||
| return | ||
| if session.stream_id is None or not session._text: # noqa: SLF001 |
The check `not session._text` is redundant here. `session.stream_id` is only set when a chunk with content is sent, which also guarantees `session._text` will be non-empty. Therefore, checking for `session.stream_id is None` is sufficient.
```diff
-    if session.stream_id is None or not session._text:  # noqa: SLF001
+    if session.stream_id is None:
```
Address gemini-code-assist review on PR #88 (line 1218). ``session.stream_id`` is only assigned after a successful chunk send, and empty chunks are skipped before the send call — so ``stream_id is not None`` implies ``_text`` is non-empty. Drop the redundant ``not session._text`` check; the single condition expresses the invariant more clearly. Behavior unchanged. Tests still pass. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
patrick-chinchill
left a comment
Code review — feat(teams): native streaming for DMs via emit (vercel/chat#416)
Compared HEAD (58b7fc7, including the follow-up _text-check refactor) to upstream ed46bae at f55378a (packages/adapter-teams/src/index.ts). Walked the porting hazards (especially #5/#7) and the SELF_REVIEW adversarial checks (pass-interaction, emit/parse symmetry, sentinels).
✅ Looks good
- Hazard #7 (omit vs `None`) is honored: the first chunk's `channelData` is constructed without a `streamId` key (`if session.stream_id is not None: channel_data["streamId"] = ...`), and `test_first_chunk_omits_stream_id` pins the wire shape with `assert "streamId" not in first_payload["channelData"]`.
- `streamSequence` starts at 1 and increments per emitted chunk (`session.sequence += 1` before building the payload, after the empty-skip), matching the Bot Framework protocol and pinned by `test_subsequent_chunks_carry_stream_id_and_increment_sequence`.
- DM detection matches upstream byte-for-byte — both Python `is_dm` and TS `isDM` use `not conversationId.startsWith("19:")` (not `conversationType == "personal"` as the brief implied). Group-chat IDs (`19:...@thread.skype`) correctly fall through to accumulate-and-post.
- Invariant in the follow-up refactor holds: `stream_id` is only set after a successful `_teams_send` AND only when `result.get("id")` is non-empty (lines 1174–1178); empty chunks are skipped before `accumulated += text`. So `stream_id is not None` ⇒ `_text` non-empty. The `not session._text` check was indeed redundant.
- Cleanup-path safety: the finally in `_handle_message_activity` uses `if current is session` before popping (so a concurrent re-registration on the same DM thread isn't clobbered), and `_close_stream_session` is wrapped in try/except so its failure can't leak the registry entry. Cancel/iterator-exception paths both call `session.cancel()` before re-raising, so `_close_stream_session` correctly skips the final activity.
- Soft-cancel-on-send-failure divergence is documented in the `docs/UPSTREAM_SYNC.md` non-parity table with rationale and tested by `test_emit_send_failure_cancels_session` (verifies no exception bubbles, `result.raw["text"]` carries pre-failure content).
- Group-chat accumulate-and-post is a parity behavior with upstream post-#416, not a divergence — the doc table row marks it "no divergence at the adapter level" for clarity, which is the right framing.
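The DM-detection rule pinned above reduces to a one-line predicate. A minimal sketch (the real helper lives in the adapter's thread-id module; the function name here is illustrative):

```python
def is_dm(conversation_id: str) -> bool:
    # Teams group chats and channels carry conversation ids of the form
    # "19:...@thread.skype" (or "19:...@thread.v2"); anything that does
    # NOT start with "19:" is treated as a direct message.
    return not conversation_id.startswith("19:")


assert is_dm("a:1a2b3c")                          # DM-style id
assert not is_dm("19:meeting_abc@thread.skype")   # group chat / channel
```

Group-chat ids falling through this check are what routes them to the accumulate-and-post path instead of native streaming.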
🟡 Medium
- Concurrent-DM test doesn't exercise the `_active_streams` race — `test_two_concurrent_dm_streams_have_independent_sessions` constructs two sessions for two distinct thread IDs and calls `_stream_via_emit` directly with explicit session args, bypassing `_active_streams` entirely. The realistic race (two near-simultaneous webhooks for the same DM thread overwriting `_active_streams[tid]`) isn't covered. This race exists in upstream too (`activeStreams.set(threadId, ctx.stream)` overwrites identically), so it's parity behavior — but the test name oversells what's being verified. Suggest renaming or adding a same-thread test that drives two `_handle_message_activity` calls under `asyncio.gather`.
- First-chunk send returns `id=""`: if Teams accepts the typing activity but returns an empty id, `session.stream_id` stays `None` (line 1177 guard), `_text` becomes non-empty, and `_close_stream_session` skips because of the (now-tightened) `stream_id is None` check. Net effect: the user sees streamed chunks but no final `message` activity — the streaming UI stays running until Teams times it out client-side. Upstream has the same shape (`messageId = ""`) but would still attempt the final send. Worth a one-line comment at the close-skip site explaining this, and ideally a test pinning the empty-`id` behavior.
🔵 Nit
- `_chained_wait_until` invokes `upstream_wait_until(task)` before `_resolve_processing(task)`. If the caller-supplied `wait_until` raises, the exception propagates back through `process_message`, hits the adapter's outer try, and falls into `finally` (no hang — `await processing_done` isn't reached). That's fine, but reversing the order — resolve first, then call upstream — would make the deadlock-immunity argument trivially obvious to a future reader.
- `_TeamsStreamSession._text` is read from outside the class via `session._text` with `# noqa: SLF001` — a public `text` attribute (or method) would be cleaner since the leading underscore isn't really enforcing privacy.
Posted by an automated reviewer agent. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
Generated by Claude Code
- Add a same-thread concurrent `_handle_message_activity` test that exercises the realistic `_active_streams` race (two near-simultaneous DM webhooks for the same thread). Pins upstream's plain-Map clobber semantics: the second registration overwrites the first, both in-flight handlers observe the later session, and the registry ends empty after both finish. The original distinct-threads test is renamed to make clear it covers session ISOLATION, not the registry race.
- Empty-id final-send fallback: when Teams accepts streaming chunks but returns `id=""` on the first activity, `_close_stream_session` now ships the final message anyway (omitting `streamId` from `channelData`) instead of skipping and leaving the streaming UI spinning until Teams times the session out client-side. Mirrors upstream's looser check (text non-empty → ship the final). Adds a regression test and a non-parity row in docs/UPSTREAM_SYNC.md.
- `_chained_wait_until`: resolve our internal `processing_done` gate BEFORE invoking the caller-supplied `waitUntil`, so the deadlock-immunity argument is trivially obvious (a misbehaving upstream callback can't starve the await on `processing_done`).
- `_TeamsStreamSession`: add a public read-only `text` property so external callers (now `_close_stream_session`) read through it instead of the underscore-prefixed `_text` attribute. `_stream_via_emit` retains the direct `_text` write as the canonical mutator.
patrick-chinchill
left a comment
Re-review of fixes in 2e96fbb
Verified the four fixes against ed46bae (vercel/chat#416) and the broader adapter surface.
Verified — fixes land cleanly
- `test_same_thread_concurrent_handlers_clobber_active_stream` (tests/test_teams_native_streaming.py:722) — drives two `_handle_message_activity` tasks for the same thread under a barrier, snapshots the registry post-overlap, and pins `[second_session, second_session]`. Matches upstream's `Map` clobber semantics. The renamed `test_distinct_dm_threads_each_have_isolated_session_state` makes the isolation-vs-race split explicit.
- Empty-`id` final-send (adapter.py:1239–1271, test 317–362) — `_close_stream_session` now ships the final `message` when `text` is non-empty even if `stream_id is None`, omitting `streamId` from `channelData` (rather than serializing `None` — Hazard #7). Regression test asserts `"streamId" not in final_payload["channelData"]`. Non-parity row at UPSTREAM_SYNC.md:492 is accurate.
- `_chained_wait_until` order (adapter.py:462–471) — `_resolve_processing(task)` runs before `upstream_wait_until(task)`, with a comment explaining the deadlock-immunity argument.
- Public `text` property (adapter.py:95–103, 1243, 1260) — read-only `text` accessor exposed; reads in `_close_stream_session` go through it; only the canonical mutator at adapter.py:1216 retains `# noqa: SLF001`. Tests still poke `_text` directly (lines 176, 300) — fine, they're inside the test module's own laxer scope and lint-clean.
Findings
Nit (parity gap, pre-existing across multiple adapters)
_stream_via_emit and the accumulate-and-post fallback both check isinstance(chunk, dict) and chunk.get("type") == "markdown_text" (adapter.py:1081, 1139). A real MarkdownTextChunk dataclass instance silently drops to "" because isinstance(MarkdownTextChunk(...), dict) is False. Other adapters (google_chat, whatsapp, slack, github) use hasattr(chunk, "type") for this. Same gap exists in discord/linear, so out of scope here, but worth a follow-up issue — the StreamChunk union in types.py:798 is dataclass-typed.
Nit (divergence not in UPSTREAM_SYNC.md)
_chained_wait_until resolves the internal gate before invoking the caller-supplied wait_until. Upstream TS does the opposite (baseOptions?.waitUntil?.(task) first, then task.then(...)). Functionally equivalent (both register callbacks; neither blocks), but it IS a deliberate ordering divergence and the commit message frames it as a hardening defense — consider a one-line entry in the non-parity table so future syncs don't "correct" it back.
Nit (forward-looking)
chained_options = WebhookOptions(wait_until=_chained_wait_until) (adapter.py:473) drops any other fields that may exist on the inbound options. Today WebhookOptions only has wait_until, so no observable bug — but upstream uses {...baseOptions, waitUntil: ...} for a reason. A replace(options, wait_until=...) (or explicit comment "WebhookOptions has only wait_until — re-spread if it grows") would future-proof.
Hunt results — no other issues
- Wire format: `streamSequence` (1-indexed, ++ per emit), `streamType` ("streaming"/"final"), `streamId` (omitted on first chunk + when the server returned an empty id), parallel `streaminfo` entity — all match the Bot Framework streaming protocol that upstream's `IStreamer` emits under the hood.
- Cleanup: try/finally in `_handle_message_activity` mirrors upstream's `try { await processingDone } finally { activeStreams.delete }`; `asyncio.CancelledError` cancels the session and re-raises (covered by `test_cancelled_error_propagates_and_marks_session_canceled`).
- DM detection: `is_dm` checks only `not conversationId.startswith("19:")` — matches thread-id.ts:30 exactly.
- Test fidelity: upstream PR #416 added zero new `it()` blocks in index.test.ts; only assertion swaps in replay-streaming.test.ts. Python's 23 native-streaming tests + 159 teams-streaming-related tests all pass; no missing TS coverage.
- Hazard #5: no untracked tasks in the new code. `process_message` tracks via `_active_tasks`, `_chained_wait_until` only calls `add_done_callback`, no bare `create_task`/`ensure_future`.
Re-review verdict: PASS
The three nits are real but non-blocking (one is pre-existing, one is hardening with no observable behavior change, one is forward-looking). Fixes for the previous round all landed correctly.
Posted by an automated re-reviewer agent. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
Generated by Claude Code
Final upstream-coverage audit before merging the 7 sync PRs (#84-#90) identified one undocumented N/A item: vercel/chat#415 (Teams SDK 2.0.8 + User-Agent) is a JS-only botbuilder dependency bump. The Python Teams adapter uses raw aiohttp (no botbuilder), so there is no equivalent dependency to bump. The optional `User-Agent: Vercel.ChatSDK` header on the ~9 outbound aiohttp call sites is a defense-in-depth nice-to-have; deferred as a follow-up rather than landed in this sync.

Updates:
- CHANGELOG.md: tick all completed items and link them to their PRs (#84, #85, #86, #87, #88, #89, #90, plus already-merged PR #74). Document #415 inline as N/A.
- docs/UPSTREAM_SYNC.md non-parity table: add a row for the Teams User-Agent header divergence so future syncers don't try to "port" the JS bump.

Item #6 (concurrency.maxConcurrent) is already implementation-covered in the Python port (existing divergence row at L492). The 4 new TS concurrency tests in chat.test.ts have Python-specific equivalents at test_chat_faithful.py L2969-3055 that don't name-match — leaving as deferred fidelity-baseline polish since the behavior is verified.

Verdict from the coverage audit: all 18 substantive ports across PRs #84-#90 are upstream-verified. No commits in chat@4.26.0..f55378a were missed. Ready to start merging.

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2e96fbb242
```python
    # task — the local ``await`` is purely so we know when to reap the
    # session.
    session = _TeamsStreamSession()
    self._active_streams[thread_id] = session
```
Avoid clobbering active streams before message admission
With the default per-thread/drop concurrency path in Chat.handle_incoming_message, a second DM webhook for the same thread can reach this assignment before the chat task has acquired or failed the thread lock. That overwrites the first request's session even if the second message is later dropped, so if the first handler calls stream() during that window it will either emit into the wrong session or miss native streaming entirely; register the session only once the message is actually admitted for handling, or key it by the processing task/message.
Thanks — deferring this one as out-of-scope for the parity sync, with the rationale now inlined at the registration site (commit 6acc796).
Why per-thread keying is safe today:
- Mirrors upstream `activeStreams.set(threadId, …)` in `packages/chat-teams/src/index.ts`. Diverging here would be a Python-only deviation right inside the parity sync.
- `Chat.handle_incoming_message` defaults to a per-thread concurrency strategy that drops/dedupes overlapping webhooks for the same thread before any handler runs. So in the DM path, two `stream()` calls cannot legitimately race on the same `thread_id` under the default strategy — the second message is rejected before it reaches the registration site.
The reachability the test exercises is real but only manifests under non-default concurrency strategies that allow two same-thread handlers to run concurrently. In that mode native streaming is already the wrong primitive (Teams expects a single sequential streamId per conversation), so the right fix is a per-handler ContextVar (or an explicit session parameter into stream()), which we'd rather land as a Python-only follow-up rather than slip into the 4.27 sync. Tracked.
Generated by Claude Code
```python
    if not text:
        continue

    accumulated += text
```
Commit streamed text only after a successful emit
If _teams_send raises for this chunk, the exception handler below cancels the session and returns after accumulated already includes text that Teams rejected. In that mid-stream 429/network-error case the returned RawMessage and Thread._handle_stream's sent-message history can contain content the user never saw; build a candidate payload for the send and update accumulated/session._text only after the send succeeds.
Address Codex P2 finding on PR #88. The previous flow updated ``accumulated`` and ``session.sequence`` BEFORE awaiting ``_teams_send``, so a 429/network failure mid-stream would leave the partial RawMessage returned to the caller (and ``session._text`` feeding the final close-activity) carrying text that Teams never displayed to the user.

Fix:
- Build ``candidate_accumulated = accumulated + text`` and ``next_sequence = session.sequence + 1`` as candidates.
- Send the activity with the candidate values.
- Commit ``accumulated`` and ``session.sequence`` only after the send succeeds.

The soft-cancel path (logged warn + session.cancel + break) now genuinely returns the high-water mark of successfully-sent text. Matches the documented "RawMessage carries pre-failure content" guarantee in the ``docs/UPSTREAM_SYNC.md`` non-parity table.

Updated test_emit_send_failure_cancels_session: previously asserted ``result.raw["text"] == "helloworld"`` (which contained the rejected "world" chunk); now asserts ``"hello"`` and ``session.sequence == 1``.

Skipped Codex P2 #2 (active_streams clobber timing) — matches upstream TS behavior; deferring the fix as a Python-only improvement would be a divergence and is design-significant. File as a follow-up if desired.

23 native-streaming tests pass.

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
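The candidate/commit pattern in that fix can be sketched in isolation. A minimal illustration, assuming a `send` callable standing in for `_teams_send` (names and structure are hypothetical, not the adapter's code):

```python
import asyncio


async def stream_chunks(chunks, send):
    """Commit accumulated state only AFTER each send succeeds."""
    accumulated = ""
    sequence = 0
    for text in chunks:
        if not text:
            continue
        candidate = accumulated + text  # built, but not yet committed
        try:
            await send(candidate, sequence + 1)
        except Exception:
            # Soft-cancel: keep the high-water mark of sent text.
            break
        accumulated = candidate  # commit only on success
        sequence += 1
    return accumulated, sequence


async def flaky_send(text: str, seq: int) -> None:
    if "world" in text:
        raise RuntimeError("429 Too Many Requests")


acc, seq = asyncio.run(stream_chunks(["hello", "world"], flaky_send))
assert (acc, seq) == ("hello", 1)  # rejected "world" chunk never committed
```

Committing before the await (the old order) would have returned `"helloworld"` here, content the user never saw.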
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 07df4b778b
```python
    # cannot starve due to a misbehaving caller-supplied hook.
    _resolve_processing(task)
    if upstream_wait_until is not None:
        upstream_wait_until(task)
```
Handle wait_until errors without closing the stream early
When a caller-supplied WebhookOptions.wait_until raises synchronously, this exception escapes through Chat.process_message, so _handle_message_activity skips the await processing_done path and immediately runs the finally block that removes/closes the DM stream session while the chat task is still scheduled. In that environment the handler can continue and call thread.stream() after the session was removed, falling back to a normal post (and the webhook also errors) instead of using native Teams streaming; catch/log the upstream wait_until failure after registering your own callback so the local session still stays alive until the task finishes.
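The containment the finding asks for, shielding the local gate from a caller-supplied hook that raises synchronously, can be sketched like this (hypothetical names; the real wiring lives in `_chained_wait_until`):

```python
import logging

logger = logging.getLogger("teams_sketch")


def make_chained(resolve, upstream):
    """Build a wait_until that resolves our gate before (and despite) the
    caller-supplied hook."""

    def chained(task) -> None:
        resolve(task)  # our own gate is wired up unconditionally
        if upstream is not None:
            try:
                upstream(task)
            except Exception:
                # A misbehaving caller hook must not escape through
                # process_message and trigger early session teardown.
                logger.warning("caller-supplied wait_until raised; continuing")

    return chained


def bad_hook(task) -> None:
    raise RuntimeError("boom")


calls: list = []
chained = make_chained(calls.append, bad_hook)
chained("task")            # does not raise despite the bad upstream hook
assert calls == ["task"]   # the local gate was still resolved
```

This is the shape the later commit ("logs and continues instead of escaping") lands on.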
Actionable comments posted: 2
Inline comments:
In `@src/chat_sdk/adapters/teams/adapter.py`:
- Around line 437-493: _active_streams is keyed only by thread_id causing
concurrent handlers on the same thread to overwrite each other's
_TeamsStreamSession; instead of registering the session in the shared dict and
re-resolving by thread_id, keep the session in handler-local context (e.g. a
ContextVar) or pass the session explicitly into stream-related calls so each
webhook handler uses its own _TeamsStreamSession instance; update code paths
that access _active_streams (creation in this block, stream() consumer, and
cleanup via _close_stream_session) to use the handler-local session, ensure
_chained_wait_until / processing_done still wire to that session, and remove or
restrict shared lookups by thread_id to avoid stale/overwritten sessions.
- Around line 442-478: The webhook can deadlock if Chat.process_message never
creates a task (so _chained_wait_until is never called) because processing_done
remains unresolved; after the call to self._chat.process_message(self,
thread_id, message, chained_options) ensure processing_done is always unblocked
by adding a finally block that (1) catches asyncio.CancelledError around the
await processing_done to call session.cancel() and re-raise, and (2) after the
try/except/cleanup, checks if processing_done.done() is False and calls
processing_done.set_result(None) to guarantee resolution; reference
_resolve_processing, _chained_wait_until, WebhookOptions.wait_until,
processing_done, and self._chat.process_message when applying this change.
📒 Files selected for processing (5)
- docs/UPSTREAM_SYNC.md
- src/chat_sdk/adapters/teams/adapter.py
- tests/test_teams_coverage.py
- tests/test_teams_extended.py
- tests/test_teams_native_streaming.py
```python
session = _TeamsStreamSession()
self._active_streams[thread_id] = session
loop = asyncio.get_running_loop()
processing_done: asyncio.Future[None] = loop.create_future()

def _resolve_processing(task: Awaitable[Any]) -> None:
    # ``WebhookOptions.wait_until`` receives the chat task; we hook
    # done so we can release ``processing_done`` regardless of
    # success/failure (mirrors the upstream ``task.then(resolve,
    # resolve)`` pattern).
    if isinstance(task, asyncio.Task):

        def _on_done(_t: asyncio.Task[Any]) -> None:
            if not processing_done.done():
                processing_done.set_result(None)

        task.add_done_callback(_on_done)
    elif not processing_done.done():
        # Non-Task awaitables are uncommon on this path, but if we
        # ever get one we still need to unblock — resolve eagerly
        # so we don't deadlock the webhook handler.
        processing_done.set_result(None)

upstream_wait_until = options.wait_until if options is not None else None

def _chained_wait_until(task: Awaitable[Any]) -> None:
    # Resolve our own gate FIRST, before invoking the upstream
    # ``wait_until`` callback. This way, even if the upstream
    # callback raises, blocks, or never fires, ``processing_done``
    # is still wired up — making the deadlock-immunity argument
    # trivially obvious: the await on ``processing_done`` below
    # cannot starve due to a misbehaving caller-supplied hook.
    _resolve_processing(task)
    if upstream_wait_until is not None:
        upstream_wait_until(task)

chained_options = WebhookOptions(wait_until=_chained_wait_until)

try:
    self._chat.process_message(self, thread_id, message, chained_options)
    try:
        await processing_done
    except asyncio.CancelledError:
        # Caller cancelled the webhook handler — propagate cancel
        # into the streaming session so any in-flight ``stream()``
        # exits cleanly without sending more chunks.
        session.cancel()
        raise
finally:
    # Always close the session — sending a final activity if any
    # chunks were emitted — and drop the registry entry so a
    # subsequent message can register fresh.
    current = self._active_streams.get(thread_id)
    if current is session:
        self._active_streams.pop(thread_id, None)
    try:
        await self._close_stream_session(thread_id, session)
```
Isolate native stream sessions per handler, not per thread.
_active_streams is keyed only by thread_id, and stream() resolves the session from that shared dict. If two DM handlers overlap on the same thread, the second registration overwrites the first, so both handlers can emit through the same _TeamsStreamSession. That corrupts streamSequence/streamId, and the first handler’s finally ends up closing an empty or stale session. The new same-thread overlap test shows this overwrite is reachable, so this will leak into user-visible streams under concurrency. Prefer carrying the session in handler-local context (for example a ContextVar or an explicit parameter) instead of re-looking it up by thread ID.
Also applies to: 1070-1072
Acknowledged — deferring the per-handler isolation refactor as a Python-only follow-up rather than landing it inside the parity sync. Inlined the rationale next to the registration site in commit 6acc796 so future readers find it without digging through review history.
Short version:
- `_active_streams[thread_id] = session` mirrors upstream `activeStreams.set(threadId, …)` (packages/chat-teams/src/index.ts). The same per-thread keying invariant that holds upstream holds here.
- `Chat.handle_incoming_message` serializes DM handlers per thread under the default concurrency config (overlapping webhooks for the same thread are deduped/dropped before they reach a handler), so two `stream()` calls on the same `thread_id` don't co-exist in the supported deployment shape.
- The same-thread-overlap test reaches the overwrite only under a non-default concurrency strategy, where Teams' single-`streamId`-per-conversation contract already makes native streaming the wrong primitive. The correct long-term fix is a per-handler `ContextVar` (or threading the session through `stream()` explicitly), which is a heavy-lift Python-only divergence we'd rather do as a standalone change after the sync lands. Tracked as a follow-up.
Generated by Claude Code
@patrick-chinchill, understood — the reasoning holds:
- Upstream parity: the `thread_id`-keyed dict directly mirrors `activeStreams.set(threadId, …)` in the TypeScript adapter, so the semantics are intentionally shared.
- Default concurrency guard: `Chat.handle_incoming_message`'s per-thread serialization in the default config means two concurrent DM handlers on the same thread aren't a reachable state in the supported deployment shape — the overwrite is only exercisable under a non-default strategy.
- Wrong primitive anyway: under any concurrency config that would allow overlap, Teams' single-`streamId`-per-conversation constraint means native streaming is already the wrong tool, so the `ContextVar`/explicit-parameter refactor belongs in a separate change where the full design can be thought through.
Inlining the rationale next to the registration site in 6acc796 is the right call — it short-circuits the re-discovery cycle for any future reviewer. Happy to pick this up as a follow-up once the parity sync lands.
✏️ Learnings added
Learnt from: patrick-chinchill
Repo: Chinchill-AI/chat-sdk-python PR: 88
File: src/chat_sdk/adapters/teams/adapter.py:437-535
Timestamp: 2026-05-10T07:50:13.594Z
Learning: In `src/chat_sdk/adapters/teams/adapter.py`, `TeamsAdapter._active_streams` is intentionally keyed by `thread_id` (mirroring upstream TypeScript `activeStreams.set(threadId, …)` in `packages/chat-teams/src/index.ts`). Same-thread DM handler overlap is prevented by `Chat.handle_incoming_message`'s per-thread serialization under the default concurrency config (overlapping webhooks are deduped/dropped before reaching the handler). The per-handler `ContextVar` isolation refactor is deferred as a Python-only follow-up and should not be flagged as a blocking issue in this PR.
Address CodeRabbit Critical on PR #88.

``Chat.process_message`` invokes ``wait_until`` synchronously only if a chat task is actually scheduled. If the message is deduped, dropped by the concurrency strategy, or otherwise short-circuited, no task is created and ``wait_until`` is never called — leaving ``processing_done`` unresolved and ``await processing_done`` hanging the webhook indefinitely.

After ``process_message`` returns, check whether ``processing_done`` was resolved (which can only happen via ``_chained_wait_until``, which is only invoked when a task is scheduled). If not, resolve immediately — there's no in-flight handler to wait on, no streaming will happen, and ``await processing_done`` should fall through to the ``finally`` cleanup.

Also addresses Codex P2 (line 471): a caller-supplied ``WebhookOptions.wait_until`` that raises synchronously now logs and continues instead of escaping through ``Chat.process_message``, preventing the same class of deadlock-via-finally issue.

Both fixes together close the two paths where the streaming session could be torn down while the chat task was still scheduled (or never scheduled): a synchronous wait_until exception, and a deduped/dropped message.

Tests:
- New ``test_caller_wait_until_raise_does_not_kill_native_streaming`` verifies the wait_until-raise path keeps the session alive long enough for the chat task to call ``thread.stream()`` natively.
- Existing 23 native-streaming tests still pass.

Skipped CodeRabbit's other comment (``_active_streams`` thread-id keying) — same as Codex P2 #2: it matches upstream TS behavior and would be a heavy-lift Python-only divergence.

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
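The wait_until hardening could look roughly like the following — `invoke_wait_until_safely` is a hypothetical name; in the real change the guard lives inside ``Chat.process_message``:

```python
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)

# Hypothetical sketch of the second fix: a caller-supplied
# WebhookOptions.wait_until that raises synchronously is logged and
# swallowed, so the exception cannot escape through process_message's
# ``finally`` and tear down a live streaming session mid-stream.
def invoke_wait_until_safely(
    caller_wait_until: Callable[[Awaitable[None]], None],
    pending: Awaitable[None],
) -> None:
    try:
        caller_wait_until(pending)
    except Exception:
        logger.exception("caller wait_until raised synchronously; continuing")
```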
…messages

The previous fix (17009a9) used ``processing_done.done()`` to detect whether a chat task was scheduled, but ``_resolve_processing`` registers a ``done_callback`` on the task that fires only on task COMPLETION — not at the moment ``wait_until`` is invoked. So my check prematurely resolved ``processing_done`` immediately after ``process_message`` returned (before the task had a chance to run), which broke 2 tests:
- test_caller_wait_until_raise_does_not_kill_native_streaming
- test_same_thread_concurrent_handlers_clobber_active_stream

Fix: track a separate ``wait_until_invoked`` flag inside ``_chained_wait_until``. After ``process_message`` returns, only resolve ``processing_done`` if ``wait_until`` was NEVER called — that's the true signal that no chat task was scheduled (deduped/dropped).

23 native-streaming tests pass; the full suite passes (modulo the pre-existing unrelated github_webhook failure).

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
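A condensed sketch of the corrected detection — `run_webhook`, `process_message`, and `chained_wait_until` are hypothetical stand-ins for the adapter's internals, not its actual signatures:

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical sketch: a ``wait_until_invoked`` flag set inside the chained
# wait_until is the true "a chat task was scheduled" signal. The future's
# done_callback only fires on task COMPLETION, so checking
# ``processing_done.done()`` right after process_message fires too early.
async def run_webhook(process_message: Callable[..., Awaitable[None]]) -> None:
    loop = asyncio.get_running_loop()
    processing_done: asyncio.Future = loop.create_future()
    wait_until_invoked = False

    def chained_wait_until(pending: Awaitable[None]) -> None:
        nonlocal wait_until_invoked
        wait_until_invoked = True  # set at invocation time, not completion
        task = asyncio.ensure_future(pending)
        task.add_done_callback(
            lambda _t: None if processing_done.done()
            else processing_done.set_result(None)
        )

    await process_message(wait_until=chained_wait_until)
    if not wait_until_invoked and not processing_done.done():
        # Deduped/dropped message: no task was scheduled, so resolve now —
        # otherwise ``await processing_done`` would hang the webhook forever.
        processing_done.set_result(None)
    await processing_done  # falls through to session cleanup in the adapter
```

With this shape, a dropped message resolves immediately while a scheduled handler keeps the webhook (and the streaming session) alive until the task completes.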
Three separate review bots flagged the thread-id keying of ``_active_streams`` as a potential cross-handler corruption risk on the DM streaming path. Add an inline comment at the registration site pointing readers at the upstream parallel and the concurrency invariant that makes per-thread keying safe today, plus a note that a per-handler ContextVar is a tracked Python-only follow-up rather than in-scope for the parity sync.
Summary
Ports upstream vercel/chat#416 (commit `ed46bae`) — `feat(adapter-teams): native streaming for DMs via emit` — into the Python Teams adapter.

DM threads now dispatch chunks through the Bot Framework streaming protocol (typing activities with `channelData.streamType=streaming` + `streamSequence`, then a final `message` activity with `streamType=final`) instead of routing through `Thread._fallback_stream`. Group chats / channels accumulate the stream and post a single message — matching upstream's post-#416 behavior of avoiding the post+edit flicker where Teams doesn't support native streaming.

Lifecycle (hazard #5: async task lifecycle)

- `_handle_message_activity` registers a `_TeamsStreamSession` for DMs and `await`s the chat handler so the session stays alive through `stream()`. The handler is fire-and-forget for non-DM threads.
- `_stream_via_emit` emits cumulative text per chunk; `streamId` is captured from the first send and threaded through subsequent chunks (hazard fix: launch must-fix items — security, perf, docs #7 — the first chunk omits the key entirely).
- `_close_stream_session` posts the final activity. Skipped on cancellation or zero-chunk streams (no orphan finalizers).
- Iterator exceptions cancel the session and re-raise; transient send failures (e.g. 429 mid-stream) cancel the session and return the partial `RawMessage` instead of bubbling up.

Non-parity updates

- `docs/UPSTREAM_SYNC.md`: scoped existing "Fallback streaming" rows to non-Teams adapters, and added two new Teams-specific divergences: group chats now accumulate-and-post, and native streaming soft-cancels on send failure (returning the partial `RawMessage` rather than re-raising).

Tests

- `tests/test_teams_native_streaming.py` (665 LOC, new) covers: wire-format invariants (`channelData.streamType` / `streamSequence` shape, omitted vs present `streamId`), dispatch decisions, cancellation paths, error mid-stream, very-short streams, two concurrent DMs (independent `streamId` / sequence), and the end-to-end webhook → `process_message` → `stream()` → `_close_stream_session` lifecycle.
- Existing `TestStream` cases in `test_teams_coverage.py` and `test_teams_extended.py` updated for the new accumulate-and-post semantics on group chats.

Test plan

- `uv run ruff check src/ tests/ scripts/`
- `uv run ruff format --check src/ tests/ scripts/`
- `uv run python scripts/audit_test_quality.py`
- `uv run pytest tests/ --tb=short -q`

Upstream ref: vercel/chat#416 (commit `ed46bae`)

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
Generated by Claude Code