forked from apify/crawlee-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_request_queue.py
More file actions
1638 lines (1304 loc) · 53.7 KB
/
Copy pathtest_request_queue.py
File metadata and controls
1638 lines (1304 loc) · 53.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import asyncio
from datetime import timedelta
from typing import TYPE_CHECKING
import pytest
from crawlee import Request, service_locator
from crawlee.configuration import Configuration
from crawlee.storage_clients import MemoryStorageClient, StorageClient
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, UnprocessedRequest
from crawlee.storages import RequestQueue
from crawlee.storages._storage_instance_manager import StorageInstanceManager
from tests.unit.utils import poll_until_condition
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Sequence
from crawlee.storage_clients import StorageClient
@pytest.fixture
async def rq(
    storage_client: StorageClient,
) -> AsyncGenerator[RequestQueue, None]:
    """Yield a request queue opened without id, name or alias and drop it after the test."""
    queue = await RequestQueue.open(storage_client=storage_client)
    yield queue
    # Teardown: drop the queue once the test that used it has finished.
    await queue.drop()
async def test_open_creates_new_rq(
    storage_client: StorageClient,
) -> None:
    """Test that open() creates a new request queue with proper metadata.

    The queue is dropped in a `finally` block so that a failing assertion does not leave the named queue behind.
    """
    rq = await RequestQueue.open(
        name='new-request-queue',
        storage_client=storage_client,
    )

    try:
        # Verify request queue properties
        assert rq.id is not None
        assert rq.name == 'new-request-queue'

        # A queue that was just created must report zeroed counters.
        metadata = await rq.get_metadata()
        assert metadata.pending_request_count == 0
        assert metadata.handled_request_count == 0
        assert metadata.total_request_count == 0
    finally:
        await rq.drop()
async def test_open_existing_rq(
    rq: RequestQueue,
    storage_client: StorageClient,
) -> None:
    """Test that open() loads an existing request queue correctly."""
    # Open the same request queue again
    reopened_rq = await RequestQueue.open(
        name=rq.name,
        storage_client=storage_client,
    )

    # Verify request queue properties
    assert rq.id == reopened_rq.id
    assert rq.name == reopened_rq.name

    # Verify they are the same object (from cache); `is` is the idiomatic identity check.
    assert rq is reopened_rq
async def test_open_with_id_and_name(
    storage_client: StorageClient,
) -> None:
    """Check that open() rejects a call that passes both `id` and `name`."""
    expected_error = (
        r'Only one of "id", "name", "alias" can be specified, but following arguments '
        r'were specified: "id", "name".'
    )

    with pytest.raises(ValueError, match=expected_error):
        await RequestQueue.open(
            id='some-id',
            name='some-name',
            storage_client=storage_client,
        )
async def test_open_by_id(
    storage_client: StorageClient,
) -> None:
    """Check that a queue created by name can afterwards be opened through its ID."""
    queue_name = 'rq-by-id-test'
    marker_url = 'https://example.com/open-by-id-test'

    # Create the queue by name and store one request that identifies it.
    named_rq = await RequestQueue.open(name=queue_name, storage_client=storage_client)
    await named_rq.add_request(marker_url)

    # Open it again, this time through the ID.
    rq_by_id = await RequestQueue.open(id=named_rq.id, storage_client=storage_client)

    # Both handles must describe the same queue.
    assert rq_by_id.id == named_rq.id
    assert rq_by_id.name == queue_name

    # The identifying request must be retrievable through the second handle.
    fetched = await rq_by_id.fetch_next_request()
    assert fetched is not None
    assert fetched.url == marker_url

    # Clean up
    await rq_by_id.drop()
async def test_add_request_string_url(rq: RequestQueue) -> None:
    """Check that a plain URL string can be enqueued and shows up in the queue metadata."""
    processed = await rq.add_request('https://example.com')

    # The request must be reported as newly added.
    assert processed is not None
    assert processed.unique_key is not None
    assert processed.was_already_present is False
    assert processed.was_already_handled is False

    # The total and pending counters must both account for the single request.
    queue_metadata = await rq.get_metadata()
    assert queue_metadata.total_request_count == 1
    assert queue_metadata.pending_request_count == 1
async def test_add_request_object(rq: RequestQueue) -> None:
    """Check that a `Request` object built with `Request.from_url` can be enqueued."""
    outcome = await rq.add_request(
        Request.from_url(url='https://example.com', user_data={'key': 'value'}),
    )

    # The request must be reported as newly added.
    assert outcome is not None
    assert outcome.unique_key is not None
    assert outcome.was_already_present is False
    assert outcome.was_already_handled is False

    # The total and pending counters must both account for the single request.
    queue_metadata = await rq.get_metadata()
    assert queue_metadata.total_request_count == 1
    assert queue_metadata.pending_request_count == 1
async def test_add_duplicate_request(rq: RequestQueue) -> None:
    """Check that enqueuing the same URL twice is reported as a duplicate."""
    duplicate_url = 'https://example.com'

    original = await rq.add_request(duplicate_url)
    assert original is not None

    repeated = await rq.add_request(duplicate_url)

    # The second attempt must refer to the request stored by the first one.
    assert repeated is not None
    assert repeated.was_already_present is True
    assert repeated.unique_key == original.unique_key

    # The counters must still describe a single request.
    stats = await rq.get_metadata()
    assert stats.total_request_count == 1
    assert stats.pending_request_count == 1
async def test_add_requests_batch(rq: RequestQueue) -> None:
    """Test adding multiple requests in a batch."""
    # Create a batch of requests
    urls = [
        'https://example.com/page1',
        'https://example.com/page2',
        'https://example.com/page3',
    ]

    # Add the requests
    await rq.add_requests(urls)

    # Poll until every request is counted instead of sleeping for a fixed amount of time; this mirrors how
    # `test_add_requests_wait_for_all` waits for background additions.
    await poll_until_condition(rq.get_total_count, lambda count: count >= len(urls))

    # Verify the queue stats
    metadata = await rq.get_metadata()
    assert metadata.total_request_count == 3
    assert metadata.pending_request_count == 3
async def test_add_requests_batch_with_forefront(rq: RequestQueue) -> None:
    """Check that a batch added with `forefront=True` is served before previously queued requests."""
    # Seed the queue with two regular requests.
    for regular_url in ('https://example.com/page1', 'https://example.com/page2'):
        await rq.add_request(regular_url)

    # Push a batch of three priority requests to the forefront.
    priority_urls = [
        'https://example.com/priority1',
        'https://example.com/priority2',
        'https://example.com/priority3',
    ]
    await rq.add_requests(priority_urls, forefront=True)

    # Wait for all background tasks to complete
    await asyncio.sleep(0.1)

    # The first three fetched requests must come from the priority batch; their relative order is not checked.
    for _ in range(3):
        fetched = await rq.fetch_next_request()
        assert fetched is not None
        assert fetched.url.startswith('https://example.com/priority')

    # Afterwards the regular requests are returned in the order they were added.
    for expected_url in ('https://example.com/page1', 'https://example.com/page2'):
        fetched = await rq.fetch_next_request()
        assert fetched is not None
        assert fetched.url == expected_url

    # Nothing else is left in the queue.
    assert await rq.fetch_next_request() is None
async def test_add_requests_with_forefront(rq: RequestQueue) -> None:
    """Check that a single request added with `forefront=True` is fetched before older requests."""
    # Seed the queue with two regular requests.
    for regular_url in ('https://example.com/page1', 'https://example.com/page2'):
        await rq.add_request(regular_url)

    await rq.add_request('https://example.com/priority', forefront=True)

    # The forefront request must be the first one handed out.
    head = await rq.fetch_next_request()
    assert head is not None
    assert head.url == 'https://example.com/priority'
@pytest.mark.parametrize('forefront', [True, False])
async def test_add_requests_retry_preserves_forefront(
    monkeypatch: pytest.MonkeyPatch,
    *,
    forefront: bool,
) -> None:
    """Regression test for the `forefront` flag being lost when unprocessed requests are retried.

    The client's `add_batch_of_requests` is patched so that its first call reports every request as unprocessed
    and each later call reports them as processed. Both recorded calls must carry the `forefront` value that
    was originally passed to `add_requests`.
    """
    rq = await RequestQueue.open(storage_client=MemoryStorageClient())
    forefront_calls: list[bool] = []

    async def patched_add_batch(
        requests: Sequence[Request],
        *,
        forefront: bool = False,
    ) -> AddRequestsResponse:
        # An empty log means that this is the first attempt; remember that before recording the flag.
        is_first_attempt = not forefront_calls
        forefront_calls.append(forefront)

        if is_first_attempt:
            rejected = [UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests]
            return AddRequestsResponse(processed_requests=[], unprocessed_requests=rejected)

        accepted = [
            ProcessedRequest(
                unique_key=r.unique_key,
                was_already_present=False,
                was_already_handled=False,
            )
            for r in requests
        ]
        return AddRequestsResponse(processed_requests=accepted, unprocessed_requests=[])

    monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)

    try:
        await rq.add_requests(
            ['https://example.com/a', 'https://example.com/b'],
            forefront=forefront,
            wait_time_between_batches=timedelta(seconds=0),
        )
    finally:
        await rq.drop()

    assert forefront_calls == [forefront, forefront], (
        f'retry must propagate the original forefront={forefront} flag, got: {forefront_calls}'
    )
async def _no_sleep(_seconds: float) -> None:
    """Stand-in for `asyncio.sleep` that ignores the requested delay and returns at once.

    The retry tests patch it in so that they do not wait between attempts.
    """
    return None
async def test_add_request_retries_unprocessed(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that `add_request` retries a request that the client reported as unprocessed.

    The patched `add_batch_of_requests` rejects the request on its first call and accepts it on every later
    call, so exactly two calls are expected and the result must describe a newly added request.
    """
    rq = await RequestQueue.open(storage_client=MemoryStorageClient())
    # Swap the sleep reached through the request queue module for a stub that returns immediately.
    monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep)

    attempts = 0

    async def patched_add_batch(
        requests: Sequence[Request],
        *,
        forefront: bool = False,  # noqa: ARG001
    ) -> AddRequestsResponse:
        nonlocal attempts
        attempts += 1

        # Every call after the first one succeeds.
        if attempts > 1:
            accepted = [
                ProcessedRequest(unique_key=r.unique_key, was_already_present=False, was_already_handled=False)
                for r in requests
            ]
            return AddRequestsResponse(processed_requests=accepted, unprocessed_requests=[])

        # The first call reports the request as unprocessed.
        rejected = [UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests]
        return AddRequestsResponse(processed_requests=[], unprocessed_requests=rejected)

    monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)

    try:
        result = await rq.add_request('https://example.com/retry')
    finally:
        await rq.drop()

    assert attempts == 2, f'expected one retry after the unprocessed response, got {attempts} calls'
    assert result is not None
    assert result.was_already_present is False
async def test_add_request_returns_none_after_exhausting_retries(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that `add_request` gives up with `None`, not an exception, when a request is never processed.

    The patched `add_batch_of_requests` reports the request as unprocessed on every call and counts the calls.
    """
    rq = await RequestQueue.open(storage_client=MemoryStorageClient())
    # Swap the sleep reached through the request queue module for a stub that returns immediately.
    monkeypatch.setattr('crawlee.storages._request_queue.asyncio.sleep', _no_sleep)

    attempts = 0

    async def patched_add_batch(
        requests: Sequence[Request],
        *,
        forefront: bool = False,  # noqa: ARG001
    ) -> AddRequestsResponse:
        nonlocal attempts
        attempts += 1
        rejected = [UnprocessedRequest(unique_key=r.unique_key, url=r.url) for r in requests]
        return AddRequestsResponse(processed_requests=[], unprocessed_requests=rejected)

    monkeypatch.setattr(rq._client, 'add_batch_of_requests', patched_add_batch)

    try:
        result = await rq.add_request('https://example.com/doomed')
    finally:
        await rq.drop()

    assert result is None
    # One initial attempt plus five retries; the mechanism stops once `attempt` exceeds `max_attempts`.
    assert attempts == 6, f'expected 6 attempts (1 initial + 5 retries), got {attempts}'
async def test_add_requests_mixed_forefront(rq: RequestQueue) -> None:
    """Check the fetch order when regular and forefront additions are interleaved."""
    # Two regular requests first.
    await rq.add_request('https://example.com/normal1')
    await rq.add_request('https://example.com/normal2')

    # Then a batch that goes to the forefront.
    priority_batch = ['https://example.com/priority1', 'https://example.com/priority2']
    await rq.add_requests(priority_batch, forefront=True)

    # One more regular request, followed by a single forefront request.
    await rq.add_request('https://example.com/normal3')
    await rq.add_request('https://example.com/priority3', forefront=True)

    # Wait for background tasks
    await asyncio.sleep(0.1)

    # Drain the queue, remembering the URLs in the order they were handed out.
    fetched_urls: list[str] = []
    while (request := await rq.fetch_next_request()) is not None:
        fetched_urls.append(request.url)
        await rq.mark_request_as_handled(request)

    # Expected order:
    #   1. priority3 (most recent forefront)
    #   2.-3. priority1 and priority2 (forefront batch, order within the batch may vary)
    #   4.-6. normal1, normal2, normal3 (regular requests, oldest first)
    assert len(fetched_urls) == 6
    assert fetched_urls[0] == 'https://example.com/priority3'
    assert set(fetched_urls[1:3]) == {'https://example.com/priority1', 'https://example.com/priority2'}
    assert fetched_urls[3:] == [
        'https://example.com/normal1',
        'https://example.com/normal2',
        'https://example.com/normal3',
    ]
async def test_fetch_next_request_and_mark_handled(rq: RequestQueue) -> None:
    """Check the fetch / mark-as-handled cycle and the metadata counters it produces."""
    await rq.add_request('https://example.com/page1')
    await rq.add_request('https://example.com/page2')

    # The first fetched request is the one that was added first.
    first = await rq.fetch_next_request()
    assert first is not None
    assert first.url == 'https://example.com/page1'

    # Marking it as handled returns a result flagged as handled.
    handled = await rq.mark_request_as_handled(first)
    assert handled is not None
    assert handled.was_already_handled is True

    # The second request follows and is handled as well.
    second = await rq.fetch_next_request()
    assert second is not None
    assert second.url == 'https://example.com/page2'
    await rq.mark_request_as_handled(second)

    # Both requests are counted as handled and nothing is pending.
    stats = await rq.get_metadata()
    assert stats.total_request_count == 2
    assert stats.handled_request_count == 2
    assert stats.pending_request_count == 0

    # The queue has nothing more to hand out.
    leftover: Request | None = await rq.fetch_next_request()
    assert leftover is None
async def test_get_request_by_id(rq: RequestQueue) -> None:
    """Check that a stored request can be looked up through its unique key."""
    processed = await rq.add_request('https://example.com')
    assert processed is not None

    # Look the request up with the key reported by `add_request`.
    found = await rq.get_request(processed.unique_key)
    assert found is not None
    assert found.unique_key == processed.unique_key
    assert found.url == 'https://example.com'
async def test_handled_request_records_persistence(rq: RequestQueue) -> None:
    """Check that a request can still be retrieved by its unique key after it was marked as handled."""
    original = Request.from_url('https://example.com/1')
    await rq.add_request(original)

    in_progress = await rq.fetch_next_request()
    assert isinstance(in_progress, Request)
    await rq.mark_request_as_handled(in_progress)

    # The handled request must still be available through `get_request`.
    stored = await rq.get_request(original.unique_key)
    assert isinstance(stored, Request)
    assert stored.unique_key == original.unique_key
async def test_get_non_existent_request(rq: RequestQueue) -> None:
    """Check that looking up a key that was never added yields `None`."""
    assert await rq.get_request('non-existent-id') is None
async def test_reclaim_request(rq: RequestQueue) -> None:
    """Check that a fetched request can be reclaimed and is then handed out again."""
    await rq.add_request('https://example.com')

    in_progress = await rq.fetch_next_request()
    assert in_progress is not None

    # Give the request back to the queue.
    reclaim_result = await rq.reclaim_request(in_progress)
    assert reclaim_result is not None
    assert reclaim_result.was_already_handled is False

    # The very same request must be returned by the next fetch.
    refetched = await rq.fetch_next_request()
    assert refetched is not None
    assert refetched.unique_key == in_progress.unique_key
    assert refetched.url == 'https://example.com'
async def test_reclaim_request_with_forefront(rq: RequestQueue) -> None:
    """Check that a request reclaimed with `forefront=True` is fetched before the remaining requests."""
    first_url = 'https://example.com/first'
    await rq.add_request(first_url)
    await rq.add_request('https://example.com/second')

    # Take the first request out of the queue.
    in_progress = await rq.fetch_next_request()
    assert in_progress is not None
    assert in_progress.url == first_url

    # Put it back at the front.
    await rq.reclaim_request(in_progress, forefront=True)

    # It must come out again ahead of the second request.
    refetched = await rq.fetch_next_request()
    assert refetched is not None
    assert refetched.url == first_url
async def test_is_empty_and_is_finished(rq: RequestQueue) -> None:
    """Check how `is_empty` and `is_finished` change across the lifetime of a single request."""
    # Fresh queue: empty and finished.
    assert await rq.is_empty() is True
    assert await rq.is_finished() is True

    # One pending request: neither empty nor finished.
    await rq.add_request('https://example.com')
    assert await rq.is_empty() is False
    assert await rq.is_finished() is False

    in_progress = await rq.fetch_next_request()
    assert in_progress is not None

    # Nothing is left to fetch, so the queue is empty, but the fetched request is still being processed,
    # so the queue is not finished.
    assert await rq.is_empty() is True
    assert await rq.is_finished() is False

    # Once the request is handled the queue is empty and finished again.
    await rq.mark_request_as_handled(in_progress)
    assert await rq.is_empty() is True
    assert await rq.is_finished() is True
@pytest.mark.parametrize(
    'wait_for_all',
    [
        pytest.param(True, id='wait for all'),
        pytest.param(False, id='do not wait for all'),
    ],
)
async def test_add_requests_wait_for_all(
    rq: RequestQueue,
    *,
    wait_for_all: bool,
) -> None:
    """Check `add_requests` with both values of `wait_for_all_requests_to_be_added`."""
    url_count = 15
    urls = [f'https://example.com/{i}' for i in range(url_count)]

    # Add the URLs in batches of five with a short pause between the batches.
    await rq.add_requests(
        urls,
        batch_size=5,
        wait_for_all_requests_to_be_added=wait_for_all,
        wait_time_between_batches=timedelta(milliseconds=50),
    )

    if not wait_for_all:
        # Immediately after adding, the total count may be less than 15 due to background processing
        assert await rq.get_total_count() <= url_count

        # Wait for background tasks to complete.
        await poll_until_condition(rq.get_total_count, lambda count: count >= url_count)

    # In both modes every request must have been added by now.
    assert await rq.get_total_count() == url_count
async def test_is_finished(rq: RequestQueue) -> None:
    """Check that `is_finished` accounts for pending requests and for background add tasks."""
    # Nothing was added yet, so the queue is finished.
    assert await rq.is_finished() is True

    # A pending request makes it unfinished.
    await rq.add_request('https://example.com')
    assert await rq.is_finished() is False

    # Schedule more requests without waiting for all of them to be added.
    background_urls = ['https://example.com/1', 'https://example.com/2']
    await rq.add_requests(background_urls, wait_for_all_requests_to_be_added=False)

    # Queue shouldn't be finished while background tasks are running
    assert await rq.is_finished() is False

    # Wait for the background add task to finish.
    await poll_until_condition(lambda: not rq._add_requests_tasks)

    # Fetch and handle requests until the queue has nothing more to hand out.
    while (pending := await rq.fetch_next_request()) is not None:
        await rq.mark_request_as_handled(pending)

    # With everything handled the queue is finished.
    assert await rq.is_finished() is True
async def test_mark_non_existent_request_as_handled(rq: RequestQueue) -> None:
    """Check that marking a request that was never added to the queue as handled yields `None`."""
    # Built locally and never passed to `add_request`.
    unknown_request = Request.from_url(url='https://example.com', id='non-existent-id')

    assert await rq.mark_request_as_handled(unknown_request) is None
async def test_reclaim_non_existent_request(rq: RequestQueue) -> None:
    """Check that reclaiming a request that was never added to the queue yields `None`."""
    # Built locally and never passed to `add_request`.
    unknown_request = Request.from_url(url='https://example.com', id='non-existent-id')

    assert await rq.reclaim_request(unknown_request) is None
async def test_drop(
    storage_client: StorageClient,
) -> None:
    """Check that a dropped queue leaves no data behind when a queue with the same name is opened again."""
    queue_name = 'drop-test'

    # Create a queue, store one request in it and drop it.
    doomed_rq = await RequestQueue.open(name=queue_name, storage_client=storage_client)
    await doomed_rq.add_request('https://example.com')
    await doomed_rq.drop()

    # Opening the same name again must produce an empty queue.
    new_rq = await RequestQueue.open(name=queue_name, storage_client=storage_client)
    assert await new_rq.is_empty() is True

    stats = await new_rq.get_metadata()
    assert stats.total_request_count == 0
    assert stats.pending_request_count == 0

    await new_rq.drop()
async def test_reopen_default(
    storage_client: StorageClient,
) -> None:
    """Check that opening the default request queue twice yields the same queue with the same data."""
    # Start without any cached storage instances.
    service_locator.storage_instance_manager.clear_cache()

    first_handle = await RequestQueue.open(storage_client=storage_client)

    # If a request queue already exists (due to previous test run), purge it to start fresh
    try:
        await first_handle.purge()
    except Exception:
        # If purge fails, try dropping and recreating
        await first_handle.drop()
        first_handle = await RequestQueue.open(storage_client=storage_client)

    # Nothing may be pending before the test adds its own request.
    first_stats = await first_handle.get_metadata()
    assert first_stats.pending_request_count == 0

    # Store a single request and confirm that it is counted.
    await first_handle.add_request('https://example.com/')
    first_stats = await first_handle.get_metadata()
    assert first_stats.pending_request_count == 1

    # Open the default request queue a second time.
    second_handle = await RequestQueue.open(storage_client=storage_client)

    # Both handles must refer to the same queue ...
    assert first_handle.id == second_handle.id
    assert first_handle.name == second_handle.name

    # ... and report identical counters.
    first_stats = await first_handle.get_metadata()
    second_stats = await second_handle.get_metadata()
    assert first_stats.total_request_count == second_stats.total_request_count
    assert first_stats.pending_request_count == second_stats.pending_request_count
    assert first_stats.handled_request_count == second_stats.handled_request_count

    # The stored request must be reachable through the second handle.
    fetched = await second_handle.fetch_next_request()
    assert fetched is not None
    assert fetched.url == 'https://example.com/'

    # Clean up after the test
    await first_handle.drop()
async def test_purge(
    storage_client: StorageClient,
) -> None:
    """Test purging a request queue removes all requests but keeps the queue itself.

    The queue is dropped in a `finally` block so that a failing assertion does not leave the named queue (and
    the requests stored in it) behind.
    """
    # First create a request queue
    rq = await RequestQueue.open(
        name='purge-test-queue',
        storage_client=storage_client,
    )

    try:
        # Add some requests
        await rq.add_requests(
            [
                'https://example.com/page1',
                'https://example.com/page2',
                'https://example.com/page3',
            ]
        )

        # Verify requests were added
        metadata = await rq.get_metadata()
        assert metadata.total_request_count == 3
        assert metadata.pending_request_count == 3
        assert metadata.handled_request_count == 0

        # Record the queue ID
        queue_id = rq.id

        # Purge the queue
        await rq.purge()

        # Verify the queue still exists but is empty
        assert rq.id == queue_id  # Same ID preserved
        assert rq.name == 'purge-test-queue'  # Same name preserved

        # Queue should be empty now
        metadata = await rq.get_metadata()
        assert metadata.total_request_count == 0
        assert metadata.pending_request_count == 0
        assert metadata.handled_request_count == 0
        assert await rq.is_empty() is True

        # Verify we can add new requests after purging
        await rq.add_request('https://example.com/new-after-purge')
        request = await rq.fetch_next_request()
        assert request is not None
        assert request.url == 'https://example.com/new-after-purge'
    finally:
        # Clean up
        await rq.drop()
async def test_purge_falls_back_to_drop_for_unnamed_queue_when_not_implemented(
    storage_client: StorageClient,
    caplog: pytest.LogCaptureFixture,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check that `purge` drops and recreates an unnamed queue when the client raises `NotImplementedError`.

    Some storage clients (e.g. the Apify platform client) do not support purging. For the default unnamed queue
    used by `BasicCrawler`, `purge` should drop and recreate the queue so that callers keep working on repeated
    runs. Named queues are handled separately to avoid destroying persistent data.
    """
    rq = await RequestQueue.open(storage_client=storage_client)
    assert rq.name is None

    # Fill the queue so that the effect of the purge is observable.
    await rq.add_requests(['https://example.com/1', 'https://example.com/2'])
    assert (await rq.get_metadata()).pending_request_count == 2

    async def _raise_not_implemented(self: object) -> None:
        raise NotImplementedError('Purge is not supported.')

    # Patch `purge` on the client's class so that the call made by the queue fails as unsupported.
    monkeypatch.setattr(type(rq._client), 'purge', _raise_not_implemented)

    with caplog.at_level('WARNING'):
        await rq.purge()

    # A single log record must mention both the missing support and the fallback.
    fallback_logged = any(
        'does not support purging' in record.message and 'dropping and recreating' in record.message
        for record in caplog.records
    )
    assert fallback_logged

    # The queue should be empty, usable, and backed by a fresh client (id may differ for backends that mint new ids).
    stats = await rq.get_metadata()
    assert stats.pending_request_count == 0
    assert stats.total_request_count == 0
    assert stats.handled_request_count == 0
    assert rq.id is not None

    # The queue must accept and hand out new requests.
    await rq.add_request('https://example.com/after-purge')
    fetched = await rq.fetch_next_request()
    assert fetched is not None
    assert fetched.url == 'https://example.com/after-purge'

    await rq.drop()
async def test_purge_skips_named_queue_when_not_implemented(
    storage_client: StorageClient,
    caplog: pytest.LogCaptureFixture,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Test that `purge` is a logged no-op for named queues when the client raises `NotImplementedError`.

    Named queues are considered persistent (e.g. shared across runs on the Apify platform), so falling back
    to drop+recreate would silently destroy user data. Instead `purge` logs a warning and leaves the queue
    intact.

    Because the queue is expected to keep its requests, it is dropped in a `finally` block; otherwise a failing
    assertion would leave the populated named queue behind.
    """
    rq = await RequestQueue.open(
        name='purge-fallback-named-test',
        storage_client=storage_client,
    )

    try:
        original_id = rq.id

        await rq.add_requests(['https://example.com/1', 'https://example.com/2'])
        metadata = await rq.get_metadata()
        assert metadata.pending_request_count == 2

        async def _raise_not_implemented(self: object) -> None:
            raise NotImplementedError('Purge is not supported.')

        # Patch `purge` on the client's class so that the call made by the queue fails as unsupported.
        monkeypatch.setattr(type(rq._client), 'purge', _raise_not_implemented)

        with caplog.at_level('WARNING'):
            await rq.purge()

        assert any(
            'does not support purging' in rec.message and 'Skipping purge for named queue' in rec.message
            for rec in caplog.records
        )

        # Queue identity and contents must be preserved.
        assert rq.id == original_id
        metadata = await rq.get_metadata()
        assert metadata.pending_request_count == 2
        assert metadata.total_request_count == 2
    finally:
        await rq.drop()
async def test_open_with_alias(
    storage_client: StorageClient,
) -> None:
    """Check that queues opened under different aliases are unnamed and keep their requests separate."""
    first_rq = await RequestQueue.open(alias='test_alias_1', storage_client=storage_client)
    second_rq = await RequestQueue.open(alias='test_alias_2', storage_client=storage_client)

    # Different aliases give different queues, and neither of them has a name.
    assert first_rq.id != second_rq.id
    assert first_rq.name is None
    assert second_rq.name is None

    # Fill the two queues with disjoint sets of URLs.
    for url in ('https://example.com/1', 'https://example.com/2'):
        await first_rq.add_request(url)
    await second_rq.add_request('https://example.com/3')

    # Each queue must hand out its own first request.
    from_first = await first_rq.fetch_next_request()
    from_second = await second_rq.fetch_next_request()
    assert from_first is not None
    assert from_second is not None
    assert from_first.url == 'https://example.com/1'
    assert from_second.url == 'https://example.com/3'

    # Clean up
    await first_rq.drop()
    await second_rq.drop()
async def test_alias_caching(
    storage_client: StorageClient,
) -> None:
    """Test that request queues with same alias return same instance (cached).

    The queue is dropped in a `finally` block so that a failing assertion does not leave the aliased queue
    behind.
    """
    # Open rq with alias
    rq_1 = await RequestQueue.open(
        alias='cache_test',
        storage_client=storage_client,
    )

    try:
        # Open again with same alias
        rq_2 = await RequestQueue.open(
            alias='cache_test',
            storage_client=storage_client,
        )

        # Should be same instance
        assert rq_1 is rq_2
        assert rq_1.id == rq_2.id
    finally:
        # Clean up
        await rq_1.drop()
async def test_alias_with_id_error(
    storage_client: StorageClient,
) -> None:
    """Check that open() rejects a call that passes both `id` and `alias`."""
    expected_error = (
        r'Only one of "id", "name", "alias" can be specified, but following arguments '
        r'were specified: "id", "alias".'
    )

    with pytest.raises(ValueError, match=expected_error):
        await RequestQueue.open(
            id='some-id',
            alias='some-alias',
            storage_client=storage_client,
        )
async def test_alias_with_name_error(
    storage_client: StorageClient,
) -> None:
    """Check that open() rejects a call that passes both `name` and `alias`."""
    expected_error = (
        r'Only one of "id", "name", "alias" can be specified, but following arguments '
        r'were specified: "name", "alias".'
    )

    with pytest.raises(ValueError, match=expected_error):
        await RequestQueue.open(
            name='some-name',
            alias='some-alias',
            storage_client=storage_client,
        )
async def test_alias_with_special_characters(
storage_client: StorageClient,
) -> None:
"""Test alias functionality with special characters."""
special_aliases = [
'alias-with-dashes',
'alias_with_underscores',
'alias.with.dots',
'alias123with456numbers',
'CamelCaseAlias',
]
queues = []
for alias in special_aliases:
rq = await RequestQueue.open(
alias=alias,
storage_client=storage_client,
)