11from __future__ import annotations
22
3- import re
4- from base64 import b64encode
5- from hashlib import sha256
63from logging import getLogger
7- from typing import TYPE_CHECKING , Final
4+ from typing import TYPE_CHECKING , Final , Literal
85
96from typing_extensions import override
107
118from apify_client import ApifyClientAsync
129from crawlee ._utils .crypto import crypto_random_object_id
1310from crawlee .storage_clients ._base import RequestQueueClient
14- from crawlee .storage_clients .models import RequestQueueMetadata
11+ from crawlee .storage_clients .models import AddRequestsResponse , ProcessedRequest , RequestQueueMetadata
1512from crawlee .storages import RequestQueue
1613
1714from ._models import ApifyRequestQueueMetadata , RequestQueueStats
15+ from ._request_queue_shared_client import _ApifyRequestQueueSharedClient
16+ from ._request_queue_single_client import _ApifyRequestQueueSingleClient
1817from ._utils import AliasResolver
1918
2019if TYPE_CHECKING :
20+ from collections .abc import Sequence
21+
2122 from apify_client .clients import RequestQueueClientAsync
23+ from crawlee import Request
2224
2325 from apify import Configuration
2426
2527logger = getLogger (__name__ )
2628
2729
def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) -> str:
    """Generate a deterministic request ID based on a unique key.

    The unique key is UTF-8 encoded and hashed with SHA-256, the digest is base64-encoded, the characters
    that are not URL-safe (`+`, `/` and `=`) are dropped, and the result is truncated to the requested length.
    The same unique key therefore always maps to the same request ID.

    Args:
        unique_key: The unique key to convert into a request ID.
        request_id_length: The maximum length of the request ID. The result may be shorter when fewer
            URL-safe characters are available than requested.

    Returns:
        A URL-safe, truncated request ID based on the unique key.
    """
    # Hash the UTF-8 bytes of the key; the digest is what makes the ID deterministic.
    digest = sha256(unique_key.encode('utf-8')).digest()

    # Base64 output is pure ASCII, so it can be decoded straight into a string.
    encoded_digest = b64encode(digest).decode('ascii')

    # Drop '+', '/' and '=' in a single pass instead of running a regex substitution.
    url_safe_digest = encoded_digest.translate(str.maketrans('', '', '+/='))

    # Slicing never raises, so a length beyond the available characters simply returns them all.
    return url_safe_digest[:request_id_length]
49-
50-
5130class ApifyRequestQueueClient (RequestQueueClient ):
5231 """Base class for Apify platform implementations of the request queue client."""
5332
@@ -59,6 +38,7 @@ def __init__(
5938 * ,
6039 api_client : RequestQueueClientAsync ,
6140 metadata : RequestQueueMetadata ,
41+ access : Literal ['single' , 'shared' ] = 'single' ,
6242 ) -> None :
6343 """Initialize a new instance.
6444
@@ -67,8 +47,112 @@ def __init__(
6747 self ._api_client = api_client
6848 """The Apify request queue client for API operations."""
6949
70- self ._metadata = metadata
71- """Additional data related to the RequestQueue."""
50+ self ._implementation : _ApifyRequestQueueSingleClient | _ApifyRequestQueueSharedClient
51+ """Internal implementation used to communicate with the Apify platform based Request Queue."""
52+ if access == 'single' :
53+ self ._implementation = _ApifyRequestQueueSingleClient (
54+ api_client = self ._api_client , metadata = metadata , cache_size = self ._MAX_CACHED_REQUESTS
55+ )
56+ elif access == 'shared' :
57+ self ._implementation = _ApifyRequestQueueSharedClient (
58+ api_client = self ._api_client ,
59+ metadata = metadata ,
60+ cache_size = self ._MAX_CACHED_REQUESTS ,
61+ metadata_getter = self .get_metadata ,
62+ )
63+ else :
64+ raise RuntimeError (f"Unsupported access type: { access } . Allowed values are 'single' or 'shared'." )
65+
@property
def _metadata(self) -> RequestQueueMetadata:
    """Request queue metadata, as exposed by the selected internal implementation."""
    implementation = self._implementation
    return implementation.metadata
69+
@override
async def add_batch_of_requests(
    self,
    requests: Sequence[Request],
    *,
    forefront: bool = False,
) -> AddRequestsResponse:
    """Add several requests to the queue in one call.

    The work is delegated to the internal implementation chosen when this client was created.

    Args:
        requests: The requests to add.
        forefront: If True, the requests are added to the beginning of the queue.

    Returns:
        Response containing information about the added requests.
    """
    implementation = self._implementation
    response = await implementation.add_batch_of_requests(requests, forefront=forefront)
    return response
87+
@override
async def fetch_next_request(self) -> Request | None:
    """Return the next request in the queue that should be processed.

    After the request has been processed successfully, call `mark_request_as_handled` so that the queue
    records it as handled. If processing failed, call `reclaim_request` instead, so that the queue can hand
    the request to some other consumer in a later call to `fetch_next_request`.

    Returns:
        The request, or `None` if there are no more pending requests.
    """
    implementation = self._implementation
    next_request = await implementation.fetch_next_request()
    return next_request
101+
@override
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
    """Mark a successfully processed request as handled.

    A handled request is never returned by `fetch_next_request` again.

    Args:
        request: The request to mark as handled.

    Returns:
        Information about the queue operation, or `None` if the given request was not in progress.
    """
    implementation = self._implementation
    processed = await implementation.mark_request_as_handled(request)
    return processed
115+
@override
async def get_request(self, unique_key: str) -> Request | None:
    """Look up a request by its unique key.

    Args:
        unique_key: Unique key of the request to get.

    Returns:
        The request, or None if not found.
    """
    implementation = self._implementation
    found_request = await implementation.get_request(unique_key)
    return found_request
127+
@override
async def reclaim_request(
    self,
    request: Request,
    *,
    forefront: bool = False,
) -> ProcessedRequest | None:
    """Return a failed request to the queue.

    The request will be handed out for processing again by a later call to `fetch_next_request`.

    Args:
        request: The request to return to the queue.
        forefront: Whether to add the request to the head or the end of the queue.

    Returns:
        Information about the queue operation, or `None` if the given request was not in progress.
    """
    implementation = self._implementation
    processed = await implementation.reclaim_request(request, forefront=forefront)
    return processed
147+
@override
async def is_empty(self) -> bool:
    """Report whether the queue is empty.

    Returns:
        True if the queue is empty, False otherwise.
    """
    implementation = self._implementation
    queue_is_empty = await implementation.is_empty()
    return queue_is_empty
72156
73157 @override
74158 async def get_metadata (self ) -> ApifyRequestQueueMetadata :
@@ -103,6 +187,7 @@ async def open(
103187 name : str | None ,
104188 alias : str | None ,
105189 configuration : Configuration ,
190+ access : Literal ['single' , 'shared' ] = 'single' ,
106191 ) -> ApifyRequestQueueClient :
107192 """Open an Apify request queue client.
108193
@@ -120,6 +205,18 @@ async def open(
120205 configuration: The configuration object containing API credentials and settings. Must include a valid
121206 `token` and `api_base_url`. May also contain a `default_request_queue_id` for fallback when neither
122207 `id`, `name`, nor `alias` is provided.
208+ access: Controls the implementation of the request queue client based on the expected scenario:
209+ - 'single' is suitable for single-consumer scenarios. It makes fewer API calls, and is cheaper and faster.
210+ - 'shared' is suitable for multiple-consumer scenarios, at the cost of higher API usage.
211+
212+ Detailed constraints for the 'single' access type:
213+ - Only one client is consuming the request queue at a time.
214+ - Multiple producers can put requests to the queue, but their forefront requests are not guaranteed to
215+ be handled as quickly, because this client does not aggressively fetch the forefront and relies on
216+ local head estimation.
217+ - Requests are only added to the queue, never deleted by other clients. (Marking as handled is ok.)
218+ - Other producers can add new requests, but not modify existing ones.
219+ (Modifications would not be reflected in the local cache.)
123220
124221 Returns:
125222 An instance for the opened or created storage client.
@@ -217,10 +314,7 @@ async def open(
217314
218315 metadata_model = RequestQueueMetadata .model_validate (metadata )
219316
220- return cls (
221- api_client = apify_rq_client ,
222- metadata = metadata_model ,
223- )
317+ return cls (api_client = apify_rq_client , metadata = metadata_model , access = access )
224318
225319 @override
226320 async def purge (self ) -> None :
0 commit comments