Skip to content

Commit 9fd264f

Browse files
authored
asyncio-based Socket Mode client improvements (#1117)
* Add initial ping right after establishing a new connection in AIOHTTP-based SocketModeClient * Add is_ping_pong_failing check to is_connected method * Many improvements to the session monitoring job * Fix syntax error in Python 3.6 * Enable Socket Mode tests
1 parent 9b3cf06 commit 9fd264f

4 files changed

Lines changed: 305 additions & 84 deletions

File tree

.github/workflows/ci-build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
python-version: ['3.6', '3.7', '3.8', '3.9']
1717
env:
1818
PYTHON_SLACK_SDK_MOCK_SERVER_MODE: 'threading'
19-
CI_UNSTABLE_TESTS_SKIP_ENABLED: '1'
19+
#CI_UNSTABLE_TESTS_SKIP_ENABLED: '1'
2020
steps:
2121
- uses: actions/checkout@v2
2222
- name: Set up Python ${{ matrix.python-version }}

slack_sdk/socket_mode/aiohttp/__init__.py

Lines changed: 163 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -136,59 +136,116 @@ def __init__(
136136
self.message_processor = asyncio.ensure_future(self.process_messages())
137137

138138
async def monitor_current_session(self) -> None:
139+
# In the asyncio runtime, accessing a shared object (self.current_session here) from
140+
# multiple tasks can cause race conditions and errors.
141+
# To avoid such, we access only the session that is active when this loop starts.
142+
session: ClientWebSocketResponse = self.current_session
143+
session_id: str = self.build_session_id(session)
144+
145+
if self.logger.level <= logging.DEBUG:
146+
self.logger.debug(
147+
f"A new monitor_current_session() execution loop for {session_id} started"
148+
)
139149
try:
150+
logging_interval = 100
151+
counter_for_logging = 0
152+
140153
while not self.closed:
154+
if session != self.current_session:
155+
if self.logger.level <= logging.DEBUG:
156+
self.logger.debug(
157+
f"The monitor_current_session task for {session_id} is now cancelled"
158+
)
159+
break
141160
try:
161+
# The logging here is for detailed trouble shooting of potential issues in this client.
162+
# If you don't see this log for a while, it means that
163+
# this receive_messages execution is no longer working for some reason.
164+
counter_for_logging = (counter_for_logging + 1) % logging_interval
165+
if counter_for_logging == 0:
166+
log_message = (
167+
f"{logging_interval} session verification executed after the previous same log"
168+
f" ({session_id})"
169+
)
170+
self.logger.debug(log_message)
171+
142172
await asyncio.sleep(self.ping_interval)
143-
if self.current_session is not None:
173+
174+
if session is not None and session.closed is False:
144175
t = time.time()
145176
if self.last_ping_pong_time is None:
146177
self.last_ping_pong_time = float(t)
147-
await self.current_session.ping(f"sdk-ping-pong:{t}")
178+
try:
179+
await session.ping(f"sdk-ping-pong:{t}")
180+
except Exception as e:
181+
# The ping() method can fail for some reason.
182+
# To establish a new connection even in this scenario,
183+
# we ignore the exception here.
184+
self.logger.warning(
185+
f"Failed to send a ping message ({session_id}): {e}"
186+
)
148187

149188
if self.auto_reconnect_enabled:
150189
should_reconnect = False
151-
if self.current_session is None or self.current_session.closed:
190+
if session is None or session.closed:
152191
self.logger.info(
153-
"The session seems to be already closed. Reconnecting..."
192+
f"The session ({session_id}) seems to be already closed. Reconnecting..."
154193
)
155194
should_reconnect = True
156195

157-
if self.last_ping_pong_time is not None:
196+
if await self.is_ping_pong_failing():
158197
disconnected_seconds = int(
159198
time.time() - self.last_ping_pong_time
160199
)
161-
if disconnected_seconds >= (self.ping_interval * 4):
162-
self.logger.info(
163-
"The connection seems to be stale. Reconnecting..."
164-
f" reason: disconnected for {disconnected_seconds}+ seconds)"
165-
)
166-
self.stale = True
167-
self.last_ping_pong_time = None
168-
should_reconnect = True
200+
self.logger.info(
201+
f"The session ({session_id}) seems to be stale. Reconnecting..."
202+
f" reason: disconnected for {disconnected_seconds}+ seconds)"
203+
)
204+
self.stale = True
205+
self.last_ping_pong_time = None
206+
should_reconnect = True
169207

170208
if should_reconnect is True or not await self.is_connected():
171209
await self.connect_to_new_endpoint()
172210

173211
except Exception as e:
174212
self.logger.error(
175-
"Failed to check the current session or reconnect to the server "
213+
f"Failed to check the current session ({session_id}) or reconnect to the server "
176214
f"(error: {type(e).__name__}, message: {e})"
177215
)
178216
except asyncio.CancelledError:
179-
if self.trace_enabled:
217+
if self.logger.level <= logging.DEBUG:
180218
self.logger.debug(
181-
"The running monitor_current_session task is now cancelled"
219+
f"The monitor_current_session task for {session_id} is now cancelled"
182220
)
183221
raise
184222

185223
async def receive_messages(self) -> None:
224+
# In the asyncio runtime, accessing a shared object (self.current_session here) from
225+
# multiple tasks can cause race conditions and errors.
226+
# To avoid such, we access only the session that is active when this loop starts.
227+
session = self.current_session
228+
session_id = self.build_session_id(session)
229+
if self.logger.level <= logging.DEBUG:
230+
self.logger.debug(
231+
f"A new receive_messages() execution loop with {session_id} started"
232+
)
186233
try:
187234
consecutive_error_count = 0
235+
logging_interval = 100
236+
counter_for_logging = 0
237+
188238
while not self.closed:
239+
if session != self.current_session:
240+
if self.logger.level <= logging.DEBUG:
241+
self.logger.debug(
242+
f"The running receive_messages task for {session_id} is now cancelled"
243+
)
244+
break
189245
try:
190-
message: WSMessage = await self.current_session.receive()
246+
message: WSMessage = await session.receive()
191247
if self.trace_enabled and self.logger.level <= logging.DEBUG:
248+
# The following logging prints every single received message except empty message data ones.
192249
type = WSMsgType(message.type)
193250
message_type = type.name if type is not None else message.type
194251
message_data = message.data
@@ -197,8 +254,27 @@ async def receive_messages(self) -> None:
197254
if len(message_data) > 0:
198255
# To skip the empty message that Slack server-side often sends
199256
self.logger.debug(
200-
f"Received message (type: {message_type}, data: {message_data}, extra: {message.extra})"
257+
f"Received message "
258+
f"(type: {message_type}, "
259+
f"data: {message_data}, "
260+
f"extra: {message.extra}, "
261+
f"session: {session_id})"
201262
)
263+
264+
# The logging here is for detailed trouble shooting of potential issues in this client.
265+
# If you don't see this log for a while, it means that
266+
# this receive_messages execution is no longer working for some reason.
267+
if self.logger.level <= logging.DEBUG:
268+
counter_for_logging = (
269+
counter_for_logging + 1
270+
) % logging_interval
271+
if counter_for_logging == 0:
272+
log_message = (
273+
f"{logging_interval} WebSocket messages received "
274+
f"after the previous same log ({session_id})"
275+
)
276+
self.logger.debug(log_message)
277+
202278
if message is not None:
203279
if message.type == WSMsgType.TEXT:
204280
message_data = message.data
@@ -208,7 +284,7 @@ async def receive_messages(self) -> None:
208284
elif message.type == WSMsgType.CLOSE:
209285
if self.auto_reconnect_enabled:
210286
self.logger.info(
211-
"Received CLOSE event. Reconnecting..."
287+
f"Received CLOSE event from {session_id}. Reconnecting..."
212288
)
213289
await self.connect_to_new_endpoint()
214290
for listener in self.on_close_listeners:
@@ -220,7 +296,7 @@ async def receive_messages(self) -> None:
220296
await asyncio.sleep(self.ping_interval)
221297
continue
222298
elif message.type == WSMsgType.PING:
223-
await self.current_session.pong(message.data)
299+
await session.pong(message.data)
224300
continue
225301
elif message.type == WSMsgType.PONG:
226302
if message.data is not None:
@@ -235,31 +311,56 @@ async def receive_messages(self) -> None:
235311
except Exception as e:
236312
self.logger.warning(
237313
f"Failed to parse the last_ping_pong_time value from {str_message_data}"
238-
f" - error : {e}"
314+
f" - error : {e}, session: {session_id}"
239315
)
240316
continue
241317
consecutive_error_count = 0
242318
except Exception as e:
243319
consecutive_error_count += 1
244320
self.logger.error(
245-
f"Failed to receive or enqueue a message: {type(e).__name__}, {e}"
321+
f"Failed to receive or enqueue a message: {type(e).__name__}, {e} ({session_id})"
246322
)
247323
if isinstance(e, ClientConnectionError):
248324
await asyncio.sleep(self.ping_interval)
249325
else:
250326
await asyncio.sleep(consecutive_error_count)
251327
except asyncio.CancelledError:
252-
if self.trace_enabled:
253-
self.logger.debug("The running receive_messages task is now cancelled")
328+
if self.logger.level <= logging.DEBUG:
329+
self.logger.debug(
330+
f"The running receive_messages task for {session_id} is now cancelled"
331+
)
254332
raise
255333

334+
async def is_ping_pong_failing(self) -> bool:
335+
if self.last_ping_pong_time is None:
336+
return False
337+
disconnected_seconds = int(time.time() - self.last_ping_pong_time)
338+
return disconnected_seconds >= (self.ping_interval * 4)
339+
256340
async def is_connected(self) -> bool:
257-
return (
341+
connected: bool = (
258342
not self.closed
259343
and not self.stale
260344
and self.current_session is not None
261345
and not self.current_session.closed
346+
and not await self.is_ping_pong_failing()
262347
)
348+
if connected is False and self.logger.level <= logging.DEBUG:
349+
is_ping_pong_failing = await self.is_ping_pong_failing()
350+
session_id = await self.session_id()
351+
self.logger.debug(
352+
"Inactive connection detected ("
353+
f"session_id: {session_id}, "
354+
f"closed: {self.closed}, "
355+
f"stale: {self.stale}, "
356+
f"current_session.closed: {self.current_session.closed}, "
357+
f"is_ping_pong_failing: {is_ping_pong_failing}"
358+
")"
359+
)
360+
return connected
361+
362+
async def session_id(self) -> str:
363+
return self.build_session_id(self.current_session)
263364

264365
async def connect(self):
265366
old_session = None if self.current_session is None else self.current_session
@@ -271,42 +372,66 @@ async def connect(self):
271372
heartbeat=self.ping_interval,
272373
proxy=self.proxy,
273374
)
375+
session_id: str = await self.session_id()
274376
self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
275377
self.stale = False
276-
self.logger.info("A new session has been established")
378+
self.logger.info(f"A new session ({session_id}) has been established")
379+
380+
# The first ping from the new connection
381+
if self.logger.level <= logging.DEBUG:
382+
self.logger.debug(
383+
f"Sending a ping message with the newly established connection ({session_id})..."
384+
)
385+
t = time.time()
386+
await self.current_session.ping(f"sdk-ping-pong:{t}")
277387

278388
if self.current_session_monitor is not None:
279389
self.current_session_monitor.cancel()
280390

281391
self.current_session_monitor = asyncio.ensure_future(
282392
self.monitor_current_session()
283393
)
394+
if self.logger.level <= logging.DEBUG:
395+
self.logger.debug(
396+
f"A new monitor_current_session() executor has been recreated for {session_id}"
397+
)
284398

285399
if self.message_receiver is not None:
286400
self.message_receiver.cancel()
287401

288402
self.message_receiver = asyncio.ensure_future(self.receive_messages())
403+
if self.logger.level <= logging.DEBUG:
404+
self.logger.debug(
405+
f"A new receive_messages() executor has been recreated for {session_id}"
406+
)
289407

290408
if old_session is not None:
291409
await old_session.close()
292-
self.logger.info("The old session has been abandoned")
410+
old_session_id = self.build_session_id(old_session)
411+
self.logger.info(f"The old session ({old_session_id}) has been abandoned")
293412

294413
async def disconnect(self):
295414
if self.current_session is not None:
296415
await self.current_session.close()
297-
self.logger.info("The session has been abandoned")
416+
session_id = await self.session_id()
417+
self.logger.info(
418+
f"The current session ({session_id}) has been abandoned by disconnect() method call"
419+
)
298420

299421
async def send_message(self, message: str):
422+
session_id = await self.session_id()
300423
if self.logger.level <= logging.DEBUG:
301-
self.logger.debug(f"Sending a message: {message}")
424+
self.logger.debug(
425+
f"Sending a message: {message} from session: {session_id}"
426+
)
302427
try:
303428
await self.current_session.send_str(message)
304429
except ConnectionError as e:
305430
# We rarely get this exception while replacing the underlying WebSocket connections.
306431
# We can do one more try here as the self.current_session should be ready now.
307432
if self.logger.level <= logging.DEBUG:
308433
self.logger.debug(
309-
f"Failed to send a message (error: {e}, message: {message})"
434+
f"Failed to send a message (error: {e}, message: {message}, session: {session_id})"
310435
" as the underlying connection was replaced. Retrying the same request only one time..."
311436
)
312437
# Although acquiring self.connect_operation_lock also for the first method call is the safest way,
@@ -317,7 +442,8 @@ async def send_message(self, message: str):
317442
await self.current_session.send_str(message)
318443
else:
319444
self.logger.warning(
320-
"The current session is no longer active. Failed to send a message"
445+
f"The current session ({session_id}) is no longer active. "
446+
"Failed to send a message"
321447
)
322448
raise e
323449
finally:
@@ -336,3 +462,9 @@ async def close(self):
336462
self.message_receiver.cancel()
337463
if self.aiohttp_client_session is not None:
338464
await self.aiohttp_client_session.close()
465+
466+
@classmethod
467+
def build_session_id(cls, session: ClientWebSocketResponse) -> str:
468+
if session is None:
469+
return None
470+
return "s_" + str(hash(session))

0 commit comments

Comments
 (0)