-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathtest_apify_event_manager.py
More file actions
332 lines (257 loc) · 13 KB
/
test_apify_event_manager.py
File metadata and controls
332 lines (257 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
from collections import defaultdict
from datetime import timedelta
from typing import TYPE_CHECKING, Any
from unittest.mock import Mock
import pytest
import websockets
import websockets.asyncio.server
from apify_shared.consts import ActorEnvVars
from crawlee.events._types import Event
from apify import Configuration
from apify.events import ApifyEventManager
from apify.events._types import SystemInfoEventData
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Callable
@contextlib.asynccontextmanager
async def _platform_ws_server(
    monkeypatch: pytest.MonkeyPatch,
) -> AsyncGenerator[tuple[set[websockets.asyncio.server.ServerConnection], asyncio.Event]]:
    """Run a throwaway local WebSocket server that stands in for the Apify platform event stream.

    The server is bound to ``127.0.0.1`` rather than ``localhost`` so that exactly
    one IPv4 socket is created. On Windows, ``localhost`` resolves to both
    ``127.0.0.1`` *and* ``::1``, and the OS may hand out **different** random
    ports for the two addresses, which would let the client connect to the
    wrong port.

    While the server is running, the events websocket URL environment variable
    is pointed at it through ``monkeypatch``.

    Yields:
        A ``(connected_ws_clients, client_connected_event)`` tuple. After opening
        an `ApifyEventManager`, ``await client_connected_event.wait()`` before
        sending any messages, so that the server-side handler is guaranteed to
        have registered the connection.
    """
    active_connections: set[websockets.asyncio.server.ServerConnection] = set()
    connection_seen = asyncio.Event()

    async def track_connection(connection: websockets.asyncio.server.ServerConnection) -> None:
        # Register the connection and signal waiters before parking until it closes.
        active_connections.add(connection)
        connection_seen.set()
        try:
            await connection.wait_closed()
        finally:
            # Always unregister, even if waiting for the close was interrupted.
            active_connections.remove(connection)

    server_cm = websockets.asyncio.server.serve(track_connection, host='127.0.0.1')
    async with server_cm as ws_server:
        # No port was requested, so read the actual one from the first listening socket.
        bound_address = ws_server.sockets[0].getsockname()
        port: int = bound_address[1]
        events_url = f'ws://127.0.0.1:{port}'
        monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, events_url)
        yield active_connections, connection_seen
async def test_lifecycle_local(caplog: pytest.LogCaptureFixture) -> None:
    """Entering and leaving the manager with a default `Configuration` logs a single debug notice.

    The expected notice says the events websocket env var is not set; the test
    therefore presumes that variable is absent from the environment.
    """
    expected_message = 'APIFY_ACTOR_EVENTS_WS_URL env var not set, no events from Apify platform will be emitted.'
    caplog.set_level(logging.DEBUG, logger='apify')

    async with ApifyEventManager(Configuration()):
        pass

    assert len(caplog.records) == 1
    only_record = caplog.records[0]
    assert only_record.levelno == logging.DEBUG
    assert only_record.message == expected_message
async def test_event_handling_local() -> None:
    """Check that locally emitted events reach registered listeners and stop after ``off``.

    Covers a single listener being added and removed, then three listeners on
    one event, removal of one of them, and finally bulk removal of all
    listeners for that event.
    """
    configuration = Configuration.get_global_configuration()

    async with ApifyEventManager(configuration) as event_manager:
        # Maps each event to the ``(tag, payload)`` pairs its listeners recorded.
        recorded = defaultdict(list)

        def make_recorder(event: Event, tag: int | None = None) -> Callable:
            """Build a listener that stores ``(tag, payload)`` under ``event``."""

            def recorder(payload: Any) -> None:
                recorded[event].append((tag, payload))

            return recorder

        async def let_listeners_run() -> None:
            """Pause briefly so that dispatched listeners get a chance to execute."""
            await asyncio.sleep(0.1)

        # --- Single listener on one event ---
        system_info_recorder = make_recorder(Event.SYSTEM_INFO)
        first_payload = Mock()
        second_payload = Mock()

        # Adding the listener: the emitted payload must be recorded.
        event_manager.on(event=Event.SYSTEM_INFO, listener=system_info_recorder)
        event_manager.emit(event=Event.SYSTEM_INFO, event_data=first_payload)
        await let_listeners_run()
        assert recorded[Event.SYSTEM_INFO] == [(None, first_payload)]
        recorded[Event.SYSTEM_INFO].clear()

        # Removing the listener: a further emit must not be recorded.
        event_manager.off(event=Event.SYSTEM_INFO, listener=system_info_recorder)
        event_manager.emit(event=Event.SYSTEM_INFO, event_data=second_payload)
        await let_listeners_run()
        assert recorded[Event.SYSTEM_INFO] == []

        # --- Several listeners on one event ---
        persist_recorders = {tag: make_recorder(Event.PERSIST_STATE, tag) for tag in (1, 2, 3)}
        for persist_recorder in persist_recorders.values():
            event_manager.on(event=Event.PERSIST_STATE, listener=persist_recorder)

        persist_payload = Mock()

        # All three listeners receive the event.
        event_manager.emit(event=Event.PERSIST_STATE, event_data=persist_payload)
        await let_listeners_run()
        assert set(recorded[Event.PERSIST_STATE]) == {(tag, persist_payload) for tag in (1, 2, 3)}
        recorded[Event.PERSIST_STATE].clear()

        # Removing one listener leaves the other two in place.
        event_manager.off(event=Event.PERSIST_STATE, listener=persist_recorders[3])
        event_manager.emit(event=Event.PERSIST_STATE, event_data=persist_payload)
        await let_listeners_run()
        assert set(recorded[Event.PERSIST_STATE]) == {(tag, persist_payload) for tag in (1, 2)}
        recorded[Event.PERSIST_STATE].clear()

        # Calling ``off`` without a listener removes every listener for the event.
        event_manager.off(event=Event.PERSIST_STATE)
        event_manager.emit(event=Event.PERSIST_STATE, event_data=persist_payload)
        await let_listeners_run()
        assert recorded[Event.PERSIST_STATE] == []
async def test_event_async_handling_local() -> None:
    """Check that a slow coroutine listener runs in the background and eventually records its payload.

    The listener sleeps for two seconds; after one second nothing must have
    been recorded yet, and after two more seconds the payload must be there.
    """
    configuration = Configuration.get_global_configuration()
    payload = Mock()

    async with ApifyEventManager(configuration) as event_manager:
        received = []

        async def slow_listener(data: Any) -> None:
            # Simulate long-running async work before recording the payload.
            await asyncio.sleep(2)
            received.append(data)

        event_manager.on(event=Event.SYSTEM_INFO, listener=slow_listener)
        event_manager.emit(event=Event.SYSTEM_INFO, event_data=payload)

        # The test body keeps running while the listener is still sleeping.
        await asyncio.sleep(1)
        assert received == []

        # By now the listener has had three seconds in total and must be done.
        await asyncio.sleep(2)
        assert received == [payload]
async def test_lifecycle_on_platform_without_websocket(monkeypatch: pytest.MonkeyPatch) -> None:
    """Entering the manager must raise `RuntimeError` when the configured websocket URL cannot be connected to.

    The URL points at ``ws://localhost:56565``; the test presumes nothing is
    listening on that port.
    """
    unreachable_url = 'ws://localhost:56565'
    monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, unreachable_url)

    # Build the manager outside of ``pytest.raises`` so only entering it is under test.
    manager = ApifyEventManager(Configuration.get_global_configuration())

    with pytest.raises(RuntimeError, match=r'Error connecting to platform events websocket!'):
        async with manager:
            pass
async def test_lifecycle_on_platform(monkeypatch: pytest.MonkeyPatch) -> None:
    """Opening the manager against the local test server results in exactly one registered connection."""
    async with _platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected):
        configuration = Configuration.get_global_configuration()
        async with ApifyEventManager(configuration):
            # Wait until the server-side handler has registered the client.
            await client_connected.wait()
            assert len(connected_ws_clients) == 1
async def test_event_handling_on_platform(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that a system-info message sent over the websocket is delivered to a `SYSTEM_INFO` listener.

    A platform-style payload is broadcast from the local test server and the
    listener's received data, dumped back to JSON by alias, is inspected.
    """
    async with _platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected):

        def send_platform_event(event_name: Event, data: Any = None) -> None:
            """Broadcast a ``{'name': ..., 'data': ...}`` message to every connected client."""
            message: dict[str, Any] = {'name': event_name.value}
            # Compare against ``None`` so falsy-but-valid payloads (e.g. ``{}``) are still sent.
            if data is not None:
                message['data'] = data

            websockets.broadcast(connected_ws_clients, json.dumps(message))

        dummy_system_info = {
            'memAvgBytes': 19328860.328293584,
            'memCurrentBytes': 65171456,
            'memMaxBytes': 65171456,
            'cpuAvgUsage': 2.0761105633130397,
            'cpuMaxUsage': 53.941134593993326,
            'cpuCurrentUsage': 8.45549815498155,
            'isCpuOverloaded': False,
            'createdAt': '2024-08-09T16:04:16.161Z',
        }
        # Run the fixture payload through the event data model before sending it;
        # presumably this raises if the payload does not match the model.
        SystemInfoEventData.model_validate(dummy_system_info)

        async with ApifyEventManager(Configuration.get_global_configuration()) as event_manager:
            await client_connected.wait()

            event_calls: list[Any] = []

            def listener(data: Any) -> None:
                # Store a plain-JSON view of the event data so assertions can use dict access.
                event_calls.append(json.loads(data.model_dump_json(by_alias=True)) if data else None)

            event_manager.on(event=Event.SYSTEM_INFO, listener=listener)

            # Send an event with data and give the manager time to dispatch it.
            send_platform_event(Event.SYSTEM_INFO, dummy_system_info)
            await asyncio.sleep(0.1)

            assert len(event_calls) == 1
            assert event_calls[0] is not None
            # ``usedRatio`` looks like ``cpuCurrentUsage`` scaled from percent to a ratio;
            # compare approximately so the check does not hinge on exact float rounding.
            assert event_calls[0]['cpuInfo']['usedRatio'] == pytest.approx(0.0845549815498155)
async def test_event_listener_removal_stops_counting() -> None:
    """Check that a listener stops being invoked once it is removed with ``off``.

    The manager is opened with a 500 ms ``persist_state_interval``. The listener
    must be hit at least once in the first 1.5 seconds and not at all in the
    1.5 seconds after its removal.
    """
    interval = timedelta(milliseconds=500)
    configuration = Configuration.get_global_configuration()

    async with ApifyEventManager(configuration, persist_state_interval=interval) as event_manager:
        # One entry is appended per listener invocation.
        invocations: list[Any] = []

        async def handler(_data: Any) -> None:
            invocations.append(_data)

        event_manager.on(event=Event.PERSIST_STATE, listener=handler)
        await asyncio.sleep(1.5)
        assert len(invocations) > 0

        # Drop the listener, start counting from scratch and watch for stray calls.
        event_manager.off(event=Event.PERSIST_STATE, listener=handler)
        invocations.clear()
        await asyncio.sleep(1.5)
        assert len(invocations) == 0
async def test_deprecated_event_is_skipped(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that a deprecated platform event (``cpuInfo``) does not reach a `SYSTEM_INFO` listener."""
    async with _platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected):
        async with ApifyEventManager(Configuration.get_global_configuration()) as event_manager:
            await client_connected.wait()

            received: list[Any] = []

            def listener(data: Any) -> None:
                received.append(data)

            event_manager.on(event=Event.SYSTEM_INFO, listener=listener)

            # ``cpuInfo`` is a deprecated event name; broadcast it with an empty payload.
            payload = {'name': 'cpuInfo', 'data': {}}
            websockets.broadcast(connected_ws_clients, json.dumps(payload))
            await asyncio.sleep(0.2)

            # Nothing may have been delivered to the listener.
            assert len(received) == 0
async def test_unknown_event_is_logged(monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture) -> None:
    """Check that a message with an unrecognised event name shows up in the captured log output."""
    # Pin the capture level explicitly so the assertions on ``caplog.text`` do not
    # depend on how the 'apify' logger happens to be configured elsewhere.
    caplog.set_level(logging.DEBUG, logger='apify')

    async with (
        _platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected),
        ApifyEventManager(Configuration.get_global_configuration()),
    ):
        await client_connected.wait()

        # Broadcast an event whose name is not expected to be known to the manager.
        unknown_message = json.dumps({'name': 'totallyNewEvent2099', 'data': {'foo': 'bar'}})
        websockets.broadcast(connected_ws_clients, unknown_message)
        await asyncio.sleep(0.2)

        assert 'Unknown message received' in caplog.text
        assert 'totallyNewEvent2099' in caplog.text
async def test_migrating_event_triggers_persist_state(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that a ``migrating`` message reaches `MIGRATING` listeners and also yields a migration persist-state.

    After the message is broadcast, exactly one `MIGRATING` call is expected and
    at least one `PERSIST_STATE` payload with a truthy ``is_migrating`` attribute.
    """
    async with _platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected):
        async with ApifyEventManager(Configuration.get_global_configuration()) as event_manager:
            await client_connected.wait()

            persisted: list[Any] = []
            migrated: list[Any] = []

            def on_persist_state(data: Any) -> None:
                persisted.append(data)

            def on_migrating(data: Any) -> None:
                migrated.append(data)

            event_manager.on(event=Event.PERSIST_STATE, listener=on_persist_state)
            event_manager.on(event=Event.MIGRATING, listener=on_migrating)

            # Discard any persist-state events that arrive before the migration message.
            await asyncio.sleep(0.2)
            persisted.clear()

            # Broadcast the migrating event and give the manager time to dispatch it.
            websockets.broadcast(connected_ws_clients, json.dumps({'name': 'migrating'}))
            await asyncio.sleep(0.2)

            assert len(migrated) == 1

            # Keep only persist-state payloads that carry a truthy ``is_migrating`` flag.
            flagged_as_migrating = [item for item in persisted if getattr(item, 'is_migrating', False)]
            assert len(flagged_as_migrating) >= 1
async def test_malformed_message_logs_exception(
    monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
) -> None:
    """Check that a websocket message that is not valid JSON is reported in the captured log output.

    The manager's context is also expected to exit cleanly afterwards, i.e. the
    bad message must not make the ``async with`` block raise.
    """
    # Pin the capture level explicitly so the assertion on ``caplog.text`` does not
    # depend on how the 'apify' logger happens to be configured elsewhere.
    caplog.set_level(logging.DEBUG, logger='apify')

    async with (
        _platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected),
        ApifyEventManager(Configuration.get_global_configuration()),
    ):
        await client_connected.wait()

        # Broadcast a payload that cannot be parsed as JSON.
        websockets.broadcast(connected_ws_clients, 'this is not valid json{{{')
        await asyncio.sleep(0.2)

        assert 'Cannot parse Actor event' in caplog.text