Skip to content

Commit 0ca0895

Browse files
committed
Fix global event_manager to correctly support multiple crawlers
1 parent 0b2e3fc commit 0ca0895

4 files changed

Lines changed: 85 additions & 12 deletions

File tree

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
UserDefinedErrorHandlerError,
6262
UserHandlerTimeoutError,
6363
)
64+
from crawlee.events import EventManager
6465
from crawlee.events._types import Event, EventCrawlerStatusData
6566
from crawlee.http_clients import ImpitHttpClient
6667
from crawlee.router import Router
@@ -90,7 +91,6 @@
9091
PushDataKwargs,
9192
)
9293
from crawlee.configuration import Configuration
93-
from crawlee.events import EventManager
9494
from crawlee.http_clients import HttpClient, HttpResponse
9595
from crawlee.proxy_configuration import ProxyConfiguration, ProxyInfo
9696
from crawlee.request_loaders import RequestManager
@@ -783,7 +783,7 @@ async def _run_crawler(self) -> None:
783783
self._crawler_state_rec_task,
784784
*self._additional_context_managers,
785785
)
786-
if cm and getattr(cm, 'active', False) is False
786+
if cm and (isinstance(cm, EventManager) or not getattr(cm, 'active', False))
787787
]
788788

789789
async with AsyncExitStack() as exit_stack:

src/crawlee/events/_event_manager.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ def __init__(
9393
delay=self._persist_state_interval,
9494
)
9595

96+
# Reference count for active contexts.
97+
self._ref_count = 0
98+
9699
# Flag to indicate the context state.
97100
self._active = False
98101

@@ -102,13 +105,11 @@ def active(self) -> bool:
102105
return self._active
103106

104107
async def __aenter__(self) -> EventManager:
105-
"""Initialize the event manager upon entering the async context.
108+
"""Initialize the event manager upon entering the async context."""
109+
self._ref_count += 1
106110

107-
Raises:
108-
RuntimeError: If the context manager is already active.
109-
"""
110-
if self._active:
111-
raise RuntimeError(f'The {self.__class__.__name__} is already active.')
111+
if self._ref_count > 1:
112+
return self
112113

113114
self._active = True
114115
self._emit_persist_state_event_rec_task.start()
@@ -130,6 +131,11 @@ async def __aexit__(
130131
if not self._active:
131132
raise RuntimeError(f'The {self.__class__.__name__} is not active.')
132133

134+
self._ref_count -= 1
135+
136+
if self._ref_count > 0:
137+
return
138+
133139
# Stop persist state event periodic emission and manually emit last one to ensure latest state is saved.
134140
await self._emit_persist_state_event_rec_task.stop()
135141
await self._emit_persist_state_event()

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2047,3 +2047,74 @@ async def error_handler(context: BasicCrawlingContext, error: Exception) -> Requ
20472047
assert error_request is not None
20482048
assert error_request.state == RequestState.DONE
20492049
assert error_request.was_already_handled
2050+
2051+
2052+
@pytest.mark.skipif(sys.version_info[:3] < (3, 11), reason='asyncio.Barrier was introduced in Python 3.11.')
2053+
async def test_multiple_crawlers_with_global_event_manager() -> None:
2054+
"""Test that multiple crawlers work correctly when using the global event manager."""
2055+
2056+
# Test is skipped in older Python versions.
2057+
from asyncio import Barrier # type:ignore[attr-defined] # noqa: PLC0415
2058+
2059+
rq1 = await RequestQueue.open(alias='rq1')
2060+
rq2 = await RequestQueue.open(alias='rq2')
2061+
2062+
crawler_1 = BasicCrawler(request_manager=rq1)
2063+
crawler_2 = BasicCrawler(request_manager=rq2)
2064+
2065+
started_event = asyncio.Event()
2066+
finished_event = asyncio.Event()
2067+
2068+
async def launch_crawler_1() -> None:
2069+
await crawler_1.run(['https://a.placeholder.com'])
2070+
finished_event.set()
2071+
2072+
async def launch_crawler_2() -> None:
2073+
# Ensure that crawler_1 is already running and has activated event_manager
2074+
await started_event.wait()
2075+
await crawler_2.run(['https://b.placeholder.com'])
2076+
2077+
handler_barrier = Barrier(2)
2078+
2079+
handler_call = AsyncMock()
2080+
2081+
@crawler_1.router.default_handler
2082+
async def handler_1(context: BasicCrawlingContext) -> None:
2083+
started_event.set()
2084+
# Ensure that both handlers are running at the same time.
2085+
await handler_barrier.wait()
2086+
event_manager = service_locator.get_event_manager()
2087+
2088+
await handler_call(event_manager.active)
2089+
2090+
@crawler_2.router.default_handler
2091+
async def handler_2(context: BasicCrawlingContext) -> None:
2092+
# Ensure that both handlers are running at the same time.
2093+
await handler_barrier.wait()
2094+
# Ensure that crawler_1 is finished and closed all active contexts.
2095+
await finished_event.wait()
2096+
# Check that event manager is active and can be used in the second crawler.
2097+
event_manager = service_locator.get_event_manager()
2098+
2099+
await handler_call(event_manager.active)
2100+
2101+
await asyncio.gather(
2102+
launch_crawler_1(),
2103+
launch_crawler_2(),
2104+
)
2105+
2106+
assert handler_call.call_count == 2
2107+
2108+
first_call = handler_call.call_args_list[0]
2109+
second_call = handler_call.call_args_list[1]
2110+
2111+
assert first_call[0][0] is True
2112+
assert second_call[0][0] is True
2113+
2114+
event_manager = service_locator.get_event_manager()
2115+
2116+
# After both crawlers are finished, event manager should be inactive.
2117+
assert event_manager.active is False
2118+
2119+
await rq1.drop()
2120+
await rq2.drop()

tests/unit/events/test_event_manager.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,6 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event
199199
with pytest.raises(RuntimeError, match=r'EventManager is not active.'):
200200
await event_manager.wait_for_all_listeners_to_complete()
201201

202-
with pytest.raises(RuntimeError, match=r'EventManager is already active.'):
203-
async with event_manager, event_manager:
204-
pass
205-
206202
async with event_manager:
207203
event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_system_info_data)
208204
await event_manager.wait_for_all_listeners_to_complete()

0 commit comments

Comments
 (0)