12 changes: 11 additions & 1 deletion docs/upgrading/upgrading_to_v1.md
@@ -212,10 +212,20 @@ service_locator.set_storage_client(MemoryStorageClient())
service_locator.set_storage_client(MemoryStorageClient()) # Raises an error
```

## BasicCrawler changes

### Renamed methods for opening storages
- `BasicCrawler.get_dataset` renamed to `BasicCrawler.open_dataset`
- `BasicCrawler.get_key_value_store` renamed to `BasicCrawler.open_key_value_store`

### Added method for opening a `RequestQueue` that uses the crawler's configuration and storage client
- `BasicCrawler.open_request_queue`
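
A minimal sketch of the renamed and new calls (illustrative only, not taken from the PR):

```python
import asyncio

from crawlee.crawlers import BasicCrawler


async def main() -> None:
    crawler = BasicCrawler()

    # v0.6: crawler.get_dataset() / crawler.get_key_value_store()
    dataset = await crawler.open_dataset()
    kvs = await crawler.open_key_value_store()

    # New in v1: opens a RequestQueue using the crawler's own
    # storage client and configuration.
    rq = await crawler.open_request_queue()


asyncio.run(main())
```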

### BasicCrawler has its own ServiceLocator instance to track its services
Services passed explicitly to the crawler can differ from the global ones accessible in `crawlee.service_locator`. `BasicCrawler` no longer overwrites the global services in `service_locator` with the services that were passed to it explicitly.

**Before (v0.6):**

```python
from crawlee import service_locator
from crawlee.crawlers import BasicCrawler
@@ -244,7 +254,7 @@ async def main() -> None:
crawler = BasicCrawler(storage_client=custom_storage_client)

assert service_locator.get_storage_client() is not custom_storage_client
assert await crawler.get_dataset() is not await Dataset.open()
assert await crawler.open_dataset() is not await Dataset.open()
```

This makes it possible to run two crawlers with different services at the same time.
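
For example, a minimal sketch of two crawlers using separate storage clients side by side (assuming the in-memory and file-system storage clients are importable from `crawlee.storage_clients`):

```python
import asyncio

from crawlee.crawlers import BasicCrawler
from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient


async def main() -> None:
    # Each crawler keeps its services in its own ServiceLocator;
    # neither overwrites the globals in crawlee.service_locator.
    memory_crawler = BasicCrawler(storage_client=MemoryStorageClient())
    file_crawler = BasicCrawler(storage_client=FileSystemStorageClient())

    # Their default datasets are opened through different storage clients,
    # so they are distinct instances.
    assert await memory_crawler.open_dataset() is not await file_crawler.open_dataset()


asyncio.run(main())
```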
@@ -291,7 +291,7 @@ async def get_input_state(
use_state_function = context.use_state

# New result is created and injected to newly created context. This is done to ensure isolation of sub crawlers.
result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store)
result = RequestHandlerRunResult(key_value_store_getter=self.open_key_value_store)
context_linked_to_result = BasicCrawlingContext(
request=deepcopy(context.request),
session=deepcopy(context.session),
42 changes: 27 additions & 15 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
@@ -563,21 +563,33 @@ async def _get_proxy_info(self, request: Request, session: Session | None) -> Pr
async def get_request_manager(self) -> RequestManager:
"""Return the configured request manager. If none is configured, open and return the default request queue."""
if not self._request_manager:
self._request_manager = await RequestQueue.open(
storage_client=self._service_locator.get_storage_client(),
configuration=self._service_locator.get_configuration(),
)

self._request_manager = await self.open_request_queue()
return self._request_manager

async def get_dataset(
async def open_request_queue(
A collaborator commented:
This has some funk to it - if the crawler uses a non-default request manager, this will still return the default request queue. If somebody does that, they will probably be surprised that adding requests to this queue does nothing 😁

Perhaps the method could throw if there is a non-default request manager in place?
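
A hypothetical sketch of that surprise (the `request_manager` constructor argument and the queue name are assumptions used only for illustration):

```python
import asyncio

from crawlee.crawlers import BasicCrawler
from crawlee.storages import RequestQueue


async def demo() -> None:
    # Hypothetical: wire the crawler to a non-default, named request queue.
    custom_queue = await RequestQueue.open(name='custom')
    crawler = BasicCrawler(request_manager=custom_queue)

    # open_request_queue() still opens the *default* queue, not custom_queue,
    # so requests added to it would never be picked up by the crawler.
    default_queue = await crawler.open_request_queue()
    assert default_queue is not custom_queue


asyncio.run(demo())
```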

self,
*,
id: str | None = None,
name: str | None = None,
alias: str | None = None,
) -> RequestQueue:
"""Return `RequestQueue` with the given ID or name or alias. If none is provided, return the default one."""
return await RequestQueue.open(
id=id,
name=name,
alias=alias,
storage_client=self._service_locator.get_storage_client(),
configuration=self._service_locator.get_configuration(),
)

async def open_dataset(
self,
*,
id: str | None = None,
name: str | None = None,
alias: str | None = None,
) -> Dataset:
"""Return the `Dataset` with the given ID or name. If none is provided, return the default one."""
"""Return `Dataset` with the given ID or name or alias. If none is provided, return the default one."""
return await Dataset.open(
id=id,
name=name,
@@ -586,14 +598,14 @@ async def get_dataset(
configuration=self._service_locator.get_configuration(),
)

async def get_key_value_store(
async def open_key_value_store(
self,
*,
id: str | None = None,
name: str | None = None,
alias: str | None = None,
) -> KeyValueStore:
"""Return the `KeyValueStore` with the given ID or name. If none is provided, return the default KVS."""
"""Return `KeyValueStore` with the given ID or name or alias. If none is provided, return the default KVS."""
return await KeyValueStore.open(
id=id,
name=name,
@@ -659,7 +671,7 @@ async def run(
request_manager = await self.get_request_manager()
if purge_request_queue and isinstance(request_manager, RequestQueue):
await request_manager.drop()
self._request_manager = await RequestQueue.open()
self._request_manager = await self.open_request_queue()

if requests is not None:
await self.add_requests(requests)
@@ -793,11 +805,11 @@ async def _use_state(
self,
default_value: dict[str, JsonSerializable] | None = None,
) -> dict[str, JsonSerializable]:
kvs = await self.get_key_value_store()
kvs = await self.open_key_value_store()
return await kvs.get_auto_saved_value(self._CRAWLEE_STATE_KEY, default_value)

async def _save_crawler_state(self) -> None:
store = await self.get_key_value_store()
store = await self.open_key_value_store()
await store.persist_autosaved_values()

async def get_data(
@@ -887,7 +899,7 @@ async def _push_data(
dataset_alias: The alias of the `Dataset` (run scope, unnamed storage).
kwargs: Keyword arguments to be passed to the `Dataset.push_data()` method.
"""
dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias)
dataset = await self.open_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias)
await dataset.push_data(data, **kwargs)

def _should_retry_request(self, context: BasicCrawlingContext, error: Exception) -> bool:
@@ -1269,7 +1281,7 @@ async def _commit_request_handler_result(self, context: BasicCrawlingContext) ->
for push_data_call in result.push_data_calls:
await self._push_data(**push_data_call)

await self._commit_key_value_store_changes(result, get_kvs=self.get_key_value_store)
await self._commit_key_value_store_changes(result, get_kvs=self.open_key_value_store)

@staticmethod
async def _commit_key_value_store_changes(
@@ -1336,7 +1348,7 @@ async def __run_task_function(self) -> None:
else:
session = await self._get_session()
proxy_info = await self._get_proxy_info(request, session)
result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store)
result = RequestHandlerRunResult(key_value_store_getter=self.open_key_value_store)

context = BasicCrawlingContext(
request=request,
28 changes: 20 additions & 8 deletions src/crawlee/storages/_storage_instance_manager.py
@@ -187,21 +187,33 @@ def remove_from_cache(self, storage_instance: Storage) -> None:
"""
storage_type = type(storage_instance)

for storage_client_cache in self._cache_by_storage_client.values():
for storage_client_type in self._cache_by_storage_client:
# Remove from ID cache
for additional_key in storage_client_cache.by_id[storage_type][storage_instance.id]:
del storage_client_cache.by_id[storage_type][storage_instance.id][additional_key]
for additional_key in self._cache_by_storage_client[storage_client_type].by_id[storage_type][
storage_instance.id
]:
del self._cache_by_storage_client[storage_client_type].by_id[storage_type][storage_instance.id][
additional_key
]
break

# Remove from name cache or alias cache. It can never be in both.
if storage_instance.name is not None:
for additional_key in storage_client_cache.by_name[storage_type][storage_instance.name]:
del storage_client_cache.by_name[storage_type][storage_instance.name][additional_key]
for additional_key in self._cache_by_storage_client[storage_client_type].by_name[storage_type][
storage_instance.name
]:
del self._cache_by_storage_client[storage_client_type].by_name[storage_type][storage_instance.name][
additional_key
]
break
else:
for alias_key in storage_client_cache.by_alias[storage_type]:
for additional_key in storage_client_cache.by_alias[storage_type][alias_key]:
del storage_client_cache.by_alias[storage_type][alias_key][additional_key]
for alias_key in self._cache_by_storage_client[storage_client_type].by_alias[storage_type]:
for additional_key in self._cache_by_storage_client[storage_client_type].by_alias[storage_type][
alias_key
]:
del self._cache_by_storage_client[storage_client_type].by_alias[storage_type][alias_key][
additional_key
]
break

def clear_cache(self) -> None:
@@ -461,7 +461,7 @@ async def request_handler(context: AdaptivePlaywrightCrawlingContext) -> None:

await crawler.run(test_urls[:1])

dataset = await crawler.get_dataset()
dataset = await crawler.open_dataset()
stored_results = [item async for item in dataset.iterate_items()]

if error_in_pw_crawler:
51 changes: 37 additions & 14 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
@@ -611,10 +611,10 @@ async def test_crawler_get_storages() -> None:
rp = await crawler.get_request_manager()
assert isinstance(rp, RequestQueue)

dataset = await crawler.get_dataset()
dataset = await crawler.open_dataset()
assert isinstance(dataset, Dataset)

kvs = await crawler.get_key_value_store()
kvs = await crawler.open_key_value_store()
assert isinstance(kvs, KeyValueStore)


@@ -725,7 +725,7 @@ async def handler(context: BasicCrawlingContext) -> None:

await crawler.run(['https://hello.world'])

store = await crawler.get_key_value_store()
store = await crawler.open_key_value_store()
assert (await store.get_value('foo')) == 'bar'


@@ -738,7 +738,7 @@ async def handler(context: BasicCrawlingContext) -> None:

await crawler.run(['https://hello.world'])

kvs = await crawler.get_key_value_store()
kvs = await crawler.open_key_value_store()
value = await kvs.get_value(BasicCrawler._CRAWLEE_STATE_KEY)

assert value == {'hello': 'world'}
@@ -781,7 +781,7 @@ async def handler_three(context: BasicCrawlingContext) -> None:
# The state in handler_three must match the final state updated in previous run
assert state_in_handler_three == {'hello': 'last_world'}

store = await crawler.get_key_value_store()
store = await crawler.open_key_value_store()

# The state in the KVS must match with the last set state
assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY)) == {'hello': 'last_world'}
@@ -1109,8 +1109,8 @@ async def test_crawler_uses_default_storages(tmp_path: Path) -> None:

crawler = BasicCrawler()

assert dataset is await crawler.get_dataset()
assert kvs is await crawler.get_key_value_store()
assert dataset is await crawler.open_dataset()
assert kvs is await crawler.open_key_value_store()
assert rq is await crawler.get_request_manager()


@@ -1127,8 +1127,8 @@ async def test_crawler_can_use_other_storages(tmp_path: Path) -> None:

crawler = BasicCrawler(storage_client=MemoryStorageClient())

assert dataset is not await crawler.get_dataset()
assert kvs is not await crawler.get_key_value_store()
assert dataset is not await crawler.open_dataset()
assert kvs is not await crawler.open_key_value_store()
assert rq is not await crawler.get_request_manager()


@@ -1164,8 +1164,8 @@ async def test_crawler_can_use_other_storages_of_same_type(tmp_path: Path) -> No
crawler = BasicCrawler(storage_client=FileSystemStorageClient(), configuration=configuration_b)

# Assert that the storages are different
assert dataset is not await crawler.get_dataset()
assert kvs is not await crawler.get_key_value_store()
assert dataset is not await crawler.open_dataset()
assert kvs is not await crawler.open_key_value_store()
assert rq is not await crawler.get_request_manager()

# Assert that all storages exists on the filesystem
@@ -1193,7 +1193,7 @@ async def handler(context: BasicCrawlingContext) -> None:
await crawler.run(['https://does-not-matter.com'])
assert spy.call_count >= 1

dataset = await crawler.get_dataset()
dataset = await crawler.open_dataset()
data = await dataset.get_data()
assert data.items == [{'foo': 'bar'}]

@@ -1208,7 +1208,7 @@ async def test_context_use_state_race_condition_in_handlers(key_value_store: Key
from asyncio import Barrier # type:ignore[attr-defined] # noqa: PLC0415

crawler = BasicCrawler()
store = await crawler.get_key_value_store()
store = await crawler.open_key_value_store()
await store.set_value(BasicCrawler._CRAWLEE_STATE_KEY, {'counter': 0})
handler_barrier = Barrier(2)

@@ -1221,7 +1221,7 @@ async def handler(context: BasicCrawlingContext) -> None:

await crawler.run(['https://crawlee.dev/', 'https://crawlee.dev/docs/quick-start'])

store = await crawler.get_key_value_store()
store = await crawler.open_key_value_store()
# Ensure that local state is pushed back to kvs.
await store.persist_autosaved_values()
assert (await store.get_value(BasicCrawler._CRAWLEE_STATE_KEY))['counter'] == 2
@@ -1549,3 +1549,26 @@ def listener(event_data: EventCrawlerStatusData) -> None:
event_manager.off(event=Event.CRAWLER_STATUS, listener=listener)

assert status_message_listener.called


async def test_crawler_purge_request_queue_uses_same_storage_client() -> None:
"""Make sure that purge on start does not replace the storage client in the underlying storage manager"""

    # Set one storage client globally and a different one for the crawler.
service_locator.set_storage_client(FileSystemStorageClient())
unrelated_rq = await RequestQueue.open()
unrelated_request = Request.from_url('https://x.placeholder.com')
await unrelated_rq.add_request(unrelated_request)

crawler = BasicCrawler(storage_client=MemoryStorageClient())

@crawler.router.default_handler
async def handler(context: BasicCrawlingContext) -> None:
context.log.info(context.request.url)

for _ in (1, 2):
await crawler.run(requests=[Request.from_url('https://a.placeholder.com')], purge_request_queue=True)
assert crawler.statistics.state.requests_finished == 1

# Crawler should not fall back to the default storage after the purge
assert await unrelated_rq.fetch_next_request() == unrelated_request
2 changes: 1 addition & 1 deletion tests/unit/crawlers/_http/test_http_crawler.py
@@ -562,7 +562,7 @@ async def request_handler(context: HttpCrawlingContext) -> None:

await crawler.run([str(server_url)])

kvs = await crawler.get_key_value_store()
kvs = await crawler.open_key_value_store()
kvs_content = {}
async for key_info in kvs.iterate_keys():
kvs_content[key_info.key] = await kvs.get_value(key_info.key)
2 changes: 1 addition & 1 deletion tests/unit/crawlers/_playwright/test_playwright_crawler.py
@@ -627,7 +627,7 @@ async def request_handler(context: PlaywrightCrawlingContext) -> None:
[str(server_url), str(server_url / 'page_1'), str(server_url / 'page_2'), str(server_url / 'headers')]
)

kvs = await crawler.get_key_value_store()
kvs = await crawler.open_key_value_store()
kvs_content = {}

async for key_info in kvs.iterate_keys():