Skip to content

Commit 791babd

Browse files
authored
Stop issues (#20)
* Fix engine shutdown ordering and sink stop behavior (#18) (#19) * Fix engine shutdown ordering and sink stop behavior * Use maturin develop in CI test install * Avoid uv editable install in CI sync * Exclude macloop from uv sync in CI * Bypass uv project install in CI commands * Use actual macOS Swift stdlib path in CI * Expose Swift stdlib path to CI build tools * Run maturin directly in wheel build job * Detect Swift stdlib path dynamically in CI * Build separate macOS wheels instead of universal2 * Build x86_64 wheel on Intel macOS runner * Use supported Intel macOS runner label * Publish arm64 macOS wheel and sdist only * Add x86_64 Swift runtime debug job * Restore x86_64 wheel to build matrix * Resolve Swift stdlib from libswiftCore in CI * Resolve Swift runtime from swift target info * Fix workflow YAML for Swift runtime resolver * Make Swift resolver work on macOS bash 3.2 * Move Swift runtime resolution into Python script * Add Swift-aware wheel repair step * Remove temporary Swift debug jobs * Harden macOS wheel CI validation * Fix wheel smoke test import path * Prefer system Swift runtime when available * Harden Swift repair path validation * Remove unused Swift debug script * Enforce real timeout in medium close tests * Test single-engine hot restart shutdown * Fix single-engine sink restart shutdown * Keep ffi sink close backward compatible * Fix stop_twice unit test expectations
1 parent 607aa95 commit 791babd

6 files changed

Lines changed: 238 additions & 30 deletions

File tree

core_engine/src/outputs/asr_sink.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,20 @@ impl InputState {
178178
}
179179

180180
fn poll(&mut self, callback: &mut dyn AsrSinkCallback) -> Result<bool, AsrSinkError> {
181-
self.poll_with_limit(callback, None)
182-
}
183-
184-
fn poll_stop(&mut self, callback: &mut dyn AsrSinkCallback) -> Result<bool, AsrSinkError> {
185181
let channels = MASTER_FORMAT.channels.max(1) as usize;
186182
let bounded_samples = self.consumer.occupied_len() / channels * channels;
187183
self.poll_with_limit(callback, Some(bounded_samples))
188184
}
189185

186+
fn stop_now(&mut self) {
187+
self.drained_master.clear();
188+
self.converted_output.clear();
189+
self.pending_output.clear();
190+
self.pending_offset = 0;
191+
self.quantized_output.clear();
192+
self.metrics.pending_frames.store(0, Ordering::Relaxed);
193+
}
194+
190195
fn poll_with_limit(
191196
&mut self,
192197
callback: &mut dyn AsrSinkCallback,
@@ -323,7 +328,7 @@ impl AsrSinkMetrics {
323328

324329
pub struct AsrSink {
325330
stop: Arc<AtomicBool>,
326-
handle: Option<JoinHandle<Result<(), AsrSinkError>>>,
331+
handle: Option<JoinHandle<Result<Vec<AsrSinkInput>, AsrSinkError>>>,
327332
metrics: Arc<AsrSinkMetrics>,
328333
}
329334

@@ -387,13 +392,13 @@ impl AsrSink {
387392
let stop = Arc::new(AtomicBool::new(false));
388393
let stop_thread = stop.clone();
389394

390-
let handle = thread::spawn(move || -> Result<(), AsrSinkError> {
395+
let handle = thread::spawn(move || -> Result<Vec<AsrSinkInput>, AsrSinkError> {
391396
let idle_sleep = Duration::from_micros(200);
392397

393398
loop {
394399
if stop_thread.load(Ordering::Relaxed) {
395400
for state in &mut states {
396-
let _ = state.poll_stop(&mut *callback)?;
401+
state.stop_now();
397402
}
398403
break;
399404
}
@@ -408,7 +413,13 @@ impl AsrSink {
408413
}
409414
}
410415

411-
Ok(())
416+
Ok(states
417+
.into_iter()
418+
.map(|state| AsrSinkInput {
419+
input_id: state.input_id,
420+
consumer: state.consumer,
421+
})
422+
.collect())
412423
});
413424

414425
Ok(Self {
@@ -430,7 +441,7 @@ impl AsrSink {
430441
self.metrics.snapshot()
431442
}
432443

433-
pub fn stop(&mut self) -> Result<(), AsrSinkError> {
444+
pub fn stop(&mut self) -> Result<Vec<AsrSinkInput>, AsrSinkError> {
434445
self.stop.store(true, Ordering::Relaxed);
435446
let Some(handle) = self.handle.take() else {
436447
return Err(AsrSinkError::AlreadyStopped);
@@ -672,8 +683,7 @@ mod tests {
672683
.expect("spawn sink");
673684

674685
sink.stop().expect("first stop");
675-
let err = sink.stop().expect_err("second stop should fail");
676-
assert!(matches!(err, AsrSinkError::AlreadyStopped));
686+
assert!(matches!(sink.stop(), Err(AsrSinkError::AlreadyStopped)));
677687
}
678688

679689
#[test]

core_engine/src/outputs/wav_file.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub struct WavSinkMetricsSnapshot {
9191

9292
pub struct WavFileOutput {
9393
stop: Arc<AtomicBool>,
94-
handle: Option<JoinHandle<Result<(), WavOutputError>>>,
94+
handle: Option<JoinHandle<Result<Vec<RouteConsumer>, WavOutputError>>>,
9595
metrics: Arc<WavSinkMetrics>,
9696
}
9797

@@ -123,7 +123,7 @@ impl WavFileOutput {
123123
let channels = format.channels.max(1) as u64;
124124
let frame_channels = format.channels.max(1) as usize;
125125

126-
let handle = thread::spawn(move || -> Result<(), WavOutputError> {
126+
let handle = thread::spawn(move || -> Result<Vec<RouteConsumer>, WavOutputError> {
127127
let mut writer = hound::WavWriter::new(writer, spec)?;
128128
let idle_sleep = Duration::from_micros(200);
129129
let mut consumers = consumers;
@@ -207,7 +207,7 @@ impl WavFileOutput {
207207
metrics_thread
208208
.finalize
209209
.record(duration_to_u32_us(finalize_start.elapsed()));
210-
Ok(())
210+
Ok(consumers)
211211
});
212212

213213
Ok(Self {
@@ -305,7 +305,7 @@ impl WavFileOutput {
305305
self.metrics.snapshot()
306306
}
307307

308-
pub fn stop(&mut self) -> Result<(), WavOutputError> {
308+
pub fn stop(&mut self) -> Result<Vec<RouteConsumer>, WavOutputError> {
309309
self.stop.store(true, Ordering::Relaxed);
310310
let Some(handle) = self.handle.take() else {
311311
return Err(WavOutputError::AlreadyStopped);
@@ -539,8 +539,7 @@ mod tests {
539539
.expect("spawn wav");
540540

541541
wav.stop().expect("first stop");
542-
let err = wav.stop().expect_err("second stop should fail");
543-
assert!(matches!(err, WavOutputError::AlreadyStopped));
542+
assert!(matches!(wav.stop(), Err(WavOutputError::AlreadyStopped)));
544543
}
545544

546545
#[test]

macloop/__init__.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,25 @@ def _raise_on_unexpected_kwargs(name: str, kwargs: dict[str, Any]) -> None:
6868
raise TypeError(f"{name} got unexpected keyword arguments: {unexpected}")
6969

7070

71+
def _close_backend_with_optional_engine(backend: Any, engine_backend: Any | None) -> None:
72+
if engine_backend is not None:
73+
try:
74+
backend.close(engine_backend)
75+
return
76+
except TypeError as exc:
77+
try:
78+
backend.close()
79+
return
80+
except TypeError:
81+
raise exc
82+
83+
close_no_restore = getattr(backend, "close_no_restore", None)
84+
if close_no_restore is not None:
85+
close_no_restore()
86+
else:
87+
backend.close()
88+
89+
7190
@dataclass(frozen=True, slots=True)
7291
class AudioChunk:
7392
route_id: str
@@ -506,13 +525,16 @@ def close(self) -> None:
506525
return
507526

508527
err: Optional[Exception] = None
528+
engine = self._engine_ref()
509529
try:
510-
self._backend.close()
530+
_close_backend_with_optional_engine(
531+
self._backend,
532+
engine._backend if engine is not None else None,
533+
)
511534
except Exception as exc:
512535
err = exc
513536
finally:
514537
self._closed = True
515-
engine = self._engine_ref()
516538
if engine is not None:
517539
engine._release_routes(self._route_ids)
518540
_drop_oldest_put(self._queue, _STOP)
@@ -634,13 +656,16 @@ def close(self) -> None:
634656
return
635657

636658
err: Optional[Exception] = None
659+
engine = self._engine_ref()
637660
try:
638-
self._backend.close()
661+
_close_backend_with_optional_engine(
662+
self._backend,
663+
engine._backend if engine is not None else None,
664+
)
639665
except Exception as exc:
640666
err = exc
641667
finally:
642668
self._closed = True
643-
engine = self._engine_ref()
644669
if engine is not None:
645670
engine._release_routes(self._route_ids)
646671

python_ffi/src/lib.rs

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,12 @@ impl PyAsrSinkBackend {
904904
Ok(out)
905905
}
906906

907-
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
907+
#[pyo3(signature = (engine=None))]
908+
fn close(
909+
&mut self,
910+
py: Python<'_>,
911+
mut engine: Option<PyRefMut<'_, PyAudioEngineBackend>>,
912+
) -> PyResult<()> {
908913
let Some(mut sink) = self.sink.take() else {
909914
return Ok(());
910915
};
@@ -915,22 +920,49 @@ impl PyAsrSinkBackend {
915920
(stop_result, final_stats)
916921
});
917922
self.final_stats = Some(final_stats);
918-
stop_result
923+
let route_consumers = stop_result
919924
.map_err(|e| PyRuntimeError::new_err(format!("failed to stop asr sink: {e}")))?;
925+
926+
if let Some(engine) = engine.as_mut() {
927+
engine
928+
.restore_route_consumers(
929+
route_consumers
930+
.into_iter()
931+
.map(|input| (input.input_id, input.consumer))
932+
.collect(),
933+
)
934+
.map_err(|e| PyRuntimeError::new_err(format!("failed to restore asr sink routes: {e}")))?;
935+
}
936+
Ok(())
937+
}
938+
939+
fn close_no_restore(&mut self, py: Python<'_>) -> PyResult<()> {
940+
let Some(mut sink) = self.sink.take() else {
941+
return Ok(());
942+
};
943+
944+
let (stop_result, final_stats) = py.detach(move || {
945+
let stop_result = sink.stop().map_err(|e| e.to_string());
946+
let final_stats = sink.stats();
947+
(stop_result, final_stats)
948+
});
949+
self.final_stats = Some(final_stats);
950+
let _ = stop_result;
920951
Ok(())
921952
}
922953
}
923954

924955
impl Drop for PyAsrSinkBackend {
925956
fn drop(&mut self) {
926-
let _ = Python::try_attach(|py| self.close(py));
957+
let _ = Python::try_attach(|py| self.close_no_restore(py));
927958
}
928959
}
929960

930961
#[pyclass(name = "_WavSinkBackend", module = "macloop._macloop", unsendable)]
931962
struct PyWavSinkBackend {
932963
sink: Option<WavFileOutput>,
933964
final_stats: Option<WavSinkMetricsSnapshot>,
965+
route_ids: Vec<String>,
934966
}
935967

936968
#[pymethods]
@@ -944,26 +976,53 @@ impl PyWavSinkBackend {
944976
Py::new(py, PyWavSinkStats::from_snapshot(py, snapshot)?)
945977
}
946978

947-
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
979+
#[pyo3(signature = (engine=None))]
980+
fn close(
981+
&mut self,
982+
py: Python<'_>,
983+
mut engine: Option<PyRefMut<'_, PyAudioEngineBackend>>,
984+
) -> PyResult<()> {
948985
let Some(mut sink) = self.sink.take() else {
949986
return Ok(());
950987
};
988+
let route_ids = self.route_ids.clone();
951989

952990
let (stop_result, final_stats) = py.detach(move || {
953991
let stop_result = sink.stop().map_err(|e| e.to_string());
954992
let final_stats = sink.stats();
955993
(stop_result, final_stats)
956994
});
957995
self.final_stats = Some(final_stats);
958-
stop_result
996+
let consumers = stop_result
959997
.map_err(|e| PyRuntimeError::new_err(format!("failed to stop wav sink: {e}")))?;
998+
999+
if let Some(engine) = engine.as_mut() {
1000+
engine
1001+
.restore_route_consumers(route_ids.into_iter().zip(consumers).collect())
1002+
.map_err(|e| PyRuntimeError::new_err(format!("failed to restore wav sink routes: {e}")))?;
1003+
}
1004+
Ok(())
1005+
}
1006+
1007+
fn close_no_restore(&mut self, py: Python<'_>) -> PyResult<()> {
1008+
let Some(mut sink) = self.sink.take() else {
1009+
return Ok(());
1010+
};
1011+
1012+
let (stop_result, final_stats) = py.detach(move || {
1013+
let stop_result = sink.stop().map_err(|e| e.to_string());
1014+
let final_stats = sink.stats();
1015+
(stop_result, final_stats)
1016+
});
1017+
self.final_stats = Some(final_stats);
1018+
let _ = stop_result;
9601019
Ok(())
9611020
}
9621021
}
9631022

9641023
impl Drop for PyWavSinkBackend {
9651024
fn drop(&mut self) {
966-
let _ = Python::try_attach(|py| self.close(py));
1025+
let _ = Python::try_attach(|py| self.close_no_restore(py));
9671026
}
9681027
}
9691028

@@ -1539,6 +1598,7 @@ impl PyAudioEngineBackend {
15391598
self.restore_stream_states(started_states);
15401599

15411600
let route_consumers = self.take_route_consumers(&route_ids)?;
1601+
let route_ids_for_sink = route_ids.clone();
15421602
let master_format = self.controller.master_format();
15431603
let detached_result: DetachedWavStartResult = py.detach(move || {
15441604
let consumers = route_consumers
@@ -1559,6 +1619,7 @@ impl PyAudioEngineBackend {
15591619
Ok(sink) => Ok(PyWavSinkBackend {
15601620
sink: Some(sink),
15611621
final_stats: None,
1622+
route_ids: route_ids_for_sink,
15621623
}),
15631624
Err((err, route_consumers)) => {
15641625
if let Err(restore_err) = self.restore_route_consumers(route_consumers) {

0 commit comments

Comments
 (0)