Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/crawlee/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
'RequestHandlerError',
'ServiceConflictError',
'SessionError',
'StorageWriteError',
'UserDefinedErrorHandlerError',
]

Expand Down Expand Up @@ -116,3 +117,12 @@ class ContextPipelineInterruptedError(Exception):
@docs_group('Errors')
class RequestCollisionError(Exception):
    """Error raised when a request cannot be processed because a resource it requires is in conflict."""


@docs_group('Errors')
class StorageWriteError(Exception):
    """Error raised when a write operation against a storage backend fails."""

    def __init__(self, cause: Exception) -> None:
        # Keep the underlying exception reachable for callers while the error
        # message itself mirrors the original exception's text.
        self.cause = cause
        super().__init__(str(cause))
30 changes: 17 additions & 13 deletions src/crawlee/storage_clients/_file_system/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from crawlee._utils.crypto import crypto_random_object_id
from crawlee._utils.file import atomic_write, json_dumps
from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs
from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

Expand Down Expand Up @@ -222,21 +223,24 @@ async def purge(self) -> None:
@override
async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
async with self._lock:
new_item_count = self._metadata.item_count
if isinstance(data, list):
for item in data:
try:
new_item_count = self._metadata.item_count
if isinstance(data, list):
for item in data:
new_item_count += 1
await self._push_item(item, new_item_count)
else:
new_item_count += 1
await self._push_item(item, new_item_count)
else:
new_item_count += 1
await self._push_item(data, new_item_count)
await self._push_item(data, new_item_count)

# now update metadata under the same lock
await self._update_metadata(
update_accessed_at=True,
update_modified_at=True,
new_item_count=new_item_count,
)
# now update metadata under the same lock
await self._update_metadata(
update_accessed_at=True,
update_modified_at=True,
new_item_count=new_item_count,
)
except OSError as e:
raise StorageWriteError(e) from e

@override
async def get_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from crawlee._utils.crypto import crypto_random_object_id
from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps
from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs
from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata

Expand Down Expand Up @@ -328,17 +329,20 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
record_metadata_content = await json_dumps(record_metadata.model_dump())

async with self._lock:
# Ensure the key-value store directory exists.
await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True)
try:
# Ensure the key-value store directory exists.
await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True)

# Write the value to the file.
await atomic_write(record_path, value_bytes)
# Write the value to the file.
await atomic_write(record_path, value_bytes)

# Write the record metadata to the file.
await atomic_write(record_metadata_filepath, record_metadata_content)
# Write the record metadata to the file.
await atomic_write(record_metadata_filepath, record_metadata_content)

# Update the KVS metadata to record the access and modification.
await self._update_metadata(update_accessed_at=True, update_modified_at=True)
# Update the KVS metadata to record the access and modification.
await self._update_metadata(update_accessed_at=True, update_modified_at=True)
except OSError as e:
raise StorageWriteError(e) from e

@override
async def delete_value(self, *, key: str) -> None:
Expand Down
22 changes: 14 additions & 8 deletions src/crawlee/storage_clients/_redis/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from logging import getLogger
from typing import TYPE_CHECKING, Any, cast

from redis.exceptions import RedisError
from typing_extensions import NotRequired, override

from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

Expand Down Expand Up @@ -124,14 +126,18 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
if isinstance(data, dict):
data = [data]

async with self._get_pipeline() as pipe:
pipe.json().arrappend(self._items_key, '$', *data)
await self._update_metadata(
pipe,
**_DatasetMetadataUpdateParams(
update_accessed_at=True, update_modified_at=True, delta_item_count=len(data)
),
)
try:
async with self._get_pipeline() as pipe:
pipe.json().arrappend(self._items_key, '$', *data)
await self._update_metadata(
pipe,
**_DatasetMetadataUpdateParams(
update_accessed_at=True, update_modified_at=True, delta_item_count=len(data)
),
)

except RedisError as e:
raise StorageWriteError(e) from e

@override
async def get_data(
Expand Down
31 changes: 19 additions & 12 deletions src/crawlee/storage_clients/_redis/_key_value_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from logging import getLogger
from typing import TYPE_CHECKING, Any

from redis.exceptions import RedisError
from typing_extensions import override

from crawlee._utils.file import infer_mime_type
from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata

Expand Down Expand Up @@ -141,19 +143,24 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
content_type=content_type,
size=size,
)

async with self._get_pipeline() as pipe:
# redis-py typing issue
await await_redis_response(pipe.hset(self._items_key, key, value_bytes)) # ty: ignore[invalid-argument-type]

await await_redis_response(
pipe.hset(
self._metadata_items_key,
key,
item_metadata.model_dump_json(),
try:
async with self._get_pipeline() as pipe:
# redis-py typing issue
await await_redis_response(pipe.hset(self._items_key, key, value_bytes)) # ty: ignore[invalid-argument-type]

await await_redis_response(
pipe.hset(
self._metadata_items_key,
key,
item_metadata.model_dump_json(),
)
)
)
await self._update_metadata(pipe, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True))
await self._update_metadata(
pipe, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True)
)

except RedisError as e:
raise StorageWriteError(e) from e

@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
Expand Down
13 changes: 10 additions & 3 deletions src/crawlee/storage_clients/_sql/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

from sqlalchemy import Select, insert, select
from sqlalchemy import func as sql_func
from sqlalchemy.exc import SQLAlchemyError
from typing_extensions import Self, override

from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

Expand Down Expand Up @@ -145,10 +147,15 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
db_items = [{'dataset_id': self._id, 'data': item} for item in data]
stmt = insert(self._ITEM_TABLE).values(db_items)

async with self.get_session(with_simple_commit=True) as session:
await session.execute(stmt)
async with self.get_session() as session:
try:
await session.execute(stmt)

Comment thread
Mantisus marked this conversation as resolved.
await self._add_buffer_record(session, update_modified_at=True, delta_item_count=len(data))
await self._add_buffer_record(session, update_modified_at=True, delta_item_count=len(data))
await session.commit()
except SQLAlchemyError as e:
await session.rollback()
raise StorageWriteError(e) from e

@override
async def get_data(
Expand Down
13 changes: 10 additions & 3 deletions src/crawlee/storage_clients/_sql/_key_value_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

from sqlalchemy import CursorResult, delete, select
from sqlalchemy import func as sql_func
from sqlalchemy.exc import SQLAlchemyError
from typing_extensions import Self, override

from crawlee._utils.file import infer_mime_type
from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import (
KeyValueStoreMetadata,
Expand Down Expand Up @@ -175,10 +177,15 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
conflict_cols=['key_value_store_id', 'key'],
)

async with self.get_session(with_simple_commit=True) as session:
await session.execute(upsert_stmt)
async with self.get_session() as session:
try:
await session.execute(upsert_stmt)

await self._add_buffer_record(session, update_modified_at=True)
await self._add_buffer_record(session, update_modified_at=True)
await session.commit()
except SQLAlchemyError as e:
await session.rollback()
raise StorageWriteError(e) from e

@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
Expand Down
3 changes: 3 additions & 0 deletions src/crawlee/storage_clients/_sql/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,9 @@ async def fetch_next_request(self) -> Request | None:

await self._add_buffer_record(session)

if not requests_db:
return None

Comment thread
Mantisus marked this conversation as resolved.
requests = [Request.model_validate_json(r.data) for r in requests_db if r.request_id in blocked_ids]

if not requests:
Expand Down
31 changes: 29 additions & 2 deletions src/crawlee/storages/_dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
import logging
from datetime import timedelta
from io import StringIO
from typing import TYPE_CHECKING, overload

Expand All @@ -9,6 +11,7 @@
from crawlee import service_locator
from crawlee._utils.docs import docs_group
from crawlee._utils.file import export_csv_to_stream, export_json_to_stream
from crawlee.errors import StorageWriteError

from ._base import Storage
from ._key_value_store import KeyValueStore
Expand Down Expand Up @@ -134,7 +137,13 @@ async def drop(self) -> None:
async def purge(self) -> None:
await self._client.purge()

async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
async def push_data(
self,
data: list[dict[str, Any]] | dict[str, Any],
*,
max_attempts: int = 5,
wait_time_between_retries: timedelta = timedelta(seconds=1),
) -> None:
"""Store an object or an array of objects to the dataset.

The size of the data is limited by the receiving API and therefore `push_data()` will only
Expand All @@ -144,8 +153,26 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
Args:
data: A JSON serializable data structure to be stored in the dataset. The JSON representation
of each item must be smaller than 9MB.
max_attempts: The maximum number of attempts to push data in case of failure.
wait_time_between_retries: The time to wait between retries in case of failure.
"""
await self._client.push_data(data=data)
if max_attempts < 1:
raise ValueError('max_attempts must be at least 1')

wait_time_between_retries_seconds = wait_time_between_retries.total_seconds()
last_exception: StorageWriteError | None = None

for attempt in range(max_attempts):
try:
await self._client.push_data(data=data)
break
except StorageWriteError as e:
last_exception = e
if attempt < max_attempts - 1:
await asyncio.sleep(wait_time_between_retries_seconds)
else:
if last_exception:
logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
Comment thread
Mantisus marked this conversation as resolved.
Outdated

async def get_data(
self,
Expand Down
28 changes: 27 additions & 1 deletion src/crawlee/storages/_key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from collections.abc import AsyncIterator
from datetime import timedelta
from logging import getLogger
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, overload

Expand All @@ -12,6 +13,7 @@
from crawlee._types import JsonSerializable # noqa: TC001
from crawlee._utils.docs import docs_group
from crawlee._utils.recoverable_state import RecoverableState
from crawlee.errors import StorageWriteError
from crawlee.storage_clients.models import KeyValueStoreMetadata

from ._base import Storage
Expand Down Expand Up @@ -175,15 +177,39 @@ async def set_value(
key: str,
value: Any,
content_type: str | None = None,
*,
max_attempts: int = 5,
wait_time_between_retries: timedelta = timedelta(seconds=1),
) -> None:
"""Set a value in the KVS.

Args:
key: Key of the record to set.
value: Value to set.
content_type: The MIME content type string.
max_attempts: The maximum number of attempts to set the value in case of failure.
wait_time_between_retries: Time to wait between retries.
"""
await self._client.set_value(key=key, value=value, content_type=content_type)
if max_attempts < 1:
raise ValueError('max_attempts must be at least 1')

wait_time_between_retries_seconds = wait_time_between_retries.total_seconds()
last_exception: StorageWriteError | None = None

for attempt in range(max_attempts):
try:
await self._client.set_value(key=key, value=value, content_type=content_type)
break
except StorageWriteError as e:
last_exception = e
if attempt < max_attempts - 1:
await asyncio.sleep(wait_time_between_retries_seconds)
else:
if last_exception:
logger.warning(
f'Failed to set value for key "{key}" after {max_attempts} attempts '
f'with error: {last_exception.cause}'
)
Comment thread
Mantisus marked this conversation as resolved.
Outdated

async def delete_value(self, key: str) -> None:
"""Delete a value from the KVS.
Expand Down
Loading
Loading