Skip to content

Commit 202726d

Browse files
authored
fix: Gracefully close sitemap stream on SitemapRequestLoader abort (#1979)
### Description Use `contextlib.aclosing` to let the stream request finishes gracefully and avoids errors when closing generators. Example error: ```bash an error occurred during closing of asynchronous generator <async_generator object HttpxHttpClient.stream at 0x7bad2c3fbe10> asyncgen: <async_generator object HttpxHttpClient.stream at 0x7bad2c3fbe10> RuntimeError: aclose(): asynchronous generator is already running ```
1 parent 55cf290 commit 202726d

1 file changed

Lines changed: 40 additions & 34 deletions

File tree

src/crawlee/request_loaders/_sitemap_request_loader.py

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import asyncio
44
from collections import deque
5-
from contextlib import suppress
5+
from contextlib import aclosing, suppress
66
from logging import getLogger
77
from typing import TYPE_CHECKING, Annotated, Any
88

@@ -360,46 +360,52 @@ async def _load_sitemaps(self) -> None:
360360
)
361361
parsed_sitemap_url = URL(sitemap_url)
362362

363-
async for item in parse_sitemap(
364-
[SitemapSource(type='url', url=sitemap_url)],
365-
self._http_client,
366-
proxy_info=self._proxy_info,
367-
options=parse_options,
368-
):
369-
if isinstance(item, NestedSitemap):
370-
# Add nested sitemap to queue
371-
if item.loc not in state.pending_sitemap_urls and item.loc not in state.processed_sitemap_urls:
372-
if not self._passes_filters(item.loc, parsed_sitemap_url, 'nested sitemap'):
373-
continue
374-
state.pending_sitemap_urls.append(item.loc)
375-
continue
363+
async with aclosing(
364+
parse_sitemap(
365+
[SitemapSource(type='url', url=sitemap_url)],
366+
self._http_client,
367+
proxy_info=self._proxy_info,
368+
options=parse_options,
369+
)
370+
) as sitemap_items:
371+
async for item in sitemap_items:
372+
if isinstance(item, NestedSitemap):
373+
# Add nested sitemap to queue
374+
if (
375+
item.loc not in state.pending_sitemap_urls
376+
and item.loc not in state.processed_sitemap_urls
377+
):
378+
if not self._passes_filters(item.loc, parsed_sitemap_url, 'nested sitemap'):
379+
continue
380+
state.pending_sitemap_urls.append(item.loc)
381+
continue
376382

377-
if isinstance(item, SitemapUrl):
378-
url = item.loc
383+
if isinstance(item, SitemapUrl):
384+
url = item.loc
379385

380-
state = await self._get_state()
386+
state = await self._get_state()
381387

382-
# Skip if already processed
383-
if url in state.current_sitemap_processed_urls:
384-
continue
388+
# Skip if already processed
389+
if url in state.current_sitemap_processed_urls:
390+
continue
385391

386-
# Check if URL should be included
387-
if not self._check_url_patterns(url, self._include, self._exclude):
388-
continue
392+
# Check if URL should be included
393+
if not self._check_url_patterns(url, self._include, self._exclude):
394+
continue
389395

390-
if not self._passes_filters(url, parsed_sitemap_url, 'sitemap URL'):
391-
continue
396+
if not self._passes_filters(url, parsed_sitemap_url, 'sitemap URL'):
397+
continue
392398

393-
# Check if we have capacity in the queue
394-
await self._queue_has_capacity.wait()
399+
# Check if we have capacity in the queue
400+
await self._queue_has_capacity.wait()
395401

396-
async with self._queue_lock:
397-
state.url_queue.append(url)
398-
state.current_sitemap_processed_urls.add(url)
399-
state.total_count += 1
400-
if len(state.url_queue) >= self._max_buffer_size:
401-
# Notify that the queue is full
402-
self._queue_has_capacity.clear()
402+
async with self._queue_lock:
403+
state.url_queue.append(url)
404+
state.current_sitemap_processed_urls.add(url)
405+
state.total_count += 1
406+
if len(state.url_queue) >= self._max_buffer_size:
407+
# Notify that the queue is full
408+
self._queue_has_capacity.clear()
403409

404410
# Clear current sitemap after processing
405411
state = await self._get_state()

0 commit comments

Comments
 (0)