Skip to content

Commit d746848

Browse files
committed
feat: add worker ramp option
1 parent c554eb1 commit d746848

10 files changed

Lines changed: 202 additions & 11 deletions

File tree

changelog/1219.feature.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Added ``--ramp`` to stagger when workers begin executing tests, while still
2+
allowing workers to start and collect tests normally.

docs/distribution.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ Parallelization can be configured further with these options:
5656
* ``--max-worker-restart``: maximum number of workers that can be restarted
5757
when crashed (set to zero to disable this feature).
5858

59+
* ``--ramp=DURATION``: gradually start worker test execution over a duration.
60+
Workers still start and collect tests normally, but each worker waits before
61+
its first test according to its position in the worker pool. The duration is
62+
specified in seconds by default and also accepts ``s``, ``m``, and ``h``
63+
suffixes, for example ``--ramp=10s`` or ``--ramp=5m``.
64+
5965
The test distribution algorithm is configured with the ``--dist`` command-line option:
6066

6167
.. _distribution modes:

src/xdist/dsession.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ def pytest_sessionstart(self, session: pytest.Session) -> None:
9090
nodes = self.nodemanager.setup_nodes(putevent=self.queue.put)
9191
self._active_nodes.update(nodes)
9292
self._session = session
93+
ramp = self.config.getoption("ramp")
94+
if ramp:
95+
workers = len(nodes)
96+
workers_noun = "worker" if workers == 1 else "workers"
97+
self.report_line(
98+
f"ramping test start over {ramp:g}s across {workers} {workers_noun}"
99+
)
93100

94101
@pytest.hookimpl
95102
def pytest_sessionfinish(self) -> None:

src/xdist/plugin.py

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import math
34
import os
45
import sys
56
from typing import TYPE_CHECKING
@@ -92,15 +93,13 @@ def _auto_num_workers_psutil(config: pytest.Config) -> int | None:
9293

9394

9495
def _auto_num_workers_os_sched_getaffinity(config: pytest.Config) -> int | None:
95-
try:
96-
from os import sched_getaffinity
97-
96+
sched_getaffinity = getattr(os, "sched_getaffinity", None)
97+
if sched_getaffinity is not None:
9898
return len(sched_getaffinity(0))
99-
except ImportError:
100-
if os.environ.get("TRAVIS") == "true":
101-
# workaround https://github.com/pypy/pypy/issues/2375
102-
return 2
103-
return None
99+
if os.environ.get("TRAVIS") == "true":
100+
# workaround https://github.com/pypy/pypy/issues/2375
101+
return 2
102+
return None
104103

105104

106105
def _auto_num_workers_os_multiprocessing_cpu_count(config: pytest.Config) -> int | None:
@@ -141,6 +140,32 @@ def parse_numprocesses(s: str) -> int | Literal["auto", "logical"]:
141140
return int(s)
142141

143142

143+
def parse_ramp_duration(s: str) -> float:
144+
value = s.strip()
145+
if not value:
146+
raise pytest.UsageError("--ramp requires a duration")
147+
148+
unit = value[-1] if value[-1].isalpha() else ""
149+
number = value[:-1] if unit else value
150+
multipliers = {"": 1.0, "s": 1.0, "m": 60.0, "h": 3600.0}
151+
if unit not in multipliers or not number:
152+
raise pytest.UsageError(
153+
"--ramp duration must be a non-negative number with optional s, m, or h suffix"
154+
)
155+
156+
try:
157+
seconds = float(number)
158+
except ValueError as e:
159+
raise pytest.UsageError(
160+
"--ramp duration must be a non-negative number with optional s, m, or h suffix"
161+
) from e
162+
163+
if seconds < 0 or not math.isfinite(seconds):
164+
raise pytest.UsageError("--ramp duration must be a non-negative finite value")
165+
166+
return seconds * multipliers[unit]
167+
168+
144169
@pytest.hookimpl
145170
def pytest_addoption(parser: pytest.Parser) -> None:
146171
# 'Help' formatting (same rules as pytest's):
@@ -178,6 +203,18 @@ def pytest_addoption(parser: pytest.Parser) -> None:
178203
help="Maximum number of workers that can be restarted "
179204
"when crashed (set to zero to disable this feature)",
180205
)
206+
group.addoption(
207+
"--ramp",
208+
action="store",
209+
default=0.0,
210+
dest="ramp",
211+
metavar="DURATION",
212+
type=parse_ramp_duration,
213+
help=(
214+
"Gradually start worker test execution over the given duration. "
215+
"Accepts seconds by default, or s, m, h suffixes."
216+
),
217+
)
181218
group.addoption(
182219
"--dist",
183220
metavar="distmode",

src/xdist/remote.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ def __init__(self, config: pytest.Config, channel: execnet.Channel) -> None:
115115
workerinput: dict[str, Any] = config.workerinput # type: ignore[attr-defined]
116116
self.workerid = workerinput.get("workerid", "?")
117117
self.testrunuid = workerinput["testrunuid"]
118+
self.rampdelay = float(workerinput.get("rampdelay", 0.0))
119+
self._ramp_sleep_done = False
118120
self.log = Producer(f"worker-{self.workerid}", enabled=config.option.debug)
119121
self.channel = channel
120122
self.torun = TestQueue(self.channel.gateway.execmodel)
@@ -221,6 +223,7 @@ def run_one_test(self) -> None:
221223
assert self.nextitem_index is not None
222224
nextitem = items[self.nextitem_index]
223225

226+
self._sleep_before_first_test()
224227
worker_title("[pytest-xdist running] %s" % item.nodeid)
225228

226229
start = time.perf_counter()
@@ -233,6 +236,13 @@ def run_one_test(self) -> None:
233236
"runtest_protocol_complete", item_index=self.item_index, duration=duration
234237
)
235238

239+
def _sleep_before_first_test(self) -> None:
240+
if self._ramp_sleep_done:
241+
return
242+
self._ramp_sleep_done = True
243+
if self.rampdelay > 0:
244+
time.sleep(self.rampdelay)
245+
236246
def pytest_collection_modifyitems(
237247
self,
238248
config: pytest.Config,

src/xdist/workermanage.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,19 +94,23 @@ def setup_nodes(
9494
) -> list[WorkerController]:
9595
self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
9696
self.trace("setting up nodes")
97-
return [self.setup_node(spec, putevent) for spec in self.specs]
97+
return [
98+
self.setup_node(spec, putevent, worker_index)
99+
for worker_index, spec in enumerate(self.specs)
100+
]
98101

99102
def setup_node(
100103
self,
101104
spec: execnet.XSpec,
102105
putevent: Callable[[tuple[str, dict[str, Any]]], None],
106+
worker_index: int = 0,
103107
) -> WorkerController:
104108
if getattr(spec, "execmodel", None) != "main_thread_only":
105109
spec = execnet.XSpec(f"execmodel=main_thread_only//{spec}")
106110
gw = self.group.makegateway(spec)
107111
self.config.hook.pytest_xdist_newgateway(gateway=gw)
108112
self.rsync_roots(gw)
109-
node = WorkerController(self, gw, self.config, putevent)
113+
node = WorkerController(self, gw, self.config, putevent, worker_index)
110114
# Keep the node alive.
111115
gw.node = node # type: ignore[attr-defined]
112116
node.setup()
@@ -299,17 +303,22 @@ def __init__(
299303
gateway: execnet.Gateway,
300304
config: pytest.Config,
301305
putevent: Callable[[tuple[str, dict[str, Any]]], None],
306+
worker_index: int = 0,
302307
) -> None:
303308
config.pluginmanager.register(self.RemoteHook())
304309
self.nodemanager = nodemanager
305310
self.putevent = putevent
306311
self.gateway = gateway
307312
self.config = config
313+
workercount = len(nodemanager.specs)
314+
ramp = getattr(config.option, "ramp", 0.0)
315+
rampdelay = ramp * worker_index / workercount if workercount and ramp else 0.0
308316
self.workerinput = {
309317
"workerid": gateway.id,
310-
"workercount": len(nodemanager.specs),
318+
"workercount": workercount,
311319
"testrunuid": nodemanager.testrunuid,
312320
"mainargv": sys.argv,
321+
"rampdelay": rampdelay,
313322
}
314323
self._down = False
315324
self._shutdown_sent = False

testing/acceptance_test.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,22 @@ def test_ok():
2222
assert result.ret == 0
2323
result.stdout.fnmatch_lines(["*1 passed*"])
2424

25+
def test_ramp_reports_ramp_period(self, pytester: pytest.Pytester) -> None:
26+
pytester.makepyfile(
27+
"""
28+
def test_ok():
29+
pass
30+
"""
31+
)
32+
result = pytester.runpytest("-n2", "--ramp=0.01s")
33+
assert result.ret == 0
34+
result.stdout.fnmatch_lines(
35+
[
36+
"ramping test start over 0.01s across 2 workers",
37+
"*1 passed*",
38+
]
39+
)
40+
2541
def test_n1_fail(self, pytester: pytest.Pytester) -> None:
2642
p1 = pytester.makepyfile(
2743
"""

testing/test_plugin.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,30 @@
99
from xdist.workermanage import NodeManager
1010

1111

12+
@pytest.mark.parametrize(
13+
("value", "expected"),
14+
[
15+
("10", 10.0),
16+
("10s", 10.0),
17+
("0.01s", 0.01),
18+
("5m", 300.0),
19+
("1h", 3600.0),
20+
],
21+
)
22+
def test_parse_ramp_duration(value: str, expected: float) -> None:
23+
from xdist.plugin import parse_ramp_duration
24+
25+
assert parse_ramp_duration(value) == expected
26+
27+
28+
@pytest.mark.parametrize("value", ["", "-1", "-1s", "1d", "1ss", "soon"])
29+
def test_parse_ramp_duration_rejects_invalid_values(value: str) -> None:
30+
from xdist.plugin import parse_ramp_duration
31+
32+
with pytest.raises(pytest.UsageError):
33+
parse_ramp_duration(value)
34+
35+
1236
@pytest.fixture
1337
def monkeypatch_3_cpus(monkeypatch: pytest.MonkeyPatch) -> None:
1438
"""Make pytest-xdist believe the system has 3 CPUs."""

testing/test_remote.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pprint
55
from queue import Queue
66
import sys
7+
import time
78
from typing import Any
89
from typing import Callable
910
from typing import cast
@@ -13,6 +14,7 @@
1314
import execnet
1415
import pytest
1516

17+
from xdist.remote import WorkerInteractor
1618
from xdist.workermanage import NodeManager
1719
from xdist.workermanage import WorkerController
1820

@@ -92,6 +94,35 @@ class TestWorkerInteractor:
9294
[dict[str, Any]], Union[pytest.CollectReport, pytest.TestReport]
9395
]
9496

97+
def test_ramp_delay_sleeps_once_before_first_test(
98+
self, monkeypatch: pytest.MonkeyPatch
99+
) -> None:
100+
slept: list[float] = []
101+
monkeypatch.setattr(time, "sleep", slept.append)
102+
103+
interactor = WorkerInteractor.__new__(WorkerInteractor)
104+
interactor.rampdelay = 0.25
105+
interactor._ramp_sleep_done = False
106+
107+
interactor._sleep_before_first_test()
108+
interactor._sleep_before_first_test()
109+
110+
assert slept == [0.25]
111+
112+
def test_ramp_delay_zero_does_not_sleep(
113+
self, monkeypatch: pytest.MonkeyPatch
114+
) -> None:
115+
slept: list[float] = []
116+
monkeypatch.setattr(time, "sleep", slept.append)
117+
118+
interactor = WorkerInteractor.__new__(WorkerInteractor)
119+
interactor.rampdelay = 0.0
120+
interactor._ramp_sleep_done = False
121+
122+
interactor._sleep_before_first_test()
123+
124+
assert slept == []
125+
95126
@pytest.fixture
96127
def unserialize_report(self, pytestconfig: pytest.Config) -> UnserializerReport:
97128
def unserialize(

testing/test_workermanage.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,55 @@ def test_popen_makegateway_events(
9191
hm.teardown_nodes()
9292
assert not len(hm.group)
9393

94+
def test_popen_makegateway_passes_stable_worker_index(
95+
self,
96+
config: pytest.Config,
97+
monkeypatch: pytest.MonkeyPatch,
98+
) -> None:
99+
worker_indices: list[tuple[str, int]] = []
100+
101+
class MockController:
102+
def __init__(
103+
self,
104+
nodemanager: NodeManager,
105+
gateway: execnet.Gateway,
106+
config: pytest.Config,
107+
putevent: object,
108+
worker_index: int,
109+
) -> None:
110+
worker_indices.append((gateway.id, worker_index))
111+
112+
def setup(self) -> None:
113+
pass
114+
115+
monkeypatch.setattr(workermanage, "WorkerController", MockController)
116+
hm = NodeManager(config, ["popen//id=alpha", "popen//id=beta"])
117+
hm.setup_nodes(None) # type: ignore[arg-type]
118+
119+
assert worker_indices == [("alpha", 0), ("beta", 1)]
120+
121+
def test_workerinput_includes_ramp_delay(self, config: pytest.Config) -> None:
122+
from xdist.workermanage import WorkerController
123+
124+
class DummyManager:
125+
testrunuid = "testrun"
126+
specs = [execnet.XSpec("popen")] * 4
127+
128+
class DummyGateway:
129+
id = "gw2"
130+
spec = execnet.XSpec("popen")
131+
132+
config.option.ramp = 12.0
133+
node = WorkerController(
134+
nodemanager=DummyManager(), # type: ignore[arg-type]
135+
gateway=DummyGateway(), # type: ignore[arg-type]
136+
config=config,
137+
putevent=lambda event: None,
138+
worker_index=2,
139+
)
140+
141+
assert node.workerinput["rampdelay"] == 6.0
142+
94143
def test_popens_rsync(
95144
self,
96145
config: pytest.Config,

0 commit comments

Comments
 (0)