Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a3aa0e3
WIP
Pijukatel Aug 28, 2025
be54e46
Draft for discussion.
Pijukatel Aug 29, 2025
9c6a521
Remove temp edits for e2e tests
Pijukatel Aug 29, 2025
86c4244
Simplify types
Pijukatel Aug 29, 2025
dd5f914
Update configuration according to Pydantic docs recommendation
Pijukatel Sep 1, 2025
df06b82
Properly create custom storage clients when passing custom configura…
Pijukatel Sep 1, 2025
430f2ad
Update the create methods
Pijukatel Sep 1, 2025
1340c5c
Add global instance manager
Pijukatel Sep 2, 2025
a1746f3
Try to keep config in open, but rework the instance cache
Pijukatel Sep 2, 2025
e86356d
Properly set global_storage_instance_manager
Pijukatel Sep 2, 2025
07ffa19
Avoid coroutine ... was never awaited
Pijukatel Sep 2, 2025
6026b3a
Revert incorrect delete, add test that would catch it, remove useless…
Pijukatel Sep 12, 2025
330b598
Revert unintentional edit in base class
Pijukatel Sep 12, 2025
8143277
Parametrize storage_instance_manager tests by storage type
Pijukatel Sep 12, 2025
ed4bca7
Extract shared fixture and remove unused fixtures
Pijukatel Sep 12, 2025
b06054b
Update upgrading guide
Pijukatel Sep 12, 2025
fa2703b
Merge remote-tracking branch 'origin/master' into storage-clients-and…
Pijukatel Sep 15, 2025
97055ee
Apply suggestions from code review
Pijukatel Sep 16, 2025
6affd8b
Review comments
Pijukatel Sep 16, 2025
1706ed8
Renamed client_opener to client_opener_coro
Pijukatel Sep 16, 2025
6ccd4f7
Update src/crawlee/storages/_storage_instance_manager.py
Pijukatel Sep 16, 2025
043d308
Review comments
Pijukatel Sep 16, 2025
0ff47c8
Update pyproject.toml
Pijukatel Sep 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
from datetime import timedelta

from crawlee import service_locator
from crawlee.configuration import Configuration
from crawlee.storage_clients import MemoryStorageClient
from crawlee.storages import Dataset


Expand All @@ -11,10 +13,16 @@ async def main() -> None:
headless=False,
persist_state_interval=timedelta(seconds=30),
)
# Set the custom configuration as the global default configuration.
service_locator.set_configuration(configuration)

# Pass the configuration to the dataset (or other storage) when opening it.
dataset = await Dataset.open(
configuration=configuration,
# Use the global defaults when creating the dataset (or other storage).
dataset_1 = await Dataset.open()

# Or explicitly pass a specific storage client and configuration if
# you do not want to rely on global defaults.
dataset_2 = await Dataset.open(
storage_client=MemoryStorageClient(), configuration=configuration
)


Expand Down
2 changes: 1 addition & 1 deletion src/crawlee/_autoscaling/snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def from_config(cls, config: Configuration | None = None) -> Snapshotter:
Args:
config: The `Configuration` instance. Uses the global (default) one if not provided.
"""
config = service_locator.get_configuration()
config = config or service_locator.get_configuration()

# Compute the maximum memory size based on the provided configuration. If `memory_mbytes` is provided,
# it uses that value. Otherwise, it calculates the `max_memory_size` as a proportion of the system's
Expand Down
68 changes: 44 additions & 24 deletions src/crawlee/_service_locator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
if TYPE_CHECKING:
from crawlee.storages._storage_instance_manager import StorageInstanceManager

from logging import getLogger

logger = getLogger(__name__)


@docs_group('Configuration')
class ServiceLocator:
Expand All @@ -19,23 +23,24 @@ class ServiceLocator:
All services are lazily initialized to their default values.
"""

def __init__(self) -> None:
self._configuration: Configuration | None = None
self._event_manager: EventManager | None = None
self._storage_client: StorageClient | None = None
self._storage_instance_manager: StorageInstanceManager | None = None
global_storage_instance_manager: StorageInstanceManager | None = None

# Flags to check if the services were already set.
self._configuration_was_retrieved = False
self._event_manager_was_retrieved = False
self._storage_client_was_retrieved = False
def __init__(
self,
configuration: Configuration | None = None,
event_manager: EventManager | None = None,
storage_client: StorageClient | None = None,
) -> None:
self._configuration = configuration
self._event_manager = event_manager
self._storage_client = storage_client

def get_configuration(self) -> Configuration:
"""Get the configuration."""
if self._configuration is None:
logger.warning('No configuration set, implicitly creating and using default Configuration.')
self._configuration = Configuration()

self._configuration_was_retrieved = True
return self._configuration

def set_configuration(self, configuration: Configuration) -> None:
Expand All @@ -47,21 +52,25 @@ def set_configuration(self, configuration: Configuration) -> None:
Raises:
ServiceConflictError: If the configuration has already been retrieved before.
"""
if self._configuration_was_retrieved:
if self._configuration is configuration:
# Same instance, no need to do anything
return
if self._configuration:
raise ServiceConflictError(Configuration, configuration, self._configuration)

self._configuration = configuration

def get_event_manager(self) -> EventManager:
"""Get the event manager."""
if self._event_manager is None:
self._event_manager = (
LocalEventManager().from_config(config=self._configuration)
if self._configuration
else LocalEventManager.from_config()
)
logger.warning('No event manager set, implicitly creating and using default LocalEventManager.')
if self._configuration is None:
logger.warning(
'Implicit creation of event manager will implicitly set configuration as side effect. '
'It is advised to explicitly first set the configuration instead.'
)
self._event_manager = LocalEventManager().from_config(config=self._configuration)

self._event_manager_was_retrieved = True
return self._event_manager

def set_event_manager(self, event_manager: EventManager) -> None:
Expand All @@ -73,17 +82,25 @@ def set_event_manager(self, event_manager: EventManager) -> None:
Raises:
ServiceConflictError: If the event manager has already been retrieved before.
"""
if self._event_manager_was_retrieved:
if self._event_manager is event_manager:
# Same instance, no need to do anything
return
if self._event_manager:
raise ServiceConflictError(EventManager, event_manager, self._event_manager)

self._event_manager = event_manager

def get_storage_client(self) -> StorageClient:
"""Get the storage client."""
if self._storage_client is None:
logger.warning('No storage client set, implicitly creating and using default FileSystemStorageClient.')
if self._configuration is None:
logger.warning(
'Implicit creation of storage client will implicitly set configuration as side effect. '
'It is advised to explicitly first set the configuration instead.'
)
self._storage_client = FileSystemStorageClient()

self._storage_client_was_retrieved = True
return self._storage_client

def set_storage_client(self, storage_client: StorageClient) -> None:
Expand All @@ -95,21 +112,24 @@ def set_storage_client(self, storage_client: StorageClient) -> None:
Raises:
ServiceConflictError: If the storage client has already been retrieved before.
"""
if self._storage_client_was_retrieved:
if self._storage_client is storage_client:
# Same instance, no need to do anything
return
if self._storage_client:
raise ServiceConflictError(StorageClient, storage_client, self._storage_client)

self._storage_client = storage_client

@property
def storage_instance_manager(self) -> StorageInstanceManager:
"""Get the storage instance manager."""
if self._storage_instance_manager is None:
"""Get the storage instance manager. It is a global manager shared by all instances of ServiceLocator."""
if ServiceLocator.global_storage_instance_manager is None:
# Import here to avoid circular imports.
from crawlee.storages._storage_instance_manager import StorageInstanceManager # noqa: PLC0415

self._storage_instance_manager = StorageInstanceManager()
ServiceLocator.global_storage_instance_manager = StorageInstanceManager()

return self._storage_instance_manager
return ServiceLocator.global_storage_instance_manager


service_locator = ServiceLocator()
2 changes: 1 addition & 1 deletion src/crawlee/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Configuration(BaseSettings):
Settings can also be configured via environment variables, prefixed with `CRAWLEE_`.
"""

model_config = SettingsConfigDict(populate_by_name=True)
model_config = SettingsConfigDict(validate_by_name=True, validate_by_alias=True)
Comment thread
Pijukatel marked this conversation as resolved.

internal_timeout: Annotated[timedelta | None, Field(alias='crawlee_internal_timeout')] = None
"""Timeout for the internal asynchronous operations."""
Expand Down
47 changes: 35 additions & 12 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from crawlee._autoscaling import AutoscaledPool, Snapshotter, SystemStatus
from crawlee._log_config import configure_logger, get_configured_log_level, string_to_log_level
from crawlee._request import Request, RequestOptions, RequestState
from crawlee._service_locator import ServiceLocator
from crawlee._types import (
BasicCrawlingContext,
EnqueueLinksKwargs,
Expand Down Expand Up @@ -346,14 +347,23 @@ def __init__(
_logger: A logger instance, typically provided by a subclass, for consistent logging labels.
Intended for use by subclasses rather than direct instantiation of `BasicCrawler`.
"""
if configuration:
service_locator.set_configuration(configuration)
if storage_client:
service_locator.set_storage_client(storage_client)
if event_manager:
service_locator.set_event_manager(event_manager)
if not configuration:
configuration = service_locator.get_configuration()

config = service_locator.get_configuration()
if not storage_client:
storage_client = service_locator.get_storage_client()

if not event_manager:
# This is weird if someone passes configuration and its event manager related stuff gets ignored as the
# event manager will be used from service_locator. Maybe keep the was created flag for it? It does not have
# the use cases like the storages
Comment thread
vdusek marked this conversation as resolved.
Outdated
event_manager = service_locator.get_event_manager()

self._service_locator = ServiceLocator(
configuration=configuration, storage_client=storage_client, event_manager=event_manager
)

config = self._service_locator.get_configuration()

# Core components
self._request_manager = request_manager
Expand Down Expand Up @@ -548,7 +558,10 @@ async def _get_proxy_info(self, request: Request, session: Session | None) -> Pr
async def get_request_manager(self) -> RequestManager:
"""Return the configured request manager. If none is configured, open and return the default request queue."""
if not self._request_manager:
self._request_manager = await RequestQueue.open()
self._request_manager = await RequestQueue.open(
storage_client=self._service_locator.get_storage_client(),
configuration=self._service_locator.get_configuration(),
)

return self._request_manager

Expand All @@ -559,7 +572,12 @@ async def get_dataset(
name: str | None = None,
) -> Dataset:
"""Return the `Dataset` with the given ID or name. If none is provided, return the default one."""
return await Dataset.open(id=id, name=name)
return await Dataset.open(
id=id,
name=name,
storage_client=self._service_locator.get_storage_client(),
configuration=self._service_locator.get_configuration(),
Comment thread
vdusek marked this conversation as resolved.
)

async def get_key_value_store(
self,
Expand All @@ -568,7 +586,12 @@ async def get_key_value_store(
name: str | None = None,
) -> KeyValueStore:
"""Return the `KeyValueStore` with the given ID or name. If none is provided, return the default KVS."""
return await KeyValueStore.open(id=id, name=name)
return await KeyValueStore.open(
id=id,
name=name,
storage_client=self._service_locator.get_storage_client(),
configuration=self._service_locator.get_configuration(),
)

def error_handler(
self, handler: ErrorHandler[TCrawlingContext | BasicCrawlingContext]
Expand Down Expand Up @@ -684,7 +707,7 @@ def sigint_handler() -> None:
return final_statistics

async def _run_crawler(self) -> None:
event_manager = service_locator.get_event_manager()
event_manager = self._service_locator.get_event_manager()

self._crawler_state_rec_task.start()

Expand Down Expand Up @@ -1520,7 +1543,7 @@ def _log_status_message(self, message: str, level: LogLevel = 'DEBUG') -> None:

async def _crawler_state_task(self) -> None:
"""Emit a persist state event with the given migration status."""
event_manager = service_locator.get_event_manager()
event_manager = self._service_locator.get_event_manager()

current_state = self.statistics.state

Expand Down
9 changes: 9 additions & 0 deletions src/crawlee/storage_clients/_base/_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from crawlee._utils.docs import docs_group

if TYPE_CHECKING:
from collections.abc import Hashable

from crawlee.configuration import Configuration

from ._dataset_client import DatasetClient
Expand All @@ -28,6 +30,13 @@ class StorageClient(ABC):
(where applicable), and consistent access patterns across all storage types it supports.
"""

def get_additional_cache_key(self, configuration: Configuration) -> Hashable: # noqa: ARG002
Comment thread
vdusek marked this conversation as resolved.
"""Return a cache key that can differentiate between different storages of this client.

Can be based on configuration or on the client itself. By default, returns an empty string.
"""
return ''

@abstractmethod
async def create_dataset_client(
self,
Expand Down
10 changes: 10 additions & 0 deletions src/crawlee/storage_clients/_file_system/_storage_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from typing_extensions import override

from crawlee._utils.docs import docs_group
Expand All @@ -10,6 +12,9 @@
from ._key_value_store_client import FileSystemKeyValueStoreClient
from ._request_queue_client import FileSystemRequestQueueClient

if TYPE_CHECKING:
from collections.abc import Hashable


@docs_group('Storage clients')
class FileSystemStorageClient(StorageClient):
Expand All @@ -29,6 +34,11 @@ class FileSystemStorageClient(StorageClient):
Use it only when running a single crawler process at a time.
"""

@override
def get_additional_cache_key(self, configuration: Configuration) -> Hashable:
# Even different client instances should return same storage if the storage_dir is the same.
return configuration.storage_dir

@override
async def create_dataset_client(
self,
Expand Down
3 changes: 0 additions & 3 deletions src/crawlee/storages/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from crawlee._utils.docs import docs_group

if TYPE_CHECKING:
from crawlee.configuration import Configuration
from crawlee.storage_clients._base import StorageClient
from crawlee.storage_clients.models import DatasetMetadata, KeyValueStoreMetadata, RequestQueueMetadata

Expand Down Expand Up @@ -36,15 +35,13 @@ async def open(
*,
id: str | None = None,
name: str | None = None,
configuration: Configuration | None = None,
Comment thread
vdusek marked this conversation as resolved.
storage_client: StorageClient | None = None,
) -> Storage:
"""Open a storage, either restore existing or create a new one.

Args:
id: The storage ID.
name: The storage name.
configuration: Configuration object used during the storage creation or restoration process.
Comment thread
vdusek marked this conversation as resolved.
storage_client: Underlying storage client to use. If not provided, the default global storage client
from the service locator will be used.
"""
Expand Down
Loading
Loading