Skip to content

Commit 64c246b

Browse files
authored
fix: Do not share state between different crawlers unless requested (#1669)
### Description - Introduces a new argument `id` for `BasicCrawler`. This argument controls the shared state. - Each new instance of the `BasicCrawler` gets an automatically incremented `id` to avoid unintentional sharing of state between crawlers. - If two crawlers should use the same state, then it is possible to pass the same `id` to the crawler `__init__`. ### Issues Closes: #1627 ### Testing - Added tests.
1 parent cbf3954 commit 64c246b

File tree

4 files changed

+169
-23
lines changed

4 files changed

+169
-23
lines changed

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ class _BasicCrawlerOptions(TypedDict):
213213
"""Allows overriding the default status message. The default status message is provided in the parameters.
214214
Returning `None` suppresses the status message."""
215215

216+
id: NotRequired[int]
217+
"""Identifier used for crawler state tracking. Use the same id across multiple crawlers to share state between
218+
them."""
219+
216220

217221
class _BasicCrawlerOptionsGeneric(TypedDict, Generic[TCrawlingContext, TStatisticsState]):
218222
"""Generic options the `BasicCrawler` constructor."""
@@ -266,6 +270,7 @@ class BasicCrawler(Generic[TCrawlingContext, TStatisticsState]):
266270

267271
_CRAWLEE_STATE_KEY = 'CRAWLEE_STATE'
268272
_request_handler_timeout_text = 'Request handler timed out after'
273+
__next_id = 0
269274

270275
def __init__(
271276
self,
@@ -297,6 +302,7 @@ def __init__(
297302
status_message_logging_interval: timedelta = timedelta(seconds=10),
298303
status_message_callback: Callable[[StatisticsState, StatisticsState | None, str], Awaitable[str | None]]
299304
| None = None,
305+
id: int | None = None,
300306
_context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
301307
_additional_context_managers: Sequence[AbstractAsyncContextManager] | None = None,
302308
_logger: logging.Logger | None = None,
@@ -349,13 +355,21 @@ def __init__(
349355
status_message_logging_interval: Interval for logging the crawler status messages.
350356
status_message_callback: Allows overriding the default status message. The default status message is
351357
provided in the parameters. Returning `None` suppresses the status message.
358+
id: Identifier used for crawler state tracking. Use the same id across multiple crawlers to share state
359+
between them.
352360
_context_pipeline: Enables extending the request lifecycle and modifying the crawling context.
353361
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
354362
_additional_context_managers: Additional context managers used throughout the crawler lifecycle.
355363
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
356364
_logger: A logger instance, typically provided by a subclass, for consistent logging labels.
357365
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
358366
"""
367+
if id is None:
368+
self._id = BasicCrawler.__next_id
369+
BasicCrawler.__next_id += 1
370+
else:
371+
self._id = id
372+
359373
implicit_event_manager_with_explicit_config = False
360374
if not configuration:
361375
configuration = service_locator.get_configuration()
@@ -831,7 +845,7 @@ async def _use_state(
831845
default_value: dict[str, JsonSerializable] | None = None,
832846
) -> dict[str, JsonSerializable]:
833847
kvs = await self.get_key_value_store()
834-
return await kvs.get_auto_saved_value(self._CRAWLEE_STATE_KEY, default_value)
848+
return await kvs.get_auto_saved_value(f'{self._CRAWLEE_STATE_KEY}_{self._id}', default_value)
835849

836850
async def _save_crawler_state(self) -> None:
837851
store = await self.get_key_value_store()

tests/unit/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from uvicorn.config import Config
1313

1414
from crawlee import service_locator
15+
from crawlee.crawlers import BasicCrawler
1516
from crawlee.fingerprint_suite._browserforge_adapter import get_available_header_network
1617
from crawlee.http_clients import CurlImpersonateHttpClient, HttpxHttpClient, ImpitHttpClient
1718
from crawlee.proxy_configuration import ProxyInfo
@@ -74,6 +75,7 @@ def _prepare_test_env() -> None:
7475
# Reset global class variables to ensure test isolation.
7576
KeyValueStore._autosaved_values = {}
7677
Statistics._Statistics__next_id = 0 # type:ignore[attr-defined] # Mangled attribute
78+
BasicCrawler._BasicCrawler__next_id = 0 # type:ignore[attr-defined] # Mangled attribute
7779

7880
return _prepare_test_env
7981

tests/unit/crawlers/_adaptive_playwright/test_adaptive_playwright_crawler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ async def test_adaptive_crawling_result_use_state_isolation(
380380
crawler = AdaptivePlaywrightCrawler.with_beautifulsoup_static_parser(
381381
rendering_type_predictor=static_only_predictor_enforce_detection,
382382
)
383-
await key_value_store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0})
383+
await key_value_store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0})
384384
request_handler_calls = 0
385385

386386
@crawler.router.default_handler
@@ -397,7 +397,7 @@ async def request_handler(context: AdaptivePlaywrightCrawlingContext) -> None:
397397
# Request handler was called twice
398398
assert request_handler_calls == 2
399399
# Increment of global state happened only once
400-
assert (await key_value_store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 1
400+
assert (await key_value_store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 1
401401

402402

403403
async def test_adaptive_crawling_statistics(test_urls: list[str]) -> None:

tests/unit/crawlers/_basic/test_basic_crawler.py

Lines changed: 150 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -810,11 +810,62 @@ async def handler(context: BasicCrawlingContext) -> None:
810810
await crawler.run(['https://hello.world'])
811811

812812
kvs = await crawler.get_key_value_store()
813-
value = await kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY)
813+
value = await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
814814

815815
assert value == {'hello': 'world'}
816816

817817

818+
async def test_context_use_state_crawlers_share_state() -> None:
819+
async def handler(context: BasicCrawlingContext) -> None:
820+
state = await context.use_state({'urls': []})
821+
assert isinstance(state['urls'], list)
822+
state['urls'].append(context.request.url)
823+
824+
crawler_1 = BasicCrawler(id=0, request_handler=handler)
825+
crawler_2 = BasicCrawler(id=0, request_handler=handler)
826+
827+
await crawler_1.run(['https://a.com'])
828+
await crawler_2.run(['https://b.com'])
829+
830+
kvs = await KeyValueStore.open()
831+
assert crawler_1._id == crawler_2._id == 0
832+
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_{crawler_1._id}') == {
833+
'urls': ['https://a.com', 'https://b.com']
834+
}
835+
836+
837+
async def test_crawlers_share_stats() -> None:
838+
async def handler(context: BasicCrawlingContext) -> None:
839+
await context.use_state({'urls': []})
840+
841+
crawler_1 = BasicCrawler(id=0, request_handler=handler)
842+
crawler_2 = BasicCrawler(id=0, request_handler=handler, statistics=crawler_1.statistics)
843+
844+
result1 = await crawler_1.run(['https://a.com'])
845+
result2 = await crawler_2.run(['https://b.com'])
846+
847+
assert crawler_1.statistics == crawler_2.statistics
848+
assert result1.requests_finished == 1
849+
assert result2.requests_finished == 2
850+
851+
852+
async def test_context_use_state_crawlers_own_state() -> None:
853+
async def handler(context: BasicCrawlingContext) -> None:
854+
state = await context.use_state({'urls': []})
855+
assert isinstance(state['urls'], list)
856+
state['urls'].append(context.request.url)
857+
858+
crawler_1 = BasicCrawler(request_handler=handler)
859+
crawler_2 = BasicCrawler(request_handler=handler)
860+
861+
await crawler_1.run(['https://a.com'])
862+
await crawler_2.run(['https://b.com'])
863+
864+
kvs = await KeyValueStore.open()
865+
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0') == {'urls': ['https://a.com']}
866+
assert await kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1') == {'urls': ['https://b.com']}
867+
868+
818869
async def test_context_handlers_use_state(key_value_store: KeyValueStore) -> None:
819870
state_in_handler_one: dict[str, JsonSerializable] = {}
820871
state_in_handler_two: dict[str, JsonSerializable] = {}
@@ -855,7 +906,7 @@ async def handler_three(context: BasicCrawlingContext) -> None:
855906
store = await crawler.get_key_value_store()
856907

857908
# The state in the KVS must match with the last set state
858-
assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY)) == {'hello': 'last_world'}
909+
assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')) == {'hello': 'last_world'}
859910

860911

861912
async def test_max_requests_per_crawl() -> None:
@@ -1283,7 +1334,7 @@ async def test_context_use_state_race_condition_in_handlers(key_value_store: Key
12831334

12841335
crawler = BasicCrawler()
12851336
store = await crawler.get_key_value_store()
1286-
await store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0})
1337+
await store.set_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0', {'counter': 0})
12871338
handler_barrier = Barrier(2)
12881339

12891340
@crawler.router.default_handler
@@ -1298,7 +1349,7 @@ async def handler(context: BasicCrawlingContext) -> None:
12981349
store = await crawler.get_key_value_store()
12991350
# Ensure that local state is pushed back to kvs.
13001351
await store.persist_autosaved_values()
1301-
assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 2
1352+
assert (await store.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0'))['counter'] == 2
13021353

13031354

13041355
@pytest.mark.run_alone
@@ -1754,55 +1805,88 @@ async def handler(context: BasicCrawlingContext) -> None:
17541805
assert await unrelated_rq.fetch_next_request() == unrelated_request
17551806

17561807

1757-
async def _run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
1808+
async def _run_crawler(crawler_id: int | None, requests: list[str], storage_dir: str) -> StatisticsState:
17581809
"""Run crawler and return its statistics state.
17591810
17601811
Must be defined like this to be picklable for ProcessPoolExecutor."""
1761-
service_locator.set_configuration(
1762-
Configuration(
1763-
storage_dir=storage_dir,
1764-
purge_on_start=False,
1765-
)
1766-
)
17671812

17681813
async def request_handler(context: BasicCrawlingContext) -> None:
17691814
context.log.info(f'Processing {context.request.url} ...')
1815+
# Add visited url to crawler state and use it to verify state persistence.
1816+
state = await context.use_state({'urls': []})
1817+
state['urls'] = state.get('urls')
1818+
assert isinstance(state['urls'], list)
1819+
state['urls'].append(context.request.url)
1820+
context.log.info(f'State {state}')
17701821

17711822
crawler = BasicCrawler(
1823+
id=crawler_id,
17721824
request_handler=request_handler,
17731825
concurrency_settings=ConcurrencySettings(max_concurrency=1, desired_concurrency=1),
1826+
configuration=Configuration(
1827+
storage_dir=storage_dir,
1828+
purge_on_start=False,
1829+
),
17741830
)
17751831

17761832
await crawler.run(requests)
17771833
return crawler.statistics.state
17781834

17791835

1780-
def _process_run_crawler(requests: list[str], storage_dir: str) -> StatisticsState:
1781-
return asyncio.run(_run_crawler(requests=requests, storage_dir=storage_dir))
1836+
@dataclass
1837+
class _CrawlerInput:
1838+
requests: list[str]
1839+
id: None | int = None
1840+
17821841

1842+
def _process_run_crawlers(crawler_inputs: list[_CrawlerInput], storage_dir: str) -> list[StatisticsState]:
1843+
return [
1844+
asyncio.run(_run_crawler(crawler_id=crawler_input.id, requests=crawler_input.requests, storage_dir=storage_dir))
1845+
for crawler_input in crawler_inputs
1846+
]
17831847

1784-
async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
1785-
"""Test that crawler statistics persist and are loaded correctly.
1848+
1849+
async def test_crawler_state_persistence(tmp_path: Path) -> None:
1850+
"""Test that crawler statistics and state persist and are loaded correctly.
17861851
17871852
This test simulates starting the crawler process twice, and checks that the statistics include first run."""
17881853

1854+
state_kvs = await KeyValueStore.open(
1855+
storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path))
1856+
)
1857+
17891858
with ProcessPoolExecutor() as executor:
17901859
# Crawl 2 requests in the first run and automatically persist the state.
17911860
first_run_state = executor.submit(
1792-
_process_run_crawler,
1793-
requests=['https://a.placeholder.com', 'https://b.placeholder.com'],
1861+
_process_run_crawlers,
1862+
crawler_inputs=[_CrawlerInput(requests=['https://a.placeholder.com', 'https://b.placeholder.com'])],
17941863
storage_dir=str(tmp_path),
1795-
).result()
1864+
).result()[0]
1865+
# Expected state after first crawler run
17961866
assert first_run_state.requests_finished == 2
1867+
state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
1868+
assert state.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com']
17971869

17981870
# Do not reuse the executor to simulate a fresh process to avoid modified class attributes.
17991871
with ProcessPoolExecutor() as executor:
18001872
# Crawl 1 additional request in the second run, but use the previously automatically persisted state.
18011873
second_run_state = executor.submit(
1802-
_process_run_crawler, requests=['https://c.placeholder.com'], storage_dir=str(tmp_path)
1803-
).result()
1874+
_process_run_crawlers,
1875+
crawler_inputs=[_CrawlerInput(requests=['https://c.placeholder.com'])],
1876+
storage_dir=str(tmp_path),
1877+
).result()[0]
1878+
1879+
# Expected state after second crawler run
1880+
# 2 requests from first run and 1 request from second run.
18041881
assert second_run_state.requests_finished == 3
18051882

1883+
state = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
1884+
assert state.get('urls') == [
1885+
'https://a.placeholder.com',
1886+
'https://b.placeholder.com',
1887+
'https://c.placeholder.com',
1888+
]
1889+
18061890
assert first_run_state.crawler_started_at == second_run_state.crawler_started_at
18071891
assert first_run_state.crawler_finished_at
18081892
assert second_run_state.crawler_finished_at
@@ -1811,6 +1895,52 @@ async def test_crawler_statistics_persistence(tmp_path: Path) -> None:
18111895
assert first_run_state.crawler_runtime < second_run_state.crawler_runtime
18121896

18131897

1898+
async def test_crawler_state_persistence_2_crawlers_with_migration(tmp_path: Path) -> None:
1899+
"""Test that crawler statistics and state persist and are loaded correctly.
1900+
1901+
This test simulates starting the crawler process twice, and checks that the statistics include first run.
1902+
Each time two distinct crawlers are running, and they should keep using their own statistics and state."""
1903+
state_kvs = await KeyValueStore.open(
1904+
storage_client=FileSystemStorageClient(), configuration=Configuration(storage_dir=str(tmp_path))
1905+
)
1906+
1907+
with ProcessPoolExecutor() as executor:
1908+
# Run 2 crawlers; each crawls 1 request and automatically persists the state.
1909+
first_run_states = executor.submit(
1910+
_process_run_crawlers,
1911+
crawler_inputs=[
1912+
_CrawlerInput(requests=['https://a.placeholder.com']),
1913+
_CrawlerInput(requests=['https://c.placeholder.com']),
1914+
],
1915+
storage_dir=str(tmp_path),
1916+
).result()
1917+
# Expected state after first crawler run
1918+
assert first_run_states[0].requests_finished == 1
1919+
assert first_run_states[1].requests_finished == 1
1920+
state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
1921+
assert state_0.get('urls') == ['https://a.placeholder.com']
1922+
state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1')
1923+
assert state_1.get('urls') == ['https://c.placeholder.com']
1924+
1925+
with ProcessPoolExecutor() as executor:
1926+
# Run 2 crawlers; each crawls 1 request and automatically persists the state.
1927+
second_run_states = executor.submit(
1928+
_process_run_crawlers,
1929+
crawler_inputs=[
1930+
_CrawlerInput(requests=['https://b.placeholder.com']),
1931+
_CrawlerInput(requests=['https://d.placeholder.com']),
1932+
],
1933+
storage_dir=str(tmp_path),
1934+
).result()
1935+
# Expected state after first crawler run
1936+
assert second_run_states[0].requests_finished == 2
1937+
assert second_run_states[1].requests_finished == 2
1938+
state_0 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_0')
1939+
assert state_0.get('urls') == ['https://a.placeholder.com', 'https://b.placeholder.com']
1940+
state_1 = await state_kvs.get_value(f'{BasicCrawler._CRAWLEE_STATE_KEY}_1')
1941+
assert state_1.get('urls') == ['https://c.placeholder.com', 'https://d.placeholder.com']
1942+
1943+
18141944
async def test_crawler_intermediate_statistics() -> None:
18151945
"""Test that crawler statistics are correctly updating total runtime on every calculate call."""
18161946
crawler = BasicCrawler()

0 commit comments

Comments
 (0)