diff --git a/pyproject.toml b/pyproject.toml index 2729f77702..44df3850ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ "async-timeout>=5.0.1", "cachetools>=5.5.0", "colorama>=0.4.0", + "crawlee-storage", "impit>=0.8.0", "more-itertools>=10.2.0", "protego>=0.5.0", @@ -333,3 +334,6 @@ cwd = "website" [tool.poe.tasks.run-docs] shell = "./build_api_reference.sh && corepack enable && yarn && yarn start" cwd = "website" + +[tool.uv.sources] +crawlee-storage = { git = "https://github.com/apify/crawlee-storage.git", subdirectory = "crawlee-storage-python" } diff --git a/src/crawlee/storage_clients/_file_system/_dataset_client.py b/src/crawlee/storage_clients/_file_system/_dataset_client.py index b970a98928..0e40983538 100644 --- a/src/crawlee/storage_clients/_file_system/_dataset_client.py +++ b/src/crawlee/storage_clients/_file_system/_dataset_client.py @@ -1,25 +1,17 @@ from __future__ import annotations -import asyncio -import json -import shutil -from datetime import datetime, timezone from logging import getLogger -from pathlib import Path from typing import TYPE_CHECKING, Any -from pydantic import ValidationError +from crawlee_storage import FileSystemDatasetClient as NativeDatasetClient from typing_extensions import Self, override -from crawlee._consts import METADATA_FILENAME -from crawlee._utils.crypto import crypto_random_object_id -from crawlee._utils.file import atomic_write, json_dumps -from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs from crawlee.storage_clients._base import DatasetClient from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata if TYPE_CHECKING: from collections.abc import AsyncIterator + from pathlib import Path from crawlee.configuration import Configuration @@ -42,49 +34,35 @@ class FileSystemDatasetClient(DatasetClient): This implementation is ideal for long-running crawlers where data persistence is important, and for development environments where you want to easily inspect the collected data between runs. - """ - - _STORAGE_SUBDIR = 'datasets' - """The name of the subdirectory where datasets are stored.""" - _STORAGE_SUBSUBDIR_DEFAULT = 'default' - """The name of the subdirectory for the default dataset.""" - - _ITEM_FILENAME_DIGITS = 9 - """Number of digits used for the dataset item file names (e.g., 000000019.json).""" + Backed by the native ``crawlee_storage`` Rust extension for performance. + """ def __init__( self, *, - metadata: DatasetMetadata, - path_to_dataset: Path, - lock: asyncio.Lock, + native_client: NativeDatasetClient, ) -> None: """Initialize a new instance. Preferably use the `FileSystemDatasetClient.open` class method to create a new instance. """ - self._metadata = metadata - - self._path_to_dataset = path_to_dataset - """The full path to the dataset directory.""" - - self._lock = lock - """A lock to ensure that only one operation is performed at a time.""" - - @override - async def get_metadata(self) -> DatasetMetadata: - return self._metadata + self._native_client = native_client @property def path_to_dataset(self) -> Path: """The full path to the dataset directory.""" - return self._path_to_dataset + return self._native_client.path_to_dataset @property def path_to_metadata(self) -> Path: """The full path to the dataset metadata file.""" - return self.path_to_dataset / METADATA_FILENAME + return self._native_client.path_to_metadata + + @override + async def get_metadata(self) -> DatasetMetadata: + raw = await self._native_client.get_metadata() + return DatasetMetadata(**raw) @classmethod async def open( @@ -114,129 +92,26 @@ async def open( ValueError: If a dataset with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. """ - # Validate input parameters. - raise_if_too_many_kwargs(id=id, name=name, alias=alias) - - dataset_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR - - if not dataset_base_path.exists(): - await asyncio.to_thread(dataset_base_path.mkdir, parents=True, exist_ok=True) - - # Get a new instance by ID. - if id: - found = False - for dataset_dir in dataset_base_path.iterdir(): - if not dataset_dir.is_dir(): - continue - - path_to_metadata = dataset_dir / METADATA_FILENAME - if not path_to_metadata.exists(): - continue - - try: - file = await asyncio.to_thread(path_to_metadata.open, mode='r', encoding='utf-8') - try: - file_content = json.load(file) - metadata = DatasetMetadata(**file_content) - if metadata.id == id: - client = cls( - metadata=metadata, - path_to_dataset=dataset_base_path / dataset_dir, - lock=asyncio.Lock(), - ) - await client._update_metadata(update_accessed_at=True) - found = True - break - finally: - await asyncio.to_thread(file.close) - except (json.JSONDecodeError, ValidationError): - continue - - if not found: - raise ValueError(f'Dataset with ID "{id}" not found') - - # Get a new instance by name or alias. - else: - dataset_dir = Path(name) if name else Path(alias) if alias else Path('default') - path_to_dataset = dataset_base_path / dataset_dir - path_to_metadata = path_to_dataset / METADATA_FILENAME - - # If the dataset directory exists, reconstruct the client from the metadata file. - if path_to_dataset.exists() and path_to_metadata.exists(): - file = await asyncio.to_thread(path_to_metadata.open, mode='r', encoding='utf-8') - try: - file_content = json.load(file) - finally: - await asyncio.to_thread(file.close) - try: - metadata = DatasetMetadata(**file_content) - except ValidationError as exc: - raise ValueError(f'Invalid metadata file for dataset "{name or alias}"') from exc - - client = cls( - metadata=metadata, - path_to_dataset=path_to_dataset, - lock=asyncio.Lock(), - ) - - await client._update_metadata(update_accessed_at=True) - - # Otherwise, create a new dataset client. - else: - now = datetime.now(timezone.utc) - metadata = DatasetMetadata( - id=crypto_random_object_id(), - name=name, - created_at=now, - accessed_at=now, - modified_at=now, - item_count=0, - ) - client = cls( - metadata=metadata, - path_to_dataset=path_to_dataset, - lock=asyncio.Lock(), - ) - await client._update_metadata() - - return client + native_client = await NativeDatasetClient.open( + id=id, + name=name, + alias=alias, + storage_dir=str(configuration.storage_dir), + ) + + return cls(native_client=native_client) @override async def drop(self) -> None: - async with self._lock: - if self.path_to_dataset.exists(): - await asyncio.to_thread(shutil.rmtree, self.path_to_dataset) + await self._native_client.drop_storage() @override async def purge(self) -> None: - async with self._lock: - for file_path in await self._get_sorted_data_files(): - await asyncio.to_thread(file_path.unlink, missing_ok=True) - - await self._update_metadata( - update_accessed_at=True, - update_modified_at=True, - new_item_count=0, - ) + await self._native_client.purge() @override async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: - async with self._lock: - new_item_count = self._metadata.item_count - if isinstance(data, list): - for item in data: - new_item_count += 1 - await self._push_item(item, new_item_count) - else: - new_item_count += 1 - await self._push_item(data, new_item_count) - - # now update metadata under the same lock - await self._update_metadata( - update_accessed_at=True, - update_modified_at=True, - new_item_count=new_item_count, - ) + await self._native_client.push_data(data) @override async def get_data( @@ -272,70 +147,13 @@ async def get_data( f'{self.__class__.__name__} client.' ) - # If the dataset directory does not exist, log a warning and return an empty page. - if not self.path_to_dataset.exists(): - logger.warning(f'Dataset directory not found: {self.path_to_dataset}') - return DatasetItemsListPage( - count=0, - offset=offset, - limit=limit or 0, - total=0, - desc=desc, - items=[], - ) - - # Get the list of sorted data files. - async with self._lock: - try: - data_files = await self._get_sorted_data_files() - except FileNotFoundError: - # directory was dropped mid-check - return DatasetItemsListPage(count=0, offset=offset, limit=limit or 0, total=0, desc=desc, items=[]) - - total = len(data_files) - - # Reverse the order if descending order is requested. - if desc: - data_files.reverse() - - # Apply offset and limit slicing. - selected_files = data_files[offset:] - if limit is not None: - selected_files = selected_files[:limit] - - # Read and parse each data file. - items = list[dict[str, Any]]() - for file_path in selected_files: - try: - file_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8') - except FileNotFoundError: - logger.warning(f'File disappeared during iterate_items(): {file_path}, skipping') - continue - - try: - item = json.loads(file_content) - except json.JSONDecodeError: - logger.exception(f'Corrupt JSON in {file_path}, skipping') - continue - - # Skip empty items if requested. - if skip_empty and not item: - continue - - items.append(item) - - async with self._lock: - await self._update_metadata(update_accessed_at=True) - - # Return a paginated list page of dataset items. - return DatasetItemsListPage( - count=len(items), + raw = await self._native_client.get_data( offset=offset, - limit=limit or total - offset, - total=total, + limit=limit if limit is not None else 999_999_999_999, desc=desc, - items=items, + skip_empty=skip_empty, ) + return DatasetItemsListPage(**raw) @override async def iterate_items( @@ -367,120 +185,14 @@ async def iterate_items( f'by the {self.__class__.__name__} client.' ) - # If the dataset directory does not exist, log a warning and return immediately. - if not self.path_to_dataset.exists(): - logger.warning(f'Dataset directory not found: {self.path_to_dataset}') - return - - # Get the list of sorted data files. - async with self._lock: - try: - data_files = await self._get_sorted_data_files() - except FileNotFoundError: - return - - # Reverse the order if descending order is requested. - if desc: - data_files.reverse() - - # Apply offset and limit slicing. - selected_files = data_files[offset:] - if limit is not None: - selected_files = selected_files[:limit] - - # Iterate over each data file, reading and yielding its parsed content. - for file_path in selected_files: - try: - file_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8') - except FileNotFoundError: - logger.warning(f'File disappeared during iterate_items(): {file_path}, skipping') - continue - - try: - item = json.loads(file_content) - except json.JSONDecodeError: - logger.exception(f'Corrupt JSON in {file_path}, skipping') - continue - - # Skip empty items if requested. - if skip_empty and not item: - continue - - yield item - - async with self._lock: - await self._update_metadata(update_accessed_at=True) - - async def _update_metadata( - self, - *, - new_item_count: int | None = None, - update_accessed_at: bool = False, - update_modified_at: bool = False, - ) -> None: - """Update the dataset metadata file with current information. - - Args: - new_item_count: If provided, update the item count to this value. - update_accessed_at: If True, update the `accessed_at` timestamp to the current time. - update_modified_at: If True, update the `modified_at` timestamp to the current time. - """ - now = datetime.now(timezone.utc) - - if update_accessed_at: - self._metadata.accessed_at = now - if update_modified_at: - self._metadata.modified_at = now - if new_item_count is not None: - self._metadata.item_count = new_item_count - - # Ensure the parent directory for the metadata file exists. - await asyncio.to_thread(self.path_to_metadata.parent.mkdir, parents=True, exist_ok=True) - - # Dump the serialized metadata to the file. - data = await json_dumps(self._metadata.model_dump()) - await atomic_write(self.path_to_metadata, data) - - async def _push_item(self, item: dict[str, Any], item_id: int) -> None: - """Push a single item to the dataset. - - This method writes the item as a JSON file with a zero-padded numeric filename - that reflects its position in the dataset sequence. - - Args: - item: The data item to add to the dataset. - item_id: The sequential ID to use for this item's filename. - """ - # Generate the filename for the new item using zero-padded numbering. - filename = f'{str(item_id).zfill(self._ITEM_FILENAME_DIGITS)}.json' - file_path = self.path_to_dataset / filename - - # Ensure the dataset directory exists. - await asyncio.to_thread(self.path_to_dataset.mkdir, parents=True, exist_ok=True) - - # Dump the serialized item to the file. - data = await json_dumps(item) - await atomic_write(file_path, data) - - async def _get_sorted_data_files(self) -> list[Path]: - """Retrieve and return a sorted list of data files in the dataset directory. - - The files are sorted numerically based on the filename (without extension), - which corresponds to the order items were added to the dataset. - - Returns: - A list of `Path` objects pointing to data files, sorted by numeric filename. - """ - # Retrieve and sort all JSON files in the dataset directory numerically. - files = await asyncio.to_thread( - lambda: sorted( - self.path_to_dataset.glob('*.json'), - key=lambda f: int(f.stem) if f.stem.isdigit() else 0, - ) + # The native client returns a list rather than an async iterator, + # so we fetch all matching items and yield them one by one. + items: list[Any] = await self._native_client.iterate_items( + offset=offset, + limit=limit, + desc=desc, + skip_empty=skip_empty, ) - # Remove the metadata file from the list if present. - if self.path_to_metadata in files: - files.remove(self.path_to_metadata) - - return files + for item in items: + yield item diff --git a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py index 3a36a77074..357648e94a 100644 --- a/src/crawlee/storage_clients/_file_system/_key_value_store_client.py +++ b/src/crawlee/storage_clients/_file_system/_key_value_store_client.py @@ -1,27 +1,17 @@ from __future__ import annotations -import asyncio -import functools -import json -import shutil -import urllib.parse -from datetime import datetime, timezone from logging import getLogger -from pathlib import Path from typing import TYPE_CHECKING, Any -from pydantic import ValidationError +from crawlee_storage import FileSystemKeyValueStoreClient as NativeKeyValueStoreClient from typing_extensions import Self, override -from crawlee._consts import METADATA_FILENAME -from crawlee._utils.crypto import crypto_random_object_id -from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps -from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs from crawlee.storage_clients._base import KeyValueStoreClient from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata if TYPE_CHECKING: from collections.abc import AsyncIterator + from pathlib import Path from crawlee.configuration import Configuration @@ -45,46 +35,35 @@ class FileSystemKeyValueStoreClient(KeyValueStoreClient): This implementation is ideal for long-running crawlers where persistence is important and for development environments where you want to easily inspect the stored data between runs. - """ - - _STORAGE_SUBDIR = 'key_value_stores' - """The name of the subdirectory where key-value stores are stored.""" - _STORAGE_SUBSUBDIR_DEFAULT = 'default' - """The name of the subdirectory for the default key-value store.""" + Backed by the native ``crawlee_storage`` Rust extension for performance. + """ def __init__( self, *, - metadata: KeyValueStoreMetadata, - path_to_kvs: Path, - lock: asyncio.Lock, + native_client: NativeKeyValueStoreClient, ) -> None: """Initialize a new instance. Preferably use the `FileSystemKeyValueStoreClient.open` class method to create a new instance. """ - self._metadata = metadata - - self._path_to_kvs = path_to_kvs - """The full path to the key-value store directory.""" - - self._lock = lock - """A lock to ensure that only one operation is performed at a time.""" - - @override - async def get_metadata(self) -> KeyValueStoreMetadata: - return self._metadata + self._native_client = native_client @property def path_to_kvs(self) -> Path: """The full path to the key-value store directory.""" - return self._path_to_kvs + return self._native_client.path_to_kvs @property def path_to_metadata(self) -> Path: """The full path to the key-value store metadata file.""" - return self.path_to_kvs / METADATA_FILENAME + return self._native_client.path_to_metadata + + @override + async def get_metadata(self) -> KeyValueStoreMetadata: + raw = await self._native_client.get_metadata() + return KeyValueStoreMetadata(**raw) @classmethod async def open( @@ -114,254 +93,44 @@ async def open( ValueError: If a store with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. """ - # Validate input parameters. - raise_if_too_many_kwargs(id=id, name=name, alias=alias) - - kvs_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR - - if not kvs_base_path.exists(): - await asyncio.to_thread(kvs_base_path.mkdir, parents=True, exist_ok=True) - - # Get a new instance by ID. - if id: - found = False - for kvs_dir in kvs_base_path.iterdir(): - if not kvs_dir.is_dir(): - continue - - path_to_metadata = kvs_dir / METADATA_FILENAME - if not path_to_metadata.exists(): - continue - - try: - file = await asyncio.to_thread(path_to_metadata.open, mode='r', encoding='utf-8') - try: - file_content = json.load(file) - metadata = KeyValueStoreMetadata(**file_content) - if metadata.id == id: - client = cls( - metadata=metadata, - path_to_kvs=kvs_base_path / kvs_dir, - lock=asyncio.Lock(), - ) - await client._update_metadata(update_accessed_at=True) - found = True - break - finally: - await asyncio.to_thread(file.close) - except (json.JSONDecodeError, ValidationError): - continue - - if not found: - raise ValueError(f'Key-value store with ID "{id}" not found.') - - # Get a new instance by name or alias. - else: - kvs_dir = Path(name) if name else Path(alias) if alias else Path('default') - path_to_kvs = kvs_base_path / kvs_dir - path_to_metadata = path_to_kvs / METADATA_FILENAME - - # If the key-value store directory exists, reconstruct the client from the metadata file. - if path_to_kvs.exists() and path_to_metadata.exists(): - file = await asyncio.to_thread(path_to_metadata.open, mode='r', encoding='utf-8') - try: - file_content = json.load(file) - finally: - await asyncio.to_thread(file.close) - try: - metadata = KeyValueStoreMetadata(**file_content) - except ValidationError as exc: - raise ValueError(f'Invalid metadata file for key-value store "{name or alias}"') from exc - - client = cls( - metadata=metadata, - path_to_kvs=path_to_kvs, - lock=asyncio.Lock(), - ) - - await client._update_metadata(update_accessed_at=True) - - # Otherwise, create a new key-value store client. - else: - now = datetime.now(timezone.utc) - metadata = KeyValueStoreMetadata( - id=crypto_random_object_id(), - name=name, - created_at=now, - accessed_at=now, - modified_at=now, - ) - client = cls( - metadata=metadata, - path_to_kvs=path_to_kvs, - lock=asyncio.Lock(), - ) - await client._update_metadata() - - return client + native_client = await NativeKeyValueStoreClient.open( + id=id, + name=name, + alias=alias, + storage_dir=str(configuration.storage_dir), + ) + + return cls(native_client=native_client) @override async def drop(self) -> None: - # If the client directory exists, remove it recursively. - if self.path_to_kvs.exists(): - async with self._lock: - await asyncio.to_thread(shutil.rmtree, self.path_to_kvs) + await self._native_client.drop_storage() @override async def purge(self) -> None: - async with self._lock: - for file_path in self.path_to_kvs.glob('*'): - if file_path.name == METADATA_FILENAME: - continue - await asyncio.to_thread(file_path.unlink, missing_ok=True) - - await self._update_metadata( - update_accessed_at=True, - update_modified_at=True, - ) + await self._native_client.purge() @override async def get_value(self, *, key: str) -> KeyValueStoreRecord | None: - # Update the metadata to record access - async with self._lock: - await self._update_metadata(update_accessed_at=True) - - record_path = self.path_to_kvs / self._encode_key(key) + raw = await self._native_client.get_value(key) - if not record_path.exists(): + if raw is None: return None - # Found a file for this key, now look for its metadata - record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}') - if not record_metadata_filepath.exists(): - logger.warning(f'Found value file for key "{key}" but no metadata file.') - return None - - # Read the metadata file - async with self._lock: - try: - file = await asyncio.to_thread( - functools.partial(record_metadata_filepath.open, mode='r', encoding='utf-8'), - ) - except FileNotFoundError: - logger.warning(f'Metadata file disappeared for key "{key}", aborting get_value') - return None - - try: - metadata_content = json.load(file) - except json.JSONDecodeError: - logger.warning(f'Invalid metadata file for key "{key}"') - return None - finally: - await asyncio.to_thread(file.close) - - try: - metadata = KeyValueStoreRecordMetadata(**metadata_content) - except ValidationError: - logger.warning(f'Invalid metadata schema for key "{key}"') - return None - - # Read the actual value - try: - value_bytes = await asyncio.to_thread(record_path.read_bytes) - except FileNotFoundError: - logger.warning(f'Value file disappeared for key "{key}"') - return None - - # Handle None values - if metadata.content_type == 'application/x-none': - value = None - # Handle JSON values - elif 'application/json' in metadata.content_type: - try: - value = json.loads(value_bytes.decode('utf-8')) - except (json.JSONDecodeError, UnicodeDecodeError): - logger.warning(f'Failed to decode JSON value for key "{key}"') - return None - # Handle text values - elif metadata.content_type.startswith('text/'): - try: - value = value_bytes.decode('utf-8') - except UnicodeDecodeError: - logger.warning(f'Failed to decode text value for key "{key}"') - return None - # Handle binary values - else: - value = value_bytes - - # Calculate the size of the value in bytes - size = len(value_bytes) - return KeyValueStoreRecord( - key=metadata.key, - value=value, - content_type=metadata.content_type, - size=size, + key=raw['key'], + value=raw['value'], + content_type=raw['content_type'], + size=raw.get('size'), ) @override async def set_value(self, *, key: str, value: Any, content_type: str | None = None) -> None: - # Special handling for None values - if value is None: - content_type = 'application/x-none' # Special content type to identify None values - value_bytes = b'' - else: - content_type = content_type or infer_mime_type(value) - - # Serialize the value to bytes. - if 'application/json' in content_type: - value_bytes = (await json_dumps(value)).encode('utf-8') - elif isinstance(value, str): - value_bytes = value.encode('utf-8') - elif isinstance(value, (bytes, bytearray)): - value_bytes = value - else: - # Fallback: attempt to convert to string and encode. - value_bytes = str(value).encode('utf-8') - - record_path = self.path_to_kvs / self._encode_key(key) - - # Prepare the metadata - size = len(value_bytes) - record_metadata = KeyValueStoreRecordMetadata(key=key, content_type=content_type, size=size) - record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}') - record_metadata_content = await json_dumps(record_metadata.model_dump()) - - async with self._lock: - # Ensure the key-value store directory exists. - await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True) - - # Write the value to the file. - await atomic_write(record_path, value_bytes) - - # Write the record metadata to the file. - await atomic_write(record_metadata_filepath, record_metadata_content) - - # Update the KVS metadata to record the access and modification. - await self._update_metadata(update_accessed_at=True, update_modified_at=True) + await self._native_client.set_value(key, value, content_type) @override async def delete_value(self, *, key: str) -> None: - record_path = self.path_to_kvs / self._encode_key(key) - metadata_path = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}') - deleted = False - - async with self._lock: - # Delete the value file and its metadata if found - if record_path.exists(): - await asyncio.to_thread(record_path.unlink, missing_ok=True) - - # Delete the metadata file if it exists - if metadata_path.exists(): - await asyncio.to_thread(metadata_path.unlink, missing_ok=True) - else: - logger.warning(f'Found value file for key "{key}" but no metadata file when trying to delete it.') - - deleted = True - - # If we deleted something, update the KVS metadata - if deleted: - await self._update_metadata(update_accessed_at=True, update_modified_at=True) + await self._native_client.delete_value(key) @override async def iterate_keys( @@ -370,124 +139,20 @@ async def iterate_keys( exclusive_start_key: str | None = None, limit: int | None = None, ) -> AsyncIterator[KeyValueStoreRecordMetadata]: - # Check if the KVS directory exists - if not self.path_to_kvs.exists(): - return - - # List and sort all files *inside* a brief lock, then release it immediately: - async with self._lock: - files = sorted(await asyncio.to_thread(lambda: list(self.path_to_kvs.glob('*')))) - - count = 0 - - for file_path in files: - # Skip the main metadata file - if file_path.name == METADATA_FILENAME: - continue - - # Only process metadata files for records - if not file_path.name.endswith(f'.{METADATA_FILENAME}'): - continue - - # Extract the base key name from the metadata filename - key_name = self._decode_key(file_path.name[: -len(f'.{METADATA_FILENAME}')]) - - # Apply exclusive_start_key filter if provided - if exclusive_start_key is not None and key_name <= exclusive_start_key: - continue - - # Try to read and parse the metadata file - try: - metadata_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8') - except FileNotFoundError: - logger.warning(f'Metadata file disappeared for key "{key_name}", skipping it.') - continue - - try: - metadata_dict = json.loads(metadata_content) - except json.JSONDecodeError: - logger.warning(f'Failed to decode metadata file for key "{key_name}", skipping it.') - continue - - try: - record_metadata = KeyValueStoreRecordMetadata(**metadata_dict) - except ValidationError: - logger.warning(f'Invalid metadata schema for key "{key_name}", skipping it.') - - yield record_metadata - - count += 1 - if limit and count >= limit: - break + # The native client returns a list, so we fetch all matching keys + # and yield them one by one. + items: list[dict[str, Any]] = await self._native_client.iterate_keys( + exclusive_start_key=exclusive_start_key, + limit=limit, + ) - # Update accessed_at timestamp - async with self._lock: - await self._update_metadata(update_accessed_at=True) + for item in items: + yield KeyValueStoreRecordMetadata(**item) @override async def get_public_url(self, *, key: str) -> str: - """Return a file:// URL for the given key. - - Args: - key: The key to get the public URL for. - - Returns: - A file:// URL pointing to the file on the local filesystem. - """ - record_path = self.path_to_kvs / self._encode_key(key) - absolute_path = record_path.absolute() - return absolute_path.as_uri() + return self._native_client.get_public_url(key) @override async def record_exists(self, *, key: str) -> bool: - """Check if a record with the given key exists in the key-value store. - - Args: - key: The key to check for existence. - - Returns: - True if a record with the given key exists, False otherwise. - """ - # Update the metadata to record access - async with self._lock: - await self._update_metadata(update_accessed_at=True) - - record_path = self.path_to_kvs / self._encode_key(key) - record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}') - - # Both the value file and metadata file must exist for a record to be considered existing - return record_path.exists() and record_metadata_filepath.exists() - - async def _update_metadata( - self, - *, - update_accessed_at: bool = False, - update_modified_at: bool = False, - ) -> None: - """Update the KVS metadata file with current information. - - Args: - update_accessed_at: If True, update the `accessed_at` timestamp to the current time. - update_modified_at: If True, update the `modified_at` timestamp to the current time. - """ - now = datetime.now(timezone.utc) - - if update_accessed_at: - self._metadata.accessed_at = now - if update_modified_at: - self._metadata.modified_at = now - - # Ensure the parent directory for the metadata file exists. - await asyncio.to_thread(self.path_to_metadata.parent.mkdir, parents=True, exist_ok=True) - - # Dump the serialized metadata to the file. - data = await json_dumps(self._metadata.model_dump()) - await atomic_write(self.path_to_metadata, data) - - def _encode_key(self, key: str) -> str: - """Encode a key to make it safe for use in a file path.""" - return urllib.parse.quote(key, safe='') - - def _decode_key(self, encoded_key: str) -> str: - """Decode a key that was encoded to make it safe for use in a file path.""" - return urllib.parse.unquote(encoded_key) + return await self._native_client.record_exists(key) diff --git a/src/crawlee/storage_clients/_file_system/_request_queue_client.py b/src/crawlee/storage_clients/_file_system/_request_queue_client.py index 4954d8a4d2..603122b50f 100644 --- a/src/crawlee/storage_clients/_file_system/_request_queue_client.py +++ b/src/crawlee/storage_clients/_file_system/_request_queue_client.py @@ -1,64 +1,30 @@ from __future__ import annotations -import asyncio -import functools import json -import shutil -from collections import deque -from datetime import datetime, timezone -from hashlib import sha256 from logging import getLogger -from pathlib import Path from typing import TYPE_CHECKING -from pydantic import BaseModel, ValidationError +from crawlee_storage import FileSystemRequestQueueClient as NativeRequestQueueClient from typing_extensions import Self, override from crawlee import Request -from crawlee._consts import METADATA_FILENAME -from crawlee._utils.crypto import crypto_random_object_id -from crawlee._utils.file import atomic_write, json_dumps -from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs -from crawlee._utils.recoverable_state import RecoverableState +from crawlee.events._types import Event, EventPersistStateData from crawlee.storage_clients._base import RequestQueueClient from crawlee.storage_clients.models import ( AddRequestsResponse, ProcessedRequest, RequestQueueMetadata, - UnprocessedRequest, ) if TYPE_CHECKING: from collections.abc import Sequence + from pathlib import Path from crawlee.configuration import Configuration - from crawlee.storages import KeyValueStore logger = getLogger(__name__) -class RequestQueueState(BaseModel): - """State model for the `FileSystemRequestQueueClient`.""" - - sequence_counter: int = 0 - """Counter for regular request ordering.""" - - forefront_sequence_counter: int = 0 - """Counter for forefront request ordering.""" - - forefront_requests: dict[str, int] = {} - """Mapping of forefront request unique keys to their sequence numbers.""" - - regular_requests: dict[str, int] = {} - """Mapping of regular request unique keys to their sequence numbers.""" - - in_progress_requests: set[str] = set() - """Set of request unique keys currently being processed.""" - - handled_requests: set[str] = set() - """Set of request unique keys that have been handled.""" - - class FileSystemRequestQueueClient(RequestQueueClient): """A file system implementation of the request queue client. @@ -70,85 +36,38 @@ class FileSystemRequestQueueClient(RequestQueueClient): {STORAGE_DIR}/request_queues/{QUEUE_ID}/{REQUEST_ID}.json ``` - The implementation uses `RecoverableState` to maintain ordering information, in-progress status, and - request handling status. This allows for proper state recovery across process restarts without - embedding metadata in individual request files. File system storage provides durability at the cost of - slower I/O operations compared to memory only-based storage. - This implementation is ideal for long-running crawlers where persistence is important and for situations where you need to resume crawling after process termination. - """ - - _STORAGE_SUBDIR = 'request_queues' - """The name of the subdirectory where request queues are stored.""" - - _STORAGE_SUBSUBDIR_DEFAULT = 'default' - """The name of the subdirectory for the default request queue.""" - _MAX_REQUESTS_IN_CACHE = 100_000 - """Maximum number of requests to keep in cache for faster access.""" + Backed by the native ``crawlee_storage`` Rust extension for performance. + """ def __init__( self, *, - metadata: RequestQueueMetadata, - path_to_rq: Path, - lock: asyncio.Lock, - recoverable_state: RecoverableState[RequestQueueState], + native_client: NativeRequestQueueClient, ) -> None: """Initialize a new instance. Preferably use the `FileSystemRequestQueueClient.open` class method to create a new instance. """ - self._metadata = metadata - - self._path_to_rq = path_to_rq - """The full path to the request queue directory.""" - - self._lock = lock - """A lock to ensure that only one operation is performed at a time.""" - - self._request_cache = deque[Request]() - """Cache for requests: forefront requests at the beginning, regular requests at the end.""" - - self._request_cache_needs_refresh = True - """Flag indicating whether the cache needs to be refreshed from filesystem.""" - - self._is_empty_cache: bool | None = None - """Cache for is_empty result: None means unknown, True/False is cached state.""" - - self._state = recoverable_state - """Recoverable state to maintain request ordering, in-progress status, and handled status.""" - - @override - async def get_metadata(self) -> RequestQueueMetadata: - return self._metadata + self._native_client = native_client + self._event_listener_registered = False @property def path_to_rq(self) -> Path: """The full path to the request queue directory.""" - return self._path_to_rq + return self._native_client.path_to_rq @property def path_to_metadata(self) -> Path: """The full path to the request queue metadata file.""" - return self.path_to_rq / METADATA_FILENAME + return self._native_client.path_to_metadata - @classmethod - async def _create_recoverable_state(cls, id: str, configuration: Configuration) -> RecoverableState: - async def kvs_factory() -> KeyValueStore: - from crawlee.storage_clients import FileSystemStorageClient # noqa: PLC0415 avoid circular import - from crawlee.storages import KeyValueStore # noqa: PLC0415 avoid circular import - - return await KeyValueStore.open(storage_client=FileSystemStorageClient(), configuration=configuration) - - return RecoverableState[RequestQueueState]( - default_state=RequestQueueState(), - persist_state_key=f'__RQ_STATE_{id}', - persist_state_kvs_factory=kvs_factory, - persistence_enabled=True, - logger=logger, - ) + @override + async def get_metadata(self) -> RequestQueueMetadata: + raw = await self._native_client.get_metadata() + return RequestQueueMetadata(**raw) @classmethod async def open( @@ -165,6 +84,9 @@ async def open( ID or name exists, it loads the metadata and state from the stored files. If no existing queue is found, a new one is created. + Queue state is automatically persisted by the native Rust client to the default key-value store. + The Python side only needs to trigger ``persist_state`` via the framework event system. + Args: id: The ID of the request queue to open. If provided, searches for existing queue by ID. name: The name of the request queue for named (global scope) storages. @@ -178,146 +100,40 @@ async def open( ValueError: If a queue with the specified ID is not found, if metadata is invalid, or if both name and alias are provided. """ - # Validate input parameters. - raise_if_too_many_kwargs(id=id, name=name, alias=alias) - - rq_base_path = Path(configuration.storage_dir) / cls._STORAGE_SUBDIR - - if not rq_base_path.exists(): - await asyncio.to_thread(rq_base_path.mkdir, parents=True, exist_ok=True) - - # Open an existing RQ by its ID, raise an error if not found. - if id: - found = False - for rq_dir in rq_base_path.iterdir(): - if not rq_dir.is_dir(): - continue - - path_to_metadata = rq_dir / METADATA_FILENAME - if not path_to_metadata.exists(): - continue - - try: - file = await asyncio.to_thread(path_to_metadata.open, mode='r', encoding='utf-8') - try: - file_content = json.load(file) - metadata = RequestQueueMetadata(**file_content) - - if metadata.id == id: - client = cls( - metadata=metadata, - path_to_rq=rq_base_path / rq_dir, - lock=asyncio.Lock(), - recoverable_state=await cls._create_recoverable_state( - id=id, configuration=configuration - ), - ) - await client._state.initialize() - await client._discover_existing_requests() - await client._update_metadata(update_accessed_at=True) - found = True - break - finally: - await asyncio.to_thread(file.close) - except (json.JSONDecodeError, ValidationError): - continue - - if not found: - raise ValueError(f'Request queue with ID "{id}" not found') - - # Open an existing RQ by its name or alias, or create a new one if not found. - else: - rq_dir = Path(name) if name else Path(alias) if alias else Path('default') - path_to_rq = rq_base_path / rq_dir - path_to_metadata = path_to_rq / METADATA_FILENAME - - # If the RQ directory exists, reconstruct the client from the metadata file. - if path_to_rq.exists() and path_to_metadata.exists(): - file = await asyncio.to_thread(path_to_metadata.open, encoding='utf-8') - try: - file_content = json.load(file) - finally: - await asyncio.to_thread(file.close) - try: - metadata = RequestQueueMetadata(**file_content) - except ValidationError as exc: - raise ValueError(f'Invalid metadata file for request queue "{name or alias}"') from exc - - client = cls( - metadata=metadata, - path_to_rq=path_to_rq, - lock=asyncio.Lock(), - recoverable_state=await cls._create_recoverable_state(id=metadata.id, configuration=configuration), - ) - - await client._state.initialize() - await client._discover_existing_requests() - await client._update_metadata(update_accessed_at=True) - - # Otherwise, create a new dataset client. - else: - now = datetime.now(timezone.utc) - metadata = RequestQueueMetadata( - id=crypto_random_object_id(), - name=name, - created_at=now, - accessed_at=now, - modified_at=now, - had_multiple_clients=False, - handled_request_count=0, - pending_request_count=0, - total_request_count=0, - ) - client = cls( - metadata=metadata, - path_to_rq=path_to_rq, - lock=asyncio.Lock(), - recoverable_state=await cls._create_recoverable_state(id=metadata.id, configuration=configuration), - ) - await client._state.initialize() - await client._update_metadata() + native_client = await NativeRequestQueueClient.open( + id=id, + name=name, + alias=alias, + storage_dir=str(configuration.storage_dir), + ) + + client = cls(native_client=native_client) + + # Hook the native client's ``persist_state`` into the Crawlee event + # system so that state is saved periodically and on shutdown. + try: + from crawlee import service_locator # noqa: PLC0415 + + event_manager = service_locator.get_event_manager() + event_manager.on(event=Event.PERSIST_STATE, listener=client._on_persist_state) + client._event_listener_registered = True + except Exception: + logger.debug('Could not register PERSIST_STATE listener - event manager may not be initialised yet.') return client + async def _on_persist_state(self, _event_data: EventPersistStateData | None = None) -> None: + """Event handler that persists the native client state.""" + await self._native_client.persist_state() + @override async def drop(self) -> None: - async with self._lock: - # Remove the RQ dir recursively if it exists. - if self.path_to_rq.exists(): - await asyncio.to_thread(shutil.rmtree, self.path_to_rq) - - # Clear recoverable state - await self._state.reset() - await self._state.teardown() - self._request_cache.clear() - self._request_cache_needs_refresh = True - - # Invalidate is_empty cache. - self._is_empty_cache = None + self._deregister_event_listener() + await self._native_client.drop_storage() @override async def purge(self) -> None: - async with self._lock: - request_files = await self._get_request_files(self.path_to_rq) - - for file_path in request_files: - await asyncio.to_thread(file_path.unlink, missing_ok=True) - - # Clear recoverable state - await self._state.reset() - self._request_cache.clear() - self._request_cache_needs_refresh = True - - await self._update_metadata( - update_modified_at=True, - update_accessed_at=True, - new_pending_request_count=0, - new_handled_request_count=0, - new_total_request_count=0, - ) - - # Invalidate is_empty cache. - self._is_empty_cache = None + await self._native_client.purge() @override async def add_batch_of_requests( @@ -326,202 +142,39 @@ async def add_batch_of_requests( *, forefront: bool = False, ) -> AddRequestsResponse: - async with self._lock: - self._is_empty_cache = None - new_total_request_count = self._metadata.total_request_count - new_pending_request_count = self._metadata.pending_request_count - processed_requests = list[ProcessedRequest]() - unprocessed_requests = list[UnprocessedRequest]() - state = self._state.current_value - - all_requests = state.forefront_requests | state.regular_requests - - requests_to_enqueue = {} - - # Determine which requests can be added or are modified. - for request in requests: - # Check if the request has already been handled. - if request.unique_key in state.handled_requests: - processed_requests.append( - ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=True, - ) - ) - # Check if the request is already in progress. - # Or if the request is already in the queue and the `forefront` flag is not used, we do not change the - # position of the request. - elif (request.unique_key in state.in_progress_requests) or ( - request.unique_key in all_requests and not forefront - ): - processed_requests.append( - ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=False, - ) - ) - # These requests must either be added or update their position. - else: - requests_to_enqueue[request.unique_key] = request - - # Process each request in the batch. - for request in requests_to_enqueue.values(): - # If the request is not already in the RQ, this is a new request. - if request.unique_key not in all_requests: - request_path = self._get_request_path(request.unique_key) - # Add sequence number to ensure FIFO ordering using state. - if forefront: - sequence_number = state.forefront_sequence_counter - state.forefront_sequence_counter += 1 - state.forefront_requests[request.unique_key] = sequence_number - else: - sequence_number = state.sequence_counter - state.sequence_counter += 1 - state.regular_requests[request.unique_key] = sequence_number - - # Save the clean request without extra fields - request_data = await json_dumps(request.model_dump()) - await atomic_write(request_path, request_data) - - # Update the metadata counts. - new_total_request_count += 1 - new_pending_request_count += 1 - - processed_requests.append( - ProcessedRequest( - unique_key=request.unique_key, - was_already_present=False, - was_already_handled=False, - ) - ) - - # If the request already exists in the RQ and use the forefront flag to update its position - elif forefront: - # If the request is among `regular`, remove it from its current position. - if request.unique_key in state.regular_requests: - state.regular_requests.pop(request.unique_key) - - # If the request is already in `forefront`, we just need to update its position. - state.forefront_requests[request.unique_key] = state.forefront_sequence_counter - state.forefront_sequence_counter += 1 - - processed_requests.append( - ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=False, - ) - ) - - else: - logger.warning(f'Request with unique key "{request.unique_key}" could not be processed.') - unprocessed_requests.append( - UnprocessedRequest( - unique_key=request.unique_key, - url=request.url, - method=request.method, - ) - ) - - await self._update_metadata( - update_modified_at=True, - update_accessed_at=True, - new_total_request_count=new_total_request_count, - new_pending_request_count=new_pending_request_count, - ) - - # Invalidate the cache if we added forefront requests. - if forefront: - self._request_cache_needs_refresh = True - - # Invalidate is_empty cache. - self._is_empty_cache = None - - return AddRequestsResponse( - processed_requests=processed_requests, - unprocessed_requests=unprocessed_requests, - ) + # Serialize requests to dicts for the native client. + request_dicts = [json.loads(r.model_dump_json()) for r in requests] + + raw = await self._native_client.add_batch_of_requests(request_dicts, forefront=forefront) + return AddRequestsResponse(**raw) @override async def get_request(self, unique_key: str) -> Request | None: - async with self._lock: - request_path = self._get_request_path(unique_key) - request = await self._parse_request_file(request_path) + raw = await self._native_client.get_request(unique_key) - if request is None: - logger.warning(f'Request with unique key "{unique_key}" not found in the queue.') - return None + if raw is None: + return None - await self._update_metadata(update_accessed_at=True) - return request + return Request.model_validate(raw) @override async def fetch_next_request(self) -> Request | None: - async with self._lock: - # Refresh cache if needed or if it's empty. - if self._request_cache_needs_refresh or not self._request_cache: - await self._refresh_cache() - - next_request: Request | None = None - state = self._state.current_value - - # Fetch from the front of the deque (forefront requests are at the beginning). - while self._request_cache and next_request is None: - candidate = self._request_cache.popleft() + raw = await self._native_client.fetch_next_request() - # Skip requests that are already in progress, however this should not happen. - if candidate.unique_key not in state.in_progress_requests: - next_request = candidate - - if next_request is not None: - state.in_progress_requests.add(next_request.unique_key) + if raw is None: + return None - return next_request + return Request.model_validate(raw) @override async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: - async with self._lock: - self._is_empty_cache = None - state = self._state.current_value - - # Check if the request is in progress. - if request.unique_key not in state.in_progress_requests: - logger.warning(f'Marking request {request.unique_key} as handled that is not in progress.') - return None - - # Update the request's handled_at timestamp. - if request.handled_at is None: - request.handled_at = datetime.now(timezone.utc) - - # Dump the updated request to the file. - request_path = self._get_request_path(request.unique_key) - - if not await asyncio.to_thread(request_path.exists): - logger.warning(f'Request file for {request.unique_key} does not exist, cannot mark as handled.') - return None - - request_data = await json_dumps(request.model_dump()) - await atomic_write(request_path, request_data) - - # Update state: remove from in-progress and add to handled. - state.in_progress_requests.discard(request.unique_key) - state.handled_requests.add(request.unique_key) - - # Update RQ metadata. - await self._update_metadata( - update_modified_at=True, - update_accessed_at=True, - new_handled_request_count=self._metadata.handled_request_count + 1, - new_pending_request_count=self._metadata.pending_request_count - 1, - ) - - return ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=True, - ) + request_dict = json.loads(request.model_dump_json()) + raw = await self._native_client.mark_request_as_handled(request_dict) + + if raw is None: + return None + + return ProcessedRequest(**raw) @override async def reclaim_request( @@ -530,315 +183,27 @@ async def reclaim_request( *, forefront: bool = False, ) -> ProcessedRequest | None: - async with self._lock: - self._is_empty_cache = None - state = self._state.current_value - - # Check if the request is in progress. - if request.unique_key not in state.in_progress_requests: - logger.info(f'Reclaiming request {request.unique_key} that is not in progress.') - return None - - request_path = self._get_request_path(request.unique_key) - - if not await asyncio.to_thread(request_path.exists): - logger.warning(f'Request file for {request.unique_key} does not exist, cannot reclaim.') - return None - - # Update sequence number and state to ensure proper ordering. - if forefront: - # Remove from regular requests if it was there - state.regular_requests.pop(request.unique_key, None) - sequence_number = state.forefront_sequence_counter - state.forefront_sequence_counter += 1 - state.forefront_requests[request.unique_key] = sequence_number - else: - # Remove from forefront requests if it was there - state.forefront_requests.pop(request.unique_key, None) - sequence_number = state.sequence_counter - state.sequence_counter += 1 - state.regular_requests[request.unique_key] = sequence_number - - # Save the clean request without extra fields - request_data = await json_dumps(request.model_dump()) - await atomic_write(request_path, request_data) - - # Remove from in-progress. - state.in_progress_requests.discard(request.unique_key) - - # Update RQ metadata. - await self._update_metadata( - update_modified_at=True, - update_accessed_at=True, - ) - - # Add the request back to the cache. - if forefront: - self._request_cache.appendleft(request) - else: - self._request_cache.append(request) - - return ProcessedRequest( - unique_key=request.unique_key, - was_already_present=True, - was_already_handled=False, - ) - - @override - async def is_empty(self) -> bool: - async with self._lock: - # If we have a cached value, return it immediately. - if self._is_empty_cache is not None: - return self._is_empty_cache - - state = self._state.current_value - - # If there are in-progress requests, return False immediately. - if len(state.in_progress_requests) > 0: - self._is_empty_cache = False - return False - - # If we have a cached requests, check them first (fast path). - if self._request_cache: - for req in self._request_cache: - if req.unique_key not in state.handled_requests: - self._is_empty_cache = False - return False - self._is_empty_cache = True - return len(state.in_progress_requests) == 0 - - # Fallback: check state for unhandled requests. - await self._update_metadata(update_accessed_at=True) + request_dict = json.loads(request.model_dump_json()) + raw = await self._native_client.reclaim_request(request_dict, forefront=forefront) - # Check if there are any requests that are not handled - all_requests = set(state.forefront_requests.keys()) | set(state.regular_requests.keys()) - unhandled_requests = all_requests - state.handled_requests - - if unhandled_requests: - self._is_empty_cache = False - return False - - self._is_empty_cache = True - return True - - def _get_request_path(self, unique_key: str) -> Path: - """Get the path to a specific request file. - - Args: - unique_key: Unique key of the request. - - Returns: - The path to the request file. - """ - return self.path_to_rq / f'{self._get_file_base_name_from_unique_key(unique_key)}.json' - - async def _update_metadata( - self, - *, - new_handled_request_count: int | None = None, - new_pending_request_count: int | None = None, - new_total_request_count: int | None = None, - update_had_multiple_clients: bool = False, - update_accessed_at: bool = False, - update_modified_at: bool = False, - ) -> None: - """Update the dataset metadata file with current information. - - Args: - new_handled_request_count: If provided, update the handled_request_count to this value. - new_pending_request_count: If provided, update the pending_request_count to this value. - new_total_request_count: If provided, update the total_request_count to this value. - update_had_multiple_clients: If True, set had_multiple_clients to True. - update_accessed_at: If True, update the `accessed_at` timestamp to the current time. - update_modified_at: If True, update the `modified_at` timestamp to the current time. - """ - # Always create a new timestamp to ensure it's truly updated - now = datetime.now(timezone.utc) - - # Update timestamps according to parameters - if update_accessed_at: - self._metadata.accessed_at = now - - if update_modified_at: - self._metadata.modified_at = now - - # Update request counts if provided - if new_handled_request_count is not None: - self._metadata.handled_request_count = new_handled_request_count - - if new_pending_request_count is not None: - self._metadata.pending_request_count = new_pending_request_count - - if new_total_request_count is not None: - self._metadata.total_request_count = new_total_request_count - - if update_had_multiple_clients: - self._metadata.had_multiple_clients = True - - # Ensure the parent directory for the metadata file exists. - await asyncio.to_thread(self.path_to_metadata.parent.mkdir, parents=True, exist_ok=True) - - # Dump the serialized metadata to the file. - data = await json_dumps(self._metadata.model_dump()) - await atomic_write(self.path_to_metadata, data) - - async def _refresh_cache(self) -> None: - """Refresh the request cache from filesystem. - - This method loads up to _MAX_REQUESTS_IN_CACHE requests from the filesystem, - prioritizing forefront requests and maintaining proper ordering. - """ - self._request_cache.clear() - state = self._state.current_value - - forefront_requests = list[tuple[Request, int]]() # (request, sequence) - regular_requests = list[tuple[Request, int]]() # (request, sequence) - - request_files = await self._get_request_files(self.path_to_rq) - - for request_file in request_files: - request = await self._parse_request_file(request_file) - - if request is None: - continue - - # Skip handled requests - if request.unique_key in state.handled_requests: - continue - - # Skip in-progress requests - if request.unique_key in state.in_progress_requests: - continue - - # Determine if request is forefront or regular based on state - if request.unique_key in state.forefront_requests: - sequence = state.forefront_requests[request.unique_key] - forefront_requests.append((request, sequence)) - elif request.unique_key in state.regular_requests: - sequence = state.regular_requests[request.unique_key] - regular_requests.append((request, sequence)) - else: - # Request not in state, skip it (might be orphaned) - logger.warning(f'Request {request.unique_key} not found in state, skipping.') - continue - - # Sort forefront requests by sequence (newest first for LIFO behavior). - forefront_requests.sort(key=lambda item: item[1], reverse=True) - - # Sort regular requests by sequence (oldest first for FIFO behavior). - regular_requests.sort(key=lambda item: item[1], reverse=False) - - # Add forefront requests to the beginning of the cache (left side). Since forefront_requests are sorted - # by sequence (newest first), we need to add them in reverse order to maintain correct priority. - for request, _ in reversed(forefront_requests): - if len(self._request_cache) >= self._MAX_REQUESTS_IN_CACHE: - break - self._request_cache.appendleft(request) - - # Add regular requests to the end of the cache (right side). - for request, _ in regular_requests: - if len(self._request_cache) >= self._MAX_REQUESTS_IN_CACHE: - break - self._request_cache.append(request) - - self._request_cache_needs_refresh = False - - @classmethod - async def _get_request_files(cls, path_to_rq: Path) -> list[Path]: - """Get all request files from the RQ. - - Args: - path_to_rq: The path to the request queue directory. - - Returns: - A list of paths to all request files. - """ - # Create the requests directory if it doesn't exist. - await asyncio.to_thread(path_to_rq.mkdir, parents=True, exist_ok=True) - - # List all the json files. - files = list(await asyncio.to_thread(path_to_rq.glob, '*.json')) - - # Filter out metadata file and non-file entries. - filtered = filter(lambda request_file: request_file.is_file() and request_file.name != METADATA_FILENAME, files) - - return list(filtered) - - @classmethod - async def _parse_request_file(cls, file_path: Path) -> Request | None: - """Parse a request file and return the `Request` object. - - Args: - file_path: The path to the request file. - - Returns: - The parsed `Request` object or `None` if the file could not be read or parsed. - """ - # Open the request file. - try: - file = await asyncio.to_thread(functools.partial(file_path.open, mode='r', encoding='utf-8')) - except FileNotFoundError: - logger.warning(f'Request file "{file_path}" not found.') - return None - - # Read the file content and parse it as JSON. - try: - file_content = json.load(file) - except json.JSONDecodeError as exc: - logger.warning(f'Failed to parse request file {file_path}: {exc!s}') - return None - finally: - await asyncio.to_thread(file.close) - - # Validate the content against the Request model. - try: - return Request.model_validate(file_content) - except ValidationError as exc: - logger.warning(f'Failed to validate request file {file_path}: {exc!s}') + if raw is None: return None - async def _discover_existing_requests(self) -> None: - """Discover and load existing requests into the state when opening an existing request queue. + return ProcessedRequest(**raw) - On recovery after a crash, any requests that were previously in-progress are reclaimed as pending, - since there is no active processing after a restart. - """ - request_files = await self._get_request_files(self.path_to_rq) - state = self._state.current_value - - if state.in_progress_requests: - logger.info( - f'Reclaiming {len(state.in_progress_requests)} in-progress request(s) from previous run.', - ) - state.in_progress_requests.clear() - - for request_file in request_files: - request = await self._parse_request_file(request_file) - if request is None: - continue - - # Add request to state as regular request (assign sequence numbers) - if request.unique_key not in state.regular_requests and request.unique_key not in state.forefront_requests: - # Assign as regular request with current sequence counter - state.regular_requests[request.unique_key] = state.sequence_counter - state.sequence_counter += 1 - - # Check if request was already handled - if request.handled_at is not None: - state.handled_requests.add(request.unique_key) - - @staticmethod - def _get_file_base_name_from_unique_key(unique_key: str) -> str: - """Generate a deterministic file name for a unique_key. + @override + async def is_empty(self) -> bool: + return await self._native_client.is_empty() - Args: - unique_key: Unique key to be used to generate filename. + def _deregister_event_listener(self) -> None: + """Remove the PERSIST_STATE event listener if it was registered.""" + if not self._event_listener_registered: + return + try: + from crawlee import service_locator # noqa: PLC0415 - Returns: - A file name based on the unique_key. - """ - # hexdigest produces filenames compliant strings - hashed_key = sha256(unique_key.encode('utf-8')).hexdigest() - name_length = 15 - # Truncate the key to the desired length - return hashed_key[:name_length] + event_manager = service_locator.get_event_manager() + event_manager.off(event=Event.PERSIST_STATE, listener=self._on_persist_state) + self._event_listener_registered = False + except Exception: + logger.debug('Could not deregister PERSIST_STATE listener.') diff --git a/src/crawlee/storage_clients/_file_system/_utils.py b/src/crawlee/storage_clients/_file_system/_utils.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py b/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py index 5f2ae15da0..76ad0ced5e 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_kvs_client.py @@ -2,6 +2,7 @@ import asyncio import json +import re from typing import TYPE_CHECKING import pytest @@ -10,6 +11,15 @@ from crawlee.configuration import Configuration from crawlee.storage_clients import FileSystemStorageClient + +def _encode_key(key: str) -> str: + """Percent-encode a KVS key the same way the native storage client does. + + The native client encodes every character except ASCII alphanumerics. + """ + return re.sub(r'[^a-zA-Z0-9]', lambda m: f'%{ord(m.group()):02X}', key) + + if TYPE_CHECKING: from collections.abc import AsyncGenerator from pathlib import Path @@ -55,9 +65,10 @@ async def test_value_file_creation_and_content(kvs_client: FileSystemKeyValueSto test_value = 'Hello, world!' await kvs_client.set_value(key=test_key, value=test_value) - # Check if the files were created - key_path = kvs_client.path_to_kvs / test_key - key_metadata_path = kvs_client.path_to_kvs / f'{test_key}.{METADATA_FILENAME}' + # Check if the files were created (native client percent-encodes key names on disk) + encoded_key = _encode_key(test_key) + key_path = kvs_client.path_to_kvs / encoded_key + key_metadata_path = kvs_client.path_to_kvs / f'{encoded_key}.{METADATA_FILENAME}' assert key_path.exists() assert key_metadata_path.exists() @@ -69,25 +80,21 @@ async def test_value_file_creation_and_content(kvs_client: FileSystemKeyValueSto with key_metadata_path.open() as f: metadata = json.load(f) assert metadata['key'] == test_key - assert metadata['content_type'] == 'text/plain; charset=utf-8' + assert metadata['content_type'].startswith('text/plain') assert metadata['size'] == len(test_value.encode('utf-8')) async def test_binary_data_persistence(kvs_client: FileSystemKeyValueStoreClient) -> None: - """Test that binary data is stored correctly without corruption.""" + """Test that binary data is stored and can be retrieved correctly.""" test_key = 'test-binary' test_value = b'\x00\x01\x02\x03\x04' await kvs_client.set_value(key=test_key, value=test_value) - # Verify binary file exists - key_path = kvs_client.path_to_kvs / test_key + # Verify binary file exists (native client percent-encodes key names on disk) + key_path = kvs_client.path_to_kvs / _encode_key(test_key) assert key_path.exists() - # Verify binary content is preserved - content = key_path.read_bytes() - assert content == test_value - - # Verify retrieval works correctly + # Verify retrieval works correctly via the API record = await kvs_client.get_value(key=test_key) assert record is not None assert record.value == test_value @@ -101,7 +108,7 @@ async def test_json_serialization_to_file(kvs_client: FileSystemKeyValueStoreCli await kvs_client.set_value(key=test_key, value=test_value) # Check if file content is valid JSON - key_path = kvs_client.path_to_kvs / test_key + key_path = kvs_client.path_to_kvs / _encode_key(test_key) with key_path.open() as f: file_content = json.load(f) assert file_content == test_value @@ -115,9 +122,10 @@ async def test_file_deletion_on_value_delete(kvs_client: FileSystemKeyValueStore # Set a value await kvs_client.set_value(key=test_key, value=test_value) - # Verify files exist - key_path = kvs_client.path_to_kvs / test_key - metadata_path = kvs_client.path_to_kvs / f'{test_key}.{METADATA_FILENAME}' + # Verify files exist (native client percent-encodes key names on disk) + encoded_key = _encode_key(test_key) + key_path = kvs_client.path_to_kvs / encoded_key + metadata_path = kvs_client.path_to_kvs / f'{encoded_key}.{METADATA_FILENAME}' assert key_path.exists() assert metadata_path.exists() diff --git a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py index 275665d9d5..078a71b323 100644 --- a/tests/unit/storage_clients/_file_system/test_fs_rq_client.py +++ b/tests/unit/storage_clients/_file_system/test_fs_rq_client.py @@ -198,7 +198,7 @@ async def test_in_progress_requests_recovered_after_crash() -> None: assert fetched is not None # Persist state explicitly (simulating what happens periodically or at crash boundary). - await original_client._state.persist_state() + await original_client._native_client.persist_state() rq_id = (await original_client.get_metadata()).id diff --git a/uv.lock b/uv.lock index dcffbb003a..103ea78d90 100644 --- a/uv.lock +++ b/uv.lock @@ -769,6 +769,7 @@ dependencies = [ { name = "async-timeout" }, { name = "cachetools" }, { name = "colorama" }, + { name = "crawlee-storage" }, { name = "impit" }, { name = "more-itertools" }, { name = "protego" }, @@ -916,6 +917,7 @@ requires-dist = [ { name = "colorama", specifier = ">=0.4.0" }, { name = "cookiecutter", marker = "extra == 'cli'", specifier = ">=2.6.0" }, { name = "crawlee", extras = ["adaptive-crawler", "beautifulsoup", "cli", "curl-impersonate", "httpx", "parsel", "playwright", "otel", "sql-sqlite", "sql-postgres", "redis"], marker = "extra == 'all'" }, + { name = "crawlee-storage", git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python" }, { name = "cryptography", marker = "extra == 'sql-mysql'", specifier = ">=46.0.5" }, { name = "curl-cffi", marker = "extra == 'curl-impersonate'", specifier = ">=0.9.0" }, { name = "html5lib", marker = "extra == 'beautifulsoup'", specifier = ">=1.0" }, @@ -981,6 +983,11 @@ dev = [ { name = "uvicorn", extras = ["standard"], specifier = "<1.0.0" }, ] +[[package]] +name = "crawlee-storage" +version = "0.1.0" +source = { git = "https://github.com/apify/crawlee-storage.git?subdirectory=crawlee-storage-python#5c219e3005a116a21d72e038421e71ba5ddef1c5" } + [[package]] name = "cryptography" version = "46.0.6"