Skip to content

Commit 0788d1e

Browse files
fix: critical and high port bugs across all adapters
Critical fixes: - Slack: ContextVar now uses copy_context().run() so async factory tasks inherit the multi-workspace token (was resetting before tasks ran) - Discord: Added deferred slash command response path (PATCH interaction webhook instead of posting new message) - Discord: Added file attachment support in post_message (multipart upload) - WhatsApp: Restored Authorization header on media download step 2 (SSRF fix had removed it, breaking all media downloads) - Chat: Added on_lock_conflict support (force-release path was missing) High fixes: - Discord: Normalize emoji through resolve_emoji_from_gchat before constructing EmojiValue (was storing raw unicode like "👍") - Teams: Pass webhook options to process_reaction - GChat: Subscription error propagation to concurrent waiters - Linear: Include both snake_case and camelCase keys in fetch_thread metadata - Thread/Channel: Document messages()/all_messages() as methods (Pythonic) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6d0340a commit 0788d1e

10 files changed

Lines changed: 182 additions & 30 deletions

File tree

src/chat_sdk/adapters/discord/adapter.py

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
DiscordThreadId,
3737
InteractionResponseType,
3838
)
39-
from chat_sdk.emoji import convert_emoji_placeholders
39+
from chat_sdk.emoji import convert_emoji_placeholders, get_emoji, resolve_emoji_from_gchat
4040
from chat_sdk.logger import ConsoleLogger, Logger
41-
from chat_sdk.shared.adapter_utils import extract_card
41+
from chat_sdk.shared.adapter_utils import extract_card, extract_files
4242
from chat_sdk.shared.errors import NetworkError, ValidationError
4343
from chat_sdk.types import (
4444
ActionEvent,
@@ -50,6 +50,7 @@
5050
EmojiValue,
5151
FetchOptions,
5252
FetchResult,
53+
FileUpload,
5354
FormattedContent,
5455
Message,
5556
MessageMetadata,
@@ -677,13 +678,21 @@ async def _handle_forwarded_reaction(
677678
emoji_id = emoji_data.get("id")
678679
raw_emoji = f"<:{emoji_name}:{emoji_id}>" if emoji_id else emoji_name
679680

681+
# Normalize emoji through the emoji resolver
682+
if emoji_name and not emoji_id:
683+
# Standard unicode emoji -- resolve through gchat (unicode) resolver
684+
normalized = resolve_emoji_from_gchat(emoji_name)
685+
else:
686+
# Custom emoji -- use custom:{id} key or raw name
687+
normalized = get_emoji(f"custom:{emoji_id}" if emoji_id else emoji_name)
688+
680689
self._chat.process_reaction(
681690
ReactionEvent(
682691
adapter=self,
683692
thread=None,
684693
thread_id=thread_id,
685694
message_id=data.get("message_id", ""),
686-
emoji=EmojiValue(name=emoji_name),
695+
emoji=normalized,
687696
raw_emoji=raw_emoji,
688697
added=added,
689698
user=Author(
@@ -730,20 +739,59 @@ async def post_message(
730739
if components:
731740
payload["components"] = components
732741

742+
# --- Handle file attachments via multipart/form-data ---
743+
files = extract_files(message)
744+
745+
# --- Resolve deferred slash-command interaction if pending ---
746+
req_ctx = self._request_context.get()
747+
slash_ctx = req_ctx.slash_command if req_ctx else None
748+
if slash_ctx and not slash_ctx.initial_response_sent:
749+
slash_ctx.initial_response_sent = True
750+
self._logger.debug(
751+
"Discord API: PATCH deferred interaction response",
752+
{
753+
"channelId": channel_id,
754+
"contentLength": len(payload.get("content", "")),
755+
"embedCount": len(embeds),
756+
"componentCount": len(components),
757+
"fileCount": len(files),
758+
},
759+
)
760+
761+
result = await self._discord_fetch(
762+
f"/webhooks/{self._application_id}/{slash_ctx.interaction_token}/messages/@original",
763+
"PATCH",
764+
payload,
765+
files=files or None,
766+
)
767+
768+
self._logger.debug(
769+
"Discord API: PATCH deferred interaction response completed",
770+
{"messageId": result.get("id") if result else None},
771+
)
772+
773+
return RawMessage(
774+
id=(result or {}).get("id", ""),
775+
thread_id=thread_id,
776+
raw=result or {},
777+
)
778+
733779
self._logger.debug(
734780
"Discord API: POST message",
735781
{
736782
"channelId": channel_id,
737783
"contentLength": len(payload.get("content", "")),
738784
"embedCount": len(embeds),
739785
"componentCount": len(components),
786+
"fileCount": len(files),
740787
},
741788
)
742789

743790
result = await self._discord_fetch(
744791
f"/channels/{channel_id}/messages",
745792
"POST",
746793
payload,
794+
files=files or None,
747795
)
748796

749797
self._logger.debug(
@@ -1255,25 +1303,48 @@ async def _discord_fetch(
12551303
path: str,
12561304
method: str,
12571305
body: Any = None,
1306+
files: list[FileUpload] | None = None,
12581307
) -> Any:
1259-
"""Make a request to the Discord API using aiohttp (lazy import)."""
1308+
"""Make a request to the Discord API using aiohttp (lazy import).
1309+
1310+
When *files* is provided the request uses ``multipart/form-data``
1311+
with a ``payload_json`` field for the JSON body and one field per
1312+
file attachment, matching the Discord API multipart upload spec.
1313+
"""
12601314
import aiohttp # lazy import
12611315

12621316
url = f"{DISCORD_API_BASE}{path}"
12631317
headers: dict[str, str] = {
12641318
"Authorization": f"Bot {self._bot_token}",
12651319
}
12661320

1267-
if body is not None:
1268-
headers["Content-Type"] = "application/json"
1321+
# Build request kwargs depending on whether we have file uploads
1322+
request_kwargs: dict[str, Any] = {}
1323+
if files:
1324+
# Multipart form-data with payload_json + file parts
1325+
form = aiohttp.FormData()
1326+
form.add_field("payload_json", json.dumps(body or {}), content_type="application/json")
1327+
for idx, file in enumerate(files):
1328+
form.add_field(
1329+
f"files[{idx}]",
1330+
file.data,
1331+
filename=file.filename,
1332+
content_type=file.mime_type or "application/octet-stream",
1333+
)
1334+
request_kwargs["data"] = form
1335+
# Do NOT set Content-Type header -- aiohttp sets the multipart boundary
1336+
else:
1337+
if body is not None:
1338+
headers["Content-Type"] = "application/json"
1339+
request_kwargs["json"] = body
12691340

12701341
async with (
12711342
aiohttp.ClientSession() as session,
12721343
session.request(
12731344
method,
12741345
url,
12751346
headers=headers,
1276-
json=body if body is not None else None,
1347+
**request_kwargs,
12771348
) as response,
12781349
):
12791350
if not response.ok:

src/chat_sdk/adapters/google_chat/adapter.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -501,16 +501,22 @@ async def _ensure_space_subscription(self, space_name: str) -> None:
501501
"Subscription creation already in progress, waiting",
502502
{"spaceName": space_name},
503503
)
504-
await self._pending_subscriptions[space_name].wait()
504+
pending = self._pending_subscriptions[space_name]
505+
await pending["event"].wait()
506+
if pending.get("error"):
507+
raise pending["error"]
505508
return
506509

507510
# Create the subscription
508-
event = asyncio.Event()
509-
self._pending_subscriptions[space_name] = event
511+
pending_entry: dict[str, Any] = {"event": asyncio.Event(), "error": None}
512+
self._pending_subscriptions[space_name] = pending_entry
510513
try:
511514
await self._create_space_subscription_with_cache(space_name, cache_key)
515+
except Exception as e:
516+
pending_entry["error"] = e
517+
raise
512518
finally:
513-
event.set()
519+
pending_entry["event"].set()
514520
self._pending_subscriptions.pop(space_name, None)
515521

516522
async def _create_space_subscription_with_cache(

src/chat_sdk/adapters/linear/adapter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -785,7 +785,8 @@ async def fetch_thread(self, thread_id: str) -> ThreadInfo:
785785
channel_name=f"{issue.get('identifier', '')}: {issue.get('title', '')}",
786786
is_dm=False,
787787
metadata={
788-
"issue_id": decoded.issue_id,
788+
"issueId": decoded.issue_id,
789+
"issue_id": decoded.issue_id, # snake_case alias for compatibility
789790
"identifier": issue.get("identifier"),
790791
"title": issue.get("title"),
791792
"url": issue.get("url"),

src/chat_sdk/adapters/slack/adapter.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import asyncio
1212
import base64
13+
import contextvars
1314
import hashlib
1415
import hmac
1516
import json
@@ -681,18 +682,21 @@ async def handle_webhook(self, request: Any, options: WebhookOptions | None = No
681682
"headers": {"Content-Type": "application/json"},
682683
}
683684

684-
# Multi-workspace: resolve token before processing events
685+
# Multi-workspace: resolve token before processing events.
686+
# Use contextvars.copy_context() so the ContextVar value persists into
687+
# any async tasks spawned by _process_event_payload (e.g. process_message
688+
# creates a task via asyncio.create_task). The copied context is
689+
# isolated -- the ContextVar change does not leak back to the caller
690+
# and does not need an explicit reset.
685691
if not self._default_bot_token and payload.get("type") == "event_callback":
686692
team_id_event = payload.get("team_id")
687693
if team_id_event:
688694
ctx = await self._resolve_token_for_team(team_id_event)
689695
if ctx:
690-
tok = self._request_context.set(ctx)
691-
try:
692-
self._process_event_payload(payload, options)
693-
return {"body": "ok", "status": 200}
694-
finally:
695-
self._request_context.reset(tok)
696+
isolated = contextvars.copy_context()
697+
isolated.run(self._request_context.set, ctx)
698+
isolated.run(self._process_event_payload, payload, options)
699+
return {"body": "ok", "status": 200}
696700
self._logger.warn("Could not resolve token for team", {"teamId": team_id_event})
697701
return {"body": "ok", "status": 200}
698702

src/chat_sdk/adapters/teams/adapter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,8 @@ def _handle_reaction_activity(
469469
thread=None,
470470
adapter=self,
471471
raw=activity,
472-
)
472+
),
473+
options,
473474
)
474475

475476
for reaction in activity.get("reactionsRemoved", []):
@@ -485,7 +486,8 @@ def _handle_reaction_activity(
485486
thread=None,
486487
adapter=self,
487488
raw=activity,
488-
)
489+
),
490+
options,
489491
)
490492

491493
def _parse_teams_message(

src/chat_sdk/adapters/whatsapp/adapter.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -660,9 +660,13 @@ async def download_media(self, media_id: str) -> bytes:
660660
f"Media download URL host is not an allowed Meta domain: {host}",
661661
)
662662

663-
# Step 2: Download the actual file (no Bearer token -- CDN URLs are pre-signed)
663+
# Step 2: Download the actual file.
664+
# The WhatsApp Cloud API requires the Bearer token for media downloads
665+
# (the URL is not pre-signed). The SSRF domain validation above ensures
666+
# we only send the token to legitimate Meta/WhatsApp domains.
664667
async with session.get(
665668
download_url,
669+
headers={"Authorization": f"Bearer {self._access_token}"},
666670
) as data_response:
667671
if data_response.status != 200:
668672
self._logger.error(

src/chat_sdk/chat.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
ModalCloseEvent,
4848
ModalResponse,
4949
ModalSubmitEvent,
50+
OnLockConflict,
5051
QueueEntry,
5152
ReactionEvent,
5253
SlashCommandEvent,
@@ -228,6 +229,7 @@ def __init__(self, config: ChatConfig | None = None, **kwargs: Any) -> None:
228229
self._fallback_streaming_placeholder_text = config.fallback_streaming_placeholder_text
229230
self._dedupe_ttl_ms = config.dedupe_ttl_ms or DEDUPE_TTL_MS
230231
self._lock_scope_config = config.lock_scope
232+
self._on_lock_conflict: OnLockConflict | None = config.on_lock_conflict
231233

232234
# -- Concurrency config -----------------------------------------------
233235
concurrency = config.concurrency
@@ -1440,11 +1442,14 @@ async def _handle_drop(
14401442
) -> None:
14411443
lock = await self._state_adapter.acquire_lock(lock_key, DEFAULT_LOCK_TTL_MS)
14421444
if lock is None:
1443-
self._logger.warn("Could not acquire lock on thread", {"thread_id": thread_id, "lock_key": lock_key})
1444-
raise LockError(
1445-
thread_id,
1446-
f"Could not acquire lock on thread {thread_id}. Another instance may be processing.",
1447-
)
1445+
# Lock acquisition failed -- consult on_lock_conflict policy
1446+
lock = await self._resolve_lock_conflict(thread_id, lock_key, message)
1447+
if lock is None:
1448+
self._logger.warn("Could not acquire lock on thread", {"thread_id": thread_id, "lock_key": lock_key})
1449+
raise LockError(
1450+
thread_id,
1451+
f"Could not acquire lock on thread {thread_id}. Another instance may be processing.",
1452+
)
14481453

14491454
self._logger.debug("Lock acquired", {"thread_id": thread_id, "lock_key": lock_key, "token": lock.token})
14501455
try:
@@ -1453,6 +1458,47 @@ async def _handle_drop(
14531458
await self._state_adapter.release_lock(lock)
14541459
self._logger.debug("Lock released", {"thread_id": thread_id, "lock_key": lock_key})
14551460

1461+
async def _resolve_lock_conflict(
1462+
self,
1463+
thread_id: str,
1464+
lock_key: str,
1465+
message: Message,
1466+
) -> Lock | None:
1467+
"""Attempt to resolve a lock conflict based on the ``on_lock_conflict`` policy.
1468+
1469+
Returns a :class:`Lock` if the conflict was resolved and the lock
1470+
was successfully re-acquired, or ``None`` if the message should be
1471+
dropped.
1472+
"""
1473+
conflict = self._on_lock_conflict
1474+
1475+
if conflict is None or conflict == "drop":
1476+
return None
1477+
1478+
if conflict == "force":
1479+
self._logger.info(
1480+
"Force-releasing lock due to on_lock_conflict='force'",
1481+
{"thread_id": thread_id, "lock_key": lock_key},
1482+
)
1483+
await self._state_adapter.force_release_lock(lock_key)
1484+
return await self._state_adapter.acquire_lock(lock_key, DEFAULT_LOCK_TTL_MS)
1485+
1486+
# Callable handler -- invoke and inspect result
1487+
if callable(conflict):
1488+
result = conflict(thread_id, message)
1489+
# Support both sync and async callables
1490+
if asyncio.iscoroutine(result) or asyncio.isfuture(result):
1491+
result = await result
1492+
if result:
1493+
self._logger.info(
1494+
"on_lock_conflict callback returned True, force-releasing lock",
1495+
{"thread_id": thread_id, "lock_key": lock_key},
1496+
)
1497+
await self._state_adapter.force_release_lock(lock_key)
1498+
return await self._state_adapter.acquire_lock(lock_key, DEFAULT_LOCK_TTL_MS)
1499+
1500+
return None
1501+
14561502
# -- Queue / Debounce strategy -------------------------------------------
14571503

14581504
async def _handle_queue_or_debounce(

0 commit comments

Comments
 (0)