fix: Add retry logic for KeyValueStore.set_value and Dataset.push_data #1838

Mantisus wants to merge 3 commits into apify:master

Conversation
Pull request overview
This PR introduces a unified StorageWriteError and adds retry behavior for write operations (KeyValueStore.set_value and Dataset.push_data) to reduce the risk of data loss on transient storage failures (notably SQL/Redis).
Changes:
- Add `StorageWriteError` and update SQL/Redis/FS storage clients to raise it on write failures instead of swallowing errors.
- Add retry parameters (`max_attempts`, `wait_time_between_retries`) to `KeyValueStore.set_value` and `Dataset.push_data`.
- Add unit tests asserting that client-level write failures surface as `StorageWriteError`.
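A retry loop with these parameters could be wired up roughly as follows. This is a minimal sketch with hypothetical helper names (`write_with_retries`, `write_once`), not the actual crawlee implementation; it re-raises after the final attempt, which is one of the behaviors debated in the review below:

```python
import asyncio


class StorageWriteError(Exception):
    """Unified error raised when a storage client fails to persist a write."""


async def write_with_retries(
    write_once,  # hypothetical coroutine function performing one write attempt
    *,
    max_attempts: int = 3,
    wait_time_between_retries: float = 1.0,
) -> None:
    """Retry a failing write, re-raising the last StorageWriteError when exhausted."""
    last_exception = None
    for attempt in range(1, max_attempts + 1):
        try:
            await write_once()
        except StorageWriteError as exc:
            last_exception = exc
            if attempt < max_attempts:
                await asyncio.sleep(wait_time_between_retries)
        else:
            return  # write succeeded
    raise last_exception
```

Transient failures are absorbed by the retries; only a failure that persists through every attempt surfaces to the caller.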
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `src/crawlee/errors.py` | Introduces `StorageWriteError` for consistent write-failure signaling. |
| `src/crawlee/storages/_key_value_store.py` | Adds retry loop to `KeyValueStore.set_value` (currently logs instead of raising after retries). |
| `src/crawlee/storages/_dataset.py` | Adds retry loop to `Dataset.push_data` (currently logs instead of raising after retries). |
| `src/crawlee/storage_clients/_sql/_key_value_store_client.py` | Converts SQL write failures into `StorageWriteError` and avoids the `with_simple_commit` swallow behavior. |
| `src/crawlee/storage_clients/_sql/_dataset_client.py` | Converts SQL write failures into `StorageWriteError` and avoids the `with_simple_commit` swallow behavior. |
| `src/crawlee/storage_clients/_redis/_key_value_store_client.py` | Wraps Redis write failures as `StorageWriteError`. |
| `src/crawlee/storage_clients/_redis/_dataset_client.py` | Wraps Redis write failures as `StorageWriteError`. |
| `src/crawlee/storage_clients/_file_system/_key_value_store_client.py` | Wraps filesystem write failures as `StorageWriteError`. |
| `src/crawlee/storage_clients/_file_system/_dataset_client.py` | Wraps filesystem write failures as `StorageWriteError`. |
| `src/crawlee/storage_clients/_sql/_request_queue_client.py` | Adds an extra `requests_db` emptiness guard (currently redundant). |
| `tests/unit/storage_clients/_sql/test_sql_kvs_client.py` | Adds test ensuring SQL KVS write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_sql/test_sql_dataset_client.py` | Adds test ensuring SQL Dataset write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_redis/test_redis_kvs_client.py` | Adds test ensuring Redis KVS write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_redis/test_redis_dataset_client.py` | Adds test ensuring Redis Dataset write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_file_system/test_fs_kvs_client.py` | Adds test ensuring FS KVS write errors raise `StorageWriteError`. |
| `tests/unit/storage_clients/_file_system/test_fs_dataset_client.py` | Adds test ensuring FS Dataset write errors raise `StorageWriteError`. |
```python
logger.warning(
    f'Failed to set value for key "{key}" after {max_attempts} attempts '
    f'with error: {last_exception.cause}'
)
```
After exhausting retries, this method only logs a warning and then returns successfully. That will silently drop failed writes (similar to the original bug report) and callers will think the value was persisted. After the final attempt, re-raise the last caught StorageWriteError (or wrap/raise a new one) instead of returning.
```diff
 )
+raise last_exception
```
This is expected behavior. This PR allows users to configure retry settings, but propagating errors could cause issues.
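For context on the trade-off in this thread: if the library re-raises after retries are exhausted, best-effort callers can still ignore the failure explicitly, which keeps data loss an opt-in decision rather than a silent default. A small sketch (`set_value` here is a stub standing in for a store whose retries were exhausted, not the crawlee API):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class StorageWriteError(Exception):
    """Unified error raised when a write cannot be persisted."""


async def set_value(key: str, value: object) -> None:
    # Stub: simulates a KeyValueStore whose retries were all exhausted.
    raise StorageWriteError(f'could not persist {key!r}')


async def save_state() -> str:
    try:
        await set_value('state', {'page': 42})
    except StorageWriteError as exc:
        # Best-effort caller: log and continue, but the choice is explicit.
        logger.warning('Persisting state failed, continuing: %s', exc)
        return 'skipped'
    return 'persisted'
```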
```python
            await asyncio.sleep(wait_time_between_retries_seconds)
    else:
        if last_exception:
            logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
```
After exhausting retries, this method only logs a warning and then returns successfully. That means push_data failures can be silently ignored and data can be lost while the caller believes the write succeeded. After the final attempt, re-raise the last StorageWriteError (or raise a new one) rather than returning.
```diff
 logger.warning(f'Failed to push data after {max_attempts} attempts with error: {last_exception.cause}')
+raise last_exception
```
This is expected behavior. This PR allows users to configure retry settings, but propagating errors could cause issues.
```python
if not requests_db:
    return None
```
This guard is redundant/unreachable because both the PostgreSQL/MySQL branch and the fallback branch already return early when requests_db is empty. Keeping it adds noise and suggests requests_db might be unset here when it shouldn't be.
```diff
-if not requests_db:
-    return None
```
If an error occurs that triggers a rollback, this will prevent the failing
Description

Adds retry logic to `Dataset` and `KeyValueStore` write operations (`push_data`/`set_value`). This is especially critical for `SqlStorageClient` and `RedisStorageClient` due to the risk of data loss on transient failures.

Issues