Skip to content

Commit 4ca565f

Browse files
Fix crash and silent-failure bugs in TDJson, listener, and worker
- TDJson.__del__ checked hasattr on the wrong object (CDLL instead of self), so the destructor never called stop(). Fix the hasattr target. - ctypes fatal error callback was a local variable eligible for GC while TDLib still held its pointer. Store it on self to prevent collection. - TDJson.stop() had no guard against double-destroy: null out the client handle after destroying and early-return if already None. - Worker thread had no try/except around handler calls — a single failing handler killed the daemon thread silently. - Listener thread had no exception handling at all — any error in receive/update/handlers killed it, hanging all pending AsyncResults. - queue.Full from _run_handlers propagated into the listener, creating a cascading failure when the worker died. - send_message with Element used assert for error checking, which is stripped under python -O. Use raise_exc=True and explicit RuntimeError.
1 parent 3fb7752 commit 4ca565f

5 files changed

Lines changed: 160 additions & 12 deletions

File tree

telegram/client.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,9 @@ def send_message(
254254
updated_text: str
255255
if isinstance(text, Element):
256256
result = self.parse_text_entities(text.to_html(), parse_mode="HTML")
257-
result.wait()
258-
assert result.update is not None
257+
result.wait(raise_exc=True)
258+
if result.update is None:
259+
raise RuntimeError(f"Failed to parse text entities: {result.error_info}")
259260
update: dict = result.update
260261
entities = update["entities"]
261262
updated_text = update["text"]
@@ -524,11 +525,16 @@ def _listen_to_td(self) -> None:
524525
logger.info("[Telegram.td_listener] started")
525526

526527
while not self._stopped.is_set():
527-
update = self._tdjson.receive()
528+
try:
529+
update = self._tdjson.receive()
528530

529-
if update:
530-
self._update_async_result(update)
531-
self._run_handlers(update)
531+
if update:
532+
self._update_async_result(update)
533+
self._run_handlers(update)
534+
except Exception:
535+
if self._stopped.is_set():
536+
break
537+
logger.exception("[Telegram.td_listener] error processing update")
532538

533539
def _update_async_result(self, update: Dict[Any, Any]) -> typing.Optional[AsyncResult]:
534540
async_result = None
@@ -559,7 +565,10 @@ def _run_handlers(self, update: Dict[Any, Any]) -> None:
559565
update_type: str = update.get("@type", "unknown")
560566

561567
for handler in self._update_handlers[update_type]:
562-
self._workers_queue.put((handler, update), timeout=self._queue_put_timeout)
568+
try:
569+
self._workers_queue.put((handler, update), timeout=self._queue_put_timeout)
570+
except queue.Full:
571+
logger.error("Handler queue full, dropping update %s for handler %s", update_type, handler)
563572

564573
def remove_update_handler(self, handler_type: str, func: Callable) -> None:
565574
"""

telegram/tdjson.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(self, library_path: Optional[str] = None, verbosity: int = 2) -> No
3232
self._build_client(library_path, verbosity)
3333

3434
def __del__(self) -> None:
35-
if hasattr(self, "_tdjson") and hasattr(self._tdjson, "_td_json_client_destroy"):
35+
if hasattr(self, "_td_json_client_destroy"):
3636
self.stop()
3737

3838
def _build_client(self, library_path: str, verbosity: int) -> None:
@@ -85,8 +85,8 @@ def _build_client(self, library_path: str, verbosity: int) -> None:
8585
def on_fatal_error_callback(error_message: str) -> None:
8686
logger.error("TDLib fatal error: %s", error_message)
8787

88-
c_on_fatal_error_callback = fatal_error_callback_type(on_fatal_error_callback)
89-
self._td_set_log_fatal_error_callback(c_on_fatal_error_callback)
88+
self._c_on_fatal_error_callback = fatal_error_callback_type(on_fatal_error_callback)
89+
self._td_set_log_fatal_error_callback(self._c_on_fatal_error_callback)
9090

9191
def send(self, query: Dict[Any, Any]) -> None:
9292
dumped_query = json.dumps(query).encode("utf-8")
@@ -116,4 +116,7 @@ def td_execute(self, query: Dict[Any, Any]) -> Union[Dict[Any, Any], Any]:
116116
return None
117117

118118
def stop(self) -> None:
119+
if self.td_json_client is None:
120+
return
119121
self._td_json_client_destroy(self.td_json_client)
122+
self.td_json_client = None

telegram/worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ def _run_thread(self) -> None:
4141
except Empty:
4242
continue
4343

44-
handler(update)
44+
try:
45+
handler(update)
46+
except Exception:
47+
logger.exception("Error in update handler %s", handler)
4548
self._queue.task_done()
4649

4750
def stop(self) -> None:

tests/test_tdjson.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from unittest.mock import Mock, patch
22

3-
from telegram.tdjson import _get_tdjson_lib_path
3+
from telegram.tdjson import TDJson, _get_tdjson_lib_path
44

55

66
class TestGetTdjsonTdlibPath:
@@ -42,3 +42,40 @@ def test_unknown(self):
4242

4343
mocked_files.assert_called_once_with("telegram")
4444
mocked_joinpath.assert_called_once_with("lib/linux/libtdjson.so")
45+
46+
47+
class TestTDJson:
48+
def _make_tdjson(self):
49+
with patch("telegram.tdjson.CDLL") as mocked_cdll:
50+
mocked_cdll.return_value.td_json_client_create.return_value = 12345
51+
tdjson = TDJson(library_path="/fake/lib.so", verbosity=0)
52+
return tdjson
53+
54+
def test_del_calls_stop(self):
55+
tdjson = self._make_tdjson()
56+
with patch.object(tdjson, "stop") as mocked_stop:
57+
tdjson.__del__()
58+
mocked_stop.assert_called_once()
59+
60+
def test_del_skips_stop_if_build_incomplete(self):
61+
tdjson = TDJson.__new__(TDJson)
62+
with patch.object(TDJson, "stop") as mocked_stop:
63+
tdjson.__del__()
64+
mocked_stop.assert_not_called()
65+
66+
def test_stop_nulls_client_handle(self):
67+
tdjson = self._make_tdjson()
68+
assert tdjson.td_json_client is not None
69+
tdjson.stop()
70+
assert tdjson.td_json_client is None
71+
72+
def test_stop_is_idempotent(self):
73+
tdjson = self._make_tdjson()
74+
tdjson.stop()
75+
tdjson.stop()
76+
tdjson._td_json_client_destroy.assert_called_once()
77+
78+
def test_fatal_error_callback_stored_on_instance(self):
79+
tdjson = self._make_tdjson()
80+
assert hasattr(tdjson, "_c_on_fatal_error_callback")
81+
assert tdjson._c_on_fatal_error_callback is not None

tests/test_telegram_methods.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import queue
2+
13
import pytest
24

35
from unittest.mock import patch
@@ -478,3 +480,97 @@ def _get_async_result(data, request_id=None):
478480
assert state == telegram.authorization_state == AuthorizationState.READY
479481

480482
assert telegram._tdjson.send.call_count == 0
483+
484+
485+
class TestWorkerExceptionHandling:
486+
def test_worker_thread_survives_handler_exception(self):
487+
import time
488+
from queue import Queue
489+
from telegram.worker import SimpleWorker
490+
491+
q = Queue()
492+
worker = SimpleWorker(queue=q)
493+
worker.run()
494+
495+
results = []
496+
497+
def bad_handler(update):
498+
raise RuntimeError("boom")
499+
500+
def good_handler(update):
501+
results.append(update)
502+
503+
q.put((bad_handler, {"@type": "test"}))
504+
q.put((good_handler, {"@type": "test"}))
505+
506+
time.sleep(0.5)
507+
worker.stop()
508+
509+
assert results == [{"@type": "test"}]
510+
511+
512+
class TestListenerExceptionHandling:
513+
def test_listener_survives_receive_exception(self, telegram):
514+
import threading
515+
516+
telegram._stopped = threading.Event()
517+
call_count = 0
518+
519+
def exploding_receive():
520+
nonlocal call_count
521+
call_count += 1
522+
if call_count == 1:
523+
raise RuntimeError("receive failed")
524+
if call_count == 2:
525+
return {"@type": "ok", "@extra": {"request_id": "test123"}}
526+
telegram._stopped.set()
527+
return None
528+
529+
telegram._tdjson.receive = exploding_receive
530+
telegram._listen_to_td()
531+
532+
assert call_count == 3
533+
534+
def test_listener_exits_on_exception_when_stopped(self, telegram):
535+
import threading
536+
537+
telegram._stopped = threading.Event()
538+
539+
def exploding_receive():
540+
telegram._stopped.set()
541+
raise RuntimeError("error during shutdown")
542+
543+
telegram._tdjson.receive = exploding_receive
544+
telegram._listen_to_td()
545+
546+
547+
class TestRunHandlersQueueFull:
548+
def test_queue_full_does_not_propagate(self, telegram):
549+
def my_handler(update):
550+
pass
551+
552+
telegram.add_update_handler("testUpdate", my_handler)
553+
554+
with patch.object(telegram._workers_queue, "put", side_effect=queue.Full):
555+
telegram._run_handlers({"@type": "testUpdate"})
556+
557+
558+
class TestSendMessageElementError:
559+
def test_raises_on_parse_error(self, telegram):
560+
error_result = AsyncResult(client=telegram)
561+
error_result.error = True
562+
error_result.error_info = {"@type": "error", "message": "Bad HTML"}
563+
error_result._ready.set()
564+
565+
with patch.object(telegram, "parse_text_entities", return_value=error_result):
566+
with pytest.raises(RuntimeError):
567+
telegram.send_message(chat_id=1, text=Spoiler("test"))
568+
569+
def test_raises_on_none_update(self, telegram):
570+
result = AsyncResult(client=telegram)
571+
result.update = None
572+
result._ready.set()
573+
574+
with patch.object(telegram, "parse_text_entities", return_value=result):
575+
with pytest.raises(RuntimeError, match="Failed to parse text entities"):
576+
telegram.send_message(chat_id=1, text=Spoiler("test"))

0 commit comments

Comments
 (0)