11from __future__ import annotations
22
3+ from asyncio import Lock
34from typing import TYPE_CHECKING
45
56from typing_extensions import override
67
8+ from crawlee import service_locator
79from crawlee .storage_clients ._base import StorageClient
810
911from ._dataset_client import ApifyDatasetClient
1012from ._key_value_store_client import ApifyKeyValueStoreClient
1113from ._request_queue_client import ApifyRequestQueueClient
14+ from ._utils import _ALIAS_MAPPING_KEY , _Alias
15+ from apify ._configuration import Configuration
1216from apify ._configuration import Configuration as ApifyConfiguration
1317from apify ._utils import docs_group
1418
2226class ApifyStorageClient (StorageClient ):
2327 """Apify storage client."""
2428
29+ _alias_storages_initialized = False
30+ """Flag that indicates whether the pre-existing alias storages were already initialized."""
31+ _alias_init_lock : Lock | None = None
32+ """Lock for creating alias storages. Only one alias storage can be created at the time."""
33+
2534 # This class breaches the Liskov Substitution Principle: it requires a more specialized Configuration than its parent.
2635 _lsp_violation_error_message_template = (
2736 'Expected "configuration" to be an instance of "apify.Configuration", but got {} instead.'
@@ -30,7 +39,9 @@ class ApifyStorageClient(StorageClient):
@override
def get_additional_cache_key(self, configuration: CrawleeConfiguration) -> Hashable:
    """Build the extra cache key that distinguishes storages opened with different Apify credentials.

    Args:
        configuration: Configuration the storage is being opened with. Must be an `apify.Configuration`.

    Returns:
        The value produced by `_Alias.get_additional_cache_key` for the configured API base URL and token.

    Raises:
        TypeError: If `configuration` is not an `apify.Configuration` instance.
        ValueError: If either `api_base_url` or `token` is not set on the configuration.
    """
    # Reject foreign configuration types first so the happy path reads straight through.
    if not isinstance(configuration, ApifyConfiguration):
        raise TypeError(self._lsp_violation_error_message_template.format(type(configuration).__name__))

    base_url = configuration.api_base_url
    api_token = configuration.token
    if base_url is None or api_token is None:
        raise ValueError("'Configuration.api_base_url' and 'Configuration.token' must be set.")

    return _Alias.get_additional_cache_key(base_url, api_token)
3546
3647 @override
@@ -44,6 +55,10 @@ async def create_dataset_client(
4455 ) -> ApifyDatasetClient :
4556 configuration = configuration or ApifyConfiguration .get_global_configuration ()
4657 if isinstance (configuration , ApifyConfiguration ):
58+ if alias :
59+ await self ._initialize_alias_storages ()
60+ async with self .get_alias_init_lock ():
61+ return await ApifyDatasetClient .open (id = id , name = name , alias = alias , configuration = configuration )
4762 return await ApifyDatasetClient .open (id = id , name = name , alias = alias , configuration = configuration )
4863
4964 raise TypeError (self ._lsp_violation_error_message_template .format (type (configuration ).__name__ ))
@@ -59,6 +74,12 @@ async def create_kvs_client(
5974 ) -> ApifyKeyValueStoreClient :
6075 configuration = configuration or ApifyConfiguration .get_global_configuration ()
6176 if isinstance (configuration , ApifyConfiguration ):
77+ if alias :
78+ await self ._initialize_alias_storages ()
79+ async with self .get_alias_init_lock ():
80+ return await ApifyKeyValueStoreClient .open (
81+ id = id , name = name , alias = alias , configuration = configuration
82+ )
6283 return await ApifyKeyValueStoreClient .open (id = id , name = name , alias = alias , configuration = configuration )
6384
6485 raise TypeError (self ._lsp_violation_error_message_template .format (type (configuration ).__name__ ))
@@ -74,6 +95,58 @@ async def create_rq_client(
7495 ) -> ApifyRequestQueueClient :
7596 configuration = configuration or ApifyConfiguration .get_global_configuration ()
7697 if isinstance (configuration , ApifyConfiguration ):
98+ if alias :
99+ await self ._initialize_alias_storages ()
100+ async with self .get_alias_init_lock ():
101+ return await ApifyRequestQueueClient .open (
102+ id = id , name = name , alias = alias , configuration = configuration
103+ )
77104 return await ApifyRequestQueueClient .open (id = id , name = name , alias = alias , configuration = configuration )
78105
79106 raise TypeError (self ._lsp_violation_error_message_template .format (type (configuration ).__name__ ))
107+
@classmethod
def get_alias_init_lock(cls) -> Lock:
    """Return the class-level lock used around alias storage creation.

    The lock is created on first use and stored on the class, so every later call returns the same object.
    """
    lock = cls._alias_init_lock
    if lock is None:
        # First caller creates the lock and stores it on the class for later calls.
        lock = Lock()
        cls._alias_init_lock = lock
    return lock
113+
@classmethod
async def _initialize_alias_storages(cls) -> None:
    """Initialize alias storages.

    This method is called once to populate storage_instance_manager alias related cache. All existing alias
    storages are saved in storage_instance_manager cache. If the alias storage is not there, it does not exist yet.

    The alias mapping is read from the record stored under `_ALIAS_MAPPING_KEY` in the default key-value store.
    The whole body runs under the alias init lock, and the initialized flag is set only after the mapping has
    been processed without an exception, so a failed attempt is retried on the next call.
    """
    async with cls.get_alias_init_lock():
        if cls._alias_storages_initialized:
            return

        cache = service_locator.storage_instance_manager._cache_by_storage_client[ApifyStorageClient]  # noqa: SLF001

        default_kvs_client = await _Alias.get_default_kvs_client()
        record = await default_kvs_client.get_record(key=_ALIAS_MAPPING_KEY)

        if record is not None and 'value' in record:
            # get_record can return {key: ..., value: ..., content_type: ...}
            alias_export_map = record['value']

            # The mapping itself is the record's 'value'; iterate it directly (export key -> storage id).
            for export_key, storage_id in alias_export_map.items():
                exported_alias = _Alias.from_exported_string(export_key)

                # Re-create custom config used to open the storage
                custom_config = Configuration()
                custom_config.api_base_url = exported_alias.api_url
                custom_config.token = exported_alias.token

                # Populate the id cache by opening storage by id
                storage = await exported_alias.storage_type.open(
                    id=storage_id, configuration=custom_config, storage_client=ApifyStorageClient()
                )
                # Populate the alias cache as well
                cache.by_alias[exported_alias.storage_type][exported_alias.alias][
                    exported_alias.additional_cache_key
                ] = storage

        cls._alias_storages_initialized = True
0 commit comments