-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtest_actor_events.py
More file actions
73 lines (56 loc) · 2.7 KB
/
test_actor_events.py
File metadata and controls
73 lines (56 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from apify_shared.consts import ActorEventTypes
from apify import Actor
if TYPE_CHECKING:
from .conftest import MakeActorFunction, RunActorFunction
async def test_emit_and_capture_interval_events(
make_actor: MakeActorFunction,
run_actor: RunActorFunction,
) -> None:
async def main() -> None:
# TC003 - TYPE_CHECKING section is problematic for our integration tests.
import os
from collections.abc import Callable # noqa: TC003
from datetime import datetime
from typing import Any
from apify_shared.consts import ActorEventTypes, ApifyEnvVars
from crawlee.events._types import Event, EventSystemInfoData
os.environ[ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS] = '900'
was_system_info_emitted = False
system_infos = list[EventSystemInfoData]()
def on_event(event_type: ActorEventTypes) -> Callable:
async def log_event(data: Any) -> None:
nonlocal was_system_info_emitted
nonlocal system_infos
print(f'Got actor event ({event_type=}, {data=})')
await Actor.push_data({'event_type': event_type, 'data': data})
if event_type == ActorEventTypes.SYSTEM_INFO:
was_system_info_emitted = True
system_infos.append(data)
return log_event
async with Actor:
Actor.on(Event.SYSTEM_INFO, on_event(ActorEventTypes.SYSTEM_INFO))
Actor.on(Event.PERSIST_STATE, on_event(ActorEventTypes.PERSIST_STATE))
await asyncio.sleep(3)
# The SYSTEM_INFO event sometimes takes a while to appear, let's wait for it for a while longer.
for _ in range(20):
if was_system_info_emitted:
break
await asyncio.sleep(1)
# Check that parsing datetimes works correctly.
# Check `createdAt` is a datetime (so it's the same locally and on platform).
assert isinstance(system_infos[0].cpu_info.created_at, datetime)
actor = await make_actor(label='actor-interval-events', main_func=main)
run_result = await run_actor(actor)
assert run_result.status == 'SUCCEEDED'
dataset_items_page = await actor.last_run().dataset().list_items()
persist_state_events = [
item for item in dataset_items_page.items if item['event_type'] == ActorEventTypes.PERSIST_STATE
]
system_info_events = [
item for item in dataset_items_page.items if item['event_type'] == ActorEventTypes.SYSTEM_INFO
]
assert len(persist_state_events) > 2
assert len(system_info_events) > 0