-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtest_actor_request_queue.py
More file actions
251 lines (186 loc) · 9.07 KB
/
test_actor_request_queue.py
File metadata and controls
251 lines (186 loc) · 9.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING, Any
from unittest import mock
import pytest
from apify_shared.consts import ApifyEnvVars
from ._utils import generate_unique_resource_name
from apify import Actor, Request
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
from apify_client import ApifyClientAsync
from crawlee.storages import RequestQueue
from .conftest import MakeActorFunction, RunActorFunction
@pytest.fixture
async def apify_named_rq(
    apify_client_async: ApifyClientAsync, monkeypatch: pytest.MonkeyPatch
) -> AsyncGenerator[RequestQueue]:
    """Yield a uniquely named request queue on the Apify platform and drop it afterwards."""
    assert apify_client_async.token
    # Make the client's token visible to the Actor so force_cloud can authenticate.
    monkeypatch.setenv(ApifyEnvVars.TOKEN, apify_client_async.token)

    unique_name = generate_unique_resource_name('request_queue')
    async with Actor:
        named_rq = await Actor.open_request_queue(name=unique_name, force_cloud=True)
        yield named_rq
        # Teardown: named queues persist on the platform until explicitly dropped.
        await named_rq.drop()
async def test_same_references_in_default_rq(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Opening the default request queue twice must return the same cached instance."""

    async def main() -> None:
        # NOTE(review): `main` appears to be source-extracted by `make_actor` and executed
        # inside an Actor run on the platform — keep it self-contained; confirm in conftest.
        async with Actor:
            rq1 = await Actor.open_request_queue()
            rq2 = await Actor.open_request_queue()
            # Identity check (`is`), not equality: both opens must hit the same object.
            assert rq1 is rq2

    actor = await make_actor(label='rq-same-ref-default', main_func=main)
    run_result = await run_actor(actor)

    assert run_result.status == 'SUCCEEDED'
async def test_same_references_in_named_rq(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Opening a named queue — whether by name or by id — must return the same cached instance."""
    rq_name = generate_unique_resource_name('request-queue')

    async def main() -> None:
        # NOTE(review): runs inside the Actor on the platform; the queue name is
        # delivered through the run input rather than the enclosing closure.
        async with Actor:
            input_object = await Actor.get_input()
            rq_name = input_object['rqName']
            rq_by_name_1 = await Actor.open_request_queue(name=rq_name)
            rq_by_name_2 = await Actor.open_request_queue(name=rq_name)
            assert rq_by_name_1 is rq_by_name_2

            rq_1_metadata = await rq_by_name_1.get_metadata()
            # Opening by id must resolve to the same cache entry as opening by name.
            rq_by_id_1 = await Actor.open_request_queue(id=rq_1_metadata.id)
            rq_by_id_2 = await Actor.open_request_queue(id=rq_1_metadata.id)
            assert rq_by_id_1 is rq_by_name_1
            assert rq_by_id_2 is rq_by_id_1

            # Named queues are not cleaned up automatically — drop explicitly.
            await rq_by_name_1.drop()

    actor = await make_actor(label='rq-same-ref-named', main_func=main)
    run_result = await run_actor(actor, run_input={'rqName': rq_name})

    assert run_result.status == 'SUCCEEDED'
async def test_force_cloud(
    apify_client_async: ApifyClientAsync,
    apify_named_rq: RequestQueue,
) -> None:
    """A queue opened with force_cloud=True must exist on the platform and store requests there."""
    queue_id = (await apify_named_rq.get_metadata()).id
    added = await apify_named_rq.add_request(Request.from_url('http://example.com'))

    # Inspect the queue through the raw API client to prove it lives on the platform.
    raw_client = apify_client_async.request_queue(queue_id)
    details = await raw_client.get()
    assert details is not None
    assert details.get('name') == apify_named_rq.name

    stored = await raw_client.get_request(added.id)
    assert stored is not None
    assert stored['url'] == 'http://example.com'
async def test_request_queue_is_finished(
    apify_named_rq: RequestQueue,
) -> None:
    """is_finished() must stay False until every fetched request is marked as handled."""
    rq = await Actor.open_request_queue(name=apify_named_rq.name, force_cloud=True)
    await rq.add_request(Request.from_url('http://example.com'))
    assert not await rq.is_finished()

    fetched = await rq.fetch_next_request()
    assert fetched is not None
    # Fetching alone is not enough — an in-progress request keeps the queue unfinished.
    assert not await rq.is_finished(), (
        'RequestQueue should not be finished unless the request is marked as handled.'
    )

    await rq.mark_request_as_handled(fetched)
    assert await rq.is_finished()
async def test_request_queue_deduplication(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Test that the deduplication works correctly. Try to add 2 same requests, but it should call API just once.

    This tests an internal optimization that changes no behavior for the user.
    The functions' input/output behave the same way; it only uses a smaller number of API calls.
    """

    async def main() -> None:
        # NOTE(review): imports are local because `main` is apparently serialized and
        # executed on the platform, where module-level imports are unavailable.
        import asyncio

        from apify import Actor, Request

        async with Actor:
            request = Request.from_url('http://example.com')
            rq = await Actor.open_request_queue()
            await asyncio.sleep(10)  # Wait to be sure that metadata are updated

            # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client
            rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
            _rq = await rq_client.get()
            assert _rq
            stats_before = _rq.get('stats', {})
            Actor.log.info(stats_before)

            # Add same request twice
            await rq.add_request(request)
            await rq.add_request(request)

            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
            _rq = await rq_client.get()
            assert _rq
            stats_after = _rq.get('stats', {})
            Actor.log.info(stats_after)

            # Exactly one extra write proves the duplicate add never reached the API.
            assert (stats_after['writeCount'] - stats_before['writeCount']) == 1

    actor = await make_actor(label='rq-deduplication', main_func=main)
    run_result = await run_actor(actor)

    assert run_result.status == 'SUCCEEDED'
async def test_request_queue_parallel_deduplication(
    make_actor: MakeActorFunction,
    run_actor: RunActorFunction,
) -> None:
    """Test that the deduplication works correctly even with parallel attempts to add same links."""

    async def main() -> None:
        # NOTE(review): imports are local because `main` is apparently serialized and
        # executed on the platform, where module-level imports are unavailable.
        import asyncio
        import logging

        from apify import Actor, Request

        async with Actor:
            logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
            requests = [Request.from_url(f'http://example.com/{i}') for i in range(100)]
            rq = await Actor.open_request_queue()
            await asyncio.sleep(10)  # Wait to be sure that metadata are updated

            # Get raw client, because stats are not exposed in `RequestQueue` class, but are available in raw client
            rq_client = Actor.apify_client.request_queue(request_queue_id=rq.id)
            _rq = await rq_client.get()
            assert _rq
            stats_before = _rq.get('stats', {})
            Actor.log.info(stats_before)

            # Add same requests in 10 parallel workers
            async def add_requests_worker() -> None:
                await rq.add_requests(requests)

            add_requests_workers = [asyncio.create_task(add_requests_worker()) for _ in range(10)]
            await asyncio.gather(*add_requests_workers)

            await asyncio.sleep(10)  # Wait to be sure that metadata are updated
            _rq = await rq_client.get()
            assert _rq
            stats_after = _rq.get('stats', {})
            Actor.log.info(stats_after)

            # Each unique request is written once, despite 10 workers submitting the same batch.
            assert (stats_after['writeCount'] - stats_before['writeCount']) == len(requests)

    actor = await make_actor(label='rq-parallel-deduplication', main_func=main)
    run_result = await run_actor(actor)

    assert run_result.status == 'SUCCEEDED'
async def test_request_queue_deduplication_unprocessed_requests(
    apify_named_rq: RequestQueue,
) -> None:
    """Test that the deduplication does not add unprocessed requests to the cache."""
    logging.getLogger('apify.storage_clients._apify._request_queue_client').setLevel(logging.DEBUG)
    await asyncio.sleep(10)  # Give the platform time to settle queue metadata.

    # Stats are only available on the raw API client, not on the `RequestQueue` class.
    rq_client = Actor.apify_client.request_queue(request_queue_id=apify_named_rq.id)
    queue_info = await rq_client.get()
    assert queue_info
    stats_before = queue_info.get('stats', {})
    Actor.log.info(stats_before)

    def reject_all_requests(requests: list[dict], *_: Any, **__: Any) -> dict[str, list[dict]]:
        """Fake API response reporting every submitted request as unprocessed."""
        unprocessed = []
        for req in requests:
            unprocessed.append({'url': req['url'], 'uniqueKey': req['uniqueKey'], 'method': req['method']})
        return {'processedRequests': [], 'unprocessedRequests': unprocessed}

    batch_add_patch = mock.patch(
        'apify_client.clients.resource_clients.request_queue.RequestQueueClientAsync.batch_add_requests',
        side_effect=reject_all_requests,
    )
    with batch_add_patch:
        # Simulated API failure: the request is unprocessed, so it must not enter the dedup cache.
        await apify_named_rq.add_requests(['http://example.com/1'])

    # Unpatched retry of the same URL — must actually reach the API this time.
    await apify_named_rq.add_requests(['http://example.com/1'])

    await asyncio.sleep(10)  # Give the platform time to settle queue metadata.
    queue_info = await rq_client.get()
    assert queue_info
    stats_after = queue_info.get('stats', {})
    Actor.log.info(stats_after)

    # Exactly one write: the failed attempt was not cached, the retry succeeded once.
    assert (stats_after['writeCount'] - stats_before['writeCount']) == 1