Skip to content

Commit e7df0db

Browse files
authored
Merge pull request #702 from seratch/issue-701
Fix #701 by reverting the change for simultaneous request handling
2 parents 439c7ae + 10563ef commit e7df0db

4 files changed

Lines changed: 154 additions & 32 deletions

File tree

.deepsource.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ test_patterns = [
88
exclude_patterns = [
99
'setup.py',
1010
'docs-src/**/*.py',
11-
'tutorial/**/*.py'
11+
'tutorial/**/*.py',
12+
'integration_tests/web/*.py',
13+
'integration_tests/rtm/*.py'
1214
]
1315

1416
[[analyzers]]

integration_tests/rtm/test_issue_558.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def tearDown(self):
2727
# Reset the decorators by @RTMClient.run_on
2828
RTMClient._callbacks = collections.defaultdict(list)
2929

30-
@pytest.mark.skipif(condition=is_not_specified(), reason="To avoid rate limited errors")
30+
@pytest.mark.skipif(condition=is_not_specified(), reason="Still unfixed")
3131
@async_test
3232
async def test_issue_558(self):
3333
channel_id = os.environ[SLACK_SDK_TEST_RTM_TEST_CHANNEL_ID]
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import asyncio
2+
import collections
3+
import logging
4+
import os
5+
import threading
6+
import time
7+
import unittest
8+
9+
import pytest
10+
11+
from integration_tests.env_variable_names import SLACK_SDK_TEST_CLASSIC_APP_BOT_TOKEN
12+
from integration_tests.helpers import async_test, is_not_specified
13+
from slack import RTMClient, WebClient
14+
15+
16+
class TestRTMClient(unittest.TestCase):
17+
"""Runs integration tests with real Slack API
18+
19+
https://github.com/slackapi/python-slackclient/issues/701
20+
"""
21+
22+
def setUp(self):
23+
self.logger = logging.getLogger(__name__)
24+
self.bot_token = os.environ[SLACK_SDK_TEST_CLASSIC_APP_BOT_TOKEN]
25+
26+
def tearDown(self):
27+
# Reset the decorators by @RTMClient.run_on
28+
RTMClient._callbacks = collections.defaultdict(list)
29+
30+
# @pytest.mark.skipif(condition=is_not_specified(), reason="to avoid rate_limited errors")
31+
@pytest.mark.skip()
32+
def test_receiving_all_messages(self):
33+
self.rtm_client = RTMClient(token=self.bot_token, loop=asyncio.new_event_loop())
34+
self.web_client = WebClient(token=self.bot_token)
35+
36+
self.call_count = 0
37+
38+
@RTMClient.run_on(event="message")
39+
def send_reply(**payload):
40+
self.logger.debug(payload)
41+
web_client, data = payload["web_client"], payload["data"]
42+
web_client.reactions_add(channel=data["channel"], timestamp=data["ts"], name="eyes")
43+
self.call_count += 1
44+
45+
def connect():
46+
self.logger.debug("Starting RTM Client...")
47+
self.rtm_client.start()
48+
49+
rtm = threading.Thread(target=connect)
50+
rtm.setDaemon(True)
51+
52+
rtm.start()
53+
time.sleep(3)
54+
55+
total_num = 10
56+
57+
sender_completion = []
58+
59+
def sent_bulk_message():
60+
for i in range(total_num):
61+
text = f"Sent by <https://slack.dev/python-slackclient/|python-slackclient>! ({i})"
62+
self.web_client.chat_postMessage(channel="#random", text=text)
63+
time.sleep(0.1)
64+
sender_completion.append(True)
65+
66+
num_of_senders = 3
67+
senders = []
68+
for sender_num in range(num_of_senders):
69+
sender = threading.Thread(target=sent_bulk_message)
70+
sender.setDaemon(True)
71+
sender.start()
72+
senders.append(sender)
73+
74+
while len(sender_completion) < num_of_senders:
75+
time.sleep(1)
76+
77+
expected_call_count = total_num * num_of_senders
78+
wait_seconds = 0
79+
max_wait = 20
80+
while self.call_count < expected_call_count and wait_seconds < max_wait:
81+
time.sleep(1)
82+
wait_seconds += 1
83+
84+
self.assertEqual(total_num * num_of_senders, self.call_count, "The RTM handler failed")
85+
86+
@pytest.mark.skipif(condition=is_not_specified(), reason="to avoid rate_limited errors")
87+
@async_test
88+
async def test_receiving_all_messages_async(self):
89+
self.rtm_client = RTMClient(token=self.bot_token, run_async=True)
90+
self.web_client = WebClient(token=self.bot_token, run_async=False)
91+
92+
self.call_count = 0
93+
94+
@RTMClient.run_on(event="message")
95+
async def send_reply(**payload):
96+
self.logger.debug(payload)
97+
web_client, data = payload["web_client"], payload["data"]
98+
await web_client.reactions_add(channel=data["channel"], timestamp=data["ts"], name="eyes")
99+
self.call_count += 1
100+
101+
# intentionally not waiting here
102+
self.rtm_client.start()
103+
104+
await asyncio.sleep(3)
105+
106+
total_num = 10
107+
108+
sender_completion = []
109+
110+
def sent_bulk_message():
111+
for i in range(total_num):
112+
text = f"Sent by <https://slack.dev/python-slackclient/|python-slackclient>! ({i})"
113+
self.web_client.chat_postMessage(channel="#random", text=text)
114+
time.sleep(0.1)
115+
sender_completion.append(True)
116+
117+
num_of_senders = 3
118+
senders = []
119+
for sender_num in range(num_of_senders):
120+
sender = threading.Thread(target=sent_bulk_message)
121+
sender.setDaemon(True)
122+
sender.start()
123+
senders.append(sender)
124+
125+
while len(sender_completion) < num_of_senders:
126+
await asyncio.sleep(1)
127+
128+
expected_call_count = total_num * num_of_senders
129+
wait_seconds = 0
130+
max_wait = 20
131+
while self.call_count < expected_call_count and wait_seconds < max_wait:
132+
await asyncio.sleep(1)
133+
wait_seconds += 1
134+
135+
self.assertEqual(total_num * num_of_senders, self.call_count, "The RTM handler failed")

slack/rtm/client.py

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -394,12 +394,7 @@ async def _connect_and_read(self):
394394

395395
async def _read_messages(self):
396396
"""Process messages received on the WebSocket connection."""
397-
text_message_callback_executions: List[Future] = []
398397
while not self._stopped and self._websocket is not None:
399-
for future in text_message_callback_executions:
400-
if future.done():
401-
text_message_callback_executions.remove(future)
402-
403398
try:
404399
# Wait for a message to be received, but timeout after a second so that
405400
# we can check if the socket has been closed, or if self._stopped is
@@ -419,34 +414,22 @@ async def _read_messages(self):
419414
)
420415
self._websocket = None
421416
await self._dispatch_event(event="close")
422-
num_of_running_callbacks = len(text_message_callback_executions)
423-
if num_of_running_callbacks > 0:
424-
self._logger.info(
425-
"WebSocket connection has been closed "
426-
f"though {num_of_running_callbacks} callback executions were still in progress"
427-
)
428417
return
429418

430419
if message.type == aiohttp.WSMsgType.TEXT:
431-
payload = message.json()
432-
event = payload.pop("type", "Unknown")
433-
434-
async def run_dispatch_event():
435-
try:
436-
await self._dispatch_event(event, data=payload)
437-
except Exception as err:
438-
data = message.data if message else message
439-
self._logger.info(
440-
f"Caught a raised exception ({err}) while dispatching a TEXT message ({data})"
441-
)
442-
# Raised exceptions here happen in users' code and were just unhandled.
443-
# As they're not intended for closing current WebSocket connection,
444-
# this exception should not be propagated to higher level (#_connect_and_read()).
445-
return
446-
447-
# Asynchronously run callbacks to handle simultaneous incoming messages from Slack
448-
f = asyncio.ensure_future(run_dispatch_event())
449-
text_message_callback_executions.append(f)
420+
try:
421+
payload = message.json()
422+
event = payload.pop("type", "Unknown")
423+
await self._dispatch_event(event, data=payload)
424+
except Exception as err:
425+
data = message.data if message else message
426+
self._logger.info(
427+
f"Caught a raised exception ({err}) while dispatching a TEXT message ({data})"
428+
)
429+
# Raised exceptions here happen in users' code and were just unhandled.
430+
# As they're not intended for closing current WebSocket connection,
431+
# this exception should not be propagated to higher level (#_connect_and_read()).
432+
continue
450433
elif message.type == aiohttp.WSMsgType.ERROR:
451434
self._logger.error("Received an error on the websocket: %r", message)
452435
await self._dispatch_event(event="error", data=message)
@@ -479,6 +462,8 @@ async def _dispatch_event(self, event, data=None):
479462
}
480463
}
481464
"""
465+
if self._logger.level <= logging.DEBUG:
466+
self._logger.debug("Received an event: '%s' - %s", event, data)
482467
for callback in self._callbacks[event]:
483468
self._logger.debug(
484469
"Running %s callbacks for event: '%s'",

0 commit comments

Comments
 (0)