Skip to content

Commit 10986ac

Browse files
Pijukatel, vdusek, and Copilot
authored
feat: Support Actor schema storages with Alias mechanism (#797)
### Description

- Update `Configuration` to include `actor_storages`, loaded from the `actor_storages_json` env variable.
- Update `AliasResolver` to be able to resolve alias mappings from `Configuration`.

### Issues

- Closes: #762

### Testing

- Added E2E test
- Added unit tests
- Manual Actor test

### Checklist

- [x] CI passed

---------

Co-authored-by: Vlada Dusek <v.dusek96@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent ac2a0b0 commit 10986ac

File tree

8 files changed

+168
-25
lines changed

8 files changed

+168
-25
lines changed

src/apify/_configuration.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from typing import Annotated, Any
99

1010
from pydantic import AliasChoices, BeforeValidator, Field, model_validator
11-
from typing_extensions import Self, deprecated
11+
from typing_extensions import Self, TypedDict, deprecated
1212

1313
from crawlee import service_locator
1414
from crawlee._utils.models import timedelta_ms
@@ -34,6 +34,43 @@ def _transform_to_list(value: Any) -> list[str] | None:
3434
return value if isinstance(value, list) else str(value).split(',')
3535

3636

37+
class ActorStorages(TypedDict):
38+
"""Mapping of storage aliases to their IDs, grouped by storage type.
39+
40+
Populated from the `ACTOR_STORAGES_JSON` env var that the Apify platform sets when an Actor declares
41+
named storages in its `actor.json` schema. Each key maps a user-defined alias (e.g. `'custom'`)
42+
to the platform-assigned storage ID.
43+
"""
44+
45+
key_value_stores: dict[str, str]
46+
datasets: dict[str, str]
47+
request_queues: dict[str, str]
48+
49+
50+
def _load_storage_keys(data: None | str | ActorStorages) -> ActorStorages | None:
51+
"""Parse the `ACTOR_STORAGES_JSON` value into a normalized `ActorStorages` dict.
52+
53+
The platform provides this as a JSON string with camelCase keys (`keyValueStores`, `requestQueues`, `datasets`).
54+
This validator deserializes the JSON when needed and normalizes the keys to snake_case, falling back to empty
55+
dicts for missing storage types.
56+
57+
Args:
58+
data: Raw value - `None` when the env var is unset, a JSON string from the env var, or an already-parsed
59+
`ActorStorages` dict when set programmatically.
60+
61+
Returns:
62+
Normalized storage mapping, or `None` if the input is `None`.
63+
"""
64+
if data is None:
65+
return None
66+
storage_mapping = json.loads(data) if isinstance(data, str) else data
67+
return {
68+
'key_value_stores': storage_mapping.get('keyValueStores', storage_mapping.get('key_value_stores', {})),
69+
'datasets': storage_mapping.get('datasets', storage_mapping.get('datasets', {})),
70+
'request_queues': storage_mapping.get('requestQueues', storage_mapping.get('request_queues', {})),
71+
}
72+
73+
3774
@docs_group('Configuration')
3875
class Configuration(CrawleeConfiguration):
3976
"""A class for specifying the configuration of an Actor.
@@ -446,6 +483,15 @@ class Configuration(CrawleeConfiguration):
446483
BeforeValidator(lambda data: json.loads(data) if isinstance(data, str) else data or None),
447484
] = None
448485

486+
actor_storages: Annotated[
487+
ActorStorages | None,
488+
Field(
489+
alias='actor_storages_json',
490+
description='Mapping of storage aliases to their platform-assigned IDs.',
491+
),
492+
BeforeValidator(_load_storage_keys),
493+
] = None
494+
449495
@model_validator(mode='after')
450496
def disable_browser_sandbox_on_platform(self) -> Self:
451497
"""Disable the browser sandbox mode when running on the Apify platform.

src/apify/storage_clients/_apify/_alias_resolving.py

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import logging
44
from asyncio import Lock
5+
from functools import cached_property
56
from logging import getLogger
67
from typing import TYPE_CHECKING, ClassVar, Literal, overload
78

@@ -139,7 +140,6 @@ def __init__(
139140
self._storage_type = storage_type
140141
self._alias = alias
141142
self._configuration = configuration
142-
self._additional_cache_key = hash_api_base_url_and_token(configuration)
143143

144144
async def __aenter__(self) -> AliasResolver:
145145
"""Context manager to prevent race condition in alias creation."""
@@ -183,15 +183,7 @@ async def _get_alias_map(cls, configuration: Configuration) -> dict[str, str]:
183183
default_kvs_client = await cls._get_default_kvs_client(configuration)
184184

185185
record = await default_kvs_client.get_record(cls._ALIAS_MAPPING_KEY)
186-
187-
# get_record can return {key: ..., value: ..., content_type: ...}
188-
if isinstance(record, dict):
189-
if 'value' in record and isinstance(record['value'], dict):
190-
cls._alias_map = record['value']
191-
else:
192-
cls._alias_map = record
193-
else:
194-
cls._alias_map = dict[str, str]()
186+
cls._alias_map = record.get('value', {}) if record else {}
195187

196188
return cls._alias_map
197189

@@ -201,6 +193,18 @@ async def resolve_id(self) -> str | None:
201193
Returns:
202194
Storage id if it exists, None otherwise.
203195
"""
196+
# First try to find the alias in the configuration mapping to avoid any API calls.
197+
# This mapping is maintained by the Apify platform and does not have to be maintained in the default KVS.
198+
if self._configuration.actor_storages and self._alias != 'default':
199+
storage_maps = {
200+
'Dataset': self._configuration.actor_storages['datasets'],
201+
'KeyValueStore': self._configuration.actor_storages['key_value_stores'],
202+
'RequestQueue': self._configuration.actor_storages['request_queues'],
203+
}
204+
if storage_id := storage_maps.get(self._storage_type, {}).get(self._alias):
205+
return storage_id
206+
207+
# Fallback to the mapping saved in the default KVS
204208
return (await self._get_alias_map(self._configuration)).get(self._storage_key, None)
205209

206210
async def store_mapping(self, storage_id: str) -> None:
@@ -220,30 +224,22 @@ async def store_mapping(self, storage_id: str) -> None:
220224

221225
try:
222226
record = await default_kvs_client.get_record(self._ALIAS_MAPPING_KEY)
223-
224-
# get_record can return {key: ..., value: ..., content_type: ...}
225-
if isinstance(record, dict) and 'value' in record:
226-
record = record['value']
227-
228-
# Update or create the record with the new alias mapping
229-
if isinstance(record, dict):
230-
record[self._storage_key] = storage_id
231-
else:
232-
record = {self._storage_key: storage_id}
227+
value = record.get('value', {}) if record else {}
228+
value[self._storage_key] = storage_id
233229

234230
# Store the mapping back in the KVS.
235-
await default_kvs_client.set_record(self._ALIAS_MAPPING_KEY, record)
231+
await default_kvs_client.set_record(key=self._ALIAS_MAPPING_KEY, value=value)
236232
except Exception as exc:
237233
logger.warning(f'Error storing alias mapping for {self._alias}: {exc}')
238234

239-
@property
235+
@cached_property
240236
def _storage_key(self) -> str:
241237
"""Get a unique storage key used for storing the alias in the mapping."""
242238
return self._ALIAS_STORAGE_KEY_SEPARATOR.join(
243239
[
244240
self._storage_type,
245241
self._alias,
246-
self._additional_cache_key,
242+
hash_api_base_url_and_token(self._configuration),
247243
]
248244
)
249245

tests/e2e/test_schema_storages/__init__.py

Whitespace-only changes.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"actorSpecification": 1,
3+
"version": "0.0",
4+
"storages": {
5+
"datasets": {
6+
"default": {
7+
"actorSpecification": 1,
8+
"fields": {
9+
"properties": {
10+
"id": { "type": "string" }
11+
}
12+
}
13+
},
14+
"custom": {
15+
"actorSpecification": 1,
16+
"fields": {
17+
"properties": {
18+
"id": { "type": "string" }
19+
}
20+
}
21+
}
22+
}
23+
}
24+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from apify import Actor
2+
3+
4+
async def main() -> None:
5+
async with Actor:
6+
assert Actor.configuration.actor_storages
7+
dataset = await Actor.open_dataset(alias='custom')
8+
expected_id = Actor.configuration.actor_storages['datasets']['custom']
9+
assert dataset.id == expected_id
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
from typing import TYPE_CHECKING
5+
6+
if TYPE_CHECKING:
7+
from ..conftest import MakeActorFunction, RunActorFunction
8+
9+
_ACTOR_SOURCE_DIR = Path(__file__).parent / 'actor_source'
10+
11+
12+
def read_actor_source(filename: str) -> str:
13+
return (_ACTOR_SOURCE_DIR / filename).read_text()
14+
15+
16+
async def test_configuration_storages(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None:
17+
actor = await make_actor(
18+
label='schema_storages',
19+
source_files={
20+
'src/main.py': read_actor_source('main.py'),
21+
'.actor/actor.json': read_actor_source('actor.json'),
22+
},
23+
)
24+
run_result = await run_actor(actor)
25+
26+
assert run_result.status == 'SUCCEEDED'

tests/unit/actor/test_configuration.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
from pathlib import Path
23

34
import pytest
@@ -286,7 +287,6 @@ def test_max_total_charge_usd_decimal_parsing(monkeypatch: pytest.MonkeyPatch) -
286287

287288
def test_actor_pricing_info_from_json_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
288289
"""Test that actor_pricing_info is parsed from JSON env var."""
289-
import json
290290

291291
pricing_json = json.dumps(
292292
{
@@ -300,3 +300,24 @@ def test_actor_pricing_info_from_json_env_var(monkeypatch: pytest.MonkeyPatch) -
300300
config = ApifyConfiguration()
301301
assert config.actor_pricing_info is not None
302302
assert config.actor_pricing_info.pricing_model == 'PAY_PER_EVENT'
303+
304+
305+
def test_actor_storage_json_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
306+
"""Test that actor_storages_json is parsed from JSON env var."""
307+
datasets = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}
308+
request_queues = {'default': 'default_request_queue_id', 'custom': 'custom_request_queue_id'}
309+
key_value_stores = {'default': 'default_key_value_store_id', 'custom': 'custom_key_value_store_id'}
310+
311+
actor_storages_json = json.dumps(
312+
{
313+
'datasets': datasets,
314+
'requestQueues': request_queues,
315+
'keyValueStores': key_value_stores,
316+
}
317+
)
318+
monkeypatch.setenv('ACTOR_STORAGES_JSON', actor_storages_json)
319+
config = ApifyConfiguration()
320+
assert config.actor_storages
321+
assert config.actor_storages['datasets'] == datasets
322+
assert config.actor_storages['request_queues'] == request_queues
323+
assert config.actor_storages['key_value_stores'] == key_value_stores

tests/unit/storage_clients/test_alias_resolver.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,24 @@ async def test_get_alias_map_returns_in_memory_map() -> None:
7676
AliasResolver._alias_map = {}
7777
result = await AliasResolver._get_alias_map(config)
7878
assert result == {}
79+
80+
81+
async def test_configuration_storages_alias_resolving() -> None:
82+
"""Test that `AliasResolver` correctly resolves ids of storages registered in Configuration."""
83+
84+
# Actor storages
85+
datasets = {'default': 'default_dataset_id', 'custom': 'custom_Dataset_id'}
86+
request_queues = {'default': 'default_request_queue_id', 'custom': 'custom_RequestQueue_id'}
87+
key_value_stores = {'default': 'default_key_value_store_id', 'custom': 'custom_KeyValueStore_id'}
88+
89+
# Set up the configuration with the storage mapping
90+
configuration = Configuration(
91+
actor_storages={'datasets': datasets, 'request_queues': request_queues, 'key_value_stores': key_value_stores}
92+
)
93+
94+
# Check that id of each non-default storage saved in the mapping is resolved
95+
for storage_type in ('Dataset', 'KeyValueStore', 'RequestQueue'):
96+
assert (
97+
await AliasResolver(storage_type=storage_type, alias='custom', configuration=configuration).resolve_id()
98+
== f'custom_{storage_type}_id'
99+
)

0 commit comments

Comments
 (0)