Commit 2e96fbb

fix(teams): address PR #88 review findings
- Add a same-thread concurrent _handle_message_activity test that exercises the realistic _active_streams race (two near-simultaneous DM webhooks for the same thread). Pins upstream's plain-Map clobber semantics: the second registration overwrites the first, both in-flight handlers observe the later session, and the registry ends empty after both finish. The original distinct-threads test is renamed to make clear it covers session ISOLATION, not the registry race.

- Empty-id final-send fallback: when Teams accepts streaming chunks but returns id="" on the first activity, _close_stream_session now ships the final message anyway (omitting streamId from channelData) instead of skipping and leaving the streaming UI spinning until Teams times the session out client-side. Mirrors upstream's looser check (text non-empty → ship the final). Adds a regression test and a non-parity row in docs/UPSTREAM_SYNC.md.

- _chained_wait_until: resolve our internal processing_done gate BEFORE invoking the caller-supplied waitUntil, so the deadlock-immunity argument is trivially obvious (a misbehaving upstream callback can't starve the await on processing_done).

- _TeamsStreamSession: add a public read-only `text` property so external callers (now _close_stream_session) read through it instead of the underscore-prefixed _text attribute. _stream_via_emit retains the direct _text write as the canonical mutator.
1 parent 58b7fc7 commit 2e96fbb

3 files changed

Lines changed: 198 additions & 21 deletions

docs/UPSTREAM_SYNC.md

Lines changed: 1 addition & 0 deletions
@@ -489,6 +489,7 @@ stay explicit instead of being rediscovered in code review.
 | Fallback streaming final SentMessage content (non-Teams adapters) | SentMessage + final edit carry `final_content` (remend'd — inline markers auto-closed) | SentMessage + final edit carry raw `accumulated` | Narrow UX refinement. If a stream ends with an unclosed `*`/`~~`/etc., upstream ships the unclosed marker; we run `_remend` so the user sees a clean final message. Not observable in the common case where streams close their own markers. Teams native streaming and the Teams accumulate-and-post path both ship raw `accumulated`, matching upstream after #416; this divergence applies only to the remaining adapters that still route through `_fallback_stream`. |
 | Teams group-chat / channel streaming via accumulate-and-post | `TeamsAdapter.stream` accumulates the full text and issues a single `post_message` instead of post+edit, even for group chats and channel threads | Same after vercel/chat#416 (`if (activeStream && !activeStream.canceled) … else { accumulate; postMessage }`) — no divergence at the adapter level | Documented for clarity: the Python port matches upstream's post-#416 behavior of avoiding the post+edit flicker where Teams doesn't support native streaming. The adapter no longer touches `_teams_update` from the streaming path. |
 | Teams native streaming soft-cancel on send failure (DMs) | A `_teams_send` error mid-stream (e.g. 429, 5xx) cancels the streaming session, suppresses the exception, and returns the `RawMessage` with whatever text was already accepted | Upstream's `IStreamer.emit` is fire-and-forget under the SDK; no public hook to fail-soft on per-chunk send errors | Bot Framework streaming activities are advisory — once cancelled, Teams times the streaming UI out client-side. Soft-cancel preserves the chat-handler's ability to continue work after a transient backend hiccup, instead of bubbling the rate-limit through the stream iterator. The `RawMessage` carries the user-visible text so SentMessage history stays consistent. |
+| Teams native streaming final-send when first chunk's `id` was empty (DMs) | `_close_stream_session` sends the final `message` activity whenever `text` is non-empty, even if `stream_id` is `None` (Bot Framework REST response returned `{"id": ""}` on the first chunk). The final activity omits `streamId` from `channelData` rather than serializing `None`. | Upstream's `streamViaEmit` awaits the `chunk` event for the first activity's `id`; if Teams returns an empty id, `messageId` becomes `""` and the SDK's auto-close emits the final activity through `IStreamer` regardless | Without the final `message` activity, the Teams client's streaming UI keeps spinning until the platform times the session out — a stuck-loading-state UX failure with no user workaround. We mirror upstream's looser check (text non-empty → ship the final) so the streaming indicator clears even when the Bot Framework REST response surface returns an empty `id`. |
 | Teams divider rendering | `card_to_adaptive_card` hoists `separator: True` onto the next sibling (or emits a non-empty Container for a trailing divider) | `convertDividerToElement` emits an empty `Container` with `separator: True` | Upstream shares the same bug: Microsoft Teams renders an empty Container at zero height, so the separator line is effectively invisible. Python port fixes locally (issue #45) rather than blocking on upstream. |
 | `SlackAdapter.current_token` / `current_client` | Public `@property` accessors that return the request-context-bound token and a preconfigured `AsyncWebClient` | Not exposed (`getToken()` is private on the TS `SlackAdapter`) | Python-only addition (issue #47). Downstream code that calls Slack Web APIs from inside a handler — email resolution, user profile fetches, reaction bookkeeping — otherwise depends on underscore-prefixed helpers. |
 | `ConcurrencyConfig.max_concurrent` | Enforced via `asyncio.Semaphore` in the `"concurrent"` strategy path; rejects non-integer or `<= 0` values, and rejects any non-`None` `max_concurrent` paired with a non-`"concurrent"` strategy | Accepted into the config type with docstring "Default: Infinity" but never read (3 writes, 0 reads) | Silent correctness bug upstream — consumers setting `max_concurrent=N` with `strategy="concurrent"` reasonably expect an N-way bound on in-flight handlers. We honor the documented contract via a semaphore and fail-fast on misconfiguration so it's never silent. `max_concurrent=None` stays compatible with every strategy (unbounded default). |
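
The new row on empty-`id` final-send combines two decisions: ship the final whenever the session is not canceled and `text` is non-empty, and leave `streamId` out of `channelData` instead of serializing `None`. A minimal sketch of both follows. It assumes the final stream type's wire value is `"final"` (the diff only shows `_STREAM_TYPE_STREAMING = "streaming"`), and the helper names `should_send_final` and `build_final_channel_data` are hypothetical, not part of the adapter.

```python
import json
from typing import Any, Optional

# Assumed wire value; the commit only shows the "streaming" constant.
STREAM_TYPE_FINAL = "final"


def should_send_final(canceled: bool, text: str) -> bool:
    """Looser check: any streamed text on a live session gets a final."""
    return not canceled and bool(text)


def build_final_channel_data(stream_id: Optional[str]) -> dict[str, Any]:
    """Build channelData, adding streamId only when the server assigned one."""
    channel_data: dict[str, Any] = {"streamType": STREAM_TYPE_FINAL}
    if stream_id is not None:
        channel_data["streamId"] = stream_id
    return channel_data


# With no stream id, the serialized payload carries no "streamId" key at all.
payload_without_id = json.dumps(build_final_channel_data(None))
payload_with_id = json.dumps(build_final_channel_data("abc"))
```

Building the dict conditionally keeps a JSON `null` off the wire, which is the distinction the row draws between omitting `streamId` and serializing `None`.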

src/chat_sdk/adapters/teams/adapter.py

Lines changed: 45 additions & 15 deletions
@@ -92,6 +92,16 @@ def cancel(self) -> None:
         """Mark the session canceled. ``stream()`` checks this each chunk."""
         self.canceled = True

+    @property
+    def text(self) -> str:
+        """Read-only view of the cumulative streamed text.
+
+        External callers (tests, other adapter helpers) should read this
+        instead of poking at the private ``_text`` attribute. Writes go
+        through ``_stream_via_emit`` which owns the buffer.
+        """
+        return self._text
+

 # Bot Framework streaming protocol values for ``channelData.streamType``.
 _STREAM_TYPE_STREAMING = "streaming"
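
The `text` property added in this hunk follows the usual Python pattern of a getter with no setter. As a rough illustration, the stand-in class below (`StreamSession`, a hypothetical name, not the real `_TeamsStreamSession`) shows what that means for callers: reads go through the property, assignment through it fails, and the owning code still writes the underscore attribute directly.

```python
class StreamSession:
    """Hypothetical stand-in for a session holding a private text buffer."""

    def __init__(self) -> None:
        self._text = ""

    @property
    def text(self) -> str:
        # Getter only: with no setter defined, `session.text = ...` raises.
        return self._text


def try_assign(session: StreamSession, value: str) -> bool:
    """Return True if assigning through the property was rejected."""
    try:
        session.text = value  # type: ignore[misc]
    except AttributeError:
        return True
    return False
```

Only the module that owns the buffer writes `_text`, which matches the commit's description of `_stream_via_emit` as the canonical mutator.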
@@ -450,9 +460,15 @@ def _on_done(_t: asyncio.Task[Any]) -> None:
         upstream_wait_until = options.wait_until if options is not None else None

         def _chained_wait_until(task: Awaitable[Any]) -> None:
+            # Resolve our own gate FIRST, before invoking the upstream
+            # ``wait_until`` callback. This way, even if the upstream
+            # callback raises, blocks, or never fires, ``processing_done``
+            # is still wired up — making the deadlock-immunity argument
+            # trivially obvious: the await on ``processing_done`` below
+            # cannot starve due to a misbehaving caller-supplied hook.
+            _resolve_processing(task)
             if upstream_wait_until is not None:
                 upstream_wait_until(task)
-            _resolve_processing(task)

         chained_options = WebhookOptions(wait_until=_chained_wait_until)
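
The ordering argument in the comment above can be checked with a small standalone sketch. It is not the adapter's code: `make_chained_wait_until`, `demo`, and the way the gate is resolved through a done-callback are assumptions made for illustration. The point it shows is that when the internal gate is wired first, a caller-supplied hook that raises cannot stop `processing_done` from resolving.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

WaitUntil = Callable[[Awaitable[Any]], None]


def make_chained_wait_until(
    resolve_processing: WaitUntil,
    upstream_wait_until: Optional[WaitUntil],
) -> WaitUntil:
    """Return a hook that wires the internal gate before the caller's hook."""

    def chained(task: Awaitable[Any]) -> None:
        # Internal gate first: it is registered even if the next call raises.
        resolve_processing(task)
        if upstream_wait_until is not None:
            upstream_wait_until(task)

    return chained


async def demo() -> bool:
    loop = asyncio.get_running_loop()
    processing_done = loop.create_future()

    def resolve_processing(task: Awaitable[Any]) -> None:
        def _on_done(_t: "asyncio.Future[Any]") -> None:
            if not processing_done.done():
                processing_done.set_result(None)

        asyncio.ensure_future(task).add_done_callback(_on_done)

    def misbehaving_upstream(task: Awaitable[Any]) -> None:
        raise RuntimeError("caller-supplied hook failed")

    chained = make_chained_wait_until(resolve_processing, misbehaving_upstream)

    async def work() -> None:
        await asyncio.sleep(0)

    task = loop.create_task(work())
    try:
        chained(task)
    except RuntimeError:
        pass  # the hook failed, but the gate was wired before it ran
    await asyncio.wait_for(processing_done, timeout=1)
    return processing_done.done()
```

Running `asyncio.run(demo())` completes without hitting the timeout. With the pre-commit order (upstream hook first), the same misbehaving hook would raise before the gate was ever attached.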
@@ -1194,7 +1210,10 @@ async def _stream_via_emit(

         # Persist accumulated text on the session so close() can emit the
         # final ``message`` activity with the same content the user saw.
-        session._text = accumulated  # noqa: SLF001 — same module
+        # Direct ``_text`` write is the canonical mutator (the public
+        # ``text`` property is read-only by design); both classes live in
+        # the same module so this isn't a cross-module SLF001.
+        session._text = accumulated  # noqa: SLF001
         return RawMessage(
             id=session.first_chunk_id,
             thread_id=thread_id,
@@ -1208,28 +1227,39 @@ async def _close_stream_session(
     ) -> None:
         """Send the final ``message`` activity to close out a stream.

-        No-op if the session was canceled or never emitted any chunks (no
-        ``streamId`` was ever assigned). The final activity replaces the
-        running ``typing`` indicator with a real message containing the full
-        accumulated text — Teams clients clear the streaming UI on receipt.
+        No-op if the session was canceled, or if no chunks were ever
+        emitted (empty ``text``). Otherwise we send the final activity —
+        even if the server never returned an ``id`` for the first chunk
+        (i.e. ``stream_id`` is ``None``), in which case we omit
+        ``streamId`` from ``channelData``. Mirrors upstream's looser
+        check: as long as the user saw streamed text, ship the final
+        ``message`` so the Teams streaming UI clears, instead of leaving
+        it spinning until Teams times the session out client-side.
         """
         if session.canceled:
             return
-        # session.stream_id is only set after a successful chunk send, and
-        # empty chunks are skipped before send — so stream_id-set implies
-        # _text non-empty. Single check is sufficient.
-        if session.stream_id is None:
+        # ``text`` is the cumulative buffer; empty means nothing was ever
+        # emitted (empty stream, or stream canceled before first send).
+        if not session.text:
             return

         decoded = self.decode_thread_id(thread_id)
+        channel_data: dict[str, Any] = {
+            "streamType": _STREAM_TYPE_FINAL,
+        }
+        # Hazard #7 — only include ``streamId`` when we actually have one.
+        # The Bot Framework REST response can return ``id=""`` even on a
+        # 200, in which case ``stream_id`` stays ``None`` (see emit guard
+        # in ``_stream_via_emit``); ship the final without a ``streamId``
+        # rather than skipping the send.
+        if session.stream_id is not None:
+            channel_data["streamId"] = session.stream_id
+
         final_activity: dict[str, Any] = {
             "type": "message",
-            "text": session._text,  # noqa: SLF001
+            "text": session.text,
             "textFormat": "markdown",
-            "channelData": {
-                "streamType": _STREAM_TYPE_FINAL,
-                "streamId": session.stream_id,
-            },
+            "channelData": channel_data,
             "entities": [
                 {
                     "type": "streaminfo",

tests/test_teams_native_streaming.py

Lines changed: 152 additions & 6 deletions
@@ -313,6 +313,54 @@ async def test_close_session_no_chunks_no_op(self):
         await adapter._close_stream_session(tid, session)
         adapter._teams_send.assert_not_called()

+    @pytest.mark.asyncio
+    async def test_close_session_sends_final_when_first_chunk_returned_empty_id(
+        self,
+    ):
+        """If Teams accepted chunks but never returned an ``id``, still send the final.
+
+        Regression for the empty-``id`` edge case: the Bot Framework REST
+        response can be 200 with ``{"id": ""}`` even on a successful
+        ``typing`` activity send. ``stream_id`` stays ``None`` (the
+        first-chunk guard skips assignment for the empty string), but
+        ``text`` is non-empty because the user already saw the streamed
+        chunks. Without a final ``message`` activity the Teams streaming
+        UI would spin until Teams times the session out client-side —
+        ship the final ``message`` anyway, omitting ``streamId`` from
+        ``channelData``. Mirrors upstream's looser check.
+        """
+        adapter = _make_adapter()
+        # First call (chunk): returns an empty id. Second call (final):
+        # succeeds.
+        adapter._teams_send = AsyncMock(side_effect=[{"id": ""}, {"id": "final-id"}])
+        tid = _dm_thread_id(adapter)
+        session = _TeamsStreamSession()
+        adapter._active_streams[tid] = session
+
+        async def text_gen():
+            yield "hello world"
+
+        await adapter._stream_via_emit(tid, text_gen(), session)
+
+        # Sanity: the chunk send went through, but stream_id is unset
+        # because the server didn't hand us one.
+        assert session.stream_id is None
+        assert session.text == "hello world"
+
+        # Now close: the final ``message`` activity must still be sent
+        # (omitting ``streamId``).
+        await adapter._close_stream_session(tid, session)
+
+        assert adapter._teams_send.await_count == 2
+        final_payload = adapter._teams_send.await_args_list[1].args[1]
+        assert final_payload["type"] == "message"
+        assert final_payload["text"] == "hello world"
+        assert final_payload["channelData"]["streamType"] == _STREAM_TYPE_FINAL
+        # Critical: no streamId key when the server never assigned one,
+        # rather than serializing ``"streamId": None``.
+        assert "streamId" not in final_payload["channelData"]
+        assert final_payload["entities"] == [{"type": "streaminfo", "streamType": _STREAM_TYPE_FINAL}]
+

 class TestStreamErrors:
     @pytest.mark.asyncio
@@ -603,12 +651,18 @@ async def _failing():

 class TestPassInteraction:
     @pytest.mark.asyncio
-    async def test_two_concurrent_dm_streams_have_independent_sessions(self):
-        """Two streams to two different DM threads must not share state.
-
-        Per docs/SELF_REVIEW.md pass-interaction check: two DMs in flight
-        from the same bot to the same user (one per thread) must each
-        carry their own ``streamId`` and ``streamSequence``.
+    async def test_distinct_dm_threads_each_have_isolated_session_state(self):
+        """Two DM threads streaming in parallel must not share session state.
+
+        This pins the ISOLATION property when sessions are explicitly
+        passed to ``_stream_via_emit`` (the registry is bypassed). Two
+        DMs in flight from the same bot to the same user (one per
+        thread) must each carry their own ``streamId`` and
+        ``streamSequence``.
+
+        Same-thread concurrency (the ``_active_streams`` race) is a
+        DIFFERENT property — see
+        ``test_same_thread_concurrent_handlers_clobber_active_stream``.
         """
         adapter = _make_adapter()
         # Distinct server ids per send so we can verify thread-to-id mapping.
@@ -663,3 +717,95 @@ async def gen_b():
         # and B's to B's.
         for conv_id, payload in send_log:
             assert payload["text"].startswith("A" if "A" in conv_id else "B")
+
+    @pytest.mark.asyncio
+    async def test_same_thread_concurrent_handlers_clobber_active_stream(self):
+        """Two near-simultaneous webhooks for the SAME DM thread.
+
+        Realistic case: a user double-sends, or two webhooks land on the
+        same thread before the first finishes. ``_active_streams`` is a
+        plain ``dict`` keyed by ``thread_id``, so the second registration
+        overwrites the first — pin that behavior here so a future change
+        to add per-thread queueing/locking is a deliberate decision, not
+        an accidental observable change.
+
+        Upstream's ``activeStreams`` is also a plain ``Map`` with the
+        same overwrite semantics; this test mirrors that contract.
+        """
+        adapter = _make_adapter()
+        # Track each session that gets registered, in the order of registration.
+        registered_sessions: list[_TeamsStreamSession] = []
+        # Snapshot the registry contents immediately AFTER each handler's
+        # process_message call so we can pin the clobber.
+        post_registration_snapshots: list[_TeamsStreamSession] = []
+
+        # Block both handlers on a barrier so the second registration races
+        # the first while the first is still "in flight". This pins the
+        # registry behavior under genuine overlap, not just sequential calls.
+        first_registered = asyncio.Event()
+        release_handlers = asyncio.Event()
+
+        adapter._teams_send = AsyncMock(return_value={"id": "send-id"})
+
+        chat = MagicMock()
+        chat.get_state = MagicMock(return_value=None)
+
+        def process_message(adapter_arg, thread_id, message, options):
+            # Snapshot the session that THIS handler call registered.
+            registered_sessions.append(adapter_arg._active_streams[thread_id])
+
+            async def _handler():
+                # Hold both handlers open across the barrier so they truly
+                # overlap. After release, snapshot the registry — by this
+                # point both handlers have registered, and the LATER
+                # registration must have won.
+                if not first_registered.is_set():
+                    first_registered.set()
+                await release_handlers.wait()
+                post_registration_snapshots.append(adapter_arg._active_streams.get(thread_id))
+
+            task = asyncio.get_running_loop().create_task(_handler())
+            options.wait_until(task)
+
+        chat.process_message = process_message
+        adapter._chat = chat
+
+        tid = _dm_thread_id(adapter)
+        activity = {
+            "type": "message",
+            "id": "incoming-same-thread",
+            "text": "user said something",
+            "from": {"id": "user-1", "name": "User One"},
+            "conversation": {"id": "a:1Abc-DM-conversation-id"},
+            "serviceUrl": "https://smba.trafficmanager.net/teams/",
+        }
+
+        async def _drive_two_handlers():
+            # Start the first; wait until it has registered before launching
+            # the second so the second observes (and clobbers) the first's
+            # registry entry. Then release both.
+            t1 = asyncio.create_task(adapter._handle_message_activity(activity))
+            await first_registered.wait()
+            t2 = asyncio.create_task(adapter._handle_message_activity(activity))
+            # Give the second handler a tick to register.
+            await asyncio.sleep(0)
+            release_handlers.set()
+            await asyncio.gather(t1, t2)
+
+        await _drive_two_handlers()
+
+        # Two distinct sessions were created.
+        assert len(registered_sessions) == 2
+        first_session, second_session = registered_sessions
+        assert first_session is not second_session
+        # Pin upstream's plain-Map clobber semantics: BOTH in-flight
+        # handlers, when they look up the registry post-overlap, see the
+        # SECOND session — the first's entry was overwritten in place.
+        # If a future change adds per-thread queueing/locking it must be
+        # a deliberate decision (i.e. update this test).
+        assert post_registration_snapshots == [second_session, second_session]
+        # After both handlers exit, registry is empty. Handler 2's
+        # finally-block matches ``current is session_2`` and pops; handler
+        # 1's finally-block sees the entry already gone (or not its own)
+        # and skips the pop — either way the dict ends empty.
+        assert tid not in adapter._active_streams
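
The clobber semantics this test pins can be reproduced outside the adapter with a plain `dict` and two overlapping coroutines. The sketch below is an assumption-laden miniature, not the adapter's handler: `Session`, `handle`, and `demo` are hypothetical names, and the identity-checked pop in `finally` is inferred from the test's closing comment rather than taken from the adapter source.

```python
import asyncio
from typing import Optional


class Session:
    """Hypothetical stand-in for a per-handler stream session."""


async def handle(
    thread_id: str,
    registry: dict[str, Session],
    seen: list[Optional[Session]],
    release: asyncio.Event,
) -> Session:
    session = Session()
    registry[thread_id] = session  # plain overwrite: the later writer wins
    try:
        await release.wait()  # hold the handler open so both overlap
        seen.append(registry.get(thread_id))
    finally:
        # Pop only our own entry, so an older handler never removes a newer one.
        if registry.get(thread_id) is session:
            registry.pop(thread_id, None)
    return session


async def demo():
    registry: dict[str, Session] = {}
    seen: list[Optional[Session]] = []
    release = asyncio.Event()
    t1 = asyncio.create_task(handle("dm-1", registry, seen, release))
    await asyncio.sleep(0)  # let the first handler register
    t2 = asyncio.create_task(handle("dm-1", registry, seen, release))
    await asyncio.sleep(0)  # let the second handler overwrite the entry
    release.set()
    first, second = await asyncio.gather(t1, t2)
    return first, second, seen, registry
```

In this miniature both handlers read back the second session after the overlap, and the registry ends empty because only the second handler's identity check matches. Adding per-thread locking or queueing would change what the first handler observes, which is the behavior change the test is meant to make deliberate.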
