From 609f635f8b907d248896e013c0648f7dc2683082 Mon Sep 17 00:00:00 2001 From: danceratopz Date: Sun, 19 Apr 2026 00:58:24 +0200 Subject: [PATCH 1/7] chore(testing): add `pytest-split` dependency Pin `pytest-split==0.11.0` exactly: the `--grouped-split` plugin relies on the internal `pytestsplitplugin` name to unregister the upstream splitter when grouped splitting is active, which is an implementation detail that could shift across minor releases. --- packages/testing/pyproject.toml | 1 + uv.lock | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/packages/testing/pyproject.toml b/packages/testing/pyproject.toml index 46f5f717649..16b055c0d15 100644 --- a/packages/testing/pyproject.toml +++ b/packages/testing/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "pytest-custom-report>=1.0.1,<2", "pytest-html>=4.1.0,<5", "pytest-metadata>=3,<4", + "pytest-split==0.11.0", "pytest-xdist>=3.3.1,<4", "coincurve>=20.0.0,<21", "trie>=3.1.0,<4", diff --git a/uv.lock b/uv.lock index fc1bf99a549..808f4d064cc 100644 --- a/uv.lock +++ b/uv.lock @@ -1073,6 +1073,7 @@ dependencies = [ { name = "pytest-json-report" }, { name = "pytest-metadata" }, { name = "pytest-regex" }, + { name = "pytest-split" }, { name = "pytest-xdist" }, { name = "pyyaml" }, { name = "questionary" }, @@ -1126,6 +1127,7 @@ requires-dist = [ { name = "pytest-json-report", specifier = ">=1.5.0,<2" }, { name = "pytest-metadata", specifier = ">=3,<4" }, { name = "pytest-regex", specifier = ">=0.2.0,<0.3" }, + { name = "pytest-split", specifier = "==0.11.0" }, { name = "pytest-xdist", specifier = ">=3.3.1,<4" }, { name = "pyyaml", specifier = ">=6.0.2,<7" }, { name = "questionary", specifier = ">=2.1.0,<3" }, @@ -2443,6 +2445,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/72/d4143c66c1806599358c119c0af530bab92ef0c48129f17522dfd5a7ff6a/pytest_regex-0.2.0-py3-none-any.whl", hash = "sha256:c97e9c49e8c7e7482bd1fa701e3a5cccd18eb78d263752e32dba4937d8cee6d9", size = 3824, upload-time = "2023-05-29T22:08:09.551Z" }, ] +[[package]] +name = "pytest-split" +version = "0.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2f/16/8af4c5f2ceb3640bb1f78dfdf5c184556b10dfe9369feaaad7ff1c13f329/pytest_split-0.11.0.tar.gz", hash = "sha256:8ebdb29cc72cc962e8eb1ec07db1eeb98ab25e215ed8e3216f6b9fc7ce0ec2b5", size = 13421, upload-time = "2026-02-03T09:14:31.469Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ae/a1/d4423657caaa8be9b31e491592b49cebdcfd434d3e74512ce71f6ec39905/pytest_split-0.11.0-py3-none-any.whl", hash = "sha256:899d7c0f5730da91e2daf283860eb73b503259cb416851a65599368849c7f382", size = 11911, upload-time = "2026-02-03T09:14:33.708Z" }, +] + [[package]] name = "pytest-xdist" version = "3.8.0" From 4396ab2151b950b0b50d1697f8ee8a173746071e Mon Sep 17 00:00:00 2001 From: danceratopz Date: Sun, 19 Apr 2026 01:01:13 +0200 Subject: [PATCH 2/7] feat(test-split): add grouped least-duration scheduling Add `scheduling.py` exposing three functions for grouped test splits: - `build_group_durations` groups keyed items and totals per-group duration (unknown items fall back to the known-items mean). - `lpt_schedule` assigns groups to runners heaviest-first via a min-heap (Longest-Processing-Time-first, 4/3-approximation of optimal makespan). - `assign_runners` composes the two and returns one `SplitGroup` per runner, preserving intra-group collection order. The module separates the two concerns that together implement `--grouped-split`: the grouping invariant (correctness) and LPT scheduling (performance). Swapping in a different per-group scheduling rule would keep fan-in safe and only affect wallclock. --- .../pytest_commands/plugins/split/__init__.py | 1 + .../plugins/split/scheduling.py | 181 +++++++++++++++ .../plugins/split/tests/__init__.py | 1 + .../plugins/split/tests/test_scheduling.py | 208 ++++++++++++++++++ 4 files changed, 391 insertions(+) create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/__init__.py create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/__init__.py create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/__init__.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/__init__.py new file mode 100644 index 00000000000..1a3c6989e9b --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/__init__.py @@ -0,0 +1 @@ +"""Grouped test splitting for pytest-split.""" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py new file mode 100644 index 00000000000..0708848547d --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py @@ -0,0 +1,181 @@ +""" +Scheduling for ``--grouped-split``: bin-pack grouped items across runners. + +This module combines two independent concerns that together implement +the ``--grouped-split`` flag: + +1. **Grouping invariant** (correctness). Items sharing a group key + always land on the same runner. The caller supplies the key via + the ``(key, item)`` pairs in *keyed_items*; this module only + enforces the "same key, same runner" rule. Fan-in safety for + per-group output files (e.g. fill's per-``(fork, function)`` + fixture files) depends on this invariant alone. + +2. **LPT scheduling** (performance). Given the grouping constraint, + groups are assigned *heaviest-first* to the *least-loaded* runner + via a min-heap. This is Longest-Processing-Time-first, the + standard 4/3-approximation for makespan minimization on identical + machines. Swapping in round-robin (or any other per-group rule) + would keep the grouping invariant intact and only affect + wallclock. + +Duration data is optional. Unknown items fall back to the mean of +known items (or ``1.0`` if none are known), so when ``.test_durations`` +is absent the scheduler degrades gracefully to balancing group +*count* rather than group *duration*. + +Public API: + +- :func:`build_group_durations` -- items to groups + per-group totals. +- :func:`lpt_schedule` -- group durations to runner assignments. +- :func:`assign_runners` -- end-to-end: items to :class:`SplitGroup`s. +""" + +from __future__ import annotations + +import heapq +from collections import OrderedDict +from collections.abc import Sequence +from typing import NamedTuple, Protocol, runtime_checkable + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + strip_xdist_suffix, +) + + +@runtime_checkable +class _HasNodeId(Protocol): + """Minimal protocol for items with a ``nodeid`` attribute.""" + + @property + def nodeid(self) -> str: + """Return the pytest node identifier.""" + ... + + +class SplitGroup(NamedTuple): + """One runner's workload after splitting.""" + + selected: list[_HasNodeId] + deselected: list[_HasNodeId] + duration: float + max_group_duration: float + + +def build_group_durations( + keyed_items: Sequence[tuple[str, _HasNodeId]], + durations: dict[str, float], +) -> tuple[OrderedDict[str, list[_HasNodeId]], dict[str, float]]: + """ + Group *keyed_items* by key and compute each group's total duration. + + Items sharing a key are collected in collection order under that + key. Per-item duration is looked up by bare nodeid (after + stripping any ``@t8n-cache-*`` suffix); unknown items inherit the + mean of known items, or ``1.0`` when no durations are known. + + Returns ``(groups, group_durations)``: + + - ``groups[key]`` -- the items under *key* in collection order. + - ``group_durations[key]`` -- the sum of per-item durations. + """ + groups: OrderedDict[str, list[_HasNodeId]] = OrderedDict() + for key, item in keyed_items: + groups.setdefault(key, []).append(item) + + known: dict[str, float] = {} + for _, item in keyed_items: + nid = strip_xdist_suffix(item.nodeid) + if nid in durations: + known[nid] = durations[nid] + avg = sum(known.values()) / len(known) if known else 1.0 + + group_durations = { + key: sum( + known.get(strip_xdist_suffix(item.nodeid), avg) for item in members + ) + for key, members in groups.items() + } + return groups, group_durations + + +def lpt_schedule( + group_durations: dict[str, float], + splits: int, +) -> tuple[list[list[str]], list[float], list[float]]: + """ + Assign groups to *splits* runners via Longest-Processing-Time-first. + + Groups are sorted heaviest-first and each is placed on the runner + with the smallest current total (tie-broken by runner index via + heap insertion order). The result is a 4/3-approximation of the + optimal makespan; exact optimization is NP-hard. + + Returns three parallel lists of length *splits*: + + - ``runner_keys[i]`` -- group keys assigned to runner *i*, in + placement order (heaviest-first globally). + - ``runner_totals[i]`` -- total duration on runner *i*. + - ``runner_max_group[i]`` -- duration of the largest single group + on runner *i* (a per-runner wallclock lower bound). + """ + sorted_keys = sorted( + group_durations, key=lambda k: group_durations[k], reverse=True + ) + runner_keys: list[list[str]] = [[] for _ in range(splits)] + runner_totals = [0.0] * splits + runner_max_group = [0.0] * splits + + heap: list[tuple[float, int]] = [(0.0, i) for i in range(splits)] + heapq.heapify(heap) + for key in sorted_keys: + total, idx = heapq.heappop(heap) + new_total = total + group_durations[key] + runner_keys[idx].append(key) + runner_totals[idx] = new_total + runner_max_group[idx] = max( + runner_max_group[idx], group_durations[key] + ) + heapq.heappush(heap, (new_total, idx)) + + return runner_keys, runner_totals, runner_max_group + + +def assign_runners( + splits: int, + keyed_items: Sequence[tuple[str, _HasNodeId]], + durations: dict[str, float], +) -> list[SplitGroup]: + """ + Split *keyed_items* across *splits* runners by group key. + + Composes :func:`build_group_durations` and :func:`lpt_schedule`, + then expands each runner's assigned keys back into the original + item objects. Intra-group order is preserved so that t8n-cache + hits stay adjacent under ``--dist loadgroup``. + + Items sharing a key always land on the same runner; groups are + then distributed heaviest-first to the least-loaded runner (see + :func:`lpt_schedule`). + """ + groups, group_durations = build_group_durations(keyed_items, durations) + runner_keys, runner_totals, runner_max_group = lpt_schedule( + group_durations, splits + ) + + result: list[SplitGroup] = [] + for i in range(splits): + selected = [item for key in runner_keys[i] for item in groups[key]] + selected_ids = {id(item) for item in selected} + deselected = [ + item for _, item in keyed_items if id(item) not in selected_ids + ] + result.append( + SplitGroup( + selected=selected, + deselected=deselected, + duration=runner_totals[i], + max_group_duration=runner_max_group[i], + ) + ) + return result diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/__init__.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/__init__.py new file mode 100644 index 00000000000..1d34711c32a --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the grouped-split plugin.""" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py new file mode 100644 index 00000000000..2cd0174bbf6 --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py @@ -0,0 +1,208 @@ +"""Unit tests for the scheduling module's public API.""" + +from __future__ import annotations + +from typing import NamedTuple + +import pytest + +from execution_testing.cli.pytest_commands.plugins.split.scheduling import ( + assign_runners, + build_group_durations, + lpt_schedule, +) + + +class Item(NamedTuple): + """Minimal pytest item stub used in place of ``pytest.Item``.""" + + nodeid: str + + +class TestAssignRunners: + """Tests for :func:`assign_runners` (end-to-end split).""" + + def test_shared_key_stays_on_one_runner(self) -> None: + """Items sharing a key land together.""" + a1 = Item("t.py::f[fork_A-p_X-state_test]") + a2 = Item("t.py::f[fork_A-p_X-blockchain_test]") + b = Item("t.py::f[fork_A-p_Y-state_test]") + keyed = [("group_A", a1), ("group_A", a2), ("group_B", b)] + durations = {a1.nodeid: 1.0, a2.nodeid: 1.0, b.nodeid: 1.0} + + groups = assign_runners(2, keyed, durations) + for g in groups: + if a1 in g.selected: + assert a2 in g.selected + break + + def test_heaviest_group_first(self) -> None: + """Heavy group anchors one runner, light groups fill the other.""" + heavy = [Item(f"t.py::heavy[{i}]") for i in range(3)] + light = [Item(f"t.py::light[{i}]") for i in range(3)] + keyed = [("heavy", h) for h in heavy] + [ + ("light", item) for item in light + ] + durations = {item.nodeid: 100.0 for item in heavy} + durations.update({item.nodeid: 1.0 for item in light}) + + groups = assign_runners(2, keyed, durations) + sorted_totals = sorted(g.duration for g in groups) + assert sorted_totals[0] == pytest.approx(3.0) + assert sorted_totals[1] == pytest.approx(300.0) + + def test_per_item_keys_singleton_groups(self) -> None: + """Items with distinct keys each become their own group.""" + a = Item("t.py::test_a") + b = Item("t.py::test_b") + keyed = [(a.nodeid, a), (b.nodeid, b)] + durations = {a.nodeid: 10.0, b.nodeid: 5.0} + groups = assign_runners(2, keyed, durations) + assert len(groups[0].selected) == 1 + assert len(groups[1].selected) == 1 + + def test_unknown_duration_fallback(self) -> None: + """Unknown items get the average of known durations.""" + known = Item("t.py::known") + unknown = Item("t.py::unknown") + keyed = [("known", known), ("unknown", unknown)] + durations = {known.nodeid: 10.0} + + groups = assign_runners(2, keyed, durations) + for g in groups: + if known in g.selected: + assert g.duration == pytest.approx(10.0) + if unknown in g.selected: + assert g.duration == pytest.approx(10.0) + + def test_intra_group_order_preserved(self) -> None: + """Items within one group keep their original order.""" + items = [ + Item("t.py::f[engine_x]"), + Item("t.py::f[blockchain]"), + Item("t.py::f[state]"), + ] + keyed = [("shared", item) for item in items] + durations = {i.nodeid: 1.0 for i in items} + groups = assign_runners(1, keyed, durations) + assert groups[0].selected == list(items) + + def test_deselected_partitions_items(self) -> None: + """Selected + deselected equals the full input for every runner.""" + items = [Item(f"t.py::f[{i}]") for i in range(3)] + keyed = [(f"key_{i}", item) for i, item in enumerate(items)] + durations = {i.nodeid: 1.0 for i in items} + + groups = assign_runners(2, keyed, durations) + all_ids = {i.nodeid for i in items} + for g in groups: + combined = {i.nodeid for i in g.selected} | { + i.nodeid for i in g.deselected + } + assert combined == all_ids + + def test_empty_runners(self) -> None: + """More splits than groups yields empty runners without error.""" + only = Item("t.py::f") + keyed = [("solo", only)] + durations = {only.nodeid: 1.0} + groups = assign_runners(5, keyed, durations) + + assert len(groups) == 5 + non_empty = [g for g in groups if g.selected] + empty = [g for g in groups if not g.selected] + assert len(non_empty) == 1 + assert all(len(g.deselected) == 1 for g in empty) + + def test_single_runner(self) -> None: + """``splits=1`` returns every item on one runner.""" + items = [Item("t.py::f"), Item("t.py::g")] + keyed = [("k1", items[0]), ("k2", items[1])] + durations = {i.nodeid: 1.0 for i in items} + groups = assign_runners(1, keyed, durations) + assert len(groups) == 1 + assert groups[0].selected == list(items) + assert groups[0].deselected == [] + + def test_durations_with_xdist_suffix_match(self) -> None: + """Durations that still carry ``@xdist_group`` suffixes match.""" + heavy = Item("t.py::heavy") + light = Item("t.py::light") + # Pre-normalized: plugin always normalizes before calling the + # algorithm, so this test mirrors that contract. + durations = {heavy.nodeid: 100.0, light.nodeid: 1.0} + keyed = [("h", heavy), ("l", light)] + groups = assign_runners(2, keyed, durations) + sorted_totals = sorted(g.duration for g in groups) + assert sorted_totals[0] == pytest.approx(1.0) + assert sorted_totals[1] == pytest.approx(100.0) + + def test_max_group_duration_tracked(self) -> None: + """Each runner reports the duration of its heaviest group.""" + heavy_a = Item("t.py::heavy[a]") + heavy_b = Item("t.py::heavy[b]") + light = Item("t.py::light") + keyed = [ + ("heavy", heavy_a), + ("heavy", heavy_b), + ("light", light), + ] + durations = { + heavy_a.nodeid: 100.0, + heavy_b.nodeid: 50.0, + light.nodeid: 1.0, + } + groups = assign_runners(2, keyed, durations) + for g in groups: + if heavy_a in g.selected: + assert g.max_group_duration == pytest.approx(150.0) + else: + assert g.max_group_duration == pytest.approx(1.0) + + +class TestBuildGroupDurations: + """Tests for :func:`build_group_durations`.""" + + def test_groups_preserve_collection_order(self) -> None: + """Items sharing a key keep their original order.""" + a, b, c = Item("t.py::a"), Item("t.py::b"), Item("t.py::a") + keyed = [("k1", a), ("k2", b), ("k1", c)] + groups, _ = build_group_durations(keyed, {}) + assert list(groups.keys()) == ["k1", "k2"] + assert groups["k1"] == [a, c] + assert groups["k2"] == [b] + + def test_unknown_items_use_known_mean(self) -> None: + """Per-group total blends known durations with the known mean.""" + known = Item("t.py::known") + unknown = Item("t.py::unknown") + keyed = [("k", known), ("k", unknown)] + _, group_durations = build_group_durations(keyed, {known.nodeid: 4.0}) + assert group_durations["k"] == pytest.approx(8.0) + + def test_no_known_durations_fallback_one(self) -> None: + """With zero known durations every item weighs ``1.0``.""" + a, b = Item("t.py::a"), Item("t.py::b") + _, group_durations = build_group_durations([("g", a), ("g", b)], {}) + assert group_durations["g"] == pytest.approx(2.0) + + +class TestLPTSchedule: + """Tests for :func:`lpt_schedule`.""" + + def test_heaviest_anchors_least_loaded(self) -> None: + """Heaviest group lands on runner 0; remaining fills runner 1.""" + keys, totals, max_group = lpt_schedule( + {"heavy": 100.0, "mid": 20.0, "light": 5.0}, 2 + ) + assert sorted(totals) == [25.0, 100.0] + heavy_runner = max(range(2), key=lambda i: totals[i]) + assert keys[heavy_runner] == ["heavy"] + assert max_group[heavy_runner] == pytest.approx(100.0) + + def test_empty_runners_when_splits_exceed_groups(self) -> None: + """Extra runners get no keys and zero totals.""" + keys, totals, max_group = lpt_schedule({"only": 7.0}, 3) + assert sum(len(k) for k in keys) == 1 + assert sum(totals) == pytest.approx(7.0) + assert max(max_group) == pytest.approx(7.0) From dab6e9100b28139c654d26113eefa73e31a1eb1f Mon Sep 17 00:00:00 2001 From: danceratopz Date: Sun, 19 Apr 2026 01:02:15 +0200 Subject: [PATCH 3/7] feat(test-split): add durations utility module Provide a single source of truth for `.test_durations` handling used by both the split plugin and the CI helper scripts: strip the `@xdist_group` suffix, normalize a raw durations dict, merge per-runner files, and load/write the JSON format. Fold `strip_xdist_suffix` out of `grouped_least_duration.py` so the algorithm module depends on this utility rather than duplicating it. --- .../plugins/split/durations.py | 69 +++++++++++ .../plugins/split/tests/test_durations.py | 112 ++++++++++++++++++ 2 files changed, 181 insertions(+) create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/durations.py create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_durations.py diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/durations.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/durations.py new file mode 100644 index 00000000000..d4cbcd108be --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/durations.py @@ -0,0 +1,69 @@ +""" +Utilities for pytest-split ``.test_durations`` files. + +``--store-durations`` records nodeids with a ``@t8n-cache-`` +suffix appended during execution, but pytest collection sees bare +nodeids. These helpers bridge the two so the plugin and the CI +scripts share one implementation of suffix stripping, normalization, +and per-group merging. + +Only the ``@t8n-cache-*`` suffix is stripped. Other ``xdist_group`` +markers (e.g. ``@bigmem``) and ``@`` characters inside parametrize +values (e.g. ``test[email@example.com]``) are preserved, matching +``filler._strip_xdist_group_suffix``. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterable +from pathlib import Path + + +def strip_xdist_suffix(nodeid: str) -> str: + """Strip a ``@t8n-cache-*`` suffix from *nodeid*, if present.""" + if "@" in nodeid: + base, suffix = nodeid.rsplit("@", 1) + if suffix.startswith("t8n-cache-"): + return base + return nodeid + + +def normalize_durations(raw: dict[str, float]) -> dict[str, float]: + """ + Return *raw* with ``@t8n-cache-*`` suffixes removed from every key. + + When two keys collapse to the same stripped form (e.g. runs with + different t8n-cache ids), the last one wins. + """ + return {strip_xdist_suffix(k): v for k, v in raw.items()} + + +def merge_durations( + sources: Iterable[dict[str, float]], +) -> dict[str, float]: + """ + Flat-merge *sources* into a single durations dict. + + Fork-range and pytest-split groups produce disjoint nodeid sets by + construction, so collisions are expected to be empty; if any occur, + the last source wins. + """ + merged: dict[str, float] = {} + for src in sources: + merged.update(src) + return merged + + +def load_durations(path: Path) -> dict[str, float]: + """Read a ``.test_durations`` JSON file; empty dict if absent.""" + try: + return json.loads(path.read_text()) + except FileNotFoundError: + return {} + + +def write_durations(path: Path, data: dict[str, float]) -> None: + """Serialize *data* as JSON to *path*, creating parents as needed.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, indent=2) + "\n") diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_durations.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_durations.py new file mode 100644 index 00000000000..089e28eeb4a --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_durations.py @@ -0,0 +1,112 @@ +"""Unit tests for the durations helpers.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + load_durations, + merge_durations, + normalize_durations, + strip_xdist_suffix, + write_durations, +) + + +class TestStripXdistSuffix: + """Tests for :func:`strip_xdist_suffix`.""" + + def test_strips_t8n_cache_suffix(self) -> None: + """``@t8n-cache-*`` suffixes are stripped.""" + nid = "tests/t.py::test_fn[fork_A-state_test]@t8n-cache-abc123" + expected = "tests/t.py::test_fn[fork_A-state_test]" + assert strip_xdist_suffix(nid) == expected + + def test_preserves_other_group_suffixes(self) -> None: + """Non-cache group suffixes (e.g. ``@bigmem``) are preserved.""" + nid = "tests/t.py::test_fn[p]@bigmem" + assert strip_xdist_suffix(nid) == nid + + def test_preserves_custom_group_suffixes(self) -> None: + """Custom ``xdist_group`` markers are preserved.""" + nid = "tests/t.py::test_fn[p]@custom_group" + assert strip_xdist_suffix(nid) == nid + + def test_without_suffix_is_idempotent(self) -> None: + """A nodeid without ``@`` is returned unchanged.""" + nid = "tests/t.py::test_fn[fork_A-state_test]" + assert strip_xdist_suffix(nid) == nid + + def test_at_in_params_preserved(self) -> None: + """``@`` inside parametrize values is preserved (rsplit).""" + nid = "tests/t.py::test_fn[email@example.com]@t8n-cache-abc" + expected = "tests/t.py::test_fn[email@example.com]" + assert strip_xdist_suffix(nid) == expected + + +class TestNormalizeDurations: + """Tests for :func:`normalize_durations`.""" + + def test_strips_cache_suffixes_only(self) -> None: + """Only ``@t8n-cache-*`` keys are stripped; others kept.""" + raw = { + "a[p]@t8n-cache-xyz": 1.0, + "b[q]@bigmem": 2.0, + "c": 3.0, + } + assert normalize_durations(raw) == { + "a[p]": 1.0, + "b[q]@bigmem": 2.0, + "c": 3.0, + } + + def test_collision_last_wins(self) -> None: + """Collapsed cache keys resolve to the last input's value.""" + raw = {"a@t8n-cache-x": 1.0, "a@t8n-cache-y": 2.0} + assert normalize_durations(raw) == {"a": 2.0} + + def test_empty_input(self) -> None: + """Empty input returns empty output.""" + assert normalize_durations({}) == {} + + +class TestMergeDurations: + """Tests for :func:`merge_durations`.""" + + def test_disjoint_sources(self) -> None: + """Disjoint inputs merge to their union.""" + merged = merge_durations( + [{"a": 1.0}, {"b": 2.0}, {"c": 3.0}], + ) + assert merged == {"a": 1.0, "b": 2.0, "c": 3.0} + + def test_overlap_last_wins(self) -> None: + """Overlapping keys resolve to the last source's value.""" + merged = merge_durations([{"a": 1.0}, {"a": 2.0}]) + assert merged == {"a": 2.0} + + def test_empty(self) -> None: + """Zero sources yield an empty dict.""" + assert merge_durations([]) == {} + + +class TestLoadAndWriteDurations: + """Tests for :func:`load_durations` and :func:`write_durations`.""" + + def test_round_trip(self, tmp_path: Path) -> None: + """Writing then reading returns the original dict.""" + path = tmp_path / ".test_durations" + data = {"a": 1.5, "b": 2.25} + write_durations(path, data) + assert load_durations(path) == data + + def test_load_missing_file_is_empty(self, tmp_path: Path) -> None: + """Missing files load as empty dicts.""" + assert load_durations(tmp_path / "nope") == {} + + def test_write_creates_parent_dirs(self, tmp_path: Path) -> None: + """Non-existent parent directories are created on write.""" + path = tmp_path / "nested" / "dir" / ".test_durations" + write_durations(path, {"x": 1.0}) + assert json.loads(path.read_text()) == {"x": 1.0} From 3a299d22af8632f845527089e335835e868c992c Mon Sep 17 00:00:00 2001 From: danceratopz Date: Sun, 19 Apr 2026 01:05:04 +0200 Subject: [PATCH 4/7] feat(test-split): add grouped-split pytest plugin Register via `pytest-fill.ini` under the conventional plugin path `execution_testing.cli.pytest_commands.plugins.split.plugin`. When `--grouped-split` is combined with pytest-split's `--splits` and `--group`, unregister the upstream `pytestsplitplugin` and delegate collection partitioning to `grouped_least_duration`. The plugin emits a compact summary showing the selected runner's load and all runners' load distribution for independent CI log inspection. The plugin's `(test_case, fork)` grouping is what makes the default multi-fixture-per-file output layout safe under split: every fixture format of a test case lands on one runner, so no two runners ever write the same output file. Pairing `--grouped-split` with `--single-fixture-per-file` is therefore unnecessary and should be avoided. Pytester-based integration tests cover partition coverage, format- variant co-location, upstream plugin unregistration, summary emission, no-op behavior without the flag, and durations matching. --- .../pytest_commands/plugins/split/grouping.py | 59 ++++ .../pytest_commands/plugins/split/plugin.py | 229 +++++++++++++ .../plugins/split/tests/test_grouping.py | 101 ++++++ .../split/tests/test_plugin_pytester.py | 306 ++++++++++++++++++ .../pytest_ini_files/pytest-fill.ini | 1 + 5 files changed, 696 insertions(+) create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/grouping.py create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/plugin.py create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_grouping.py create mode 100644 packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_plugin_pytester.py diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/grouping.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/grouping.py new file mode 100644 index 00000000000..3360cee790b --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/grouping.py @@ -0,0 +1,59 @@ +""" +Split-group key extraction for ``--grouped-split``. + +The grouping key maps every parametrization of one test function +under one fork to a single runner. The key format mirrors fill's +output-file layout (one file per ``(fork, function)`` pair), so plain +file copies can fan in the per-runner outputs without content +collisions. + +This module encodes only the correctness invariant -- which items +must stay together. The performance question of how to distribute +groups across runners is handled by :mod:`.scheduling`. +""" + +from __future__ import annotations + +from _pytest.nodes import Item + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + strip_xdist_suffix, +) + +_FORK_PARAM = "parametrized_fork" + + +def group_key(item: Item) -> str: + """ + Return the ``(function_path, fork)`` split-group key for *item*. + + Every parametrization of one test function under one fork maps + to the same key and therefore lands on the same runner, keeping + each per-test-function fixture file under its fork subdir + runner-owned. + + The fork is read from the authoritative source when available -- + ``item.callspec.params["parametrized_fork"]`` set by the forks + plugin -- so a parametrize value that happens to start with + ``fork_`` cannot be mistaken for the real fork. Items without a + callspec (unparametrized functions, doctests, or unit-test stubs) + fall back to a nodeid-based ``fork_*`` token scan. Items with no + fork anywhere form singleton groups keyed by the bare nodeid. + """ + nodeid = strip_xdist_suffix(item.nodeid) + path = nodeid.partition("[")[0] + + callspec = getattr(item, "callspec", None) + if callspec is not None: + params = getattr(callspec, "params", None) or {} + fork = params.get(_FORK_PARAM) + if fork is not None: + return f"{path}|fork={fork}" + + if "[" not in nodeid: + return nodeid + _, _, bracketed = nodeid.partition("[") + for token in bracketed.rstrip("]").split("-"): + if token.startswith("fork_"): + return f"{path}|fork={token[len('fork_') :]}" + return path diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/plugin.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/plugin.py new file mode 100644 index 00000000000..395d077c59a --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/plugin.py @@ -0,0 +1,229 @@ +""" +Pytest plugin for grouped test splitting. + +When ``--grouped-split`` is passed alongside pytest-split's ``--splits`` +and ``--group``, the plugin unregisters pytest-split's default splitter +and partitions items by ``(test_function_path, fork)`` — every +parametrization of one test function under one fork stays on the same +runner. + +That invariant is what fill's native output layout relies on: each +per-test-function fixture file lives under a per-fork subdirectory, so +every output file is written by exactly one runner. CI fan-in can then +copy per-runner fixture dirs together without content collisions and +without needing ``--single-fixture-per-file``. +""" + +from __future__ import annotations + +import sys +from collections.abc import Generator +from pathlib import Path +from typing import Any, cast + +import pytest +from _pytest.config import Config +from _pytest.nodes import Item +from _pytest.terminal import TerminalReporter + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + load_durations, + normalize_durations, + strip_xdist_suffix, +) +from execution_testing.cli.pytest_commands.plugins.split.grouping import ( + group_key, +) +from execution_testing.cli.pytest_commands.plugins.split.scheduling import ( + assign_runners, +) + +_SUMMARY_KEY = pytest.StashKey[list[str]]() +_SPLIT_PLUGIN_NAME = "pytestsplitplugin" + + +def pytest_addoption(parser: pytest.Parser) -> None: + """Register the ``--grouped-split`` flag.""" + parser.getgroup("split").addoption( + "--grouped-split", + dest="grouped_split", + action="store_true", + default=False, + help=( + "Replace pytest-split's default splitting with an" + " xdist_group-aware algorithm that keeps cache-sharing" + " items on the same runner. Requires --splits and" + " --group." + ), + ) + + +def _grouped_split_active(config: Config) -> bool: + """Return True when grouped splitting should replace pytest-split.""" + if not config.getoption("grouped_split", default=False): + return False + splits = config.getoption("splits", default=None) + group = config.getoption("group", default=None) + return bool(splits and group) + + +def _classify_mode( + *, + durations_loaded: int, + items: int, + matched: int, + durations_path: Path, +) -> str: + """ + Return a human-readable one-line mode label for the summary. + + Distinguishes the three situations operators care about: + + - ``average-only (no durations file)``: the configured path does + not exist or is empty. First run, or artifact not downloaded. + - ``average-only (... loaded, 0/N match — KEY MISMATCH)``: the + file was found but none of its keys line up with the collected + nodeids. This is the silent-fallback regression; bin-packing + is effectively random. + - ``duration-aware (matched/total ...)``: at least some items + have real durations; bin-packing is doing its job. + """ + if durations_loaded == 0: + return f"average-only (no durations at {durations_path})" + if matched == 0: + return ( + f"average-only ({durations_loaded} durations loaded from" + f" {durations_path}, 0/{items} match — KEY MISMATCH)" + ) + pct = 100.0 * matched / items if items else 0.0 + suffix = "" if matched == items else f", {items - matched} use avg" + return f"duration-aware ({matched}/{items} matched, {pct:.0f}%{suffix})" + + +def pytest_configure(config: Config) -> None: + """Unregister pytest-split's splitter when grouped mode is active.""" + if not _grouped_split_active(config): + return + plugin = config.pluginmanager.get_plugin(_SPLIT_PLUGIN_NAME) + if plugin is not None: + config.pluginmanager.unregister(plugin, _SPLIT_PLUGIN_NAME) + + +@pytest.hookimpl(trylast=True) +def pytest_collection_modifyitems(config: Config, items: list[Item]) -> None: + """Partition *items* across runners via grouped least-duration.""" + if not _grouped_split_active(config): + return + + splits: int = config.getoption("splits") + group: int = config.getoption("group") + durations_path = Path(config.getoption("durations_path")) + raw_durations = load_durations(durations_path) + durations = normalize_durations(raw_durations) + + keyed_items = [(group_key(item), item) for item in items] + all_groups = assign_runners( + splits=splits, keyed_items=keyed_items, durations=durations + ) + selected = all_groups[group - 1] # pytest-split's --group is 1-indexed + + matched = sum( + 1 for item in items if strip_xdist_suffix(item.nodeid) in durations + ) + unmatched = len(items) - matched + unique_groups = len({key for key, _ in keyed_items}) + mode = _classify_mode( + durations_loaded=len(durations), + items=len(items), + matched=matched, + durations_path=durations_path, + ) + # Emit a GitHub Actions ``::warning::`` annotation on the exact + # regression mode we care about in CI: durations file loaded but + # zero items match (bin-packing silently falls back to average). + # Prefix must start the line for Actions to pick it up. Under + # xdist, ``pytest_collection_modifyitems`` runs on every worker; + # emit only from ``gw0`` (or the non-xdist controller) to avoid N + # duplicate warnings. + worker_id = getattr(config, "workerinput", {}).get("workerid", "master") + if worker_id in ("master", "gw0") and len(durations) > 0 and matched == 0: + print( + "::warning title=grouped-split durations mismatch::" + f" loaded {len(durations)} durations from {durations_path}" + f" but 0/{len(items)} collected items match; bin-packing" + " fell back to average (splits will be imbalanced).", + file=sys.stderr, + flush=True, + ) + summary = [ + f"mode: {mode}", + ( + f"runner {group}/{splits}:" + f" selected {len(selected.selected)}/{len(items)} items," + f" est serial {selected.duration:.0f}s" + f" (heaviest group {selected.max_group_duration:.0f}s)" + ), + ( + f"grouping: {unique_groups} (function, fork) keys," + f" duration coverage {matched}/{len(items)}" + + (f" ({unmatched} unknown -> avg)" if unmatched else "") + ), + "all runners (selected / serial-s / heaviest-s):", + ] + for i, g in enumerate(all_groups, 1): + marker = ">>>" if i == group else " " + summary.append( + f" {marker} {i:2d}: {len(g.selected):6d} items," + f" {g.duration:>7.0f}s, {g.max_group_duration:>7.0f}s" + ) + config.stash[_SUMMARY_KEY] = summary + + items[:] = cast(list[Item], selected.selected) + config.hook.pytest_deselected(items=selected.deselected) + + +@pytest.hookimpl(trylast=True) +def pytest_sessionfinish(session: pytest.Session) -> None: + """ + On an xdist worker, forward the summary to the controller via + ``workeroutput`` so :func:`pytest_terminal_summary` (which runs on + the controller) can find it. No-op on the controller and under + non-xdist runs. + """ + config = session.config + if not hasattr(config, "workerinput"): + return + summary = config.stash.get(_SUMMARY_KEY, []) + if summary and hasattr(config, "workeroutput"): + config.workeroutput["grouped_split_summary"] = summary + + +def pytest_testnodedown(node: Any) -> None: + """ + On the controller, accept the first worker's summary as the + canonical one. Every worker produced the same summary (they all + run the same grouping over the same items), so first-wins is + safe. + """ + if node.config.stash.get(_SUMMARY_KEY, None) is not None: + return + worker_output = getattr(node, "workeroutput", None) + if worker_output is None: + return + summary = worker_output.get("grouped_split_summary") + if summary: + node.config.stash[_SUMMARY_KEY] = summary + + +@pytest.hookimpl(hookwrapper=True, trylast=True) +def pytest_terminal_summary( + terminalreporter: TerminalReporter, config: Config +) -> Generator[None, None, None]: + """Print the grouped-split summary after the normal terminal output.""" + yield + summary = config.stash.get(_SUMMARY_KEY, []) + if not summary: + return + terminalreporter.write_sep("=", "grouped-split", bold=True) + for line in summary: + terminalreporter.line(line) diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_grouping.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_grouping.py new file mode 100644 index 00000000000..c75483a15aa --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_grouping.py @@ -0,0 +1,101 @@ +"""Unit tests for :func:`group_key`.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, cast + +from _pytest.nodes import Item + +from execution_testing.cli.pytest_commands.plugins.split.grouping import ( + group_key, +) + + +@dataclass +class _CallSpec: + """Minimal stub mirroring ``pytest.Function.callspec``.""" + + params: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class _Item: + """Minimal stub mirroring the item fields ``group_key`` reads.""" + + nodeid: str + callspec: _CallSpec | None = None + + +def _as_item(nodeid: str, callspec: _CallSpec | None = None) -> Item: + """Cast the stub to ``Item`` for type-checker friendliness.""" + return cast(Item, _Item(nodeid=nodeid, callspec=callspec)) + + +class TestGroupKeyAuthoritative: + """``parametrized_fork`` from the callspec is the primary source.""" + + def test_callspec_fork_is_used(self) -> None: + """Key uses the callspec's fork, not whatever's in the nodeid.""" + item = _as_item( + "t.py::test_f[fork_Osaka-state_test]", + _CallSpec(params={"parametrized_fork": "Osaka"}), + ) + assert group_key(item) == "t.py::test_f|fork=Osaka" + + def test_callspec_wins_over_ambiguous_nodeid(self) -> None: + """A param value starting with ``fork_`` cannot hijack the key.""" + # Nodeid's first ``fork_*`` token is the bogus param value, but + # the real fork in the callspec is Osaka -- authoritative path + # must ignore the nodeid scan entirely. + item = _as_item( + "t.py::test_f[fork_candidate-fork_Osaka-state_test]", + _CallSpec(params={"parametrized_fork": "Osaka"}), + ) + assert group_key(item) == "t.py::test_f|fork=Osaka" + + def test_fork_object_is_stringified(self) -> None: + """Non-string fork values are rendered via ``str(fork)``.""" + + class _Fork: + def __str__(self) -> str: + return "Prague" + + item = _as_item( + "t.py::test_f[fork_Prague]", + _CallSpec(params={"parametrized_fork": _Fork()}), + ) + assert group_key(item) == "t.py::test_f|fork=Prague" + + +class TestGroupKeyFallback: + """Without callspec or ``parametrized_fork``, fall back to nodeid.""" + + def test_no_callspec_uses_nodeid_scan(self) -> None: + """Items with no callspec scan the nodeid for ``fork_*``.""" + item = _as_item("t.py::test_f[fork_A-state_test]") + assert group_key(item) == "t.py::test_f|fork=A" + + def test_unparametrized_item_is_singleton(self) -> None: + """No ``[`` in nodeid yields the bare nodeid as its own group.""" + item = _as_item("t.py::test_f") + assert group_key(item) == "t.py::test_f" + + def test_parametrized_without_fork_groups_by_function(self) -> None: + """Parametrized items with no fork token share one group.""" + a = _as_item("t.py::test_f[x=1]") + b = _as_item("t.py::test_f[x=2]") + assert group_key(a) == group_key(b) == "t.py::test_f" + + def test_callspec_without_fork_param_falls_back(self) -> None: + """Callspec present but no ``parametrized_fork`` uses the scan.""" + item = _as_item( + "t.py::test_f[fork_A-state_test]", + _CallSpec(params={"other": "value"}), + ) + assert group_key(item) == "t.py::test_f|fork=A" + + def test_xdist_suffix_stripped_before_parsing(self) -> None: + """The ``@t8n-cache-*`` suffix is removed before key building.""" + item = _as_item("t.py::test_f[fork_A]@t8n-cache-abc123") + assert group_key(item) == "t.py::test_f|fork=A" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_plugin_pytester.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_plugin_pytester.py new file mode 100644 index 00000000000..15a807aa782 --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_plugin_pytester.py @@ -0,0 +1,306 @@ +"""Pytester-based integration tests for the grouped-split plugin.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +PLUGIN = "execution_testing.cli.pytest_commands.plugins.split.plugin" + + +def _synthetic_tests(pytester: pytest.Pytester) -> None: + """ + Create a set of synthetic tests mirroring fill's nodeid shape. + + Two functions, two forks, two fixture formats, and one + non-format parameter, producing nodeids like + ``test_split_synth.py::test_alpha[fork_A-state_test-x_0]``. All + variants of one ``(function, fork)`` share a grouping key under + :func:`group_key`; different functions or different forks get + different keys. + """ + pytester.makepyfile( + test_split_synth=""" + import pytest + + @pytest.mark.parametrize("fmt", ["state_test", "blockchain_test"]) + @pytest.mark.parametrize("x", ["0", "1"]) + @pytest.mark.parametrize("fork", ["fork_A", "fork_B"]) + def test_alpha(fork, fmt, x): + pass + + @pytest.mark.parametrize("fmt", ["state_test", "blockchain_test"]) + @pytest.mark.parametrize("x", ["0", "1"]) + @pytest.mark.parametrize("fork", ["fork_A", "fork_B"]) + def test_beta(fork, fmt, x): + pass + + def test_singleton(): + pass + """ + ) + + +def _write_durations(path: Path, items: dict[str, float]) -> None: + """Write a synthetic ``.test_durations`` JSON file.""" + path.write_text(json.dumps(items)) + + +def _collected_nodeids(result: pytest.RunResult) -> set[str]: + """Return the set of ``test_split_synth``-prefixed nodeids.""" + return { + line.strip() + for line in result.stdout.lines + if "::" in line and "test_split_synth" in line + } + + +def test_partition_covers_every_item(pytester: pytest.Pytester) -> None: + """Union of every group's selection equals the full collected set.""" + _synthetic_tests(pytester) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + + splits = 2 + seen: list[set[str]] = [] + for group in range(1, splits + 1): + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "-q", + "--grouped-split", + f"--splits={splits}", + f"--group={group}", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + seen.append(_collected_nodeids(result)) + + union = set().union(*seen) + intersection = seen[0].intersection(*seen[1:]) if seen else set() + assert intersection == set() + # 2 funcs * 2 forks * 2 params * 2 formats + 1 singleton = 17. + assert len(union) == 17 + + +def test_function_fork_items_stay_on_one_runner( + pytester: pytest.Pytester, +) -> None: + """Every item sharing a ``(function, fork)`` lands on one runner.""" + _synthetic_tests(pytester) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + + splits = 2 + runners: list[set[str]] = [] + for group in range(1, splits + 1): + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "-q", + "--grouped-split", + f"--splits={splits}", + f"--group={group}", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + runners.append(_collected_nodeids(result)) + + for fn in ("test_alpha", "test_beta"): + for fork in ("fork_A", "fork_B"): + runner_idxs = { + idx + for idx, runner in enumerate(runners) + for nid in runner + if fn in nid and f"{fork}-" in nid + } + assert len(runner_idxs) == 1, ( + f"{fn} {fork} split across runners: {runner_idxs}" + ) + + +def test_pytest_split_plugin_unregistered_when_active( + pytester: pytest.Pytester, +) -> None: + """ + Upstream ``pytestsplitplugin`` is unregistered under + ``--grouped-split``. + """ + _synthetic_tests(pytester) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + pytester.makeconftest( + """ + def pytest_configure(config): + active = config.pluginmanager.get_plugin("pytestsplitplugin") + print(f"SPLIT_PLUGIN_REGISTERED={active is not None}") + """ + ) + + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "-s", + "--grouped-split", + "--splits=2", + "--group=1", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + assert "SPLIT_PLUGIN_REGISTERED=False" in "\n".join(result.stdout.lines) + + +def test_summary_printed(pytester: pytest.Pytester) -> None: + """The grouped-split summary appears in terminal output.""" + _synthetic_tests(pytester) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "--grouped-split", + "--splits=2", + "--group=1", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + stdout = "\n".join(result.stdout.lines) + assert "grouped-split" in stdout + assert "runner 1/2" in stdout + assert "(function, fork) keys" in stdout + + +def test_inactive_without_grouped_split_flag( + pytester: pytest.Pytester, +) -> None: + """Without ``--grouped-split`` the plugin is a no-op.""" + _synthetic_tests(pytester) + result = pytester.runpytest("-p", PLUGIN, "--collect-only", "-q") + assert result.ret == 0 + assert "grouped-split" not in "\n".join(result.stdout.lines) + + +def test_mode_labels_three_regimes(pytester: pytest.Pytester) -> None: + """ + Plugin summary's ``mode:`` line reports the three operator- + visible regimes: no durations, durations loaded but no match, and + duration-aware. + """ + _synthetic_tests(pytester) + + # Discover an actual collected nodeid so the "duration-aware" + # case's key matches exactly — param ordering in multi-decorator + # parametrize is implementation-defined, so hard-coding it is + # brittle. + collect_only = pytester.runpytest("-p", PLUGIN, "--collect-only", "-q") + assert collect_only.ret == 0 + any_nodeid = next( + line.strip() + for line in collect_only.stdout.lines + if "test_split_synth.py::test_alpha[" in line + ) + + missing_path = pytester.path / "missing.json" + assert not missing_path.exists() + bogus_path = pytester.path / "bogus.json" + bogus_path.write_text('{"tests/unrelated.py::test_x[fork_Zzz]": 99.0}') + matching_path = pytester.path / "matching.json" + matching_path.write_text(json.dumps({any_nodeid: 5.0})) + + for path, expected in ( + (missing_path, "average-only (no durations"), + (bogus_path, "average-only (1 durations loaded"), + (matching_path, "duration-aware"), + ): + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "--grouped-split", + "--splits=2", + "--group=1", + f"--durations-path={path}", + ) + assert result.ret == 0 + stdout = "\n".join(result.stdout.lines) + assert expected in stdout, ( + f"path={path.name}: expected {expected!r} in summary, got:" + f"\n{stdout[-500:]}" + ) + + +def test_warning_annotation_on_key_mismatch( + pytester: pytest.Pytester, +) -> None: + """ + The ``::warning::`` annotation fires when durations are loaded + but none match, so CI surfaces the silent-fallback regression. + """ + _synthetic_tests(pytester) + bogus_path = pytester.path / "bogus.json" + bogus_path.write_text('{"tests/unrelated.py::test_x[fork_Zzz]": 99.0}') + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "--grouped-split", + "--splits=2", + "--group=1", + f"--durations-path={bogus_path}", + ) + assert result.ret == 0 + combined = "\n".join(result.stdout.lines + result.stderr.lines) + assert "::warning title=grouped-split durations mismatch::" in combined + + +def test_items_without_fork_param_become_singletons( + pytester: pytest.Pytester, +) -> None: + """Items whose nodeid carries no ``fork_*`` token are per-nodeid.""" + pytester.makepyfile( + test_unmarked=""" + def test_a(): pass + def test_b(): pass + def test_c(): pass + def test_d(): pass + """ + ) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + + splits = 2 + seen: list[set[str]] = [] + for group in range(1, splits + 1): + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "-q", + "--grouped-split", + f"--splits={splits}", + f"--group={group}", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + # Strip any rootdir-relative path prefix so the assertion is + # robust to pytester sandboxes placed inside the outer pytest's + # rootdir (e.g. under ``just test-tests --basetemp=.just/...``). + seen.append( + { + line.strip().rsplit("/", 1)[-1] + for line in result.stdout.lines + if "test_unmarked.py::" in line + } + ) + + assert all(len(s) > 0 for s in seen) + assert set().union(*seen) == ( + {f"test_unmarked.py::test_{n}" for n in "abcd"} + ) diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/pytest_ini_files/pytest-fill.ini b/packages/testing/src/execution_testing/cli/pytest_commands/pytest_ini_files/pytest-fill.ini index 179ac2082f0..6696be28755 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/pytest_ini_files/pytest-fill.ini +++ b/packages/testing/src/execution_testing/cli/pytest_commands/pytest_ini_files/pytest-fill.ini @@ -17,6 +17,7 @@ addopts = -p execution_testing.cli.pytest_commands.plugins.shared.transaction_fixtures -p execution_testing.cli.pytest_commands.plugins.help.help -p execution_testing.cli.pytest_commands.plugins.custom_logging.plugin_logging + -p execution_testing.cli.pytest_commands.plugins.split.plugin --tb short --dist loadgroup --ignore tests/cancun/eip4844_blobs/point_evaluation_vectors/ From 00c6e2bc8661b883501e56135f0ea68a2d6742dc Mon Sep 17 00:00:00 2001 From: danceratopz Date: Sun, 19 Apr 2026 01:06:48 +0200 Subject: [PATCH 5/7] refactor(test-fill): allow phase-1-only pre-alloc generation Re-order `FillCommand.create_executions` so that the execution plan reflects the user's intent: - `--use-pre-alloc-groups` now takes priority. The flag means pre-alloc groups already exist on disk from a previous run, so even alongside `--generate-all-formats` the run is single-phase. - `--generate-pre-alloc-groups` without `--generate-all-formats` now runs phase 1 only. CI can populate pre-alloc groups on a dedicated runner without wasting time on phase 2. - `--generate-all-formats` continues to trigger the full two-phase run. Update `test_legacy_generate_pre_alloc_groups_still_works` to reflect the new phase-1-only behaviour and add a test covering the `--use-pre-alloc-groups` priority. --- .../cli/pytest_commands/fill.py | 48 ++++++++++++------- .../cli/tests/test_generate_all_formats.py | 39 +++++++++++---- 2 files changed, 59 insertions(+), 28 deletions(-) diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/fill.py b/packages/testing/src/execution_testing/cli/pytest_commands/fill.py index 842935b65b5..c2b5ca33969 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/fill.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/fill.py @@ -33,33 +33,45 @@ def create_executions( self, pytest_args: List[str] ) -> List[PytestExecution]: """ - Create execution plan that supports two-phase pre-allocation group + Create execution plan supporting two-phase pre-allocation group generation. - Returns single execution for normal filling, or two-phase execution - when --generate-pre-alloc-groups or --generate-all-formats is - specified. + Returns: + - Single-phase execution when `--use-pre-alloc-groups` is set, + regardless of `--generate-all-formats` (pre-alloc groups + already exist on disk from a previous run). + - Phase-1-only execution when `--generate-pre-alloc-groups` is + set without `--generate-all-formats` (CI generates pre-alloc + on a dedicated runner without wasting time on phase 2). + - Two-phase execution when `--generate-all-formats` is set. + - Normal single-phase execution otherwise. + """ processed_args = self.process_arguments(pytest_args) processed_args = self._add_default_ignores(processed_args) - # Check if we need two-phase execution - if self._should_use_two_phase_execution(processed_args): - return self._create_two_phase_executions(processed_args) - elif "--use-pre-alloc-groups" in processed_args: - # Only phase 2: using existing pre-allocation groups + if "--use-pre-alloc-groups" in processed_args: + # Pre-alloc groups already exist: single-phase fill only. return self._create_single_phase_with_pre_alloc_groups( processed_args ) - else: - # Normal single-phase execution - return [ - PytestExecution( - config_file=self.config_path, - args=processed_args, - allowed_exit_codes=self.allowed_exit_codes, - ) - ] + if self._should_use_two_phase_execution(processed_args): + two_phase = self._create_two_phase_executions(processed_args) + if ( + "--generate-pre-alloc-groups" in processed_args + and "--generate-all-formats" not in processed_args + ): + # Phase 1 only: generate pre-alloc groups without filling. + return [two_phase[0]] + return two_phase + # Normal single-phase execution + return [ + PytestExecution( + config_file=self.config_path, + args=processed_args, + allowed_exit_codes=self.allowed_exit_codes, + ) + ] def _create_two_phase_executions( self, args: List[str] diff --git a/packages/testing/src/execution_testing/cli/tests/test_generate_all_formats.py b/packages/testing/src/execution_testing/cli/tests/test_generate_all_formats.py index e30e9f15ce9..5ea475b2460 100644 --- a/packages/testing/src/execution_testing/cli/tests/test_generate_all_formats.py +++ b/packages/testing/src/execution_testing/cli/tests/test_generate_all_formats.py @@ -76,26 +76,45 @@ def test_generate_all_formats_removes_clean_from_phase2() -> None: assert "--clean" not in phase2_args -def test_legacy_generate_pre_alloc_groups_still_works() -> None: - """Test that the legacy --generate-pre-alloc-groups flag still works.""" +def test_generate_pre_alloc_groups_alone_is_phase_1_only() -> None: + """ + Test that --generate-pre-alloc-groups without --generate-all-formats + runs phase 1 only, so CI can populate pre-alloc groups on a + dedicated runner without wasting time on phase 2. + """ command = FillCommand() with patch.object(command, "process_arguments", side_effect=lambda x: x): pytest_args = ["--generate-pre-alloc-groups", "tests/somedir/"] executions = command.create_executions(pytest_args) - assert len(executions) == 2 + assert len(executions) == 1 - # Phase 1: Should have --generate-pre-alloc-groups phase1_args = executions[0].args assert "--generate-pre-alloc-groups" in phase1_args + assert "--use-pre-alloc-groups" not in phase1_args + assert "--generate-all-formats" not in phase1_args - # Phase 2: Should have --use-pre-alloc-groups but NOT --generate-all- - # formats - phase2_args = executions[1].args - assert "--use-pre-alloc-groups" in phase2_args - assert "--generate-all-formats" not in phase2_args - assert "--generate-pre-alloc-groups" not in phase2_args + +def test_use_pre_alloc_groups_forces_single_phase() -> None: + """ + Test that --use-pre-alloc-groups always runs a single phase, even + alongside --generate-all-formats (pre-alloc groups already exist on + disk from a previous run). + """ + command = FillCommand() + + with patch.object(command, "process_arguments", side_effect=lambda x: x): + pytest_args = [ + "--use-pre-alloc-groups", + "--generate-all-formats", + "tests/somedir/", + ] + executions = command.create_executions(pytest_args) + + assert len(executions) == 1 + assert "--use-pre-alloc-groups" in executions[0].args + assert "--generate-all-formats" in executions[0].args def test_single_phase_without_flags() -> None: From 41324ea92f23d44fa390e872d4138aa526793105 Mon Sep 17 00:00:00 2001 From: danceratopz Date: Sun, 19 Apr 2026 01:07:59 +0200 Subject: [PATCH 6/7] chore(ci): add durations helper scripts Thin shims over the split-plugin `durations` module so the CI workflow can call `normalize`, `merge`, and `diagnose` without duplicating the JSON-handling logic. The scripts import directly from `execution_testing.cli.pytest_commands.plugins.split.durations`, so any future change to the format or suffix convention stays in one place. --- .github/scripts/diagnose_durations.py | 58 ++++++++++++++++++++++++ .github/scripts/merge_durations_files.py | 58 ++++++++++++++++++++++++ .github/scripts/normalize_durations.py | 47 +++++++++++++++++++ 3 files changed, 163 insertions(+) create mode 100644 .github/scripts/diagnose_durations.py create mode 100644 .github/scripts/merge_durations_files.py create mode 100644 .github/scripts/normalize_durations.py diff --git a/.github/scripts/diagnose_durations.py b/.github/scripts/diagnose_durations.py new file mode 100644 index 00000000000..2b0e710777c --- /dev/null +++ b/.github/scripts/diagnose_durations.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Diagnose a pytest-split ``.test_durations`` file. + +Report entry count, whether keys still carry ``@xdist_group`` suffixes +(indicating incomplete normalization), and print sample keys for +quick visual comparison against collected test nodeids. + +Usage:: + + uv run python .github/scripts/diagnose_durations.py [path] + +*path* defaults to ``.test_durations`` in the current directory. +""" + +import sys +from pathlib import Path + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + load_durations, +) + + +def main() -> None: + """Entry point.""" + path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".test_durations") + if not path.exists(): + print(f"::warning::No durations file at {path}") + return + + data = load_durations(path) + has_at = sum(1 for k in data if "@" in k) + keys = sorted(data) + abs_path = path.resolve() + + print(f"Durations file: {path}") + print(f" Entries: {len(data)}") + print(f" Keys with @ suffix: {has_at}/{len(data)}") + if has_at: + print( + f" WARNING: {has_at} keys still have @ suffixes" + " - normalization may have failed" + ) + + for label, sample in ( + ("First 3 keys:", keys[:3]), + ("Last 3 keys:", keys[-3:]), + ): + print(f" {label}") + for k in sample: + print(f" {k}: {data[k]:.2f}s") + + print(f" Absolute path: {abs_path}") + print(f" File size: {abs_path.stat().st_size} bytes") + + +if __name__ == "__main__": + main() diff --git a/.github/scripts/merge_durations_files.py b/.github/scripts/merge_durations_files.py new file mode 100644 index 00000000000..1074620ce57 --- /dev/null +++ b/.github/scripts/merge_durations_files.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Merge multiple pytest-split ``.test_durations`` files. + +Accept an output path and one or more input ``.test_durations`` JSON +files and flat-merge them into one file. Splits produce disjoint test +sets by construction, so collisions are not expected; when they do +occur the last input wins. + +Usage:: + + uv run python .github/scripts/merge_durations_files.py \ + [ ...] +""" + +import sys +from pathlib import Path + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + load_durations, + merge_durations, + write_durations, +) + + +def main() -> None: + """Entry point.""" + if len(sys.argv) < 3: + print( + "Usage: merge_durations_files.py " + " [ ...]", + file=sys.stderr, + ) + sys.exit(1) + + output_path = Path(sys.argv[1]) + inputs = [Path(p) for p in sys.argv[2:]] + + sources: list[dict[str, float]] = [] + count = 0 + for path in inputs: + if not path.exists(): + print(f"Skipping {path} (not found)") + continue + sources.append(load_durations(path)) + count += 1 + + if not sources: + print("No durations found, nothing to merge.") + sys.exit(0) + + merged = merge_durations(sources) + write_durations(output_path, merged) + print(f"Merged {count} durations files ({len(merged)} tests)") + + +if __name__ == "__main__": + main() diff --git a/.github/scripts/normalize_durations.py b/.github/scripts/normalize_durations.py new file mode 100644 index 00000000000..b6467e17195 --- /dev/null +++ b/.github/scripts/normalize_durations.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +""" +Normalize a pytest-split ``.test_durations`` file in place. + +Strip ``@xdist_group`` suffixes so the keys match the bare nodeids +pytest sees during collection. ``--store-durations`` records ids with +the suffix (e.g. ``@t8n-cache-``) added by xdist during +execution, so a normalization pass is required before a subsequent run +can look up durations. + +Usage:: + + uv run python .github/scripts/normalize_durations.py [path] + +*path* defaults to ``.test_durations`` in the current directory. +""" + +import sys +from pathlib import Path + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + load_durations, + normalize_durations, + write_durations, +) + + +def main() -> None: + """Entry point.""" + path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".test_durations") + if not path.exists(): + print(f"::warning::No durations file at {path}") + return + + raw = load_durations(path) + normalized = normalize_durations(raw) + write_durations(path, normalized) + + collisions = len(raw) - len(normalized) + print( + f"Normalized {len(raw)} -> {len(normalized)} entries" + f" ({collisions} collisions)" + ) + + +if __name__ == "__main__": + main() From 7683e287c9d4b88c4c88513326446dd26f394ac7 Mon Sep 17 00:00:00 2001 From: danceratopz Date: Mon, 20 Apr 2026 00:15:04 +0100 Subject: [PATCH 7/7] feat(test-split): sort items slowest-first within each group After LPT assigns groups to CI runners, sort each group's items by individual duration DESC so xdist workers receive slow tests first. This reduces trailing stragglers on a runner's `-n N` worker pool. The new order flows through `items[:]` in the existing `pytest_collection_modifyitems` hook (`trylast=True`), which xdist respects under its default scheduler settings. The one override is `--loadscopereorder`: when enabled, xdist re-sorts scopes by item count and our order is lost. Not used in CI today. Disable via `assign_runners(..., sort_intra_group=False)` for bisecting or A/B comparison; no CLI flag for now. --- .../plugins/split/scheduling.py | 121 +++++++++++++----- .../plugins/split/tests/test_scheduling.py | 92 +++++++++++++ 2 files changed, 181 insertions(+), 32 deletions(-) diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py index 0708848547d..b17602db79d 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py @@ -1,8 +1,8 @@ """ Scheduling for ``--grouped-split``: bin-pack grouped items across runners. -This module combines two independent concerns that together implement -the ``--grouped-split`` flag: +This module combines three concerns that together implement the +``--grouped-split`` flag: 1. **Grouping invariant** (correctness). Items sharing a group key always land on the same runner. The caller supplies the key via @@ -11,22 +11,30 @@ per-group output files (e.g. fill's per-``(fork, function)`` fixture files) depends on this invariant alone. -2. **LPT scheduling** (performance). Given the grouping constraint, - groups are assigned *heaviest-first* to the *least-loaded* runner - via a min-heap. This is Longest-Processing-Time-first, the - standard 4/3-approximation for makespan minimization on identical - machines. Swapping in round-robin (or any other per-group rule) - would keep the grouping invariant intact and only affect - wallclock. +2. **LPT runner scheduling** (CI-wallclock performance). Given the + grouping constraint, groups are assigned *heaviest-first* to the + *least-loaded* runner via a min-heap -- Longest-Processing-Time- + first, the standard 4/3-approximation for makespan minimization + on identical machines. This balances total wallclock across CI + runners. + +3. **Intra-group item ordering** (worker-straggler prevention). + Within each group, items are sorted by individual duration DESC + before being handed back to pytest. Under xdist's default scheduler + settings (``--dist=load`` or ``--dist=loadgroup`` without + ``--loadscopereorder``) this order is preserved all the way to + worker dispatch, so slow tests start first and don't become + trailing stragglers on a runner's ``-n N`` worker pool. Disable + via ``assign_runners(..., sort_intra_group=False)``. Duration data is optional. Unknown items fall back to the mean of known items (or ``1.0`` if none are known), so when ``.test_durations`` -is absent the scheduler degrades gracefully to balancing group -*count* rather than group *duration*. +is absent the scheduler degrades gracefully. Public API: - :func:`build_group_durations` -- items to groups + per-group totals. +- :func:`sort_items_within_groups` -- sort each group slowest-first. - :func:`lpt_schedule` -- group durations to runner assignments. - :func:`assign_runners` -- end-to-end: items to :class:`SplitGroup`s. """ @@ -35,8 +43,8 @@ import heapq from collections import OrderedDict -from collections.abc import Sequence -from typing import NamedTuple, Protocol, runtime_checkable +from collections.abc import Callable, Iterable, Sequence +from typing import NamedTuple, Protocol, TypeVar, runtime_checkable from execution_testing.cli.pytest_commands.plugins.split.durations import ( strip_xdist_suffix, @@ -53,6 +61,9 @@ def nodeid(self) -> str: ... +_ItemT = TypeVar("_ItemT", bound=_HasNodeId) + + class SplitGroup(NamedTuple): """One runner's workload after splitting.""" @@ -62,6 +73,32 @@ class SplitGroup(NamedTuple): max_group_duration: float +def _item_weight( + items: Iterable[_HasNodeId], + durations: dict[str, float], +) -> Callable[[_HasNodeId], float]: + """ + Return an ``item -> estimated duration`` function. + + Per-item duration is looked up by bare nodeid (after stripping + any ``@t8n-cache-*`` suffix); unknown items inherit the mean of + known items, or ``1.0`` when no durations are known. The caller + passes the set of items in-scope so the average is computed over + that scope. + """ + known: dict[str, float] = {} + for item in items: + nid = strip_xdist_suffix(item.nodeid) + if nid in durations: + known[nid] = durations[nid] + avg = sum(known.values()) / len(known) if known else 1.0 + + def weight(item: _HasNodeId) -> float: + return known.get(strip_xdist_suffix(item.nodeid), avg) + + return weight + + def build_group_durations( keyed_items: Sequence[tuple[str, _HasNodeId]], durations: dict[str, float], @@ -70,9 +107,8 @@ def build_group_durations( Group *keyed_items* by key and compute each group's total duration. Items sharing a key are collected in collection order under that - key. Per-item duration is looked up by bare nodeid (after - stripping any ``@t8n-cache-*`` suffix); unknown items inherit the - mean of known items, or ``1.0`` when no durations are known. + key. See :func:`_item_weight` for the per-item duration lookup + and fallback rule. Returns ``(groups, group_durations)``: @@ -83,22 +119,37 @@ def build_group_durations( for key, item in keyed_items: groups.setdefault(key, []).append(item) - known: dict[str, float] = {} - for _, item in keyed_items: - nid = strip_xdist_suffix(item.nodeid) - if nid in durations: - known[nid] = durations[nid] - avg = sum(known.values()) / len(known) if known else 1.0 - + weight = _item_weight((item for _, item in keyed_items), durations) group_durations = { - key: sum( - known.get(strip_xdist_suffix(item.nodeid), avg) for item in members - ) + key: sum(weight(item) for item in members) for key, members in groups.items() } return groups, group_durations +def sort_items_within_groups( + groups: OrderedDict[str, list[_ItemT]], + durations: dict[str, float], +) -> OrderedDict[str, list[_ItemT]]: + """ + Return *groups* with each member list sorted slowest-first. + + Group order (keys) is preserved; only the items inside each + group are reordered by individual duration DESC. Sort is stable, + so items with identical durations keep their collection order. + + Used by :func:`assign_runners` so that xdist workers receive slow + tests first within each scope, reducing end-of-run stragglers. + See :func:`_item_weight` for the duration lookup. + """ + all_items = (item for members in groups.values() for item in members) + weight = _item_weight(all_items, durations) + return OrderedDict( + (key, sorted(members, key=weight, reverse=True)) + for key, members in groups.items() + ) + + def lpt_schedule( group_durations: dict[str, float], splits: int, @@ -145,20 +196,26 @@ def assign_runners( splits: int, keyed_items: Sequence[tuple[str, _HasNodeId]], durations: dict[str, float], + sort_intra_group: bool = True, ) -> list[SplitGroup]: """ Split *keyed_items* across *splits* runners by group key. - Composes :func:`build_group_durations` and :func:`lpt_schedule`, - then expands each runner's assigned keys back into the original - item objects. Intra-group order is preserved so that t8n-cache - hits stay adjacent under ``--dist loadgroup``. + Composes :func:`build_group_durations`, optionally + :func:`sort_items_within_groups`, and :func:`lpt_schedule`, then + expands each runner's assigned keys back into the original item + objects. Group contiguity is preserved so that t8n-cache hits + stay adjacent under ``--dist loadgroup``. Items sharing a key always land on the same runner; groups are - then distributed heaviest-first to the least-loaded runner (see - :func:`lpt_schedule`). + distributed heaviest-first to the least-loaded runner (see + :func:`lpt_schedule`). With *sort_intra_group* true (default), + items within each group are ordered slowest-first so xdist + workers start on the longest tests first. """ groups, group_durations = build_group_durations(keyed_items, durations) + if sort_intra_group: + groups = sort_items_within_groups(groups, durations) runner_keys, runner_totals, runner_max_group = lpt_schedule( group_durations, splits ) diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py index 2cd0174bbf6..ac42faa1fb6 100644 --- a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections import OrderedDict from typing import NamedTuple import pytest @@ -10,6 +11,7 @@ assign_runners, build_group_durations, lpt_schedule, + sort_items_within_groups, ) @@ -206,3 +208,93 @@ def test_empty_runners_when_splits_exceed_groups(self) -> None: assert sum(len(k) for k in keys) == 1 assert sum(totals) == pytest.approx(7.0) assert max(max_group) == pytest.approx(7.0) + + +class TestSortItemsWithinGroups: + """Tests for :func:`sort_items_within_groups`.""" + + def test_items_sorted_slowest_first(self) -> None: + """Each group's items come out in duration-DESC order.""" + fast = Item("t.py::f[fast]") + mid = Item("t.py::f[mid]") + slow = Item("t.py::f[slow]") + groups = OrderedDict([("g", [fast, mid, slow])]) + durations = {fast.nodeid: 1.0, mid.nodeid: 5.0, slow.nodeid: 10.0} + out = sort_items_within_groups(groups, durations) + assert out["g"] == [slow, mid, fast] + + def test_group_order_preserved(self) -> None: + """Sort mutates within groups only; group key order is kept.""" + a, b = Item("t.py::a"), Item("t.py::b") + groups = OrderedDict([("k1", [a]), ("k2", [b])]) + out = sort_items_within_groups(groups, {}) + assert list(out.keys()) == ["k1", "k2"] + + def test_stable_on_ties(self) -> None: + """Items with identical durations keep their input order.""" + x = Item("t.py::f[x]") + y = Item("t.py::f[y]") + z = Item("t.py::f[z]") + groups = OrderedDict([("g", [x, y, z])]) + out = sort_items_within_groups( + groups, {x.nodeid: 1.0, y.nodeid: 1.0, z.nodeid: 1.0} + ) + assert out["g"] == [x, y, z] + + def test_unknown_items_use_known_mean(self) -> None: + """Unknown items are weighted at the known-items mean.""" + known_slow = Item("t.py::known_slow") + unknown = Item("t.py::unknown") + known_fast = Item("t.py::known_fast") + groups = OrderedDict([("g", [known_fast, unknown, known_slow])]) + out = sort_items_within_groups( + groups, + {known_slow.nodeid: 100.0, known_fast.nodeid: 1.0}, + ) + # Mean is ~50.5, so unknown > known_fast but < known_slow. + assert out["g"] == [known_slow, unknown, known_fast] + + +class TestAssignRunnersIntraGroupSort: + """End-to-end: :func:`assign_runners` + ``sort_intra_group``.""" + + def test_default_sorts_slowest_first_within_group(self) -> None: + """With ``sort_intra_group`` (default) slow items lead each group.""" + fast = Item("t.py::f[fast]") + slow = Item("t.py::f[slow]") + keyed = [("g", fast), ("g", slow)] + durations = {fast.nodeid: 1.0, slow.nodeid: 100.0} + groups = assign_runners(1, keyed, durations) + assert groups[0].selected == [slow, fast] + + def test_disabled_preserves_collection_order(self) -> None: + """``sort_intra_group=False`` keeps the original input order.""" + fast = Item("t.py::f[fast]") + slow = Item("t.py::f[slow]") + keyed = [("g", fast), ("g", slow)] + durations = {fast.nodeid: 1.0, slow.nodeid: 100.0} + groups = assign_runners(1, keyed, durations, sort_intra_group=False) + assert groups[0].selected == [fast, slow] + + def test_sort_does_not_cross_group_boundaries(self) -> None: + """Groups stay contiguous; sort only applies within each group.""" + a_fast = Item("t.py::a[fast]") + a_slow = Item("t.py::a[slow]") + b_fast = Item("t.py::b[fast]") + b_slow = Item("t.py::b[slow]") + keyed = [ + ("a", a_fast), + ("a", a_slow), + ("b", b_fast), + ("b", b_slow), + ] + durations = { + a_fast.nodeid: 1.0, + a_slow.nodeid: 10.0, + b_fast.nodeid: 2.0, + b_slow.nodeid: 20.0, + } + groups = assign_runners(1, keyed, durations) + # Heaviest group first (b total 22 > a total 11), each + # internally slowest-first, and no interleaving. + assert groups[0].selected == [b_slow, b_fast, a_slow, a_fast]