-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtest_actor_request_queue.py
More file actions
400 lines (298 loc) · 15.5 KB
/
test_actor_request_queue.py
File metadata and controls
400 lines (298 loc) · 15.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING, Any
from unittest import mock
import pytest
from apify_shared.consts import ApifyEnvVars
from ._utils import generate_unique_resource_name
from apify import Actor, Request
from apify._models import ActorRun
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from apify_client import ApifyClientAsync
from crawlee.storages import RequestQueue
from .conftest import MakeActorFunction, RunActorFunction
@pytest.fixture
async def apify_named_rq(
    apify_client_async: ApifyClientAsync,
    monkeypatch: pytest.MonkeyPatch,
) -> AsyncGenerator[RequestQueue]:
    """Yield a uniquely named request queue opened with `force_cloud=True` and drop it afterwards.

    Before the `Actor` context is entered, the token of the supplied API client is exported through the
    `ApifyEnvVars.TOKEN` environment variable.
    """
    token = apify_client_async.token
    assert token
    monkeypatch.setenv(ApifyEnvVars.TOKEN, token)

    unique_rq_name = generate_unique_resource_name('request_queue')

    async with Actor:
        named_rq = await Actor.open_request_queue(name=unique_rq_name, force_cloud=True)
        yield named_rq
        # Teardown: remove the queue that was created for the test.
        await named_rq.drop()
async def test_same_references_in_default_rq(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Check that opening the default request queue twice inside an Actor run gives back the same object."""

    async def main() -> None:
        async with Actor:
            first_rq = await Actor.open_request_queue()
            second_rq = await Actor.open_request_queue()
            # Identity, not equality: both calls have to return the very same instance.
            assert first_rq is second_rq

    built_actor = await make_actor(label='rq-same-ref-default', main_func=main)
    finished_run = await run_actor(built_actor)

    assert finished_run.status == 'SUCCEEDED'
async def test_same_references_in_named_rq(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Check that a named request queue resolves to one object whether it is opened by name or by ID.

    The queue name is generated here and handed to the Actor through the `rqName` field of the run input.
    """
    rq_name = generate_unique_resource_name('request-queue')

    async def main() -> None:
        async with Actor:
            actor_input = await Actor.get_input()
            queue_name = actor_input['rqName']

            # Open the queue twice by name.
            named_first = await Actor.open_request_queue(name=queue_name)
            named_second = await Actor.open_request_queue(name=queue_name)
            assert named_first is named_second

            # Open the same queue twice by the ID reported in its metadata.
            queue_metadata = await named_first.get_metadata()
            by_id_first = await Actor.open_request_queue(id=queue_metadata.id)
            by_id_second = await Actor.open_request_queue(id=queue_metadata.id)
            assert by_id_first is named_first
            assert by_id_second is by_id_first

            # Remove the named queue once the checks are done.
            await named_first.drop()

    built_actor = await make_actor(label='rq-same-ref-named', main_func=main)
    finished_run = await run_actor(built_actor, run_input={'rqName': rq_name})

    assert finished_run.status == 'SUCCEEDED'
async def test_force_cloud(
    apify_client_async: ApifyClientAsync,
    apify_named_rq: RequestQueue,
) -> None:
    """Check that a request added through the `apify_named_rq` queue can be read back via the raw API client."""
    rq_metadata = await apify_named_rq.get_metadata()
    request_queue_id = rq_metadata.id

    added = await apify_named_rq.add_request(Request.from_url('http://example.com'))
    assert added.id is not None

    # Look the queue up through the API client instead of the `RequestQueue` object.
    raw_rq_client = apify_client_async.request_queue(request_queue_id)

    rq_details = await raw_rq_client.get()
    assert rq_details is not None
    assert rq_details.get('name') == apify_named_rq.name

    stored_request = await raw_rq_client.get_request(added.id)
    assert stored_request is not None
    assert stored_request['url'] == 'http://example.com'
async def test_request_queue_is_finished(
    apify_named_rq: RequestQueue,
) -> None:
    """Check that `is_finished()` turns true only after the single added request is marked as handled."""
    rq = apify_named_rq

    await rq.add_request(Request.from_url('http://example.com'))
    finished_after_add = await rq.is_finished()
    assert not finished_after_add

    fetched = await rq.fetch_next_request()
    assert fetched is not None

    # The request was fetched, but it has not been marked as handled yet.
    finished_after_fetch = await rq.is_finished()
    assert not finished_after_fetch, (
        'RequestQueue should not be finished unless the request is marked as handled.'
    )

    await rq.mark_request_as_handled(fetched)
    assert await rq.is_finished()
async def test_request_queue_deduplication(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Test that deduplication works: adding two similar requests should result in a single API write.

    Deduplication is based on the request's `unique_key` only. To include more attributes in the unique key, the
    `use_extended_unique_key=True` argument of the `Request.from_url` method can be used.

    This tests an internal optimization that changes no behavior for the user. The inputs and outputs of the
    functions stay the same; only fewer API calls are made.
    """

    async def main() -> None:
        import asyncio

        from apify import Actor, Request

        async with Actor:
            post_request = Request.from_url('http://example.com', method='POST')
            get_request = Request.from_url('http://example.com', method='GET')
            rq = await Actor.open_request_queue()

            await asyncio.sleep(10)  # Wait to be sure that metadata are updated

            # The stats are not exposed by the `RequestQueue` class, so read them through the raw client.
            raw_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
            rq_info = await raw_client.get()
            assert rq_info
            stats_before = rq_info.get('stats', {})
            Actor.log.info(stats_before)

            # Both requests differ only in the HTTP method.
            await rq.add_request(post_request)
            await rq.add_request(get_request)

            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
            rq_info = await raw_client.get()
            assert rq_info
            stats_after = rq_info.get('stats', {})
            Actor.log.info(stats_after)

            write_delta = stats_after['writeCount'] - stats_before['writeCount']
            assert write_delta == 1

    built_actor = await make_actor(label='rq-deduplication', main_func=main)
    finished_run = await run_actor(built_actor)

    assert finished_run.status == 'SUCCEEDED'
async def test_request_queue_deduplication_use_extended_unique_key(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Test that two similar requests built with `use_extended_unique_key=True` result in exactly two API writes.

    Deduplication is based on the request's `unique_key` only. To include more attributes in the unique key, the
    `use_extended_unique_key=True` argument of the `Request.from_url` method can be used.

    This tests an internal optimization that changes no behavior for the user. The inputs and outputs of the
    functions stay the same; only fewer API calls are made.
    """

    async def main() -> None:
        import asyncio

        from apify import Actor, Request

        async with Actor:
            post_request = Request.from_url('http://example.com', method='POST', use_extended_unique_key=True)
            get_request = Request.from_url('http://example.com', method='GET', use_extended_unique_key=True)
            rq = await Actor.open_request_queue()

            await asyncio.sleep(10)  # Wait to be sure that metadata are updated

            # The stats are not exposed by the `RequestQueue` class, so read them through the raw client.
            raw_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
            rq_info = await raw_client.get()
            assert rq_info
            stats_before = rq_info.get('stats', {})
            Actor.log.info(stats_before)

            # Same URL, different method; both requests were created with the extended unique key.
            await rq.add_request(post_request)
            await rq.add_request(get_request)

            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
            rq_info = await raw_client.get()
            assert rq_info
            stats_after = rq_info.get('stats', {})
            Actor.log.info(stats_after)

            write_delta = stats_after['writeCount'] - stats_before['writeCount']
            assert write_delta == 2

    built_actor = await make_actor(label='rq-deduplication', main_func=main)
    finished_run = await run_actor(built_actor)

    assert finished_run.status == 'SUCCEEDED'
async def test_request_queue_parallel_deduplication(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Test that deduplication works correctly even with parallel attempts to add the same links.

    The workers are set up so that each of them submits some requests that were already added to the queue
    together with some new ones. The queue must deduplicate them and add only the new requests. For example:
        - the first worker adds 10 new requests,
        - the second worker adds 10 new requests and 10 known requests,
        - the third worker adds 10 new requests and 20 known requests, and so on.
    """

    async def main() -> None:
        import asyncio
        import logging

        from apify import Actor, Request

        worker_count = 10
        max_requests = 100
        # Each `next()` call hands out the size of the next, larger prefix of `requests`.
        batch_size = iter(range(10, max_requests + 1, max_requests // worker_count))

        async with Actor:
            logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)

            requests = [Request.from_url(f'http://example.com/{index}') for index in range(max_requests)]
            rq = await Actor.open_request_queue()

            await asyncio.sleep(10)  # Wait to be sure that metadata are updated

            # The stats are not exposed by the `RequestQueue` class, so read them through the raw client.
            raw_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
            rq_info = await raw_client.get()
            assert rq_info
            stats_before = rq_info.get('stats', {})
            Actor.log.info(stats_before)

            async def add_requests_worker() -> None:
                # Submit a prefix of `requests`; it overlaps with whatever the smaller batches contain.
                prefix_length = next(batch_size)
                await rq.add_requests(requests[:prefix_length])

            # Start all workers and wait for every one of them to finish.
            worker_tasks = []
            for _ in range(worker_count):
                worker_tasks.append(asyncio.create_task(add_requests_worker()))
            await asyncio.gather(*worker_tasks)

            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
            rq_info = await raw_client.get()
            assert rq_info
            stats_after = rq_info.get('stats', {})
            Actor.log.info(stats_after)

            write_delta = stats_after['writeCount'] - stats_before['writeCount']
            assert write_delta == len(requests)

    built_actor = await make_actor(label='rq-parallel-deduplication', main_func=main)
    finished_run = await run_actor(built_actor)

    assert finished_run.status == 'SUCCEEDED'
async def test_request_queue_deduplication_unprocessed_requests(
    apify_named_rq: RequestQueue,
) -> None:
    """Test that the deduplication does not add unprocessed requests to the cache.

    The first call is "hardcoded" to fail, even on all retries, so it never sends the API request and has no
    chance of increasing the `writeCount`. The second call can increase the `writeCount` only if the request is
    not cached, because cached requests do not make the call (tested in other tests). A `writeCount` increase
    of one therefore means that the `unprocessedRequests` request was intentionally not cached.
    """
    logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)

    await asyncio.sleep(10)  # Wait to be sure that metadata are updated

    # The stats are not exposed by the `RequestQueue` class, so read them through the raw client.
    raw_client = Actor.apify_client.request_queue(request_queue_id=apify_named_rq.id)
    rq_info = await raw_client.get()
    assert rq_info
    stats_before = rq_info.get('stats', {})
    Actor.log.info(stats_before)

    def report_all_as_unprocessed(requests: list[dict], *_: Any, **__: Any) -> dict[str, list[dict]]:
        """Simulate an API response in which every submitted request comes back as unprocessed."""
        unprocessed = []
        for item in requests:
            unprocessed.append({'url': item['url'], 'uniqueKey': item['uniqueKey'], 'method': item['method']})
        return {'processedRequests': [], 'unprocessedRequests': unprocessed}

    patched_batch_add = mock.patch(
        'apify_client.clients.resource_clients.request_queue.RequestQueueClientAsync.batch_add_requests',
        side_effect=report_all_as_unprocessed,
    )
    with patched_batch_add:
        # Simulate failed API call for adding requests. Request was not processed and should not be cached.
        await apify_named_rq.add_requests(['http://example.com/1'])

    # The patch is no longer active, so this call will succeed.
    await apify_named_rq.add_requests(['http://example.com/1'])

    await asyncio.sleep(10)  # Wait to be sure that metadata are updated
    rq_info = await raw_client.get()
    assert rq_info
    stats_after = rq_info.get('stats', {})
    Actor.log.info(stats_after)

    write_delta = stats_after['writeCount'] - stats_before['writeCount']
    assert write_delta == 1
async def test_request_queue_had_multiple_clients_platform(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Test that `RequestQueue` clients created with a different `client_key` appear as distinct clients."""

    async def main() -> None:
        from apify_client import ApifyClientAsync

        async with Actor:
            default_rq = await Actor.open_request_queue()
            await default_rq.fetch_next_request()

            # A client created explicitly with `client_key=None` should appear as a distinct client.
            separate_client = ApifyClientAsync(token=Actor.configuration.token)
            separate_rq_client = separate_client.request_queue(request_queue_id=default_rq.id, client_key=None)
            await separate_rq_client.list_head()

            rq_metadata = await default_rq.get_metadata()
            assert rq_metadata.had_multiple_clients is True

    built_actor = await make_actor(label='rq-had-multiple-clients', main_func=main)
    finished_run = await run_actor(built_actor)

    assert finished_run.status == 'SUCCEEDED'
async def test_request_queue_not_had_multiple_clients_platform(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Test that the same `RequestQueue` created from the Actor does not act as multiple clients."""

    async def main() -> None:
        async with Actor:
            default_rq = await Actor.open_request_queue()

            # Two API calls create the situation where an unset `client_key` could flip
            # `had_multiple_clients` to True.
            for _ in range(2):
                await default_rq.fetch_next_request()

            rq_metadata = await default_rq.get_metadata()
            assert rq_metadata.had_multiple_clients is False

    built_actor = await make_actor(label='rq-not-had-multiple-clients', main_func=main)
    finished_run = await run_actor(built_actor)

    assert finished_run.status == 'SUCCEEDED'
async def test_request_queue_not_had_multiple_clients_platform_resurrection(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
    apify_client_async: ApifyClientAsync,
) -> None:
    """Test that a `RequestQueue` created from the Actor does not act as multiple clients even after resurrection.

    The Actor is run once, then the finished run is resurrected and awaited again. Both runs have to succeed.
    """

    async def main() -> None:
        async with Actor:
            default_rq = await Actor.open_request_queue()

            metadata_before_access = await default_rq.get_metadata()
            assert metadata_before_access.had_multiple_clients is False, 'Not accessed yet, should be False'

            await default_rq.fetch_next_request()

            metadata_after_access = await default_rq.get_metadata()
            assert metadata_after_access.had_multiple_clients is False, (
                'Accessed with the same client, should be False'
            )

    built_actor = await make_actor(label='rq-clients-resurrection', main_func=main)
    run_result = await run_actor(built_actor)
    assert run_result.status == 'SUCCEEDED'

    # Resurrect the run; the RequestQueue should still use the same client key and thus not have multiple clients.
    run_client = apify_client_async.run(run_id=run_result.id)

    # Redirect the logs of the resurrected run as well.
    streamed_log = await run_client.get_streamed_log(from_start=False)
    await run_client.resurrect()

    async with streamed_log:
        raw_run = await run_client.wait_for_finish(wait_secs=600)
        run_result = ActorRun.model_validate(raw_run)
        assert run_result.status == 'SUCCEEDED'