Skip to content

Commit ff4c976

Browse files
committed
code quality up
1 parent 4bf288f commit ff4c976

2 files changed

Lines changed: 32 additions & 33 deletions

File tree

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 21 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -61,7 +61,6 @@
6161
UserDefinedErrorHandlerError,
6262
UserHandlerTimeoutError,
6363
)
64-
from crawlee.events import EventManager
6564
from crawlee.events._types import Event, EventCrawlerStatusData
6665
from crawlee.http_clients import ImpitHttpClient
6766
from crawlee.router import Router
@@ -91,6 +90,7 @@
9190
PushDataKwargs,
9291
)
9392
from crawlee.configuration import Configuration
93+
from crawlee.events import EventManager
9494
from crawlee.http_clients import HttpClient, HttpResponse
9595
from crawlee.proxy_configuration import ProxyConfiguration, ProxyInfo
9696
from crawlee.request_loaders import RequestManager
@@ -773,26 +773,31 @@ async def _run_crawler(self) -> None:
773773
if local_event_manager is global_event_manager:
774774
local_event_manager = None # Avoid entering the same event manager context twice
775775

776+
# The event managers are always entered.
777+
contexts_to_enter: list[Any] = (
778+
[global_event_manager, local_event_manager] if local_event_manager else [global_event_manager]
779+
)
780+
776781
# Collect the context managers to be entered. Context managers that are already active are excluded,
777782
# as they were likely entered by the caller, who will also be responsible for exiting them.
778-
contexts_to_enter = [
779-
cm
780-
for cm in (
781-
global_event_manager,
782-
local_event_manager,
783-
self._snapshotter,
784-
self._statistics,
785-
self._session_pool if self._use_session_pool else None,
786-
self._http_client,
787-
self._crawler_state_rec_task,
788-
*self._additional_context_managers,
789-
)
790-
if cm and (isinstance(cm, EventManager) or not getattr(cm, 'active', False))
791-
]
783+
contexts_to_enter.extend(
784+
[
785+
cm
786+
for cm in (
787+
self._snapshotter,
788+
self._statistics,
789+
self._session_pool if self._use_session_pool else None,
790+
self._http_client,
791+
self._crawler_state_rec_task,
792+
*self._additional_context_managers,
793+
)
794+
if cm and getattr(cm, 'active', False) is False
795+
]
796+
)
792797

793798
async with AsyncExitStack() as exit_stack:
794799
for context in contexts_to_enter:
795-
await exit_stack.enter_async_context(context) # ty: ignore[invalid-argument-type]
800+
await exit_stack.enter_async_context(context)
796801

797802
await self._autoscaled_pool.run()
798803

src/crawlee/events/_event_manager.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -94,24 +94,19 @@ def __init__(
9494
)
9595

9696
# Reference count for active contexts.
97-
self._ref_count = 0
98-
99-
# Flag to indicate the context state.
100-
self._active = False
97+
self._active_ref_count = 0
10198

10299
@property
103100
def active(self) -> bool:
104101
"""Indicate whether the context is active."""
105-
return self._active
102+
return self._active_ref_count > 0
106103

107104
async def __aenter__(self) -> EventManager:
108105
"""Initialize the event manager upon entering the async context."""
109-
self._ref_count += 1
110-
111-
if self._ref_count > 1:
106+
self._active_ref_count += 1
107+
if self._active_ref_count > 1:
112108
return self
113109

114-
self._active = True
115110
self._emit_persist_state_event_rec_task.start()
116111
return self
117112

@@ -128,24 +123,23 @@ async def __aexit__(
128123
Raises:
129124
RuntimeError: If the context manager is not active.
130125
"""
131-
if not self._active:
126+
if not self.active:
132127
raise RuntimeError(f'The {self.__class__.__name__} is not active.')
133128

134-
self._ref_count -= 1
135-
136-
# Emit persist state event to ensure the latest state is saved before closing the context.
137-
await self._emit_persist_state_event()
138-
139-
if self._ref_count > 0:
129+
if self._active_ref_count > 1:
130+
# Emit persist state event to ensure the latest state is saved before closing the context.
131+
await self._emit_persist_state_event()
132+
self._active_ref_count -= 1
140133
return
141134

142135
# Stop persist state event periodic emission and manually emit last one to ensure latest state is saved.
143136
await self._emit_persist_state_event_rec_task.stop()
137+
await self._emit_persist_state_event()
144138
await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout)
145139
self._event_emitter.remove_all_listeners()
146140
self._listener_tasks.clear()
147141
self._listeners_to_wrappers.clear()
148-
self._active = False
142+
self._active_ref_count -= 1
149143

150144
@overload
151145
def on(self, *, event: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData]) -> None: ...

0 commit comments

Comments
 (0)