feat(teams): native streaming for DMs via emit (vercel/chat#416)#88

Open
patrick-chinchill wants to merge 8 commits into main from claude/port-teams-native-dm-streaming-J7S7H
Conversation


@patrick-chinchill commented May 9, 2026

Summary

Ports upstream vercel/chat#416 (commit ed46bae) — feat(adapter-teams): native streaming for DMs via emit — into the Python Teams adapter.

DM threads now dispatch chunks through the Bot Framework streaming protocol (typing activities with channelData.streamType=streaming + streamSequence, then a final message activity with streamType=final) instead of routing through Thread._fallback_stream. Group chats / channels accumulate the stream and post a single message — matching upstream's post-#416 behavior of avoiding the post+edit flicker where Teams doesn't support native streaming.

Lifecycle (hazard #5: async task lifecycle)

  • _handle_message_activity registers a _TeamsStreamSession for DMs and awaits the chat handler so the session stays alive through stream(). The handler is fire-and-forget for non-DM threads.
  • _stream_via_emit emits cumulative text per chunk; streamId is captured from the first send and threaded through subsequent chunks (hazard #7 — the first chunk omits the key entirely).
  • _close_stream_session posts the final activity. Skipped on cancellation or zero-chunk streams (no orphan finalizers).
  • Iterator exceptions cancel the session and re-raise; transient send failures (e.g. 429 mid-stream) cancel the session and return the partial RawMessage instead of bubbling up.

Non-parity updates

docs/UPSTREAM_SYNC.md: scoped existing "Fallback streaming" rows to non-Teams adapters, and added two new Teams-specific divergences:

  • Group chats accumulate-and-post (vs upstream's chunked-edit fallback).
  • Native streaming soft-cancels on transient send failure (returns partial RawMessage rather than re-raising).

Tests

tests/test_teams_native_streaming.py (665 LOC, new) covers:

  • Wire-format invariants (channelData.streamType / streamSequence shape, omitted vs present streamId).
  • Dispatch decisions (DM → native, group chat → accumulate-and-post).
  • Cancellation paths (caller cancels mid-stream, indicator clears).
  • Error mid-stream (iterator raises, send fails 429).
  • Very-short streams (one chunk).
  • Two concurrent DMs (no cross-contamination of streamId / sequence).
  • End-to-end webhook → process_message → stream() → _close_stream_session lifecycle.

Existing TestStream cases in test_teams_coverage.py and test_teams_extended.py updated for the new accumulate-and-post semantics on group chats.
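The dispatch decision these tests pin reduces to a conversation-id prefix check (per the review notes elsewhere in this thread: Teams group/channel conversation IDs start with "19:", DM IDs do not). A minimal sketch under that assumption:

```python
def is_dm(conversation_id: str) -> bool:
    # Assumed predicate from the review notes: anything NOT prefixed
    # with "19:" is treated as a 1:1 DM conversation.
    return not conversation_id.startswith("19:")

assert is_dm("a:1A2B3C")                   # DM id → native streaming path
assert not is_dm("19:abc@thread.skype")    # group chat → accumulate-and-post
```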

Test plan

  • uv run ruff check src/ tests/ scripts/
  • uv run ruff format --check src/ tests/ scripts/
  • uv run python scripts/audit_test_quality.py
  • uv run pytest tests/ --tb=short -q

Upstream ref: vercel/chat#416 (commit ed46bae)

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj


Generated by Claude Code

Summary by CodeRabbit

  • New Features

    • Real-time native streaming for Teams Direct Messages with incremental updates; group chats/channels continue to accumulate and post full messages; DM fallback to accumulate-and-post when no active session.
  • Bug Fixes

    • Improved handling of whitespace-only streams and send failures so placeholders and final messages reflect what users saw; soft-cancel behavior prevents erroneous final posts.
  • Documentation

    • Expanded docs detailing Teams streaming behaviors and SDK divergences.
  • Tests

    • Added and strengthened Teams streaming tests.

Review Change Stack

Port upstream `feat(adapter-teams): native streaming for DMs via emit`
(commit ed46bae). DM threads now dispatch chunks through the Bot
Framework streaming protocol (typing activities with
`channelData.streamType=streaming` + `streamSequence`, then a final
`message` activity with `streamType=final`) instead of routing through
`Thread._fallback_stream`. Group chats / channels accumulate the stream
and post a single message — matching upstream's post-#416 behavior of
avoiding the post+edit flicker where Teams doesn't support native
streaming.

Lifecycle:
- `_handle_message_activity` registers a `_TeamsStreamSession` for DMs
  and `await`s the chat handler so the session stays alive through
  `stream()`. The handler is fire-and-forget for non-DM threads.
- `_stream_via_emit` emits cumulative text per chunk; `streamId` is
  captured from the first send and threaded through subsequent chunks
  (Hazard #7 — the first chunk omits the key entirely).
- `_close_stream_session` posts the final activity. Skipped on
  cancellation or zero-chunk streams (no orphan finalizers).
- Iterator exceptions cancel the session and re-raise; transient send
  failures (e.g. 429 mid-stream) cancel the session and return the
  partial RawMessage instead of bubbling up.

Tests: `tests/test_teams_native_streaming.py` covers wire-format
invariants, dispatch decisions, cancellation paths, error mid-stream,
very-short streams, two concurrent DMs, and the end-to-end
webhook → process_message → stream → close lifecycle. Existing
`TestStream` cases in `test_teams_coverage.py` and `test_teams_extended.py`
updated to reflect the new accumulate-and-post semantics for group
chats.

Non-parity: updated existing "Fallback streaming" rows in
`docs/UPSTREAM_SYNC.md` to scope the divergence to non-Teams adapters,
and added two new rows documenting Teams-specific divergences (group
chats now accumulate-and-post; native streaming soft-cancels on send
failure).

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj

coderabbitai Bot commented May 9, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 63ee047a-61f4-48b8-8729-312d4e4128dd

📥 Commits

Reviewing files that changed from the base of the PR and between 07df4b7 and 6acc796.

📒 Files selected for processing (2)
  • src/chat_sdk/adapters/teams/adapter.py
  • tests/test_teams_native_streaming.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/test_teams_native_streaming.py

📝 Walkthrough

This PR adds native Bot Framework streaming support for Teams direct messages. When a DM message arrives, a stream session is registered and kept alive during handler execution. The stream() method then chooses between native incremental typing activities (if a session exists) or accumulate-and-post fallback (group chats always use accumulate-and-post). Comprehensive test coverage validates wire format, dispatch routing, error handling, lifecycle, and concurrency semantics.

Changes

Teams Native Streaming Implementation

  • Session State & Data Structure (src/chat_sdk/adapters/teams/adapter.py): _TeamsStreamSession class tracks streaming session metadata (stream IDs, sequence counters, cancellation flag, accumulated text). TeamsAdapter._active_streams registry stores active sessions keyed by thread ID.
  • Native Streaming Implementation (src/chat_sdk/adapters/teams/adapter.py): _stream_via_emit emits incremental typing activities per chunk using the Bot Framework streaming protocol with streamSequence increments and streaminfo entities. Soft-cancels on send failures and records accumulated text only after successful sends. _close_stream_session finalizes streams.
  • Stream Dispatch & Webhook Integration (src/chat_sdk/adapters/teams/adapter.py): stream() selects native streaming when an active uncanceled session exists; otherwise falls back to accumulate-and-post. _handle_message_activity registers sessions for DMs, wraps handler completion gates to prevent deadlocks, awaits handler execution to keep sessions alive, and ensures cleanup on exceptions. Group chats remain fire-and-forget.
  • Test Coverage: Wire Format, Dispatch, Errors & Lifecycle (tests/test_teams_native_streaming.py): validates native streaming payload structure (streamId omission on first chunk, streamSequence increments, cumulative text), dispatch routing (DM with session → typing activities, DM without session → accumulate-and-post message, channels → always accumulate-and-post), error semantics (iterator exceptions cancel and re-raise, send failures soft-cancel without bubbling), session lifecycle (register/close around process_message, exception cleanup), and concurrency (isolated state across threads, last-registration-wins clobber on same thread).
  • Test Updates: Existing Coverage (tests/test_teams_coverage.py, tests/test_teams_extended.py): strengthened assertions for group chat accumulation (single _teams_send, zero _teams_update, fully concatenated text), empty stream handling (no sends, empty returned message), and DM behavior consistency.
  • Documentation: Streaming Non-Parities (docs/UPSTREAM_SYNC.md): clarified and expanded Teams streaming behavior divergences: fallback behavior for whitespace-only streams, exception capture in fallback, remend vs raw content in final message, Teams-specific accumulate-and-post for group chats, DM soft-cancel on send failures, and final-send behavior when the first chunk's activity ID is empty.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 I bounced a stream through typing and light,

chunks tapped like raindrops in orderly flight,
DMs whispered pieces, groups posted once true,
soft-cancel curled gently where failures blew,
the session hopped home when the handler said adieu.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: coverage is 43.06%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Description Check: skipped; CodeRabbit's high-level summary is enabled.
  • Title check: the title 'feat(teams): native streaming for DMs via emit (vercel/chat#416)' directly and clearly describes the main change: adding native streaming support for Teams DMs using the emit mechanism.
  • Linked Issues check: skipped; no linked issues were found for this pull request.
  • Out of Scope Changes check: skipped; no linked issues were found for this pull request.



@gemini-code-assist (Bot) left a comment


Code Review

This pull request implements native streaming for Microsoft Teams DMs using the Bot Framework protocol, while transitioning group chats and channels to an accumulate-and-post strategy to eliminate UI flicker. It introduces a session management system for tracking in-flight DM streams and includes comprehensive test coverage for the new streaming lifecycle. Feedback suggests simplifying the stream finalization logic by removing a redundant check for empty content, as the presence of a stream ID already implies that content was sent.

Comment thread src/chat_sdk/adapters/teams/adapter.py Outdated
"""
if session.canceled:
return
if session.stream_id is None or not session._text: # noqa: SLF001


medium

The check not session._text is redundant here. session.stream_id is only set when a chunk with content is sent, which also guarantees session._text will be non-empty. Therefore, checking for session.stream_id is None is sufficient.

Suggested change
-    if session.stream_id is None or not session._text:  # noqa: SLF001
+    if session.stream_id is None:

Address gemini-code-assist review on PR #88 (line 1218).
``session.stream_id`` is only assigned after a successful chunk send, and
empty chunks are skipped before the send call — so ``stream_id is not None``
implies ``_text`` is non-empty. Drop the redundant ``not session._text``
check; the single condition expresses the invariant more clearly.

Behavior unchanged. Tests still pass.

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj

@patrick-chinchill left a comment


Code review — feat(teams): native streaming for DMs via emit (vercel/chat#416)

Compared HEAD (58b7fc7, including the follow-up _text-check refactor) to upstream ed46bae at f55378a (packages/adapter-teams/src/index.ts). Walked the porting hazards (especially #5/#7) and the SELF_REVIEW adversarial checks (pass-interaction, emit/parse symmetry, sentinels).

✅ Looks good

  • Hazard #7 (omit vs None) is honored: the first chunk's channelData is constructed without a streamId key (if session.stream_id is not None: channel_data["streamId"] = ...), and test_first_chunk_omits_stream_id pins the wire shape with assert "streamId" not in first_payload["channelData"].
  • streamSequence starts at 1 and increments per emitted chunk (session.sequence += 1 before building the payload, after the empty-skip), matching the Bot Framework protocol and pinned by test_subsequent_chunks_carry_stream_id_and_increment_sequence.
  • DM detection matches upstream byte-for-byte — both Python is_dm and TS isDM use not conversationId.startsWith("19:") (not conversationType == "personal" as the brief implied). Group-chat IDs (19:...@thread.skype) correctly fall through to accumulate-and-post.
  • Invariant in the follow-up refactor holds: stream_id is only set after a successful _teams_send AND only when result.get("id") is non-empty (line 1174–1178); empty chunks are skipped before accumulated += text. So stream_id is not None ⇒ _text non-empty. The not session._text check was indeed redundant.
  • Cleanup-path safety: the finally in _handle_message_activity uses if current is session before popping (so a concurrent re-registration on the same DM thread isn't clobbered), and _close_stream_session is wrapped in try/except so its failure can't leak the registry entry. Cancel/iterator-exception paths both call session.cancel() before re-raising, so _close_stream_session correctly skips the final activity.
  • Soft-cancel-on-send-failure divergence is documented in docs/UPSTREAM_SYNC.md non-parity table with rationale and tested by test_emit_send_failure_cancels_session (verifies no exception bubbles, result.raw["text"] carries pre-failure content).
  • Group-chat accumulate-and-post is a parity behavior with upstream post-#416, not a divergence — the doc table row marks it "no divergence at the adapter level" for clarity, which is the right framing.
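The cleanup-path safety checked above can be sketched as a small, self-contained model. Names here (`TeamsStreamSession`, `handle_dm`, `close_stream_session`, `active_streams`) are hypothetical stand-ins for the adapter internals described in the review:

```python
import asyncio
import contextlib

class TeamsStreamSession:
    """Stand-in for the adapter's _TeamsStreamSession (sketch only)."""
    def __init__(self) -> None:
        self.canceled = False
    def cancel(self) -> None:
        self.canceled = True

active_streams: dict[str, TeamsStreamSession] = {}

async def close_stream_session(session: TeamsStreamSession) -> None:
    if session.canceled:
        return  # canceled streams never post the final activity

async def handle_dm(thread_id: str, run_handler) -> None:
    session = TeamsStreamSession()
    active_streams[thread_id] = session
    try:
        await run_handler(session)
    except asyncio.CancelledError:
        session.cancel()  # cancel BEFORE re-raising, so close skips the final
        raise
    finally:
        # Pop only if the entry is still ours, so a concurrent
        # re-registration on the same DM thread isn't clobbered.
        if active_streams.get(thread_id) is session:
            active_streams.pop(thread_id, None)
        # Close failure must not leak the registry entry.
        with contextlib.suppress(Exception):
            await close_stream_session(session)

asyncio.run(handle_dm("a:1", lambda s: asyncio.sleep(0)))
assert active_streams == {}
```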

🟡 Medium

  • Concurrent-DM test doesn't exercise the _active_streams race — test_two_concurrent_dm_streams_have_independent_sessions constructs two sessions for two distinct thread IDs and calls _stream_via_emit directly with explicit session args, bypassing _active_streams entirely. The realistic race (two near-simultaneous webhooks for the same DM thread overwriting _active_streams[tid]) isn't covered. This race exists in upstream too (activeStreams.set(threadId, ctx.stream) overwrites identically), so it's parity behavior — but the test name oversells what's being verified. Suggest renaming or adding a same-thread test that drives two _handle_message_activity calls under asyncio.gather.
  • First-chunk send returns id="": if Teams accepts the typing activity but returns an empty id, session.stream_id stays None (line 1177 guard), _text becomes non-empty, and _close_stream_session skips because of the (now-tightened) stream_id is None check. Net effect: user sees streamed chunks but no final message activity — the streaming UI stays running until Teams times it out client-side. Upstream has the same shape (messageId = "") but would still attempt the final send. Worth a one-line comment at the close-skip site explaining this, and ideally a test pinning the empty-id behavior.

🔵 Nit

  • _chained_wait_until invokes upstream_wait_until(task) before _resolve_processing(task). If the caller-supplied wait_until raises, the exception propagates back through process_message, hits the adapter's outer try, and falls into finally (no hang — await processing_done isn't reached). That's fine, but reversing the order — resolve first, then call upstream — would make the deadlock-immunity argument trivially obvious to a future reader.
  • _TeamsStreamSession._text is read from outside the class via session._text with # noqa: SLF001 — a public text attribute (or method) would be cleaner since the leading underscore isn't really enforcing privacy.
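The public-accessor nit could look like the following sketch (a hypothetical shape, not the adapter's actual class):

```python
class TeamsStreamSession:
    """Sketch of the suggested read-only accessor for accumulated text."""
    def __init__(self) -> None:
        self._text = ""  # canonical mutator stays internal to the adapter

    @property
    def text(self) -> str:
        """Read-only view of the accumulated stream text."""
        return self._text

s = TeamsStreamSession()
s._text = "hello"        # internal write path
assert s.text == "hello" # external readers go through the property
```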

Posted by an automated reviewer agent. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj


Generated by Claude Code

- Add a same-thread concurrent _handle_message_activity test that exercises
  the realistic _active_streams race (two near-simultaneous DM webhooks for
  the same thread). Pins upstream's plain-Map clobber semantics: the second
  registration overwrites the first, both in-flight handlers observe the
  later session, and the registry ends empty after both finish. The original
  distinct-threads test is renamed to make clear it covers session
  ISOLATION, not the registry race.

- Empty-id final-send fallback: when Teams accepts streaming chunks but
  returns id="" on the first activity, _close_stream_session now ships the
  final message anyway (omitting streamId from channelData) instead of
  skipping and leaving the streaming UI spinning until Teams times the
  session out client-side. Mirrors upstream's looser check (text non-empty
  → ship the final). Adds a regression test and a non-parity row in
  docs/UPSTREAM_SYNC.md.

- _chained_wait_until: resolve our internal processing_done gate BEFORE
  invoking the caller-supplied waitUntil, so the deadlock-immunity argument
  is trivially obvious (a misbehaving upstream callback can't starve the
  await on processing_done).

- _TeamsStreamSession: add a public read-only `text` property so external
  callers (now _close_stream_session) read through it instead of the
  underscore-prefixed _text attribute. _stream_via_emit retains the direct
  _text write as the canonical mutator.
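The clobber semantics the new same-thread test pins can be modeled in miniature. Everything here is a hypothetical stand-in (a bare dict for the registry, `object()` sentinels for sessions), showing only the last-registration-wins behavior plus the guarded pop:

```python
import asyncio

active_streams: dict[str, object] = {}

async def fake_handler(thread_id, registered, release, log):
    session = object()
    log.append(session)
    active_streams[thread_id] = session  # plain-dict clobber, like upstream's Map
    registered.set()
    await release.wait()
    # Guarded pop: only remove the entry if it is still ours.
    if active_streams.get(thread_id) is session:
        del active_streams[thread_id]

async def main():
    r1, r2, release, log = asyncio.Event(), asyncio.Event(), asyncio.Event(), []
    t1 = asyncio.create_task(fake_handler("a:1", r1, release, log))
    await r1.wait()
    t2 = asyncio.create_task(fake_handler("a:1", r2, release, log))
    await r2.wait()
    assert active_streams["a:1"] is log[1]  # second registration overwrote the first
    release.set()
    await asyncio.gather(t1, t2)
    assert active_streams == {}             # registry ends empty after both finish

asyncio.run(main())
```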

@patrick-chinchill left a comment


Re-review of fixes in 2e96fbb

Verified the four fixes against ed46bae (vercel/chat#416) and the broader adapter surface.

Verified — fixes land cleanly

  1. test_same_thread_concurrent_handlers_clobber_active_stream (tests/test_teams_native_streaming.py:722) — drives two _handle_message_activity tasks for the same thread under a barrier, snapshots the registry post-overlap, and pins [second_session, second_session]. Matches upstream's Map clobber semantics. The renamed test_distinct_dm_threads_each_have_isolated_session_state makes the isolation-vs-race split explicit.
  2. Empty-id final-send (adapter.py:1239–1271, test 317–362) — _close_stream_session now ships the final message when text is non-empty even if stream_id is None, omitting streamId from channelData (rather than serializing None — Hazard #7). Regression test asserts "streamId" not in final_payload["channelData"]. Non-parity row at UPSTREAM_SYNC.md:492 is accurate.
  3. _chained_wait_until order (adapter.py:462–471) — _resolve_processing(task) runs before upstream_wait_until(task), with a comment explaining the deadlock-immunity argument.
  4. Public text property (adapter.py:95–103, 1243, 1260) — read-only text accessor exposed; reads in _close_stream_session go through it; only the canonical mutator at adapter.py:1216 retains # noqa: SLF001. Tests still poke _text directly (lines 176, 300) — fine, they're inside the test module's own laxer scope and lint-clean.

Findings

Nit (parity gap, pre-existing across multiple adapters)
_stream_via_emit and the accumulate-and-post fallback both check isinstance(chunk, dict) and chunk.get("type") == "markdown_text" (adapter.py:1081, 1139). A real MarkdownTextChunk dataclass instance silently drops to "" because isinstance(MarkdownTextChunk(...), dict) is False. Other adapters (google_chat, whatsapp, slack, github) use hasattr(chunk, "type") for this. Same gap exists in discord/linear, so out of scope here, but worth a follow-up issue — the StreamChunk union in types.py:798 is dataclass-typed.

Nit (divergence not in UPSTREAM_SYNC.md)
_chained_wait_until resolves the internal gate before invoking the caller-supplied wait_until. Upstream TS does the opposite (baseOptions?.waitUntil?.(task) first, then task.then(...)). Functionally equivalent (both register callbacks; neither blocks), but it IS a deliberate ordering divergence and the commit message frames it as a hardening defense — consider a one-line entry in the non-parity table so future syncs don't "correct" it back.

Nit (forward-looking)
chained_options = WebhookOptions(wait_until=_chained_wait_until) (adapter.py:473) drops any other fields that may exist on the inbound options. Today WebhookOptions only has wait_until, so no observable bug — but upstream uses {...baseOptions, waitUntil: ...} for a reason. A replace(options, wait_until=...) (or explicit comment "WebhookOptions has only wait_until — re-spread if it grows") would future-proof.

Hunt results — no other issues

  • Wire format: streamSequence (1-indexed, ++ per emit), streamType ("streaming"/"final"), streamId (omitted on first chunk + when server returned empty id), parallel streaminfo entity — all match the Bot Framework streaming protocol that upstream's IStreamer emits under the hood.
  • Cleanup: try/finally in _handle_message_activity mirrors upstream's try { await processingDone } finally { activeStreams.delete }; asyncio.CancelledError cancels the session and re-raises (covered by test_cancelled_error_propagates_and_marks_session_canceled).
  • DM detection: is_dm checks only not conversationId.startswith("19:") — matches thread-id.ts:30 exactly.
  • Test fidelity: upstream PR #416 added zero new it() blocks in index.test.ts; only assertion swaps in replay-streaming.test.ts. Python's 23 native-streaming tests + 159 teams-streaming-related tests all pass; no missing TS coverage.
  • Hazard #5: no untracked tasks in the new code. process_message tracks via _active_tasks, _chained_wait_until only calls add_done_callback, no bare create_task / ensure_future.

Re-review verdict: PASS

The three nits are real but non-blocking (one is pre-existing, one is hardening with no observable behavior change, one is forward-looking). Fixes for the previous round all landed correctly.

Posted by an automated re-reviewer agent. https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj


Generated by Claude Code

patrick-chinchill pushed a commit that referenced this pull request May 10, 2026
Final upstream-coverage audit before merging the 7 sync PRs (#84-#90)
identified one undocumented N/A item:

vercel/chat#415 (Teams SDK 2.0.8 + User-Agent) is a JS-only botbuilder
dependency bump. The Python Teams adapter uses raw aiohttp (no
botbuilder), so there is no equivalent dependency to bump. The optional
User-Agent: Vercel.ChatSDK header on the ~9 outbound aiohttp call sites
is a defense-in-depth nice-to-have; deferred as a follow-up rather than
landed in this sync.

Updates:
- CHANGELOG.md: tick all completed items and link them to their PRs
  (#84, #85, #86, #87, #88, #89, #90, plus already-merged PR #74).
  Document #415 inline as N/A.
- docs/UPSTREAM_SYNC.md non-parity table: add row for Teams User-Agent
  header divergence so future syncers don't try to "port" the JS bump.

Item #6 (concurrency.maxConcurrent) is already implementation-covered
in the Python port (existing divergence row at L492). The 4 new TS
concurrency tests in chat.test.ts have Python-specific equivalents at
test_chat_faithful.py L2969-3055 that don't name-match — leaving as
deferred fidelity-baseline polish since the behavior is verified.

Verdict from the coverage audit: all 18 substantive ports across PRs
#84-#90 are upstream-verified. No commits in chat@4.26.0..f55378a were
missed. Ready to start merging.

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
@patrick-chinchill marked this pull request as ready for review May 10, 2026 03:47

@chatgpt-codex-connector (Bot) left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2e96fbb242

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

# task — the local ``await`` is purely so we know when to reap the
# session.
session = _TeamsStreamSession()
self._active_streams[thread_id] = session

P2: Avoid clobbering active streams before message admission

With the default per-thread/drop concurrency path in Chat.handle_incoming_message, a second DM webhook for the same thread can reach this assignment before the chat task has acquired or failed the thread lock. That overwrites the first request's session even if the second message is later dropped, so if the first handler calls stream() during that window it will either emit into the wrong session or miss native streaming entirely; register the session only once the message is actually admitted for handling, or key it by the processing task/message.



Thanks — deferring this one as out-of-scope for the parity sync, with the rationale now inlined at the registration site (commit 6acc796).

Why per-thread keying is safe today:

  • Mirrors upstream activeStreams.set(threadId, …) in packages/chat-teams/src/index.ts. Diverging here would be a Python-only deviation right inside the parity sync.
  • Chat.handle_incoming_message defaults to a per-thread concurrency strategy that drops/dedupes overlapping webhooks for the same thread before any handler runs. So in the DM path, two stream() calls cannot legitimately race on the same thread_id under the default strategy — the second message is rejected before it reaches the registration site.

The reachability the test exercises is real but only manifests under non-default concurrency strategies that allow two same-thread handlers to run concurrently. In that mode native streaming is already the wrong primitive (Teams expects a single sequential streamId per conversation), so the right fix is a per-handler ContextVar (or an explicit session parameter into stream()), which we'd rather land as a Python-only follow-up rather than slip into the 4.27 sync. Tracked.


Generated by Claude Code

Comment thread src/chat_sdk/adapters/teams/adapter.py Outdated
    if not text:
        continue

    accumulated += text

P2: Commit streamed text only after a successful emit

If _teams_send raises for this chunk, the exception handler below cancels the session and returns after accumulated already includes text that Teams rejected. In that mid-stream 429/network-error case the returned RawMessage and Thread._handle_stream's sent-message history can contain content the user never saw; build a candidate payload for the send and update accumulated/session._text only after the send succeeds.


Address Codex P2 finding on PR #88. The previous flow updated
``accumulated`` and ``session.sequence`` BEFORE awaiting ``_teams_send``,
so a 429/network failure mid-stream would leave the partial RawMessage
returned to the caller (and ``session._text`` feeding the final close-
activity) carrying text that Teams never displayed to the user.

Fix:
- Build ``candidate_accumulated = accumulated + text`` and
  ``next_sequence = session.sequence + 1`` as candidates.
- Send the activity with the candidate values.
- Commit ``accumulated`` and ``session.sequence`` only after the send
  succeeds.

The soft-cancel path (logged warn + session.cancel + break) now
genuinely returns the high-water mark of successfully-sent text. Matches
the documented "RawMessage carries pre-failure content" guarantee in
``docs/UPSTREAM_SYNC.md`` non-parity table.
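The candidate/commit pattern described above can be sketched in isolation. `stream_chunks` and `flaky_send` are hypothetical stand-ins (for the adapter loop and `_teams_send` respectively); the point is that state commits only after the send succeeds, so the returned text is the high-water mark of what Teams actually displayed:

```python
import asyncio

async def stream_chunks(chunks, send):
    """Sketch: commit accumulated/sequence only after a successful send."""
    accumulated, sequence = "", 0
    for text in chunks:
        if not text:
            continue
        candidate = accumulated + text       # build candidate values first
        try:
            await send(candidate, sequence + 1)
        except Exception:
            break                            # soft-cancel: keep pre-failure state
        accumulated, sequence = candidate, sequence + 1
    return accumulated, sequence

async def flaky_send(text, seq):
    if seq == 2:
        raise RuntimeError("429 mid-stream")  # second chunk is rejected

text, seq = asyncio.run(stream_chunks(["hello", "world"], flaky_send))
assert (text, seq) == ("hello", 1)           # rejected "world" never committed
```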

Updated test_emit_send_failure_cancels_session: previously asserted
``result.raw["text"] == "helloworld"`` (which contained the rejected
"world" chunk); now asserts ``"hello"`` and ``session.sequence == 1``.

Skipped Codex P2 #2 (active_streams clobber timing) — matches upstream
TS behavior; deferring fix as a Python-only improvement would be a
divergence and is design-significant. File as follow-up if desired.

23 native-streaming tests pass.

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
@patrick-chinchill marked this pull request as draft May 10, 2026 06:06
@patrick-chinchill marked this pull request as ready for review May 10, 2026 06:06

@chatgpt-codex-connector (Bot) left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 07df4b778b


Comment thread src/chat_sdk/adapters/teams/adapter.py Outdated
    # cannot starve due to a misbehaving caller-supplied hook.
    _resolve_processing(task)
    if upstream_wait_until is not None:
        upstream_wait_until(task)

P2: Handle wait_until errors without closing the stream early

When a caller-supplied WebhookOptions.wait_until raises synchronously, this exception escapes through Chat.process_message, so _handle_message_activity skips the await processing_done path and immediately runs the finally block that removes/closes the DM stream session while the chat task is still scheduled. In that environment the handler can continue and call thread.stream() after the session was removed, falling back to a normal post (and the webhook also errors) instead of using native Teams streaming; catch/log the upstream wait_until failure after registering your own callback so the local session still stays alive until the task finishes.
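The mitigation suggested here (catch and log the upstream hook's failure after resolving the internal gate) could look like the following sketch; the signature and names are hypothetical, not the adapter's actual code:

```python
import logging

def chained_wait_until(task, resolve_processing, upstream_wait_until=None):
    """Sketch of the suggested hardening (hypothetical signature)."""
    resolve_processing(task)  # resolve our internal gate first, unconditionally
    if upstream_wait_until is None:
        return
    try:
        upstream_wait_until(task)
    except Exception:
        # A synchronously-raising caller hook must not escape through
        # process_message and tear down the DM stream session early.
        logging.exception("caller-supplied wait_until raised; ignoring")

calls = []
def bad_hook(task):
    raise RuntimeError("misbehaving hook")

chained_wait_until("task-1", calls.append, bad_hook)
assert calls == ["task-1"]  # our gate still resolved despite the hook failure
```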



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2


Inline comments:
In `@src/chat_sdk/adapters/teams/adapter.py`:
- Around line 437-493: _active_streams is keyed only by thread_id causing
concurrent handlers on the same thread to overwrite each other's
_TeamsStreamSession; instead of registering the session in the shared dict and
re-resolving by thread_id, keep the session in handler-local context (e.g. a
ContextVar) or pass the session explicitly into stream-related calls so each
webhook handler uses its own _TeamsStreamSession instance; update code paths
that access _active_streams (creation in this block, stream() consumer, and
cleanup via _close_stream_session) to use the handler-local session, ensure
_chained_wait_until / processing_done still wire to that session, and remove or
restrict shared lookups by thread_id to avoid stale/overwritten sessions.
- Around line 442-478: The webhook can deadlock if Chat.process_message never
creates a task (so _chained_wait_until is never called) because processing_done
remains unresolved; after the call to self._chat.process_message(self,
thread_id, message, chained_options) ensure processing_done is always unblocked
by adding a finally block that (1) catches asyncio.CancelledError around the
await processing_done to call session.cancel() and re-raise, and (2) after the
try/except/cleanup, checks if processing_done.done() is False and calls
processing_done.set_result(None) to guarantee resolution; reference
_resolve_processing, _chained_wait_until, WebhookOptions.wait_until,
processing_done, and self._chat.process_message when applying this change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: c107c5b6-a50f-47f3-8e14-0ee87262958c

📥 Commits

Reviewing files that changed from the base of the PR and between 04c0658 and 07df4b7.

📒 Files selected for processing (5)
  • docs/UPSTREAM_SYNC.md
  • src/chat_sdk/adapters/teams/adapter.py
  • tests/test_teams_coverage.py
  • tests/test_teams_extended.py
  • tests/test_teams_native_streaming.py

Comment on lines +437 to +493
session = _TeamsStreamSession()
self._active_streams[thread_id] = session
loop = asyncio.get_running_loop()
processing_done: asyncio.Future[None] = loop.create_future()

def _resolve_processing(task: Awaitable[Any]) -> None:
    # ``WebhookOptions.wait_until`` receives the chat task; we hook
    # done so we can release ``processing_done`` regardless of
    # success/failure (mirrors the upstream ``task.then(resolve,
    # resolve)`` pattern).
    if isinstance(task, asyncio.Task):

        def _on_done(_t: asyncio.Task[Any]) -> None:
            if not processing_done.done():
                processing_done.set_result(None)

        task.add_done_callback(_on_done)
    elif not processing_done.done():
        # Non-Task awaitables are uncommon on this path, but if we
        # ever get one we still need to unblock — resolve eagerly
        # so we don't deadlock the webhook handler.
        processing_done.set_result(None)

upstream_wait_until = options.wait_until if options is not None else None

def _chained_wait_until(task: Awaitable[Any]) -> None:
    # Resolve our own gate FIRST, before invoking the upstream
    # ``wait_until`` callback. This way, even if the upstream
    # callback raises, blocks, or never fires, ``processing_done``
    # is still wired up — making the deadlock-immunity argument
    # trivially obvious: the await on ``processing_done`` below
    # cannot starve due to a misbehaving caller-supplied hook.
    _resolve_processing(task)
    if upstream_wait_until is not None:
        upstream_wait_until(task)

chained_options = WebhookOptions(wait_until=_chained_wait_until)

try:
    self._chat.process_message(self, thread_id, message, chained_options)
    try:
        await processing_done
    except asyncio.CancelledError:
        # Caller cancelled the webhook handler — propagate cancel
        # into the streaming session so any in-flight ``stream()``
        # exits cleanly without sending more chunks.
        session.cancel()
        raise
finally:
    # Always close the session — sending a final activity if any
    # chunks were emitted — and drop the registry entry so a
    # subsequent message can register fresh.
    current = self._active_streams.get(thread_id)
    if current is session:
        self._active_streams.pop(thread_id, None)
    try:
        await self._close_stream_session(thread_id, session)

@coderabbitai coderabbitai Bot May 10, 2026


⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Isolate native stream sessions per handler, not per thread.

_active_streams is keyed only by thread_id, and stream() resolves the session from that shared dict. If two DM handlers overlap on the same thread, the second registration overwrites the first, so both handlers can emit through the same _TeamsStreamSession. That corrupts streamSequence/streamId, and the first handler’s finally ends up closing an empty or stale session. The new same-thread overlap test shows this overwrite is reachable, so this will leak into user-visible streams under concurrency. Prefer carrying the session in handler-local context (for example a ContextVar or an explicit parameter) instead of re-looking it up by thread ID.

Also applies to: 1070-1072
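The handler-local alternative the reviewer suggests could look roughly like this. It is a sketch, not the adapter's real API: StreamSession, emit_chunk, and handle_dm are hypothetical stand-ins, and only the ContextVar pattern itself is the point.

```python
import asyncio
from contextvars import ContextVar
from typing import List, Optional


class StreamSession:
    """Stand-in for _TeamsStreamSession; collects emitted chunks."""

    def __init__(self) -> None:
        self.chunks: List[str] = []


# Each asyncio task runs in its own copy of the context, so concurrent
# handlers never observe each other's session -- unlike a shared dict
# keyed by thread_id, where the second registration overwrites the first.
_current_session: ContextVar[Optional[StreamSession]] = ContextVar(
    "teams_stream_session", default=None
)


async def emit_chunk(text: str) -> None:
    session = _current_session.get()  # resolves per-handler, not per-thread
    if session is not None:
        session.chunks.append(text)


async def handle_dm(text: str) -> StreamSession:
    session = StreamSession()
    token = _current_session.set(session)
    try:
        await emit_chunk(text)
        await asyncio.sleep(0)  # yield so handlers genuinely interleave
        await emit_chunk(text + "!")
        return session
    finally:
        _current_session.reset(token)
```

Running two handlers concurrently with asyncio.gather gives each its own session and sequence, since Task creation snapshots the context; this is the isolation property the shared dict cannot provide.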


Collaborator Author


Acknowledged — deferring the per-handler isolation refactor as a Python-only follow-up rather than landing it inside the parity sync. Inlined the rationale next to the registration site in commit 6acc796 so future readers find it without digging through review history.

Short version:

  • _active_streams[thread_id] = session mirrors upstream activeStreams.set(threadId, …) (packages/chat-teams/src/index.ts). The same per-thread keying invariant that holds upstream holds here.
  • Chat.handle_incoming_message serializes DM handlers per thread under the default concurrency config (overlapping webhooks for the same thread are deduped/dropped before they reach a handler), so two stream() calls on the same thread_id don't co-exist in the supported deployment shape.
  • The same-thread-overlap test reaches the overwrite only under a non-default concurrency strategy, where Teams' single-streamId-per-conversation contract already makes native streaming the wrong primitive. The correct long-term fix is a per-handler ContextVar (or threading the session through stream() explicitly), which is a heavy-lift Python-only divergence we'd rather do as a standalone change after the sync lands. Tracked as a follow-up.

Generated by Claude Code



@patrick-chinchill, understood — the reasoning holds:

  1. Upstream parity: the thread_id-keyed dict directly mirrors activeStreams.set(threadId, …) in the TypeScript adapter, so the semantics are intentionally shared.
  2. Default concurrency guard: Chat.handle_incoming_message's per-thread serialization in the default config means two concurrent DM handlers on the same thread aren't a reachable state in the supported deployment shape — the overwrite is only exercisable under a non-default strategy.
  3. Wrong primitive anyway: under any concurrency config that would allow overlap, Teams' single-streamId-per-conversation constraint means native streaming is already the wrong tool, so the ContextVar/explicit-parameter refactor belongs in a separate change where the full design can be thought through.

Inlining the rationale next to the registration site in 6acc796 is the right call — it short-circuits the re-discovery cycle for any future reviewer. Happy to pick this up as a follow-up once the parity sync lands.


✏️ Learnings added
Learnt from: patrick-chinchill
Repo: Chinchill-AI/chat-sdk-python PR: 88
File: src/chat_sdk/adapters/teams/adapter.py:437-535
Timestamp: 2026-05-10T07:50:13.594Z
Learning: In `src/chat_sdk/adapters/teams/adapter.py`, `TeamsAdapter._active_streams` is intentionally keyed by `thread_id` (mirroring upstream TypeScript `activeStreams.set(threadId, …)` in `packages/chat-teams/src/index.ts`). Same-thread DM handler overlap is prevented by `Chat.handle_incoming_message`'s per-thread serialization under the default concurrency config (overlapping webhooks are deduped/dropped before reaching the handler). The per-handler `ContextVar` isolation refactor is deferred as a Python-only follow-up and should not be flagged as a blocking issue in this PR.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Comment thread: src/chat_sdk/adapters/teams/adapter.py
claude added 4 commits May 10, 2026 06:13
Address CodeRabbit Critical on PR #88. ``Chat.process_message`` invokes
``wait_until`` synchronously only if a chat task is actually scheduled.
If the message is deduped, dropped by the concurrency strategy, or
otherwise short-circuited, no task is created and ``wait_until`` is
never called — leaving ``processing_done`` unresolved and ``await
processing_done`` hanging the webhook indefinitely.

After ``process_message`` returns, check whether ``processing_done`` was
resolved (which can only happen via ``_chained_wait_until``, which is
only invoked when a task is scheduled). If not, resolve immediately —
there's no in-flight handler to wait on, no streaming will happen, and
``await processing_done`` should fall through to the ``finally`` cleanup.

Also addresses Codex P2 (line 471): caller-supplied
``WebhookOptions.wait_until`` raising synchronously now logs and
continues instead of escaping through ``Chat.process_message``,
preventing the same-class deadlock-via-finally issue.

Both fixes together close the two paths where the streaming session
could be torn down while the chat task was still scheduled (or never
scheduled): a synchronous wait_until exception, and a deduped/dropped
message.

Tests:
- New ``test_caller_wait_until_raise_does_not_kill_native_streaming``
  verifies the wait_until-raise path keeps the session alive long
  enough for the chat task to call ``thread.stream()`` natively.
- Existing 23 native-streaming tests still pass.

Skipped CodeRabbit's other comment (``_active_streams`` thread-id
keying) — same as Codex P2 #2, matches upstream TS behavior and would
be a heavy-lift Python-only divergence.

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
…messages

The previous fix (17009a9) used ``processing_done.done()`` to detect
whether a chat task was scheduled, but ``_resolve_processing`` registers
a ``done_callback`` on the task that fires only on task COMPLETION —
not at the moment ``wait_until`` is invoked. So my check
prematurely resolved ``processing_done`` immediately after
``process_message`` returned (before the task had a chance to run),
which broke 2 tests:

- test_caller_wait_until_raise_does_not_kill_native_streaming
- test_same_thread_concurrent_handlers_clobber_active_stream

Fix: track a separate ``wait_until_invoked`` flag inside
``_chained_wait_until``. After ``process_message`` returns, only resolve
``processing_done`` if ``wait_until`` was NEVER called — that's the
true signal that no chat task was scheduled (deduped/dropped).
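The flag-based guard this commit describes can be sketched in isolation under stated assumptions: Session, handle, and process_message below are stand-ins for the adapter's real types, kept only to show why the flag (set at invocation time) is the right signal where processing_done.done() (set at task completion) is not.

```python
import asyncio
from typing import Any, Awaitable, Callable


class Session:
    """Stand-in for _TeamsStreamSession."""

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True


async def handle(process_message: Callable[..., None], session: Session) -> None:
    loop = asyncio.get_running_loop()
    processing_done: "asyncio.Future[None]" = loop.create_future()
    wait_until_invoked = False

    def wait_until(task: Awaitable[Any]) -> None:
        nonlocal wait_until_invoked
        wait_until_invoked = True  # a chat task WAS scheduled
        if isinstance(task, asyncio.Task):
            task.add_done_callback(
                lambda _t: None if processing_done.done()
                else processing_done.set_result(None)
            )

    process_message(wait_until)
    # Deduped/dropped message: wait_until never fired, so nothing will ever
    # resolve the gate -- resolve it now. Checking processing_done.done()
    # instead would misfire, since it stays False until task COMPLETION.
    if not wait_until_invoked and not processing_done.done():
        processing_done.set_result(None)
    try:
        await processing_done
    except asyncio.CancelledError:
        session.cancel()
        raise
```

With this shape, a process_message that schedules no task falls straight through to cleanup, while a scheduled task keeps the gate open until its done callback fires.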

23 native-streaming tests pass; full suite passes (modulo the
pre-existing unrelated github_webhook failure).

https://claude.ai/code/session_01FyMxQn2BEAzmwKS1GZczKj
Three separate review bots flagged the thread-id keying of
``_active_streams`` as a potential cross-handler corruption risk on the
DM streaming path. Add an inline comment at the registration site
pointing readers at the upstream parallel and the concurrency
invariant that makes per-thread keying safe today, plus a note that a
per-handler ContextVar is a tracked Python-only follow-up rather than
in-scope for the parity sync.
