Commit 4b1efe2

fix: cancel periodic status check task when process raises ConstraintError (#228)
After a process fails (e.g. via `ConstraintError`), the `_periodic_status_check` background task on waiting components was not being cancelled, causing it to log `"State backend not connected, skipping status check"` every 20s indefinitely after process teardown.

# Summary

Python's `asyncio.wait()` does **not** propagate cancellation to its child tasks when the outer task is cancelled. When `LocalProcess.run()`'s `TaskGroup` cancelled component B's task due to component A raising `ConstraintError`, the `CancelledError` raised in `_io_read_with_status_check` left the `_periodic_status_check` task orphaned in the event loop.

# Changes

- **`plugboard/component/component.py`**: In `_io_read_with_status_check`, wrap `asyncio.wait()` in `try/except BaseException` and explicitly cancel `status_task` before re-raising. `io_task` is intentionally left uncancelled, because cancelling it leaves stale entries in `IOController._read_tasks` that break subsequent reads.

  ```python
  # Before
  done, pending = await asyncio.wait(
      (
          asyncio.create_task(self._periodic_status_check()),
          asyncio.create_task(self.io.read(timeout=read_timeout)),
      ),
      return_when=asyncio.FIRST_COMPLETED,
  )

  # After
  status_task = asyncio.create_task(self._periodic_status_check())
  io_task = asyncio.create_task(self.io.read(timeout=read_timeout))
  try:
      done, pending = await asyncio.wait(
          (status_task, io_task),
          return_when=asyncio.FIRST_COMPLETED,
      )
  except BaseException:
      status_task.cancel()
      raise
  ```

- **`tests/integration/test_process_with_components_run.py`**: Added `test_constraint_error_stops_background_status_check`, which patches `IO_READ_TIMEOUT_SECONDS` to 0.1s, runs a process where the producer raises `ConstraintError`, and asserts that the consumer's background status check count does not increase after the process fails.
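The behaviour described in the summary can be reproduced without plugboard. The sketch below is illustrative only: `heartbeat`, `read_with_heartbeat` and `demo_leak` are hypothetical names standing in for the status check and the pre-fix read loop, and the sleep durations are arbitrary. It cancels a task that is blocked in `asyncio.wait()` and then checks whether the child task keeps running.

```python
import asyncio


async def heartbeat(ticks: list) -> None:
    """Hypothetical stand-in for a periodic status check: ticks forever."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(1)


async def read_with_heartbeat(ticks: list, children: list) -> None:
    """Mirrors the pre-fix shape: child tasks are created and passed to asyncio.wait()."""
    status_task = asyncio.create_task(heartbeat(ticks))
    io_task = asyncio.create_task(asyncio.sleep(3600))  # a read that never completes
    children.extend([status_task, io_task])
    await asyncio.wait((status_task, io_task), return_when=asyncio.FIRST_COMPLETED)


async def demo_leak() -> bool:
    """Return True if the heartbeat keeps ticking after the outer task is cancelled."""
    ticks: list = []
    children: list = []
    outer = asyncio.create_task(read_with_heartbeat(ticks, children))
    await asyncio.sleep(0.03)  # let the outer task block inside asyncio.wait()
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        pass
    seen = len(ticks)
    await asyncio.sleep(0.05)  # give an orphaned heartbeat time to tick again
    leaked = (not children[0].done()) and len(ticks) > seen
    # Clean up so the demo itself does not leave tasks behind.
    for child in children:
        child.cancel()
    await asyncio.gather(*children, return_exceptions=True)
    return leaked


if __name__ == "__main__":
    print("heartbeat leaked:", asyncio.run(demo_leak()))
```

In this sketch the outer task ends with `CancelledError` while the heartbeat continues, which is the same shape as the orphaned `_periodic_status_check` task.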
<details>
<summary>Original prompt</summary>

> ----
>
> *This section details on the original issue you should resolve*
>
> <issue_title>bug: Status check keeps running after process has raised ConstraintError</issue_title>
> <issue_description>### Summary
>
> Consider the following MRE:
> ```python
> from plugboard.component import Component, IOController as IO
> from plugboard.process import LocalProcess
> from plugboard.connector import AsyncioConnector
> from plugboard.schemas import ConnectorSpec
> from plugboard.exceptions import ConstraintError
>
> class A(Component):
>     io = IO(outputs=["out_1"])
>
>     async def step(self) -> None:
>         raise ConstraintError("This is a constraint error from component A.")
>
> class B(Component):
>     io = IO(inputs=["in_1"])
>
>     async def step(self) -> None:
>         pass
>
> process = LocalProcess(
>     components=[A(name="component-a"), B(name="component-b")],
>     connectors=[
>         AsyncioConnector(
>             spec=ConnectorSpec(source="component-a.out_1", target="component-b.in_1"),
>         ),
>     ],
> )
> async with process:
>     await process.run()
> ```
>
> This will raise the `ConstraintError` as expected. However, it will continue to emit log messages afterwards every 20s, for example:
> ```
> {"cls":"B","name":"component-b","job_id":"Job_I6UAzJs5zhQGCfCi","event":"State backend not connected, skipping status check","level":"warning","timestamp":"2026-03-10T12:02:38.077249Z","module":"component","process":17506}
> ```
>
> The process should instead be cleaned up after the exception was raised.
>
> ### Version Information
>
> ```text
> Plugboard version: 0.6.0
> Platform: Linux-6.6.84.1-microsoft-standard-WSL2-x86_64-with-glibc2.39
> Python version: 3.12.11
> ```</issue_description>
>
> ## Comments on the Issue (you are @copilot in this section)
>
> <comments>
> </comments>

</details>

- Fixes #227

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com>
1 parent a55de20 commit 4b1efe2

2 files changed

+91
-8
lines changed

plugboard/component/component.py

Lines changed: 10 additions & 7 deletions
@@ -395,13 +395,16 @@ async def _io_read_with_status_check(self) -> None:
         otherwise another read attempt is made.
         """
         read_timeout = 1e-3 if self._has_outputs and not self._has_inputs else None
-        done, pending = await asyncio.wait(
-            (
-                asyncio.create_task(self._periodic_status_check()),
-                asyncio.create_task(self.io.read(timeout=read_timeout)),
-            ),
-            return_when=asyncio.FIRST_COMPLETED,
-        )
+        status_task = asyncio.create_task(self._periodic_status_check())
+        io_task = asyncio.create_task(self.io.read(timeout=read_timeout))
+        try:
+            done, pending = await asyncio.wait(
+                (status_task, io_task),
+                return_when=asyncio.FIRST_COMPLETED,
+            )
+        except BaseException:
+            status_task.cancel()
+            raise
         for task in pending:
             task.cancel()
         for task in done:
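The pattern in this hunk can be exercised on its own. The following is a minimal sketch, not plugboard code: `heartbeat`, `read_with_heartbeat` and `demo_fix` are hypothetical names, and a long `asyncio.sleep` stands in for `self.io.read(...)`. It keeps named references to both tasks, cancels only the status task when `asyncio.wait()` is interrupted, and reports the state of each child afterwards.

```python
import asyncio


async def heartbeat() -> None:
    """Hypothetical stand-in for the periodic status check."""
    while True:
        await asyncio.sleep(0.01)


async def read_with_heartbeat(children: list) -> None:
    """Same shape as the patched code: cancel the status task if wait() is interrupted."""
    status_task = asyncio.create_task(heartbeat())
    io_task = asyncio.create_task(asyncio.sleep(3600))  # stand-in for the I/O read
    children.extend([status_task, io_task])
    try:
        await asyncio.wait((status_task, io_task), return_when=asyncio.FIRST_COMPLETED)
    except BaseException:
        # Covers CancelledError, which derives from BaseException rather than Exception.
        status_task.cancel()
        raise


async def demo_fix() -> tuple:
    """Return (status task cancelled?, io task still pending?) after outer cancellation."""
    children: list = []
    outer = asyncio.create_task(read_with_heartbeat(children))
    await asyncio.sleep(0.03)  # let the outer task block inside asyncio.wait()
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.01)  # let the loop deliver the cancellation to the child
    status_task, io_task = children
    result = (status_task.cancelled(), not io_task.done())
    # The sketch cleans up its own stand-in read; the commit deliberately does not.
    io_task.cancel()
    await asyncio.gather(io_task, return_exceptions=True)
    return result


if __name__ == "__main__":
    print(asyncio.run(demo_fix()))
```

Catching `BaseException` rather than `Exception` matters here because `asyncio.CancelledError` is a `BaseException` subclass, so a narrower handler would not see the cancellation at all.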

tests/integration/test_process_with_components_run.py

Lines changed: 81 additions & 1 deletion
@@ -5,6 +5,7 @@
 from pathlib import Path
 from tempfile import NamedTemporaryFile
 import typing as _t
+from unittest.mock import patch
 
 from aiofile import async_open
 from pydantic import BaseModel
@@ -21,7 +22,7 @@
     RayConnector,
 )
 from plugboard.events import Event
-from plugboard.exceptions import NotInitialisedError, ProcessStatusError
+from plugboard.exceptions import ConstraintError, NotInitialisedError, ProcessStatusError
 from plugboard.process import LocalProcess, Process, RayProcess
 from plugboard.schemas import ConnectorSpec, Status
 from tests.conftest import ComponentTestHelper, zmq_connector_cls
@@ -456,3 +457,82 @@ async def test_event_driven_process_shutdown(
     assert actuator.actions == [f"do_{i}" for i in range(ticks)]
 
     await process.destroy()
+
+
+_SHORT_TIMEOUT = 0.1
+
+
+class ConstraintErrorComponent(ComponentTestHelper):
+    """Component that raises a ConstraintError on the first step."""
+
+    io = IO(outputs=["out_1"])
+
+    async def step(self) -> None:
+        raise ConstraintError("Constraint violated")
+
+
+class BackgroundTaskTracker(ComponentTestHelper):
+    """Component that counts how many times _periodic_status_check loops after process ends.
+
+    Overrides _periodic_status_check without calling super() to avoid early termination
+    via ProcessStatusError, so we can detect if the task leaks after process failure.
+    """
+
+    io = IO(inputs=["in_1"])
+    exports = ["background_run_count"]
+
+    def __init__(self, *args: _t.Any, **kwargs: _t.Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.background_run_count: int = 0
+
+    async def step(self) -> None:
+        await super().step()
+
+    async def _periodic_status_check(self) -> None:
+        while True:
+            await asyncio.sleep(_SHORT_TIMEOUT)
+            self.background_run_count += 1
+
+
+@pytest.mark.asyncio
+async def test_constraint_error_stops_background_status_check() -> None:
+    """Test that background status check tasks are cancelled when ConstraintError is raised.
+
+    Regression test for: a bug where the periodic status check task was not cancelled
+    when the process was cancelled due to a ConstraintError raised by another component.
+    """
+    with patch("plugboard.component.component.IO_READ_TIMEOUT_SECONDS", _SHORT_TIMEOUT):
+        producer = ConstraintErrorComponent(name="producer")
+        consumer = BackgroundTaskTracker(name="consumer")
+
+        connector = AsyncioConnector(
+            spec=ConnectorSpec(source="producer.out_1", target="consumer.in_1")
+        )
+        process = LocalProcess(
+            components=[producer, consumer],
+            connectors=[connector],
+        )
+
+        await process.init()
+
+        with pytest.raises(ExceptionGroup) as exc_info:
+            await process.run()
+
+        # Verify the ConstraintError propagated
+        exceptions = exc_info.value.exceptions
+        assert any(isinstance(e, ConstraintError) for e in exceptions)
+
+        # Record background task count immediately after the process ends
+        count_after_failure = consumer.background_run_count
+
+        # Wait longer than the patched IO_READ_TIMEOUT_SECONDS to ensure any leaked
+        # background tasks would have had time to run
+        await asyncio.sleep(_SHORT_TIMEOUT * 5)
+
+        # Background tasks should NOT have run again after the process ended
+        assert consumer.background_run_count == count_after_failure, (
+            f"Background status check ran {consumer.background_run_count - count_after_failure} "
+            f"extra time(s) after process ended, indicating a task leak"
+        )
+
+        await process.destroy()
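The test above detects a leak by watching a counter. A complementary check, which is not part of this commit and is shown only as a sketch, is to ask the event loop which tasks are still unfinished via `asyncio.all_tasks()`. The names `stray_tasks`, `leaky` and `main` are hypothetical.

```python
import asyncio


def stray_tasks() -> set:
    """Unfinished tasks in the running loop, excluding the caller's own task."""
    return asyncio.all_tasks() - {asyncio.current_task()}


async def leaky() -> None:
    """Hypothetical offender: starts a named task and never awaits or cancels it."""
    asyncio.create_task(asyncio.sleep(3600), name="orphan")


async def main() -> list:
    await leaky()
    leftovers = stray_tasks()
    names = sorted(task.get_name() for task in leftovers)
    # Cancel what was found so the loop can shut down cleanly.
    for task in leftovers:
        task.cancel()
    await asyncio.gather(*leftovers, return_exceptions=True)
    return names


if __name__ == "__main__":
    print(asyncio.run(main()))
```

A check like this reports any leftover task by name, whereas the counter approach in the test targets the status check specifically and does not depend on what other tasks the fixtures may leave running.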
