Skip to content

Commit 2116de5

Browse files
committed
Simplify shutdown close contracts
1 parent 2916d48 commit 2116de5

5 files changed

Lines changed: 175 additions & 90 deletions

File tree

macloop/__init__.py

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import queue
66
import uuid
7+
import warnings
78
import weakref
89
from dataclasses import dataclass
910
from pathlib import Path
@@ -68,23 +69,12 @@ def _raise_on_unexpected_kwargs(name: str, kwargs: dict[str, Any]) -> None:
6869
raise TypeError(f"{name} got unexpected keyword arguments: {unexpected}")
6970

7071

71-
def _close_backend_with_optional_engine(backend: Any, engine_backend: Any | None) -> None:
72-
if engine_backend is not None:
73-
try:
74-
backend.close(engine_backend)
75-
return
76-
except TypeError as exc:
77-
try:
78-
backend.close()
79-
return
80-
except TypeError:
81-
raise exc
82-
83-
close_no_restore = getattr(backend, "close_no_restore", None)
84-
if close_no_restore is not None:
85-
close_no_restore()
86-
else:
87-
backend.close()
72+
def _warn_deferred_native_cleanup(exc: TimeoutError) -> None:
73+
warnings.warn(
74+
f"AudioEngine.close() deferred native cleanup: {exc}",
75+
RuntimeWarning,
76+
stacklevel=2,
77+
)
8878

8979

9080
@dataclass(frozen=True, slots=True)
@@ -370,12 +360,15 @@ def close(self) -> None:
370360
if self._closed:
371361
return
372362

363+
# High-level close is bounded best-effort cleanup. Native runtime timeouts should not
364+
# wedge user code; they are surfaced as warnings while the engine still becomes closed.
373365
self._closed = True
374366
backend_err: Optional[Exception] = None
375367
sink_err: Optional[Exception] = None
376368
try:
377369
self._backend.close()
378-
except TimeoutError:
370+
except TimeoutError as exc:
371+
_warn_deferred_native_cleanup(exc)
379372
backend_err = None
380373
except Exception as exc:
381374
backend_err = exc
@@ -526,13 +519,14 @@ def close(self) -> None:
526519
if self._closed:
527520
return
528521

522+
# Sink close only detaches the sink and returns its route lease when an engine is alive.
529523
err: Optional[Exception] = None
530524
engine = self._engine_ref()
531525
try:
532-
_close_backend_with_optional_engine(
533-
self._backend,
534-
engine._backend if engine is not None else None,
535-
)
526+
if engine is not None:
527+
self._backend.close_for_engine(engine._backend)
528+
else:
529+
self._backend.close()
536530
except Exception as exc:
537531
err = exc
538532
finally:
@@ -657,13 +651,14 @@ def close(self) -> None:
657651
if self._closed:
658652
return
659653

654+
# Sink close only detaches the sink and returns its route lease when an engine is alive.
660655
err: Optional[Exception] = None
661656
engine = self._engine_ref()
662657
try:
663-
_close_backend_with_optional_engine(
664-
self._backend,
665-
engine._backend if engine is not None else None,
666-
)
658+
if engine is not None:
659+
self._backend.close_for_engine(engine._backend)
660+
else:
661+
self._backend.close()
667662
except Exception as exc:
668663
err = exc
669664
finally:

python_ffi/src/lib.rs

Lines changed: 60 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -940,30 +940,11 @@ struct PyAsrSinkBackend {
940940
final_stats: Option<AsrSinkMetricsSnapshot>,
941941
}
942942

943-
#[pymethods]
944943
impl PyAsrSinkBackend {
945-
fn stats<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
946-
let snapshot = match (&self.sink, &self.final_stats) {
947-
(Some(sink), _) => sink.stats(),
948-
(None, Some(snapshot)) => snapshot.clone(),
949-
(None, None) => AsrSinkMetricsSnapshot::default(),
950-
};
951-
952-
let out = PyDict::new(py);
953-
for (input_id, stats) in snapshot {
954-
out.set_item(
955-
input_id,
956-
Py::new(py, PyAsrInputStats::from_snapshot(py, stats)?)?,
957-
)?;
958-
}
959-
Ok(out)
960-
}
961-
962-
#[pyo3(signature = (engine=None))]
963-
fn close(
944+
fn finish_close(
964945
&mut self,
965946
py: Python<'_>,
966-
mut engine: Option<PyRefMut<'_, PyAudioEngineBackend>>,
947+
mut engine: Option<&mut PyAudioEngineBackend>,
967948
) -> PyResult<()> {
968949
let Some(mut sink) = self.sink.take() else {
969950
return Ok(());
@@ -986,30 +967,49 @@ impl PyAsrSinkBackend {
986967
.map(|input| (input.input_id, input.consumer))
987968
.collect(),
988969
)
989-
.map_err(|e| PyRuntimeError::new_err(format!("failed to restore asr sink routes: {e}")))?;
970+
.map_err(|e| {
971+
PyRuntimeError::new_err(format!("failed to restore asr sink routes: {e}"))
972+
})?;
990973
}
991974
Ok(())
992975
}
976+
}
993977

994-
fn close_no_restore(&mut self, py: Python<'_>) -> PyResult<()> {
995-
let Some(mut sink) = self.sink.take() else {
996-
return Ok(());
978+
#[pymethods]
979+
impl PyAsrSinkBackend {
980+
fn stats<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
981+
let snapshot = match (&self.sink, &self.final_stats) {
982+
(Some(sink), _) => sink.stats(),
983+
(None, Some(snapshot)) => snapshot.clone(),
984+
(None, None) => AsrSinkMetricsSnapshot::default(),
997985
};
998986

999-
let (stop_result, final_stats) = py.detach(move || {
1000-
let stop_result = sink.stop().map_err(|e| e.to_string());
1001-
let final_stats = sink.stats();
1002-
(stop_result, final_stats)
1003-
});
1004-
self.final_stats = Some(final_stats);
1005-
let _ = stop_result;
1006-
Ok(())
987+
let out = PyDict::new(py);
988+
for (input_id, stats) in snapshot {
989+
out.set_item(
990+
input_id,
991+
Py::new(py, PyAsrInputStats::from_snapshot(py, stats)?)?,
992+
)?;
993+
}
994+
Ok(out)
995+
}
996+
997+
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
998+
self.finish_close(py, None)
999+
}
1000+
1001+
fn close_for_engine(
1002+
&mut self,
1003+
py: Python<'_>,
1004+
mut engine: PyRefMut<'_, PyAudioEngineBackend>,
1005+
) -> PyResult<()> {
1006+
self.finish_close(py, Some(&mut *engine))
10071007
}
10081008
}
10091009

10101010
impl Drop for PyAsrSinkBackend {
10111011
fn drop(&mut self) {
1012-
let _ = Python::try_attach(|py| self.close_no_restore(py));
1012+
let _ = Python::try_attach(|py| self.finish_close(py, None));
10131013
}
10141014
}
10151015

@@ -1020,22 +1020,11 @@ struct PyWavSinkBackend {
10201020
route_ids: Vec<String>,
10211021
}
10221022

1023-
#[pymethods]
10241023
impl PyWavSinkBackend {
1025-
fn stats(&self, py: Python<'_>) -> PyResult<Py<PyWavSinkStats>> {
1026-
let snapshot = match (&self.sink, &self.final_stats) {
1027-
(Some(sink), _) => sink.stats(),
1028-
(None, Some(snapshot)) => snapshot.clone(),
1029-
(None, None) => WavSinkMetricsSnapshot::default(),
1030-
};
1031-
Py::new(py, PyWavSinkStats::from_snapshot(py, snapshot)?)
1032-
}
1033-
1034-
#[pyo3(signature = (engine=None))]
1035-
fn close(
1024+
fn finish_close(
10361025
&mut self,
10371026
py: Python<'_>,
1038-
mut engine: Option<PyRefMut<'_, PyAudioEngineBackend>>,
1027+
mut engine: Option<&mut PyAudioEngineBackend>,
10391028
) -> PyResult<()> {
10401029
let Some(mut sink) = self.sink.take() else {
10411030
return Ok(());
@@ -1054,30 +1043,41 @@ impl PyWavSinkBackend {
10541043
if let Some(engine) = engine.as_mut() {
10551044
engine
10561045
.restore_route_consumers(route_ids.into_iter().zip(consumers).collect())
1057-
.map_err(|e| PyRuntimeError::new_err(format!("failed to restore wav sink routes: {e}")))?;
1046+
.map_err(|e| {
1047+
PyRuntimeError::new_err(format!("failed to restore wav sink routes: {e}"))
1048+
})?;
10581049
}
10591050
Ok(())
10601051
}
1052+
}
10611053

1062-
fn close_no_restore(&mut self, py: Python<'_>) -> PyResult<()> {
1063-
let Some(mut sink) = self.sink.take() else {
1064-
return Ok(());
1054+
#[pymethods]
1055+
impl PyWavSinkBackend {
1056+
fn stats(&self, py: Python<'_>) -> PyResult<Py<PyWavSinkStats>> {
1057+
let snapshot = match (&self.sink, &self.final_stats) {
1058+
(Some(sink), _) => sink.stats(),
1059+
(None, Some(snapshot)) => snapshot.clone(),
1060+
(None, None) => WavSinkMetricsSnapshot::default(),
10651061
};
1062+
Py::new(py, PyWavSinkStats::from_snapshot(py, snapshot)?)
1063+
}
10661064

1067-
let (stop_result, final_stats) = py.detach(move || {
1068-
let stop_result = sink.stop().map_err(|e| e.to_string());
1069-
let final_stats = sink.stats();
1070-
(stop_result, final_stats)
1071-
});
1072-
self.final_stats = Some(final_stats);
1073-
let _ = stop_result;
1074-
Ok(())
1065+
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
1066+
self.finish_close(py, None)
1067+
}
1068+
1069+
fn close_for_engine(
1070+
&mut self,
1071+
py: Python<'_>,
1072+
mut engine: PyRefMut<'_, PyAudioEngineBackend>,
1073+
) -> PyResult<()> {
1074+
self.finish_close(py, Some(&mut *engine))
10751075
}
10761076
}
10771077

10781078
impl Drop for PyWavSinkBackend {
10791079
fn drop(&mut self) {
1080-
let _ = Python::try_attach(|py| self.close_no_restore(py));
1080+
let _ = Python::try_attach(|py| self.finish_close(py, None));
10811081
}
10821082
}
10831083

ref.md

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# Shutdown Refactor Plan
2+
3+
## Context
4+
- The recent fixes made shutdown reliable again: synthetic hangs are gone, native source shutdown is bounded, and the real-source rerun repro now completes.
5+
- The remaining concern is architectural: stop/close semantics currently work, but they are spread across Python high-level API, Python FFI, and Rust sink/source implementations in a way that feels patchy.
6+
- In particular, the desired product semantics are now clearer than the code structure: **shutdown should be bounded best-effort cleanup, not guaranteed full pipeline delivery or full drain**.
7+
- The current code still contains compatibility-oriented and cross-layer shutdown logic that obscures that model.
8+
- User preference: preserve only the high-level Python API contract; low-level/internal `_macloop` shutdown compatibility may be simplified or broken if that leads to a cleaner design.
9+
- User preference for high-level close behavior: if a native source does not stop within the bounded timeout, `AudioEngine.close()` should still complete without hanging, but it should surface a warning/log signal rather than silently succeeding or raising by default.
10+
11+
## Approach
12+
Refactor shutdown around a simpler lifecycle contract:
13+
1. **Engine owns source lifecycle** — native sources start/stop on the engine lifecycle, not on sink lifecycle.
14+
2. **Sink close only detaches the sink** — bounded worker stop + route lease/consumer return, but no source orchestration.
15+
3. **Bounded best-effort close is explicit** — native stop timeouts are part of the normal close model, not an exceptional hidden path.
16+
4. **High-level close warns on deferred native cleanup**`AudioEngine.close()` remains non-hanging and non-fatal by default, but emits a warning/log when native cleanup is deferred.
17+
5. **Cross-layer close API becomes explicit** — remove signature probing / fallback behavior and replace it with one clear sink-close contract.
18+
6. **Allow small semantic cleanup** — internal/FFI shutdown contracts may be cleaned up rather than preserved verbatim if that materially simplifies the design, as long as the high-level Python API remains stable.
19+
20+
This should be done incrementally, preserving user-visible reliability while shrinking the shutdown state machine.
21+
4. **Cross-layer close API becomes explicit** — remove signature probing / fallback behavior and replace it with one clear sink-close contract.
22+
5. **Allow small semantic cleanup** — internal/FFI shutdown contracts may be cleaned up rather than preserved verbatim if that materially simplifies the design, as long as the high-level Python API remains stable.
23+
24+
This should be done incrementally, preserving user-visible reliability while shrinking the shutdown state machine.
25+
26+
## Files to modify
27+
- `macloop/__init__.py`
28+
- `python_ffi/src/lib.rs`
29+
- `core_engine/src/outputs/asr_sink.rs`
30+
- `core_engine/src/outputs/wav_file.rs`
31+
- `core_engine/src/engine.rs`
32+
- `tests/test_runtime_helpers.py`
33+
- `tests/test_e2e_synthetic.py`
34+
- `tests/test_medium_e2e_real_capture.py`
35+
- possibly `tests/test_ffi_backend.py`
36+
37+
## Reuse
38+
- Bounded native lifecycle helpers already exist in `python_ffi/src/lib.rs`:
39+
- `start_native_source_with_deadline(...)`
40+
- `stop_native_source_with_deadline(...)`
41+
- `cleanup_pending_cleanups_with_deadline(...)`
42+
- Route consumer handoff/restore already exists and should be reused rather than re-invented:
43+
- `AudioEngineController::take_output_consumer(...)` in `core_engine/src/engine.rs`
44+
- `AudioEngineController::restore_output_consumer(...)` in `core_engine/src/engine.rs`
45+
- `restore_route_consumers(...)` in `python_ffi/src/lib.rs`
46+
- Bounded worker shutdown patterns already exist in Rust sinks and should stay as the basis:
47+
- bounded polling in `core_engine/src/outputs/asr_sink.rs`
48+
- bounded polling in `core_engine/src/outputs/wav_file.rs`
49+
- Existing regression coverage to preserve during refactor:
50+
- `tests/test_e2e_synthetic.py::test_hot_synthetic_engine_close_completes_with_wav_and_asr_sinks`
51+
- `tests/test_e2e_synthetic.py::test_hot_synthetic_restart_on_single_engine_does_not_hang`
52+
- `tests/test_medium_e2e_real_capture.py::test_medium_real_source_asr_rerun_engine_close_remains_stable`
53+
54+
## Steps
55+
- [ ] Step 1: Document and codify the target lifecycle contract in code comments/tests:
56+
- engine close = bounded best-effort source shutdown
57+
- sink close = bounded detach + route return
58+
- close does not guarantee full drain/delivery
59+
- [ ] Step 2: Simplify Python high-level close flow in `macloop/__init__.py` so it no longer performs feature detection via exception shape/signature fallback, and define one clear warning/log path for deferred native cleanup.
60+
- [ ] Step 3: Simplify FFI sink close API in `python_ffi/src/lib.rs` into one explicit contract for route restoration instead of optional engine-aware probing, even if that changes low-level/internal `_macloop` shutdown semantics.
61+
- [ ] Step 4: Keep source lifecycle orchestration centralized in `PyAudioEngineBackend`, and remove any remaining sink/source lifecycle coupling that is only there for historical shutdown bugs.
62+
- [ ] Step 5: Separate explicit sink shutdown semantics from fallback/drop cleanup semantics in Rust sink implementations, so `Drop` remains best-effort cleanup and explicit close remains the ownership-return path.
63+
- [ ] Step 6: Revisit route ownership bookkeeping and reduce duplicate state where possible (Python `_claimed_routes` vs backend consumer availability), without regressing same-engine restart behavior.
64+
- [ ] Step 7: Tighten and align tests around the simplified model, especially:
65+
- backend timeout is non-hanging and best-effort
66+
- high-level `engine.close()` emits the intended warning/log when native cleanup is deferred
67+
- same-engine restart still works
68+
- real-source rerun stays stable
69+
- low-level tests are updated to validate the new internal contract only where still intentionally supported
70+
71+
## Verification
72+
- Rust tests:
73+
- `cargo test -p python_ffi --quiet`
74+
- `cargo test -p core_engine --quiet`
75+
- Python tests:
76+
- `.venv/bin/python -m pytest -q tests`
77+
- Focused regressions:
78+
- `.venv/bin/python -m pytest -q tests/test_e2e_synthetic.py::test_hot_synthetic_restart_on_single_engine_does_not_hang`
79+
- `.venv/bin/python -m pytest -q tests/test_medium_e2e_real_capture.py::test_medium_real_source_asr_rerun_engine_close_remains_stable --run-medium`
80+
- Manual reasoning check after refactor:
81+
- sink close no longer starts/stops sources
82+
- engine close remains bounded even if native source stop times out
83+
- route reuse still works on the same engine after explicit sink close

tests/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ def stats(self):
126126
def close(self) -> None:
127127
self.closed = True
128128

129+
def close_for_engine(self, _engine) -> None:
130+
self.close()
131+
129132

130133
class _FakeWavSinkBackend:
131134
def __init__(self) -> None:
@@ -166,6 +169,9 @@ def stats(self):
166169
def close(self) -> None:
167170
self.closed = True
168171

172+
def close_for_engine(self, _engine) -> None:
173+
self.close()
174+
169175

170176
class _FakeAudioEngineBackend:
171177
def __init__(self) -> None:

0 commit comments

Comments
 (0)