Commit eeebe9b

janbuchar and vdusek authored

feat: Capture statistics about the crawler run (#142)

- closes #97

Co-authored-by: Vlada Dusek <v.dusek96@gmail.com>
1 parent 56ee407 commit eeebe9b

16 files changed: 589 additions & 17 deletions

pyproject.toml

Lines changed: 8 additions & 0 deletions
@@ -195,3 +195,11 @@ exclude_lines = [
     "if TYPE_CHECKING:",
     "assert_never()"
 ]
+
+[tool.basedpyright]
+reportPrivateLocalImportUsage = false
+reportUnusedCallResult = false
+reportUnusedVariable = false
+reportCallInDefaultInitializer = false
+reportImplicitStringConcatenation = false
+reportAny = false

src/crawlee/_utils/models.py

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from datetime import timedelta
+from typing import Annotated, Any, Callable
+
+from pydantic import PlainSerializer, WrapValidator
+
+"""Utility types for Pydantic models."""
+
+
+def _timedelta_to_ms(td: timedelta | None) -> Any:
+    if td == timedelta.max:
+        return float('inf')
+
+    if td is None:
+        return td
+
+    return int(round(td.total_seconds() * 1000))
+
+
+def _timedelta_from_ms(value: float | timedelta | Any | None, handler: Callable[[Any], Any]) -> Any:
+    if value == float('inf'):
+        return timedelta.max
+
+    if not isinstance(value, (int, float)):
+        return handler(value)
+
+    return timedelta(milliseconds=value)
+
+
+timedelta_ms = Annotated[timedelta, PlainSerializer(_timedelta_to_ms), WrapValidator(_timedelta_from_ms)]

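The new `timedelta_ms` type round-trips `timedelta` values through integer milliseconds, with `float('inf')` standing in for `timedelta.max`. A minimal sketch of how it behaves inside a Pydantic model; the `ExampleModel` name is hypothetical, only the `timedelta_ms` import comes from this commit:

from datetime import timedelta

from pydantic import BaseModel

from crawlee._utils.models import timedelta_ms


class ExampleModel(BaseModel):  # hypothetical model, for illustration only
    timeout: timedelta_ms


model = ExampleModel(timeout=1500)  # the wrap validator accepts plain milliseconds
assert model.timeout == timedelta(milliseconds=1500)
assert model.model_dump() == {'timeout': 1500}  # the plain serializer emits milliseconds again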
src/crawlee/basic_crawler/basic_crawler.py

Lines changed: 61 additions & 8 deletions
@@ -6,7 +6,7 @@
 from datetime import timedelta
 from functools import partial
 from logging import getLogger
-from typing import TYPE_CHECKING, AsyncGenerator, Awaitable, Callable, Generic, Sequence, Union, cast
+from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, Generic, Sequence, Union, cast
 
 import httpx
 from tldextract import TLDExtract
@@ -30,7 +30,6 @@
 from crawlee.basic_crawler.router import Router
 from crawlee.basic_crawler.types import (
     BasicCrawlingContext,
-    FinalStatistics,
     RequestHandlerRunResult,
     SendRequestFunction,
 )
@@ -40,13 +39,15 @@
 from crawlee.http_clients.httpx_client import HttpxClient
 from crawlee.models import BaseRequestData, Request, RequestState
 from crawlee.sessions import SessionPool
+from crawlee.statistics.statistics import Statistics
 from crawlee.storages.request_queue import RequestQueue
 
 if TYPE_CHECKING:
     import re
 
     from crawlee.http_clients.base_http_client import BaseHttpClient, HttpResponse
     from crawlee.sessions.session import Session
+    from crawlee.statistics.models import FinalStatistics, StatisticsState
     from crawlee.storages.request_provider import RequestProvider
 
 TCrawlingContext = TypeVar('TCrawlingContext', bound=BasicCrawlingContext, default=BasicCrawlingContext)
@@ -70,6 +71,7 @@ class BasicCrawlerOptions(TypedDict, Generic[TCrawlingContext]):
     session_pool: NotRequired[SessionPool]
     use_session_pool: NotRequired[bool]
     retry_on_blocked: NotRequired[bool]
+    statistics: NotRequired[Statistics[StatisticsState]]
     _context_pipeline: NotRequired[ContextPipeline[TCrawlingContext]]
 
 
@@ -98,6 +100,7 @@ def __init__(
         session_pool: SessionPool | None = None,
         use_session_pool: bool = True,
         retry_on_blocked: bool = True,
+        statistics: Statistics | None = None,
         _context_pipeline: ContextPipeline[TCrawlingContext] | None = None,
     ) -> None:
         """Initialize the BasicCrawler.
@@ -114,8 +117,9 @@ def __init__(
             configuration: Crawler configuration
             request_handler_timeout: How long is a single request handler allowed to run
             use_session_pool: Enables using the session pool for crawling
-            session_pool: A preconfigured SessionPool instance if you wish to use non-default configuration
+            session_pool: A preconfigured `SessionPool` instance if you wish to use non-default configuration
             retry_on_blocked: If set to True, the crawler will try to automatically bypass any detected bot protection
+            statistics: A preconfigured `Statistics` instance if you wish to use non-default configuration
             _context_pipeline: Allows extending the request lifecycle and modifying the crawling context.
                 This parameter is meant to be used by child classes, not when BasicCrawler is instantiated directly.
         """
@@ -165,6 +169,14 @@ def __init__(
 
         self._retry_on_blocked = retry_on_blocked
 
+        self._statistics = statistics or Statistics(
+            event_manager=self._event_manager,
+            log_message=f'{logger.name} request statistics',
+        )
+
+        self._running = False
+        self._has_finished_before = False
+
     @property
     def router(self) -> Router[TCrawlingContext]:
         """The router used to handle each individual crawling request."""
@@ -180,6 +192,11 @@ def router(self, router: Router[TCrawlingContext]) -> None:
 
         self._router = router
 
+    @property
+    def statistics(self) -> Statistics[StatisticsState]:
+        """Statistics about the current (or last) crawler run."""
+        return self._statistics
+
     async def _get_session(self) -> Session | None:
         """If session pool is being used, try to take a session from it."""
         if not self._use_session_pool:
@@ -235,19 +252,43 @@ async def add_requests(
 
     async def run(self, requests: list[str | BaseRequestData] | None = None) -> FinalStatistics:
         """Run the crawler until all requests are processed."""
+        if self._running:
+            raise RuntimeError(
+                'This crawler instance is already running, you can add more requests to it via `crawler.add_requests()`'
+            )
+
+        self._running = True
+
+        if self._has_finished_before:
+            await self._statistics.reset()
+
+            if self._use_session_pool:
+                await self._session_pool.reset_store()
+
         if requests is not None:
             await self.add_requests(requests)
 
         async with AsyncExitStack() as exit_stack:
             await exit_stack.enter_async_context(self._event_manager)
             await exit_stack.enter_async_context(self._snapshotter)
+            await exit_stack.enter_async_context(self._statistics)
 
             if self._use_session_pool:
                 await exit_stack.enter_async_context(self._session_pool)
 
             await self._pool.run()
 
-        return FinalStatistics()
+        if self._statistics.error_tracker.total > 0:
+            logger.info(
+                'Error analysis:'
+                f' total_errors={self._statistics.error_tracker.total}'
+                f' unique_errors={self._statistics.error_tracker.unique_error_count}'
+            )
+
+        self._running = False
+        self._has_finished_before = True
+
+        return self._statistics.calculate()
 
     def _should_retry_request(self, crawling_context: BasicCrawlingContext, error: Exception) -> bool:
         if crawling_context.request.no_retry:
@@ -298,13 +339,13 @@ def _check_enqueue_strategy(
         if strategy == EnqueueStrategy.ALL:
             return True
 
-        assert_never()
+        assert_never(strategy)
 
     def _check_url_patterns(
         self,
         target_url: httpx.URL,
-        include: Sequence[re.Pattern | Glob] | None,
-        exclude: Sequence[re.Pattern | Glob] | None,
+        include: Sequence[re.Pattern[Any] | Glob] | None,
+        exclude: Sequence[re.Pattern[Any] | Glob] | None,
     ) -> bool:
         """Check if a URL matches configured include/exclude patterns."""
         # If the URL matches any `exclude` pattern, reject it
@@ -332,10 +373,11 @@ def _check_url_patterns(
 
     async def _handle_request_error(self, crawling_context: TCrawlingContext, error: Exception) -> None:
         request_provider = await self.get_request_provider()
+        request = crawling_context.request
 
         if self._should_retry_request(crawling_context, error):
-            request = crawling_context.request
             request.retry_count += 1
+            self._statistics.error_tracker.add(error)
 
             if self._error_handler:
                 try:
@@ -357,9 +399,11 @@ async def _handle_request_error(self, crawling_context: TCrawlingContext, error:
                 max_retries=3,
             )
             await self._handle_failed_request(crawling_context, error)
+            self._statistics.record_request_processing_failure(request.id or request.unique_key)
 
     async def _handle_failed_request(self, crawling_context: TCrawlingContext, error: Exception) -> None:
         logger.exception('Request failed and reached maximum retries', exc_info=error)
+        self._statistics.error_tracker.add(error)
 
         if self._failed_request_handler:
             try:
@@ -441,6 +485,9 @@ async def __run_task_function(self) -> None: # noqa: PLR0912
             add_requests=result.add_requests,
         )
 
+        statistics_id = request.id or request.unique_key
+        self._statistics.record_request_processing_start(statistics_id)
+
         try:
             request.state = RequestState.REQUEST_HANDLER
 
@@ -467,6 +514,8 @@ async def __run_task_function(self) -> None: # noqa: PLR0912
 
             if crawling_context.session:
                 crawling_context.session.mark_good()
+
+            self._statistics.record_request_processing_finish(statistics_id)
         except RequestHandlerError as primary_error:
             primary_error = cast(
                 RequestHandlerError[TCrawlingContext], primary_error
@@ -514,6 +563,7 @@ async def __run_task_function(self) -> None: # noqa: PLR0912
                 crawling_context.request.session_rotation_count += 1
 
                 await request_provider.reclaim_request(request)
+                self._statistics.error_tracker_retry.add(session_error)
             else:
                 logger.exception('Request failed and reached maximum retries', exc_info=session_error)
 
@@ -525,6 +575,9 @@ async def __run_task_function(self) -> None: # noqa: PLR0912
                     logger=logger,
                     max_retries=3,
                 )
+
+                self._statistics.record_request_processing_failure(statistics_id)
+                self._statistics.error_tracker.add(session_error)
         except ContextPipelineInterruptedError as interruped_error:
             logger.debug('The context pipeline was interrupted', exc_info=interruped_error)

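Taken together, `run()` now guards against concurrent runs, resets statistics and the session pool when re-run, and returns a `FinalStatistics` computed by the `Statistics` instance instead of an empty placeholder. A hedged usage sketch; the URL and handler body are illustrative, and the import paths assume this commit's package layout:

import asyncio

from crawlee.http_crawler import HttpCrawler, HttpCrawlingContext


async def main() -> None:
    crawler = HttpCrawler()

    @crawler.router.default_handler
    async def handler(context: HttpCrawlingContext) -> None:
        print(f'Processing {context.request.url}')  # illustrative handler body

    # run() now returns FinalStatistics calculated by the Statistics instance.
    final_statistics = await crawler.run(['https://crawlee.dev'])
    print(final_statistics)

    # Live counters stay reachable after the run via the new `statistics` property.
    print(crawler.statistics.error_tracker.total)


asyncio.run(main())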
src/crawlee/basic_crawler/types.py

Lines changed: 0 additions & 5 deletions
@@ -68,11 +68,6 @@ class BasicCrawlingContext:
     add_requests: AddRequestsFunction
 
 
-@dataclass(frozen=True)
-class FinalStatistics:
-    """Statistics about a crawler run."""
-
-
 class AddRequestsFunctionCall(AddRequestsFunctionKwargs):
     """Record of a call to `add_requests`."""

src/crawlee/beautifulsoup_crawler/beautifulsoup_crawler.py

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ def __init__(
         super().__init__(**kwargs)
 
     async def _make_http_request(self, context: BasicCrawlingContext) -> AsyncGenerator[HttpCrawlingContext, None]:
-        result = await self._http_client.crawl(context.request, context.session)
+        result = await self._http_client.crawl(context.request, context.session, self._statistics)
 
         yield HttpCrawlingContext(
             request=context.request,

src/crawlee/http_clients/base_http_client.py

Lines changed: 7 additions & 1 deletion
@@ -9,6 +9,7 @@
 
 from crawlee.models import Request
 from crawlee.sessions.session import Session
+from crawlee.statistics.statistics import Statistics
 
 
 class HttpResponse(Protocol):
@@ -48,7 +49,12 @@ def __init__(
         self._ignore_http_error_status_codes = set(ignore_http_error_status_codes)
 
     @abstractmethod
-    async def crawl(self, request: Request, session: Session | None) -> HttpCrawlingResult:
+    async def crawl(
+        self,
+        request: Request,
+        session: Session | None,
+        statistics: Statistics,
+    ) -> HttpCrawlingResult:
        """Perform a crawl of an URL."""
 
    @abstractmethod

src/crawlee/http_clients/httpx_client.py

Lines changed: 4 additions & 1 deletion
@@ -12,6 +12,7 @@
 
 if TYPE_CHECKING:
     from crawlee.models import Request
+    from crawlee.statistics.statistics import Statistics
 
 
 class HttpTransport(httpx.AsyncHTTPTransport):
@@ -64,7 +65,7 @@ def __init__(
         self._client = httpx.AsyncClient(transport=HttpTransport())
 
     @override
-    async def crawl(self, request: Request, session: Session | None) -> HttpCrawlingResult:
+    async def crawl(self, request: Request, session: Session | None, statistics: Statistics) -> HttpCrawlingResult:
         http_request = self._client.build_request(
             method=request.method,
             url=request.url,
@@ -81,6 +82,8 @@ async def crawl(self, request: Request, session: Session | None) -> HttpCrawling
 
             raise
 
+        statistics.register_status_code(response.status_code)
+
         exclude_error = response.status_code in self._ignore_http_error_status_codes
         include_error = response.status_code in self._additional_http_error_status_codes

src/crawlee/http_crawler/http_crawler.py

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ def __init__(
     async def _make_http_request(
         self, crawling_context: BasicCrawlingContext
     ) -> AsyncGenerator[HttpCrawlingContext, None]:
-        result = await self._http_client.crawl(crawling_context.request, crawling_context.session)
+        result = await self._http_client.crawl(crawling_context.request, crawling_context.session, self._statistics)
 
         yield HttpCrawlingContext(
             request=crawling_context.request,

src/crawlee/statistics/__init__.py

Whitespace-only changes.
src/crawlee/statistics/error_tracker.py

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
+# Inspiration: https://github.com/apify/crawlee/blob/v3.9.2/packages/utils/src/internals/error_tracker.ts
+
+from __future__ import annotations
+
+from collections import Counter
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True, unsafe_hash=True)
+class ErrorGroup:
+    """Identifies a group of similar errors."""
+
+    class_name: str | None
+
+
+class ErrorTracker:
+    """Track errors and aggregates their counts by similarity."""
+
+    def __init__(self) -> None:
+        self._errors = Counter[ErrorGroup]()
+
+    def add(self, error: Exception) -> None:
+        """Include an error in the statistics."""
+        error_group = ErrorGroup(class_name=error.__class__.__name__)
+        self._errors[error_group] += 1
+
+    @property
+    def unique_error_count(self) -> int:
+        """Number of distinct kinds of errors."""
+        return len(self._errors)
+
+    @property
+    def total(self) -> int:
+        """Total number of errors."""
+        return sum(self._errors.values())

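Because the tracker groups errors solely by exception class name, repeated errors of one class collapse into a single group. A standalone sketch with illustrative values; the import path assumes the file name given above:

from crawlee.statistics.error_tracker import ErrorTracker  # path assumed, see above

tracker = ErrorTracker()
tracker.add(ValueError('bad value'))
tracker.add(ValueError('another bad value'))  # same class, counted into the same group
tracker.add(TimeoutError('too slow'))

print(tracker.total)               # 3 - every recorded error counts
print(tracker.unique_error_count)  # 2 - the ValueError and TimeoutError groups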