Skip to content

refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture#1315

Open
yashksaini-coder wants to merge 2 commits intolibp2p:mainfrom
yashksaini-coder:refactor/1307-subscribed-mesh-predicate
Open

refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture#1315
yashksaini-coder wants to merge 2 commits intolibp2p:mainfrom
yashksaini-coder:refactor/1307-subscribed-mesh-predicate

Conversation

@yashksaini-coder
Copy link
Copy Markdown
Contributor

Summary

Fixes #1307 — replaces the fixed trio.sleep(settle_time) in the subscribed_mesh pubsub test fixture with deterministic, predicate-based polling using the wait_for() helper introduced in #1298.

Context

PR #1298 (PR 1 of 6 for #378) introduced subscribed_mesh(), which waited a fixed 1s for mesh formation:

# tests/core/pubsub/conftest.py — before
for ps in harness.pubsubs:
    await ps.subscribe(topic)
# TODO(#378): replace fixed sleep with predicate-based mesh-ready polling
await trio.sleep(settle_time)
yield harness

Both @acul71 and @IronJam11 flagged this as timing-dependent. This PR resolves that TODO.

Approach

Use the existing wait_for(predicate, timeout=..., poll_interval=...) helper to poll until every router's mesh[topic] has at least min(n - 1, router.degree_low) peers. This lines up with GossipSub's mesh degree bounds: for small fan-outs (n ≤ degree_low + 1) every subscriber ends up in every other subscriber's mesh; for larger n, each router reaches its degree floor.

def _mesh_ready() -> bool:
    for router in routers:
        expected = min(n - 1, router.degree_low)
        if len(router.mesh.get(topic, set())) < expected:
            return False
    return True

await wait_for(_mesh_ready, timeout=ready_timeout, poll_interval=poll_interval, ...)

The fixture signature swaps the settle_time: float = 1.0 knob for ready_timeout: float = 5.0 and poll_interval: float = 0.02. There are no external callers of subscribed_mesh yet (PR #1298 landed the fixture but nothing consumes it), so this is a safe rename.

Files changed

  • tests/core/pubsub/conftest.py — replace fixed sleep with wait_for poll, remove TODO(#378) comment
  • tests/core/pubsub/test_subscribed_mesh_fixture.py — new regression tests for both the happy path and the timeout path
  • newsfragments/1307.internal.rst — internal changelog entry

Testing

  • pytest tests/core/pubsub/test_subscribed_mesh_fixture.py — 2 passed (new)
  • pytest tests/core/pubsub/test_dummyaccount_demo.py tests/core/pubsub/test_subscribed_mesh_fixture.py — 11 passed
  • make lint clean (ruff, ruff-format, mypy, pyrefly)

References

Open for review from @acul71 / @seetadev.

The subscribed_mesh fixture used a fixed trio.sleep(settle_time) after
every node subscribed, which is timing-dependent and either flaky or
unnecessarily slow. Replace the sleep with a wait_for() poll that waits
until every router's mesh[topic] has at least min(n-1, router.degree_low)
peers before yielding.

The fixture now exposes ready_timeout (default 5s) and poll_interval
(default 20ms) instead of the old settle_time knob. Adds regression tests
covering the ready case and the timeout case. No external callers of
subscribed_mesh exist yet, so this is a safe rename.

Fixes libp2p#1307.
Copilot AI review requested due to automatic review settings April 22, 2026 17:27
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Refactors the subscribed_mesh pubsub test helper to wait deterministically for mesh readiness (instead of a fixed sleep), addressing timing-dependent flakiness noted in #1307.

Changes:

  • Replace fixed trio.sleep() wait in subscribed_mesh with predicate-based polling via wait_for()
  • Add regression tests covering readiness success and timeout behavior
  • Add an internal newsfragment documenting the change

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

File Description
tests/core/pubsub/conftest.py Replaces fixed “settle” sleep with a wait_for() predicate that checks per-router mesh readiness.
tests/core/pubsub/test_subscribed_mesh_fixture.py Adds regression tests asserting the fixture yields only after mesh readiness and surfaces timeouts.
newsfragments/1307.internal.rst Documents the internal refactor from fixed sleep to deterministic polling.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +32 to +39
def _flatten(exc: BaseException) -> list[BaseException]:
"""Recursively walk nested ExceptionGroups, returning the leaf exceptions."""
if isinstance(exc, BaseExceptionGroup):
out: list[BaseException] = []
for child in exc.exceptions:
out.extend(_flatten(child))
return out
return [exc]
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribed_mesh(..., ready_timeout=...) timing out may raise TimeoutError directly (not necessarily wrapped in a BaseExceptionGroup), depending on how wait_for() and the surrounding Trio context are implemented. This test is currently too strict and can fail even when behavior is correct. Recommend accepting either: (a) a direct TimeoutError, or (b) an exception group containing a TimeoutError leaf. Also note that referencing BaseExceptionGroup hard-requires Python 3.11+ at import time; if the project test matrix includes <3.11, this module will fail collection—consider guarding with pytest.importorskip, a compatibility shim, or structuring the test to not reference BaseExceptionGroup unconditionally.

Copilot uses AI. Check for mistakes.
Comment on lines +48 to +50
with pytest.raises(BaseExceptionGroup) as exc_info:
async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001):
pytest.fail("subscribed_mesh should have timed out before yielding")
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribed_mesh(..., ready_timeout=...) timing out may raise TimeoutError directly (not necessarily wrapped in a BaseExceptionGroup), depending on how wait_for() and the surrounding Trio context are implemented. This test is currently too strict and can fail even when behavior is correct. Recommend accepting either: (a) a direct TimeoutError, or (b) an exception group containing a TimeoutError leaf. Also note that referencing BaseExceptionGroup hard-requires Python 3.11+ at import time; if the project test matrix includes <3.11, this module will fail collection—consider guarding with pytest.importorskip, a compatibility shim, or structuring the test to not reference BaseExceptionGroup unconditionally.

Copilot uses AI. Check for mistakes.
Comment on lines +45 to +49
# Ask for an absurdly short timeout; the mesh can't form in 1ms.
# Trio's background service managers nest ExceptionGroups, so flatten
# the whole tree and confirm the TimeoutError is a leaf.
with pytest.raises(BaseExceptionGroup) as exc_info:
async with subscribed_mesh(TOPIC, 3, ready_timeout=0.001):
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout-path test relies on an elapsed-time assumption (“can’t form in 1ms”), which can become flaky on fast CI or if mesh formation happens to complete before the first predicate check. To make this deterministic, consider monkeypatching tests.core.pubsub.conftest.wait_for (or the underlying clock/timer used by wait_for) to raise TimeoutError immediately, then assert the fixture propagates the error and message formatting as expected.

Copilot uses AI. Check for mistakes.
Comment on lines +95 to +96
ready_timeout: float = 5.0,
poll_interval: float = 0.02,
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since poll_interval is now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.

Copilot uses AI. Check for mistakes.
Comment on lines +120 to +123
await wait_for(
_mesh_ready,
timeout=ready_timeout,
poll_interval=poll_interval,
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since poll_interval is now a public knob on this helper, it should be validated (e.g., must be > 0) to avoid accidental busy-looping or surprising behavior if a caller passes 0 or a negative value. A small explicit check with a clear error message would make failures easier to diagnose.

Copilot uses AI. Check for mistakes.
- Validate ready_timeout and poll_interval are > 0 in subscribed_mesh
  so a zero/negative value surfaces as ValueError instead of busy-looping
  or silently timing out
- Make the timeout-path test tolerant of both wrapped and unwrapped
  TimeoutError shapes (nursery strictness varies across upstream CMs)
- Add a test covering the new argument validation
- Support the exceptiongroup backport on Python 3.10, matching the rest
  of the repo's compat pattern
@yashksaini-coder
Copy link
Copy Markdown
Contributor Author

Addressed Copilot feedback in 9395a48:

  • subscribed_mesh now validates ready_timeout > 0 and poll_interval > 0, raising ValueError on bad input instead of silently busy-looping or timing out immediately. Added a test covering this.
  • Relaxed the timeout-path test to accept both raw TimeoutError and BaseExceptionGroup-wrapped forms (since nursery strictness varies across the upstream context-manager stack). The _flatten helper walks either shape.
  • Imported BaseExceptionGroup via the exceptiongroup backport for Python 3.10, matching the existing pattern in libp2p/tools/anyio_service/*.

Left the "1ms elapsed-time flake" suggestion as-is — the mesh genuinely can't form before the first predicate check on any realistic hardware, and monkeypatching wait_for would test the mock more than the code. Happy to revisit if it flakes in CI.

@yashksaini-coder yashksaini-coder changed the title refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture (Fixes #1307) refactor(pubsub-test): predicate-based readiness in subscribed_mesh fixture Apr 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

refactor: replace fixed sleeps in pubsub test fixtures with predicate-based polling

2 participants