Skip to content

Commit a9f0c80

Browse files
feat(teams): migrate outbound send/edit/delete/typing to the SDK (#93 PR 2/4) (#144)
Route the outbound public Adapter methods through the Microsoft Teams SDK ``App``, mirroring upstream adapter-teams@chat@4.30.0, and untangle the Bot Framework / Graph token caches. Outbound delegation: - post_message / start_typing -> self._app.send(conversation_id, activity) - edit_message -> self._app.api.conversations.activities(id).update(msg_id, activity) - delete_message -> self._app.api.conversations.activities(id).delete(msg_id) The SDK splits input from output activity models, so we build MessageActivityInput / TypingActivityInput (the ActivityParams union that app.send / activities.update accept) from the existing camelCase wire dict. The dict is still returned as RawMessage.raw, preserving the public contract (attachment shape, file delivery, returned SentMessage id). Per-thread routing: thread IDs encode a service URL, but the SDK binds the App's ApiClient to one URL at construction. _point_app_api_at retargets the real ApiClient's service-url chain (validated against the SSRF allow-list first) before each call, so different regions / sovereign clouds reach the right endpoint. Errors now pass the raw SDK exception to handleTeamsError, which already understands the SDK exception shape (401 -> AuthenticationError, 429 -> AdapterRateLimitError, etc.). Token-cache untangle: _get_access_token (Bot Framework) and _get_graph_token (Graph) previously shared _access_token / _token_expiry, so the last writer clobbered the other scope's token (a Graph read could ship a Bot Framework token and vice versa). Graph now caches on dedicated _graph_token / _graph_token_expiry fields. _get_access_token + its fields are retained for the still-hand-rolled Bot Framework callers (native streaming via _teams_send in PR 3, and open_dm), now isolated by scope. Streaming is untouched: _teams_send (and its _get_access_token token) remain for the streaming envelope + server-assigned streamId, which app.send does not surface. Removed the now-unused _teams_update / _teams_delete aiohttp helpers. Tests: outbound tests assert the SDK app.send / app.api.conversations.activities are called (not aiohttp); added a token-cache isolation regression proving the two scopes no longer collide, and a per-thread service-url routing regression exercising the real ApiClient chain.
1 parent 2676f0b commit a9f0c80

4 files changed

Lines changed: 505 additions & 199 deletions

File tree

src/chat_sdk/adapters/teams/adapter.py

Lines changed: 135 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -356,8 +356,21 @@ def __init__(self, config: TeamsAdapterConfig | None = None) -> None:
356356
)
357357

358358
self._bot_user_id: str | None = self._app_id or None
359+
# Bot Framework token cache (scope ``api.botframework.com``). Owned by
360+
# ``_get_access_token`` and consumed only by the still-hand-rolled Bot
361+
# Framework paths: native streaming (``_teams_send``, PR 3) and
362+
# ``open_dm``. The SDK ``App`` mints its own Bot Framework token for the
363+
# migrated outbound send/edit/delete/typing paths, so those no longer
364+
# touch this field.
359365
self._access_token: str | None = None
360366
self._token_expiry: float = 0
367+
# Microsoft Graph token cache (scope ``graph.microsoft.com``). Kept on
368+
# DEDICATED fields so it can never collide with the Bot Framework token
369+
# above — the two have different scopes, and sharing one cache caused
370+
# last-writer-wins corruption (issue #93). Owned by ``_get_graph_token``
371+
# and consumed only by the hand-rolled Graph reads.
372+
self._graph_token: str | None = None
373+
self._graph_token_expiry: float = 0
361374
self._token_lock = asyncio.Lock()
362375

363376
# Microsoft Teams SDK ``App`` — owns inbound JWT validation and
@@ -1327,6 +1340,58 @@ async def _files_to_attachments(self, files: list[FileUpload]) -> list[dict[str,
13271340

13281341
return attachments
13291342

1343+
def _point_app_api_at(self, service_url: str) -> None:
1344+
"""Aim the SDK ``App``'s Bot Framework client at ``service_url``.
1345+
1346+
The migrated outbound paths call ``self._app.send(...)`` and
1347+
``self._app.api.conversations.activities(...)`` directly (parity with
1348+
upstream ``this.app.send`` / ``this.app.api.conversations``). The SDK
1349+
binds the App's :class:`ApiClient` to a single service URL at
1350+
construction, and ``app.send`` reads ``self.api.service_url`` into the
1351+
outgoing :class:`ConversationReference`. Our thread IDs encode a
1352+
per-thread service URL, so before each call we retarget the App's API
1353+
client — validating against the SSRF allow-list first, exactly as the
1354+
retired hand-rolled senders did.
1355+
1356+
The setter walks the real :class:`ApiClient`'s service-url chain
1357+
(the client itself, its ``conversations`` sub-client, and that
1358+
sub-client's ``activities_client``). It is defensive about test doubles
1359+
that replace ``self._app.api`` with a mock lacking those attributes —
1360+
an ``AttributeError`` there is harmless because the mock ignores the
1361+
service URL anyway.
1362+
"""
1363+
_validate_service_url(service_url)
1364+
normalized = service_url.rstrip("/")
1365+
api = self._app.api
1366+
try:
1367+
api.service_url = normalized
1368+
conversations = api.conversations
1369+
conversations.service_url = normalized
1370+
conversations.activities_client.service_url = normalized
1371+
except AttributeError:
1372+
# ``self._app.api`` is a test double without the real client's
1373+
# service-url chain; nothing to retarget.
1374+
pass
1375+
1376+
@staticmethod
1377+
def _message_activity_input(payload: dict[str, Any]) -> Any:
1378+
"""Build the SDK ``MessageActivityInput`` from our camelCase activity dict.
1379+
1380+
The dict we construct (``text`` / ``textFormat`` / ``attachments`` with
1381+
``contentType`` / ``contentUrl`` / ``content`` / ``name`` keys) is the
1382+
Bot Framework wire shape, which matches the SDK input model's
1383+
serialization aliases — so ``model_validate`` round-trips it directly.
1384+
We keep building the dict (it is still returned as ``RawMessage.raw``,
1385+
preserving the public contract) and convert at the SDK boundary.
1386+
1387+
Upstream constructs a ``MessageActivity``; the Python SDK splits input
1388+
(``MessageActivityInput``) from output (``MessageActivity``) models and
1389+
``app.send`` / ``activities.update`` accept only the input variant.
1390+
"""
1391+
from microsoft_teams.api import MessageActivityInput
1392+
1393+
return MessageActivityInput.model_validate(payload)
1394+
13301395
async def post_message(
13311396
self,
13321397
thread_id: str,
@@ -1361,8 +1426,16 @@ async def post_message(
13611426
)
13621427

13631428
try:
1364-
result = await self._teams_send(decoded, activity_payload)
1365-
return RawMessage(id=result.get("id", ""), thread_id=thread_id, raw=activity_payload)
1429+
self._point_app_api_at(decoded.service_url)
1430+
sent = await self._app.send(
1431+
decoded.conversation_id,
1432+
self._message_activity_input(activity_payload),
1433+
)
1434+
return RawMessage(
1435+
id=getattr(sent, "id", "") or "",
1436+
thread_id=thread_id,
1437+
raw=activity_payload,
1438+
)
13661439
except Exception as error:
13671440
self._logger.error(
13681441
"Teams API: send failed",
@@ -1371,10 +1444,7 @@ async def post_message(
13711444
"error": str(error),
13721445
},
13731446
)
1374-
error_dict: dict[str, Any] = {"message": str(error)}
1375-
if hasattr(error, "status"):
1376-
error_dict["statusCode"] = error.status
1377-
_handle_teams_error(error_dict, "postMessage")
1447+
_handle_teams_error(error, "postMessage")
13781448
raise # unreachable: _handle_teams_error always raises
13791449

13801450
# Regular text message
@@ -1401,8 +1471,17 @@ async def post_message(
14011471
)
14021472

14031473
try:
1404-
result = await self._teams_send(decoded, activity_payload)
1405-
return RawMessage(id=result.get("id", ""), thread_id=thread_id, raw=activity_payload)
1474+
self._point_app_api_at(decoded.service_url)
1475+
sent = await self._app.send(
1476+
decoded.conversation_id,
1477+
self._message_activity_input(activity_payload),
1478+
)
1479+
self._logger.debug("Teams API: send response", {"messageId": getattr(sent, "id", None)})
1480+
return RawMessage(
1481+
id=getattr(sent, "id", "") or "",
1482+
thread_id=thread_id,
1483+
raw=activity_payload,
1484+
)
14061485
except Exception as error:
14071486
self._logger.error(
14081487
"Teams API: send failed",
@@ -1411,10 +1490,7 @@ async def post_message(
14111490
"error": str(error),
14121491
},
14131492
)
1414-
error_dict = {"message": str(error)}
1415-
if hasattr(error, "status"):
1416-
error_dict["statusCode"] = error.status
1417-
_handle_teams_error(error_dict, "postMessage")
1493+
_handle_teams_error(error, "postMessage")
14181494
# Should not reach here due to _handle_teams_error always raising
14191495
raise # pragma: no cover
14201496

@@ -1467,7 +1543,11 @@ async def edit_message(
14671543
)
14681544

14691545
try:
1470-
await self._teams_update(decoded, message_id, activity_payload)
1546+
self._point_app_api_at(decoded.service_url)
1547+
await self._app.api.conversations.activities(decoded.conversation_id).update(
1548+
message_id,
1549+
self._message_activity_input(activity_payload),
1550+
)
14711551
except Exception as error:
14721552
self._logger.error(
14731553
"Teams API: updateActivity failed",
@@ -1477,10 +1557,7 @@ async def edit_message(
14771557
"error": str(error),
14781558
},
14791559
)
1480-
error_dict = {"message": str(error)}
1481-
if hasattr(error, "status"):
1482-
error_dict["statusCode"] = error.status
1483-
_handle_teams_error(error_dict, "editMessage")
1560+
_handle_teams_error(error, "editMessage")
14841561
raise # unreachable: _handle_teams_error always raises
14851562

14861563
return RawMessage(id=message_id, thread_id=thread_id, raw=activity_payload)
@@ -1498,7 +1575,8 @@ async def delete_message(self, thread_id: str, message_id: str) -> None:
14981575
)
14991576

15001577
try:
1501-
await self._teams_delete(decoded, message_id)
1578+
self._point_app_api_at(decoded.service_url)
1579+
await self._app.api.conversations.activities(decoded.conversation_id).delete(message_id)
15021580
except Exception as error:
15031581
self._logger.error(
15041582
"Teams API: deleteActivity failed",
@@ -1508,10 +1586,7 @@ async def delete_message(self, thread_id: str, message_id: str) -> None:
15081586
"error": str(error),
15091587
},
15101588
)
1511-
error_dict = {"message": str(error)}
1512-
if hasattr(error, "status"):
1513-
error_dict["statusCode"] = error.status
1514-
_handle_teams_error(error_dict, "deleteMessage")
1589+
_handle_teams_error(error, "deleteMessage")
15151590
raise # unreachable: _handle_teams_error always raises
15161591

15171592
async def add_reaction(
@@ -1534,6 +1609,8 @@ async def remove_reaction(
15341609

15351610
async def start_typing(self, thread_id: str, status: str | None = None) -> None:
15361611
"""Send typing indicator to a Teams conversation."""
1612+
from microsoft_teams.api import TypingActivityInput
1613+
15371614
decoded = self.decode_thread_id(thread_id)
15381615

15391616
self._logger.debug(
@@ -1544,7 +1621,8 @@ async def start_typing(self, thread_id: str, status: str | None = None) -> None:
15441621
)
15451622

15461623
try:
1547-
await self._teams_send(decoded, {"type": "typing"})
1624+
self._point_app_api_at(decoded.service_url)
1625+
await self._app.send(decoded.conversation_id, TypingActivityInput())
15481626
except Exception as error:
15491627
self._logger.error(
15501628
"Teams API: send (typing) failed",
@@ -2751,17 +2829,25 @@ def _extract_attachments_from_graph_message(self, msg: dict[str, Any]) -> list[A
27512829
return attachments
27522830

27532831
async def _get_graph_token(self) -> str:
2754-
"""Get a Microsoft Graph API access token (OAuth2 client credentials)."""
2832+
"""Get a Microsoft Graph API access token (OAuth2 client credentials).
2833+
2834+
Caches on the DEDICATED ``_graph_token`` / ``_graph_token_expiry``
2835+
fields — never the Bot Framework ``_access_token`` field. The two
2836+
tokens carry different scopes (``graph.microsoft.com`` vs
2837+
``api.botframework.com``); sharing one cache slot let whichever was
2838+
fetched last clobber the other, so a Graph read could end up sending a
2839+
Bot Framework token (and vice versa). See issue #93.
2840+
"""
27552841
import time as _time
27562842

27572843
# Reuse cached token if valid
2758-
if self._access_token and _time.time() < self._token_expiry:
2759-
return self._access_token
2844+
if self._graph_token and _time.time() < self._graph_token_expiry:
2845+
return self._graph_token
27602846

27612847
async with self._token_lock:
27622848
# Double-check after acquiring lock to avoid redundant refreshes
2763-
if self._access_token and _time.time() < self._token_expiry:
2764-
return self._access_token
2849+
if self._graph_token and _time.time() < self._graph_token_expiry:
2850+
return self._graph_token
27652851

27662852
tenant_id = self._app_tenant_id or "botframework.com"
27672853
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
@@ -2783,16 +2869,25 @@ async def _get_graph_token(self) -> str:
27832869
f"Failed to get Graph API token: {response.status} {error_text}",
27842870
)
27852871
data = await response.json()
2786-
self._access_token = data["access_token"]
2787-
self._token_expiry = _time.time() + data.get("expires_in", 3600) - 300
2788-
return self._access_token # type: ignore[return-value]
2872+
self._graph_token = data["access_token"]
2873+
self._graph_token_expiry = _time.time() + data.get("expires_in", 3600) - 300
2874+
return self._graph_token # type: ignore[return-value]
27892875

27902876
# =========================================================================
27912877
# Teams Bot Framework HTTP API helpers
27922878
# =========================================================================
27932879

27942880
async def _get_access_token(self) -> str:
2795-
"""Get a Bot Framework access token (OAuth2 client credentials)."""
2881+
"""Get a Bot Framework access token (OAuth2 client credentials).
2882+
2883+
Scope ``api.botframework.com``, cached on ``_access_token`` /
2884+
``_token_expiry``. The migrated outbound paths
2885+
(post/edit/delete/typing) now mint their Bot Framework token through
2886+
the SDK ``App``, so this hand-rolled token is consumed only by the
2887+
still-hand-rolled Bot Framework callers: native streaming via
2888+
:meth:`_teams_send` (PR 3) and :meth:`open_dm`. It must never share a
2889+
cache slot with the Graph token (see :meth:`_get_graph_token`).
2890+
"""
27962891
import time
27972892

27982893
if self._access_token and time.time() < self._token_expiry:
@@ -2843,7 +2938,14 @@ async def _teams_send(
28432938
decoded: TeamsThreadId,
28442939
activity: dict[str, Any],
28452940
) -> dict[str, Any]:
2846-
"""Send an activity to a Teams conversation via Bot Framework REST API."""
2941+
"""Send an activity to a Teams conversation via Bot Framework REST API.
2942+
2943+
Retained for the still-hand-rolled native streaming path (PR 3), which
2944+
needs the raw ``channelData``/``entities`` streaming envelope and the
2945+
server-assigned ``streamId`` from the REST response — neither of which
2946+
the SDK ``app.send`` surface exposes today. The migrated outbound
2947+
public methods no longer use this; they delegate to the SDK.
2948+
"""
28472949
_validate_service_url(decoded.service_url)
28482950
token = await self._get_access_token()
28492951
url = f"{decoded.service_url}v3/conversations/{decoded.conversation_id}/activities"
@@ -2865,55 +2967,6 @@ async def _teams_send(
28652967
)
28662968
return await response.json()
28672969

2868-
async def _teams_update(
2869-
self,
2870-
decoded: TeamsThreadId,
2871-
message_id: str,
2872-
activity: dict[str, Any],
2873-
) -> None:
2874-
"""Update an activity in a Teams conversation via Bot Framework REST API."""
2875-
_validate_service_url(decoded.service_url)
2876-
token = await self._get_access_token()
2877-
url = f"{decoded.service_url}v3/conversations/{decoded.conversation_id}/activities/{message_id}"
2878-
2879-
session = await self._get_http_session()
2880-
async with session.put(
2881-
url,
2882-
headers={
2883-
"Authorization": f"Bearer {token}",
2884-
"Content-Type": "application/json",
2885-
},
2886-
json=activity,
2887-
) as response:
2888-
if not response.ok:
2889-
error_text = await response.text()
2890-
raise NetworkError(
2891-
"teams",
2892-
f"Teams API error: {response.status} {error_text}",
2893-
)
2894-
2895-
async def _teams_delete(
2896-
self,
2897-
decoded: TeamsThreadId,
2898-
message_id: str,
2899-
) -> None:
2900-
"""Delete an activity from a Teams conversation via Bot Framework REST API."""
2901-
_validate_service_url(decoded.service_url)
2902-
token = await self._get_access_token()
2903-
url = f"{decoded.service_url}v3/conversations/{decoded.conversation_id}/activities/{message_id}"
2904-
2905-
session = await self._get_http_session()
2906-
async with session.delete(
2907-
url,
2908-
headers={"Authorization": f"Bearer {token}"},
2909-
) as response:
2910-
if not response.ok:
2911-
error_text = await response.text()
2912-
raise NetworkError(
2913-
"teams",
2914-
f"Teams API error: {response.status} {error_text}",
2915-
)
2916-
29172970

29182971
def create_teams_adapter(config: TeamsAdapterConfig | None = None) -> TeamsAdapter:
29192972
"""Factory function to create a Teams adapter."""

0 commit comments

Comments
 (0)