Skip to content

Commit 66ed5ea

Browse files
authored
fix: Fix crawler deadlock when API fails during marking request as handled (#768)
### Description - Fix a rare scenario where `RequestQueue` is in an invalid state due to a failed API call to mark the request as handled. This invalid state can lead to a crawler deadlock. - Fixed only for `ApifyRequestQueueSingleClient`, as for the shared client, it is not possible to distinguish between API failure and external modification of the queue on the platform. ### Issues - Partially fixes: [Fix rare case when crawler can get deadlocked](apify/crawlee-python#1694 (comment)) ### Testing - Added unit test ### Checklist - [x] CI passed
1 parent a99b5ff commit 66ed5ea

File tree

3 files changed

+51
-10
lines changed

3 files changed

+51
-10
lines changed

src/apify/storage_clients/_apify/_request_queue_shared_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
236236
hydrated_request=request,
237237
)
238238
except Exception:
239-
logger.exception(f'Error marking request {request.unique_key} as handled')
239+
logger.exception(f'Error marking request {request.unique_key} as handled.')
240240
return None
241241
else:
242242
return processed_request

src/apify/storage_clients/_apify/_request_queue_single_client.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -201,27 +201,27 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
201201
# Set the handled_at timestamp if not already set
202202
request_id = unique_key_to_request_id(request.unique_key)
203203

204+
if cached_request := self._requests_cache.get(request_id):
205+
cached_request.handled_at = request.handled_at
206+
204207
if request.handled_at is None:
205208
request.handled_at = datetime.now(tz=timezone.utc)
206209
self.metadata.handled_request_count += 1
207210
self.metadata.pending_request_count -= 1
208211

209-
if cached_request := self._requests_cache.get(request_id):
210-
cached_request.handled_at = request.handled_at
211-
212212
try:
213+
# Remember that we handled this request, to optimize local deduplication.
214+
self._requests_already_handled.add(request_id)
215+
self._requests_in_progress.discard(request_id)
216+
# Remove request from cache, it will most likely not be needed.
217+
self._requests_cache.pop(request_id, None)
213218
# Update the request in the API
214219
# Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
215220
# adding to the queue.)
216221
processed_request = await self._update_request(request)
217-
# Remember that we handled this request, to optimize local deduplication.
218-
self._requests_already_handled.add(request_id)
219-
# Remove request from cache. It will most likely not be needed.
220-
self._requests_cache.pop(request_id)
221-
self._requests_in_progress.discard(request_id)
222222

223223
except Exception:
224-
logger.exception(f'Error marking request {request.unique_key} as handled')
224+
logger.exception(f'Error marking request {request.unique_key} as handled.')
225225
return None
226226
else:
227227
return processed_request
@@ -303,6 +303,11 @@ async def _list_head(self) -> None:
303303
# Ignore requests that are already in progress, we will not process them again.
304304
continue
305305

306+
if request_id in self._requests_already_handled:
307+
# Request is locally known to be handled, but platform is not aware of it.
308+
# This can be due either to a delay in API data propagation or to a failed API call to mark it as handled.
309+
continue
310+
306311
if request.was_already_handled:
307312
# Do not cache fully handled requests, we do not need them. Just cache their id.
308313
self._requests_already_handled.add(request_id)

tests/integration/test_request_queue.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,3 +1198,39 @@ def return_unprocessed_requests(requests: list[dict], *_: Any, **__: Any) -> dic
11981198
Actor.log.info(stats_after)
11991199

12001200
assert (stats_after['writeCount'] - stats_before['writeCount']) == 1
1201+
1202+
1203+
async def test_request_queue_api_fail_when_marking_as_handled(
1204+
apify_token: str, monkeypatch: pytest.MonkeyPatch
1205+
) -> None:
1206+
"""Test that single-access based Apify RQ can deal with API failures when marking requests as handled.
1207+
1208+
Single-access based Apify RQ is aware that local information is reliable, so even if marking as handled fails
1209+
during API call, the RQ correctly tracks the handling information locally.
1210+
"""
1211+
1212+
monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_token)
1213+
async with Actor:
1214+
rq = await RequestQueue.open(storage_client=ApifyStorageClient(request_queue_access='single'))
1215+
1216+
try:
1217+
request = Request.from_url('http://example.com')
1218+
# Fetch request
1219+
await rq.add_request(request)
1220+
assert request == await rq.fetch_next_request()
1221+
1222+
# Mark as handled, but simulate API failure.
1223+
with mock.patch.object(
1224+
rq._client._api_client, 'update_request', side_effect=Exception('Simulated API failure')
1225+
):
1226+
await rq.mark_request_as_handled(request)
1227+
assert not (await rq.get_request(request.unique_key)).was_already_handled
1228+
1229+
# RQ with `request_queue_access="single"` knows that the local information is reliable, so it knows it
1230+
# handled this request already despite the platform not being aware of it.
1231+
assert not await rq.fetch_next_request()
1232+
assert await rq.is_finished()
1233+
assert await rq.is_empty()
1234+
1235+
finally:
1236+
await rq.drop()

0 commit comments

Comments
 (0)