Skip to content

Commit 305ae0d

Browse files
authored
fix(DATAGO-137322): add timeout to slack client http call (#169)
Signed-off-by: zhenyu369 <job.zhenyuchen@gmail.com> Signed-off-by: zhenyu369 <zhenyu.chen@solace.com>
1 parent a52fe96 commit 305ae0d

4 files changed

Lines changed: 607 additions & 3 deletions

File tree

sam-slack-gateway-adapter/src/sam_slack_gateway_adapter/adapter.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import requests
1212
from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler
1313
from slack_bolt.async_app import AsyncApp
14+
from slack_bolt.version import __version__ as bolt_version
1415
from slack_sdk.errors import SlackApiError
16+
from slack_sdk.web.async_client import AsyncWebClient
1517

1618
from pydantic import BaseModel, Field
1719

@@ -36,6 +38,12 @@
3638

3739
_NO_EMAIL_MARKER = "_NO_EMAIL_"
3840

41+
# Upper bound on how long handle_task_complete will wait for the per-task
42+
# SlackMessageQueue to drain before giving up and ACKing the broker message
43+
# anyway. A hung Slack HTTP call inside the queue worker must not be allowed
44+
# to wedge the broker consumer.
45+
QUEUE_DRAIN_TIMEOUT_SEC = 60.0
46+
3947

4048
class SlackAdapterConfig(BaseModel):
4149
"""Configuration model for the SlackAdapter."""
@@ -78,7 +86,18 @@ async def init(self, context: GatewayContext) -> None:
7886
# Config is now a validated Pydantic model
7987
adapter_config: SlackAdapterConfig = self.context.adapter_config
8088

81-
self.slack_app = AsyncApp(token=adapter_config.slack_bot_token)
89+
# Pin the HTTP timeout explicitly instead of relying on the SDK default,
90+
# so a stalled Slack API call fails fast rather than hanging the queue
91+
# worker and, with it, the entire broker consumer flow.
92+
# The user_agent_prefix matches what slack_bolt.util.async_utils.
93+
# create_async_web_client() would set (Slack uses this for telemetry
94+
# to identify Bolt clients in support investigations).
95+
slack_web_client = AsyncWebClient(
96+
token=adapter_config.slack_bot_token,
97+
timeout=30,
98+
user_agent_prefix=f"Bolt-Async/{bolt_version}",
99+
)
100+
self.slack_app = AsyncApp(client=slack_web_client)
82101

83102
# --- Register Event and Action Handlers ---
84103
self._register_handlers()
@@ -542,7 +561,19 @@ async def handle_task_complete(self, context: ResponseContext) -> None:
542561

543562
if task_id in self.message_queues:
544563
queue = self.message_queues[task_id]
545-
await queue.wait_until_complete()
564+
# Bound the wait so a hung Slack API call in the queue worker can't
565+
# block the broker ACK and wedge the consumer flow.
566+
try:
567+
await asyncio.wait_for(
568+
queue.wait_until_complete(),
569+
timeout=QUEUE_DRAIN_TIMEOUT_SEC,
570+
)
571+
except asyncio.TimeoutError:
572+
log.warning(
573+
"Timeout waiting for queue to complete for task %s; "
574+
"proceeding so the broker message can be ACKed",
575+
task_id,
576+
)
546577

547578
# Final citation resolution pass: RAG data signals may have arrived after
548579
# the text was already formatted and posted. Re-apply citation transformation

sam-slack-gateway-adapter/src/sam_slack_gateway_adapter/message_queue.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1181,8 +1181,13 @@ async def _handle_file_upload(self, op: FileUploadOp):
11811181
)
11821182
import requests
11831183

1184+
# Bound the file upload at the HTTP layer. requests.post here is
1185+
# synchronous (run in a thread) and previously had NO timeout, so
1186+
# a stalled connection to Slack's upload URL could hang the queue
1187+
# worker indefinitely — and unlike the slack-sdk's async methods,
1188+
# this call has no built-in 30s default.
11841189
upload_response = await asyncio.to_thread(
1185-
requests.post, upload_url, data=op.content_bytes
1190+
requests.post, upload_url, data=op.content_bytes, timeout=60
11861191
)
11871192
upload_response.raise_for_status()
11881193

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
"""
2+
Integration tests for hang-recovery in the Slack adapter.
3+
4+
These complement the structural unit tests in
5+
``tests/unit/test_adapter_timeouts.py`` by exercising the real code paths:
6+
7+
* A real ``SlackMessageQueue`` with a real background worker coroutine
8+
* Real ``asyncio.wait_for`` with real timing
9+
* A Slack HTTP client whose chat methods genuinely hang (await sleep)
10+
11+
The production timeout (``QUEUE_DRAIN_TIMEOUT_SEC = 60.0``) is patched down to
12+
a small value so the test runs in ~1 second instead of ~60. The fix is verified
13+
by behavior: ``handle_task_complete`` must return within the bounded window
14+
even when the queue worker is stuck on a Slack call that never returns.
15+
"""
16+
17+
import asyncio
18+
import logging
19+
from unittest.mock import AsyncMock, MagicMock, patch
20+
21+
import pytest
22+
23+
from sam_slack_gateway_adapter import adapter as adapter_mod
24+
from sam_slack_gateway_adapter.adapter import SlackAdapter, SlackAdapterConfig
25+
from sam_slack_gateway_adapter.message_queue import SlackMessageQueue
26+
from solace_agent_mesh.gateway.adapter.types import (
27+
GatewayContext,
28+
ResponseContext,
29+
)
30+
31+
32+
# Patched value of QUEUE_DRAIN_TIMEOUT_SEC for these tests. Small enough to
33+
# keep CI fast, but large enough to absorb scheduler jitter on shared runners.
34+
TEST_DRAIN_TIMEOUT_SEC = 0.5
35+
36+
# Hard upper bound for the whole test to fail fast on a regression instead of
37+
# stalling the suite. Must exceed TEST_DRAIN_TIMEOUT_SEC by a healthy margin.
38+
TEST_OUTER_DEADLINE_SEC = 5.0
39+
40+
41+
@pytest.fixture
def gateway_context():
    """Provide a ``GatewayContext`` stand-in carrying a real adapter config.

    The returned object is a ``MagicMock`` restricted to the
    ``GatewayContext`` spec. Its ``adapter_config`` is a genuine
    ``SlackAdapterConfig`` instance, ``cache_service`` is ``None``,
    ``get_config`` always answers ``"OrchestratorAgent"`` and
    ``get_task_state`` always answers ``None``.
    """
    # Build the config up front so any validation failure surfaces before
    # the mock itself is assembled.
    slack_settings = SlackAdapterConfig(
        slack_bot_token="xoxb-test-token",
        slack_app_token="xapp-test-token",
        slack_initial_status_message="Thinking...",
        correct_markdown_formatting=True,
        feedback_enabled=False,
        slack_email_cache_ttl_seconds=3600,
    )

    ctx = MagicMock(spec=GatewayContext)
    ctx.adapter_config = slack_settings
    ctx.cache_service = None
    ctx.get_config = MagicMock(return_value="OrchestratorAgent")
    ctx.get_task_state = MagicMock(return_value=None)
    ctx.set_task_state = MagicMock()
    return ctx
57+
58+
59+
@pytest.fixture
def hanging_slack_client():
    """Build a Slack client double whose chat calls block instead of returning.

    ``chat_postMessage`` and ``chat_update`` are ``AsyncMock`` objects that
    each await a one-hour sleep, standing in for a Slack HTTP call that
    never completes.
    """

    async def _stall(*_args, **_kwargs):
        await asyncio.sleep(3600)

    stub = MagicMock()
    # Every chat method the queue worker might reach on the text update path
    # gets its own hanging mock.
    for method_name in ("chat_postMessage", "chat_update"):
        setattr(stub, method_name, AsyncMock(side_effect=_stall))
    return stub
71+
72+
73+
@pytest.fixture
def adapter_with_hung_queue(gateway_context, hanging_slack_client):
    """Return a ``SlackAdapter`` wired to test doubles only.

    The adapter's ``context`` is the mocked gateway context and its
    ``slack_app`` is a ``MagicMock`` whose ``client`` is the hanging Slack
    client. ``init()`` is not called here.

    This fixture does not create a ``SlackMessageQueue``. Each test builds,
    starts and registers its own queue in ``adapter.message_queues`` (and
    seeds it if needed) before calling ``handle_task_complete``.
    """
    adapter = SlackAdapter()
    adapter.context = gateway_context
    adapter.slack_app = MagicMock()
    adapter.slack_app.client = hanging_slack_client
    return adapter
86+
87+
88+
@pytest.fixture
def response_ctx():
    """Return the ``ResponseContext`` shared by the hang-recovery tests.

    It identifies task ``task-int-stuck`` and carries the Slack channel and
    thread timestamp in ``platform_context``.
    """
    slack_location = {"channel_id": "C-int", "thread_ts": "1.0"}
    return ResponseContext(
        task_id="task-int-stuck",
        session_id="session-int",
        user_id="user-int",
        platform_context=slack_location,
    )
96+
97+
98+
@pytest.mark.asyncio
async def test_handle_task_complete_recovers_from_hung_slack_call(
    adapter_with_hung_queue, response_ctx, hanging_slack_client, caplog
):
    """End-to-end: real queue + real worker + hung Slack client → adapter must
    recover within the bounded timeout and proceed to ACK.

    This is the regression test for the hung-consumer bug. Without the fix,
    the background worker hangs on chat.postMessage forever, queue.join()
    never returns, and handle_task_complete blocks indefinitely.
    """
    task_id = response_ctx.task_id

    # Build a real SlackMessageQueue with the hanging client.
    real_queue = SlackMessageQueue(
        task_id=task_id,
        slack_client=hanging_slack_client,
        channel_id=response_ctx.platform_context["channel_id"],
        thread_ts=response_ctx.platform_context["thread_ts"],
        adapter=adapter_with_hung_queue,
    )
    await real_queue.start()
    adapter_with_hung_queue.message_queues[task_id] = real_queue

    # Seed one text update so the worker is actively stuck on a hung Slack
    # call by the time handle_task_complete runs.
    await real_queue.queue_text_update("Hello from the agent")

    # Give the worker a moment to pull the op and start the hanging call.
    await asyncio.sleep(0.05)

    # Mock the queue's stop() — it has its own 60s wait_for on the processor
    # task in message_queue.py, which is a separate concern from the
    # wait_until_complete hang under test here.
    real_queue.stop = AsyncMock()

    try:
        with patch.object(
            adapter_mod, "QUEUE_DRAIN_TIMEOUT_SEC", TEST_DRAIN_TIMEOUT_SEC
        ), patch.object(
            adapter_with_hung_queue,
            "_resolve_citations_final_pass",
            new=AsyncMock(),
        ), patch(
            "sam_slack_gateway_adapter.adapter.utils.send_slack_message",
            new=AsyncMock(),
        ), patch(
            "sam_slack_gateway_adapter.adapter.utils.update_slack_message",
            new=AsyncMock(),
        ), caplog.at_level(logging.WARNING):
            # Outer deadline: if the fix is removed, this fails fast instead
            # of stalling the suite.
            await asyncio.wait_for(
                adapter_with_hung_queue.handle_task_complete(response_ctx),
                timeout=TEST_OUTER_DEADLINE_SEC,
            )
    finally:
        # Force-cancel the still-stuck worker so the test loop can shut down
        # cleanly. In production this happens via cleanup(); here we do it
        # explicitly because the worker is wedged on asyncio.sleep(3600).
        if real_queue.processor_task and not real_queue.processor_task.done():
            real_queue.processor_task.cancel()
            try:
                await real_queue.processor_task
            except (asyncio.CancelledError, Exception):
                # Best-effort teardown: the cancellation we just requested
                # and any ordinary worker failure are expected here.
                # CancelledError is listed separately because it derives from
                # BaseException on Python 3.8+. KeyboardInterrupt and
                # SystemExit are deliberately NOT swallowed so an interrupted
                # test run still stops.
                pass

    # Behavioral assertion: the documented warning was emitted.
    assert any(
        "Timeout waiting for queue to complete" in record.message
        and task_id in record.message
        for record in caplog.records
    ), (
        "handle_task_complete recovered from the hung queue but did not log "
        "the expected timeout warning. Without that log line, operators will "
        "have no signal that a task was abandoned."
    )
175+
176+
177+
@pytest.mark.asyncio
async def test_handle_task_complete_completes_normally_when_queue_drains(
    adapter_with_hung_queue, response_ctx, caplog
):
    """Happy path: a queue backed by a responsive client logs no timeout warning.

    The queue registered for the task uses a client whose chat methods return
    immediately, so ``handle_task_complete`` should finish inside the outer
    deadline without emitting the drain-timeout warning.
    """
    task_id = response_ctx.task_id
    slack_location = response_ctx.platform_context

    # Client double whose chat methods answer straight away (no hang).
    responsive_client = MagicMock()
    responsive_client.chat_postMessage = AsyncMock(
        return_value={"ok": True, "ts": "1.1"}
    )
    responsive_client.chat_update = AsyncMock(return_value={"ok": True})

    draining_queue = SlackMessageQueue(
        task_id=task_id,
        slack_client=responsive_client,
        channel_id=slack_location["channel_id"],
        thread_ts=slack_location["thread_ts"],
        adapter=adapter_with_hung_queue,
    )
    await draining_queue.start()
    adapter_with_hung_queue.message_queues[task_id] = draining_queue

    try:
        # Shrink the drain timeout and stub out everything
        # handle_task_complete does besides waiting on the queue.
        shrink_timeout = patch.object(
            adapter_mod, "QUEUE_DRAIN_TIMEOUT_SEC", TEST_DRAIN_TIMEOUT_SEC
        )
        skip_citations = patch.object(
            adapter_with_hung_queue,
            "_resolve_citations_final_pass",
            new=AsyncMock(),
        )
        stub_send = patch(
            "sam_slack_gateway_adapter.adapter.utils.send_slack_message",
            new=AsyncMock(),
        )
        stub_update = patch(
            "sam_slack_gateway_adapter.adapter.utils.update_slack_message",
            new=AsyncMock(),
        )
        capture_warnings = caplog.at_level(logging.WARNING)

        with shrink_timeout, skip_citations, stub_send, stub_update, capture_warnings:
            await asyncio.wait_for(
                adapter_with_hung_queue.handle_task_complete(response_ctx),
                timeout=TEST_OUTER_DEADLINE_SEC,
            )
    finally:
        await draining_queue.stop()

    timeout_warnings = [
        record
        for record in caplog.records
        if "Timeout waiting for queue to complete" in record.message
    ]
    assert not timeout_warnings, "Happy-path drain must not emit a timeout warning"

0 commit comments

Comments
 (0)