Skip to content

Commit 03e73d3

Browse files
vdusekclaude
andauthored
fix: honor event_listeners_timeout in Actor.reboot (#872)
## Summary The `event_listeners_timeout` parameter of `Actor.reboot` was previously ignored (marked `# noqa: ARG002`), which meant a hanging pre-reboot listener could block the reboot indefinitely. This change wraps listener execution in `asyncio.wait_for` so the reboot proceeds once the timeout elapses, logging a warning in that case. Added a unit test that verifies the reboot is not blocked by a listener that exceeds the timeout. --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b017cb9 commit 03e73d3

2 files changed

Lines changed: 43 additions & 6 deletions

File tree

src/apify/_actor.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,7 +1167,7 @@ async def metamorph(
11671167
async def reboot(
11681168
self,
11691169
*,
1170-
event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT, # noqa: ARG002
1170+
event_listeners_timeout: timedelta | None = EVENT_LISTENERS_TIMEOUT,
11711171
custom_after_sleep: timedelta | None = None,
11721172
) -> None:
11731173
"""Internally reboot this Actor.
@@ -1204,11 +1204,18 @@ async def reboot(
12041204
(self.event_manager._listeners_to_wrappers[Event.MIGRATING] or {}).values() # noqa: SLF001
12051205
)
12061206

1207-
results = await asyncio.gather(
1208-
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
1209-
*[listener(EventMigratingData()) for listener in migrating_listeners],
1210-
return_exceptions=True,
1211-
)
1207+
try:
1208+
results = await asyncio.wait_for(
1209+
asyncio.gather(
1210+
*[listener(EventPersistStateData(is_migrating=True)) for listener in persist_state_listeners],
1211+
*[listener(EventMigratingData()) for listener in migrating_listeners],
1212+
return_exceptions=True,
1213+
),
1214+
timeout=event_listeners_timeout.total_seconds() if event_listeners_timeout else None,
1215+
)
1216+
except asyncio.TimeoutError:
1217+
self.log.warning('Pre-reboot event listeners did not finish within timeout; proceeding with reboot')
1218+
results = []
12121219

12131220
for result in results:
12141221
if isinstance(result, Exception):

tests/unit/actor/test_actor_helpers.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,3 +399,33 @@ async def successful_migrating_listener(*_args: object) -> None:
399399

400400
# The reboot API call was still made.
401401
assert len(apify_client_async_patcher.calls['run']['reboot']) == 1
402+
403+
404+
async def test_reboot_proceeds_when_event_listener_exceeds_timeout(
405+
apify_client_async_patcher: ApifyClientAsyncPatcher,
406+
caplog: pytest.LogCaptureFixture,
407+
) -> None:
408+
"""Test that a hanging pre-reboot event listener does not block reboot beyond the timeout."""
409+
apify_client_async_patcher.patch('run', 'reboot', return_value=None)
410+
411+
async def hanging_listener(*_args: object) -> None:
412+
await asyncio.sleep(60)
413+
414+
async with Actor:
415+
Actor.configuration.is_at_home = True
416+
Actor.configuration.actor_run_id = 'some-run-id'
417+
418+
listeners_map = Actor.event_manager._listeners_to_wrappers
419+
listeners_map[Event.PERSIST_STATE] = {hanging_listener: [hanging_listener]}
420+
421+
with caplog.at_level(logging.WARNING):
422+
await Actor.reboot(
423+
event_listeners_timeout=timedelta(milliseconds=50),
424+
custom_after_sleep=timedelta(milliseconds=1),
425+
)
426+
427+
# The timeout was honored and logged.
428+
assert any('Pre-reboot event listeners did not finish within timeout' in r.message for r in caplog.records)
429+
430+
# The reboot API call proceeded despite the hanging listener.
431+
assert len(apify_client_async_patcher.calls['run']['reboot']) == 1

0 commit comments

Comments
 (0)