Skip to content

Commit 7ad2450

Browse files
committed
Revert "fix: stop silently swallowing exceptions in RQ mark_request_as_handled and reclaim_request"
This reverts commit 4bd1ca3.
1 parent 4bd1ca3 commit 7ad2450

File tree

2 files changed

+91
-72
lines changed

2 files changed

+91
-72
lines changed

src/apify/storage_clients/_apify/_request_queue_shared_client.py

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -218,24 +218,28 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
218218

219219
if cached_request := self._requests_cache[request_id]:
220220
cached_request.was_already_handled = request.was_already_handled
221-
# Update the request in the API
222-
processed_request = await self._update_request(request)
223-
processed_request.id = request_id
224-
processed_request.unique_key = request.unique_key
225-
226-
# Update assumed handled count if this wasn't already handled
227-
if not processed_request.was_already_handled:
228-
self.metadata.handled_request_count += 1
229-
self.metadata.pending_request_count -= 1
230-
231-
# Update the cache with the handled request
232-
self._cache_request(
233-
cache_key=request_id,
234-
processed_request=processed_request,
235-
hydrated_request=request,
236-
)
221+
try:
222+
# Update the request in the API
223+
processed_request = await self._update_request(request)
224+
processed_request.id = request_id
225+
processed_request.unique_key = request.unique_key
237226

238-
return processed_request
227+
# Update assumed handled count if this wasn't already handled
228+
if not processed_request.was_already_handled:
229+
self.metadata.handled_request_count += 1
230+
self.metadata.pending_request_count -= 1
231+
232+
# Update the cache with the handled request
233+
self._cache_request(
234+
cache_key=request_id,
235+
processed_request=processed_request,
236+
hydrated_request=request,
237+
)
238+
except Exception as exc:
239+
logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
240+
return None
241+
else:
242+
return processed_request
239243

240244
async def reclaim_request(
241245
self,
@@ -251,30 +255,35 @@ async def reclaim_request(
251255

252256
# Reclaim with lock to prevent race conditions that could lead to double processing of the same request.
253257
async with self._fetch_lock:
254-
# Update the request in the API.
255-
processed_request = await self._update_request(request, forefront=forefront)
256-
processed_request.unique_key = request.unique_key
257-
258-
# If the request was previously handled, decrement our handled count since
259-
# we're putting it back for processing.
260-
if request.was_already_handled and not processed_request.was_already_handled:
261-
self.metadata.handled_request_count -= 1
262-
self.metadata.pending_request_count += 1
263-
264-
# Update the cache
265-
cache_key = request.unique_key
266-
self._cache_request(
267-
cache_key,
268-
processed_request,
269-
hydrated_request=request,
270-
)
258+
try:
259+
# Update the request in the API.
260+
processed_request = await self._update_request(request, forefront=forefront)
261+
processed_request.unique_key = request.unique_key
262+
263+
# If the request was previously handled, decrement our handled count since
264+
# we're putting it back for processing.
265+
if request.was_already_handled and not processed_request.was_already_handled:
266+
self.metadata.handled_request_count -= 1
267+
self.metadata.pending_request_count += 1
268+
269+
# Update the cache
270+
cache_key = request.unique_key
271+
self._cache_request(
272+
cache_key,
273+
processed_request,
274+
hydrated_request=request,
275+
)
271276

272-
# If we're adding to the forefront, we need to check for forefront requests
273-
# in the next list_head call
274-
if forefront:
275-
self._should_check_for_forefront_requests = True
277+
# If we're adding to the forefront, we need to check for forefront requests
278+
# in the next list_head call
279+
if forefront:
280+
self._should_check_for_forefront_requests = True
276281

277-
return processed_request
282+
except Exception as exc:
283+
logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
284+
return None
285+
else:
286+
return processed_request
278287

279288
async def is_empty(self) -> bool:
280289
"""Specific implementation of this method for the RQ shared access mode."""

src/apify/storage_clients/_apify/_request_queue_single_client.py

Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -209,17 +209,22 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
209209
if cached_request := self._requests_cache.get(request_id):
210210
cached_request.handled_at = request.handled_at
211211

212-
# Update the request in the API
213-
# Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
214-
# adding to the queue.)
215-
processed_request = await self._update_request(request)
216-
# Remember that we handled this request, to optimize local deduplication.
217-
self._requests_already_handled.add(request_id)
218-
# Remove request from cache. It will most likely not be needed.
219-
self._requests_cache.pop(request_id)
220-
self._requests_in_progress.discard(request_id)
221-
222-
return processed_request
212+
try:
213+
# Update the request in the API
214+
# Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
215+
# adding to the queue.)
216+
processed_request = await self._update_request(request)
217+
# Remember that we handled this request, to optimize local deduplication.
218+
self._requests_already_handled.add(request_id)
219+
# Remove request from cache. It will most likely not be needed.
220+
self._requests_cache.pop(request_id)
221+
self._requests_in_progress.discard(request_id)
222+
223+
except Exception as exc:
224+
logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
225+
return None
226+
else:
227+
return processed_request
223228

224229
async def reclaim_request(
225230
self,
@@ -236,28 +241,33 @@ async def reclaim_request(
236241
if request.was_already_handled:
237242
request.handled_at = None
238243

239-
# Make sure request is in the local cache. We might need it.
240-
self._requests_cache[request_id] = request
241-
242-
# No longer in progress
243-
self._requests_in_progress.discard(request_id)
244-
# No longer handled
245-
self._requests_already_handled.discard(request_id)
246-
247-
if forefront:
248-
# Append to top of the local head estimation
249-
self._head_requests.append(request_id)
250-
251-
processed_request = await self._update_request(request, forefront=forefront)
252-
processed_request.id = request_id
253-
processed_request.unique_key = request.unique_key
254-
# If the request was previously handled, decrement our handled count since
255-
# we're putting it back for processing.
256-
if request.was_already_handled and not processed_request.was_already_handled:
257-
self.metadata.handled_request_count -= 1
258-
self.metadata.pending_request_count += 1
259-
260-
return processed_request
244+
try:
245+
# Make sure request is in the local cache. We might need it.
246+
self._requests_cache[request_id] = request
247+
248+
# No longer in progress
249+
self._requests_in_progress.discard(request_id)
250+
# No longer handled
251+
self._requests_already_handled.discard(request_id)
252+
253+
if forefront:
254+
# Append to top of the local head estimation
255+
self._head_requests.append(request_id)
256+
257+
processed_request = await self._update_request(request, forefront=forefront)
258+
processed_request.id = request_id
259+
processed_request.unique_key = request.unique_key
260+
# If the request was previously handled, decrement our handled count since
261+
# we're putting it back for processing.
262+
if request.was_already_handled and not processed_request.was_already_handled:
263+
self.metadata.handled_request_count -= 1
264+
self.metadata.pending_request_count += 1
265+
266+
except Exception as exc:
267+
logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
268+
return None
269+
else:
270+
return processed_request
261271

262272
async def is_empty(self) -> bool:
263273
"""Specific implementation of this method for the RQ single access mode."""

0 commit comments

Comments (0)