-
Notifications
You must be signed in to change notification settings - Fork 708
Expand file tree
/
Copy path_request_manager.py
More file actions
75 lines (61 loc) · 3.02 KB
/
_request_manager.py
File metadata and controls
75 lines (61 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import timedelta
from typing import TYPE_CHECKING
from crawlee._utils.docs import docs_group
from crawlee.request_loaders._request_loader import RequestLoader
from crawlee.storage_clients.models import ProcessedRequest
if TYPE_CHECKING:
from collections.abc import Sequence
from crawlee._request import Request
@docs_group('Request loaders')
class RequestManager(RequestLoader, ABC):
    """Base class that extends `RequestLoader` with the capability to enqueue new requests and reclaim failed ones.

    Subclasses must implement `drop`, `add_request` and `reclaim_request`. A simple default implementation
    of `add_requests` is provided on top of `add_request`.
    """

    @abstractmethod
    async def drop(self) -> None:
        """Remove persistent state either from the Apify Cloud storage or from the local database."""

    @abstractmethod
    async def add_request(
        self,
        request: str | Request,
        *,
        forefront: bool = False,
    ) -> ProcessedRequest | None:
        """Add a single request to the manager and store it in underlying resource client.

        Args:
            request: The request object (or its string representation) to be added to the manager.
            forefront: Determines whether the request should be added to the beginning (if True) or the end (if False)
                of the manager.

        Returns:
            Information about the request addition to the manager, or None if the request was not added.
        """

    async def add_requests(
        self,
        requests: Sequence[str | Request],
        *,
        forefront: bool = False,
        batch_size: int = 1000,  # noqa: ARG002
        wait_time_between_batches: timedelta = timedelta(seconds=1),  # noqa: ARG002
        wait_for_all_requests_to_be_added: bool = False,  # noqa: ARG002
        wait_for_all_requests_to_be_added_timeout: timedelta | None = None,  # noqa: ARG002
    ) -> None:
        """Add multiple requests to the manager.

        This default implementation simply awaits `add_request` for every item, sequentially and in the given
        order. The batching and waiting parameters are accepted for interface compatibility but are ignored here;
        subclasses may override this method to honor them.

        Args:
            requests: Requests to enqueue.
            forefront: If True, add requests to the beginning of the queue.
            batch_size: The number of requests to add in one batch. Ignored by this default implementation.
            wait_time_between_batches: Time to wait between adding batches. Ignored by this default implementation.
            wait_for_all_requests_to_be_added: If True, wait for all requests to be added before returning.
                Ignored by this default implementation, which always awaits every addition before returning.
            wait_for_all_requests_to_be_added_timeout: Timeout for waiting for all requests to be added.
                Ignored by this default implementation.
        """
        # Default and dumb implementation: delegate to `add_request` one item at a time. The per-request results
        # are intentionally not collected, because this method returns nothing to the caller.
        # NOTE(review): with `forefront=True`, inserting one by one presumably leaves the batch in reverse order
        # at the head of the queue - confirm whether that is the intended behavior.
        for request in requests:
            await self.add_request(request, forefront=forefront)

    @abstractmethod
    async def reclaim_request(self, request: Request, *, forefront: bool = False) -> ProcessedRequest | None:
        """Reclaims a failed request back to the source, so that it can be returned for processing later again.

        It is possible to modify the request data by supplying an updated request as a parameter.

        Args:
            request: The request to hand back to the manager.
            forefront: Keyword-only flag forwarded to the implementation; by analogy with `add_request` it
                presumably selects the beginning of the queue when True (confirm against concrete subclasses).

        Returns:
            A `ProcessedRequest` or None, as defined by the concrete implementation.
        """