|
3 | 3 | from __future__ import annotations |
4 | 4 |
|
5 | 5 | from collections.abc import Callable |
| 6 | +from functools import partial |
6 | 7 | import inspect |
7 | 8 | import logging |
8 | 9 | from typing import TYPE_CHECKING |
9 | 10 |
|
10 | 11 | import trio |
11 | 12 |
|
| 13 | +from tests.utils.pubsub.dummy_account_node import CRYPTO_TOPIC |
| 14 | + |
12 | 15 | if TYPE_CHECKING: |
13 | 16 | from tests.utils.pubsub.dummy_account_node import DummyAccountNode |
14 | 17 |
|
@@ -54,6 +57,61 @@ async def wait_for( |
54 | 57 | await trio.sleep(poll_interval) |
55 | 58 |
|
56 | 59 |
|
| 60 | +async def _wait_for_adjacency_edge_ready( |
| 61 | + nodes: tuple[DummyAccountNode, ...], |
| 62 | + src: int, |
| 63 | + tgt: int, |
| 64 | + topic: str, |
| 65 | + timeout: float, |
| 66 | +) -> None: |
| 67 | + src_node = nodes[src] |
| 68 | + tgt_node = nodes[tgt] |
| 69 | + src_id = src_node.host.get_id() |
| 70 | + tgt_id = tgt_node.host.get_id() |
| 71 | + |
| 72 | + await src_node.pubsub.wait_for_peer(tgt_id, timeout=timeout) |
| 73 | + await tgt_node.pubsub.wait_for_peer(src_id, timeout=timeout) |
| 74 | + await src_node.pubsub.wait_for_subscription(tgt_id, topic, timeout=timeout) |
| 75 | + await tgt_node.pubsub.wait_for_subscription(src_id, topic, timeout=timeout) |
| 76 | + |
| 77 | + |
| 78 | +async def wait_for_adjacency_ready( |
| 79 | + nodes: tuple[DummyAccountNode, ...], |
| 80 | + adjacency_map: dict[int, list[int]], |
| 81 | + *, |
| 82 | + topic: str = CRYPTO_TOPIC, |
| 83 | + timeout: float = 10.0, |
| 84 | +) -> None: |
| 85 | + """ |
| 86 | + Wait until pubsub peers and topic subscriptions are ready on every edge. |
| 87 | +
|
| 88 | + For each directed edge in *adjacency_map*, blocks until both endpoints |
| 89 | + have pubsub streams and see each other's subscription on *topic*. |
| 90 | + Uses event-based ``wait_for_peer`` / ``wait_for_subscription`` instead of |
| 91 | + fixed sleeps. |
| 92 | + """ |
| 93 | + try: |
| 94 | + with trio.fail_after(timeout): |
| 95 | + async with trio.open_nursery() as nursery: |
| 96 | + for src, targets in adjacency_map.items(): |
| 97 | + for tgt in targets: |
| 98 | + nursery.start_soon( |
| 99 | + partial( |
| 100 | + _wait_for_adjacency_edge_ready, |
| 101 | + nodes, |
| 102 | + src, |
| 103 | + tgt, |
| 104 | + topic, |
| 105 | + timeout, |
| 106 | + ) |
| 107 | + ) |
| 108 | + except trio.TooSlowError as exc: |
| 109 | + raise TimeoutError( |
| 110 | + f"Adjacency readiness timed out after {timeout:.2f}s " |
| 111 | + f"for topic {topic!r} with map {adjacency_map}" |
| 112 | + ) from exc |
| 113 | + |
| 114 | + |
57 | 115 | async def wait_for_convergence( |
58 | 116 | nodes: tuple[DummyAccountNode, ...], |
59 | 117 | check: Callable[[DummyAccountNode], bool], |
|
0 commit comments