Skip to content

Commit 2efb668

Browse files
authored
fix: Prevent premature EventManager shutdown when multiple crawlers share it (#1810)
### Description - Fixed a bug where the global `EventManager` was shut down prematurely when the first of multiple concurrent crawlers finished, leaving remaining crawlers with a broken event system. - Always starts the global `event_manager`, even if the `event_manager` argument was passed in the crawler's constructor ### Issues - Closed: #1808 - Closed: #1805 ### Testing - Added a test verifying that the global `EventManager` remains active as long as at least one crawler is running, and is shut down only after the last crawler finishes. - Added a test to verify that both `event_manager` instances are active while the crawler is running.
1 parent cb483b2 commit 2efb668

File tree

5 files changed

+149
-36
lines changed

5 files changed

+149
-36
lines changed

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -768,27 +768,36 @@ def sigint_handler() -> None:
768768
return final_statistics
769769

770770
async def _run_crawler(self) -> None:
771-
event_manager = self._service_locator.get_event_manager()
771+
local_event_manager = self._service_locator.get_event_manager()
772+
global_event_manager = service_locator.get_event_manager()
773+
if local_event_manager is global_event_manager:
774+
local_event_manager = None # Avoid entering the same event manager context twice
775+
776+
# The event managers are always entered.
777+
contexts_to_enter: list[Any] = (
778+
[global_event_manager, local_event_manager] if local_event_manager else [global_event_manager]
779+
)
772780

773781
# Collect the context managers to be entered. Context managers that are already active are excluded,
774782
# as they were likely entered by the caller, who will also be responsible for exiting them.
775-
contexts_to_enter = [
776-
cm
777-
for cm in (
778-
event_manager,
779-
self._snapshotter,
780-
self._statistics,
781-
self._session_pool if self._use_session_pool else None,
782-
self._http_client,
783-
self._crawler_state_rec_task,
784-
*self._additional_context_managers,
785-
)
786-
if cm and getattr(cm, 'active', False) is False
787-
]
783+
contexts_to_enter.extend(
784+
[
785+
cm
786+
for cm in (
787+
self._snapshotter,
788+
self._statistics,
789+
self._session_pool if self._use_session_pool else None,
790+
self._http_client,
791+
self._crawler_state_rec_task,
792+
*self._additional_context_managers,
793+
)
794+
if cm and getattr(cm, 'active', False) is False
795+
]
796+
)
788797

789798
async with AsyncExitStack() as exit_stack:
790799
for context in contexts_to_enter:
791-
await exit_stack.enter_async_context(context) # ty: ignore[invalid-argument-type]
800+
await exit_stack.enter_async_context(context)
792801

793802
await self._autoscaled_pool.run()
794803

src/crawlee/events/_event_manager.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -93,24 +93,20 @@ def __init__(
9393
delay=self._persist_state_interval,
9494
)
9595

96-
# Flag to indicate the context state.
97-
self._active = False
96+
# Reference count for active contexts.
97+
self._active_ref_count = 0
9898

9999
@property
100100
def active(self) -> bool:
101101
"""Indicate whether the context is active."""
102-
return self._active
102+
return self._active_ref_count > 0
103103

104104
async def __aenter__(self) -> EventManager:
105-
"""Initialize the event manager upon entering the async context.
105+
"""Initialize the event manager upon entering the async context."""
106+
self._active_ref_count += 1
107+
if self._active_ref_count > 1:
108+
return self
106109

107-
Raises:
108-
RuntimeError: If the context manager is already active.
109-
"""
110-
if self._active:
111-
raise RuntimeError(f'The {self.__class__.__name__} is already active.')
112-
113-
self._active = True
114110
self._emit_persist_state_event_rec_task.start()
115111
return self
116112

@@ -127,17 +123,24 @@ async def __aexit__(
127123
Raises:
128124
RuntimeError: If the context manager is not active.
129125
"""
130-
if not self._active:
126+
if not self.active:
131127
raise RuntimeError(f'The {self.__class__.__name__} is not active.')
132128

129+
if self._active_ref_count > 1:
130+
# Emit persist state event to ensure the latest state is saved before closing the context.
131+
await self._emit_persist_state_event()
132+
await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout)
133+
self._active_ref_count -= 1
134+
return
135+
133136
# Stop persist state event periodic emission and manually emit last one to ensure latest state is saved.
134137
await self._emit_persist_state_event_rec_task.stop()
135138
await self._emit_persist_state_event()
136139
await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout)
137140
self._event_emitter.remove_all_listeners()
138141
self._listener_tasks.clear()
139142
self._listeners_to_wrappers.clear()
140-
self._active = False
143+
self._active_ref_count -= 1
141144

142145
@overload
143146
def on(self, *, event: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData]) -> None: ...

src/crawlee/events/_local_event_manager.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ async def __aenter__(self) -> LocalEventManager:
7575
It starts emitting system info events at regular intervals.
7676
"""
7777
await super().__aenter__()
78-
self._emit_system_info_event_rec_task.start()
78+
79+
if self._active_ref_count == 1:
80+
self._emit_system_info_event_rec_task.start()
81+
7982
return self
8083

8184
async def __aexit__(
@@ -88,7 +91,9 @@ async def __aexit__(
8891
8992
It stops emitting system info events and closes the event manager.
9093
"""
91-
await self._emit_system_info_event_rec_task.stop()
94+
if self._active_ref_count == 1:
95+
await self._emit_system_info_event_rec_task.stop()
96+
9297
await super().__aexit__(exc_type, exc_value, exc_traceback)
9398

9499
async def _emit_system_info_event(self) -> None:

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
from crawlee.configuration import Configuration
2727
from crawlee.crawlers import BasicCrawler
2828
from crawlee.errors import RequestCollisionError, SessionError, UserDefinedErrorHandlerError
29-
from crawlee.events import Event, EventCrawlerStatusData
30-
from crawlee.events._local_event_manager import LocalEventManager
29+
from crawlee.events import Event, EventCrawlerStatusData, LocalEventManager
3130
from crawlee.request_loaders import RequestList, RequestManagerTandem
3231
from crawlee.sessions import Session, SessionPool
3332
from crawlee.statistics import FinalStatistics
@@ -2047,3 +2046,104 @@ async def error_handler(context: BasicCrawlingContext, error: Exception) -> Requ
20472046
assert error_request is not None
20482047
assert error_request.state == RequestState.DONE
20492048
assert error_request.was_already_handled
2049+
2050+
2051+
@pytest.mark.skipif(sys.version_info[:3] < (3, 11), reason='asyncio.Barrier was introduced in Python 3.11.')
2052+
async def test_multiple_crawlers_with_global_event_manager() -> None:
2053+
"""Test that multiple crawlers work correctly when using the global event manager."""
2054+
2055+
rq1 = await RequestQueue.open(alias='rq1')
2056+
rq2 = await RequestQueue.open(alias='rq2')
2057+
2058+
crawler_1 = BasicCrawler(request_manager=rq1)
2059+
crawler_2 = BasicCrawler(request_manager=rq2)
2060+
2061+
started_event = asyncio.Event()
2062+
finished_event = asyncio.Event()
2063+
2064+
async def launch_crawler_1() -> None:
2065+
await crawler_1.run(['https://a.placeholder.com'])
2066+
finished_event.set()
2067+
2068+
async def launch_crawler_2() -> None:
2069+
# Ensure that crawler_1 is already running and has activated event_manager
2070+
await started_event.wait()
2071+
await crawler_2.run(['https://b.placeholder.com'])
2072+
2073+
handler_barrier = asyncio.Barrier(2) # ty:ignore[unresolved-attribute] # Test is skipped in older Python versions.
2074+
2075+
handler_call = AsyncMock()
2076+
2077+
@crawler_1.router.default_handler
2078+
async def handler_1(context: BasicCrawlingContext) -> None:
2079+
started_event.set()
2080+
# Ensure that both handlers are running at the same time.
2081+
await handler_barrier.wait()
2082+
event_manager = service_locator.get_event_manager()
2083+
2084+
await handler_call(event_manager.active)
2085+
2086+
@crawler_2.router.default_handler
2087+
async def handler_2(context: BasicCrawlingContext) -> None:
2088+
# Ensure that both handlers are running at the same time.
2089+
await handler_barrier.wait()
2090+
# Ensure that crawler_1 is finished and closed all active contexts.
2091+
await finished_event.wait()
2092+
# Check that event manager is active and can be used in the second crawler.
2093+
event_manager = service_locator.get_event_manager()
2094+
2095+
await handler_call(event_manager.active)
2096+
2097+
await asyncio.gather(
2098+
launch_crawler_1(),
2099+
launch_crawler_2(),
2100+
)
2101+
2102+
assert handler_call.call_count == 2
2103+
2104+
first_call = handler_call.call_args_list[0]
2105+
second_call = handler_call.call_args_list[1]
2106+
2107+
assert first_call[0][0] is True
2108+
assert second_call[0][0] is True
2109+
2110+
event_manager = service_locator.get_event_manager()
2111+
2112+
# After both crawlers are finished, event manager should be inactive.
2113+
assert event_manager.active is False
2114+
2115+
await rq1.drop()
2116+
await rq2.drop()
2117+
2118+
2119+
async def test_global_and_local_event_manager_in_crawler_run() -> None:
2120+
"""Test that both global and local event managers are used in crawler run."""
2121+
2122+
config = service_locator.get_configuration()
2123+
2124+
local_event_manager = LocalEventManager.from_config(config)
2125+
2126+
crawler = BasicCrawler(event_manager=local_event_manager)
2127+
2128+
handler_call = AsyncMock()
2129+
2130+
@crawler.router.default_handler
2131+
async def handler(context: BasicCrawlingContext) -> None:
2132+
global_event_manager = service_locator.get_event_manager()
2133+
handler_call(local_event_manager.active, global_event_manager.active)
2134+
2135+
await crawler.run(['https://a.placeholder.com'])
2136+
2137+
assert handler_call.call_count == 1
2138+
2139+
local_em_state, global_em_state = handler_call.call_args_list[0][0]
2140+
2141+
# Both event managers should be active.
2142+
assert local_em_state is True
2143+
assert global_em_state is True
2144+
2145+
global_event_manager = service_locator.get_event_manager()
2146+
2147+
# After crawler is finished, both event managers should be inactive.
2148+
assert local_event_manager.active is False
2149+
assert global_event_manager.active is False

tests/unit/events/test_event_manager.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,6 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event
199199
with pytest.raises(RuntimeError, match=r'EventManager is not active.'):
200200
await event_manager.wait_for_all_listeners_to_complete()
201201

202-
with pytest.raises(RuntimeError, match=r'EventManager is already active.'):
203-
async with event_manager, event_manager:
204-
pass
205-
206202
async with event_manager:
207203
event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_system_info_data)
208204
await event_manager.wait_for_all_listeners_to_complete()

0 commit comments

Comments (0)