-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathtest_actor_migration.py
More file actions
48 lines (33 loc) · 1.85 KB
/
Copy pathtest_actor_migration.py
File metadata and controls
48 lines (33 loc) · 1.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .conftest import MakeActorFunction, RunActorFunction
async def test_migration_through_reboot(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None:
    """Test that an Actor keeps working correctly after a migration, simulated by a reboot.

    The Actor handles two requests and reboots itself between them. Each run of the Actor code
    (before and after the reboot) is expected to finish exactly one request, and the overall
    Actor run must end with status ``SUCCEEDED``.
    """

    async def main() -> None:
        # All imports are local to `main` because `main` is handed to `make_actor` as the Actor's
        # code; presumably it must be self-contained (TODO confirm against conftest's `make_actor`).
        from crawlee._types import BasicCrawlingContext, ConcurrencySettings
        from crawlee.crawlers import BasicCrawler

        from apify import Actor

        async with Actor:
            # Handle one request at a time so the first request is finished before the
            # second one triggers the reboot.
            crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=1))
            requests = ['https://example.com/1', 'https://example.com/2']

            # Fail with a clear message rather than querying the API with an empty run ID.
            run_id = Actor.config.actor_run_id
            assert run_id, 'Actor run ID is not set.'
            run = await Actor.apify_client.run(run_id).get()
            assert run, f'Run {run_id} was not found.'

            # A reboot count of 0 (or missing stats) is treated as the first run, i.e. before any reboot.
            reboot_count = run.get('stats', {}).get('rebootCount', 0)
            first_run = reboot_count == 0
            Actor.log.info(f'Run {run_id} reboot count: {reboot_count}')

            @crawler.router.default_handler
            async def default_handler(context: BasicCrawlingContext) -> None:
                context.log.info(f'Processing {context.request.url} ...')

                # Simulate migration through reboot: on the first run only, put the second request
                # back into the queue (so it can be processed after the reboot), then reboot.
                # The reclaim must happen before the reboot.
                if context.request.url == requests[1] and first_run:
                    context.log.info(f'Reclaiming {context.request.url} ...')
                    rq = await crawler.get_request_manager()
                    await rq.reclaim_request(context.request)
                    await Actor.reboot()

            await crawler.run(requests)

            # Each run finishes exactly one request. Before the reboot, the second request is
            # reclaimed rather than finished. After the reboot, the first request is presumably
            # already handled in the persisted queue.
            finished = crawler.statistics.state.requests_finished
            assert finished == 1, f'Expected 1 finished request in this run, got {finished}.'

    actor = await make_actor(label='migration', main_func=main)
    run_result = await run_actor(actor)

    assert run_result.status == 'SUCCEEDED'