Skip to content

Commit e86794a

Browse files
vdusek and claude authored
fix(file-system): Reclaim orphaned in-progress requests on RQ recovery (#1825)
## Summary

- When a process crashed after `fetch_next_request` but before `mark_request_as_handled` or `reclaim_request`, the request remained in the persisted `in_progress_requests` and was silently skipped on restart.
- Clear `in_progress_requests` during `_discover_existing_requests` so that orphaned requests are retried.
- Add a unit test simulating the crash-recovery scenario.

---

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e1b4346 commit e86794a

File tree

2 files changed

+56
-1
lines changed

2 files changed

+56
-1
lines changed

src/crawlee/storage_clients/_file_system/_request_queue_client.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -798,10 +798,20 @@ async def _parse_request_file(cls, file_path: Path) -> Request | None:
798798
return None
799799

800800
async def _discover_existing_requests(self) -> None:
801-
"""Discover and load existing requests into the state when opening an existing request queue."""
801+
"""Discover and load existing requests into the state when opening an existing request queue.
802+
803+
On recovery after a crash, any requests that were previously in-progress are reclaimed as pending,
804+
since there is no active processing after a restart.
805+
"""
802806
request_files = await self._get_request_files(self.path_to_rq)
803807
state = self._state.current_value
804808

809+
if state.in_progress_requests:
810+
logger.info(
811+
f'Reclaiming {len(state.in_progress_requests)} in-progress request(s) from previous run.',
812+
)
813+
state.in_progress_requests.clear()
814+
805815
for request_file in request_files:
806816
request = await self._parse_request_file(request_file)
807817
if request is None:

tests/unit/storage_clients/_file_system/test_fs_rq_client.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,51 @@ async def test_data_persistence_across_reopens() -> None:
175175
await reopened_client.drop()
176176

177177

178+
async def test_in_progress_requests_recovered_after_crash() -> None:
    """Verify that a request left in-progress by an interrupted run is fetchable again after reopening.

    Three requests are enqueued and one of them is fetched without ever being marked as handled or
    reclaimed. The state is then persisted and the queue is reopened by its ID, which stands in for
    a process restart. The reopened queue must hand out all three URLs and then report it is empty.
    """
    expected_urls = (
        'https://example.com/1',
        'https://example.com/2',
        'https://example.com/3',
    )
    storage_client = FileSystemStorageClient()

    # First "run": create the queue and fill it.
    crashed_client = await storage_client.create_rq_client(name='crash-recovery-test')
    await crashed_client.add_batch_of_requests([Request.from_url(url) for url in expected_urls])

    # Take one request so that it sits in the in-progress set, as it would mid-processing.
    in_flight = await crashed_client.fetch_next_request()
    assert in_flight is not None

    # Write the state out explicitly so the in-progress marker is what the next open will read.
    await crashed_client._state.persist_state()

    metadata = await crashed_client.get_metadata()

    # Second "run": open the same queue again; nothing was marked as handled or reclaimed in between.
    reopened_client = await storage_client.create_rq_client(id=metadata.id)

    # Every request, including the one that was in-flight, has to come out of the reopened queue.
    fetched_urls = set()
    for _ in expected_urls:
        req = await reopened_client.fetch_next_request()
        assert req is not None, f'Expected 3 fetchable requests, only got {len(fetched_urls)}'
        fetched_urls.add(req.url)

    assert fetched_urls == set(expected_urls)

    # The queue must now be exhausted.
    leftover = await reopened_client.fetch_next_request()
    assert leftover is None

    await reopened_client.drop()
221+
222+
178223
async def test_get_request_does_not_mark_in_progress(rq_client: FileSystemRequestQueueClient) -> None:
179224
"""Test that get_request does not block a request from being fetched."""
180225
request = Request.from_url('https://example.com/blocked')

0 commit comments

Comments
 (0)