-
Notifications
You must be signed in to change notification settings - Fork 704
fix: Add retry logic for KeyValueStore.set_value and Dataset.push_data
#1838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,8 @@ | ||||||||
| from __future__ import annotations | ||||||||
|
|
||||||||
| import asyncio | ||||||||
| import logging | ||||||||
| from datetime import timedelta | ||||||||
| from io import StringIO | ||||||||
| from typing import TYPE_CHECKING, overload | ||||||||
|
|
||||||||
|
|
@@ -9,6 +11,7 @@ | |||||||
| from crawlee import service_locator | ||||||||
| from crawlee._utils.docs import docs_group | ||||||||
| from crawlee._utils.file import export_csv_to_stream, export_json_to_stream | ||||||||
| from crawlee.errors import StorageWriteError | ||||||||
|
|
||||||||
| from ._base import Storage | ||||||||
| from ._key_value_store import KeyValueStore | ||||||||
|
|
@@ -134,7 +137,13 @@ async def drop(self) -> None: | |||||||
| async def purge(self) -> None: | ||||||||
| await self._client.purge() | ||||||||
|
|
||||||||
| async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: | ||||||||
| async def push_data( | ||||||||
| self, | ||||||||
| data: list[dict[str, Any]] | dict[str, Any], | ||||||||
| *, | ||||||||
| max_attempts: int = 5, | ||||||||
| wait_time_between_retries: timedelta = timedelta(seconds=1), | ||||||||
| ) -> None: | ||||||||
| """Store an object or an array of objects to the dataset. | ||||||||
|
|
||||||||
| The size of the data is limited by the receiving API and therefore `push_data()` will only | ||||||||
|
|
@@ -144,8 +153,26 @@ async def push_data(self, data: list[dict[str, Any]] | dict[str, Any]) -> None: | |||||||
| Args: | ||||||||
| data: A JSON serializable data structure to be stored in the dataset. The JSON representation | ||||||||
| of each item must be smaller than 9MB. | ||||||||
| max_attempts: The maximum number of attempts to push data in case of failure. | ||||||||
| wait_time_between_retries: The time to wait between retries in case of failure. | ||||||||
| """ | ||||||||
| await self._client.push_data(data=data) | ||||||||
| if max_attempts < 1: | ||||||||
| raise ValueError('max_attempts must be at least 1') | ||||||||
|
|
||||||||
| wait_time_between_retries_seconds = wait_time_between_retries.total_seconds() | ||||||||
| last_exception: StorageWriteError | None = None | ||||||||
|
|
||||||||
| for attempt in range(max_attempts): | ||||||||
| try: | ||||||||
| await self._client.push_data(data=data) | ||||||||
| break | ||||||||
| except StorageWriteError as e: | ||||||||
| last_exception = e | ||||||||
| if attempt < max_attempts - 1: | ||||||||
| await asyncio.sleep(wait_time_between_retries_seconds) | ||||||||
| else: | ||||||||
| if last_exception: | ||||||||
| logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}') | ||||||||
|
||||||||
| logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}') | |
| logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}') | |
| raise last_exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is expected behavior. This PR allows users to configure retry settings, but propagating errors could cause issues.
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |||||||
|
|
||||||||
| import asyncio | ||||||||
| from collections.abc import AsyncIterator | ||||||||
| from datetime import timedelta | ||||||||
| from logging import getLogger | ||||||||
| from typing import TYPE_CHECKING, Any, ClassVar, TypeVar, overload | ||||||||
|
|
||||||||
|
|
@@ -12,6 +13,7 @@ | |||||||
| from crawlee._types import JsonSerializable # noqa: TC001 | ||||||||
| from crawlee._utils.docs import docs_group | ||||||||
| from crawlee._utils.recoverable_state import RecoverableState | ||||||||
| from crawlee.errors import StorageWriteError | ||||||||
| from crawlee.storage_clients.models import KeyValueStoreMetadata | ||||||||
|
|
||||||||
| from ._base import Storage | ||||||||
|
|
@@ -175,15 +177,39 @@ async def set_value( | |||||||
| key: str, | ||||||||
| value: Any, | ||||||||
| content_type: str | None = None, | ||||||||
| *, | ||||||||
| max_attempts: int = 5, | ||||||||
| wait_time_between_retries: timedelta = timedelta(seconds=1), | ||||||||
| ) -> None: | ||||||||
| """Set a value in the KVS. | ||||||||
|
|
||||||||
| Args: | ||||||||
| key: Key of the record to set. | ||||||||
| value: Value to set. | ||||||||
| content_type: The MIME content type string. | ||||||||
| max_attempts: The maximum number of attempts to set the value in case of failure. | ||||||||
| wait_time_between_retries: Time to wait between retries. | ||||||||
| """ | ||||||||
| await self._client.set_value(key=key, value=value, content_type=content_type) | ||||||||
| if max_attempts < 1: | ||||||||
| raise ValueError('max_attempts must be at least 1') | ||||||||
|
|
||||||||
| wait_time_between_retries_seconds = wait_time_between_retries.total_seconds() | ||||||||
| last_exception: StorageWriteError | None = None | ||||||||
|
|
||||||||
| for attempt in range(max_attempts): | ||||||||
| try: | ||||||||
| await self._client.set_value(key=key, value=value, content_type=content_type) | ||||||||
| break | ||||||||
| except StorageWriteError as e: | ||||||||
| last_exception = e | ||||||||
| if attempt < max_attempts - 1: | ||||||||
| await asyncio.sleep(wait_time_between_retries_seconds) | ||||||||
| else: | ||||||||
| if last_exception: | ||||||||
| logger.warning( | ||||||||
| f'Failed to set value for key "{key}" after {max_attempts} attempts ' | ||||||||
| f'with error: {last_exception.cause}' | ||||||||
| ) | ||||||||
|
||||||||
| ) | |
| ) | |
| raise last_exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is expected behavior. This PR allows users to configure retry settings, but propagating errors could cause issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This guard is redundant/unreachable because both the PostgreSQL/MySQL branch and the fallback branch already return early when
`requests_db` is empty. Keeping it adds noise and suggests `requests_db` might be unset here when it shouldn't be. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If an error occurs that triggers a
rollback, this will prevent the failing