|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import asyncio |
| 4 | +import logging |
4 | 5 | import warnings |
5 | 6 | from datetime import datetime, timedelta, timezone |
6 | 7 | from typing import TYPE_CHECKING |
@@ -344,3 +345,57 @@ async def test_get_remaining_time_returns_positive_when_timeout_in_future() -> N |
344 | 345 | assert result is not None |
345 | 346 | assert result > timedelta(0) |
346 | 347 | assert result <= timedelta(minutes=5) |
| 348 | + |
| 349 | + |
| 350 | +async def test_reboot_runs_all_listeners_even_when_one_fails( |
| 351 | + apify_client_async_patcher: ApifyClientAsyncPatcher, |
| 352 | + caplog: pytest.LogCaptureFixture, |
| 353 | +) -> None: |
| 354 | + """Test that a failing pre-reboot event listener does not prevent other listeners from running. |
| 355 | +
|
| 356 | + Directly injects raw async callables into the event manager's _listeners_to_wrappers |
| 357 | + to simulate exceptions escaping the wrapper layer (the scenario return_exceptions=True guards against). |
| 358 | + """ |
| 359 | + apify_client_async_patcher.patch('run', 'reboot', return_value=None) |
| 360 | + |
| 361 | + persist_state_called = False |
| 362 | + migrating_called = False |
| 363 | + |
| 364 | + async def failing_listener(*_args: object) -> None: |
| 365 | + raise RuntimeError('persist_state listener error') |
| 366 | + |
| 367 | + async def successful_persist_state_listener(*_args: object) -> None: |
| 368 | + nonlocal persist_state_called |
| 369 | + persist_state_called = True |
| 370 | + |
| 371 | + async def successful_migrating_listener(*_args: object) -> None: |
| 372 | + nonlocal migrating_called |
| 373 | + migrating_called = True |
| 374 | + |
| 375 | + async with Actor: |
| 376 | + Actor.configuration.is_at_home = True |
| 377 | + Actor.configuration.actor_run_id = 'some-run-id' |
| 378 | + |
| 379 | + # Inject raw listeners directly into the event manager's internal structure, |
| 380 | + # bypassing crawlee's wrapper that would catch exceptions on its own. |
| 381 | + listeners_map = Actor.event_manager._listeners_to_wrappers |
| 382 | + listeners_map[Event.PERSIST_STATE] = { |
| 383 | + failing_listener: [failing_listener], |
| 384 | + successful_persist_state_listener: [successful_persist_state_listener], |
| 385 | + } |
| 386 | + listeners_map[Event.MIGRATING] = { |
| 387 | + successful_migrating_listener: [successful_migrating_listener], |
| 388 | + } |
| 389 | + |
| 390 | + with caplog.at_level(logging.ERROR): |
| 391 | + await Actor.reboot(custom_after_sleep=timedelta(milliseconds=1)) |
| 392 | + |
| 393 | + # All listeners ran despite the failure in one of them. |
| 394 | + assert persist_state_called |
| 395 | + assert migrating_called |
| 396 | + |
| 397 | + # The exception was logged. |
| 398 | + assert any('A pre-reboot event listener failed' in r.message for r in caplog.records) |
| 399 | + |
| 400 | + # The reboot API call was still made. |
| 401 | + assert len(apify_client_async_patcher.calls['run']['reboot']) == 1 |
0 commit comments