Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions .github/scripts/diagnose_durations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Diagnose a pytest-split ``.test_durations`` file.

Report entry count, whether keys still carry ``@xdist_group`` suffixes
(indicating incomplete normalization), and print sample keys for
quick visual comparison against collected test nodeids.

Usage::

uv run python .github/scripts/diagnose_durations.py [path]

*path* defaults to ``.test_durations`` in the current directory.
"""

import sys
from pathlib import Path

from execution_testing.cli.pytest_commands.plugins.split.durations import (
    load_durations,
    strip_xdist_suffix,
)


def main() -> None:
    """Entry point: report stats about the durations file, print samples."""
    path = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".test_durations")
    if not path.exists():
        print(f"::warning::No durations file at {path}")
        return

    data = load_durations(path)
    # Count only keys that normalization would actually change.  A plain
    # "@" membership check would false-positive on legitimate "@"
    # characters that normalization deliberately preserves: other
    # xdist_group markers (e.g. "@bigmem") and "@" inside parametrize
    # values (e.g. "test[email@example.com]").
    unstripped = sum(1 for k in data if strip_xdist_suffix(k) != k)
    keys = sorted(data)
    abs_path = path.resolve()

    print(f"Durations file: {path}")
    print(f" Entries: {len(data)}")
    print(f" Keys with @t8n-cache suffix: {unstripped}/{len(data)}")
    if unstripped:
        print(
            f" WARNING: {unstripped} keys still have @t8n-cache suffixes"
            " - normalization may have failed"
        )

    # Sample both ends of the sorted key order for quick visual
    # comparison against collected test nodeids.
    for label, sample in (
        ("First 3 keys:", keys[:3]),
        ("Last 3 keys:", keys[-3:]),
    ):
        print(f" {label}")
        for k in sample:
            print(f" {k}: {data[k]:.2f}s")

    print(f" Absolute path: {abs_path}")
    print(f" File size: {abs_path.stat().st_size} bytes")


if __name__ == "__main__":
    main()
58 changes: 58 additions & 0 deletions .github/scripts/merge_durations_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Merge multiple pytest-split ``.test_durations`` files.

Accept an output path and one or more input ``.test_durations`` JSON
files and flat-merge them into one file. Splits produce disjoint test
sets by construction, so collisions are not expected; when they do
occur the last input wins.

Usage::

uv run python .github/scripts/merge_durations_files.py \
<output.json> <durations_file> [<durations_file> ...]
"""

import sys
from pathlib import Path

from execution_testing.cli.pytest_commands.plugins.split.durations import (
load_durations,
merge_durations,
write_durations,
)


def main() -> None:
    """
    Entry point.

    Parse ``<output.json> <durations_file> [...]`` from ``sys.argv``,
    flat-merge the readable inputs, and write the result. Exits 1 on
    bad usage, 0 (with no output file) when no inputs exist.
    """
    if len(sys.argv) < 3:
        print(
            "Usage: merge_durations_files.py <output.json>"
            " <durations_file> [<durations_file> ...]",
            file=sys.stderr,
        )
        sys.exit(1)

    output_path = Path(sys.argv[1])
    inputs = [Path(p) for p in sys.argv[2:]]

    # Tolerate missing inputs: skip them and merge the rest.
    sources: list[dict[str, float]] = []
    for path in inputs:
        if not path.exists():
            print(f"Skipping {path} (not found)")
            continue
        sources.append(load_durations(path))

    if not sources:
        print("No durations found, nothing to merge.")
        sys.exit(0)

    merged = merge_durations(sources)
    write_durations(output_path, merged)
    # len(sources) is the number of files actually merged; a separate
    # manual counter would just duplicate it.
    print(f"Merged {len(sources)} durations files ({len(merged)} tests)")


if __name__ == "__main__":
    main()
47 changes: 47 additions & 0 deletions .github/scripts/normalize_durations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
Normalize a pytest-split ``.test_durations`` file in place.

Strip ``@xdist_group`` suffixes so the keys match the bare nodeids
pytest sees during collection. ``--store-durations`` records ids with
the suffix (e.g. ``@t8n-cache-<hash>``) added by xdist during
execution, so a normalization pass is required before a subsequent run
can look up durations.

Usage::

uv run python .github/scripts/normalize_durations.py [path]

*path* defaults to ``.test_durations`` in the current directory.
"""

import sys
from pathlib import Path

from execution_testing.cli.pytest_commands.plugins.split.durations import (
load_durations,
normalize_durations,
write_durations,
)


def main() -> None:
    """
    Entry point.

    Read the durations file named on the command line (default
    ``.test_durations``), strip ``@t8n-cache-*`` suffixes from its keys,
    and rewrite it in place, reporting how many keys collided.
    """
    target = Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".test_durations")
    if not target.exists():
        print(f"::warning::No durations file at {target}")
        return

    original = load_durations(target)
    cleaned = normalize_durations(original)
    write_durations(target, cleaned)

    # Keys that collapsed to the same stripped form shrink the dict;
    # the size difference is exactly the collision count.
    collision_count = len(original) - len(cleaned)
    print(
        f"Normalized {len(original)} -> {len(cleaned)} entries"
        f" ({collision_count} collisions)"
    )


if __name__ == "__main__":
    main()
1 change: 1 addition & 0 deletions packages/testing/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"pytest-custom-report>=1.0.1,<2",
"pytest-html>=4.1.0,<5",
"pytest-metadata>=3,<4",
"pytest-split==0.11.0",
"pytest-xdist>=3.3.1,<4",
"coincurve>=20.0.0,<21",
"trie>=3.1.0,<4",
Expand Down
48 changes: 30 additions & 18 deletions packages/testing/src/execution_testing/cli/pytest_commands/fill.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,45 @@ def create_executions(
self, pytest_args: List[str]
) -> List[PytestExecution]:
"""
Create execution plan that supports two-phase pre-allocation group
Create execution plan supporting two-phase pre-allocation group
generation.

Returns single execution for normal filling, or two-phase execution
when --generate-pre-alloc-groups or --generate-all-formats is
specified.
Returns:
- Single-phase execution when `--use-pre-alloc-groups` is set,
regardless of `--generate-all-formats` (pre-alloc groups
already exist on disk from a previous run).
- Phase-1-only execution when `--generate-pre-alloc-groups` is
set without `--generate-all-formats` (CI generates pre-alloc
on a dedicated runner without wasting time on phase 2).
- Two-phase execution when `--generate-all-formats` is set.
- Normal single-phase execution otherwise.

"""
processed_args = self.process_arguments(pytest_args)
processed_args = self._add_default_ignores(processed_args)

# Check if we need two-phase execution
if self._should_use_two_phase_execution(processed_args):
return self._create_two_phase_executions(processed_args)
elif "--use-pre-alloc-groups" in processed_args:
# Only phase 2: using existing pre-allocation groups
if "--use-pre-alloc-groups" in processed_args:
# Pre-alloc groups already exist: single-phase fill only.
return self._create_single_phase_with_pre_alloc_groups(
processed_args
)
else:
# Normal single-phase execution
return [
PytestExecution(
config_file=self.config_path,
args=processed_args,
allowed_exit_codes=self.allowed_exit_codes,
)
]
if self._should_use_two_phase_execution(processed_args):
two_phase = self._create_two_phase_executions(processed_args)
if (
"--generate-pre-alloc-groups" in processed_args
and "--generate-all-formats" not in processed_args
):
# Phase 1 only: generate pre-alloc groups without filling.
return [two_phase[0]]
return two_phase
# Normal single-phase execution
return [
PytestExecution(
config_file=self.config_path,
args=processed_args,
allowed_exit_codes=self.allowed_exit_codes,
)
]

def _create_two_phase_executions(
self, args: List[str]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Grouped test splitting for pytest-split."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Utilities for pytest-split ``.test_durations`` files.

``--store-durations`` records nodeids with a ``@t8n-cache-<hash>``
suffix appended during execution, but pytest collection sees bare
nodeids. These helpers bridge the two so the plugin and the CI
scripts share one implementation of suffix stripping, normalization,
and per-group merging.

Only the ``@t8n-cache-*`` suffix is stripped. Other ``xdist_group``
markers (e.g. ``@bigmem``) and ``@`` characters inside parametrize
values (e.g. ``test[email@example.com]``) are preserved, matching
``filler._strip_xdist_group_suffix``.
"""

from __future__ import annotations

import json
from collections.abc import Iterable
from pathlib import Path


def strip_xdist_suffix(nodeid: str) -> str:
    """
    Return *nodeid* without a trailing ``@t8n-cache-*`` suffix.

    Only the t8n-cache marker is removed. Any other ``@`` — a different
    ``xdist_group`` marker or an ``@`` inside a parametrize value —
    leaves the nodeid unchanged.
    """
    head, sep, tail = nodeid.rpartition("@")
    if sep and tail.startswith("t8n-cache-"):
        return head
    return nodeid


def normalize_durations(raw: dict[str, float]) -> dict[str, float]:
    """
    Strip ``@t8n-cache-*`` suffixes from every key of *raw*.

    Keys that collapse to the same stripped form (e.g. runs with
    different t8n-cache ids) keep the value of the last one seen.
    """
    normalized: dict[str, float] = {}
    for key, seconds in raw.items():
        normalized[strip_xdist_suffix(key)] = seconds
    return normalized


def merge_durations(
    sources: Iterable[dict[str, float]],
) -> dict[str, float]:
    """
    Combine *sources* into a single durations dict.

    Fork-range and pytest-split groups produce disjoint nodeid sets by
    construction, so collisions should not occur; when they do, the
    value from the last source wins.
    """
    combined: dict[str, float] = {}
    for source in sources:
        for nodeid, seconds in source.items():
            combined[nodeid] = seconds
    return combined


def load_durations(path: Path) -> dict[str, float]:
    """Parse the JSON durations file at *path*; missing file -> ``{}``."""
    # Keep the try body minimal: only the read can raise
    # FileNotFoundError; a malformed file still raises from json.loads.
    try:
        text = path.read_text()
    except FileNotFoundError:
        return {}
    return json.loads(text)


def write_durations(path: Path, data: dict[str, float]) -> None:
    """Write *data* to *path* as indented JSON with a trailing newline."""
    serialized = json.dumps(data, indent=2) + "\n"
    # Create any missing parent directories before writing.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(serialized)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Split-group key extraction for ``--grouped-split``.

The grouping key maps every parametrization of one test function
under one fork to a single runner. The key format mirrors fill's
output-file layout (one file per ``(fork, function)`` pair), so plain
file copies can fan in the per-runner outputs without content
collisions.

This module encodes only the correctness invariant -- which items
must stay together. The performance question of how to distribute
groups across runners is handled by :mod:`.scheduling`.
"""

from __future__ import annotations

from _pytest.nodes import Item

from execution_testing.cli.pytest_commands.plugins.split.durations import (
strip_xdist_suffix,
)

_FORK_PARAM = "parametrized_fork"


def group_key(item: Item) -> str:
    """
    Return the ``(function_path, fork)`` split-group key for *item*.

    Every parametrization of one test function under one fork maps to
    the same key and therefore lands on the same runner, keeping each
    per-test-function fixture file under its fork subdir runner-owned.

    The fork is read from the authoritative source when available --
    ``item.callspec.params["parametrized_fork"]`` set by the forks
    plugin -- so a parametrize value that merely starts with ``fork_``
    cannot be mistaken for the real fork. Items without a callspec
    (unparametrized functions, doctests, or unit-test stubs) fall back
    to scanning the bracketed nodeid part for a ``fork_*`` token.
    Unparametrized items with no fork are keyed by their bare nodeid;
    parametrized items with no recognizable fork token all share the
    bare function path as their key.
    """
    nodeid = strip_xdist_suffix(item.nodeid)
    func_path, bracket, param_part = nodeid.partition("[")

    callspec = getattr(item, "callspec", None)
    if callspec is not None:
        fork = (getattr(callspec, "params", None) or {}).get(_FORK_PARAM)
        if fork is not None:
            return f"{func_path}|fork={fork}"

    if not bracket:
        # No parametrization at all: the nodeid is the function path.
        return nodeid
    for piece in param_part.rstrip("]").split("-"):
        if piece.startswith("fork_"):
            return f"{func_path}|fork={piece[len('fork_') :]}"
    return func_path
Loading
Loading