Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/1219.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added ``--ramp`` to stagger when workers begin executing tests, while still
allowing workers to start and collect tests normally.
6 changes: 6 additions & 0 deletions docs/distribution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ Parallelization can be configured further with these options:
* ``--max-worker-restart``: maximum number of workers that can be restarted
when crashed (set to zero to disable this feature).

* ``--ramp=DURATION``: gradually start worker test execution over a duration.
Workers still start and collect tests normally, but each worker waits before
its first test according to its position in the worker pool. The duration is
specified in seconds by default and also accepts ``s``, ``m``, and ``h``
suffixes, for example ``--ramp=10s`` or ``--ramp=5m``.

The test distribution algorithm is configured with the ``--dist`` command-line option:

.. _distribution modes:
Expand Down
7 changes: 7 additions & 0 deletions src/xdist/dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ def pytest_sessionstart(self, session: pytest.Session) -> None:
nodes = self.nodemanager.setup_nodes(putevent=self.queue.put)
self._active_nodes.update(nodes)
self._session = session
ramp = self.config.getoption("ramp")
if ramp:
workers = len(nodes)
workers_noun = "worker" if workers == 1 else "workers"
self.report_line(
f"ramping test start over {ramp:g}s across {workers} {workers_noun}"
)

@pytest.hookimpl
def pytest_sessionfinish(self) -> None:
Expand Down
53 changes: 45 additions & 8 deletions src/xdist/plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import math
import os
import sys
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -92,15 +93,13 @@ def _auto_num_workers_psutil(config: pytest.Config) -> int | None:


def _auto_num_workers_os_sched_getaffinity(config: pytest.Config) -> int | None:
try:
from os import sched_getaffinity

sched_getaffinity = getattr(os, "sched_getaffinity", None)
if sched_getaffinity is not None:
return len(sched_getaffinity(0))
except ImportError:
if os.environ.get("TRAVIS") == "true":
# workaround https://github.com/pypy/pypy/issues/2375
return 2
return None
if os.environ.get("TRAVIS") == "true":

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

followup note - travis is no longer a thing in direct use - we might want to drop explicit handling for it

# workaround https://github.com/pypy/pypy/issues/2375
return 2
return None


def _auto_num_workers_os_multiprocessing_cpu_count(config: pytest.Config) -> int | None:
Expand Down Expand Up @@ -141,6 +140,32 @@ def parse_numprocesses(s: str) -> int | Literal["auto", "logical"]:
return int(s)


def parse_ramp_duration(s: str) -> float:
value = s.strip()
if not value:
raise pytest.UsageError("--ramp requires a duration")

unit = value[-1] if value[-1].isalpha() else ""
number = value[:-1] if unit else value
multipliers = {"": 1.0, "s": 1.0, "m": 60.0, "h": 3600.0}
if unit not in multipliers or not number:
raise pytest.UsageError(
"--ramp duration must be a non-negative number with optional s, m, or h suffix"
)

try:
seconds = float(number)
except ValueError as e:
raise pytest.UsageError(
"--ramp duration must be a non-negative number with optional s, m, or h suffix"
) from e

if seconds < 0 or not math.isfinite(seconds):
raise pytest.UsageError("--ramp duration must be a non-negative finite value")

return seconds * multipliers[unit]


@pytest.hookimpl
def pytest_addoption(parser: pytest.Parser) -> None:
# 'Help' formatting (same rules as pytest's):
Expand Down Expand Up @@ -178,6 +203,18 @@ def pytest_addoption(parser: pytest.Parser) -> None:
help="Maximum number of workers that can be restarted "
"when crashed (set to zero to disable this feature)",
)
group.addoption(
"--ramp",
action="store",
default=0.0,
dest="ramp",
metavar="DURATION",
type=parse_ramp_duration,
help=(
"Gradually start worker test execution over the given duration. "
"Accepts seconds by default, or s, m, h suffixes."
),
)
group.addoption(
"--dist",
metavar="distmode",
Expand Down
10 changes: 10 additions & 0 deletions src/xdist/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ def __init__(self, config: pytest.Config, channel: execnet.Channel) -> None:
workerinput: dict[str, Any] = config.workerinput # type: ignore[attr-defined]
self.workerid = workerinput.get("workerid", "?")
self.testrunuid = workerinput["testrunuid"]
self.rampdelay = float(workerinput.get("rampdelay", 0.0))
self._ramp_sleep_done = False
self.log = Producer(f"worker-{self.workerid}", enabled=config.option.debug)
self.channel = channel
self.torun = TestQueue(self.channel.gateway.execmodel)
Expand Down Expand Up @@ -221,6 +223,7 @@ def run_one_test(self) -> None:
assert self.nextitem_index is not None
nextitem = items[self.nextitem_index]

self._sleep_before_first_test()
worker_title("[pytest-xdist running] %s" % item.nodeid)

start = time.perf_counter()
Expand All @@ -233,6 +236,13 @@ def run_one_test(self) -> None:
"runtest_protocol_complete", item_index=self.item_index, duration=duration
)

def _sleep_before_first_test(self) -> None:
if self._ramp_sleep_done:
return
self._ramp_sleep_done = True
if self.rampdelay > 0:
time.sleep(self.rampdelay)

def pytest_collection_modifyitems(
self,
config: pytest.Config,
Expand Down
15 changes: 12 additions & 3 deletions src/xdist/workermanage.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,23 @@ def setup_nodes(
) -> list[WorkerController]:
self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
self.trace("setting up nodes")
return [self.setup_node(spec, putevent) for spec in self.specs]
return [
self.setup_node(spec, putevent, worker_index)
for worker_index, spec in enumerate(self.specs)
]

def setup_node(
self,
spec: execnet.XSpec,
putevent: Callable[[tuple[str, dict[str, Any]]], None],
worker_index: int = 0,
) -> WorkerController:
if getattr(spec, "execmodel", None) != "main_thread_only":
spec = execnet.XSpec(f"execmodel=main_thread_only//{spec}")
gw = self.group.makegateway(spec)
self.config.hook.pytest_xdist_newgateway(gateway=gw)
self.rsync_roots(gw)
node = WorkerController(self, gw, self.config, putevent)
node = WorkerController(self, gw, self.config, putevent, worker_index)
# Keep the node alive.
gw.node = node # type: ignore[attr-defined]
node.setup()
Expand Down Expand Up @@ -299,17 +303,22 @@ def __init__(
gateway: execnet.Gateway,
config: pytest.Config,
putevent: Callable[[tuple[str, dict[str, Any]]], None],
worker_index: int = 0,
) -> None:
config.pluginmanager.register(self.RemoteHook())
self.nodemanager = nodemanager
self.putevent = putevent
self.gateway = gateway
self.config = config
workercount = len(nodemanager.specs)
ramp = getattr(config.option, "ramp", 0.0)
rampdelay = ramp * worker_index / workercount if workercount and ramp else 0.0
self.workerinput = {
"workerid": gateway.id,
"workercount": len(nodemanager.specs),
"workercount": workercount,
"testrunuid": nodemanager.testrunuid,
"mainargv": sys.argv,
"rampdelay": rampdelay,
}
self._down = False
self._shutdown_sent = False
Expand Down
16 changes: 16 additions & 0 deletions testing/acceptance_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ def test_ok():
assert result.ret == 0
result.stdout.fnmatch_lines(["*1 passed*"])

def test_ramp_reports_ramp_period(self, pytester: pytest.Pytester) -> None:
pytester.makepyfile(
"""
def test_ok():
pass
"""
)
result = pytester.runpytest("-n2", "--ramp=0.01s")
assert result.ret == 0
result.stdout.fnmatch_lines(
[
"ramping test start over 0.01s across 2 workers",
"*1 passed*",
]
)

def test_n1_fail(self, pytester: pytest.Pytester) -> None:
p1 = pytester.makepyfile(
"""
Expand Down
24 changes: 24 additions & 0 deletions testing/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,30 @@
from xdist.workermanage import NodeManager


@pytest.mark.parametrize(
("value", "expected"),
[
("10", 10.0),
("10s", 10.0),
("0.01s", 0.01),
("5m", 300.0),
("1h", 3600.0),
],
)
def test_parse_ramp_duration(value: str, expected: float) -> None:
from xdist.plugin import parse_ramp_duration

assert parse_ramp_duration(value) == expected


@pytest.mark.parametrize("value", ["", "-1", "-1s", "1d", "1ss", "soon"])
def test_parse_ramp_duration_rejects_invalid_values(value: str) -> None:
from xdist.plugin import parse_ramp_duration

with pytest.raises(pytest.UsageError):
parse_ramp_duration(value)


@pytest.fixture
def monkeypatch_3_cpus(monkeypatch: pytest.MonkeyPatch) -> None:
"""Make pytest-xdist believe the system has 3 CPUs."""
Expand Down
31 changes: 31 additions & 0 deletions testing/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pprint
from queue import Queue
import sys
import time
from typing import Any
from typing import Callable
from typing import cast
Expand All @@ -13,6 +14,7 @@
import execnet
import pytest

from xdist.remote import WorkerInteractor
from xdist.workermanage import NodeManager
from xdist.workermanage import WorkerController

Expand Down Expand Up @@ -92,6 +94,35 @@ class TestWorkerInteractor:
[dict[str, Any]], Union[pytest.CollectReport, pytest.TestReport]
]

def test_ramp_delay_sleeps_once_before_first_test(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
slept: list[float] = []
monkeypatch.setattr(time, "sleep", slept.append)

interactor = WorkerInteractor.__new__(WorkerInteractor)
interactor.rampdelay = 0.25
interactor._ramp_sleep_done = False

interactor._sleep_before_first_test()
interactor._sleep_before_first_test()

assert slept == [0.25]

def test_ramp_delay_zero_does_not_sleep(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
slept: list[float] = []
monkeypatch.setattr(time, "sleep", slept.append)

interactor = WorkerInteractor.__new__(WorkerInteractor)
interactor.rampdelay = 0.0
interactor._ramp_sleep_done = False

interactor._sleep_before_first_test()

assert slept == []

@pytest.fixture
def unserialize_report(self, pytestconfig: pytest.Config) -> UnserializerReport:
def unserialize(
Expand Down
49 changes: 49 additions & 0 deletions testing/test_workermanage.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,55 @@ def test_popen_makegateway_events(
hm.teardown_nodes()
assert not len(hm.group)

def test_popen_makegateway_passes_stable_worker_index(
self,
config: pytest.Config,
monkeypatch: pytest.MonkeyPatch,
) -> None:
worker_indices: list[tuple[str, int]] = []

class MockController:
def __init__(
self,
nodemanager: NodeManager,
gateway: execnet.Gateway,
config: pytest.Config,
putevent: object,
worker_index: int,
) -> None:
worker_indices.append((gateway.id, worker_index))

def setup(self) -> None:
pass

monkeypatch.setattr(workermanage, "WorkerController", MockController)
hm = NodeManager(config, ["popen//id=alpha", "popen//id=beta"])
hm.setup_nodes(None) # type: ignore[arg-type]

assert worker_indices == [("alpha", 0), ("beta", 1)]

def test_workerinput_includes_ramp_delay(self, config: pytest.Config) -> None:
from xdist.workermanage import WorkerController

class DummyManager:
testrunuid = "testrun"
specs = [execnet.XSpec("popen")] * 4

class DummyGateway:
id = "gw2"
spec = execnet.XSpec("popen")

config.option.ramp = 12.0
node = WorkerController(
nodemanager=DummyManager(), # type: ignore[arg-type]
gateway=DummyGateway(), # type: ignore[arg-type]
config=config,
putevent=lambda event: None,
worker_index=2,
)

assert node.workerinput["rampdelay"] == 6.0

def test_popens_rsync(
self,
config: pytest.Config,
Expand Down
Loading