Skip to content

Commit 851042f

Browse files
authored
fix: storage manager & purging the defaults (#150)
### Description - Auto-purging of default storages was already implemented. I just checked it how/if it works, how it works with more instances of storages, and updated the StorageManager to be singleton-like. ### Related issues - #87 ### Testing - Covered by unit-tests
1 parent 8b4797b commit 851042f

10 files changed

Lines changed: 55 additions & 35 deletions

File tree

src/crawlee/memory_storage_client/memory_storage_client.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import contextlib
55
import os
6+
from logging import getLogger
67
from pathlib import Path
78

89
import aioshutil
@@ -19,6 +20,8 @@
1920
from crawlee.memory_storage_client.request_queue_client import RequestQueueClient
2021
from crawlee.memory_storage_client.request_queue_collection_client import RequestQueueCollectionClient
2122

23+
logger = getLogger(__name__)
24+
2225

2326
class MemoryStorageClient(BaseStorageClient):
2427
"""Represents an in-memory storage client for managing datasets, key-value stores, and request queues.
@@ -47,6 +50,7 @@ def __init__(self, configuration: Configuration | None = None) -> None:
4750
self.datasets_handled: list[DatasetClient] = []
4851
self.key_value_stores_handled: list[KeyValueStoreClient] = []
4952
self.request_queues_handled: list[RequestQueueClient] = []
53+
5054
self._purged_on_start = False # Indicates whether a purge was already performed on this instance.
5155
self._purge_lock = asyncio.Lock()
5256

@@ -134,6 +138,7 @@ def request_queues(self) -> RequestQueueCollectionClient:
134138
async def purge_on_start(self) -> None:
135139
# Optimistic, non-blocking check
136140
if self._purged_on_start is True:
141+
logger.debug('Storage was already purged on start.')
137142
return
138143

139144
async with self._purge_lock:
@@ -142,10 +147,10 @@ async def purge_on_start(self) -> None:
142147
# Mypy doesn't understand that the _purged_on_start can change while we're getting the async lock
143148
return # type: ignore[unreachable]
144149

145-
await self._purge_inner()
150+
await self._purge_default_storages()
146151
self._purged_on_start = True
147152

148-
async def _purge_inner(self) -> None:
153+
async def _purge_default_storages(self) -> None:
149154
"""Cleans up the storage directories, preparing the environment for a new run.
150155
151156
It aims to remove residues from previous executions to avoid data contamination between runs.
@@ -163,21 +168,23 @@ async def _purge_inner(self) -> None:
163168
self._TEMPORARY_DIR_NAME
164169
) or key_value_store_folder.name.startswith('__OLD'):
165170
await self._batch_remove_files(key_value_store_folder.path)
166-
elif key_value_store_folder.name == 'default':
171+
elif key_value_store_folder.name == self.default_storage_id:
167172
await self._handle_default_key_value_store(key_value_store_folder.path)
168173

169174
# Datasets
170175
if await ospath.exists(self.datasets_directory):
171176
dataset_folders = await scandir(self.datasets_directory)
172177
for dataset_folder in dataset_folders:
173-
if dataset_folder.name == 'default' or dataset_folder.name.startswith(self._TEMPORARY_DIR_NAME):
178+
if dataset_folder.name == self.default_storage_id or dataset_folder.name.startswith(
179+
self._TEMPORARY_DIR_NAME
180+
):
174181
await self._batch_remove_files(dataset_folder.path)
175182

176183
# Request queues
177184
if await ospath.exists(self.request_queues_directory):
178185
request_queue_folders = await scandir(self.request_queues_directory)
179186
for request_queue_folder in request_queue_folders:
180-
if request_queue_folder.name == 'default' or request_queue_folder.name.startswith(
187+
if request_queue_folder.name == self.default_storage_id or request_queue_folder.name.startswith(
181188
self._TEMPORARY_DIR_NAME
182189
):
183190
await self._batch_remove_files(request_queue_folder.path)

src/crawlee/storage_client_manager.py

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,11 @@
1111
class StorageClientManager:
1212
"""A class for managing storage clients."""
1313

14-
def __init__(
15-
self,
16-
*,
17-
local_client: BaseStorageClient | None = None,
18-
cloud_client: BaseStorageClient | None = None,
19-
) -> None:
20-
"""Create a new instance.
14+
_local_client: BaseStorageClient = MemoryStorageClient()
15+
_cloud_client: BaseStorageClient | None = None
2116

22-
Args:
23-
local_client: The storage client to be used in the local environment.
24-
cloud_client: The storage client to be used in the cloud environment.
25-
"""
26-
self._local_client = local_client or MemoryStorageClient()
27-
self._cloud_client = cloud_client
28-
29-
def get_storage_client(self, *, in_cloud: bool = False) -> BaseStorageClient:
17+
@classmethod
18+
def get_storage_client(cls, *, in_cloud: bool = False) -> BaseStorageClient:
3019
"""Get the storage client instance for the current environment.
3120
3221
Args:
@@ -36,8 +25,26 @@ def get_storage_client(self, *, in_cloud: bool = False) -> BaseStorageClient:
3625
The current storage client instance.
3726
"""
3827
if in_cloud:
39-
if self._cloud_client is None:
28+
if cls._cloud_client is None:
4029
raise RuntimeError('Running in cloud environment, but cloud client was not provided.')
41-
return self._cloud_client
30+
return cls._cloud_client
4231

43-
return self._local_client
32+
return cls._local_client
33+
34+
@classmethod
35+
def set_cloud_client(cls, cloud_client: BaseStorageClient) -> None:
36+
"""Set the cloud storage client instance.
37+
38+
Args:
39+
cloud_client: The cloud storage client instance.
40+
"""
41+
cls._cloud_client = cloud_client
42+
43+
@classmethod
44+
def set_local_client(cls, local_client: BaseStorageClient) -> None:
45+
"""Set the local storage client instance.
46+
47+
Args:
48+
local_client: The local storage client instance.
49+
"""
50+
cls._local_client = local_client

src/crawlee/storages/_creation_management.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async def open_storage(
149149
) -> Dataset | KeyValueStore | RequestQueue:
150150
"""Open a either a new key-value store or restore existing one and return it."""
151151
configuration = configuration or Configuration()
152-
storage_client = StorageClientManager().get_storage_client(in_cloud=configuration.in_cloud)
152+
storage_client = StorageClientManager.get_storage_client(in_cloud=configuration.in_cloud)
153153

154154
# Try to restore the storage from cache by ID
155155
if name:

src/crawlee/storages/base_storage.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def name(self) -> str | None:
2727
@abstractmethod
2828
async def open(
2929
cls,
30+
*,
3031
id: str | None = None,
3132
name: str | None = None,
3233
configuration: Configuration | None = None,

src/crawlee/storages/dataset.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ def name(self) -> str | None:
8080
@classmethod
8181
async def open(
8282
cls,
83+
*,
8384
id: str | None = None,
8485
name: str | None = None,
8586
configuration: Configuration | None = None,

src/crawlee/storages/key_value_store.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def name(self) -> str | None:
6868
@classmethod
6969
async def open(
7070
cls,
71+
*,
7172
id: str | None = None,
7273
name: str | None = None,
7374
configuration: Configuration | None = None,

src/crawlee/storages/request_queue.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ def name(self) -> str | None:
113113
@classmethod
114114
async def open(
115115
cls,
116+
*,
116117
id: str | None = None,
117118
name: str | None = None,
118119
configuration: Configuration | None = None,

tests/unit/conftest.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from crawlee.configuration import Configuration
88
from crawlee.memory_storage_client import MemoryStorageClient
9+
from crawlee.storage_client_manager import StorageClientManager
910
from crawlee.storages import _creation_management
1011

1112
if TYPE_CHECKING:
@@ -15,6 +16,9 @@
1516
@pytest.fixture()
1617
def reset_default_instances(monkeypatch: pytest.MonkeyPatch) -> Callable[[], None]:
1718
def reset() -> None:
19+
StorageClientManager._local_client = MemoryStorageClient()
20+
StorageClientManager._cloud_client = None
21+
1822
monkeypatch.setattr(_creation_management, '_cache_dataset_by_id', {})
1923
monkeypatch.setattr(_creation_management, '_cache_dataset_by_name', {})
2024
monkeypatch.setattr(_creation_management, '_cache_kvs_by_id', {})

tests/unit/memory_storage_client/test_memory_storage_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ async def test_purge_datasets(tmp_path: Path) -> None:
132132
assert default_dataset_info.name in folders_before_purge
133133
assert non_default_dataset_info.name in folders_before_purge
134134

135-
await ms._purge_inner()
135+
await ms._purge_default_storages()
136136
folders_after_purge = os.listdir(ms.datasets_directory)
137137
assert default_dataset_info.name not in folders_after_purge
138138
assert non_default_dataset_info.name in folders_after_purge
@@ -164,7 +164,7 @@ async def test_purge_key_value_stores(tmp_path: Path) -> None:
164164
assert 'INPUT.json' in default_folder_files_before_purge
165165
assert 'test.json' in default_folder_files_before_purge
166166

167-
await ms._purge_inner()
167+
await ms._purge_default_storages()
168168
folders_after_purge = os.listdir(ms.key_value_stores_directory)
169169
assert default_kvs_info.name in folders_after_purge
170170
assert non_default_kvs_info.name in folders_after_purge
@@ -189,7 +189,7 @@ async def test_purge_request_queues(tmp_path: Path) -> None:
189189
folders_before_purge = os.listdir(ms.request_queues_directory)
190190
assert default_rq_info.name in folders_before_purge
191191
assert non_default_rq_info.name in folders_before_purge
192-
await ms._purge_inner()
192+
await ms._purge_default_storages()
193193
folders_after_purge = os.listdir(ms.request_queues_directory)
194194
assert default_rq_info.name not in folders_after_purge
195195
assert non_default_rq_info.name in folders_after_purge

tests/unit/test_storage_client_manager.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,24 @@
1010

1111

1212
def test_returns_memory_storage_client_as_default() -> None:
13-
manager = StorageClientManager()
14-
storage_client = manager.get_storage_client()
13+
storage_client = StorageClientManager.get_storage_client()
1514
assert isinstance(storage_client, MemoryStorageClient), 'Should return the memory storage client by default'
1615

1716

1817
def test_returns_provided_local_client_for_non_cloud_environment() -> None:
1918
local_client = Mock(spec=BaseStorageClient)
20-
manager = StorageClientManager(local_client=local_client)
21-
storage_client = manager.get_storage_client()
19+
StorageClientManager.set_local_client(local_client)
20+
storage_client = StorageClientManager.get_storage_client()
2221
assert storage_client == local_client, 'Should return the local client when not in cloud'
2322

2423

2524
def test_returns_provided_cloud_client_for_cloud_environment() -> None:
2625
cloud_client = Mock(spec=BaseStorageClient)
27-
manager = StorageClientManager(cloud_client=cloud_client)
28-
storage_client = manager.get_storage_client(in_cloud=True)
26+
StorageClientManager.set_cloud_client(cloud_client)
27+
storage_client = StorageClientManager.get_storage_client(in_cloud=True)
2928
assert storage_client == cloud_client, 'Should return the cloud client when in cloud'
3029

3130

3231
def test_raises_error_when_no_cloud_client_provided() -> None:
33-
manager = StorageClientManager()
3432
with pytest.raises(RuntimeError, match='cloud client was not provided'):
35-
manager.get_storage_client(in_cloud=True)
33+
StorageClientManager.get_storage_client(in_cloud=True)

0 commit comments

Comments
 (0)