refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture#1315
refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture#1315yashksaini-coder wants to merge 2 commits intolibp2p:mainfrom
Conversation
The subscribed_mesh fixture used a fixed trio.sleep(settle_time) after every node subscribed, which is timing-dependent and either flaky or unnecessarily slow. Replace the sleep with a wait_for() poll that waits until every router's mesh[topic] has at least min(n-1, router.degree_low) peers before yielding. The fixture now exposes ready_timeout (default 5s) and poll_interval (default 20ms) instead of the old settle_time knob. Adds regression tests covering the ready case and the timeout case. No external callers of subscribed_mesh exist yet, so this is a safe rename. Fixes libp2p#1307.
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Refactors the subscribed_mesh pubsub test helper to wait deterministically for mesh readiness (instead of a fixed sleep), addressing timing-dependent flakiness noted in #1307.
Changes:
- Replace fixed
trio.sleep()wait insubscribed_meshwith predicate-based polling viawait_for() - Add regression tests covering readiness success and timeout behavior
- Add an internal newsfragment documenting the change
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| tests/core/pubsub/conftest.py | Replaces fixed “settle” sleep with a wait_for() predicate that checks per-router mesh readiness. |
| tests/core/pubsub/test_subscribed_mesh_fixture.py | Adds regression tests asserting the fixture yields only after mesh readiness and surfaces timeouts. |
| newsfragments/1307.internal.rst | Documents the internal refactor from fixed sleep to deterministic polling. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def _flatten(exc: BaseException) -> list[BaseException]: | ||
| """Recursively walk nested ExceptionGroups, returning the leaf exceptions.""" | ||
| if isinstance(exc, BaseExceptionGroup): | ||
| out: list[BaseException] = [] | ||
| for child in exc.exceptions: | ||
| out.extend(_flatten(child)) | ||
| return out | ||
| return [exc] |
There was a problem hiding this comment.
subscribed_mesh(..., ready_timeout=...) timing out may raise TimeoutError directly (not necessarily wrapped in a BaseExceptionGroup), depending on how wait_for() and the surrounding Trio context are implemented. This test is currently too strict and can fail even when behavior is correct. Recommend accepting either: (a) a direct TimeoutError, or (b) an exception group containing a TimeoutError leaf. Also note that referencing BaseExceptionGroup hard-requires Python 3.11+ at import time; if the project test matrix includes <3.11, this module will fail collection—consider guarding with pytest.importorskip, a compatibility shim, or structuring the test to not reference BaseExceptionGroup unconditionally.
| with pytest.raises(BaseExceptionGroup) as exc_info: | ||
| async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001): | ||
| pytest.fail("subscribed_mesh should have timed out before yielding") |
There was a problem hiding this comment.
subscribed_mesh(..., ready_timeout=...) timing out may raise TimeoutError directly (not necessarily wrapped in a BaseExceptionGroup), depending on how wait_for() and the surrounding Trio context are implemented. This test is currently too strict and can fail even when behavior is correct. Recommend accepting either: (a) a direct TimeoutError, or (b) an exception group containing a TimeoutError leaf. Also note that referencing BaseExceptionGroup hard-requires Python 3.11+ at import time; if the project test matrix includes <3.11, this module will fail collection—consider guarding with pytest.importorskip, a compatibility shim, or structuring the test to not reference BaseExceptionGroup unconditionally.
| # Ask for an absurdly short timeout; the mesh can't form in 1ms. | ||
| # Trio's background service managers nest ExceptionGroups, so flatten | ||
| # the whole tree and confirm the TimeoutError is a leaf. | ||
| with pytest.raises(BaseExceptionGroup) as exc_info: | ||
| async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001): |
There was a problem hiding this comment.
The timeout-path test relies on an elapsed-time assumption (“can’t form in 1ms”), which can become flaky on fast CI or if mesh formation happens to complete before the first predicate check. To make this deterministic, consider monkeypatching tests.core.pubsub.conftest.wait_for (or the underlying clock/timer used by wait_for) to raise TimeoutError immediately, then assert the fixture propagates the error and message formatting as expected.
| ready_timeout: float = 5.0, | ||
| poll_interval: float = 0.02, |
There was a problem hiding this comment.
Since poll_interval is now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.
| await wait_for( | ||
| _mesh_ready, | ||
| timeout=ready_timeout, | ||
| poll_interval=poll_interval, |
There was a problem hiding this comment.
Since poll_interval is now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.
- Validate ready_timeout and poll_interval are > 0 in subscribed_mesh so a zero/negative value surfaces as ValueError instead of busy-looping or silently timing out - Make the timeout-path test tolerant of both wrapped and unwrapped TimeoutError shapes (nursery strictness varies across upstream CMs) - Add a test covering the new argument validation - Support the exceptiongroup backport on Python 3.10, matching the rest of the repo's compat pattern
|
Addressed Copilot feedback in 9395a48:
Left the "1ms elapsed-time flake" suggestion as-is — the mesh genuinely can't form before the first predicate check on any realistic hardware, and monkeypatching |
Summary
Fixes #1307 — replaces the fixed
trio.sleep(settle_time)in thesubscribed_meshpubsub test fixture with deterministic, predicate-based polling using thewait_for()helper introduced in #1298.Context
PR #1298 (PR 1 of 6 for #378) introduced
subscribed_mesh(), which waited a fixed 1s for mesh formation:Both @acul71 and @IronJam11 flagged this as timing-dependent. This PR resolves that TODO.
Approach
Use the existing
wait_for(predicate, timeout=..., poll_interval=...)helper to poll until every router'smesh[topic]has at leastmin(n - 1, router.degree_low)peers. This lines up with GossipSub's mesh degree bounds: for small fan-outs (n ≤ degree_low + 1) every subscriber ends up in every other subscriber's mesh; for larger n, each router reaches its degree floor.The fixture signature swaps the
settle_time: float = 1.0knob forready_timeout: float = 5.0andpoll_interval: float = 0.02. There are no external callers ofsubscribed_meshyet (PR #1298 landed the fixture but nothing consumes it), so this is a safe rename.Files changed
tests/core/pubsub/conftest.py— replace fixed sleep withwait_forpoll, removeTODO(#378)commenttests/core/pubsub/test_subscribed_mesh_fixture.py— new regression tests for both the happy path and the timeout pathnewsfragments/1307.internal.rst— internal changelog entryTesting
pytest tests/core/pubsub/test_subscribed_mesh_fixture.py— 2 passed (new)pytest tests/core/pubsub/test_dummyaccount_demo.py tests/core/pubsub/test_subscribed_mesh_fixture.py— 11 passedmake lintclean (ruff, ruff-format, mypy, pyrefly)References
wait_forhelper)Open for review from @acul71 / @seetadev.