-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtest_storages.py
More file actions
136 lines (110 loc) · 5.85 KB
/
test_storages.py
File metadata and controls
136 lines (110 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from __future__ import annotations
import asyncio
import pytest
from crawlee import service_locator
from crawlee.storages import Dataset, KeyValueStore, RequestQueue
from apify import Actor, Configuration
from apify.storage_clients import ApifyStorageClient, MemoryStorageClient, SmartApifyStorageClient
from apify.storage_clients._apify._alias_resolving import AliasResolver
@pytest.mark.parametrize(
    'storage_type',
    [Dataset, KeyValueStore, RequestQueue],
)
async def test_alias_concurrent_creation_local(
    storage_type: Dataset | KeyValueStore | RequestQueue, apify_token: str
) -> None:
    """Test that storages created with same alias are created only once even when created concurrently.

    Two `open(alias='test')` calls are scheduled as concurrent tasks and the ids of the
    returned storages are compared; they must all refer to a single storage.
    """
    service_locator.set_configuration(Configuration(token=apify_token))
    service_locator.set_storage_client(ApifyStorageClient())

    tasks = [asyncio.create_task(storage_type.open(alias='test')) for _ in range(2)]
    storages = await asyncio.gather(*tasks)

    try:
        # Only one aliased storage should be created.
        assert len({storage.id for storage in storages}) == 1
    finally:
        # Clean up in `finally` rather than `except AssertionError`: catching the assertion
        # without re-raising it would make the test pass even when duplicates were created.
        # Each distinct storage is dropped exactly once, whether the check passed or failed.
        dropped_ids = set()
        for storage in storages:
            if storage.id in dropped_ids:
                continue
            dropped_ids.add(storage.id)
            await storage.drop()
@pytest.mark.parametrize(
    'storage_type',
    [Dataset, KeyValueStore, RequestQueue],
)
async def test_unnamed_default_without_config(
    storage_type: Dataset | KeyValueStore | RequestQueue, apify_token: str
) -> None:
    """Check that a storage opened without name or alias via `ApifyStorageClient` is unnamed but has an id."""
    configuration = Configuration(token=apify_token)
    service_locator.set_configuration(configuration)
    storage_client = ApifyStorageClient()
    service_locator.set_storage_client(storage_client)

    first_handle = await storage_type.open()
    # The default storage must have an id while carrying no name.
    assert first_handle.name is None
    assert first_handle.id

    # Opening again with no name and no alias must hand back the very same instance.
    second_handle = await storage_type.open()
    assert first_handle is second_handle

    await first_handle.drop()
@pytest.mark.parametrize(
    'storage_type',
    [Dataset, KeyValueStore, RequestQueue],
)
async def test_aliases_not_stored_on_platform_when_local(
    storage_type: Dataset | KeyValueStore | RequestQueue, apify_token: str
) -> None:
    """Check that opening an aliased storage leaves the cloud default key-value store without any keys."""
    service_locator.set_configuration(Configuration(token=apify_token))

    async with Actor(configure_logging=False):
        await storage_type.open(alias='test')

        cloud_default_kvs = await Actor.open_key_value_store(force_cloud=True)
        stored_keys = await cloud_default_kvs.list_keys()

        # Nothing should have been written to the default KVS.
        assert len(stored_keys) == 0
async def test_actor_full_explicit_storage_init(apify_token: str) -> None:
    """Check that `force_cloud=True` yields different storages when local and cloud clients differ.

    The service locator is given a `SmartApifyStorageClient` that combines a `MemoryStorageClient`
    (local) with an `ApifyStorageClient` (cloud) before the Actor context is entered.
    """
    service_locator.set_configuration(Configuration(token=apify_token))

    local_client = MemoryStorageClient()
    cloud_client = ApifyStorageClient(request_queue_access='shared')
    service_locator.set_storage_client(
        SmartApifyStorageClient(local_storage_client=local_client, cloud_storage_client=cloud_client)
    )

    async with Actor:
        # The Actor is expected to pick up the SmartApifyStorageClient already set in the service locator,
        # so every storage opened with force_cloud must be a different object than the default one.
        default_dataset = await Actor.open_dataset()
        cloud_dataset = await Actor.open_dataset(force_cloud=True)
        assert default_dataset is not cloud_dataset

        default_kvs = await Actor.open_key_value_store()
        cloud_kvs = await Actor.open_key_value_store(force_cloud=True)
        assert default_kvs is not cloud_kvs

        default_rq = await Actor.open_request_queue()
        cloud_rq = await Actor.open_request_queue(force_cloud=True)
        assert default_rq is not cloud_rq
async def test_actor_full_explicit_storage_init_same_client(apify_token: str) -> None:
    """Check that `force_cloud=True` yields the same storages when local and cloud clients are equivalent.

    Both sides of the `SmartApifyStorageClient` are `ApifyStorageClient(request_queue_access='shared')`.
    """
    service_locator.set_configuration(Configuration(token=apify_token))

    local_client = ApifyStorageClient(request_queue_access='shared')
    cloud_client = ApifyStorageClient(request_queue_access='shared')
    service_locator.set_storage_client(
        SmartApifyStorageClient(local_storage_client=local_client, cloud_storage_client=cloud_client)
    )

    async with Actor:
        # The Actor is expected to pick up the SmartApifyStorageClient already set in the service locator.
        # With equivalent clients on both sides, force_cloud must resolve to the identical storage object.
        default_dataset = await Actor.open_dataset()
        cloud_dataset = await Actor.open_dataset(force_cloud=True)
        assert default_dataset is cloud_dataset

        default_kvs = await Actor.open_key_value_store()
        cloud_kvs = await Actor.open_key_value_store(force_cloud=True)
        assert default_kvs is cloud_kvs

        default_rq = await Actor.open_request_queue()
        cloud_rq = await Actor.open_request_queue(force_cloud=True)
        assert default_rq is cloud_rq
async def test_actor_partial_explicit_cloud_storage_init(apify_token: str) -> None:
    """Check that entering the Actor raises `RuntimeError` when a plain `ApifyStorageClient` was set."""
    service_locator.set_configuration(Configuration(token=apify_token))
    service_locator.set_storage_client(ApifyStorageClient(request_queue_access='shared'))

    expected_message = r'^The storage client in the service locator has to be instance of SmartApifyStorageClient'

    # A storage client other than SmartApifyStorageClient in the service locator must be rejected
    # when the Actor context is entered.
    with pytest.raises(RuntimeError, match=expected_message):
        async with Actor:
            pass
async def test_actor_implicit_storage_init(apify_token: str) -> None:
    """Check that, with no storage client set explicitly, `force_cloud=True` yields different storages."""
    service_locator.set_configuration(Configuration(token=apify_token))

    async with Actor:
        default_dataset = await Actor.open_dataset()
        cloud_dataset = await Actor.open_dataset(force_cloud=True)
        assert default_dataset is not cloud_dataset

        default_kvs = await Actor.open_key_value_store()
        cloud_kvs = await Actor.open_key_value_store(force_cloud=True)
        assert default_kvs is not cloud_kvs

        default_rq = await Actor.open_request_queue()
        cloud_rq = await Actor.open_request_queue(force_cloud=True)
        assert default_rq is not cloud_rq
async def test_default_storage(apify_token: str) -> None:
    """Check that the cloud dataset opened with `AliasResolver.default_storage_key` is the default one.

    The dataset opened with `force_cloud=True` and no alias must share its id with the dataset
    opened with `force_cloud=True` and the default storage key as alias.
    """
    service_locator.set_configuration(Configuration(token=apify_token))

    async with Actor:
        unaliased = await Actor.open_dataset(force_cloud=True)
        default_alias = AliasResolver.default_storage_key
        aliased = await Actor.open_dataset(force_cloud=True, alias=default_alias)

        assert unaliased.id == aliased.id