Skip to content

Commit 97c2c93

Browse files
committed
uvloop tests
1 parent c12638f commit 97c2c93

4 files changed

Lines changed: 118 additions & 7 deletions

File tree

distributed/cli/tests/test_dask_scheduler.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import os
45
import re
56
import shutil
@@ -24,6 +25,7 @@
2425
from distributed.utils_test import (
2526
assert_can_connect_from_everywhere_4_6,
2627
assert_can_connect_locally_4,
28+
gen_test,
2729
popen,
2830
)
2931

@@ -672,7 +674,8 @@ def test_signal_handling(loop, sig):
672674
[
673675
sys.executable,
674676
"-m",
675-
"distributed.cli.dask_scheduler",
677+
"dask",
678+
"scheduler",
676679
f"--port={port}",
677680
"--dashboard-address=:0",
678681
],
@@ -690,3 +693,27 @@ def test_signal_handling(loop, sig):
690693
assert scheduler.returncode == 0
691694
assert "scheduler closing" in logs
692695
assert "end scheduler" in logs
696+
697+
698+
@gen_test()
699+
async def test_uvloop():
700+
uvloop = pytest.importorskip("uvloop")
701+
port = open_port()
702+
703+
def check():
704+
return isinstance(asyncio.get_event_loop(), uvloop.Loop)
705+
706+
with popen(
707+
[
708+
sys.executable,
709+
"-m",
710+
"dask",
711+
"scheduler",
712+
"--no-dashboard",
713+
"--host",
714+
f"127.0.0.1:{port}",
715+
],
716+
env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"},
717+
):
718+
async with Client(f"127.0.0.1:{port}", asynchronous=True) as c:
719+
assert await c.run_on_scheduler(check)

distributed/cli/tests/test_dask_spec.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,3 +172,46 @@ async def test_signal_handling_scheduler(sig):
172172
assert "exception" not in logs
173173
finally:
174174
await scheduler.wait()
175+
176+
177+
@gen_test()
178+
async def test_uvloop():
179+
uvloop = pytest.importorskip("uvloop")
180+
port = open_port()
181+
env = {"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}
182+
183+
def check():
184+
return isinstance(asyncio.get_event_loop(), uvloop.Loop)
185+
186+
with popen(
187+
[
188+
sys.executable,
189+
"-m",
190+
"dask",
191+
"spec",
192+
"--spec",
193+
json.dumps({"cls": "dask.distributed.Scheduler", "opts": {"port": port}}),
194+
],
195+
env=env,
196+
):
197+
with popen(
198+
[
199+
sys.executable,
200+
"-m",
201+
"dask",
202+
"spec",
203+
f"tcp://localhost:{port}",
204+
"--spec",
205+
json.dumps(
206+
{
207+
"cls": "dask.distributed.Worker",
208+
"opts": {"nanny": False, "nthreads": 1},
209+
}
210+
),
211+
],
212+
env=env,
213+
):
214+
async with Client(f"tcp://localhost:{port}", asynchronous=True) as client:
215+
await client.wait_for_workers(1)
216+
assert await client.run_on_scheduler(check)
217+
assert all((await client.run(check)).values())

distributed/cli/tests/test_dask_worker.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,3 +811,29 @@ def test_error_during_startup(monkeypatch, nanny, loop):
811811
],
812812
) as worker:
813813
assert worker.wait(10) == 1
814+
815+
816+
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
817+
@gen_cluster(client=True, nthreads=[])
818+
async def test_uvloop(c, s, nanny):
819+
uvloop = pytest.importorskip("uvloop")
820+
821+
def check():
822+
return isinstance(asyncio.get_event_loop(), uvloop.Loop)
823+
824+
with popen(
825+
[
826+
sys.executable,
827+
"-m",
828+
"dask",
829+
"worker",
830+
s.address,
831+
nanny,
832+
"--no-dashboard",
833+
],
834+
env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"},
835+
):
836+
await c.wait_for_workers(1)
837+
assert all((await c.run(check)).values())
838+
if nanny == "--nanny":
839+
assert all((await c.run(check, nanny=True)).values())

distributed/tests/test_config.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -347,15 +347,30 @@ def test_matches(c, s, root):
347347
test_matches(config, schema, "")
348348

349349

350-
def test_uvloop_event_loop():
350+
uvloop_test_script = """
351+
import asyncio
352+
353+
import uvloop
354+
355+
import distributed
356+
357+
async def test():
358+
return isinstance(asyncio.get_event_loop(), uvloop.Loop)
359+
360+
if __name__ == "__main__":
361+
with distributed.Client(processes=True, n_workers=1) as client:
362+
assert client.sync(test) # client loop
363+
assert client.run_on_scheduler(test) # also client loop
364+
assert client.run(test) # nanny loop
365+
"""
366+
367+
368+
def test_uvloop():
351369
"""Check that configuring distributed to use uvloop actually sets the event loop policy"""
352370
pytest.importorskip("uvloop")
353-
script = (
354-
"import distributed, asyncio, uvloop\n"
355-
"assert isinstance(asyncio.get_event_loop_policy(), uvloop.EventLoopPolicy)"
356-
)
371+
357372
subprocess.check_call(
358-
[sys.executable, "-c", script],
373+
[sys.executable, "-c", uvloop_test_script],
359374
env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"},
360375
)
361376

0 commit comments

Comments
 (0)