Skip to content

Commit 1065e9b

Browse files
committed
refactor: reimplement ThrottlingRequestManager with per-domain sub-queues and update its integration across crawlers.
1 parent 62ab3b8 commit 1065e9b

File tree

7 files changed

+391
-315
lines changed

7 files changed

+391
-315
lines changed

src/crawlee/_utils/http.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def parse_retry_after_header(value: str | None) -> timedelta | None:
1515
value: The raw Retry-After header value.
1616
1717
Returns:
18-
A timedelta representing the delay, or None if the header is missing or unparseable.
18+
A timedelta representing the delay, or None if the header is missing or unparsable.
1919
"""
2020
if not value:
2121
return None

src/crawlee/crawlers/_abstract_http/_abstract_http_crawler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ async def _handle_status_code_response(
282282
self._raise_for_session_blocked_status_code(
283283
context.session,
284284
status_code,
285-
url=context.request.url,
285+
request_url=context.request.url,
286286
retry_after_header=context.http_response.headers.get('retry-after'),
287287
)
288288
self._raise_for_error_status_code(status_code)

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from asyncio import CancelledError
1313
from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable, Sequence
1414
from contextlib import AsyncExitStack, suppress
15-
from datetime import datetime, timedelta, timezone
15+
from datetime import timedelta
1616
from functools import partial
1717
from io import StringIO
1818
from pathlib import Path
@@ -45,9 +45,8 @@
4545
)
4646
from crawlee._utils.docs import docs_group
4747
from crawlee._utils.file import atomic_write, export_csv_to_stream, export_json_to_stream
48-
from crawlee._utils.recurring_task import RecurringTask
4948
from crawlee._utils.http import parse_retry_after_header
50-
from crawlee.request_loaders import ThrottlingRequestManager
49+
from crawlee._utils.recurring_task import RecurringTask
5150
from crawlee._utils.robots import RobotsTxtFile
5251
from crawlee._utils.urls import convert_to_absolute_url, is_url_absolute
5352
from crawlee._utils.wait import wait_for
@@ -65,6 +64,7 @@
6564
)
6665
from crawlee.events._types import Event, EventCrawlerStatusData
6766
from crawlee.http_clients import ImpitHttpClient
67+
from crawlee.request_loaders import ThrottlingRequestManager
6868
from crawlee.router import Router
6969
from crawlee.sessions import SessionPool
7070
from crawlee.statistics import Statistics, StatisticsState
@@ -487,7 +487,6 @@ async def persist_state_factory() -> KeyValueStore:
487487
self._robots_txt_file_cache: LRUCache[str, RobotsTxtFile] = LRUCache(maxsize=1000)
488488
self._robots_txt_lock = asyncio.Lock()
489489
self._tld_extractor = TLDExtract(cache_dir=tempfile.TemporaryDirectory().name)
490-
self._throttling_manager: ThrottlingRequestManager | None = None
491490
self._snapshotter = Snapshotter.from_config(config)
492491
self._autoscaled_pool = AutoscaledPool(
493492
system_status=SystemStatus(self._snapshotter),
@@ -624,8 +623,7 @@ async def get_request_manager(self) -> RequestManager:
624623
storage_client=self._service_locator.get_storage_client(),
625624
configuration=self._service_locator.get_configuration(),
626625
)
627-
self._throttling_manager = ThrottlingRequestManager(inner)
628-
self._request_manager = self._throttling_manager
626+
self._request_manager = ThrottlingRequestManager(inner)
629627

630628
return self._request_manager
631629

@@ -716,12 +714,21 @@ async def run(
716714
await self._session_pool.reset_store()
717715

718716
request_manager = await self.get_request_manager()
719-
if purge_request_queue and isinstance(request_manager, RequestQueue):
720-
await request_manager.drop()
721-
self._request_manager = await RequestQueue.open(
722-
storage_client=self._service_locator.get_storage_client(),
723-
configuration=self._service_locator.get_configuration(),
724-
)
717+
if purge_request_queue:
718+
if isinstance(request_manager, RequestQueue):
719+
await request_manager.drop()
720+
self._request_manager = await RequestQueue.open(
721+
storage_client=self._service_locator.get_storage_client(),
722+
configuration=self._service_locator.get_configuration(),
723+
)
724+
elif isinstance(request_manager, ThrottlingRequestManager):
725+
await request_manager.drop()
726+
inner = await RequestQueue.open(
727+
storage_client=self._service_locator.get_storage_client(),
728+
configuration=self._service_locator.get_configuration(),
729+
)
730+
self._throttling_manager = ThrottlingRequestManager(inner)
731+
self._request_manager = self._throttling_manager
725732

726733
if requests is not None:
727734
await self.add_requests(requests)
@@ -1452,8 +1459,8 @@ async def __run_task_function(self) -> None:
14521459
await self._mark_request_as_handled(request)
14531460

14541461
# Record successful request to reset rate limit backoff for this domain.
1455-
if self._throttling_manager:
1456-
self._throttling_manager.record_success(request.url)
1462+
if isinstance(request_manager, ThrottlingRequestManager):
1463+
request_manager.record_success(request.url)
14571464

14581465
if session and session.is_usable:
14591466
session.mark_good()
@@ -1560,27 +1567,30 @@ def _raise_for_session_blocked_status_code(
15601567
session: Session | None,
15611568
status_code: int,
15621569
*,
1563-
url: str = '',
1570+
request_url: str = '',
15641571
retry_after_header: str | None = None,
15651572
) -> None:
15661573
"""Raise an exception if the given status code indicates the session is blocked.
15671574
15681575
If the status code is 429 (Too Many Requests), the domain is recorded as
1569-
rate-limited in the `RequestThrottler` for per-domain backoff.
1576+
rate-limited in the `ThrottlingRequestManager` for per-domain backoff.
15701577
15711578
Args:
15721579
session: The session used for the request. If None, no check is performed.
15731580
status_code: The HTTP status code to check.
1574-
url: The request URL, used for per-domain rate limit tracking.
1581+
request_url: The request URL, used for per-domain rate limit tracking.
15751582
retry_after_header: The value of the Retry-After response header, if present.
15761583
15771584
Raises:
15781585
SessionError: If the status code indicates the session is blocked.
15791586
"""
1580-
if status_code == 429 and url:
1587+
if status_code == 429 and request_url: # noqa: PLR2004
15811588
retry_after = parse_retry_after_header(retry_after_header)
1582-
if self._throttling_manager:
1583-
self._throttling_manager.record_domain_delay(url, retry_after=retry_after)
1589+
1590+
# _request_manager might not be initialized yet if called directly or early,
1591+
# but usually it's set in get_request_manager().
1592+
if isinstance(self._request_manager, ThrottlingRequestManager):
1593+
self._request_manager.record_domain_delay(request_url, retry_after=retry_after)
15841594

15851595
if session is not None and session.is_blocked_status_code(
15861596
status_code=status_code,
@@ -1589,7 +1599,6 @@ def _raise_for_session_blocked_status_code(
15891599
raise SessionError(f'Assuming the session is blocked based on HTTP status code {status_code}')
15901600

15911601
# NOTE: _parse_retry_after_header has been moved to crawlee._utils.http.parse_retry_after_header
1592-
15931602
def _check_request_collision(self, request: Request, session: Session | None) -> None:
15941603
"""Raise an exception if a request cannot access required resources.
15951604
@@ -1617,11 +1626,11 @@ async def _is_allowed_based_on_robots_txt_file(self, url: str) -> bool:
16171626
if not robots_txt_file:
16181627
return True
16191628

1620-
# Wire robots.txt crawl-delay into ThrottlingRequestManager (#1396).
1621-
if self._throttling_manager:
1629+
# Wire robots.txt crawl-delay into ThrottlingRequestManager
1630+
if isinstance(self._request_manager, ThrottlingRequestManager):
16221631
crawl_delay = robots_txt_file.get_crawl_delay()
16231632
if crawl_delay is not None:
1624-
self._throttling_manager.set_crawl_delay(url, crawl_delay)
1633+
self._request_manager.set_crawl_delay(url, crawl_delay)
16251634

16261635
return robots_txt_file.is_allowed(url)
16271636

src/crawlee/crawlers/_playwright/_playwright_crawler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ async def _handle_status_code_response(
463463
self._raise_for_session_blocked_status_code(
464464
context.session,
465465
status_code,
466-
url=context.request.url,
466+
request_url=context.request.url,
467467
retry_after_header=retry_after_header,
468468
)
469469
self._raise_for_error_status_code(status_code)

0 commit comments

Comments (0)