diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 3b8b4ee03c..1affd8b4fd 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import os import re import shutil @@ -24,6 +25,7 @@ from distributed.utils_test import ( assert_can_connect_from_everywhere_4_6, assert_can_connect_locally_4, + gen_test, popen, ) @@ -672,7 +674,8 @@ def test_signal_handling(loop, sig): [ sys.executable, "-m", - "distributed.cli.dask_scheduler", + "dask", + "scheduler", f"--port={port}", "--dashboard-address=:0", ], @@ -690,3 +693,27 @@ def test_signal_handling(loop, sig): assert scheduler.returncode == 0 assert "scheduler closing" in logs assert "end scheduler" in logs + + +@gen_test() +async def test_uvloop(): + uvloop = pytest.importorskip("uvloop") + port = open_port() + + def check(): + return isinstance(asyncio.get_event_loop(), uvloop.Loop) + + with popen( + [ + sys.executable, + "-m", + "dask", + "scheduler", + "--no-dashboard", + "--host", + f"127.0.0.1:{port}", + ], + env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}, + ): + async with Client(f"127.0.0.1:{port}", asynchronous=True) as c: + assert await c.run_on_scheduler(check) diff --git a/distributed/cli/tests/test_dask_spec.py b/distributed/cli/tests/test_dask_spec.py index 59bf858531..3c961a47a7 100644 --- a/distributed/cli/tests/test_dask_spec.py +++ b/distributed/cli/tests/test_dask_spec.py @@ -172,3 +172,46 @@ async def test_signal_handling_scheduler(sig): assert "exception" not in logs finally: await scheduler.wait() + + +@gen_test() +async def test_uvloop(): + uvloop = pytest.importorskip("uvloop") + port = open_port() + env = {"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"} + + def check(): + return isinstance(asyncio.get_event_loop(), uvloop.Loop) + + with popen( + [ + sys.executable, + "-m", + "dask", + "spec", + "--spec", + json.dumps({"cls": "dask.distributed.Scheduler", "opts": {"port": port}}), + ], + env=env, + ): + with popen( + [ + sys.executable, + "-m", + "dask", + "spec", + f"tcp://localhost:{port}", + "--spec", + json.dumps( + { + "cls": "dask.distributed.Worker", + "opts": {"nanny": False, "nthreads": 1}, + } + ), + ], + env=env, + ): + async with Client(f"tcp://localhost:{port}", asynchronous=True) as client: + await client.wait_for_workers(1) + assert await client.run_on_scheduler(check) + assert all((await client.run(check)).values()) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index bd12f8fc45..5449f17352 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -811,3 +811,29 @@ def test_error_during_startup(monkeypatch, nanny, loop): ], ) as worker: assert worker.wait(10) == 1 + + +@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +@gen_cluster(client=True, nthreads=[]) +async def test_uvloop(c, s, nanny): + uvloop = pytest.importorskip("uvloop") + + def check(): + return isinstance(asyncio.get_event_loop(), uvloop.Loop) + + with popen( + [ + sys.executable, + "-m", + "dask", + "worker", + s.address, + nanny, + "--no-dashboard", + ], + env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}, + ): + await c.wait_for_workers(1) + assert all((await c.run(check)).values()) + if nanny == "--nanny": + assert all((await c.run(check, nanny=True)).values()) diff --git a/distributed/tests/test_config.py b/distributed/tests/test_config.py index b7bf97ef92..91737bb775 100644 --- a/distributed/tests/test_config.py +++ b/distributed/tests/test_config.py @@ -347,15 +347,30 @@ def test_matches(c, s, root): test_matches(config, schema, "") -def test_uvloop_event_loop(): +uvloop_test_script = """ +import asyncio + +import uvloop + +import distributed + +async def test(): + return isinstance(asyncio.get_event_loop(), uvloop.Loop) + +if __name__ == "__main__": + with distributed.Client(processes=True, n_workers=1) as client: + assert client.sync(test) # client loop + assert client.run_on_scheduler(test) # also client loop + assert client.run(test) # nanny loop +""" + + +def test_uvloop(): """Check that configuring distributed to use uvloop actually sets the event loop policy""" pytest.importorskip("uvloop") - script = ( - "import distributed, asyncio, uvloop\n" - "assert isinstance(asyncio.get_event_loop_policy(), uvloop.EventLoopPolicy)" - ) + subprocess.check_call( - [sys.executable, "-c", script], + [sys.executable, "-c", uvloop_test_script], env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}, )