Skip to content

Commit a2a32a8

Browse files
Improve worker resilience
1 parent 0126451 commit a2a32a8

File tree

4 files changed

+85
-11
lines changed

4 files changed

+85
-11
lines changed

docs/source/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ Changelog
88

99
- Python versions 3.7 and 3.8 are no longer supported.
1010
- tdlib 1.8.31.
11+
- Fix: Handle errors during updates processing
12+
- Fix: Handle queue full errors
1113

1214
[0.18.0] - 2023-03-13
1315

telegram/client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,14 @@ def _run_handlers(self, update: Dict[Any, Any]) -> None:
559559
update_type: str = update.get("@type", "unknown")
560560

561561
for handler in self._update_handlers[update_type]:
562-
self._workers_queue.put((handler, update), timeout=self._queue_put_timeout)
562+
try:
563+
self._workers_queue.put((handler, update), timeout=self._queue_put_timeout)
564+
except queue.Full:
565+
logger.error(
566+
"Queue is full, update %s dropped for handler %s",
567+
update_type,
568+
handler.__name__ if hasattr(handler, "__name__") else handler,
569+
)
563570

564571
def remove_update_handler(self, handler_type: str, func: Callable) -> None:
565572
"""

telegram/worker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,12 @@ def _run_thread(self) -> None:
4141
except Empty:
4242
continue
4343

44-
handler(update)
45-
self._queue.task_done()
44+
try:
45+
handler(update)
46+
except Exception:
47+
logger.exception("Handler raised an exception")
48+
finally:
49+
self._queue.task_done()
4650

4751
def stop(self) -> None:
4852
self._is_enabled = False

tests/test_telegram_methods.py

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import pytest
2+
import queue
3+
import time
24

35
from unittest.mock import patch
46

57
from telegram import VERSION
68
from telegram.utils import AsyncResult
79
from telegram.client import Telegram, MESSAGE_HANDLER_TYPE, AuthorizationState
810
from telegram.text import Spoiler
11+
from telegram.worker import SimpleWorker
912

1013
API_ID = 1
1114
API_HASH = "hash"
@@ -90,7 +93,10 @@ def test_parse_text_entities(self, telegram):
9093

9194
def test_send_phone_number_or_bot_token(self, telegram):
9295
# check that the dunction calls _send_phone_number or _send_bot_token
93-
with patch.object(telegram, "_send_phone_number"), patch.object(telegram, "_send_bot_token"):
96+
with (
97+
patch.object(telegram, "_send_phone_number"),
98+
patch.object(telegram, "_send_bot_token"),
99+
):
94100
telegram.phone = "123"
95101
telegram.bot_token = None
96102

@@ -112,7 +118,9 @@ def test_send_bot_token(self, telegram):
112118
telegram._send_bot_token()
113119

114120
exp_data = {"@type": "checkAuthenticationBotToken", "token": "some-token"}
115-
telegram._send_data.assert_called_once_with(exp_data, result_id="updateAuthorizationState")
121+
telegram._send_data.assert_called_once_with(
122+
exp_data, result_id="updateAuthorizationState"
123+
)
116124

117125
def test_add_message_handler(self, telegram):
118126
# check that add_message_handler
@@ -201,7 +209,9 @@ def test_get_web_page_instant_view(self, telegram):
201209
url = "https://yandex.ru/"
202210
force_full = False
203211

204-
async_result = telegram.get_web_page_instant_view(url=url, force_full=force_full)
212+
async_result = telegram.get_web_page_instant_view(
213+
url=url, force_full=force_full
214+
)
205215

206216
exp_data = {
207217
"@type": "getWebPageInstantView",
@@ -263,7 +273,9 @@ def test_get_chats(self, telegram):
263273
offset_chat_id = 1
264274
limit = 100
265275

266-
async_result = telegram.get_chats(offset_order=offset_order, offset_chat_id=offset_chat_id, limit=limit)
276+
async_result = telegram.get_chats(
277+
offset_order=offset_order, offset_chat_id=offset_chat_id, limit=limit
278+
)
267279

268280
exp_data = {
269281
"@type": "getChats",
@@ -366,7 +378,9 @@ def test_update_async_result_returns_async_result_with_same_id(self, telegram):
366378
assert async_result == new_async_result
367379

368380
def test_result_id_should_be_replaced_if_it_is_auth_process(self, telegram):
369-
async_result = AsyncResult(client=telegram, result_id="updateAuthorizationState")
381+
async_result = AsyncResult(
382+
client=telegram, result_id="updateAuthorizationState"
383+
)
370384
telegram._results["updateAuthorizationState"] = async_result
371385

372386
update = {
@@ -404,7 +418,9 @@ def _get_async_result(data, request_id=None):
404418
)
405419

406420
telegram._set_initial_params = lambda: _get_async_result(
407-
data={"authorization_state": {"@type": "authorizationStateWaitEncryptionKey"}}
421+
data={
422+
"authorization_state": {"@type": "authorizationStateWaitEncryptionKey"}
423+
}
408424
)
409425
telegram._send_encryption_key = lambda: _get_async_result(
410426
data={"authorization_state": {"@type": "authorizationStateWaitPhoneNumber"}}
@@ -444,7 +460,9 @@ def _get_async_result(data, request_id=None):
444460
)
445461

446462
telegram._set_initial_params = lambda: _get_async_result(
447-
data={"authorization_state": {"@type": "authorizationStateWaitEncryptionKey"}}
463+
data={
464+
"authorization_state": {"@type": "authorizationStateWaitEncryptionKey"}
465+
}
448466
)
449467
telegram._send_encryption_key = lambda: _get_async_result(
450468
data={"authorization_state": {"@type": "authorizationStateWaitPhoneNumber"}}
@@ -453,7 +471,9 @@ def _get_async_result(data, request_id=None):
453471
data={"authorization_state": {"@type": "authorizationStateWaitCode"}}
454472
)
455473
telegram._send_telegram_code = lambda _: _get_async_result(
456-
data={"authorization_state": {"@type": "authorizationStateWaitRegistration"}}
474+
data={
475+
"authorization_state": {"@type": "authorizationStateWaitRegistration"}
476+
}
457477
)
458478
telegram._register_user = lambda _, __: _get_async_result(
459479
data={"authorization_state": {"@type": "authorizationStateWaitPassword"}}
@@ -478,3 +498,44 @@ def _get_async_result(data, request_id=None):
478498
assert state == telegram.authorization_state == AuthorizationState.READY
479499

480500
assert telegram._tdjson.send.call_count == 0
501+
502+
503+
class TestWorker:
504+
def test_worker_continues_after_handler_exception(self):
505+
"""Handler exceptions should not kill the worker thread and task_done must be called"""
506+
q = queue.Queue()
507+
worker = SimpleWorker(q)
508+
worker.run()
509+
510+
results = []
511+
512+
def failing_handler(update):
513+
raise ValueError("Handler failed")
514+
515+
def working_handler(update):
516+
results.append(update)
517+
518+
# Put two items: one with a failing handler, one with a working handler
519+
q.put((failing_handler, {"id": 1}))
520+
q.put((working_handler, {"id": 2}))
521+
522+
# Give the worker time to process both items.
523+
# Can't use join when the test fails.
524+
time.sleep(1)
525+
526+
worker.stop()
527+
528+
assert results == [{"id": 2}]
529+
530+
def test_run_handlers_continues_on_queue_full(self, telegram):
531+
"""queue.Full should not crash the listener"""
532+
533+
def my_handler():
534+
pass
535+
536+
telegram.add_message_handler(my_handler)
537+
538+
# Mock the queue to always raise queue.Full
539+
with patch.object(telegram._workers_queue, "put", side_effect=queue.Full):
540+
# This should not raise an exception
541+
telegram._run_handlers({"@type": MESSAGE_HANDLER_TYPE})

0 commit comments

Comments
 (0)