Skip to content

Commit f5207d8

Browse files
fix: telegram polling recovery after network failures (#7468)
1 parent b801003 commit f5207d8

File tree

3 files changed

+344
-57
lines changed

3 files changed

+344
-57
lines changed

astrbot/core/platform/sources/telegram/tg_adapter.py

Lines changed: 150 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
import re
44
import sys
55
import uuid
6+
from contextlib import suppress
67
from typing import cast
78

89
from apscheduler.schedulers.asyncio import AsyncIOScheduler
910
from telegram import BotCommand, Update
1011
from telegram.constants import ChatType
11-
from telegram.error import Forbidden, InvalidToken
12+
from telegram.error import Forbidden, InvalidToken, NetworkError
1213
from telegram.ext import ApplicationBuilder, ContextTypes, ExtBot, filters
1314
from telegram.ext import MessageHandler as TelegramMessageHandler
1415

@@ -66,6 +67,7 @@ def __init__(
6667
file_base_url = "https://api.telegram.org/file/bot"
6768

6869
self.base_url = base_url
70+
self.file_base_url = file_base_url
6971

7072
self.enable_command_register = self.config.get(
7173
"telegram_command_register",
@@ -77,23 +79,12 @@ def __init__(
7779
)
7880
self.last_command_hash = None
7981

80-
self.application = (
81-
ApplicationBuilder()
82-
.token(self.config["telegram_token"])
83-
.base_url(base_url)
84-
.base_file_url(file_base_url)
85-
.build()
86-
)
87-
message_handler = TelegramMessageHandler(
88-
filters=filters.ALL, # receive all messages
89-
callback=self.message_handler,
90-
)
91-
self.application.add_handler(message_handler)
92-
self.client = self.application.bot
93-
logger.debug(f"Telegram base url: {self.client.base_url}")
94-
9582
self.scheduler = AsyncIOScheduler()
9683
self._terminating = False
84+
self._loop: asyncio.AbstractEventLoop | None = None
85+
self._polling_recovery_requested = asyncio.Event()
86+
self._consecutive_polling_failures = 0
87+
self._last_polling_failure_at = 0.0
9788
raw_delay = self.config.get("telegram_polling_restart_delay", 5.0)
9889
try:
9990
delay = float(raw_delay)
@@ -113,6 +104,10 @@ def __init__(
113104
)
114105
delay = 0.1
115106
self._polling_restart_delay = delay
107+
self._polling_recovery_threshold = 3
108+
self._polling_failure_window = 60.0
109+
self._application_started = False
110+
self._build_application()
116111

117112
# Media group handling
118113
# Cache structure: {media_group_id: {"created_at": datetime, "items": [(update, context), ...]}}
@@ -124,6 +119,85 @@ def __init__(
124119
"telegram_media_group_max_wait", 10.0
125120
) # max seconds - hard cap to prevent indefinite delay
126121

122+
def _build_application(self) -> None:
    """Create a fresh Telegram application and bind it to this adapter.

    The application is built from ``self.config["telegram_token"]``,
    ``self.base_url`` and ``self.file_base_url``. ``self.message_handler``
    is registered as the callback for every update, and the results are
    stored in ``self.application`` and ``self.client``.
    """
    self.application = (
        ApplicationBuilder()
        .token(self.config["telegram_token"])
        .base_url(self.base_url)
        .base_file_url(self.file_base_url)
        .build()
    )
    # filters.ALL: receive all messages.
    message_handler = TelegramMessageHandler(
        filters=filters.ALL,
        callback=self.message_handler,
    )
    self.application.add_handler(message_handler)
    self.client = self.application.bot
    logger.debug(f"Telegram base url: {self.client.base_url}")
137+
138+
async def _start_application(self) -> None:
    """Initialize and start ``self.application``, then flag it as started.

    When ``self.enable_command_register`` is truthy, bot commands are
    registered through ``self.register_commands()`` before the flag is set.
    An exception raised by any step propagates to the caller and leaves
    ``self._application_started`` unchanged.
    """
    await self.application.initialize()
    await self.application.start()

    if self.enable_command_register:
        await self.register_commands()

    # Set last, so the flag is only true once every step above succeeded.
    self._application_started = True
146+
147+
async def _shutdown_application(
    self,
    *,
    delete_commands: bool,
) -> None:
    """Tear down ``self.application`` on a best-effort basis.

    Steps, in order: stop the updater (when there is one), optionally
    delete the registered bot commands, stop the application, and await its
    ``shutdown`` coroutine when the application exposes one. Every step is
    attempted independently: an ``Exception`` raised by one step is logged
    at debug level and does not prevent the remaining steps from running.

    Args:
        delete_commands: When true and ``self.enable_command_register`` is
            truthy, ``self.client.delete_my_commands()`` is awaited.
    """
    # Clear the flag up front, before any teardown step gets a chance to fail.
    self._application_started = False

    async def _best_effort(step, action) -> None:
        """Await ``action()`` and log, rather than raise, on ``Exception``."""
        try:
            await action()
        except Exception as exc:
            # Only ``Exception`` is caught, matching the scope of the
            # ``suppress(Exception)`` this replaces.
            logger.debug(
                "Telegram shutdown step %s failed: %s: %s",
                step,
                type(exc).__name__,
                exc,
            )

    updater = self.application.updater
    if updater is not None:
        await _best_effort("updater.stop", lambda: updater.stop())

    if delete_commands and self.enable_command_register:
        await _best_effort(
            "delete_my_commands",
            lambda: self.client.delete_my_commands(),
        )

    await _best_effort("application.stop", lambda: self.application.stop())

    # Looked up with getattr, so an application without ``shutdown`` is tolerated.
    shutdown = getattr(self.application, "shutdown", None)
    if shutdown is not None:
        await _best_effort("application.shutdown", shutdown)
170+
171+
async def _recreate_application(self) -> None:
    """Replace the current Telegram application with a freshly built one.

    While the adapter is terminating, the pending recovery request is only
    cleared and nothing is rebuilt. Otherwise the current application is
    shut down without deleting bot commands, a new one is built, the
    polling-failure counters are reset, and the recovery request is cleared.
    """
    if self._terminating:
        # Shutting down: drop the request instead of building a new client.
        self._polling_recovery_requested.clear()
        return

    logger.warning(
        "Telegram polling hit repeated network errors; rebuilding the "
        "Telegram application and HTTP client.",
    )
    await self._shutdown_application(delete_commands=False)
    self._build_application()

    # Start the failure accounting from scratch for the new application.
    self._consecutive_polling_failures = 0
    self._last_polling_failure_at = 0.0
    self._polling_recovery_requested.clear()
185+
186+
def _start_command_scheduler(self) -> None:
    """Schedule periodic command re-registration and start the scheduler.

    Does nothing unless both ``self.enable_command_refresh`` and
    ``self.enable_command_register`` are truthy. It also does nothing when
    ``self.scheduler`` is already running, so the job is not added twice.
    The interval in seconds is read from the
    ``telegram_command_register_interval`` config key and defaults to 300.
    """
    if not self.enable_command_refresh or not self.enable_command_register:
        return
    if self.scheduler.running:
        return

    self.scheduler.add_job(
        self.register_commands,
        "interval",
        seconds=self.config.get("telegram_command_register_interval", 300),
        id="telegram_command_register",
        misfire_grace_time=60,
    )
    self.scheduler.start()
200+
127201
@override
128202
async def send_by_session(
129203
self,
@@ -145,41 +219,42 @@ def meta(self) -> PlatformMetadata:
145219

146220
@override
147221
async def run(self) -> None:
148-
await self.application.initialize()
149-
await self.application.start()
150-
151-
if self.enable_command_register:
152-
await self.register_commands()
153-
154-
if self.enable_command_refresh and self.enable_command_register:
155-
self.scheduler.add_job(
156-
self.register_commands,
157-
"interval",
158-
seconds=self.config.get("telegram_command_register_interval", 300),
159-
id="telegram_command_register",
160-
misfire_grace_time=60,
161-
)
162-
self.scheduler.start()
163-
164-
if not self.application.updater:
165-
logger.error("Telegram Updater is not initialized. Cannot start polling.")
166-
return
222+
self._loop = asyncio.get_running_loop()
223+
self._start_command_scheduler()
167224

168225
while not self._terminating:
169226
try:
227+
if not self._application_started:
228+
await self._start_application()
229+
230+
self._polling_recovery_requested.clear()
231+
updater = self.application.updater
232+
if updater is None:
233+
logger.error(
234+
"Telegram Updater is not initialized. Cannot start polling."
235+
)
236+
self._application_started = False
237+
await asyncio.sleep(self._polling_restart_delay)
238+
continue
170239
logger.info("Starting Telegram polling...")
171-
await self.application.updater.start_polling(
172-
error_callback=self._on_polling_error
173-
)
240+
await updater.start_polling(error_callback=self._on_polling_error)
174241
logger.info("Telegram Platform Adapter is running.")
175-
while self.application.updater.running and not self._terminating: # noqa: ASYNC110
242+
while updater.running and not self._terminating: # noqa: ASYNC110
243+
if self._polling_recovery_requested.is_set():
244+
await self._recreate_application()
245+
break
176246
await asyncio.sleep(1)
247+
else:
248+
if not self._terminating:
249+
logger.warning(
250+
"Telegram polling loop exited unexpectedly, "
251+
f"retrying in {self._polling_restart_delay}s."
252+
)
253+
continue
177254

178255
if not self._terminating:
179-
logger.warning(
180-
"Telegram polling loop exited unexpectedly, "
181-
f"retrying in {self._polling_restart_delay}s."
182-
)
256+
logger.info("Telegram polling restarted with a fresh client.")
257+
continue
183258
except asyncio.CancelledError:
184259
raise
185260
except (Forbidden, InvalidToken) as e:
@@ -193,6 +268,9 @@ async def run(self) -> None:
193268
f"{type(e).__name__}: {e!s}. "
194269
f"Retrying in {self._polling_restart_delay}s.",
195270
)
271+
with suppress(Exception):
272+
await self._shutdown_application(delete_commands=False)
273+
self._build_application()
196274

197275
if not self._terminating:
198276
await asyncio.sleep(self._polling_restart_delay)
@@ -202,6 +280,33 @@ def _on_polling_error(self, error: Exception) -> None:
202280
f"Telegram polling request failed: {type(error).__name__}: {error!s}",
203281
exc_info=error,
204282
)
283+
if not isinstance(error, NetworkError):
284+
return
285+
286+
if self._loop is None:
287+
return
288+
289+
now = self._loop.time()
290+
if now - self._last_polling_failure_at > self._polling_failure_window:
291+
self._consecutive_polling_failures = 0
292+
self._last_polling_failure_at = now
293+
self._consecutive_polling_failures += 1
294+
295+
if self._consecutive_polling_failures < self._polling_recovery_threshold:
296+
return
297+
298+
logger.warning(
299+
"Telegram polling encountered %s network failures within %.1fs; "
300+
"scheduling client rebuild.",
301+
self._consecutive_polling_failures,
302+
self._polling_failure_window,
303+
)
304+
if self._loop.is_closed():
305+
return
306+
try:
307+
self._loop.call_soon_threadsafe(self._polling_recovery_requested.set)
308+
except RuntimeError:
309+
return
205310

206311
async def register_commands(self) -> None:
207312
"""收集所有注册的指令并注册到 Telegram"""
@@ -634,15 +739,8 @@ async def terminate(self) -> None:
634739
self._terminating = True
635740
if self.scheduler.running:
636741
self.scheduler.shutdown()
637-
638-
await self.application.stop()
639-
640-
if self.enable_command_register:
641-
await self.client.delete_my_commands()
642-
643-
# 保险起见先判断是否存在updater对象
644-
if self.application.updater is not None:
645-
await self.application.updater.stop()
742+
self._polling_recovery_requested.set()
743+
await self._shutdown_application(delete_commands=True)
646744

647745
logger.info("Telegram adapter has been closed.")
648746
except Exception as e:

tests/fixtures/mocks/telegram.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,18 @@
99
import pytest
1010

1111

12+
class MockTelegramNetworkError(Exception):
    """Stand-in for ``telegram.error.NetworkError`` used in tests."""
14+
15+
16+
class MockTelegramForbidden(Exception):
    """Stand-in for ``telegram.error.Forbidden`` used in tests."""
18+
19+
20+
class MockTelegramInvalidToken(Exception):
    """Stand-in for ``telegram.error.InvalidToken`` used in tests."""
22+
23+
1224
def create_mock_telegram_modules():
1325
"""创建 Telegram 相关的 mock 模块。
1426
@@ -28,6 +40,9 @@ def create_mock_telegram_modules():
2840
mock_telegram.constants.ChatAction.UPLOAD_PHOTO = "upload_photo"
2941
mock_telegram.error = MagicMock()
3042
mock_telegram.error.BadRequest = Exception
43+
mock_telegram.error.Forbidden = MockTelegramForbidden
44+
mock_telegram.error.InvalidToken = MockTelegramInvalidToken
45+
mock_telegram.error.NetworkError = MockTelegramNetworkError
3146
mock_telegram.ReactionTypeCustomEmoji = MagicMock
3247
mock_telegram.ReactionTypeEmoji = MagicMock
3348

@@ -73,7 +88,9 @@ def mock_telegram_modules():
7388
monkeypatch.setitem(sys.modules, "telegram.constants", mocks["telegram"].constants)
7489
monkeypatch.setitem(sys.modules, "telegram.error", mocks["telegram"].error)
7590
monkeypatch.setitem(sys.modules, "telegram.ext", mocks["telegram.ext"])
76-
monkeypatch.setitem(sys.modules, "telegramify_markdown", mocks["telegramify_markdown"])
91+
monkeypatch.setitem(
92+
sys.modules, "telegramify_markdown", mocks["telegramify_markdown"]
93+
)
7794
monkeypatch.setitem(sys.modules, "apscheduler", mocks["apscheduler"])
7895
monkeypatch.setitem(
7996
sys.modules, "apscheduler.schedulers", mocks["apscheduler"].schedulers
@@ -120,16 +137,16 @@ def create_application():
120137
from tests.fixtures.helpers import NoopAwaitable
121138

122139
app = MagicMock()
123-
app.bot = MagicMock()
124-
app.bot.username = "test_bot"
125-
app.bot.base_url = "https://api.telegram.org/bottest_token_123/"
140+
app.bot = MockTelegramBuilder.create_bot()
126141
app.initialize = AsyncMock()
127142
app.start = AsyncMock()
128143
app.stop = AsyncMock()
144+
app.shutdown = AsyncMock()
129145
app.add_handler = MagicMock()
130146
app.updater = MagicMock()
131147
app.updater.start_polling = MagicMock(return_value=NoopAwaitable())
132148
app.updater.stop = AsyncMock()
149+
app.updater.running = False
133150
return app
134151

135152
@staticmethod

0 commit comments

Comments
 (0)