diff --git a/.github/scripts/diagnose_durations.py b/.github/scripts/diagnose_durations.py
new file mode 100644
index 00000000000..2b0e710777c
--- /dev/null
+++ b/.github/scripts/diagnose_durations.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3
+"""
+Diagnose a pytest-split ``.test_durations`` file.
+
+Report entry count, whether keys still carry ``@xdist_group`` suffixes
+(indicating incomplete normalization), and print sample keys for
+quick visual comparison against collected test nodeids.
+
+Usage::
+
+    uv run python .github/scripts/diagnose_durations.py [path]
+
+*path* defaults to ``.test_durations`` in the current directory.
+"""
+
+import sys
+from pathlib import Path
+
+from execution_testing.cli.pytest_commands.plugins.split.durations import (
+    load_durations,
+)
+
+
+def main() -> None:
+    """Entry point."""
+    path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".test_durations")
+    if not path.exists():
+        print(f"::warning::No durations file at {path}")
+        return
+
+    data = load_durations(path)
+    has_at = sum(1 for k in data if "@" in k)
+    keys = sorted(data)
+    abs_path = path.resolve()
+
+    print(f"Durations file: {path}")
+    print(f"  Entries: {len(data)}")
+    print(f"  Keys with @ suffix: {has_at}/{len(data)}")
+    if has_at:
+        print(
+            f"  WARNING: {has_at} keys still have @ suffixes"
+            " - normalization may have failed"
+        )
+
+    for label, sample in (
+        ("First 3 keys:", keys[:3]),
+        ("Last 3 keys:", keys[-3:]),
+    ):
+        print(f"  {label}")
+        for k in sample:
+            print(f"    {k}: {data[k]:.2f}s")
+
+    print(f"  Absolute path: {abs_path}")
+    print(f"  File size: {abs_path.stat().st_size} bytes")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/.github/scripts/merge_durations_files.py b/.github/scripts/merge_durations_files.py
new file mode 100644
index 00000000000..1074620ce57
--- /dev/null
+++ b/.github/scripts/merge_durations_files.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3
+"""
+Merge multiple pytest-split ``.test_durations`` files.
+
+Accept an output path and one or more input ``.test_durations`` JSON
+files and flat-merge them into one file. Splits produce disjoint test
+sets by construction, so collisions are not expected; when they do
+occur, the last input wins.
+
+Usage::
+
+    uv run python .github/scripts/merge_durations_files.py \
+        <output> <input> [<input> ...]
+"""
+
+import sys
+from pathlib import Path
+
+from execution_testing.cli.pytest_commands.plugins.split.durations import (
+    load_durations,
+    merge_durations,
+    write_durations,
+)
+
+
+def main() -> None:
+    """Entry point."""
+    if len(sys.argv) < 3:
+        print(
+            "Usage: merge_durations_files.py <output>"
+            " <input> [<input> ...]",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
+    output_path = Path(sys.argv[1])
+    inputs = [Path(p) for p in sys.argv[2:]]
+
+    sources: list[dict[str, float]] = []
+    count = 0
+    for path in inputs:
+        if not path.exists():
+            print(f"Skipping {path} (not found)")
+            continue
+        sources.append(load_durations(path))
+        count += 1
+
+    if not sources:
+        print("No durations found, nothing to merge.")
+        sys.exit(0)
+
+    merged = merge_durations(sources)
+    write_durations(output_path, merged)
+    print(f"Merged {count} durations files ({len(merged)} tests)")
+
+
+if __name__ == "__main__":
+    main()
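Editorial aside, not part of the diff: the two scripts above are thin wrappers over the shared helpers added later in this PR. A minimal sketch of the semantics they rely on, assuming the `execution_testing` package from this diff is importable:

```python
# Illustration only: expected behavior of the shared helpers,
# verified against their implementations further down in this diff.
from execution_testing.cli.pytest_commands.plugins.split.durations import (
    merge_durations,
    normalize_durations,
)

# Splits are disjoint by construction, so a flat merge is lossless;
# on overlap the last source wins.
assert merge_durations([{"a": 1.0}, {"b": 2.0}, {"b": 3.0}]) == {
    "a": 1.0,
    "b": 3.0,
}

# Only ``@t8n-cache-*`` suffixes are stripped; other xdist groups and
# ``@`` inside parametrize values survive normalization.
assert normalize_durations(
    {
        "t.py::f[x]@t8n-cache-abc": 4.0,
        "t.py::g[y]@bigmem": 2.0,
        "t.py::h[email@example.com]": 1.0,
    }
) == {
    "t.py::f[x]": 4.0,
    "t.py::g[y]@bigmem": 2.0,
    "t.py::h[email@example.com]": 1.0,
}
```

Both operations resolve collisions last-wins, so the merge and normalize steps compose safely in either order.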
diff --git a/.github/scripts/normalize_durations.py b/.github/scripts/normalize_durations.py
new file mode 100644
index 00000000000..b6467e17195
--- /dev/null
+++ b/.github/scripts/normalize_durations.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python3
+"""
+Normalize a pytest-split ``.test_durations`` file in place.
+
+Strip ``@xdist_group`` suffixes so the keys match the bare nodeids
+pytest sees during collection. ``--store-durations`` records ids with
+the suffix (e.g. ``@t8n-cache-*``) added by xdist during
+execution, so a normalization pass is required before a subsequent run
+can look up durations.
+
+Usage::
+
+    uv run python .github/scripts/normalize_durations.py [path]
+
+*path* defaults to ``.test_durations`` in the current directory.
+"""
+
+import sys
+from pathlib import Path
+
+from execution_testing.cli.pytest_commands.plugins.split.durations import (
+    load_durations,
+    normalize_durations,
+    write_durations,
+)
+
+
+def main() -> None:
+    """Entry point."""
+    path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".test_durations")
+    if not path.exists():
+        print(f"::warning::No durations file at {path}")
+        return
+
+    raw = load_durations(path)
+    normalized = normalize_durations(raw)
+    write_durations(path, normalized)
+
+    collisions = len(raw) - len(normalized)
+    print(
+        f"Normalized {len(raw)} -> {len(normalized)} entries"
+        f" ({collisions} collisions)"
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/packages/testing/pyproject.toml b/packages/testing/pyproject.toml
index 46f5f717649..16b055c0d15 100644
--- a/packages/testing/pyproject.toml
+++ b/packages/testing/pyproject.toml
@@ -32,6 +32,7 @@ dependencies = [
     "pytest-custom-report>=1.0.1,<2",
     "pytest-html>=4.1.0,<5",
     "pytest-metadata>=3,<4",
+    "pytest-split==0.11.0",
     "pytest-xdist>=3.3.1,<4",
     "coincurve>=20.0.0,<21",
     "trie>=3.1.0,<4",
diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/fill.py b/packages/testing/src/execution_testing/cli/pytest_commands/fill.py
index 842935b65b5..c2b5ca33969 100644
--- a/packages/testing/src/execution_testing/cli/pytest_commands/fill.py
+++ b/packages/testing/src/execution_testing/cli/pytest_commands/fill.py
@@ -33,33 +33,45 @@ def create_executions(
         self, pytest_args: List[str]
     ) -> List[PytestExecution]:
         """
-        Create execution plan that supports two-phase pre-allocation group
+        Create execution plan supporting two-phase pre-allocation group
         generation.
 
-        Returns single execution for normal filling, or two-phase execution
-        when --generate-pre-alloc-groups or --generate-all-formats is
-        specified.
+        Returns:
+        - Single-phase execution when `--use-pre-alloc-groups` is set,
+          regardless of `--generate-all-formats` (pre-alloc groups
+          already exist on disk from a previous run).
+        - Phase-1-only execution when `--generate-pre-alloc-groups` is
+          set without `--generate-all-formats` (CI generates pre-alloc
+          on a dedicated runner without wasting time on phase 2).
+        - Two-phase execution when `--generate-all-formats` is set.
+        - Normal single-phase execution otherwise.
+
         """
         processed_args = self.process_arguments(pytest_args)
         processed_args = self._add_default_ignores(processed_args)
 
-        # Check if we need two-phase execution
-        if self._should_use_two_phase_execution(processed_args):
-            return self._create_two_phase_executions(processed_args)
-        elif "--use-pre-alloc-groups" in processed_args:
-            # Only phase 2: using existing pre-allocation groups
+        if "--use-pre-alloc-groups" in processed_args:
+            # Pre-alloc groups already exist: single-phase fill only.
             return self._create_single_phase_with_pre_alloc_groups(
                 processed_args
             )
-        else:
-            # Normal single-phase execution
-            return [
-                PytestExecution(
-                    config_file=self.config_path,
-                    args=processed_args,
-                    allowed_exit_codes=self.allowed_exit_codes,
-                )
-            ]
+        if self._should_use_two_phase_execution(processed_args):
+            two_phase = self._create_two_phase_executions(processed_args)
+            if (
+                "--generate-pre-alloc-groups" in processed_args
+                and "--generate-all-formats" not in processed_args
+            ):
+                # Phase 1 only: generate pre-alloc groups without filling.
+                return [two_phase[0]]
+            return two_phase
+        # Normal single-phase execution
+        return [
+            PytestExecution(
+                config_file=self.config_path,
+                args=processed_args,
+                allowed_exit_codes=self.allowed_exit_codes,
+            )
+        ]
 
     def _create_two_phase_executions(
         self, args: List[str]
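Editorial sketch, not part of the diff: the flag precedence that `create_executions` now encodes, distilled into a hypothetical pure function (`plan_phases` is illustrative, not the `FillCommand` API; it assumes `_should_use_two_phase_execution` fires on either generation flag, per the docstring replaced above):

```python
# Hypothetical helper mirroring the branch order in create_executions:
# --use-pre-alloc-groups always wins, then the phase-1-only carve-out,
# then full two-phase, then plain single-phase filling.
def plan_phases(args: list[str]) -> list[str]:
    if "--use-pre-alloc-groups" in args:
        return ["fill"]  # groups already on disk; single phase
    if "--generate-pre-alloc-groups" in args or "--generate-all-formats" in args:
        if (
            "--generate-pre-alloc-groups" in args
            and "--generate-all-formats" not in args
        ):
            return ["pre-alloc"]  # phase 1 only, no fill phase
        return ["pre-alloc", "fill"]  # two-phase
    return ["fill"]  # normal single-phase execution


assert plan_phases(["--use-pre-alloc-groups", "--generate-all-formats"]) == ["fill"]
assert plan_phases(["--generate-pre-alloc-groups"]) == ["pre-alloc"]
assert plan_phases(["--generate-all-formats"]) == ["pre-alloc", "fill"]
```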
diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/__init__.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/__init__.py
new file mode 100644
index 00000000000..1a3c6989e9b
--- /dev/null
+++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/__init__.py
@@ -0,0 +1 @@
+"""Grouped test splitting for pytest-split."""
diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/durations.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/durations.py
new file mode 100644
index 00000000000..d4cbcd108be
--- /dev/null
+++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/durations.py
@@ -0,0 +1,69 @@
+"""
+Utilities for pytest-split ``.test_durations`` files.
+
+``--store-durations`` records nodeids with a ``@t8n-cache-*``
+suffix appended during execution, but pytest collection sees bare
+nodeids. These helpers bridge the two so the plugin and the CI
+scripts share one implementation of suffix stripping, normalization,
+and per-group merging.
+
+Only the ``@t8n-cache-*`` suffix is stripped. Other ``xdist_group``
+markers (e.g. ``@bigmem``) and ``@`` characters inside parametrize
+values (e.g. ``test[email@example.com]``) are preserved, matching
+``filler._strip_xdist_group_suffix``.
+"""
+
+from __future__ import annotations
+
+import json
+from collections.abc import Iterable
+from pathlib import Path
+
+
+def strip_xdist_suffix(nodeid: str) -> str:
+    """Strip a ``@t8n-cache-*`` suffix from *nodeid*, if present."""
+    if "@" in nodeid:
+        base, suffix = nodeid.rsplit("@", 1)
+        if suffix.startswith("t8n-cache-"):
+            return base
+    return nodeid
+
+
+def normalize_durations(raw: dict[str, float]) -> dict[str, float]:
+    """
+    Return *raw* with ``@t8n-cache-*`` suffixes removed from every key.
+
+    When two keys collapse to the same stripped form (e.g. runs with
+    different t8n-cache ids), the last one wins.
+    """
+    return {strip_xdist_suffix(k): v for k, v in raw.items()}
+
+
+def merge_durations(
+    sources: Iterable[dict[str, float]],
+) -> dict[str, float]:
+    """
+    Flat-merge *sources* into a single durations dict.
+
+    Fork-range and pytest-split groups produce disjoint nodeid sets by
+    construction, so no collisions are expected; if any occur, the
+    last source wins.
+ """ + merged: dict[str, float] = {} + for src in sources: + merged.update(src) + return merged + + +def load_durations(path: Path) -> dict[str, float]: + """Read a ``.test_durations`` JSON file; empty dict if absent.""" + try: + return json.loads(path.read_text()) + except FileNotFoundError: + return {} + + +def write_durations(path: Path, data: dict[str, float]) -> None: + """Serialize *data* as JSON to *path*, creating parents as needed.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, indent=2) + "\n") diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/grouping.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/grouping.py new file mode 100644 index 00000000000..3360cee790b --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/grouping.py @@ -0,0 +1,59 @@ +""" +Split-group key extraction for ``--grouped-split``. + +The grouping key maps every parametrization of one test function +under one fork to a single runner. The key format mirrors fill's +output-file layout (one file per ``(fork, function)`` pair), so plain +file copies can fan in the per-runner outputs without content +collisions. + +This module encodes only the correctness invariant -- which items +must stay together. The performance question of how to distribute +groups across runners is handled by :mod:`.scheduling`. +""" + +from __future__ import annotations + +from _pytest.nodes import Item + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + strip_xdist_suffix, +) + +_FORK_PARAM = "parametrized_fork" + + +def group_key(item: Item) -> str: + """ + Return the ``(function_path, fork)`` split-group key for *item*. + + Every parametrization of one test function under one fork maps + to the same key and therefore lands on the same runner, keeping + each per-test-function fixture file under its fork subdir + runner-owned. + + The fork is read from the authoritative source when available -- + ``item.callspec.params["parametrized_fork"]`` set by the forks + plugin -- so a parametrize value that happens to start with + ``fork_`` cannot be mistaken for the real fork. Items without a + callspec (unparametrized functions, doctests, or unit-test stubs) + fall back to a nodeid-based ``fork_*`` token scan. Items with no + fork anywhere form singleton groups keyed by the bare nodeid. + """ + nodeid = strip_xdist_suffix(item.nodeid) + path = nodeid.partition("[")[0] + + callspec = getattr(item, "callspec", None) + if callspec is not None: + params = getattr(callspec, "params", None) or {} + fork = params.get(_FORK_PARAM) + if fork is not None: + return f"{path}|fork={fork}" + + if "[" not in nodeid: + return nodeid + _, _, bracketed = nodeid.partition("[") + for token in bracketed.rstrip("]").split("-"): + if token.startswith("fork_"): + return f"{path}|fork={token[len('fork_') :]}" + return path diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/plugin.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/plugin.py new file mode 100644 index 00000000000..395d077c59a --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/plugin.py @@ -0,0 +1,229 @@ +""" +Pytest plugin for grouped test splitting. 
+ +When ``--grouped-split`` is passed alongside pytest-split's ``--splits`` +and ``--group``, the plugin unregisters pytest-split's default splitter +and partitions items by ``(test_function_path, fork)`` — every +parametrization of one test function under one fork stays on the same +runner. + +That invariant is what fill's native output layout relies on: each +per-test-function fixture file lives under a per-fork subdirectory, so +every output file is written by exactly one runner. CI fan-in can then +copy per-runner fixture dirs together without content collisions and +without needing ``--single-fixture-per-file``. +""" + +from __future__ import annotations + +import sys +from collections.abc import Generator +from pathlib import Path +from typing import Any, cast + +import pytest +from _pytest.config import Config +from _pytest.nodes import Item +from _pytest.terminal import TerminalReporter + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + load_durations, + normalize_durations, + strip_xdist_suffix, +) +from execution_testing.cli.pytest_commands.plugins.split.grouping import ( + group_key, +) +from execution_testing.cli.pytest_commands.plugins.split.scheduling import ( + assign_runners, +) + +_SUMMARY_KEY = pytest.StashKey[list[str]]() +_SPLIT_PLUGIN_NAME = "pytestsplitplugin" + + +def pytest_addoption(parser: pytest.Parser) -> None: + """Register the ``--grouped-split`` flag.""" + parser.getgroup("split").addoption( + "--grouped-split", + dest="grouped_split", + action="store_true", + default=False, + help=( + "Replace pytest-split's default splitting with an" + " xdist_group-aware algorithm that keeps cache-sharing" + " items on the same runner. Requires --splits and" + " --group." + ), + ) + + +def _grouped_split_active(config: Config) -> bool: + """Return True when grouped splitting should replace pytest-split.""" + if not config.getoption("grouped_split", default=False): + return False + splits = config.getoption("splits", default=None) + group = config.getoption("group", default=None) + return bool(splits and group) + + +def _classify_mode( + *, + durations_loaded: int, + items: int, + matched: int, + durations_path: Path, +) -> str: + """ + Return a human-readable one-line mode label for the summary. + + Distinguishes the three situations operators care about: + + - ``average-only (no durations file)``: the configured path does + not exist or is empty. First run, or artifact not downloaded. + - ``average-only (... loaded, 0/N match — KEY MISMATCH)``: the + file was found but none of its keys line up with the collected + nodeids. This is the silent-fallback regression; bin-packing + is effectively random. + - ``duration-aware (matched/total ...)``: at least some items + have real durations; bin-packing is doing its job. 
+ """ + if durations_loaded == 0: + return f"average-only (no durations at {durations_path})" + if matched == 0: + return ( + f"average-only ({durations_loaded} durations loaded from" + f" {durations_path}, 0/{items} match — KEY MISMATCH)" + ) + pct = 100.0 * matched / items if items else 0.0 + suffix = "" if matched == items else f", {items - matched} use avg" + return f"duration-aware ({matched}/{items} matched, {pct:.0f}%{suffix})" + + +def pytest_configure(config: Config) -> None: + """Unregister pytest-split's splitter when grouped mode is active.""" + if not _grouped_split_active(config): + return + plugin = config.pluginmanager.get_plugin(_SPLIT_PLUGIN_NAME) + if plugin is not None: + config.pluginmanager.unregister(plugin, _SPLIT_PLUGIN_NAME) + + +@pytest.hookimpl(trylast=True) +def pytest_collection_modifyitems(config: Config, items: list[Item]) -> None: + """Partition *items* across runners via grouped least-duration.""" + if not _grouped_split_active(config): + return + + splits: int = config.getoption("splits") + group: int = config.getoption("group") + durations_path = Path(config.getoption("durations_path")) + raw_durations = load_durations(durations_path) + durations = normalize_durations(raw_durations) + + keyed_items = [(group_key(item), item) for item in items] + all_groups = assign_runners( + splits=splits, keyed_items=keyed_items, durations=durations + ) + selected = all_groups[group - 1] # pytest-split's --group is 1-indexed + + matched = sum( + 1 for item in items if strip_xdist_suffix(item.nodeid) in durations + ) + unmatched = len(items) - matched + unique_groups = len({key for key, _ in keyed_items}) + mode = _classify_mode( + durations_loaded=len(durations), + items=len(items), + matched=matched, + durations_path=durations_path, + ) + # Emit a GitHub Actions ``::warning::`` annotation on the exact + # regression mode we care about in CI: durations file loaded but + # zero items match (bin-packing silently falls back to average). + # Prefix must start the line for Actions to pick it up. Under + # xdist, ``pytest_collection_modifyitems`` runs on every worker; + # emit only from ``gw0`` (or the non-xdist controller) to avoid N + # duplicate warnings. 
+ worker_id = getattr(config, "workerinput", {}).get("workerid", "master") + if worker_id in ("master", "gw0") and len(durations) > 0 and matched == 0: + print( + "::warning title=grouped-split durations mismatch::" + f" loaded {len(durations)} durations from {durations_path}" + f" but 0/{len(items)} collected items match; bin-packing" + " fell back to average (splits will be imbalanced).", + file=sys.stderr, + flush=True, + ) + summary = [ + f"mode: {mode}", + ( + f"runner {group}/{splits}:" + f" selected {len(selected.selected)}/{len(items)} items," + f" est serial {selected.duration:.0f}s" + f" (heaviest group {selected.max_group_duration:.0f}s)" + ), + ( + f"grouping: {unique_groups} (function, fork) keys," + f" duration coverage {matched}/{len(items)}" + + (f" ({unmatched} unknown -> avg)" if unmatched else "") + ), + "all runners (selected / serial-s / heaviest-s):", + ] + for i, g in enumerate(all_groups, 1): + marker = ">>>" if i == group else " " + summary.append( + f" {marker} {i:2d}: {len(g.selected):6d} items," + f" {g.duration:>7.0f}s, {g.max_group_duration:>7.0f}s" + ) + config.stash[_SUMMARY_KEY] = summary + + items[:] = cast(list[Item], selected.selected) + config.hook.pytest_deselected(items=selected.deselected) + + +@pytest.hookimpl(trylast=True) +def pytest_sessionfinish(session: pytest.Session) -> None: + """ + On an xdist worker, forward the summary to the controller via + ``workeroutput`` so :func:`pytest_terminal_summary` (which runs on + the controller) can find it. No-op on the controller and under + non-xdist runs. + """ + config = session.config + if not hasattr(config, "workerinput"): + return + summary = config.stash.get(_SUMMARY_KEY, []) + if summary and hasattr(config, "workeroutput"): + config.workeroutput["grouped_split_summary"] = summary + + +def pytest_testnodedown(node: Any) -> None: + """ + On the controller, accept the first worker's summary as the + canonical one. Every worker produced the same summary (they all + run the same grouping over the same items), so first-wins is + safe. + """ + if node.config.stash.get(_SUMMARY_KEY, None) is not None: + return + worker_output = getattr(node, "workeroutput", None) + if worker_output is None: + return + summary = worker_output.get("grouped_split_summary") + if summary: + node.config.stash[_SUMMARY_KEY] = summary + + +@pytest.hookimpl(hookwrapper=True, trylast=True) +def pytest_terminal_summary( + terminalreporter: TerminalReporter, config: Config +) -> Generator[None, None, None]: + """Print the grouped-split summary after the normal terminal output.""" + yield + summary = config.stash.get(_SUMMARY_KEY, []) + if not summary: + return + terminalreporter.write_sep("=", "grouped-split", bold=True) + for line in summary: + terminalreporter.line(line) diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py new file mode 100644 index 00000000000..b17602db79d --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/scheduling.py @@ -0,0 +1,238 @@ +""" +Scheduling for ``--grouped-split``: bin-pack grouped items across runners. + +This module combines three concerns that together implement the +``--grouped-split`` flag: + +1. **Grouping invariant** (correctness). Items sharing a group key + always land on the same runner. 
The caller supplies the key via + the ``(key, item)`` pairs in *keyed_items*; this module only + enforces the "same key, same runner" rule. Fan-in safety for + per-group output files (e.g. fill's per-``(fork, function)`` + fixture files) depends on this invariant alone. + +2. **LPT runner scheduling** (CI-wallclock performance). Given the + grouping constraint, groups are assigned *heaviest-first* to the + *least-loaded* runner via a min-heap -- Longest-Processing-Time- + first, the standard 4/3-approximation for makespan minimization + on identical machines. This balances total wallclock across CI + runners. + +3. **Intra-group item ordering** (worker-straggler prevention). + Within each group, items are sorted by individual duration DESC + before being handed back to pytest. Under xdist's default scheduler + settings (``--dist=load`` or ``--dist=loadgroup`` without + ``--loadscopereorder``) this order is preserved all the way to + worker dispatch, so slow tests start first and don't become + trailing stragglers on a runner's ``-n N`` worker pool. Disable + via ``assign_runners(..., sort_intra_group=False)``. + +Duration data is optional. Unknown items fall back to the mean of +known items (or ``1.0`` if none are known), so when ``.test_durations`` +is absent the scheduler degrades gracefully. + +Public API: + +- :func:`build_group_durations` -- items to groups + per-group totals. +- :func:`sort_items_within_groups` -- sort each group slowest-first. +- :func:`lpt_schedule` -- group durations to runner assignments. +- :func:`assign_runners` -- end-to-end: items to :class:`SplitGroup`s. +""" + +from __future__ import annotations + +import heapq +from collections import OrderedDict +from collections.abc import Callable, Iterable, Sequence +from typing import NamedTuple, Protocol, TypeVar, runtime_checkable + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + strip_xdist_suffix, +) + + +@runtime_checkable +class _HasNodeId(Protocol): + """Minimal protocol for items with a ``nodeid`` attribute.""" + + @property + def nodeid(self) -> str: + """Return the pytest node identifier.""" + ... + + +_ItemT = TypeVar("_ItemT", bound=_HasNodeId) + + +class SplitGroup(NamedTuple): + """One runner's workload after splitting.""" + + selected: list[_HasNodeId] + deselected: list[_HasNodeId] + duration: float + max_group_duration: float + + +def _item_weight( + items: Iterable[_HasNodeId], + durations: dict[str, float], +) -> Callable[[_HasNodeId], float]: + """ + Return an ``item -> estimated duration`` function. + + Per-item duration is looked up by bare nodeid (after stripping + any ``@t8n-cache-*`` suffix); unknown items inherit the mean of + known items, or ``1.0`` when no durations are known. The caller + passes the set of items in-scope so the average is computed over + that scope. + """ + known: dict[str, float] = {} + for item in items: + nid = strip_xdist_suffix(item.nodeid) + if nid in durations: + known[nid] = durations[nid] + avg = sum(known.values()) / len(known) if known else 1.0 + + def weight(item: _HasNodeId) -> float: + return known.get(strip_xdist_suffix(item.nodeid), avg) + + return weight + + +def build_group_durations( + keyed_items: Sequence[tuple[str, _HasNodeId]], + durations: dict[str, float], +) -> tuple[OrderedDict[str, list[_HasNodeId]], dict[str, float]]: + """ + Group *keyed_items* by key and compute each group's total duration. + + Items sharing a key are collected in collection order under that + key. 
See :func:`_item_weight` for the per-item duration lookup + and fallback rule. + + Returns ``(groups, group_durations)``: + + - ``groups[key]`` -- the items under *key* in collection order. + - ``group_durations[key]`` -- the sum of per-item durations. + """ + groups: OrderedDict[str, list[_HasNodeId]] = OrderedDict() + for key, item in keyed_items: + groups.setdefault(key, []).append(item) + + weight = _item_weight((item for _, item in keyed_items), durations) + group_durations = { + key: sum(weight(item) for item in members) + for key, members in groups.items() + } + return groups, group_durations + + +def sort_items_within_groups( + groups: OrderedDict[str, list[_ItemT]], + durations: dict[str, float], +) -> OrderedDict[str, list[_ItemT]]: + """ + Return *groups* with each member list sorted slowest-first. + + Group order (keys) is preserved; only the items inside each + group are reordered by individual duration DESC. Sort is stable, + so items with identical durations keep their collection order. + + Used by :func:`assign_runners` so that xdist workers receive slow + tests first within each scope, reducing end-of-run stragglers. + See :func:`_item_weight` for the duration lookup. + """ + all_items = (item for members in groups.values() for item in members) + weight = _item_weight(all_items, durations) + return OrderedDict( + (key, sorted(members, key=weight, reverse=True)) + for key, members in groups.items() + ) + + +def lpt_schedule( + group_durations: dict[str, float], + splits: int, +) -> tuple[list[list[str]], list[float], list[float]]: + """ + Assign groups to *splits* runners via Longest-Processing-Time-first. + + Groups are sorted heaviest-first and each is placed on the runner + with the smallest current total (tie-broken by runner index via + heap insertion order). The result is a 4/3-approximation of the + optimal makespan; exact optimization is NP-hard. + + Returns three parallel lists of length *splits*: + + - ``runner_keys[i]`` -- group keys assigned to runner *i*, in + placement order (heaviest-first globally). + - ``runner_totals[i]`` -- total duration on runner *i*. + - ``runner_max_group[i]`` -- duration of the largest single group + on runner *i* (a per-runner wallclock lower bound). + """ + sorted_keys = sorted( + group_durations, key=lambda k: group_durations[k], reverse=True + ) + runner_keys: list[list[str]] = [[] for _ in range(splits)] + runner_totals = [0.0] * splits + runner_max_group = [0.0] * splits + + heap: list[tuple[float, int]] = [(0.0, i) for i in range(splits)] + heapq.heapify(heap) + for key in sorted_keys: + total, idx = heapq.heappop(heap) + new_total = total + group_durations[key] + runner_keys[idx].append(key) + runner_totals[idx] = new_total + runner_max_group[idx] = max( + runner_max_group[idx], group_durations[key] + ) + heapq.heappush(heap, (new_total, idx)) + + return runner_keys, runner_totals, runner_max_group + + +def assign_runners( + splits: int, + keyed_items: Sequence[tuple[str, _HasNodeId]], + durations: dict[str, float], + sort_intra_group: bool = True, +) -> list[SplitGroup]: + """ + Split *keyed_items* across *splits* runners by group key. + + Composes :func:`build_group_durations`, optionally + :func:`sort_items_within_groups`, and :func:`lpt_schedule`, then + expands each runner's assigned keys back into the original item + objects. Group contiguity is preserved so that t8n-cache hits + stay adjacent under ``--dist loadgroup``. 
+ + Items sharing a key always land on the same runner; groups are + distributed heaviest-first to the least-loaded runner (see + :func:`lpt_schedule`). With *sort_intra_group* true (default), + items within each group are ordered slowest-first so xdist + workers start on the longest tests first. + """ + groups, group_durations = build_group_durations(keyed_items, durations) + if sort_intra_group: + groups = sort_items_within_groups(groups, durations) + runner_keys, runner_totals, runner_max_group = lpt_schedule( + group_durations, splits + ) + + result: list[SplitGroup] = [] + for i in range(splits): + selected = [item for key in runner_keys[i] for item in groups[key]] + selected_ids = {id(item) for item in selected} + deselected = [ + item for _, item in keyed_items if id(item) not in selected_ids + ] + result.append( + SplitGroup( + selected=selected, + deselected=deselected, + duration=runner_totals[i], + max_group_duration=runner_max_group[i], + ) + ) + return result diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/__init__.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/__init__.py new file mode 100644 index 00000000000..1d34711c32a --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the grouped-split plugin.""" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_durations.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_durations.py new file mode 100644 index 00000000000..089e28eeb4a --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_durations.py @@ -0,0 +1,112 @@ +"""Unit tests for the durations helpers.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from execution_testing.cli.pytest_commands.plugins.split.durations import ( + load_durations, + merge_durations, + normalize_durations, + strip_xdist_suffix, + write_durations, +) + + +class TestStripXdistSuffix: + """Tests for :func:`strip_xdist_suffix`.""" + + def test_strips_t8n_cache_suffix(self) -> None: + """``@t8n-cache-*`` suffixes are stripped.""" + nid = "tests/t.py::test_fn[fork_A-state_test]@t8n-cache-abc123" + expected = "tests/t.py::test_fn[fork_A-state_test]" + assert strip_xdist_suffix(nid) == expected + + def test_preserves_other_group_suffixes(self) -> None: + """Non-cache group suffixes (e.g. 
``@bigmem``) are preserved.""" + nid = "tests/t.py::test_fn[p]@bigmem" + assert strip_xdist_suffix(nid) == nid + + def test_preserves_custom_group_suffixes(self) -> None: + """Custom ``xdist_group`` markers are preserved.""" + nid = "tests/t.py::test_fn[p]@custom_group" + assert strip_xdist_suffix(nid) == nid + + def test_without_suffix_is_idempotent(self) -> None: + """A nodeid without ``@`` is returned unchanged.""" + nid = "tests/t.py::test_fn[fork_A-state_test]" + assert strip_xdist_suffix(nid) == nid + + def test_at_in_params_preserved(self) -> None: + """``@`` inside parametrize values is preserved (rsplit).""" + nid = "tests/t.py::test_fn[email@example.com]@t8n-cache-abc" + expected = "tests/t.py::test_fn[email@example.com]" + assert strip_xdist_suffix(nid) == expected + + +class TestNormalizeDurations: + """Tests for :func:`normalize_durations`.""" + + def test_strips_cache_suffixes_only(self) -> None: + """Only ``@t8n-cache-*`` keys are stripped; others kept.""" + raw = { + "a[p]@t8n-cache-xyz": 1.0, + "b[q]@bigmem": 2.0, + "c": 3.0, + } + assert normalize_durations(raw) == { + "a[p]": 1.0, + "b[q]@bigmem": 2.0, + "c": 3.0, + } + + def test_collision_last_wins(self) -> None: + """Collapsed cache keys resolve to the last input's value.""" + raw = {"a@t8n-cache-x": 1.0, "a@t8n-cache-y": 2.0} + assert normalize_durations(raw) == {"a": 2.0} + + def test_empty_input(self) -> None: + """Empty input returns empty output.""" + assert normalize_durations({}) == {} + + +class TestMergeDurations: + """Tests for :func:`merge_durations`.""" + + def test_disjoint_sources(self) -> None: + """Disjoint inputs merge to their union.""" + merged = merge_durations( + [{"a": 1.0}, {"b": 2.0}, {"c": 3.0}], + ) + assert merged == {"a": 1.0, "b": 2.0, "c": 3.0} + + def test_overlap_last_wins(self) -> None: + """Overlapping keys resolve to the last source's value.""" + merged = merge_durations([{"a": 1.0}, {"a": 2.0}]) + assert merged == {"a": 2.0} + + def test_empty(self) -> None: + """Zero sources yield an empty dict.""" + assert merge_durations([]) == {} + + +class TestLoadAndWriteDurations: + """Tests for :func:`load_durations` and :func:`write_durations`.""" + + def test_round_trip(self, tmp_path: Path) -> None: + """Writing then reading returns the original dict.""" + path = tmp_path / ".test_durations" + data = {"a": 1.5, "b": 2.25} + write_durations(path, data) + assert load_durations(path) == data + + def test_load_missing_file_is_empty(self, tmp_path: Path) -> None: + """Missing files load as empty dicts.""" + assert load_durations(tmp_path / "nope") == {} + + def test_write_creates_parent_dirs(self, tmp_path: Path) -> None: + """Non-existent parent directories are created on write.""" + path = tmp_path / "nested" / "dir" / ".test_durations" + write_durations(path, {"x": 1.0}) + assert json.loads(path.read_text()) == {"x": 1.0} diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_grouping.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_grouping.py new file mode 100644 index 00000000000..c75483a15aa --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_grouping.py @@ -0,0 +1,101 @@ +"""Unit tests for :func:`group_key`.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, cast + +from _pytest.nodes import Item + +from execution_testing.cli.pytest_commands.plugins.split.grouping import ( + group_key, +) + + 
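Editorial aside before the stubs and tests that follow: the three `group_key` regimes in one runnable snippet (illustration only, not part of the diff; assumes the plugin package is importable, and uses ad-hoc `SimpleNamespace` stubs rather than real pytest items):

```python
from types import SimpleNamespace

from execution_testing.cli.pytest_commands.plugins.split.grouping import (
    group_key,
)

# 1. Authoritative: the callspec's parametrized_fork wins over the
#    nodeid, so bogus fork_* param values cannot hijack the key.
item = SimpleNamespace(
    nodeid="t.py::test_f[fork_A-state_test]",
    callspec=SimpleNamespace(params={"parametrized_fork": "Osaka"}),
)
assert group_key(item) == "t.py::test_f|fork=Osaka"

# 2. Fallback: no callspec, so the nodeid is scanned for a fork_* token.
item = SimpleNamespace(
    nodeid="t.py::test_f[fork_A-state_test]", callspec=None
)
assert group_key(item) == "t.py::test_f|fork=A"

# 3. No fork anywhere: an unparametrized id is its own singleton key.
item = SimpleNamespace(nodeid="t.py::test_f", callspec=None)
assert group_key(item) == "t.py::test_f"
```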
+@dataclass +class _CallSpec: + """Minimal stub mirroring ``pytest.Function.callspec``.""" + + params: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class _Item: + """Minimal stub mirroring the item fields ``group_key`` reads.""" + + nodeid: str + callspec: _CallSpec | None = None + + +def _as_item(nodeid: str, callspec: _CallSpec | None = None) -> Item: + """Cast the stub to ``Item`` for type-checker friendliness.""" + return cast(Item, _Item(nodeid=nodeid, callspec=callspec)) + + +class TestGroupKeyAuthoritative: + """``parametrized_fork`` from the callspec is the primary source.""" + + def test_callspec_fork_is_used(self) -> None: + """Key uses the callspec's fork, not whatever's in the nodeid.""" + item = _as_item( + "t.py::test_f[fork_Osaka-state_test]", + _CallSpec(params={"parametrized_fork": "Osaka"}), + ) + assert group_key(item) == "t.py::test_f|fork=Osaka" + + def test_callspec_wins_over_ambiguous_nodeid(self) -> None: + """A param value starting with ``fork_`` cannot hijack the key.""" + # Nodeid's first ``fork_*`` token is the bogus param value, but + # the real fork in the callspec is Osaka -- authoritative path + # must ignore the nodeid scan entirely. + item = _as_item( + "t.py::test_f[fork_candidate-fork_Osaka-state_test]", + _CallSpec(params={"parametrized_fork": "Osaka"}), + ) + assert group_key(item) == "t.py::test_f|fork=Osaka" + + def test_fork_object_is_stringified(self) -> None: + """Non-string fork values are rendered via ``str(fork)``.""" + + class _Fork: + def __str__(self) -> str: + return "Prague" + + item = _as_item( + "t.py::test_f[fork_Prague]", + _CallSpec(params={"parametrized_fork": _Fork()}), + ) + assert group_key(item) == "t.py::test_f|fork=Prague" + + +class TestGroupKeyFallback: + """Without callspec or ``parametrized_fork``, fall back to nodeid.""" + + def test_no_callspec_uses_nodeid_scan(self) -> None: + """Items with no callspec scan the nodeid for ``fork_*``.""" + item = _as_item("t.py::test_f[fork_A-state_test]") + assert group_key(item) == "t.py::test_f|fork=A" + + def test_unparametrized_item_is_singleton(self) -> None: + """No ``[`` in nodeid yields the bare nodeid as its own group.""" + item = _as_item("t.py::test_f") + assert group_key(item) == "t.py::test_f" + + def test_parametrized_without_fork_groups_by_function(self) -> None: + """Parametrized items with no fork token share one group.""" + a = _as_item("t.py::test_f[x=1]") + b = _as_item("t.py::test_f[x=2]") + assert group_key(a) == group_key(b) == "t.py::test_f" + + def test_callspec_without_fork_param_falls_back(self) -> None: + """Callspec present but no ``parametrized_fork`` uses the scan.""" + item = _as_item( + "t.py::test_f[fork_A-state_test]", + _CallSpec(params={"other": "value"}), + ) + assert group_key(item) == "t.py::test_f|fork=A" + + def test_xdist_suffix_stripped_before_parsing(self) -> None: + """The ``@t8n-cache-*`` suffix is removed before key building.""" + item = _as_item("t.py::test_f[fork_A]@t8n-cache-abc123") + assert group_key(item) == "t.py::test_f|fork=A" diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_plugin_pytester.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_plugin_pytester.py new file mode 100644 index 00000000000..15a807aa782 --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_plugin_pytester.py @@ -0,0 +1,306 @@ +"""Pytester-based integration tests for the grouped-split plugin.""" + +from 
__future__ import annotations + +import json +from pathlib import Path + +import pytest + +PLUGIN = "execution_testing.cli.pytest_commands.plugins.split.plugin" + + +def _synthetic_tests(pytester: pytest.Pytester) -> None: + """ + Create a set of synthetic tests mirroring fill's nodeid shape. + + Two functions, two forks, two fixture formats, and one + non-format parameter, producing nodeids like + ``test_split_synth.py::test_alpha[fork_A-state_test-x_0]``. All + variants of one ``(function, fork)`` share a grouping key under + :func:`group_key`; different functions or different forks get + different keys. + """ + pytester.makepyfile( + test_split_synth=""" + import pytest + + @pytest.mark.parametrize("fmt", ["state_test", "blockchain_test"]) + @pytest.mark.parametrize("x", ["0", "1"]) + @pytest.mark.parametrize("fork", ["fork_A", "fork_B"]) + def test_alpha(fork, fmt, x): + pass + + @pytest.mark.parametrize("fmt", ["state_test", "blockchain_test"]) + @pytest.mark.parametrize("x", ["0", "1"]) + @pytest.mark.parametrize("fork", ["fork_A", "fork_B"]) + def test_beta(fork, fmt, x): + pass + + def test_singleton(): + pass + """ + ) + + +def _write_durations(path: Path, items: dict[str, float]) -> None: + """Write a synthetic ``.test_durations`` JSON file.""" + path.write_text(json.dumps(items)) + + +def _collected_nodeids(result: pytest.RunResult) -> set[str]: + """Return the set of ``test_split_synth``-prefixed nodeids.""" + return { + line.strip() + for line in result.stdout.lines + if "::" in line and "test_split_synth" in line + } + + +def test_partition_covers_every_item(pytester: pytest.Pytester) -> None: + """Union of every group's selection equals the full collected set.""" + _synthetic_tests(pytester) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + + splits = 2 + seen: list[set[str]] = [] + for group in range(1, splits + 1): + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "-q", + "--grouped-split", + f"--splits={splits}", + f"--group={group}", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + seen.append(_collected_nodeids(result)) + + union = set().union(*seen) + intersection = seen[0].intersection(*seen[1:]) if seen else set() + assert intersection == set() + # 2 funcs * 2 forks * 2 params * 2 formats + 1 singleton = 17. + assert len(union) == 17 + + +def test_function_fork_items_stay_on_one_runner( + pytester: pytest.Pytester, +) -> None: + """Every item sharing a ``(function, fork)`` lands on one runner.""" + _synthetic_tests(pytester) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + + splits = 2 + runners: list[set[str]] = [] + for group in range(1, splits + 1): + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "-q", + "--grouped-split", + f"--splits={splits}", + f"--group={group}", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + runners.append(_collected_nodeids(result)) + + for fn in ("test_alpha", "test_beta"): + for fork in ("fork_A", "fork_B"): + runner_idxs = { + idx + for idx, runner in enumerate(runners) + for nid in runner + if fn in nid and f"{fork}-" in nid + } + assert len(runner_idxs) == 1, ( + f"{fn} {fork} split across runners: {runner_idxs}" + ) + + +def test_pytest_split_plugin_unregistered_when_active( + pytester: pytest.Pytester, +) -> None: + """ + Upstream ``pytestsplitplugin`` is unregistered under + ``--grouped-split``. 
+ """ + _synthetic_tests(pytester) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + pytester.makeconftest( + """ + def pytest_configure(config): + active = config.pluginmanager.get_plugin("pytestsplitplugin") + print(f"SPLIT_PLUGIN_REGISTERED={active is not None}") + """ + ) + + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "-s", + "--grouped-split", + "--splits=2", + "--group=1", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + assert "SPLIT_PLUGIN_REGISTERED=False" in "\n".join(result.stdout.lines) + + +def test_summary_printed(pytester: pytest.Pytester) -> None: + """The grouped-split summary appears in terminal output.""" + _synthetic_tests(pytester) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "--grouped-split", + "--splits=2", + "--group=1", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + stdout = "\n".join(result.stdout.lines) + assert "grouped-split" in stdout + assert "runner 1/2" in stdout + assert "(function, fork) keys" in stdout + + +def test_inactive_without_grouped_split_flag( + pytester: pytest.Pytester, +) -> None: + """Without ``--grouped-split`` the plugin is a no-op.""" + _synthetic_tests(pytester) + result = pytester.runpytest("-p", PLUGIN, "--collect-only", "-q") + assert result.ret == 0 + assert "grouped-split" not in "\n".join(result.stdout.lines) + + +def test_mode_labels_three_regimes(pytester: pytest.Pytester) -> None: + """ + Plugin summary's ``mode:`` line reports the three operator- + visible regimes: no durations, durations loaded but no match, and + duration-aware. + """ + _synthetic_tests(pytester) + + # Discover an actual collected nodeid so the "duration-aware" + # case's key matches exactly — param ordering in multi-decorator + # parametrize is implementation-defined, so hard-coding it is + # brittle. + collect_only = pytester.runpytest("-p", PLUGIN, "--collect-only", "-q") + assert collect_only.ret == 0 + any_nodeid = next( + line.strip() + for line in collect_only.stdout.lines + if "test_split_synth.py::test_alpha[" in line + ) + + missing_path = pytester.path / "missing.json" + assert not missing_path.exists() + bogus_path = pytester.path / "bogus.json" + bogus_path.write_text('{"tests/unrelated.py::test_x[fork_Zzz]": 99.0}') + matching_path = pytester.path / "matching.json" + matching_path.write_text(json.dumps({any_nodeid: 5.0})) + + for path, expected in ( + (missing_path, "average-only (no durations"), + (bogus_path, "average-only (1 durations loaded"), + (matching_path, "duration-aware"), + ): + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "--grouped-split", + "--splits=2", + "--group=1", + f"--durations-path={path}", + ) + assert result.ret == 0 + stdout = "\n".join(result.stdout.lines) + assert expected in stdout, ( + f"path={path.name}: expected {expected!r} in summary, got:" + f"\n{stdout[-500:]}" + ) + + +def test_warning_annotation_on_key_mismatch( + pytester: pytest.Pytester, +) -> None: + """ + The ``::warning::`` annotation fires when durations are loaded + but none match, so CI surfaces the silent-fallback regression. 
+ """ + _synthetic_tests(pytester) + bogus_path = pytester.path / "bogus.json" + bogus_path.write_text('{"tests/unrelated.py::test_x[fork_Zzz]": 99.0}') + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "--grouped-split", + "--splits=2", + "--group=1", + f"--durations-path={bogus_path}", + ) + assert result.ret == 0 + combined = "\n".join(result.stdout.lines + result.stderr.lines) + assert "::warning title=grouped-split durations mismatch::" in combined + + +def test_items_without_fork_param_become_singletons( + pytester: pytest.Pytester, +) -> None: + """Items whose nodeid carries no ``fork_*`` token are per-nodeid.""" + pytester.makepyfile( + test_unmarked=""" + def test_a(): pass + def test_b(): pass + def test_c(): pass + def test_d(): pass + """ + ) + durations_path = pytester.path / ".test_durations" + _write_durations(durations_path, {}) + + splits = 2 + seen: list[set[str]] = [] + for group in range(1, splits + 1): + result = pytester.runpytest( + "-p", + PLUGIN, + "--collect-only", + "-q", + "--grouped-split", + f"--splits={splits}", + f"--group={group}", + f"--durations-path={durations_path}", + ) + assert result.ret == 0 + # Strip any rootdir-relative path prefix so the assertion is + # robust to pytester sandboxes placed inside the outer pytest's + # rootdir (e.g. under ``just test-tests --basetemp=.just/...``). + seen.append( + { + line.strip().rsplit("/", 1)[-1] + for line in result.stdout.lines + if "test_unmarked.py::" in line + } + ) + + assert all(len(s) > 0 for s in seen) + assert set().union(*seen) == ( + {f"test_unmarked.py::test_{n}" for n in "abcd"} + ) diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py new file mode 100644 index 00000000000..ac42faa1fb6 --- /dev/null +++ b/packages/testing/src/execution_testing/cli/pytest_commands/plugins/split/tests/test_scheduling.py @@ -0,0 +1,300 @@ +"""Unit tests for the scheduling module's public API.""" + +from __future__ import annotations + +from collections import OrderedDict +from typing import NamedTuple + +import pytest + +from execution_testing.cli.pytest_commands.plugins.split.scheduling import ( + assign_runners, + build_group_durations, + lpt_schedule, + sort_items_within_groups, +) + + +class Item(NamedTuple): + """Minimal pytest item stub used in place of ``pytest.Item``.""" + + nodeid: str + + +class TestAssignRunners: + """Tests for :func:`assign_runners` (end-to-end split).""" + + def test_shared_key_stays_on_one_runner(self) -> None: + """Items sharing a key land together.""" + a1 = Item("t.py::f[fork_A-p_X-state_test]") + a2 = Item("t.py::f[fork_A-p_X-blockchain_test]") + b = Item("t.py::f[fork_A-p_Y-state_test]") + keyed = [("group_A", a1), ("group_A", a2), ("group_B", b)] + durations = {a1.nodeid: 1.0, a2.nodeid: 1.0, b.nodeid: 1.0} + + groups = assign_runners(2, keyed, durations) + for g in groups: + if a1 in g.selected: + assert a2 in g.selected + break + + def test_heaviest_group_first(self) -> None: + """Heavy group anchors one runner, light groups fill the other.""" + heavy = [Item(f"t.py::heavy[{i}]") for i in range(3)] + light = [Item(f"t.py::light[{i}]") for i in range(3)] + keyed = [("heavy", h) for h in heavy] + [ + ("light", item) for item in light + ] + durations = {item.nodeid: 100.0 for item in heavy} + durations.update({item.nodeid: 1.0 for item in light}) + + groups = assign_runners(2, keyed, durations) + 
sorted_totals = sorted(g.duration for g in groups) + assert sorted_totals[0] == pytest.approx(3.0) + assert sorted_totals[1] == pytest.approx(300.0) + + def test_per_item_keys_singleton_groups(self) -> None: + """Items with distinct keys each become their own group.""" + a = Item("t.py::test_a") + b = Item("t.py::test_b") + keyed = [(a.nodeid, a), (b.nodeid, b)] + durations = {a.nodeid: 10.0, b.nodeid: 5.0} + groups = assign_runners(2, keyed, durations) + assert len(groups[0].selected) == 1 + assert len(groups[1].selected) == 1 + + def test_unknown_duration_fallback(self) -> None: + """Unknown items get the average of known durations.""" + known = Item("t.py::known") + unknown = Item("t.py::unknown") + keyed = [("known", known), ("unknown", unknown)] + durations = {known.nodeid: 10.0} + + groups = assign_runners(2, keyed, durations) + for g in groups: + if known in g.selected: + assert g.duration == pytest.approx(10.0) + if unknown in g.selected: + assert g.duration == pytest.approx(10.0) + + def test_intra_group_order_preserved(self) -> None: + """Items within one group keep their original order.""" + items = [ + Item("t.py::f[engine_x]"), + Item("t.py::f[blockchain]"), + Item("t.py::f[state]"), + ] + keyed = [("shared", item) for item in items] + durations = {i.nodeid: 1.0 for i in items} + groups = assign_runners(1, keyed, durations) + assert groups[0].selected == list(items) + + def test_deselected_partitions_items(self) -> None: + """Selected + deselected equals the full input for every runner.""" + items = [Item(f"t.py::f[{i}]") for i in range(3)] + keyed = [(f"key_{i}", item) for i, item in enumerate(items)] + durations = {i.nodeid: 1.0 for i in items} + + groups = assign_runners(2, keyed, durations) + all_ids = {i.nodeid for i in items} + for g in groups: + combined = {i.nodeid for i in g.selected} | { + i.nodeid for i in g.deselected + } + assert combined == all_ids + + def test_empty_runners(self) -> None: + """More splits than groups yields empty runners without error.""" + only = Item("t.py::f") + keyed = [("solo", only)] + durations = {only.nodeid: 1.0} + groups = assign_runners(5, keyed, durations) + + assert len(groups) == 5 + non_empty = [g for g in groups if g.selected] + empty = [g for g in groups if not g.selected] + assert len(non_empty) == 1 + assert all(len(g.deselected) == 1 for g in empty) + + def test_single_runner(self) -> None: + """``splits=1`` returns every item on one runner.""" + items = [Item("t.py::f"), Item("t.py::g")] + keyed = [("k1", items[0]), ("k2", items[1])] + durations = {i.nodeid: 1.0 for i in items} + groups = assign_runners(1, keyed, durations) + assert len(groups) == 1 + assert groups[0].selected == list(items) + assert groups[0].deselected == [] + + def test_durations_with_xdist_suffix_match(self) -> None: + """Durations that still carry ``@xdist_group`` suffixes match.""" + heavy = Item("t.py::heavy") + light = Item("t.py::light") + # Pre-normalized: plugin always normalizes before calling the + # algorithm, so this test mirrors that contract. 
+ durations = {heavy.nodeid: 100.0, light.nodeid: 1.0} + keyed = [("h", heavy), ("l", light)] + groups = assign_runners(2, keyed, durations) + sorted_totals = sorted(g.duration for g in groups) + assert sorted_totals[0] == pytest.approx(1.0) + assert sorted_totals[1] == pytest.approx(100.0) + + def test_max_group_duration_tracked(self) -> None: + """Each runner reports the duration of its heaviest group.""" + heavy_a = Item("t.py::heavy[a]") + heavy_b = Item("t.py::heavy[b]") + light = Item("t.py::light") + keyed = [ + ("heavy", heavy_a), + ("heavy", heavy_b), + ("light", light), + ] + durations = { + heavy_a.nodeid: 100.0, + heavy_b.nodeid: 50.0, + light.nodeid: 1.0, + } + groups = assign_runners(2, keyed, durations) + for g in groups: + if heavy_a in g.selected: + assert g.max_group_duration == pytest.approx(150.0) + else: + assert g.max_group_duration == pytest.approx(1.0) + + +class TestBuildGroupDurations: + """Tests for :func:`build_group_durations`.""" + + def test_groups_preserve_collection_order(self) -> None: + """Items sharing a key keep their original order.""" + a, b, c = Item("t.py::a"), Item("t.py::b"), Item("t.py::a") + keyed = [("k1", a), ("k2", b), ("k1", c)] + groups, _ = build_group_durations(keyed, {}) + assert list(groups.keys()) == ["k1", "k2"] + assert groups["k1"] == [a, c] + assert groups["k2"] == [b] + + def test_unknown_items_use_known_mean(self) -> None: + """Per-group total blends known durations with the known mean.""" + known = Item("t.py::known") + unknown = Item("t.py::unknown") + keyed = [("k", known), ("k", unknown)] + _, group_durations = build_group_durations(keyed, {known.nodeid: 4.0}) + assert group_durations["k"] == pytest.approx(8.0) + + def test_no_known_durations_fallback_one(self) -> None: + """With zero known durations every item weighs ``1.0``.""" + a, b = Item("t.py::a"), Item("t.py::b") + _, group_durations = build_group_durations([("g", a), ("g", b)], {}) + assert group_durations["g"] == pytest.approx(2.0) + + +class TestLPTSchedule: + """Tests for :func:`lpt_schedule`.""" + + def test_heaviest_anchors_least_loaded(self) -> None: + """Heaviest group lands on runner 0; remaining fills runner 1.""" + keys, totals, max_group = lpt_schedule( + {"heavy": 100.0, "mid": 20.0, "light": 5.0}, 2 + ) + assert sorted(totals) == [25.0, 100.0] + heavy_runner = max(range(2), key=lambda i: totals[i]) + assert keys[heavy_runner] == ["heavy"] + assert max_group[heavy_runner] == pytest.approx(100.0) + + def test_empty_runners_when_splits_exceed_groups(self) -> None: + """Extra runners get no keys and zero totals.""" + keys, totals, max_group = lpt_schedule({"only": 7.0}, 3) + assert sum(len(k) for k in keys) == 1 + assert sum(totals) == pytest.approx(7.0) + assert max(max_group) == pytest.approx(7.0) + + +class TestSortItemsWithinGroups: + """Tests for :func:`sort_items_within_groups`.""" + + def test_items_sorted_slowest_first(self) -> None: + """Each group's items come out in duration-DESC order.""" + fast = Item("t.py::f[fast]") + mid = Item("t.py::f[mid]") + slow = Item("t.py::f[slow]") + groups = OrderedDict([("g", [fast, mid, slow])]) + durations = {fast.nodeid: 1.0, mid.nodeid: 5.0, slow.nodeid: 10.0} + out = sort_items_within_groups(groups, durations) + assert out["g"] == [slow, mid, fast] + + def test_group_order_preserved(self) -> None: + """Sort mutates within groups only; group key order is kept.""" + a, b = Item("t.py::a"), Item("t.py::b") + groups = OrderedDict([("k1", [a]), ("k2", [b])]) + out = sort_items_within_groups(groups, {}) + assert 
list(out.keys()) == ["k1", "k2"] + + def test_stable_on_ties(self) -> None: + """Items with identical durations keep their input order.""" + x = Item("t.py::f[x]") + y = Item("t.py::f[y]") + z = Item("t.py::f[z]") + groups = OrderedDict([("g", [x, y, z])]) + out = sort_items_within_groups( + groups, {x.nodeid: 1.0, y.nodeid: 1.0, z.nodeid: 1.0} + ) + assert out["g"] == [x, y, z] + + def test_unknown_items_use_known_mean(self) -> None: + """Unknown items are weighted at the known-items mean.""" + known_slow = Item("t.py::known_slow") + unknown = Item("t.py::unknown") + known_fast = Item("t.py::known_fast") + groups = OrderedDict([("g", [known_fast, unknown, known_slow])]) + out = sort_items_within_groups( + groups, + {known_slow.nodeid: 100.0, known_fast.nodeid: 1.0}, + ) + # Mean is ~50.5, so unknown > known_fast but < known_slow. + assert out["g"] == [known_slow, unknown, known_fast] + + +class TestAssignRunnersIntraGroupSort: + """End-to-end: :func:`assign_runners` + ``sort_intra_group``.""" + + def test_default_sorts_slowest_first_within_group(self) -> None: + """With ``sort_intra_group`` (default) slow items lead each group.""" + fast = Item("t.py::f[fast]") + slow = Item("t.py::f[slow]") + keyed = [("g", fast), ("g", slow)] + durations = {fast.nodeid: 1.0, slow.nodeid: 100.0} + groups = assign_runners(1, keyed, durations) + assert groups[0].selected == [slow, fast] + + def test_disabled_preserves_collection_order(self) -> None: + """``sort_intra_group=False`` keeps the original input order.""" + fast = Item("t.py::f[fast]") + slow = Item("t.py::f[slow]") + keyed = [("g", fast), ("g", slow)] + durations = {fast.nodeid: 1.0, slow.nodeid: 100.0} + groups = assign_runners(1, keyed, durations, sort_intra_group=False) + assert groups[0].selected == [fast, slow] + + def test_sort_does_not_cross_group_boundaries(self) -> None: + """Groups stay contiguous; sort only applies within each group.""" + a_fast = Item("t.py::a[fast]") + a_slow = Item("t.py::a[slow]") + b_fast = Item("t.py::b[fast]") + b_slow = Item("t.py::b[slow]") + keyed = [ + ("a", a_fast), + ("a", a_slow), + ("b", b_fast), + ("b", b_slow), + ] + durations = { + a_fast.nodeid: 1.0, + a_slow.nodeid: 10.0, + b_fast.nodeid: 2.0, + b_slow.nodeid: 20.0, + } + groups = assign_runners(1, keyed, durations) + # Heaviest group first (b total 22 > a total 11), each + # internally slowest-first, and no interleaving. 
+        assert groups[0].selected == [b_slow, b_fast, a_slow, a_fast]
diff --git a/packages/testing/src/execution_testing/cli/pytest_commands/pytest_ini_files/pytest-fill.ini b/packages/testing/src/execution_testing/cli/pytest_commands/pytest_ini_files/pytest-fill.ini
index 179ac2082f0..6696be28755 100644
--- a/packages/testing/src/execution_testing/cli/pytest_commands/pytest_ini_files/pytest-fill.ini
+++ b/packages/testing/src/execution_testing/cli/pytest_commands/pytest_ini_files/pytest-fill.ini
@@ -17,6 +17,7 @@ addopts =
     -p execution_testing.cli.pytest_commands.plugins.shared.transaction_fixtures
     -p execution_testing.cli.pytest_commands.plugins.help.help
     -p execution_testing.cli.pytest_commands.plugins.custom_logging.plugin_logging
+    -p execution_testing.cli.pytest_commands.plugins.split.plugin
     --tb short
     --dist loadgroup
     --ignore tests/cancun/eip4844_blobs/point_evaluation_vectors/
diff --git a/packages/testing/src/execution_testing/cli/tests/test_generate_all_formats.py b/packages/testing/src/execution_testing/cli/tests/test_generate_all_formats.py
index e30e9f15ce9..5ea475b2460 100644
--- a/packages/testing/src/execution_testing/cli/tests/test_generate_all_formats.py
+++ b/packages/testing/src/execution_testing/cli/tests/test_generate_all_formats.py
@@ -76,26 +76,45 @@ def test_generate_all_formats_removes_clean_from_phase2() -> None:
     assert "--clean" not in phase2_args
 
 
-def test_legacy_generate_pre_alloc_groups_still_works() -> None:
-    """Test that the legacy --generate-pre-alloc-groups flag still works."""
+def test_generate_pre_alloc_groups_alone_is_phase_1_only() -> None:
+    """
+    Test that --generate-pre-alloc-groups without --generate-all-formats
+    runs phase 1 only, so CI can populate pre-alloc groups on a
+    dedicated runner without wasting time on phase 2.
+    """
     command = FillCommand()
 
     with patch.object(command, "process_arguments", side_effect=lambda x: x):
         pytest_args = ["--generate-pre-alloc-groups", "tests/somedir/"]
         executions = command.create_executions(pytest_args)
 
-    assert len(executions) == 2
+    assert len(executions) == 1
 
-    # Phase 1: Should have --generate-pre-alloc-groups
     phase1_args = executions[0].args
     assert "--generate-pre-alloc-groups" in phase1_args
+    assert "--use-pre-alloc-groups" not in phase1_args
+    assert "--generate-all-formats" not in phase1_args
 
-    # Phase 2: Should have --use-pre-alloc-groups but NOT --generate-all-
-    # formats
-    phase2_args = executions[1].args
-    assert "--use-pre-alloc-groups" in phase2_args
-    assert "--generate-all-formats" not in phase2_args
-    assert "--generate-pre-alloc-groups" not in phase2_args
+
+def test_use_pre_alloc_groups_forces_single_phase() -> None:
+    """
+    Test that --use-pre-alloc-groups always runs a single phase, even
+    alongside --generate-all-formats (pre-alloc groups already exist on
+    disk from a previous run).
+ """ + command = FillCommand() + + with patch.object(command, "process_arguments", side_effect=lambda x: x): + pytest_args = [ + "--use-pre-alloc-groups", + "--generate-all-formats", + "tests/somedir/", + ] + executions = command.create_executions(pytest_args) + + assert len(executions) == 1 + assert "--use-pre-alloc-groups" in executions[0].args + assert "--generate-all-formats" in executions[0].args def test_single_phase_without_flags() -> None: diff --git a/uv.lock b/uv.lock index fc1bf99a549..808f4d064cc 100644 --- a/uv.lock +++ b/uv.lock @@ -1073,6 +1073,7 @@ dependencies = [ { name = "pytest-json-report" }, { name = "pytest-metadata" }, { name = "pytest-regex" }, + { name = "pytest-split" }, { name = "pytest-xdist" }, { name = "pyyaml" }, { name = "questionary" }, @@ -1126,6 +1127,7 @@ requires-dist = [ { name = "pytest-json-report", specifier = ">=1.5.0,<2" }, { name = "pytest-metadata", specifier = ">=3,<4" }, { name = "pytest-regex", specifier = ">=0.2.0,<0.3" }, + { name = "pytest-split", specifier = "==0.11.0" }, { name = "pytest-xdist", specifier = ">=3.3.1,<4" }, { name = "pyyaml", specifier = ">=6.0.2,<7" }, { name = "questionary", specifier = ">=2.1.0,<3" }, @@ -2443,6 +2445,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/72/d4143c66c1806599358c119c0af530bab92ef0c48129f17522dfd5a7ff6a/pytest_regex-0.2.0-py3-none-any.whl", hash = "sha256:c97e9c49e8c7e7482bd1fa701e3a5cccd18eb78d263752e32dba4937d8cee6d9", size = 3824, upload-time = "2023-05-29T22:08:09.551Z" }, ] +[[package]] +name = "pytest-split" +version = "0.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2f/16/8af4c5f2ceb3640bb1f78dfdf5c184556b10dfe9369feaaad7ff1c13f329/pytest_split-0.11.0.tar.gz", hash = "sha256:8ebdb29cc72cc962e8eb1ec07db1eeb98ab25e215ed8e3216f6b9fc7ce0ec2b5", size = 13421, upload-time = "2026-02-03T09:14:31.469Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ae/a1/d4423657caaa8be9b31e491592b49cebdcfd434d3e74512ce71f6ec39905/pytest_split-0.11.0-py3-none-any.whl", hash = "sha256:899d7c0f5730da91e2daf283860eb73b503259cb416851a65599368849c7f382", size = 11911, upload-time = "2026-02-03T09:14:33.708Z" }, +] + [[package]] name = "pytest-xdist" version = "3.8.0"