204 changes: 168 additions & 36 deletions cardano_node_tests/pytest_plugins/xdist_scheduler.py
@@ -1,17 +1,52 @@
import collections
import os
import typing as tp

import pytest
from xdist import scheduler
from xdist import workermanage
from xdist.remote import Producer

LONG_MARKER = "long"
GROUP_MARKER = "xdist_group"
SPLIT_MARKER = "xdist_split"
SPLIT_NODEID_PREFIX = "split="
# Default upper bound on parallel cluster instances when neither CLUSTERS_COUNT
# is set nor a worker count is available. Matches the framework default.
DEFAULT_MAX_CLUSTERS = 9


class OneLongScheduling(scheduler.LoadScopeScheduling):
"""Scheduling plugin that tries to schedule no more than one long-running test per worker.

:scope: A "xdist_group" marker value or full node id.
"""Scheduling plugin with long-test balancing and split-key dispersion.

:scope: An "xdist_group" marker value or full node id.

Tests marked with ``@pytest.mark.long`` are tracked so that no more than one is
scheduled per worker at a time.

Tests marked with ``@pytest.mark.xdist_split("<key>")`` are NOT grouped onto one
worker (unlike ``xdist_group``). Each gets its own per-test scope so the scheduler
can reorder them independently, and the number of in-flight tests sharing a given
split key is capped at the available cluster instance capacity
(see ``self.clusters_count``). A test that locks multiple shared resources can
declare several keys at once with a comma-separated value
(e.g. ``@pytest.mark.xdist_split("governance,plutus")``); the cap is then
enforced independently for each key.

Without this cap, when many tests with the same split key are collected together
(e.g. governance tests in the same module), xdist hands them out at once to many
workers; workers beyond the instance capacity then sit blocked in the cluster
manager waiting for the shared resource (e.g. governance setup) to free up,
instead of running unrelated non-conflicting tests that could share those same
cluster instances in parallel. With the cap, only as many same-key tests are
scheduled simultaneously as there are instances to host them, and the remaining
workers pick non-conflicting work.

Example: 9 cluster instances, 18 governance tests, 20 workers. Without the cap,
18 workers all try to start governance — 9 run, the other 9 wait for instance
capacity while 2 unrelated tests sit unassigned. With the cap, 9 workers run
governance and the remaining 11 run non-governance tests on those same instances
in parallel.

:workqueue: Ordered dictionary that maps all available scopes with their
associated tests (nodeid). Nodeids are in turn associated with their
@@ -45,6 +80,17 @@ class OneLongScheduling(scheduler.LoadScopeScheduling):
}
"""

def __init__(self, config: tp.Any, log: Producer | None = None) -> None:
super().__init__(config, log)
# Cap concurrent tests per split key by the cluster instance count:
# explicit CLUSTERS_COUNT env wins; otherwise fall back to the xdist
# worker count (controller-side, via self.numnodes), bounded by
# DEFAULT_MAX_CLUSTERS; final fallback to 1.
# PYTEST_XDIST_WORKER_COUNT is NOT used here: xdist sets it only inside
# worker subprocesses, never on the controller where the scheduler runs.
env_count = int(os.environ.get("CLUSTERS_COUNT") or 0)
self.clusters_count = env_count or min(self.numnodes, DEFAULT_MAX_CLUSTERS) or 1
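# --- Illustration only, not part of the plugin. A standalone mirror of the
# capacity resolution above (hypothetical helper name), showing the
# precedence: explicit CLUSTERS_COUNT, then the worker count capped at
# DEFAULT_MAX_CLUSTERS, then 1. The asserts assume CLUSTERS_COUNT is unset.
#
#     import os
#
#     DEFAULT_MAX_CLUSTERS = 9
#
#     def resolve_clusters_count(numnodes: int) -> int:
#         env_count = int(os.environ.get("CLUSTERS_COUNT") or 0)
#         return env_count or min(numnodes, DEFAULT_MAX_CLUSTERS) or 1
#
#     assert resolve_clusters_count(20) == 9  # capped by DEFAULT_MAX_CLUSTERS
#     assert resolve_clusters_count(4) == 4   # fewer workers than the cap
#     assert resolve_clusters_count(0) == 1   # no workers known -> final fallback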

def _split_scope(self, nodeid: str) -> str:
"""Determine the scope (grouping) of a nodeid.

@@ -59,13 +105,62 @@ def _split_scope(self, nodeid: str) -> str:
return nodeid # nodeid has neither group name nor long-running marker

comps = nodeid[scope_start_idx:].split("@")
has_long = comps[-1] == LONG_MARKER
middle = comps[1:-1] if has_long else comps[1:]

# An xdist_split marker does NOT define a group scope. Tests sharing a split key
# remain in their own per-test scope so they can be spread across workers.
if middle and not middle[0].startswith(SPLIT_NODEID_PREFIX):
return middle[0] # group scope

if len(comps) == 3: # nodeid has a group name and a long-running marker
return comps[1]
if comps[-1] == LONG_MARKER: # nodeid has a long-running marker
return nodeid
return nodeid # per-test scope (long-only, split, split+long, or unmarked)

return comps[1] # nodeid has a group name
@staticmethod
def _get_split_keys(nodeid: str) -> set[str]:
"""Return the set of split keys encoded in a nodeid (empty if none).

A test can declare multiple split keys (locking multiple shared resources)
via ``@pytest.mark.xdist_split("a,b")``; each key is encoded as its own
``@split=<key>`` suffix.
"""
param_end_idx = nodeid.rfind("]")
scope_start_idx = param_end_idx if param_end_idx != -1 else 0
if nodeid.rfind("@") <= scope_start_idx:
return set()
keys: set[str] = set()
for comp in nodeid[scope_start_idx:].split("@")[1:]:
if comp.startswith(SPLIT_NODEID_PREFIX):
keys.add(comp[len(SPLIT_NODEID_PREFIX) :])
return keys
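
# --- Illustration only; the test path and parameter below are made up. ---
# A hypothetical nodeid as produced by the collection hook at the bottom of
# this file for a test marked ``@pytest.mark.xdist_split("governance,plutus")``
# plus ``@pytest.mark.long``, and what ``_get_split_keys`` recovers from it:
#
#     nodeid = "tests/test_pp.py::TestPP::test_update[abc]@split=governance@split=plutus@long"
#     assert OneLongScheduling._get_split_keys(nodeid) == {"governance", "plutus"}
#
#     # Nodeids without any "@split=" suffix yield the empty set:
#     assert OneLongScheduling._get_split_keys("tests/test_tx.py::test_simple") == set()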

def _scope_split_keys(self, work_unit: dict) -> set[str]:
"""Return the split keys shared by nodeids in a work unit (empty if none)."""
if not work_unit:
return set()
return self._get_split_keys(next(iter(work_unit)))

def _get_saturated_split_keys(self) -> set[str]:
"""Return split keys with >= ``self.clusters_count`` pending tests across all workers.

Such keys have no remaining cluster instance capacity, so additional tests
with the key would just stall in the cluster manager. A test that declares
multiple keys contributes to the count of each independently (each cluster
instance has its own per-resource locks).
"""
counts: collections.Counter[str] = collections.Counter()
for assigned in self.assigned_work.values():
for work_unit in assigned.values():
for nodeid, completed in work_unit.items():
if completed:
continue
for key in self._get_split_keys(nodeid):
counts[key] += 1
return {k for k, n in counts.items() if n >= self.clusters_count}
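
# --- Illustration only: a simplified standalone mirror of the counting. ---
# It assumes unparametrized nodeids (the real code also handles ``@`` inside
# ``[...]`` parameters) and takes pending nodeids as a flat list instead of
# walking ``assigned_work``.
#
#     import collections
#
#     def saturated_keys(pending: list[str], clusters_count: int) -> set[str]:
#         counts: collections.Counter[str] = collections.Counter()
#         for nodeid in pending:
#             for comp in nodeid.split("@")[1:]:
#                 if comp.startswith("split="):
#                     counts[comp[len("split="):]] += 1
#         return {k for k, n in counts.items() if n >= clusters_count}
#
#     pending = [
#         "t.py::test_a@split=governance",
#         "t.py::test_b@split=governance@long",
#         "t.py::test_c@split=plutus",
#     ]
#     # With 2 cluster instances, "governance" is at capacity; "plutus" is not:
#     assert saturated_keys(pending, clusters_count=2) == {"governance"}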

@staticmethod
def _is_long_unit(work_unit: dict) -> bool:
"""Return True if the work unit contains any long-running test."""
return any(nid.endswith(f"@{LONG_MARKER}") for nid in work_unit)

def _is_long_pending(self, assigned_to_node: dict) -> bool:
"""Return True if there is a long-running test pending."""
@@ -76,49 +171,70 @@ def _is_long_pending(self, assigned_to_node: dict) -> bool:

return False

def _get_short_scope(self) -> str:
"""Return first non-long work unit."""
for scope, nodeids_dict in self.workqueue.items():
for nodeid in nodeids_dict:
if nodeid.endswith(f"@{LONG_MARKER}"):
break
else:
return str(scope)

return ""

def _get_long_scope(self) -> str:
"""Return first long work unit."""
for scope, nodeids_dict in self.workqueue.items():
for nodeid in nodeids_dict:
if nodeid.endswith(f"@{LONG_MARKER}"):
return str(scope)

def _get_short_scope(self, avoid_split_keys: tp.AbstractSet[str] = frozenset()) -> str:
"""Return first non-long work unit, preferring scopes without a conflicting split key."""
fallback = ""
for scope, work_unit in self.workqueue.items():
if self._is_long_unit(work_unit):
continue
if avoid_split_keys and self._scope_split_keys(work_unit) & avoid_split_keys:
fallback = fallback or str(scope)
continue
return str(scope)
return fallback

def _get_long_scope(self, avoid_split_keys: tp.AbstractSet[str] = frozenset()) -> str:
"""Return first long work unit, preferring scopes without a conflicting split key."""
fallback = ""
for scope, work_unit in self.workqueue.items():
if not self._is_long_unit(work_unit):
continue
if avoid_split_keys and self._scope_split_keys(work_unit) & avoid_split_keys:
fallback = fallback or str(scope)
continue
return str(scope)
return fallback

def _get_non_split_scope(self, avoid_split_keys: tp.AbstractSet[str]) -> str:
"""Return first scope whose split keys do not intersect the avoid set."""
if not avoid_split_keys:
return ""
for scope, work_unit in self.workqueue.items():
if self._scope_split_keys(work_unit) & avoid_split_keys:
continue
return str(scope)
return ""

def _assign_work_unit(self, node: workermanage.WorkerController) -> None:
"""Assign a work unit to a node."""
assert self.workqueue

assigned_to_node = self.assigned_work.setdefault(node, collections.OrderedDict())
scope, work_unit = None, None
scope, work_unit = "", None

# Check if there are any long-running tests already pending

long_pending = self._is_long_pending(assigned_to_node)

# Cap concurrent in-flight tests with the same split key at the cluster
# instance capacity, so workers beyond that limit pick non-conflicting work
# instead of stalling in the cluster manager.
avoid_split_keys = self._get_saturated_split_keys()

if long_pending:
# Try to find a work unit with no long-running test if there is already a long-running
# test pending
scope = self._get_short_scope()
if scope:
work_unit = self.workqueue.pop(scope)
scope = self._get_short_scope(avoid_split_keys=avoid_split_keys)
else:
# Try to find a work unit with long-running test if there is no long-running test
# pending. We want to schedule long-running tests as early as possible
scope = self._get_long_scope()
if scope:
work_unit = self.workqueue.pop(scope)
scope = self._get_long_scope(avoid_split_keys=avoid_split_keys)

# Long-balancing didn't pick anything; at least try to avoid split conflicts
if not scope:
scope = self._get_non_split_scope(avoid_split_keys=avoid_split_keys)

if scope:
work_unit = self.workqueue.pop(scope)

# Grab the first unit of work if none was grabbed above
if work_unit is None:
@@ -141,10 +257,11 @@ def _assign_work_unit(self, node: workermanage.WorkerController) -> None:
@pytest.hookimpl(tryfirst=True)
def pytest_collection_modifyitems(items: list) -> None:
for item in items:
group_marker = item.get_closest_marker("xdist_group")
group_marker = item.get_closest_marker(GROUP_MARKER)
split_marker = item.get_closest_marker(SPLIT_MARKER)
long_marker = item.get_closest_marker(LONG_MARKER)

if not (group_marker or long_marker):
if not (group_marker or split_marker or long_marker):
continue

comps = [item.nodeid]
@@ -158,6 +275,21 @@ def pytest_collection_modifyitems(items: list) -> None:
)
comps.append(gname)

# Add the split key(s) to nodeid as suffix. Recognized by the scheduler to spread
# tests sharing a heavy runtime resource across workers, without grouping them.
# Multiple keys can be supplied as a comma-separated string for tests that lock
# several shared resources (e.g. "governance,plutus").
if split_marker:
skey = (
split_marker.args[0]
if len(split_marker.args) > 0
else split_marker.kwargs.get("name", "default")
)
for raw in skey.split(","):
k = raw.strip()
if k:
comps.append(f"{SPLIT_NODEID_PREFIX}{k}")

# Add "long" to nodeid as suffix
if long_marker:
comps.append(LONG_MARKER)
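
The hunk above cuts off before the suffix components are written back to the item, and the diff never shows how the scheduler itself is registered. Both pieces below are therefore assumptions: the ``"@".join`` is implied by the ``@``-separated parsing in ``_split_scope``/``_get_split_keys``, and ``pytest_xdist_make_scheduler`` is pytest-xdist's scheduler-factory hook, though its use in this plugin is not visible in this diff.

# --- Sketch under stated assumptions, not shown in this diff ---
# Presumed tail of pytest_collection_modifyitems: write the "@"-joined
# suffixes back so the scheduler can parse them out of the nodeid again.
#     item._nodeid = "@".join(comps)
#
# Presumed registration via pytest-xdist's scheduler factory hook:
#     def pytest_xdist_make_scheduler(config, log):
#         return OneLongScheduling(config, log)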
3 changes: 3 additions & 0 deletions cardano_node_tests/tests/tests_conway/test_committee.py
@@ -566,6 +566,7 @@ def test_update_committee_action(
[r.success() for r in (reqc.db010, reqc.db011)]

@allure.link(helpers.get_vcs_link())
@pytest.mark.xdist_split("governance")
@pytest.mark.long
@pytest.mark.dbsync
@pytest.mark.dbsync_config
@@ -1355,6 +1356,7 @@ def _check_resign_dbsync(res_member: clusterlib.CCMember) -> None:

@allure.link(helpers.get_vcs_link())
@pytest.mark.skipif(not configuration.HAS_CC, reason="Runs only on setup with CC")
@pytest.mark.xdist_split("governance")
@pytest.mark.long
def test_empty_committee(
self,
@@ -1717,6 +1719,7 @@ def test_update_committee_threshold_out_of_range(

@allure.link(helpers.get_vcs_link())
@pytest.mark.skipif(not configuration.HAS_CC, reason="Runs only on setup with CC")
@pytest.mark.xdist_split("governance")
@pytest.mark.long
def test_committee_zero_threshold(
self,
@@ -312,6 +312,7 @@ class TestConstitution:
"""Tests for constitution."""

@allure.link(helpers.get_vcs_link())
@pytest.mark.xdist_split("governance")
@pytest.mark.dbsync
@pytest.mark.long
def test_change_constitution(
1 change: 1 addition & 0 deletions cardano_node_tests/tests/tests_conway/test_drep.py
@@ -1632,6 +1632,7 @@ def pool_user_lg(

@allure.link(helpers.get_vcs_link())
@pytest.mark.order(5)
@pytest.mark.xdist_split("governance")
@pytest.mark.long
def test_drep_inactivity( # noqa: C901
self,
1 change: 1 addition & 0 deletions cardano_node_tests/tests/tests_conway/test_guardrails.py
@@ -1565,6 +1565,7 @@ def cost_models(cluster_with_constitution: ClusterWithConstitutionRecord):

class TestGovernanceGuardrails:
@allure.link(helpers.get_vcs_link())
@pytest.mark.xdist_split("governance")
@pytest.mark.long
def test_guardrails(
self,
1 change: 1 addition & 0 deletions cardano_node_tests/tests/tests_conway/test_hardfork.py
@@ -47,6 +47,7 @@ class TestHardfork:
"""Tests for hard-fork."""

@allure.link(helpers.get_vcs_link())
@pytest.mark.xdist_split("governance")
@pytest.mark.long
def test_hardfork(
self,
2 changes: 2 additions & 0 deletions cardano_node_tests/tests/tests_conway/test_no_confidence.py
@@ -52,6 +52,7 @@ class TestNoConfidence:

@allure.link(helpers.get_vcs_link())
@pytest.mark.skipif(not configuration.HAS_CC, reason="Runs only on setup with CC")
@pytest.mark.xdist_split("governance")
@pytest.mark.long
def test_no_confidence_action(
self,
@@ -320,6 +321,7 @@ def test_no_confidence_action(

@allure.link(helpers.get_vcs_link())
@pytest.mark.skipif(not configuration.HAS_CC, reason="Runs only on setup with CC")
@pytest.mark.xdist_split("governance")
@pytest.mark.long
def test_committee_min_size(
self,
@@ -261,6 +261,7 @@ class TestPParamUpdate:
"""Tests for protocol parameters update."""

@allure.link(helpers.get_vcs_link())
@pytest.mark.xdist_split("governance,plutus")
@pytest.mark.long
@pytest.mark.dbsync
def test_pparam_update( # noqa: C901
@@ -65,6 +65,7 @@ class TestUpdateBuiltIns:

@allure.link(helpers.get_vcs_link())
@pytest.mark.skipif(not configuration.HAS_CC, reason="Runs only on setup with CC")
@pytest.mark.xdist_split("governance,plutus")
@pytest.mark.long
@pytest.mark.upgrade_step1
def test_update_in_pv9(
@@ -271,6 +271,7 @@ class TestPlutusV3Builtins:
batch6_overspend_scripts = plutus_common.OVERSPENDING_MINTING_BATCH6_SCRIPTS_V3

@allure.link(helpers.get_vcs_link())
@pytest.mark.xdist_split("governance")
@pytest.mark.long
@pytest.mark.team_plutus
@pytest.mark.upgrade_step1
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -137,6 +137,8 @@ markers = [
"smash: test(s) for node + optionaly cardano-smash",
"testnets: test(s) can run on public testnets, like Preview",
"long: test(s) run for a long time on local testnets",
"xdist_group: group test(s) onto a single xdist worker",
"xdist_split: spread tests sharing a heavy resource across xdist workers",
"smoke: fast test(s) under 1 minute",
"upgrade_step1: test(s) for upgrade testing in step1",
"upgrade_step2: test(s) for upgrade testing in step2",