diff --git a/changelog/1219.feature.rst b/changelog/1219.feature.rst new file mode 100644 index 00000000..f74234f6 --- /dev/null +++ b/changelog/1219.feature.rst @@ -0,0 +1,2 @@ +Added ``--ramp`` to stagger when workers begin executing tests, while still +allowing workers to start and collect tests normally. diff --git a/docs/distribution.rst b/docs/distribution.rst index d2d2cd9a..2cce8ba6 100644 --- a/docs/distribution.rst +++ b/docs/distribution.rst @@ -56,6 +56,12 @@ Parallelization can be configured further with these options: * ``--max-worker-restart``: maximum number of workers that can be restarted when crashed (set to zero to disable this feature). +* ``--ramp=DURATION``: gradually start worker test execution over a duration. + Workers still start and collect tests normally, but each worker waits before + its first test according to its position in the worker pool. The duration is + specified in seconds by default and also accepts ``s``, ``m``, and ``h`` + suffixes, for example ``--ramp=10s`` or ``--ramp=5m``. + The test distribution algorithm is configured with the ``--dist`` command-line option: .. 
def parse_ramp_duration(s: str) -> float:
    """Parse a ``--ramp`` duration string into a number of seconds.

    The value is a non-negative number optionally followed by a single unit
    suffix: ``s`` (seconds, also the default when no suffix is given),
    ``m`` (minutes) or ``h`` (hours). Surrounding whitespace is ignored.

    Raises:
        pytest.UsageError: if the value is empty, carries an unknown suffix,
            is not a number, is negative, or does not produce a finite number
            of seconds once the unit has been applied.
    """
    value = s.strip()
    if not value:
        raise pytest.UsageError("--ramp requires a duration")

    # Only a single trailing letter is treated as the unit; anything else
    # that is not numeric is left for float() to reject below.
    unit = value[-1] if value[-1].isalpha() else ""
    number = value[:-1] if unit else value
    multipliers = {"": 1.0, "s": 1.0, "m": 60.0, "h": 3600.0}
    if unit not in multipliers or not number:
        raise pytest.UsageError(
            "--ramp duration must be a non-negative number with optional s, m, or h suffix"
        )

    try:
        magnitude = float(number)
    except ValueError as e:
        raise pytest.UsageError(
            "--ramp duration must be a non-negative number with optional s, m, or h suffix"
        ) from e

    # Validate the scaled value, not the raw number: a finite magnitude such
    # as 1e308 overflows to infinity once multiplied by 60 or 3600, and an
    # infinite ramp must never be returned to the caller.
    seconds = magnitude * multipliers[unit]
    if seconds < 0 or not math.isfinite(seconds):
        raise pytest.UsageError("--ramp duration must be a non-negative finite value")

    return seconds
+ ), + ) group.addoption( "--dist", metavar="distmode", diff --git a/src/xdist/remote.py b/src/xdist/remote.py index 436b5ddf..409b90b0 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -115,6 +115,8 @@ def __init__(self, config: pytest.Config, channel: execnet.Channel) -> None: workerinput: dict[str, Any] = config.workerinput # type: ignore[attr-defined] self.workerid = workerinput.get("workerid", "?") self.testrunuid = workerinput["testrunuid"] + self.rampdelay = float(workerinput.get("rampdelay", 0.0)) + self._ramp_sleep_done = False self.log = Producer(f"worker-{self.workerid}", enabled=config.option.debug) self.channel = channel self.torun = TestQueue(self.channel.gateway.execmodel) @@ -221,6 +223,7 @@ def run_one_test(self) -> None: assert self.nextitem_index is not None nextitem = items[self.nextitem_index] + self._sleep_before_first_test() worker_title("[pytest-xdist running] %s" % item.nodeid) start = time.perf_counter() @@ -233,6 +236,13 @@ def run_one_test(self) -> None: "runtest_protocol_complete", item_index=self.item_index, duration=duration ) + def _sleep_before_first_test(self) -> None: + if self._ramp_sleep_done: + return + self._ramp_sleep_done = True + if self.rampdelay > 0: + time.sleep(self.rampdelay) + def pytest_collection_modifyitems( self, config: pytest.Config, diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index 201c8e71..c54b18fb 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -94,19 +94,23 @@ def setup_nodes( ) -> list[WorkerController]: self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs) self.trace("setting up nodes") - return [self.setup_node(spec, putevent) for spec in self.specs] + return [ + self.setup_node(spec, putevent, worker_index) + for worker_index, spec in enumerate(self.specs) + ] def setup_node( self, spec: execnet.XSpec, putevent: Callable[[tuple[str, dict[str, Any]]], None], + worker_index: int = 0, ) -> WorkerController: if 
def test_ramp_reports_ramp_period(self, pytester: pytest.Pytester) -> None:
    """A ``-n2 --ramp=0.01s`` run succeeds and prints the ramp report line."""
    pytester.makepyfile(
        """
        def test_ok():
            pass
    """
    )
    outcome = pytester.runpytest("-n2", "--ramp=0.01s")
    assert outcome.ret == 0

    # The ramp announcement must appear before the final summary line.
    expected_lines = [
        "ramping test start over 0.01s across 2 workers",
        "*1 passed*",
    ]
    outcome.stdout.fnmatch_lines(expected_lines)
@pytest.mark.parametrize(
    "value",
    [
        "",
        "-1",
        "-1s",
        "1d",
        "1ss",
        "soon",
        # A unit suffix with no number in front of it.
        "s",
        # Non-finite values, with and without a valid suffix.
        "inf",
        "infs",
        "nan",
        "nans",
    ],
)
def test_parse_ramp_duration_rejects_invalid_values(value: str) -> None:
    """Malformed, negative and non-finite ``--ramp`` values raise ``UsageError``."""
    from xdist.plugin import parse_ramp_duration

    with pytest.raises(pytest.UsageError):
        parse_ramp_duration(value)
def test_workerinput_includes_ramp_delay(self, config: pytest.Config) -> None:
    """Worker index 2 of 4 specs with a 12s ramp is handed a 6s ``rampdelay``."""
    from xdist.workermanage import WorkerController

    class DummyGateway:
        id = "gw2"
        spec = execnet.XSpec("popen")

    class DummyManager:
        testrunuid = "testrun"
        specs = [execnet.XSpec("popen")] * 4

    config.option.ramp = 12.0
    controller = WorkerController(
        DummyManager(),  # type: ignore[arg-type]
        DummyGateway(),  # type: ignore[arg-type]
        config,
        lambda event: None,
        worker_index=2,
    )

    ramp_delay = controller.workerinput["rampdelay"]
    assert ramp_delay == 6.0