@@ -252,39 +252,45 @@ def get_storage_key(
252252 @classmethod
253253 async def register_aliases (cls , configuration : Configuration ) -> None :
254254 """Load alias mapping from configuration to the default kvs."""
255- if configuration .actor_storages is None :
256- return
257-
258- configuration_mapping = {}
259-
260- if configuration .default_dataset_id != configuration .actor_storages .datasets .get ('default' ):
261- logger .warning (
262- f'Conflicting default dataset ids: { configuration .default_dataset_id = } ,'
263- f" { configuration .actor_storages .datasets .get ('default' )= } "
264- )
265- additional_cache_key = hash_api_base_url_and_token (configuration )
266-
267- for mapping , storage_type in (
268- (configuration .actor_storages .key_value_stores , 'KeyValueStore' ),
269- (configuration .actor_storages .datasets , 'Dataset' ),
270- (configuration .actor_storages .request_queues , 'RequestQueue' ),
271- ):
272- for storage_alias , storage_id in mapping .items ():
273- configuration_mapping [
274- cls .get_storage_key (
275- storage_type ,
276- '__default__' if storage_alias == 'default' else storage_alias ,
277- additional_cache_key ,
278- )
279- ] = storage_id
280-
281- # Bulk update the mapping in the default KVS with the configuration mapping.
282- client = await cls ._get_default_kvs_client (configuration = configuration )
283- record = await client .get_record (cls ._ALIAS_MAPPING_KEY )
284- existing_mapping = record .get ('value' , {}) if record else {}
285-
286- # Update the existing mapping with the configuration mapping.
287- existing_mapping .update (configuration_mapping )
288- # Store the updated mapping back in the KVS and in memory.
289- await client .set_record (cls ._ALIAS_MAPPING_KEY , existing_mapping )
290- cls ._alias_map .update (existing_mapping )
255+ async with await cls ._get_alias_init_lock ():
256+ # Skip if no mapping or just default storages
257+ if configuration .actor_storages is None or set (
258+ configuration .actor_storages .datasets .keys ()
259+ | configuration .actor_storages .key_value_stores .keys ()
260+ | configuration .actor_storages .request_queues .keys ()
261+ ) == {'default' }:
262+ return
263+
264+ configuration_mapping = {}
265+
266+ if configuration .default_dataset_id != configuration .actor_storages .datasets .get ('default' ):
267+ logger .warning (
268+ f'Conflicting default dataset ids: { configuration .default_dataset_id = } ,'
269+ f" { configuration .actor_storages .datasets .get ('default' )= } "
270+ )
271+ additional_cache_key = hash_api_base_url_and_token (configuration )
272+
273+ for mapping , storage_type in (
274+ (configuration .actor_storages .key_value_stores , 'KeyValueStore' ),
275+ (configuration .actor_storages .datasets , 'Dataset' ),
276+ (configuration .actor_storages .request_queues , 'RequestQueue' ),
277+ ):
278+ for storage_alias , storage_id in mapping .items ():
279+ configuration_mapping [
280+ cls .get_storage_key (
281+ storage_type ,
282+ '__default__' if storage_alias == 'default' else storage_alias ,
283+ additional_cache_key ,
284+ )
285+ ] = storage_id
286+
287+ # Bulk update the mapping in the default KVS with the configuration mapping.
288+ client = await cls ._get_default_kvs_client (configuration = configuration )
289+ record = await client .get_record (cls ._ALIAS_MAPPING_KEY )
290+ existing_mapping = record .get ('value' , {}) if record else {}
291+
292+ # Update the existing mapping with the configuration mapping.
293+ existing_mapping .update (configuration_mapping )
294+ # Store the updated mapping back in the KVS and in memory.
295+ await client .set_record (cls ._ALIAS_MAPPING_KEY , existing_mapping )
296+ cls ._alias_map .update (existing_mapping )
0 commit comments