-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtest_apify_storages.py
More file actions
57 lines (45 loc) · 1.87 KB
/
test_apify_storages.py
File metadata and controls
57 lines (45 loc) · 1.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import asyncio
import pytest
from crawlee import service_locator
from crawlee.storages import Dataset, KeyValueStore, RequestQueue
from apify import Configuration
from apify.storage_clients import ApifyStorageClient
@pytest.mark.parametrize(
'storage_type',
[Dataset, KeyValueStore, RequestQueue],
)
async def test_alias_concurrent_creation_local(
storage_type: Dataset | KeyValueStore | RequestQueue, apify_token: str
) -> None:
"""Test that storages created with same alias are created only once even when created concurrently."""
service_locator.set_configuration(Configuration(token=apify_token))
service_locator.set_storage_client(ApifyStorageClient())
tasks = [asyncio.create_task(storage_type.open(alias='test')) for _ in range(2)]
storages = await asyncio.gather(*tasks)
unique_storage_ids = {storage.id for storage in storages}
try:
# Only one aliased storage should be created.
assert len(unique_storage_ids) == 1
# Clean up
await storages[0].drop()
except AssertionError:
for storage in storages:
await storage.drop()
@pytest.mark.parametrize(
'storage_type',
[Dataset, KeyValueStore, RequestQueue],
)
async def test_unnamed_default_without_config(
storage_type: Dataset | KeyValueStore | RequestQueue, apify_token: str
) -> None:
"""Test that default Apify storage used locally is unnamed storage."""
service_locator.set_configuration(Configuration(token=apify_token))
service_locator.set_storage_client(ApifyStorageClient())
# Open storage and make sure it has no name and it has id
storage = await storage_type.open()
assert storage.name is None
assert storage.id
# Make sure the same instance is returned when opened again without name or alias
storage_again = await storage_type.open()
assert storage is storage_again
await storage.drop()