Skip to content

Commit 4bd1ca3

Browse files
claude authored and vdusek committed
fix: stop silently swallowing exceptions in RQ mark_request_as_handled and reclaim_request
Both the single and shared request queue clients caught all exceptions in `mark_request_as_handled` and `reclaim_request`, logged them at DEBUG level, and returned `None`. This made it impossible for callers to distinguish between "request was not in progress" (legitimate None per the base class contract) and "API call failed" (swallowed error). Silently swallowing errors also meant metadata counters could drift from platform state without any visible indication of the problem. Remove the catch-all `except Exception` blocks and let exceptions propagate naturally so callers can handle failures appropriately. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 56aa42e commit 4bd1ca3

File tree

2 files changed

+72
-91
lines changed

2 files changed

+72
-91
lines changed

src/apify/storage_clients/_apify/_request_queue_shared_client.py

Lines changed: 39 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -218,28 +218,24 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
218218

219219
if cached_request := self._requests_cache[request_id]:
220220
cached_request.was_already_handled = request.was_already_handled
221-
try:
222-
# Update the request in the API
223-
processed_request = await self._update_request(request)
224-
processed_request.id = request_id
225-
processed_request.unique_key = request.unique_key
226-
227-
# Update assumed handled count if this wasn't already handled
228-
if not processed_request.was_already_handled:
229-
self.metadata.handled_request_count += 1
230-
self.metadata.pending_request_count -= 1
221+
# Update the request in the API
222+
processed_request = await self._update_request(request)
223+
processed_request.id = request_id
224+
processed_request.unique_key = request.unique_key
225+
226+
# Update assumed handled count if this wasn't already handled
227+
if not processed_request.was_already_handled:
228+
self.metadata.handled_request_count += 1
229+
self.metadata.pending_request_count -= 1
230+
231+
# Update the cache with the handled request
232+
self._cache_request(
233+
cache_key=request_id,
234+
processed_request=processed_request,
235+
hydrated_request=request,
236+
)
231237

232-
# Update the cache with the handled request
233-
self._cache_request(
234-
cache_key=request_id,
235-
processed_request=processed_request,
236-
hydrated_request=request,
237-
)
238-
except Exception as exc:
239-
logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
240-
return None
241-
else:
242-
return processed_request
238+
return processed_request
243239

244240
async def reclaim_request(
245241
self,
@@ -255,35 +251,30 @@ async def reclaim_request(
255251

256252
# Reclaim with lock to prevent race conditions that could lead to double processing of the same request.
257253
async with self._fetch_lock:
258-
try:
259-
# Update the request in the API.
260-
processed_request = await self._update_request(request, forefront=forefront)
261-
processed_request.unique_key = request.unique_key
262-
263-
# If the request was previously handled, decrement our handled count since
264-
# we're putting it back for processing.
265-
if request.was_already_handled and not processed_request.was_already_handled:
266-
self.metadata.handled_request_count -= 1
267-
self.metadata.pending_request_count += 1
268-
269-
# Update the cache
270-
cache_key = request.unique_key
271-
self._cache_request(
272-
cache_key,
273-
processed_request,
274-
hydrated_request=request,
275-
)
254+
# Update the request in the API.
255+
processed_request = await self._update_request(request, forefront=forefront)
256+
processed_request.unique_key = request.unique_key
276257

277-
# If we're adding to the forefront, we need to check for forefront requests
278-
# in the next list_head call
279-
if forefront:
280-
self._should_check_for_forefront_requests = True
258+
# If the request was previously handled, decrement our handled count since
259+
# we're putting it back for processing.
260+
if request.was_already_handled and not processed_request.was_already_handled:
261+
self.metadata.handled_request_count -= 1
262+
self.metadata.pending_request_count += 1
281263

282-
except Exception as exc:
283-
logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
284-
return None
285-
else:
286-
return processed_request
264+
# Update the cache
265+
cache_key = request.unique_key
266+
self._cache_request(
267+
cache_key,
268+
processed_request,
269+
hydrated_request=request,
270+
)
271+
272+
# If we're adding to the forefront, we need to check for forefront requests
273+
# in the next list_head call
274+
if forefront:
275+
self._should_check_for_forefront_requests = True
276+
277+
return processed_request
287278

288279
async def is_empty(self) -> bool:
289280
"""Specific implementation of this method for the RQ shared access mode."""

src/apify/storage_clients/_apify/_request_queue_single_client.py

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -209,22 +209,17 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
209209
if cached_request := self._requests_cache.get(request_id):
210210
cached_request.handled_at = request.handled_at
211211

212-
try:
213-
# Update the request in the API
214-
# Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
215-
# adding to the queue.)
216-
processed_request = await self._update_request(request)
217-
# Remember that we handled this request, to optimize local deduplication.
218-
self._requests_already_handled.add(request_id)
219-
# Remove request from cache. It will most likely not be needed.
220-
self._requests_cache.pop(request_id)
221-
self._requests_in_progress.discard(request_id)
222-
223-
except Exception as exc:
224-
logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
225-
return None
226-
else:
227-
return processed_request
212+
# Update the request in the API
213+
# Works as upsert - adds the request if it does not exist yet. (Local request that was handled before
214+
# adding to the queue.)
215+
processed_request = await self._update_request(request)
216+
# Remember that we handled this request, to optimize local deduplication.
217+
self._requests_already_handled.add(request_id)
218+
# Remove request from cache. It will most likely not be needed.
219+
self._requests_cache.pop(request_id)
220+
self._requests_in_progress.discard(request_id)
221+
222+
return processed_request
228223

229224
async def reclaim_request(
230225
self,
@@ -241,33 +236,28 @@ async def reclaim_request(
241236
if request.was_already_handled:
242237
request.handled_at = None
243238

244-
try:
245-
# Make sure request is in the local cache. We might need it.
246-
self._requests_cache[request_id] = request
247-
248-
# No longer in progress
249-
self._requests_in_progress.discard(request_id)
250-
# No longer handled
251-
self._requests_already_handled.discard(request_id)
252-
253-
if forefront:
254-
# Append to top of the local head estimation
255-
self._head_requests.append(request_id)
256-
257-
processed_request = await self._update_request(request, forefront=forefront)
258-
processed_request.id = request_id
259-
processed_request.unique_key = request.unique_key
260-
# If the request was previously handled, decrement our handled count since
261-
# we're putting it back for processing.
262-
if request.was_already_handled and not processed_request.was_already_handled:
263-
self.metadata.handled_request_count -= 1
264-
self.metadata.pending_request_count += 1
265-
266-
except Exception as exc:
267-
logger.debug(f'Error reclaiming request {request.unique_key}: {exc!s}')
268-
return None
269-
else:
270-
return processed_request
239+
# Make sure request is in the local cache. We might need it.
240+
self._requests_cache[request_id] = request
241+
242+
# No longer in progress
243+
self._requests_in_progress.discard(request_id)
244+
# No longer handled
245+
self._requests_already_handled.discard(request_id)
246+
247+
if forefront:
248+
# Append to top of the local head estimation
249+
self._head_requests.append(request_id)
250+
251+
processed_request = await self._update_request(request, forefront=forefront)
252+
processed_request.id = request_id
253+
processed_request.unique_key = request.unique_key
254+
# If the request was previously handled, decrement our handled count since
255+
# we're putting it back for processing.
256+
if request.was_already_handled and not processed_request.was_already_handled:
257+
self.metadata.handled_request_count -= 1
258+
self.metadata.pending_request_count += 1
259+
260+
return processed_request
271261

272262
async def is_empty(self) -> bool:
273263
"""Specific implementation of this method for the RQ single access mode."""

0 commit comments

Comments (0)