Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/crawlee/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
'RequestHandlerError',
'ServiceConflictError',
'SessionError',
'StorageWriteError',
'UserDefinedErrorHandlerError',
]

Expand Down Expand Up @@ -116,3 +117,12 @@ class ContextPipelineInterruptedError(Exception):
@docs_group('Errors')
class RequestCollisionError(Exception):
    """Raised when a request cannot be processed due to a conflict with required resources.

    For example, another client may already hold a lock on a resource this request needs.
    """


@docs_group('Errors')
class StorageWriteError(Exception):
    """Raised when a write operation to a storage fails.

    The underlying backend exception is preserved on the `cause` attribute so that
    callers can inspect the original failure.
    """

    def __init__(self, cause: Exception) -> None:
        # Keep a reference to the original backend error for callers that need it.
        self.cause = cause
        # Surface the wrapped exception's message as this error's message.
        super().__init__(str(cause))
30 changes: 17 additions & 13 deletions src/crawlee/storage_clients/_file_system/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from crawlee._utils.crypto import crypto_random_object_id
from crawlee._utils.file import atomic_write, json_dumps
from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs
from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

Expand Down Expand Up @@ -222,21 +223,24 @@ async def purge(self) -> None:
@override
async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
async with self._lock:
new_item_count = self._metadata.item_count
if isinstance(data, list):
for item in data:
try:
new_item_count = self._metadata.item_count
if isinstance(data, list):
for item in data:
new_item_count += 1
await self._push_item(item, new_item_count)
else:
new_item_count += 1
await self._push_item(item, new_item_count)
else:
new_item_count += 1
await self._push_item(data, new_item_count)
await self._push_item(data, new_item_count)

# now update metadata under the same lock
await self._update_metadata(
update_accessed_at=True,
update_modified_at=True,
new_item_count=new_item_count,
)
# now update metadata under the same lock
await self._update_metadata(
update_accessed_at=True,
update_modified_at=True,
new_item_count=new_item_count,
)
except OSError as e:
raise StorageWriteError(e) from e

@override
async def get_data(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from crawlee._utils.crypto import crypto_random_object_id
from crawlee._utils.file import atomic_write, infer_mime_type, json_dumps
from crawlee._utils.raise_if_too_many_kwargs import raise_if_too_many_kwargs
from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata

Expand Down Expand Up @@ -328,17 +329,20 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
record_metadata_content = await json_dumps(record_metadata.model_dump())

async with self._lock:
# Ensure the key-value store directory exists.
await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True)
try:
# Ensure the key-value store directory exists.
await asyncio.to_thread(self.path_to_kvs.mkdir, parents=True, exist_ok=True)

# Write the value to the file.
await atomic_write(record_path, value_bytes)
# Write the value to the file.
await atomic_write(record_path, value_bytes)

# Write the record metadata to the file.
await atomic_write(record_metadata_filepath, record_metadata_content)
# Write the record metadata to the file.
await atomic_write(record_metadata_filepath, record_metadata_content)

# Update the KVS metadata to record the access and modification.
await self._update_metadata(update_accessed_at=True, update_modified_at=True)
# Update the KVS metadata to record the access and modification.
await self._update_metadata(update_accessed_at=True, update_modified_at=True)
except OSError as e:
raise StorageWriteError(e) from e

@override
async def delete_value(self, *, key: str) -> None:
Expand Down
22 changes: 14 additions & 8 deletions src/crawlee/storage_clients/_redis/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from logging import getLogger
from typing import TYPE_CHECKING, Any, cast

from redis.exceptions import RedisError
from typing_extensions import NotRequired, override

from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

Expand Down Expand Up @@ -124,14 +126,18 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
if isinstance(data, dict):
data = [data]

async with self._get_pipeline() as pipe:
pipe.json().arrappend(self._items_key, '$', *data)
await self._update_metadata(
pipe,
**_DatasetMetadataUpdateParams(
update_accessed_at=True, update_modified_at=True, delta_item_count=len(data)
),
)
try:
async with self._get_pipeline() as pipe:
pipe.json().arrappend(self._items_key, '$', *data)
await self._update_metadata(
pipe,
**_DatasetMetadataUpdateParams(
update_accessed_at=True, update_modified_at=True, delta_item_count=len(data)
),
)

except RedisError as e:
raise StorageWriteError(e) from e

@override
async def get_data(
Expand Down
31 changes: 19 additions & 12 deletions src/crawlee/storage_clients/_redis/_key_value_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from logging import getLogger
from typing import TYPE_CHECKING, Any

from redis.exceptions import RedisError
from typing_extensions import override

from crawlee._utils.file import infer_mime_type
from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata

Expand Down Expand Up @@ -141,19 +143,24 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
content_type=content_type,
size=size,
)

async with self._get_pipeline() as pipe:
# redis-py typing issue
await await_redis_response(pipe.hset(self._items_key, key, value_bytes)) # ty: ignore[invalid-argument-type]

await await_redis_response(
pipe.hset(
self._metadata_items_key,
key,
item_metadata.model_dump_json(),
try:
async with self._get_pipeline() as pipe:
# redis-py typing issue
await await_redis_response(pipe.hset(self._items_key, key, value_bytes)) # ty: ignore[invalid-argument-type]

await await_redis_response(
pipe.hset(
self._metadata_items_key,
key,
item_metadata.model_dump_json(),
)
)
)
await self._update_metadata(pipe, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True))
await self._update_metadata(
pipe, **MetadataUpdateParams(update_accessed_at=True, update_modified_at=True)
)

except RedisError as e:
raise StorageWriteError(e) from e

@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
Expand Down
13 changes: 10 additions & 3 deletions src/crawlee/storage_clients/_sql/_dataset_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

from sqlalchemy import Select, insert, select
from sqlalchemy import func as sql_func
from sqlalchemy.exc import SQLAlchemyError
from typing_extensions import Self, override

from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import DatasetClient
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata

Expand Down Expand Up @@ -145,10 +147,15 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
db_items = [{'dataset_id': self._id, 'data': item} for item in data]
stmt = insert(self._ITEM_TABLE).values(db_items)

async with self.get_session(with_simple_commit=True) as session:
await session.execute(stmt)
async with self.get_session() as session:
try:
await session.execute(stmt)

await self._add_buffer_record(session, update_modified_at=True, delta_item_count=len(data))
await self._add_buffer_record(session, update_modified_at=True, delta_item_count=len(data))
await session.commit()
except SQLAlchemyError as e:
await session.rollback()
raise StorageWriteError(e) from e

@override
async def get_data(
Expand Down
13 changes: 10 additions & 3 deletions src/crawlee/storage_clients/_sql/_key_value_store_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

from sqlalchemy import CursorResult, delete, select
from sqlalchemy import func as sql_func
from sqlalchemy.exc import SQLAlchemyError
from typing_extensions import Self, override

from crawlee._utils.file import infer_mime_type
from crawlee.errors import StorageWriteError
from crawlee.storage_clients._base import KeyValueStoreClient
from crawlee.storage_clients.models import (
KeyValueStoreMetadata,
Expand Down Expand Up @@ -175,10 +177,15 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
conflict_cols=['key_value_store_id', 'key'],
)

async with self.get_session(with_simple_commit=True) as session:
await session.execute(upsert_stmt)
async with self.get_session() as session:
try:
await session.execute(upsert_stmt)

await self._add_buffer_record(session, update_modified_at=True)
await self._add_buffer_record(session, update_modified_at=True)
await session.commit()
except SQLAlchemyError as e:
await session.rollback()
raise StorageWriteError(e) from e

@override
async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
Expand Down
5 changes: 5 additions & 0 deletions src/crawlee/storage_clients/_sql/_request_queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ async def fetch_next_request(self) -> Request | None:
.limit(self._MAX_BATCH_FETCH_SIZE)
)

requests_db = None

async with self.get_session(with_simple_commit=True) as session:
# We use the `skip_locked` database mechanism to prevent the 'interception' of requests by another client
if dialect in {'postgresql', 'mysql', 'mariadb'}:
Expand Down Expand Up @@ -483,6 +485,9 @@ async def fetch_next_request(self) -> Request | None:

await self._add_buffer_record(session)

if not requests_db:
return None

Comment on lines +488 to +490
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guard is redundant/unreachable because both the PostgreSQL/MySQL branch and the fallback branch already return early when requests_db is empty. Keeping it adds noise and suggests requests_db might be unset here when it shouldn't be.

Suggested change
if not requests_db:
return None

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an error occurs that triggers a rollback, `requests_db` can remain unset; this guard prevents the code below from failing in that case.

requests = [Request.model_validate_json(r.data) for r in requests_db if r.request_id in blocked_ids]

if not requests:
Expand Down
31 changes: 29 additions & 2 deletions src/crawlee/storages/_dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import annotations

import asyncio
import logging
from datetime import timedelta
from io import StringIO
from typing import TYPE_CHECKING, overload

Expand All @@ -9,6 +11,7 @@
from crawlee import service_locator
from crawlee._utils.docs import docs_group
from crawlee._utils.file import export_csv_to_stream, export_json_to_stream
from crawlee.errors import StorageWriteError

from ._base import Storage
from ._key_value_store import KeyValueStore
Expand Down Expand Up @@ -134,7 +137,13 @@ async def drop(self) -> None:
async def purge(self) -> None:
await self._client.purge()

async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
async def push_data(
self,
data: list[dict[str, Any]] | dict[str, Any],
*,
max_attempts: int = 5,
wait_time_between_retries: timedelta = timedelta(seconds=1),
) -> None:
"""Store an object or an array of objects to the dataset.

The size of the data is limited by the receiving API and therefore `push_data()` will only
Expand All @@ -144,8 +153,26 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None:
Args:
data: A JSON serializable data structure to be stored in the dataset. The JSON representation
of each item must be smaller than 9MB.
max_attempts: The maximum number of attempts to push data in case of failure.
wait_time_between_retries: The time to wait between retries in case of failure.
"""
await self._client.push_data(data=data)
if max_attempts < 1:
raise ValueError('max_attempts must be at least 1')

wait_time_between_retries_seconds = wait_time_between_retries.total_seconds()
last_exception: StorageWriteError | None = None

for attempt in range(max_attempts):
try:
await self._client.push_data(data=data)
break
except StorageWriteError as e:
last_exception = e
if attempt < max_attempts - 1:
await asyncio.sleep(wait_time_between_retries_seconds)
else:
if last_exception:
logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After exhausting retries, this method only logs a warning and then returns successfully. That means push_data failures can be silently ignored and data can be lost while the caller believes the write succeeded. After the final attempt, re-raise the last StorageWriteError (or raise a new one) rather than returning.

Suggested change
logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
raise last_exception

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expected behavior. This PR allows users to configure retry settings, but propagating errors could cause issues.


async def get_data(
self,
Expand Down
28 changes: 27 additions & 1 deletion src/crawlee/storages/_key_value_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from collections.abc import AsyncIterator
from datetime import timedelta
from logging import getLogger
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, overload

Expand All @@ -12,6 +13,7 @@
from crawlee._types import JsonSerializable # noqa: TC001
from crawlee._utils.docs import docs_group
from crawlee._utils.recoverable_state import RecoverableState
from crawlee.errors import StorageWriteError
from crawlee.storage_clients.models import KeyValueStoreMetadata

from ._base import Storage
Expand Down Expand Up @@ -175,15 +177,39 @@ async def set_value(
key: str,
value: Any,
content_type: str | None = None,
*,
max_attempts: int = 5,
wait_time_between_retries: timedelta = timedelta(seconds=1),
) -> None:
"""Set a value in the KVS.

Args:
key: Key of the record to set.
value: Value to set.
content_type: The MIME content type string.
max_attempts: The maximum number of attempts to set the value in case of failure.
wait_time_between_retries: Time to wait between retries.
"""
await self._client.set_value(key=key, value=value, content_type=content_type)
if max_attempts < 1:
raise ValueError('max_attempts must be at least 1')

wait_time_between_retries_seconds = wait_time_between_retries.total_seconds()
last_exception: StorageWriteError | None = None

for attempt in range(max_attempts):
try:
await self._client.set_value(key=key, value=value, content_type=content_type)
break
except StorageWriteError as e:
last_exception = e
if attempt < max_attempts - 1:
await asyncio.sleep(wait_time_between_retries_seconds)
else:
if last_exception:
logger.warning(
f'Failed to set value for key "{key}" after {max_attempts} attempts '
f'with error: {last_exception.cause}'
)
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After exhausting retries, this method only logs a warning and then returns successfully. That will silently drop failed writes (similar to the original bug report) and callers will think the value was persisted. After the final attempt, re-raise the last caught StorageWriteError (or wrap/raise a new one) instead of returning.

Suggested change
)
)
raise last_exception

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expected behavior. This PR allows users to configure retry settings, but propagating errors could cause issues.


async def delete_value(self, key: str) -> None:
"""Delete a value from the KVS.
Expand Down
Loading
Loading