Skip to content

Commit a996dca

Browse files
committed
Simplify and avoid unnecessary API calls
1 parent a7e645f commit a996dca

File tree

6 files changed

+92
-118
lines changed

6 files changed

+92
-118
lines changed

src/apify/_actor.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
from apify.events import ApifyEventManager, EventManager, LocalEventManager
3737
from apify.log import _configure_logging, logger
3838
from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient
39-
from apify.storage_clients._apify._alias_resolving import AliasResolver
4039
from apify.storage_clients._file_system import ApifyFileSystemStorageClient
4140
from apify.storages import Dataset, KeyValueStore, RequestQueue
4241

@@ -204,10 +203,6 @@ async def __aenter__(self) -> Self:
204203
if not Actor.is_at_home():
205204
# Make sure that the input related KVS is initialized to ensure that the input aware client is used
206205
await self.open_key_value_store()
207-
else:
208-
# Load pre-existing non-default aliased storages from configuration
209-
# Supported only on the Apify platform, where those storages are pre-created by the platform.
210-
await AliasResolver.register_aliases(configuration=self.configuration)
211206
return self
212207

213208
async def __aexit__(

src/apify/storage_clients/_apify/_alias_resolving.py

Lines changed: 21 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from logging import getLogger
66
from typing import TYPE_CHECKING, ClassVar, Literal, overload
77

8+
from propcache import cached_property
9+
810
from apify_client import ApifyClientAsync
911

1012
from ._utils import hash_api_base_url_and_token
@@ -136,11 +138,9 @@ def __init__(
136138
alias: str,
137139
configuration: Configuration,
138140
) -> None:
141+
self._storage_type = storage_type
139142
self._alias = alias
140143
self._configuration = configuration
141-
self._storage_key = self.get_storage_key(
142-
storage_type=storage_type, alias=alias, additional_cache_key=hash_api_base_url_and_token(configuration)
143-
)
144144

145145
async def __aenter__(self) -> AliasResolver:
146146
"""Context manager to prevent race condition in alias creation."""
@@ -194,6 +194,18 @@ async def resolve_id(self) -> str | None:
194194
Returns:
195195
Storage id if it exists, None otherwise.
196196
"""
197+
# First try to find the alias in the configuration mapping to avoid any API calls.
198+
# This mapping is maintained by the Apify platform and does not have to be maintained in the default KVS.
199+
if self._configuration.actor_storages and self._alias != 'default':
200+
storage_maps = {
201+
'Dataset': self._configuration.actor_storages.datasets,
202+
'KeyValueStore': self._configuration.actor_storages.key_value_stores,
203+
'RequestQueue': self._configuration.actor_storages.request_queues,
204+
}
205+
if storage_id := storage_maps.get(self._storage_type, {}).get(self._alias):
206+
return storage_id
207+
208+
# Fallback to the mapping saved in the default KVS
197209
return (await self._get_alias_map(self._configuration)).get(self._storage_key, None)
198210

199211
async def store_mapping(self, storage_id: str) -> None:
@@ -237,60 +249,12 @@ async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStore
237249

238250
return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id)
239251

240-
@classmethod
241-
def get_storage_key(
242-
cls, storage_type: Literal['Dataset', 'KeyValueStore', 'RequestQueue'], alias: str, additional_cache_key: str
243-
) -> str:
244-
return cls._ALIAS_STORAGE_KEY_SEPARATOR.join(
252+
@cached_property
253+
def _storage_key(self) -> str:
254+
return self._ALIAS_STORAGE_KEY_SEPARATOR.join(
245255
[
246-
storage_type,
247-
alias,
248-
additional_cache_key,
256+
self._storage_type,
257+
self._alias,
258+
hash_api_base_url_and_token(self._configuration),
249259
]
250260
)
251-
252-
@classmethod
253-
async def register_aliases(cls, configuration: Configuration) -> None:
254-
"""Load alias mapping from configuration to the default kvs."""
255-
async with await cls._get_alias_init_lock():
256-
# Skip if no mapping or just default storages
257-
if configuration.actor_storages is None or set(
258-
configuration.actor_storages.datasets.keys()
259-
| configuration.actor_storages.key_value_stores.keys()
260-
| configuration.actor_storages.request_queues.keys()
261-
) == {'default'}:
262-
return
263-
264-
configuration_mapping = {}
265-
266-
if configuration.default_dataset_id != configuration.actor_storages.datasets.get('default'):
267-
logger.warning(
268-
f'Conflicting default dataset ids: {configuration.default_dataset_id=},'
269-
f" {configuration.actor_storages.datasets.get('default')=}"
270-
)
271-
additional_cache_key = hash_api_base_url_and_token(configuration)
272-
273-
for mapping, storage_type in (
274-
(configuration.actor_storages.key_value_stores, 'KeyValueStore'),
275-
(configuration.actor_storages.datasets, 'Dataset'),
276-
(configuration.actor_storages.request_queues, 'RequestQueue'),
277-
):
278-
for storage_alias, storage_id in mapping.items():
279-
configuration_mapping[
280-
cls.get_storage_key(
281-
storage_type,
282-
'__default__' if storage_alias == 'default' else storage_alias,
283-
additional_cache_key,
284-
)
285-
] = storage_id
286-
287-
# Bulk update the mapping in the default KVS with the configuration mapping.
288-
client = await cls._get_default_kvs_client(configuration=configuration)
289-
record = await client.get_record(cls._ALIAS_MAPPING_KEY)
290-
existing_mapping = record.get('value', {}) if record else {}
291-
292-
# Update the existing mapping with the configuration mapping.
293-
existing_mapping.update(configuration_mapping)
294-
# Store the updated mapping back in the KVS and in memory.
295-
await client.set_record(cls._ALIAS_MAPPING_KEY, existing_mapping)
296-
cls._alias_map.update(existing_mapping)

tests/e2e/test_schema_storages/actor_source/actor.json

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,43 @@
1111
}
1212
}
1313
},
14-
"custom": {
14+
"custom_d": {
15+
"actorSpecification": 1,
16+
"fields": {
17+
"properties": {
18+
"id": { "type": "string" }
19+
}
20+
}
21+
}
22+
},
23+
"key_value_stores": {
24+
"default": {
25+
"actorSpecification": 1,
26+
"fields": {
27+
"properties": {
28+
"id": { "type": "string" }
29+
}
30+
}
31+
},
32+
"custom_kvs": {
33+
"actorSpecification": 1,
34+
"fields": {
35+
"properties": {
36+
"id": { "type": "string" }
37+
}
38+
}
39+
}
40+
},
41+
"request_queues": {
42+
"default": {
43+
"actorSpecification": 1,
44+
"fields": {
45+
"properties": {
46+
"id": { "type": "string" }
47+
}
48+
}
49+
},
50+
"custom_rq": {
1551
"actorSpecification": 1,
1652
"fields": {
1753
"properties": {

tests/e2e/test_schema_storages/actor_source/main.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,12 @@
44
async def main() -> None:
55
async with Actor:
66
assert Actor.configuration.actor_storages
7-
assert (await Actor.open_dataset(alias='custom')).id == Actor.configuration.actor_storages.datasets['custom']
7+
assert (await Actor.open_dataset(alias='custom_d')).id == Actor.configuration.actor_storages.datasets[
8+
'custom_d'
9+
]
10+
assert (await Actor.open_dataset(alias='custom_kvs')).id == Actor.configuration.actor_storages.datasets[
11+
'custom_kvs'
12+
]
13+
assert (await Actor.open_dataset(alias='custom_rq')).id == Actor.configuration.actor_storages.datasets[
14+
'custom_rq'
15+
]

tests/integration/test_storages.py

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
from __future__ import annotations
22

33
import asyncio
4-
from typing import cast
54

65
import pytest
76

87
from crawlee import service_locator
98
from crawlee.storages import Dataset, KeyValueStore, RequestQueue
109

1110
from apify import Actor, Configuration
12-
from apify._configuration import ActorStorages
1311
from apify.storage_clients import ApifyStorageClient, MemoryStorageClient, SmartApifyStorageClient
14-
from apify.storage_clients._apify._alias_resolving import AliasResolver
1512

1613

1714
@pytest.mark.parametrize(
@@ -128,53 +125,3 @@ async def test_actor_implicit_storage_init(apify_token: str) -> None:
128125
assert await Actor.open_dataset() is not await Actor.open_dataset(force_cloud=True)
129126
assert await Actor.open_key_value_store() is not await Actor.open_key_value_store(force_cloud=True)
130127
assert await Actor.open_request_queue() is not await Actor.open_request_queue(force_cloud=True)
131-
132-
133-
async def test_actor_storages_alias_resolving(apify_token: str) -> None:
134-
"""Test that `AliasResolver.register_aliases` correctly updates default KVS with Actor storages."""
135-
136-
# Actor storages
137-
datasets = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}
138-
request_queues = {'default': 'default_request_queue_id', 'custom': 'custom_request_queue_id'}
139-
key_value_stores = {'default': 'default_key_value_store_id', 'custom': 'custom_key_value_store_id'}
140-
141-
# Set up the configuration and storage client for the test
142-
configuration = Configuration(
143-
default_key_value_store_id='default_kvs_id',
144-
token=apify_token,
145-
actor_storages=ActorStorages(
146-
datasets=datasets, request_queues=request_queues, key_value_stores=key_value_stores
147-
),
148-
)
149-
storage_client = ApifyStorageClient()
150-
service_locator.set_configuration(configuration)
151-
service_locator.set_storage_client(storage_client)
152-
153-
client_cache_key = cast('tuple', storage_client.get_storage_client_cache_key(configuration))[-1]
154-
# Add some unrelated pre-existing alias mapping (it should be preserved after registering aliases)
155-
pre_existing_mapping = {f'KeyValueStore,pre_existing_alias,{client_cache_key}': 'pre_existing_id'}
156-
157-
default_kvs = await KeyValueStore.open(configuration=configuration, storage_client=storage_client)
158-
await default_kvs.set_value(AliasResolver._ALIAS_MAPPING_KEY, pre_existing_mapping)
159-
160-
# Construct the expected mapping
161-
expected_mapping = {}
162-
for storage_type, storage_map in (
163-
('Dataset', datasets),
164-
('KeyValueStore', key_value_stores),
165-
('RequestQueue', request_queues),
166-
):
167-
for storage_alias, storage_id in storage_map.items():
168-
expected_mapping[
169-
','.join(
170-
(storage_type, '__default__' if storage_alias == 'default' else storage_alias, client_cache_key)
171-
)
172-
] = storage_id
173-
expected_mapping.update(pre_existing_mapping)
174-
175-
try:
176-
configuration.default_key_value_store_id = default_kvs.id
177-
await AliasResolver.register_aliases(configuration=configuration)
178-
assert await default_kvs.get_value(AliasResolver._ALIAS_MAPPING_KEY) == expected_mapping
179-
finally:
180-
await default_kvs.drop()

tests/unit/storage_clients/test_alias_resolver.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from apify._configuration import Configuration
3+
from apify._configuration import ActorStorages, Configuration
44
from apify.storage_clients._apify._alias_resolving import AliasResolver
55

66

@@ -76,3 +76,27 @@ async def test_get_alias_map_returns_in_memory_map() -> None:
7676
AliasResolver._alias_map = {}
7777
result = await AliasResolver._get_alias_map(config)
7878
assert result == {}
79+
80+
81+
async def test_actor_storages_alias_resolving() -> None:
82+
"""Test that `AliasResolver.resolve_id` resolves aliases directly from the `actor_storages` configuration mapping."""
83+
84+
# Actor storages
85+
datasets = {'default': 'default_dataset_id', 'custom': 'custom_Dataset_id'}
86+
request_queues = {'default': 'default_request_queue_id', 'custom': 'custom_RequestQueue_id'}
87+
key_value_stores = {'default': 'default_key_value_store_id', 'custom': 'custom_KeyValueStore_id'}
88+
89+
# Set up the configuration and storage client for the test
90+
configuration = Configuration(
91+
default_key_value_store_id='default_kvs_id',
92+
actor_storages=ActorStorages(
93+
datasets=datasets, request_queues=request_queues, key_value_stores=key_value_stores
94+
),
95+
)
96+
97+
# Each storage type must resolve the 'custom' alias to the id from its own configuration mapping
98+
for storage_type in ('Dataset', 'KeyValueStore', 'RequestQueue'):
99+
assert (
100+
await AliasResolver(storage_type=storage_type, alias='custom', configuration=configuration).resolve_id()
101+
== f'custom_{storage_type}_id'
102+
)

0 commit comments

Comments
 (0)