Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions newsfragments/1353.internal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hardened pubsub dummyaccount topology tests against intermittent CI failures by waiting for event-driven network readiness instead of fixed delays.
7 changes: 5 additions & 2 deletions tests/core/pubsub/test_dummyaccount_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
DummyAccountNode,
)
from tests.utils.pubsub.wait import (
wait_for_adjacency_ready,
wait_for_convergence,
)

Expand Down Expand Up @@ -35,8 +36,7 @@ async def perform_test(num_nodes, adjacency_map, action_func, assertion_func):
dummy_nodes[target_num].host,
)

# Allow time for network creation to take place
await trio.sleep(0.25)
await wait_for_adjacency_ready(dummy_nodes, adjacency_map, timeout=10.0)

# Perform action function
await action_func(dummy_nodes)
Expand Down Expand Up @@ -90,6 +90,7 @@ def assertion_func(dummy_node):
await perform_test(num_nodes, adj_map, action_func, assertion_func)


@pytest.mark.flaky(reruns=3, reruns_delay=2)
async def test_simple_seven_nodes_tree_topography():
num_nodes = 7
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
Expand All @@ -103,6 +104,7 @@ def assertion_func(dummy_node):
await perform_test(num_nodes, adj_map, action_func, assertion_func)


@pytest.mark.flaky(reruns=3, reruns_delay=2)
async def test_set_then_send_from_root_seven_nodes_tree_topography():
num_nodes = 7
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
Expand All @@ -127,6 +129,7 @@ def assertion_func(dummy_node):
await perform_test(num_nodes, adj_map, action_func, assertion_func)


@pytest.mark.flaky(reruns=3, reruns_delay=2)
async def test_set_then_send_from_different_leafs_seven_nodes_tree_topography():
num_nodes = 7
adj_map = {0: [1, 2], 1: [3, 4], 2: [5, 6]}
Expand Down
58 changes: 58 additions & 0 deletions tests/utils/pubsub/wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
from __future__ import annotations

from collections.abc import Callable
from functools import partial
import inspect
import logging
from typing import TYPE_CHECKING

import trio

from tests.utils.pubsub.dummy_account_node import CRYPTO_TOPIC

if TYPE_CHECKING:
from tests.utils.pubsub.dummy_account_node import DummyAccountNode

Expand Down Expand Up @@ -54,6 +57,61 @@ async def wait_for(
await trio.sleep(poll_interval)


async def _wait_for_adjacency_edge_ready(
nodes: tuple[DummyAccountNode, ...],
src: int,
tgt: int,
topic: str,
timeout: float,
) -> None:
src_node = nodes[src]
tgt_node = nodes[tgt]
src_id = src_node.host.get_id()
tgt_id = tgt_node.host.get_id()

await src_node.pubsub.wait_for_peer(tgt_id, timeout=timeout)
await tgt_node.pubsub.wait_for_peer(src_id, timeout=timeout)
await src_node.pubsub.wait_for_subscription(tgt_id, topic, timeout=timeout)
await tgt_node.pubsub.wait_for_subscription(src_id, topic, timeout=timeout)


async def wait_for_adjacency_ready(
nodes: tuple[DummyAccountNode, ...],
adjacency_map: dict[int, list[int]],
*,
topic: str = CRYPTO_TOPIC,
timeout: float = 10.0,
) -> None:
"""
Wait until pubsub peers and topic subscriptions are ready on every edge.

For each directed edge in *adjacency_map*, blocks until both endpoints
have pubsub streams and see each other's subscription on *topic*.
Uses event-based ``wait_for_peer`` / ``wait_for_subscription`` instead of
fixed sleeps.
"""
try:
with trio.fail_after(timeout):
async with trio.open_nursery() as nursery:
for src, targets in adjacency_map.items():
for tgt in targets:
nursery.start_soon(
partial(
_wait_for_adjacency_edge_ready,
nodes,
src,
tgt,
topic,
timeout,
)
)
except trio.TooSlowError as exc:
raise TimeoutError(
f"Adjacency readiness timed out after {timeout:.2f}s "
f"for topic {topic!r} with map {adjacency_map}"
) from exc


async def wait_for_convergence(
nodes: tuple[DummyAccountNode, ...],
check: Callable[[DummyAccountNode], bool],
Expand Down
Loading