Skip to content

Commit c07461e

Browse files
vdusekclaude
andauthored
test: Eliminate race conditions in event manager WebSocket tests (#801)
## Summary - Fix flaky `test_event_handling_on_platform` test that failed intermittently on Windows CI - Root cause: dual-stack `localhost` binding created two sockets with different ports on Windows, and a race condition between server handler registration and message broadcasting - Extract shared `_platform_ws_server` helper binding to `127.0.0.1` with `asyncio.Event` synchronization — applied to all 6 WebSocket-based tests Example failure: https://github.com/apify/apify-sdk-python/actions/runs/22132559177/job/63976018507?pr=782 Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent be4997e commit c07461e

File tree

1 file changed

+108
-131
lines changed

1 file changed

+108
-131
lines changed

tests/unit/events/test_apify_event_manager.py

Lines changed: 108 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import contextlib
45
import json
56
import logging
67
from collections import defaultdict
@@ -20,7 +21,40 @@
2021
from apify.events._types import SystemInfoEventData
2122

2223
if TYPE_CHECKING:
23-
from collections.abc import Callable
24+
from collections.abc import AsyncGenerator, Callable
25+
26+
27+
@contextlib.asynccontextmanager
28+
async def _platform_ws_server(
29+
monkeypatch: pytest.MonkeyPatch,
30+
) -> AsyncGenerator[tuple[set[websockets.asyncio.server.ServerConnection], asyncio.Event]]:
31+
"""Create a local WebSocket server that simulates Apify platform events.
32+
33+
Binds explicitly to ``127.0.0.1`` instead of ``localhost`` so that only a
34+
single IPv4 socket is created. On Windows, ``localhost`` resolves to both
35+
``127.0.0.1`` *and* ``::1``, and the OS may assign **different** random
36+
ports to each address — causing the client to connect to the wrong port.
37+
38+
Yields a ``(connected_ws_clients, client_connected_event)`` tuple. After
39+
opening an `ApifyEventManager`, ``await client_connected_event.wait()``
40+
before sending any messages to guarantee the server handler has registered
41+
the connection.
42+
"""
43+
connected_ws_clients: set[websockets.asyncio.server.ServerConnection] = set()
44+
client_connected = asyncio.Event()
45+
46+
async def handler(websocket: websockets.asyncio.server.ServerConnection) -> None:
47+
connected_ws_clients.add(websocket)
48+
client_connected.set()
49+
try:
50+
await websocket.wait_closed()
51+
finally:
52+
connected_ws_clients.remove(websocket)
53+
54+
async with websockets.asyncio.server.serve(handler, host='127.0.0.1') as ws_server:
55+
port: int = ws_server.sockets[0].getsockname()[1]
56+
monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, f'ws://127.0.0.1:{port}')
57+
yield connected_ws_clients, client_connected
2458

2559

2660
async def test_lifecycle_local(caplog: pytest.LogCaptureFixture) -> None:
@@ -137,47 +171,23 @@ async def test_lifecycle_on_platform_without_websocket(monkeypatch: pytest.Monke
137171

138172

139173
async def test_lifecycle_on_platform(monkeypatch: pytest.MonkeyPatch) -> None:
140-
connected_ws_clients: set[websockets.asyncio.server.ServerConnection] = set()
141-
142-
async def handler(websocket: websockets.asyncio.server.ServerConnection) -> None:
143-
connected_ws_clients.add(websocket)
144-
try:
145-
await websocket.wait_closed()
146-
finally:
147-
connected_ws_clients.remove(websocket)
148-
149-
async with websockets.asyncio.server.serve(handler, host='localhost') as ws_server:
150-
# When you don't specify a port explicitly, the websocket connection is opened on a random free port.
151-
# We need to find out which port is that.
152-
port: int = ws_server.sockets[0].getsockname()[1]
153-
monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, f'ws://localhost:{port}')
154-
155-
async with ApifyEventManager(Configuration.get_global_configuration()):
156-
assert len(connected_ws_clients) == 1
174+
async with (
175+
_platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected),
176+
ApifyEventManager(Configuration.get_global_configuration()),
177+
):
178+
await client_connected.wait()
179+
assert len(connected_ws_clients) == 1
157180

158181

159182
async def test_event_handling_on_platform(monkeypatch: pytest.MonkeyPatch) -> None:
160-
connected_ws_clients: set[websockets.asyncio.server.ServerConnection] = set()
183+
async with _platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected):
161184

162-
async def handler(websocket: websockets.asyncio.server.ServerConnection) -> None:
163-
connected_ws_clients.add(websocket)
164-
try:
165-
await websocket.wait_closed()
166-
finally:
167-
connected_ws_clients.remove(websocket)
185+
async def send_platform_event(event_name: Event, data: Any = None) -> None:
186+
message: dict[str, Any] = {'name': event_name.value}
187+
if data:
188+
message['data'] = data
168189

169-
async def send_platform_event(event_name: Event, data: Any = None) -> None:
170-
message: dict[str, Any] = {'name': event_name.value}
171-
if data:
172-
message['data'] = data
173-
174-
websockets.broadcast(connected_ws_clients, json.dumps(message))
175-
176-
async with websockets.asyncio.server.serve(handler, host='localhost') as ws_server:
177-
# When you don't specify a port explicitly, the websocket connection is opened on a random free port.
178-
# We need to find out which port is that.
179-
port: int = ws_server.sockets[0].getsockname()[1]
180-
monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, f'ws://localhost:{port}')
190+
websockets.broadcast(connected_ws_clients, json.dumps(message))
181191

182192
dummy_system_info = {
183193
'memAvgBytes': 19328860.328293584,
@@ -192,6 +202,7 @@ async def send_platform_event(event_name: Event, data: Any = None) -> None:
192202
SystemInfoEventData.model_validate(dummy_system_info)
193203

194204
async with ApifyEventManager(Configuration.get_global_configuration()) as event_manager:
205+
await client_connected.wait()
195206
event_calls = []
196207

197208
def listener(data: Any) -> None:
@@ -232,124 +243,90 @@ async def handler(_data: Any) -> None:
232243

233244
async def test_deprecated_event_is_skipped(monkeypatch: pytest.MonkeyPatch) -> None:
234245
"""Test that deprecated events (like CPU_INFO) are silently skipped."""
235-
connected_ws_clients: set[websockets.asyncio.server.ServerConnection] = set()
236-
237-
async def handler(websocket: websockets.asyncio.server.ServerConnection) -> None:
238-
connected_ws_clients.add(websocket)
239-
try:
240-
await websocket.wait_closed()
241-
finally:
242-
connected_ws_clients.remove(websocket)
243-
244-
async with websockets.asyncio.server.serve(handler, host='localhost') as ws_server:
245-
port: int = ws_server.sockets[0].getsockname()[1]
246-
monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, f'ws://localhost:{port}')
247-
248-
async with ApifyEventManager(Configuration.get_global_configuration()) as event_manager:
249-
event_calls: list[Any] = []
250-
251-
def listener(data: Any) -> None:
252-
event_calls.append(data)
246+
async with (
247+
_platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected),
248+
ApifyEventManager(Configuration.get_global_configuration()) as event_manager,
249+
):
250+
await client_connected.wait()
251+
event_calls: list[Any] = []
252+
253+
def listener(data: Any) -> None:
254+
event_calls.append(data)
253255

254-
event_manager.on(event=Event.SYSTEM_INFO, listener=listener)
256+
event_manager.on(event=Event.SYSTEM_INFO, listener=listener)
255257

256-
# Send a deprecated event (cpuInfo is deprecated)
257-
deprecated_message = json.dumps({'name': 'cpuInfo', 'data': {}})
258-
websockets.broadcast(connected_ws_clients, deprecated_message)
259-
await asyncio.sleep(0.2)
258+
# Send a deprecated event (cpuInfo is deprecated)
259+
deprecated_message = json.dumps({'name': 'cpuInfo', 'data': {}})
260+
websockets.broadcast(connected_ws_clients, deprecated_message)
261+
await asyncio.sleep(0.2)
260262

261-
# No events should have been emitted
262-
assert len(event_calls) == 0
263+
# No events should have been emitted
264+
assert len(event_calls) == 0
263265

264266

265267
async def test_unknown_event_is_logged(monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture) -> None:
266268
"""Test that unknown events are logged and not emitted."""
267-
connected_ws_clients: set[websockets.asyncio.server.ServerConnection] = set()
269+
async with (
270+
_platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected),
271+
ApifyEventManager(Configuration.get_global_configuration()),
272+
):
273+
await client_connected.wait()
268274

269-
async def handler(websocket: websockets.asyncio.server.ServerConnection) -> None:
270-
connected_ws_clients.add(websocket)
271-
try:
272-
await websocket.wait_closed()
273-
finally:
274-
connected_ws_clients.remove(websocket)
275+
# Send an unknown event
276+
unknown_message = json.dumps({'name': 'totallyNewEvent2099', 'data': {'foo': 'bar'}})
277+
websockets.broadcast(connected_ws_clients, unknown_message)
278+
await asyncio.sleep(0.2)
275279

276-
async with websockets.asyncio.server.serve(handler, host='localhost') as ws_server:
277-
port: int = ws_server.sockets[0].getsockname()[1]
278-
monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, f'ws://localhost:{port}')
279-
280-
async with ApifyEventManager(Configuration.get_global_configuration()):
281-
# Send an unknown event
282-
unknown_message = json.dumps({'name': 'totallyNewEvent2099', 'data': {'foo': 'bar'}})
283-
websockets.broadcast(connected_ws_clients, unknown_message)
284-
await asyncio.sleep(0.2)
285-
286-
assert 'Unknown message received' in caplog.text
287-
assert 'totallyNewEvent2099' in caplog.text
280+
assert 'Unknown message received' in caplog.text
281+
assert 'totallyNewEvent2099' in caplog.text
288282

289283

290284
async def test_migrating_event_triggers_persist_state(monkeypatch: pytest.MonkeyPatch) -> None:
291285
"""Test that a MIGRATING event triggers a PERSIST_STATE event with is_migrating=True."""
292-
connected_ws_clients: set[websockets.asyncio.server.ServerConnection] = set()
286+
async with (
287+
_platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected),
288+
ApifyEventManager(Configuration.get_global_configuration()) as event_manager,
289+
):
290+
await client_connected.wait()
291+
persist_calls: list[Any] = []
292+
migrating_calls: list[Any] = []
293293

294-
async def handler(websocket: websockets.asyncio.server.ServerConnection) -> None:
295-
connected_ws_clients.add(websocket)
296-
try:
297-
await websocket.wait_closed()
298-
finally:
299-
connected_ws_clients.remove(websocket)
300-
301-
async with websockets.asyncio.server.serve(handler, host='localhost') as ws_server:
302-
port: int = ws_server.sockets[0].getsockname()[1]
303-
monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, f'ws://localhost:{port}')
304-
305-
async with ApifyEventManager(Configuration.get_global_configuration()) as event_manager:
306-
persist_calls: list[Any] = []
307-
migrating_calls: list[Any] = []
308-
309-
def persist_listener(data: Any) -> None:
310-
persist_calls.append(data)
294+
def persist_listener(data: Any) -> None:
295+
persist_calls.append(data)
311296

312-
def migrating_listener(data: Any) -> None:
313-
migrating_calls.append(data)
297+
def migrating_listener(data: Any) -> None:
298+
migrating_calls.append(data)
314299

315-
event_manager.on(event=Event.PERSIST_STATE, listener=persist_listener)
316-
event_manager.on(event=Event.MIGRATING, listener=migrating_listener)
300+
event_manager.on(event=Event.PERSIST_STATE, listener=persist_listener)
301+
event_manager.on(event=Event.MIGRATING, listener=migrating_listener)
317302

318-
# Clear any initial persist state events
319-
await asyncio.sleep(0.2)
320-
persist_calls.clear()
303+
# Clear any initial persist state events
304+
await asyncio.sleep(0.2)
305+
persist_calls.clear()
321306

322-
# Send migrating event
323-
migrating_message = json.dumps({'name': 'migrating'})
324-
websockets.broadcast(connected_ws_clients, migrating_message)
325-
await asyncio.sleep(0.2)
307+
# Send migrating event
308+
migrating_message = json.dumps({'name': 'migrating'})
309+
websockets.broadcast(connected_ws_clients, migrating_message)
310+
await asyncio.sleep(0.2)
326311

327-
assert len(migrating_calls) == 1
328-
# MIGRATING should also trigger a PERSIST_STATE with is_migrating=True
329-
migration_persist_events = [c for c in persist_calls if hasattr(c, 'is_migrating') and c.is_migrating]
330-
assert len(migration_persist_events) >= 1
312+
assert len(migrating_calls) == 1
313+
# MIGRATING should also trigger a PERSIST_STATE with is_migrating=True
314+
migration_persist_events = [c for c in persist_calls if hasattr(c, 'is_migrating') and c.is_migrating]
315+
assert len(migration_persist_events) >= 1
331316

332317

333318
async def test_malformed_message_logs_exception(
334319
monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
335320
) -> None:
336321
"""Test that malformed websocket messages are logged and don't crash the event manager."""
337-
connected_ws_clients: set[websockets.asyncio.server.ServerConnection] = set()
338-
339-
async def handler(websocket: websockets.asyncio.server.ServerConnection) -> None:
340-
connected_ws_clients.add(websocket)
341-
try:
342-
await websocket.wait_closed()
343-
finally:
344-
connected_ws_clients.remove(websocket)
345-
346-
async with websockets.asyncio.server.serve(handler, host='localhost') as ws_server:
347-
port: int = ws_server.sockets[0].getsockname()[1]
348-
monkeypatch.setenv(ActorEnvVars.EVENTS_WEBSOCKET_URL, f'ws://localhost:{port}')
322+
async with (
323+
_platform_ws_server(monkeypatch) as (connected_ws_clients, client_connected),
324+
ApifyEventManager(Configuration.get_global_configuration()),
325+
):
326+
await client_connected.wait()
349327

350-
async with ApifyEventManager(Configuration.get_global_configuration()):
351-
# Send malformed message
352-
websockets.broadcast(connected_ws_clients, 'this is not valid json{{{')
353-
await asyncio.sleep(0.2)
328+
# Send malformed message
329+
websockets.broadcast(connected_ws_clients, 'this is not valid json{{{')
330+
await asyncio.sleep(0.2)
354331

355-
assert 'Cannot parse Actor event' in caplog.text
332+
assert 'Cannot parse Actor event' in caplog.text

0 commit comments

Comments
 (0)