Skip to content

Commit 17aa41a

Browse files
feat(teams): migrate native streaming to the SDK IStreamer + unwind transitional divergences (#93 PR 3/4) (#145)
* feat(teams): migrate native streaming to the SDK IStreamer + unwind transitional divergences (#93 PR 3/4) Replace the hand-rolled Bot Framework streaming wire format with the Teams SDK's native streamer (microsoft-teams-apps IStreamer / HttpStream), mirroring upstream adapter-teams@chat@4.30.0 index.ts (streamViaEmit DM path). Atomically unwind the two transitional public-type divergences this enables. Streaming flow (DMs): - _handle_message_activity captures an IStreamer for DMs via app.activity_sender.create_stream(ref), registers it in _active_streams, and awaits a processing_done gate (wrapped wait_until shim) so the streamer stays alive while the handler streams. - stream() -> _stream_via_emit calls stream.emit(text) per chunk; it NEVER calls close(). The handler's finally calls stream.close() once (the lifecycle-owner role the SDK App's process_activity plays upstream; our bridge owns dispatch via server.on_request). - Cancellation is detected two ways: stream.canceled (checked before each emit) and catching StreamCancelledError (other exceptions re-raise). - The first chunk's server-assigned id is captured via on_chunk and awaited only when text was emitted and the stream was not canceled. - Group / channel / proactive threads fall back to a single buffered post_message (SDK-backed from PR 2). Removed: _TeamsStreamSession, _stream_via_emit's hand-rolled wire format, _emit_streaming_activity, _close_stream_session, _teams_send, the 1500ms emit throttle, the clock/sleep injectables, and the native_stream_min_emit_interval_ms config field. Divergence unwinds (must land together — touching one without the other corrupts recorded message history): - types.py: removed the RawMessage.text override field (RawMessage is now {id, thread_id, raw}, matching upstream packages/chat/src/types.ts). - thread.py: _handle_stream now seeds StreamOptions.update_interval_ms with the thread default (upstream parity, vercel/chat#340) and records the local accumulator (no adapter-side text override). Throttle parity: the SDK HttpStream owns the Bot Framework streaming wire format (streamType/streamSequence/streamId), a 500ms inter-flush throttle, and 429 retry/backoff — verified against microsoft-teams-apps==2.0.13 http_stream.py (see docs/UPSTREAM_SYNC.md). A live Teams check is flagged for reviewers/maintainer. Tests: rewrote tests/test_teams_native_streaming.py to mock ctx.stream as a StreamerProtocol double; added history-fidelity tests proving the SentMessage->Message recording path and update_interval_ms seeding after the unwind. * docs(teams): clarify close-on-raw-cancel intent + pin SDK version in throttle note Review fast-follow for PR 3 (native streaming): the finally-block comment now distinguishes SDK-detected cancel (HttpStream.close no-ops, _canceled set) from a raw asyncio.CancelledError (close flushes a final activity for the accumulated text — intentional, and the except-Exception wrapper does not catch CancelledError so cancellation still propagates). Also pins the throttle-parity note to the installed microsoft-teams-apps==2.0.13.4 so a future SDK bump that changes the 0.5s cadence is caught against the cited baseline. No logic change.
1 parent a9f0c80 commit 17aa41a

11 files changed

Lines changed: 985 additions & 2397 deletions

docs/UPSTREAM_SYNC.md

Lines changed: 6 additions & 9 deletions
Large diffs are not rendered by default.

src/chat_sdk/adapters/teams/adapter.py

Lines changed: 192 additions & 581 deletions
Large diffs are not rendered by default.
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Teams SDK streamer plumbing.
2+
3+
Builds the :class:`~microsoft_teams.api.ConversationReference` the Teams SDK
4+
``IStreamer`` (``microsoft_teams.apps.StreamerProtocol`` /
5+
``HttpStream``) needs, from the inbound Bot Framework activity dict the adapter
6+
already parses. Kept in its own module so the SDK model construction stays
7+
isolated from ``adapter.py`` and importable lazily (Port Rule: optional/SDK
8+
deps imported inside functions, not at module top).
9+
10+
Mirrors what the Teams SDK's own ``ActivityContext`` does in
11+
``microsoft_teams/apps/app_process.py`` ``_build_context`` (it builds a
12+
``ConversationReference`` from the activity and calls
13+
``ActivitySender.create_stream(ref)`` to expose ``ctx.stream``). Our bridge
14+
owns dispatch, so we reproduce just the reference-building step.
15+
"""
16+
17+
from __future__ import annotations
18+
19+
from typing import TYPE_CHECKING, Any
20+
21+
if TYPE_CHECKING:
22+
from microsoft_teams.api import ConversationReference
23+
24+
25+
def build_conversation_reference(activity: dict[str, Any], *, bot_app_id: str) -> ConversationReference:
26+
"""Build a :class:`ConversationReference` from an inbound activity dict.
27+
28+
The streamer reads ``ref.conversation.id`` and ``ref.service_url`` to
29+
target the Bot Framework streaming endpoint, and ``ref.bot`` to populate
30+
the outgoing activity's ``from`` account. We source these from the inbound
31+
activity: the bot is the activity ``recipient`` (falling back to a
32+
synthetic account carrying ``bot_app_id`` when the recipient is absent),
33+
the conversation is the activity ``conversation``, and ``channelId`` /
34+
``serviceUrl`` come straight off the activity.
35+
36+
Raises if required fields are missing — the caller catches and falls back
37+
to buffered posting.
38+
"""
39+
from microsoft_teams.api import Account, ConversationAccount, ConversationReference
40+
41+
recipient = activity.get("recipient") or {}
42+
bot = Account(
43+
id=recipient.get("id") or bot_app_id or "",
44+
name=recipient.get("name"),
45+
)
46+
47+
conversation_raw = activity.get("conversation") or {}
48+
conversation = ConversationAccount(
49+
id=conversation_raw.get("id") or "",
50+
conversation_type=conversation_raw.get("conversationType"),
51+
tenant_id=conversation_raw.get("tenantId"),
52+
name=conversation_raw.get("name"),
53+
is_group=conversation_raw.get("isGroup"),
54+
)
55+
56+
return ConversationReference(
57+
service_url=activity.get("serviceUrl") or "",
58+
activity_id=activity.get("id"),
59+
bot=bot,
60+
channel_id=activity.get("channelId") or "msteams",
61+
conversation=conversation,
62+
locale=activity.get("locale"),
63+
)

src/chat_sdk/adapters/teams/types.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,6 @@ class TeamsAdapterConfig:
7272
logger: Logger | None = None
7373
# Override bot username (optional).
7474
user_name: str | None = None
75-
# Minimum interval between native DM streaming activities, in
76-
# milliseconds. Bot Framework's streaming endpoint is throttled to
77-
# roughly 1 request/second; Microsoft recommends buffering tokens
78-
# for 1.5-2 seconds to avoid 429s mid-response. We default to 1500ms
79-
# per https://learn.microsoft.com/microsoftteams/platform/bots/streaming-ux.
80-
# Chunks that arrive within this window after the previous emit are
81-
# accumulated locally and shipped together on the next emit (or in
82-
# the final ``message`` activity if the stream ends inside the
83-
# window). A caller-supplied ``StreamOptions.update_interval_ms``
84-
# overrides this default for a single stream.
85-
native_stream_min_emit_interval_ms: int = 1500
8675

8776

8877
# =============================================================================

src/chat_sdk/thread.py

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -690,23 +690,15 @@ async def _handle_stream(
690690
# Build text-only stream from raw_stream
691691
text_stream = _from_full_stream(raw_stream)
692692

693-
# Build streaming options from current message context.
694-
#
695-
# Divergence from upstream — see docs/UPSTREAM_SYNC.md. Upstream
696-
# (vercel/chat#340) seeds ``updateIntervalMs`` with the thread-level
697-
# default (``this._streamingUpdateIntervalMs``) before spreading
698-
# caller options, so adapters always see a concrete interval. We
699-
# deliberately leave ``update_interval_ms`` as ``None`` unless the
700-
# caller supplied one: the hand-rolled Teams native streaming path
701-
# treats any non-``None`` value as a caller override of its 1500ms
702-
# quota-protecting emit throttle (upstream's Teams adapter ignores
703-
# the field entirely, so the seed is harmless there). Adapters that
704-
# consume the field apply their own default when it is ``None``
705-
# (Telegram: ``TELEGRAM_DEFAULT_STREAM_UPDATE_INTERVAL_MS``), and
706-
# ``_fallback_stream`` already falls back to
707-
# ``self._streaming_update_interval_ms`` — same observable behavior
708-
# as upstream's seed for the fallback path.
709-
options = StreamOptions()
693+
# Build streaming options from current message context + caller
694+
# options. Upstream parity (vercel/chat#340): seed
695+
# ``update_interval_ms`` with the thread-level default
696+
# (``self._streaming_update_interval_ms``) before merging caller
697+
# options, so adapters always see a concrete interval. The Teams
698+
# adapter now delegates throttling to the SDK ``IStreamer`` (it no
699+
# longer owns a quota throttle), so the seed is harmless there —
700+
# matching upstream, whose Teams adapter ignores the field.
701+
options = StreamOptions(update_interval_ms=self._streaming_update_interval_ms)
710702
if self._current_message is not None:
711703
options.recipient_user_id = self._current_message.author.user_id
712704
# recipient_team_id is only consumed by the Slack adapter; other
@@ -744,21 +736,16 @@ async def _wrapped_stream() -> AsyncGenerator[str | StreamChunk, None]:
744736
wrapped_stream = _wrapped_stream()
745737
raw_result = await self.adapter.stream(self._id, wrapped_stream, options) # type: ignore[union-attr]
746738
if raw_result is not None:
747-
# Adapters can override the recorded text via the optional
748-
# ``text`` field on ``RawMessage`` when their internal state
749-
# (cancellation, throttling, partial commits) makes the local
750-
# ``accumulated`` buffer diverge from what the platform
751-
# actually accepted. Default ``None`` falls back to the local
752-
# buffer — backward-compatible for adapters that don't need
753-
# the override (Slack, Discord, GitHub, Google Chat,
754-
# Telegram, Linear, WhatsApp). The Teams native streaming
755-
# path sets it on cancellation to short-circuit the buffered
756-
# suffix that was coalesced into the throttle window but
757-
# never emitted.
758-
recorded_text = raw_result.text if raw_result.text is not None else accumulated
739+
# Record the locally-accumulated text. Matches upstream
740+
# thread.ts, which builds the ``SentMessage`` from the text
741+
# collected by the wrapping iterator (``accumulated``), not
742+
# from the adapter's return value. The Teams adapter now
743+
# emits each chunk through the SDK ``IStreamer`` as it is
744+
# yielded, so ``accumulated`` and what the platform shipped
745+
# stay in lockstep — no adapter-side text override needed.
759746
sent = self._create_sent_message(
760747
raw_result.id,
761-
PostableMarkdown(markdown=recorded_text),
748+
PostableMarkdown(markdown=accumulated),
762749
raw_result.thread_id,
763750
)
764751
if self._thread_history is not None:

src/chat_sdk/types.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -855,26 +855,6 @@ class RawMessage:
855855
id: str
856856
thread_id: str
857857
raw: Any
858-
# Optional adapter-authoritative text snapshot. When set, callers
859-
# like ``Thread.stream`` MUST prefer this over their own local
860-
# accumulator when constructing the recorded ``SentMessage`` body /
861-
# message-history entry. Used by adapters whose internal state
862-
# (cancellation, throttling, partial commits) makes the local
863-
# accumulator diverge from what the platform actually accepted —
864-
# the Teams native streaming path sets this when a session is
865-
# canceled mid-flight so ``Thread.stream`` records only the text
866-
# Teams shipped, not the buffered suffix the user canceled out of.
867-
# ``None`` means "use the caller's existing logic" — backward
868-
# compatible for adapters that don't need this override.
869-
#
870-
# Divergence from upstream — see docs/UPSTREAM_SYNC.md. Upstream's
871-
# ``RawMessage`` interface (packages/chat/src/types.ts) has only
872-
# ``id``, ``raw``, ``threadId``; the override is Python-only because
873-
# we hand-roll Teams native streaming (upstream uses
874-
# ``@microsoft/teams.apps``'s ``IStreamer.emit`` which owns the
875-
# cancellation-text reconciliation internally). Will simplify or
876-
# disappear once we migrate to ``microsoft-teams-apps`` (Python).
877-
text: str | None = None
878858

879859

880860
@dataclass

tests/test_teams_coverage.py

Lines changed: 10 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -983,47 +983,11 @@ async def test_start_typing_failure_swallowed(self):
983983

984984

985985
# ---------------------------------------------------------------------------
986-
# _teams_send HTTP helper (retained for the still-hand-rolled streaming path)
986+
# Adapter lifecycle helpers
987987
# ---------------------------------------------------------------------------
988988

989989

990990
class TestTeamsHTTPHelpers:
991-
async def test_teams_send_success(self):
992-
adapter = _make_adapter(logger=_make_logger())
993-
994-
token_resp = _mock_aiohttp_response({"access_token": "t", "expires_in": 3600})
995-
send_resp = _mock_aiohttp_response({"id": "sent-1"})
996-
997-
mock_session = _MockSession(default_response=send_resp)
998-
original_post = mock_session.post
999-
1000-
def routed_post(url, **kwargs):
1001-
if "oauth2" in url:
1002-
return mock_session._make_cm(token_resp)
1003-
return original_post(url, **kwargs)
1004-
1005-
mock_session.post = routed_post
1006-
1007-
decoded = TeamsThreadId(
1008-
conversation_id="19:abc@thread.tacv2",
1009-
service_url="https://smba.trafficmanager.net/teams/",
1010-
)
1011-
1012-
with patch("aiohttp.ClientSession", return_value=mock_session):
1013-
result = await adapter._teams_send(decoded, {"type": "message", "text": "hi"})
1014-
assert result["id"] == "sent-1"
1015-
1016-
async def test_teams_send_invalid_service_url_raises(self):
1017-
adapter = _make_adapter(logger=_make_logger())
1018-
1019-
decoded = TeamsThreadId(
1020-
conversation_id="19:abc@thread.tacv2",
1021-
service_url="https://evil.com/",
1022-
)
1023-
1024-
with pytest.raises(ValidationError):
1025-
await adapter._teams_send(decoded, {"type": "message"})
1026-
1027991
async def test_disconnect_is_noop(self):
1028992
adapter = _make_adapter(logger=_make_logger())
1029993
result = await adapter.disconnect()
@@ -1244,16 +1208,11 @@ def test_handle_message_action_no_chat(self):
12441208

12451209

12461210
class TestStream:
1211+
"""Group-chat / non-DM fallback: accumulate and post one SDK message."""
1212+
12471213
async def test_stream_dict_chunks(self):
12481214
adapter = _make_adapter(logger=_make_logger())
1249-
send_call_count = 0
1250-
1251-
async def mock_send(decoded, payload):
1252-
nonlocal send_call_count
1253-
send_call_count += 1
1254-
return {"id": f"msg-{send_call_count}"}
1255-
1256-
adapter._teams_send = mock_send
1215+
send = _mock_app_send(adapter, "msg-1")
12571216

12581217
tid = adapter.encode_thread_id(
12591218
TeamsThreadId(
@@ -1267,14 +1226,14 @@ async def text_stream():
12671226
yield {"type": "markdown_text", "text": "World"}
12681227

12691228
result = await adapter.stream(tid, text_stream())
1270-
# Group chat: accumulate → single send.
1229+
# Group chat: accumulate → single SDK send.
12711230
assert result.id == "msg-1"
12721231
assert result.raw["text"] == "Hello World"
1273-
assert send_call_count == 1
1232+
send.assert_called_once()
12741233

12751234
async def test_stream_string_chunks(self):
12761235
adapter = _make_adapter(logger=_make_logger())
1277-
adapter._teams_send = AsyncMock(return_value={"id": "s1"})
1236+
send = _mock_app_send(adapter, "s1")
12781237

12791238
tid = adapter.encode_thread_id(
12801239
TeamsThreadId(
@@ -1290,11 +1249,11 @@ async def text_stream():
12901249
result = await adapter.stream(tid, text_stream())
12911250
assert "Hello World" in result.raw["text"]
12921251
# Group chat: single accumulate-and-post send (no per-chunk edits).
1293-
assert adapter._teams_send.call_count == 1
1252+
send.assert_called_once()
12941253

12951254
async def test_stream_empty_chunks_skipped(self):
12961255
adapter = _make_adapter(logger=_make_logger())
1297-
adapter._teams_send = AsyncMock(return_value={"id": "s1"})
1256+
send = _mock_app_send(adapter, "s1")
12981257

12991258
tid = adapter.encode_thread_id(
13001259
TeamsThreadId(
@@ -1309,7 +1268,7 @@ async def text_stream():
13091268

13101269
result = await adapter.stream(tid, text_stream())
13111270
assert result.id == "" # nothing sent
1312-
assert adapter._teams_send.call_count == 0
1271+
send.assert_not_called()
13131272

13141273

13151274
# ---------------------------------------------------------------------------
@@ -1368,37 +1327,6 @@ def test_extract_card_title_body_not_list(self):
13681327
# tests/test_teams_bridge.py.
13691328

13701329

1371-
# ---------------------------------------------------------------------------
1372-
# _teams_send error path (retained for the still-hand-rolled streaming path)
1373-
# ---------------------------------------------------------------------------
1374-
1375-
1376-
class TestTeamsHTTPErrorPaths:
1377-
async def test_teams_send_non_ok_response(self):
1378-
adapter = _make_adapter(logger=_make_logger())
1379-
1380-
token_resp = _mock_aiohttp_response({"access_token": "t", "expires_in": 3600})
1381-
error_resp = _mock_aiohttp_response({"error": "bad"}, status=500)
1382-
1383-
mock_session = _MockSession(default_response=error_resp)
1384-
original_post = mock_session.post
1385-
1386-
def routed_post(url, **kwargs):
1387-
if "oauth2" in url:
1388-
return mock_session._make_cm(token_resp)
1389-
return original_post(url, **kwargs)
1390-
1391-
mock_session.post = routed_post
1392-
1393-
decoded = TeamsThreadId(
1394-
conversation_id="19:abc@thread.tacv2",
1395-
service_url="https://smba.trafficmanager.net/teams/",
1396-
)
1397-
1398-
with patch("aiohttp.ClientSession", return_value=mock_session), pytest.raises(NetworkError):
1399-
await adapter._teams_send(decoded, {"type": "message"})
1400-
1401-
14021330
# ---------------------------------------------------------------------------
14031331
# fetch_channel_info with channel context
14041332
# ---------------------------------------------------------------------------

tests/test_teams_extended.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -736,11 +736,13 @@ class TestStream:
736736
async def test_group_chat_stream_accumulates_and_posts_single_message(self):
737737
"""Group chats / channels accumulate the stream and post one message.
738738
739-
Mirrors upstream after vercel/chat#416: ``streamViaEmit`` is reserved
740-
for DMs; non-DM threads no longer post+edit (which produced flicker).
739+
Mirrors upstream ``stream`` (adapter-teams@chat@4.30.0): native
740+
``streamViaEmit`` is reserved for DMs (where an ``IStreamer`` exists);
741+
non-DM threads accumulate and ``postMessage`` a single message via the
742+
SDK ``App.send`` (PR 2-backed).
741743
"""
742744
adapter = _make_adapter(logger=_make_logger())
743-
adapter._teams_send = AsyncMock(return_value={"id": "stream-msg-1"})
745+
send = _mock_app_send(adapter, "stream-msg-1")
744746

745747
tid = adapter.encode_thread_id(
746748
TeamsThreadId(
@@ -755,17 +757,18 @@ async def text_gen():
755757

756758
result = await adapter.stream(tid, text_gen())
757759
assert result.id == "stream-msg-1"
758-
# Single send carrying the full accumulated text — no edits.
759-
assert adapter._teams_send.call_count == 1
760-
sent_payload = adapter._teams_send.await_args.args[1]
761-
assert sent_payload["text"] == "Hello world"
762-
assert sent_payload["type"] == "message"
760+
# Single SDK send carrying the full accumulated text — no edits.
761+
send.assert_called_once()
762+
conv_id, activity = send.call_args.args
763+
assert conv_id == "19:abc@thread.tacv2"
764+
assert activity.text == "Hello world"
765+
assert activity.text_format == "markdown"
763766

764767
@pytest.mark.asyncio
765768
async def test_group_chat_stream_empty_returns_empty(self):
766769
"""Empty streams in a group chat skip the post entirely."""
767770
adapter = _make_adapter(logger=_make_logger())
768-
adapter._teams_send = AsyncMock(return_value={"id": "stream-msg-2"})
771+
send = _mock_app_send(adapter, "stream-msg-2")
769772

770773
tid = adapter.encode_thread_id(
771774
TeamsThreadId(
@@ -782,4 +785,4 @@ async def text_gen():
782785
# No real text → no send, returned RawMessage carries empty content.
783786
assert result.id == ""
784787
assert result.raw["text"] == ""
785-
assert adapter._teams_send.call_count == 0
788+
send.assert_not_called()

0 commit comments

Comments
 (0)