Commit 9fb392f

Merge pull request #33 from rticommunity/dev
Harden exception/cleanup paths and fix QoS wildcard shadowing
2 parents d4d72ab + 3b370ae commit 9fb392f

10 files changed

Lines changed: 293 additions & 10 deletions

.github/copilot-instructions.md

Lines changed: 10 additions & 0 deletions
@@ -5,6 +5,16 @@
 - **Always** use the pre-existing venv: `source python/.venv/bin/activate`
 - The venv has `rti.connext 7.6.0` on Python 3.8. Never use the system Python (has 7.3.1).
 - Run tests from inside the venv: `cd python && source .venv/bin/activate && python -m pytest tests/ -x -q`
+- Before running tests or any DDS code, ensure RTI runtime variables are exported in the same shell session:
+  - `export NDDSHOME=/path/to/rti_connext_dds-7.6.0`
+  - `export PATH="$NDDSHOME/bin:$PATH"`
+  - `export LD_LIBRARY_PATH="$NDDSHOME/lib/${RTI_ARCH:-x64Linux4gcc7.3.0}:$LD_LIBRARY_PATH"`
+  - `export RTI_LICENSE_FILE="${RTI_LICENSE_FILE:-$NDDSHOME/rti_license.dat}"`
+  - If `NDDSHOME`/license is missing, `dds.DomainParticipant` creation can fail and tests will error before execution.
+- Before running any test suite, verify the wildcard USTM QoS assignment is commented out in `qos/umaa_qos_lib.xml`:
+  - `<!-- <datawriter_qos topic_filter="*" base_name="USTMQoS" /> -->`
+  - `<!-- <datareader_qos topic_filter="*" base_name="USTMQoS" /> -->`
+  - Rationale: topic filters are first-match-wins; uncommenting the wildcard USTM lines shadows all specific QoS mappings and breaks QoS profile tests.
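The shadowing described in the rationale can be modeled in a few lines. This is only a sketch of first-match-wins resolution, not RTI's implementation: `resolve_qos`, the rule lists, and the `DefaultQoS` fallback are hypothetical, and it assumes the wildcard entry is evaluated before the specific filters. The profile names `TelemetryQoS`, `CommandQoS`, and `USTMQoS` are taken from this commit.

```python
from fnmatch import fnmatchcase

# Hypothetical rule tables: (topic_filter, profile name), evaluated in order.
SPECIFIC_FIRST = [
    ("*ReportType", "TelemetryQoS"),
    ("*CommandType", "CommandQoS"),
]
# Assumption: the wildcard entry precedes the specific filters.
WILDCARD_FIRST = [("*", "USTMQoS")] + SPECIFIC_FIRST


def resolve_qos(topic_name, rules, default="DefaultQoS"):
    """Return the profile of the first rule whose filter matches the topic."""
    for pattern, qos_name in rules:
        if fnmatchcase(topic_name, pattern):
            return qos_name
    return default


def demo():
    topic = "AccelerationReportType"
    return resolve_qos(topic, SPECIFIC_FIRST), resolve_qos(topic, WILDCARD_FIRST)
```

With the wildcard first, every topic resolves to `USTMQoS` and the specific mappings are never reached, which is the failure mode the QoS profile tests would catch.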
 
 ## Project Layout
 
Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
+# Exception and Cleanup Findings (2026-05-18)
+
+This document captures a focused review of exception handling and shutdown/cleanup behavior in the Python SDK runtime lifecycle.
+
+## Scope
+
+- Command provider session lifecycle
+- Command consumer session lifecycle
+- Context startup/shutdown orchestration
+- Hook exception isolation
+
+## Findings Already Handled
+
+### 1. Consumer hook exception isolation is robust
+
+- `on_status`, `on_ack`, `on_exec_status`, and `on_terminal` hook exceptions are caught and logged.
+- Consumer session cleanup still completes even if `on_terminal` raises.
+- Existing tests cover this behavior (`test_consumer_hook_isolation.py`).
+
+### 2. Shutdown is idempotent
+
+- `DDSContext.shutdown()` uses `_is_shutdown` guard and returns on repeated calls.
+
+### 3. Shutdown order is correct for dispatcher safety
+
+- Dispatcher is stopped before task cancellation and DDS entity teardown.
+- Service-level `close()` is used for logical cleanup only.
+
+### 4. Context-level service close failures are contained
+
+- `DDSContext.shutdown()` catches/logs service `close()` exceptions and continues teardown.
+
+## Open Findings (Not Yet Fully Handled)
+
+### 1. Provider `on_terminal` exceptions can skip provider session cleanup
+
+In `CommandProviderSession.run()` finalization, `await self._provider.on_terminal(self)` executes before:
+
+- provider instance disposal
+- active-session map removal
+
+If `on_terminal()` raises, disposal and map cleanup may be skipped for that session.
+
+Risk:
+
+- lingering `_active_sessions` entries
+- missed instance disposal
+- shutdown path inconsistencies under hook failure
+
+### 2. `run_until_shutdown()` can double-start already-started services
+
+`DDSContext.run_until_shutdown()` currently creates a new task for every service exposing `_run()` without checking whether a prior `start()` already created a live task.
+
+Risk:
+
+- duplicate reader loops for services that were manually started
+- hard-to-debug duplicated processing
+
+### 3. Provider `close()` can abort early on non-cancel exception from `_run` task await
+
+`CommandProvider.close()` cancels `_task` and only handles `asyncio.CancelledError` when awaiting it.
+If awaiting `_task` raises a different exception, active-session fail/cleanup logic below may not execute in that `close()` call.
+
+Risk:
+
+- incomplete fail-on-shutdown behavior for active sessions
+- reduced cleanup resilience after reader-loop failure
+
+## Suggested Fixes
+
+### A. Harden provider session finalization
+
+Wrap provider `on_terminal` in `try/except` in the `finally` block, and always run disposal and `_active_sessions.pop(...)` afterward.
+
+Suggested shape:
+
+1. `try: await on_terminal(...)`
+2. `except Exception: log`
+3. always dispose instances
+4. always remove session from active map
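The four-step shape can be exercised in isolation. The sketch below is illustrative only: `FailingProvider`, `Session`, and their attributes are hypothetical stand-ins, not the SDK's `CommandProvider` or `CommandProviderSession`.

```python
import asyncio
import logging

_logger = logging.getLogger("finalization_sketch")


class FailingProvider:
    """Hypothetical provider whose terminal hook always raises."""

    def __init__(self):
        self.active_sessions = {}
        self.terminal_calls = 0

    async def on_terminal(self, session):
        self.terminal_calls += 1
        raise RuntimeError("terminal hook boom")


class Session:
    """Hypothetical session that registers itself with its provider."""

    def __init__(self, provider, session_id):
        self.provider = provider
        self.session_id = session_id
        self.disposed = False
        provider.active_sessions[session_id] = self

    async def run(self):
        try:
            await asyncio.sleep(0)  # stand-in for the real session work
        finally:
            # Steps 1 and 2: isolate the hook so a failure is only logged.
            try:
                await self.provider.on_terminal(self)
            except Exception:
                _logger.exception(
                    "Session %s: on_terminal hook error", self.session_id
                )
            # Steps 3 and 4: cleanup runs whether or not the hook raised.
            self.disposed = True
            self.provider.active_sessions.pop(self.session_id, None)


def demo():
    provider = FailingProvider()
    session = Session(provider, "s1")
    asyncio.run(session.run())
    return provider, session
```

Catching `Exception` rather than `BaseException` leaves `asyncio.CancelledError` free to propagate on Python 3.8 and later, so cancellation of the session is not swallowed by the hook guard.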
+
+### B. Prevent double-start in `run_until_shutdown()`
+
+Before creating a task for `_run()`, check whether `_task` exists and is still running.
+
+Suggested guard:
+
+- start only when `_task is None` or `_task.done()`
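A minimal sketch of the guard, assuming a hypothetical `Service` class and an `ensure_started` helper that are not part of the SDK:

```python
import asyncio


class Service:
    """Hypothetical service that counts how often its run loop is entered."""

    def __init__(self):
        self._task = None
        self.run_entries = 0

    async def _run(self):
        self.run_entries += 1
        await asyncio.Event().wait()  # block until cancelled


def ensure_started(service):
    """Create the run task only if none exists or the previous one finished."""
    task = getattr(service, "_task", None)
    if task is None or task.done():
        service._task = asyncio.create_task(service._run())
    return service._task


async def demo():
    svc = Service()
    first = ensure_started(svc)
    await asyncio.sleep(0)  # let the task enter _run()
    second = ensure_started(svc)  # live task exists, so nothing new starts
    await asyncio.sleep(0)
    entries = svc.run_entries
    first.cancel()
    try:
        await first
    except asyncio.CancelledError:
        pass
    return first is second, entries
```

Using `getattr(..., None)` keeps the guard safe for services that never defined `_task`, and the `done()` check allows a restart after a previous run loop has exited.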
+
+### C. Make provider `close()` resilient to non-cancel task failures
+
+When awaiting canceled `_task`, catch generic exceptions (log and continue) so active sessions are still failed and awaited.
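The pattern can be sketched with a hypothetical `cancel_and_drain` helper (not an SDK function) and a task that converts cancellation into a different exception:

```python
import asyncio
import logging

_logger = logging.getLogger("close_sketch")


async def cancel_and_drain(task, name):
    """Cancel a task and wait for it, tolerating any failure it raises."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # normal outcome of a cancelled task
    except Exception:
        # Log and continue so the caller's remaining cleanup still runs.
        _logger.exception("%s: task failed during close", name)


async def demo():
    async def bad_run():
        try:
            await asyncio.Event().wait()
        except asyncio.CancelledError as exc:
            raise RuntimeError("forced _run failure") from exc

    task = asyncio.create_task(bad_run())
    await asyncio.sleep(0)  # let the task reach its try block first
    await cancel_and_drain(task, "demo")
    # Reaching this line means cleanup was not aborted by the task failure.
    return isinstance(task.exception(), RuntimeError)
```

The `asyncio.sleep(0)` matters in this sketch: a task cancelled before its first step never enters the coroutine body, so it would finish as plainly cancelled and the non-cancel path would not be exercised.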
+
+## Test Gaps to Add
+
+1. Provider hook isolation test:
+
+- Provider subclass whose `on_terminal()` raises.
+- Assert session still disposes and is removed from `_active_sessions`.
+
+2. Lifecycle double-start guard test:
+
+- Call `service.start()` and then `ctx.run_until_shutdown()`.
+- Assert only one `_run()` task loop executes.
+
+3. Provider close resilience test:
+
+- Force `_run()` task to fail with non-cancel exception.
+- Assert `close()` still proceeds to fail/await active sessions.
+
+## Priority
+
+- High: provider `on_terminal` finalization hardening
+- High: double-start guard in `run_until_shutdown()`
+- Medium: provider `close()` robustness for non-cancel task exceptions
+
+## Notes
+
+- Findings are based on code-path verification in runtime implementation, not architecture docs alone.
+- Consumer exception isolation is in better shape than provider finalization paths.

python/docs/index.rst

Lines changed: 1 addition & 0 deletions
@@ -53,6 +53,7 @@ Key facts:
    :caption: Project
 
    changelog
+   exception-cleanup-findings-2026-05-18
 
 
 Indices

python/rtiumaapy/command_provider.py

Lines changed: 5 additions & 0 deletions
@@ -254,6 +254,11 @@ async def close(self) -> None:
                 await self._task
             except asyncio.CancelledError:
                 pass
+            except Exception:
+                _logger.exception(
+                    "Provider %s: _run task failed during close",
+                    self.service_name,
+                )
 
         # Fail all active sessions
         tasks = []

python/rtiumaapy/command_provider_session.py

Lines changed: 7 additions & 1 deletion
@@ -329,6 +329,12 @@ async def run(self) -> None:
                 await self._provider.on_failed(self, e)
 
         finally:
-            await self._provider.on_terminal(self)
+            try:
+                await self._provider.on_terminal(self)
+            except Exception:
+                _logger.exception(
+                    "Session %s: provider on_terminal hook error",
+                    self._session_id,
+                )
             self._dispose_provider_instances()
             self._provider._active_sessions.pop(self._session_id, None)

python/rtiumaapy/dds_context.py

Lines changed: 3 additions & 1 deletion
@@ -302,7 +302,9 @@ async def run_until_shutdown(self) -> None:
         # Start _run() for every registered service
         for service in self._registry.values():
             if hasattr(service, "_run"):
-                service._task = asyncio.create_task(service._run())
+                task: Optional[asyncio.Task] = getattr(service, "_task", None)
+                if task is None or task.done():
+                    service._task = asyncio.create_task(service._run())
 
         # Wait for shutdown signal
         stop = asyncio.Event()

python/tests/test_base_service.py

Lines changed: 51 additions & 0 deletions
@@ -43,6 +43,32 @@ async def close(self) -> None:
         self.closed = True
 
 
+class CountingRunService(BaseService):
+    """Service that records how many concurrent _run entries occurred."""
+
+    def __init__(self, ctx: DDSContext, service_name: str) -> None:
+        super().__init__(ctx, service_name)
+        self.closed = False
+        self.run_entries = 0
+        self.duplicate_started = asyncio.Event()
+
+    async def _run(self) -> None:
+        self.run_entries += 1
+        if self.run_entries > 1:
+            self.duplicate_started.set()
+        try:
+            await asyncio.Event().wait()
+        except asyncio.CancelledError:
+            pass
+
+    def start(self) -> None:
+        if not hasattr(self, "_task") or self._task is None or self._task.done():
+            self._task = asyncio.ensure_future(self._run())
+
+    async def close(self) -> None:
+        self.closed = True
+
+
 # ═══════════════════════════════════════════════════════════════════════════
 # Auto-registration
 # ═══════════════════════════════════════════════════════════════════════════
@@ -145,3 +171,28 @@ async def send_signal():
         await ctx.run_until_shutdown()
         assert svc.ran
         assert svc.closed
+
+    @pytest.mark.asyncio
+    async def test_run_until_shutdown_does_not_double_start(
+        self, qos_file: str,
+    ):
+        """If start() already ran, run_until_shutdown() must not spawn another _run task."""
+        import os
+        import signal
+
+        ctx = DDSContext(domain_id=DEFAULT_DOMAIN_ID, qos_file=qos_file)
+        svc = CountingRunService(ctx, "RunnerOnce")
+        svc.start()
+
+        loop = asyncio.get_running_loop()
+
+        async def send_signal():
+            await asyncio.sleep(0.1)
+            loop.call_soon(lambda: os.kill(os.getpid(), signal.SIGINT))
+
+        asyncio.create_task(send_signal())
+        await ctx.run_until_shutdown()
+
+        assert svc.run_entries == 1
+        assert not svc.duplicate_started.is_set()
+        assert svc.closed

python/tests/test_command_shutdown.py

Lines changed: 72 additions & 0 deletions
@@ -65,6 +65,28 @@ async def on_executing(self, session):
         await asyncio.sleep(30)
 
 
+class TerminalFailProvider(CommandProvider):
+    def __init__(self, ctx, source_id):
+        super().__init__(
+            ctx, "TerminalFailProvider",
+            command_type=AnchorCommandType,
+            ack_type=AnchorCommandAckReportType,
+            status_type=AnchorCommandStatusType,
+            command_topic=AnchorCommandTypeTopic,
+            ack_topic=AnchorCommandAckReportTypeTopic,
+            status_topic=AnchorCommandStatusTypeTopic,
+            source_id=source_id,
+        )
+        self.terminal_calls = 0
+
+    async def on_executing(self, session):
+        return
+
+    async def on_terminal(self, session):
+        self.terminal_calls += 1
+        raise RuntimeError("terminal hook boom")
+
+
 class ShutdownConsumer(CommandConsumer):
     def __init__(self, ctx, source_id, destination_id):
         super().__init__(
@@ -148,6 +170,56 @@ async def test_close_no_active_sessions(self, dds_context: DDSContext):
         # Should not raise
         await provider.close()
 
+    @pytest.mark.asyncio
+    async def test_provider_on_terminal_error_still_cleans_session(
+        self, dds_context: DDSContext,
+    ):
+        """Provider on_terminal raise should not skip session disposal/pop."""
+        provider_id = _make_id()
+        consumer_id = _make_id()
+
+        provider = TerminalFailProvider(dds_context, source_id=provider_id)
+        consumer = ShutdownConsumer(
+            dds_context, source_id=consumer_id,
+            destination_id=provider_id)
+
+        provider.start()
+        consumer.start()
+
+        await wait_for_match(consumer._command_writer)
+        await wait_for_match(provider._ack_writer)
+        await wait_for_match(provider._status_writer)
+
+        await consumer.send(AnchorCommandType())
+
+        await asyncio.wait_for(
+            consumer.terminal_event.wait(), timeout=10.0)
+
+        # Give provider session finally block a moment to complete.
+        await asyncio.sleep(0.2)
+
+        assert provider.terminal_calls == 1
+        assert provider._active_sessions == {}
+
+    @pytest.mark.asyncio
+    async def test_close_continues_if_run_task_raises_non_cancel(
+        self, dds_context: DDSContext,
+    ):
+        """Provider close should continue cleanup if canceled _run raises."""
+        provider_id = _make_id()
+        provider = SlowProvider(dds_context, source_id=provider_id)
+
+        async def bad_run():
+            try:
+                await asyncio.Event().wait()
+            except asyncio.CancelledError as exc:
+                raise RuntimeError("forced _run failure") from exc
+
+        provider._task = asyncio.create_task(bad_run())
+
+        # Should not raise; close should tolerate non-cancel task failure.
+        await provider.close()
+
 
 class TestConsumerShutdown:
     """Consumer close() during active session (C39)."""

python/tests/test_dds_context.py

Lines changed: 22 additions & 6 deletions
@@ -8,6 +8,7 @@
 from rtiumaapy.dds_context import DDSContext
 from tests.conftest import DEFAULT_DOMAIN_ID, SimpleReport
 from rtiumaapy.datamodel.AccelerationReportType import (
+    UMAA_SA_AccelerationStatus_AccelerationReportType as AccelerationReportType,
     UMAA_SA_AccelerationStatus_AccelerationReportTypeTopic as AccelerationReportTypeTopic,
 )
 from rtiumaapy.datamodel.BatteryReportType import (
@@ -17,10 +18,16 @@
     UMAA_SEM_GPSStatus_GPSReportTypeTopic as GPSReportTypeTopic,
 )
 from rtiumaapy.datamodel.AnchorCommandType import (
+    UMAA_EO_AnchorControl_AnchorCommandType as AnchorCommandType,
     UMAA_EO_AnchorControl_AnchorCommandTypeTopic as AnchorCommandTypeTopic,
 )
 
 
+def _reliability_kind_name(kind) -> str:
+    """Normalize RTI reliability kind wrappers/enums to a stable string."""
+    return str(kind).split(".")[-1]
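The string normalization in this helper can be illustrated without RTI installed. In the sketch below, `ReliabilityKind` is a hypothetical stand-in built on the standard library `Enum`, not the `rti.connextdds` type, and it assumes the real kind's `str()` ends with the member name after a dot.

```python
from enum import Enum


class ReliabilityKind(Enum):
    """Hypothetical stand-in for the DDS reliability kind."""

    BEST_EFFORT = 0
    RELIABLE = 1


def reliability_kind_name(kind) -> str:
    """Keep only the text after the last dot of the value's string form."""
    return str(kind).split(".")[-1]


def demo():
    return [
        reliability_kind_name(ReliabilityKind.BEST_EFFORT),
        reliability_kind_name("ReliabilityKind.RELIABLE"),
        reliability_kind_name("RELIABLE"),
    ]
```

Comparing names instead of objects makes the assertion independent of whether the QoS getter returns the enum itself or a wrapper around it, at the cost of relying on the string representation.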
+
+
 # ═══════════════════════════════════════════════════════════════════════════
 # Singleton behaviour
 # ═══════════════════════════════════════════════════════════════════════════
@@ -123,16 +130,22 @@ async def test_creates_writer(self, dds_context: DDSContext):
     @pytest.mark.asyncio
     async def test_report_qos_best_effort(self, dds_context: DDSContext):
         """*ReportType topics should get TelemetryQoS → BEST_EFFORT."""
-        writer = dds_context.create_writer(SimpleReport, AccelerationReportTypeTopic)
+        writer = dds_context.create_writer(
+            AccelerationReportType,
+            AccelerationReportTypeTopic,
+        )
         qos = writer.qos
-        assert qos.reliability.kind == dds.ReliabilityKind.BEST_EFFORT
+        assert _reliability_kind_name(qos.reliability.kind) == "BEST_EFFORT"
 
     @pytest.mark.asyncio
     async def test_command_qos_reliable(self, dds_context: DDSContext):
         """*CommandType topics should get CommandQoS → RELIABLE."""
-        writer = dds_context.create_writer(SimpleReport, AnchorCommandTypeTopic)
+        writer = dds_context.create_writer(
+            AnchorCommandType,
+            AnchorCommandTypeTopic,
+        )
         qos = writer.qos
-        assert qos.reliability.kind == dds.ReliabilityKind.RELIABLE
+        assert _reliability_kind_name(qos.reliability.kind) == "RELIABLE"
 
 
 class TestCreateReader:
@@ -143,9 +156,12 @@ async def test_creates_reader(self, dds_context: DDSContext):
 
     @pytest.mark.asyncio
     async def test_report_qos_best_effort(self, dds_context: DDSContext):
-        reader = dds_context.create_reader(SimpleReport, AccelerationReportTypeTopic)
+        reader = dds_context.create_reader(
+            AccelerationReportType,
+            AccelerationReportTypeTopic,
+        )
         qos = reader.qos
-        assert qos.reliability.kind == dds.ReliabilityKind.BEST_EFFORT
+        assert _reliability_kind_name(qos.reliability.kind) == "BEST_EFFORT"
 
 
 class TestCreateFilteredReader:
