Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import os
import re
import shutil
Expand All @@ -24,6 +25,7 @@
from distributed.utils_test import (
assert_can_connect_from_everywhere_4_6,
assert_can_connect_locally_4,
gen_test,
popen,
)

Expand Down Expand Up @@ -672,7 +674,8 @@ def test_signal_handling(loop, sig):
[
sys.executable,
"-m",
"distributed.cli.dask_scheduler",
"dask",
"scheduler",
f"--port={port}",
"--dashboard-address=:0",
],
Expand All @@ -690,3 +693,27 @@ def test_signal_handling(loop, sig):
assert scheduler.returncode == 0
assert "scheduler closing" in logs
assert "end scheduler" in logs


@gen_test()
async def test_uvloop():
uvloop = pytest.importorskip("uvloop")
port = open_port()

def check():
return isinstance(asyncio.get_event_loop(), uvloop.Loop)

with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--no-dashboard",
"--host",
f"127.0.0.1:{port}",
],
env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"},
):
async with Client(f"127.0.0.1:{port}", asynchronous=True) as c:
assert await c.run_on_scheduler(check)
43 changes: 43 additions & 0 deletions distributed/cli/tests/test_dask_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,46 @@ async def test_signal_handling_scheduler(sig):
assert "exception" not in logs
finally:
await scheduler.wait()


@gen_test()
async def test_uvloop():
uvloop = pytest.importorskip("uvloop")
port = open_port()
env = {"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}

def check():
return isinstance(asyncio.get_event_loop(), uvloop.Loop)

with popen(
[
sys.executable,
"-m",
"dask",
"spec",
"--spec",
json.dumps({"cls": "dask.distributed.Scheduler", "opts": {"port": port}}),
],
env=env,
):
with popen(
[
sys.executable,
"-m",
"dask",
"spec",
f"tcp://localhost:{port}",
"--spec",
json.dumps(
{
"cls": "dask.distributed.Worker",
"opts": {"nanny": False, "nthreads": 1},
}
),
],
env=env,
):
async with Client(f"tcp://localhost:{port}", asynchronous=True) as client:
await client.wait_for_workers(1)
assert await client.run_on_scheduler(check)
assert all((await client.run(check)).values())
26 changes: 26 additions & 0 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,3 +811,29 @@ def test_error_during_startup(monkeypatch, nanny, loop):
],
) as worker:
assert worker.wait(10) == 1


@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
@gen_cluster(client=True, nthreads=[])
async def test_uvloop(c, s, nanny):
uvloop = pytest.importorskip("uvloop")

def check():
return isinstance(asyncio.get_event_loop(), uvloop.Loop)

with popen(
[
sys.executable,
"-m",
"dask",
"worker",
s.address,
nanny,
"--no-dashboard",
],
env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"},
):
await c.wait_for_workers(1)
assert all((await c.run(check)).values())
if nanny == "--nanny":
assert all((await c.run(check, nanny=True)).values())
27 changes: 21 additions & 6 deletions distributed/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,30 @@ def test_matches(c, s, root):
test_matches(config, schema, "")


def test_uvloop_event_loop():
uvloop_test_script = """
import asyncio

import uvloop

import distributed

async def test():
return isinstance(asyncio.get_event_loop(), uvloop.Loop)

if __name__ == "__main__":
with distributed.Client(processes=True, n_workers=1) as client:
assert client.sync(test) # client loop
assert client.run_on_scheduler(test) # also client loop
assert client.run(test) # nanny loop
"""


def test_uvloop():
"""Check that configuring distributed to use uvloop actually sets the event loop policy"""
pytest.importorskip("uvloop")
script = (
"import distributed, asyncio, uvloop\n"
"assert isinstance(asyncio.get_event_loop_policy(), uvloop.EventLoopPolicy)"
)

subprocess.check_call(
[sys.executable, "-c", script],
[sys.executable, "-c", uvloop_test_script],
env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"},
)

Expand Down
Loading