Skip to content

Commit 6226d93

Browse files
authored
fix: Fix potential deadlocks in SitemapRequestLoader and RequestManagerTandem (#1843)
### Description

- This PR fixes several issues in `SitemapRequestLoader` and `RequestManagerTandem` that could cause the crawler to deadlock or stall indefinitely.

### Issues

- Relates: #1842
1 parent 2eda280 commit 6226d93

File tree

3 files changed

+95
-5
lines changed

3 files changed

+95
-5
lines changed

src/crawlee/request_loaders/_request_manager_tandem.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,12 @@ async def fetch_next_request(self) -> Request | None:
8989
'Adding request from the RequestLoader to the RequestManager failed, the request has been dropped',
9090
extra={'url': request.url, 'unique_key': request.unique_key},
9191
)
92-
return None
9392

94-
await self._read_only_loader.mark_request_as_handled(request)
93+
return None
94+
finally:
95+
# Mark it as processed so that the `request` doesn't get stuck in the `in_progress` status
96+
# in `RequestLoader`
97+
await self._read_only_loader.mark_request_as_handled(request)
9598

9699
return await self._read_write_manager.fetch_next_request()
97100

src/crawlee/request_loaders/_sitemap_request_loader.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,11 @@ def __init__(
160160

161161
async def _get_state(self) -> SitemapRequestLoaderState:
162162
"""Initialize and return the current state."""
163+
if self._state.is_initialized:
164+
return self._state.current_value
165+
163166
async with self._queue_lock:
167+
# Re-check if state got initialized while waiting for the lock
164168
if self._state.is_initialized:
165169
return self._state.current_value
166170

@@ -260,7 +264,6 @@ async def _load_sitemaps(self) -> None:
260264
# Check if we have capacity in the queue
261265
await self._queue_has_capacity.wait()
262266

263-
state = await self._get_state()
264267
async with self._queue_lock:
265268
state.url_queue.append(url)
266269
state.current_sitemap_processed_urls.add(url)
@@ -318,19 +321,26 @@ async def fetch_next_request(self) -> Request | None:
318321
continue
319322

320323
async with self._queue_lock:
324+
# Double-check if the queue is still not empty after acquiring the lock
325+
if not state.url_queue:
326+
continue
327+
321328
url = state.url_queue.popleft()
322329
request_option = RequestOptions(url=url)
330+
331+
if len(state.url_queue) < self._max_buffer_size:
332+
self._queue_has_capacity.set()
333+
323334
if self._transform_request_function:
324335
transform_request_option = self._transform_request_function(request_option)
325336
if transform_request_option == 'skip':
326337
state.total_count -= 1
327338
continue
328339
if transform_request_option != 'unchanged':
329340
request_option = transform_request_option
341+
330342
request = Request.from_url(**request_option)
331343
state.in_progress.add(request.url)
332-
if len(state.url_queue) < self._max_buffer_size:
333-
self._queue_has_capacity.set()
334344

335345
return request
336346

tests/unit/request_loaders/test_sitemap_request_loader.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import base64
33
import gzip
44
from typing import TYPE_CHECKING
5+
from unittest.mock import patch
56

67
from yarl import URL
78

@@ -216,3 +217,79 @@ def transform_request(request_options: RequestOptions) -> RequestOptions | Reque
216217
'http://not-exists.com/catalog?item=74&desc=vacation_newfoundland',
217218
'http://not-exists.com/catalog?item=83&desc=vacation_usa',
218219
}
220+
221+
222+
async def test_transform_request_function_with_skip(server_url: URL, http_client: HttpClient) -> None:
223+
sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode()))
224+
225+
def transform_request(_request_options: RequestOptions) -> RequestOptions | RequestTransformAction:
226+
return 'skip'
227+
228+
sitemap_loader = SitemapRequestLoader(
229+
[str(sitemap_url)],
230+
http_client=http_client,
231+
transform_request_function=transform_request,
232+
)
233+
234+
while not await sitemap_loader.is_finished():
235+
request = await sitemap_loader.fetch_next_request()
236+
237+
if request:
238+
await sitemap_loader.mark_request_as_handled(request)
239+
240+
# Even though the sitemap had URLs, all were skipped, so the loader should be empty and finished with
241+
# 0 handled requests.
242+
assert await sitemap_loader.is_empty()
243+
assert await sitemap_loader.is_finished()
244+
assert await sitemap_loader.get_total_count() == 0
245+
assert await sitemap_loader.get_handled_count() == 0
246+
247+
248+
async def test_sitemap_loader_to_tandem(
249+
server_url: URL,
250+
http_client: HttpClient,
251+
) -> None:
252+
sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode()))
253+
254+
sitemap_loader = SitemapRequestLoader([str(sitemap_url)], http_client=http_client)
255+
request_manager = await sitemap_loader.to_tandem()
256+
257+
while not await sitemap_loader.is_finished():
258+
request = await request_manager.fetch_next_request()
259+
260+
if request:
261+
await request_manager.mark_request_as_handled(request)
262+
263+
assert await sitemap_loader.is_empty()
264+
assert await sitemap_loader.is_finished()
265+
266+
assert await request_manager.is_empty()
267+
assert await request_manager.is_finished()
268+
269+
270+
async def test_sitemap_loader_to_tandem_with_request_dropped(
271+
server_url: URL,
272+
http_client: HttpClient,
273+
) -> None:
274+
sitemap_url = (server_url / 'sitemap.xml').with_query(base64=encode_base64(BASIC_SITEMAP.encode()))
275+
276+
sitemap_loader = SitemapRequestLoader(
277+
[str(sitemap_url)],
278+
http_client=http_client,
279+
)
280+
request_manager = await sitemap_loader.to_tandem()
281+
282+
with patch.object(
283+
request_manager._read_write_manager, 'add_request', side_effect=Exception('Failed to add request')
284+
):
285+
while not await sitemap_loader.is_finished():
286+
request = await request_manager.fetch_next_request()
287+
288+
if request:
289+
await request_manager.mark_request_as_handled(request)
290+
291+
assert await sitemap_loader.is_empty()
292+
assert await sitemap_loader.is_finished()
293+
294+
assert await request_manager.is_empty()
295+
assert await request_manager.is_finished()

0 commit comments

Comments
 (0)