Skip to content

Commit 7fe05ce

Browse files
committed
Fix shutdown hangs in stop path
1 parent 571e742 commit 7fe05ce

5 files changed

Lines changed: 161 additions & 25 deletions

File tree

core_engine/src/outputs/asr_sink.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ impl AsrSink {
396396
let idle_sleep = Duration::from_micros(200);
397397

398398
loop {
399-
if stop_thread.load(Ordering::Relaxed) {
399+
if stop_thread.load(Ordering::Acquire) {
400400
for state in &mut states {
401401
state.stop_now();
402402
}
@@ -442,7 +442,7 @@ impl AsrSink {
442442
}
443443

444444
pub fn stop(&mut self) -> Result<Vec<AsrSinkInput>, AsrSinkError> {
445-
self.stop.store(true, Ordering::Relaxed);
445+
self.stop.store(true, Ordering::Release);
446446
let Some(handle) = self.handle.take() else {
447447
return Err(AsrSinkError::AlreadyStopped);
448448
};

core_engine/src/outputs/wav_file.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl WavFileOutput {
131131
let mut mixed_buffer = Vec::<f32>::new();
132132

133133
loop {
134-
let stopping = stop_thread.load(Ordering::Relaxed);
134+
let stopping = stop_thread.load(Ordering::Acquire);
135135
let mut drained_any = false;
136136
for (consumer, buffer) in consumers.iter_mut().zip(input_buffers.iter_mut()) {
137137
let drain_limit = consumer.occupied_len() / frame_channels * frame_channels;
@@ -302,7 +302,7 @@ impl WavFileOutput {
302302
}
303303

304304
pub fn stop(&mut self) -> Result<Vec<RouteConsumer>, WavOutputError> {
305-
self.stop.store(true, Ordering::Relaxed);
305+
self.stop.store(true, Ordering::Release);
306306
let Some(handle) = self.handle.take() else {
307307
return Err(WavOutputError::AlreadyStopped);
308308
};

macloop/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import queue
66
import uuid
7+
import warnings
78
import weakref
89
from dataclasses import dataclass
910
from pathlib import Path
@@ -68,6 +69,14 @@ def _raise_on_unexpected_kwargs(name: str, kwargs: dict[str, Any]) -> None:
6869
raise TypeError(f"{name} got unexpected keyword arguments: {unexpected}")
6970

7071

72+
def _warn_deferred_native_cleanup(exc: TimeoutError) -> None:
73+
warnings.warn(
74+
f"AudioEngine.close() deferred native cleanup: {exc}",
75+
RuntimeWarning,
76+
stacklevel=3,
77+
)
78+
79+
7180
def _close_backend_with_optional_engine(backend: Any, engine_backend: Any | None) -> None:
7281
if engine_backend is not None:
7382
try:
@@ -375,7 +384,8 @@ def close(self) -> None:
375384
sink_err: Optional[Exception] = None
376385
try:
377386
self._backend.close()
378-
except TimeoutError:
387+
except TimeoutError as exc:
388+
_warn_deferred_native_cleanup(exc)
379389
backend_err = None
380390
except Exception as exc:
381391
backend_err = exc

python_ffi/src/lib.rs

Lines changed: 143 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ use std::collections::HashMap;
2020
use std::fs::File;
2121
use std::os::fd::FromRawFd;
2222
use std::os::raw::c_int;
23-
use std::sync::mpsc::{self, RecvTimeoutError};
23+
use std::sync::atomic::{AtomicU64, Ordering};
24+
use std::sync::mpsc::{self, RecvTimeoutError, TrySendError};
2425
use std::sync::{Arc, Mutex};
25-
use std::thread;
26+
use std::thread::{self, JoinHandle};
2627
use std::time::{Duration, Instant};
2728

2829
const DEFAULT_COMMAND_CAPACITY: usize = 32;
@@ -912,25 +913,134 @@ type DetachedAsrStartResult = Result<AsrSink, (String, Vec<(String, RouteConsume
912913

913914
type DetachedWavStartResult = Result<WavFileOutput, (String, Vec<(String, RouteConsumer)>)>;
914915

916+
const ASR_WORKER_QUEUE_CAPACITY: usize = 32;
917+
const ASR_WORKER_JOIN_TIMEOUT: Duration = Duration::from_millis(500);
918+
const ASR_WORKER_JOIN_POLL: Duration = Duration::from_millis(5);
919+
920+
enum AsrWorkerPayload {
921+
F32 {
922+
input_id: String,
923+
frames: usize,
924+
samples: Vec<f32>,
925+
},
926+
I16 {
927+
input_id: String,
928+
frames: usize,
929+
samples: Vec<i16>,
930+
},
931+
}
932+
915933
struct PythonAsrCallback {
916-
callback: Py<PyAny>,
934+
tx: Option<mpsc::SyncSender<AsrWorkerPayload>>,
935+
worker: Option<JoinHandle<()>>,
936+
dropped_chunks: Arc<AtomicU64>,
937+
}
938+
939+
impl PythonAsrCallback {
940+
fn spawn(callback: Py<PyAny>) -> Self {
941+
let (tx, rx) = mpsc::sync_channel::<AsrWorkerPayload>(ASR_WORKER_QUEUE_CAPACITY);
942+
let worker = thread::spawn(move || {
943+
// `callback` is owned by this thread. Its final drop happens when this closure
944+
// returns; PyO3 queues the decref and the next GIL holder releases it, so no
945+
// extra Python attach is required here.
946+
while let Ok(payload) = rx.recv() {
947+
let _ = Python::try_attach(|py| {
948+
let (input_id, frames, samples_obj) = match payload {
949+
AsrWorkerPayload::F32 {
950+
input_id,
951+
frames,
952+
samples,
953+
} => (
954+
input_id,
955+
frames,
956+
samples.to_pyarray(py).into_any().unbind(),
957+
),
958+
AsrWorkerPayload::I16 {
959+
input_id,
960+
frames,
961+
samples,
962+
} => (
963+
input_id,
964+
frames,
965+
samples.to_pyarray(py).into_any().unbind(),
966+
),
967+
};
968+
969+
if let Err(err) = callback.call1(py, (input_id, frames, samples_obj)) {
970+
err.print(py);
971+
}
972+
});
973+
}
974+
});
975+
976+
Self {
977+
tx: Some(tx),
978+
worker: Some(worker),
979+
dropped_chunks: Arc::new(AtomicU64::new(0)),
980+
}
981+
}
917982
}
918983

919984
impl AsrSinkCallback for PythonAsrCallback {
920985
fn on_chunk(&mut self, chunk: AsrChunkView<'_>) {
921-
let _ = Python::try_attach(|py| {
922-
let samples = match chunk.samples {
923-
AsrSampleSlice::F32(values) => values.to_pyarray(py).into_any().unbind(),
924-
AsrSampleSlice::I16(values) => values.to_pyarray(py).into_any().unbind(),
925-
};
986+
// Sink thread never blocks on user Python code: we hand chunks off to a worker via
987+
// a bounded queue. If the user callback cannot keep up we drop the newest chunk so
988+
// that shutdown (which drops `tx` to signal the worker) cannot be wedged by a slow
989+
// or hung consumer.
990+
let Some(tx) = self.tx.as_ref() else {
991+
return;
992+
};
926993

927-
if let Err(err) = self
928-
.callback
929-
.call1(py, (chunk.input_id, chunk.frames, samples))
930-
{
931-
err.print(py);
994+
let payload = match chunk.samples {
995+
AsrSampleSlice::F32(values) => AsrWorkerPayload::F32 {
996+
input_id: chunk.input_id.to_string(),
997+
frames: chunk.frames,
998+
samples: values.to_vec(),
999+
},
1000+
AsrSampleSlice::I16(values) => AsrWorkerPayload::I16 {
1001+
input_id: chunk.input_id.to_string(),
1002+
frames: chunk.frames,
1003+
samples: values.to_vec(),
1004+
},
1005+
};
1006+
1007+
match tx.try_send(payload) {
1008+
Ok(()) => {}
1009+
Err(TrySendError::Full(_)) => {
1010+
self.dropped_chunks.fetch_add(1, Ordering::Relaxed);
9321011
}
933-
});
1012+
Err(TrySendError::Disconnected(_)) => {}
1013+
}
1014+
}
1015+
}
1016+
1017+
impl Drop for PythonAsrCallback {
1018+
fn drop(&mut self) {
1019+
// Drop the sender first so the worker's `rx.recv()` returns `Err(RecvError)` and the
1020+
// loop exits naturally once it has handled every chunk still buffered in the queue.
1021+
drop(self.tx.take());
1022+
1023+
let Some(handle) = self.worker.take() else {
1024+
return;
1025+
};
1026+
1027+
let deadline = Instant::now() + ASR_WORKER_JOIN_TIMEOUT;
1028+
while !handle.is_finished() && Instant::now() < deadline {
1029+
thread::sleep(ASR_WORKER_JOIN_POLL);
1030+
}
1031+
1032+
if handle.is_finished() {
1033+
let _ = handle.join();
1034+
} else {
1035+
// The user callback is stuck. We can't unblock it, but we refuse to wedge the
1036+
// sink shutdown: detach the worker (dropping the JoinHandle) and move on. The
1037+
// leaked thread will exit whenever the user code finally returns.
1038+
eprintln!(
1039+
"macloop: ASR callback worker thread did not exit within {:?}; detaching to avoid blocking shutdown",
1040+
ASR_WORKER_JOIN_TIMEOUT
1041+
);
1042+
drop(handle);
1043+
}
9341044
}
9351045
}
9361046

@@ -1009,7 +1119,14 @@ impl PyAsrSinkBackend {
10091119

10101120
impl Drop for PyAsrSinkBackend {
10111121
fn drop(&mut self) {
1012-
let _ = Python::try_attach(|py| self.close_no_restore(py));
1122+
if Python::try_attach(|py| self.close_no_restore(py)).is_none() {
1123+
// The Python runtime is being torn down; the interpreter is unreachable.
1124+
// We cannot stop the sink worker or restore route consumers here, so log and leak
1125+
// rather than silently wedging or corrupting native state.
1126+
eprintln!(
1127+
"macloop: AsrSinkBackend dropped with Python runtime unavailable; sink cleanup skipped"
1128+
);
1129+
}
10131130
}
10141131
}
10151132

@@ -1077,7 +1194,11 @@ impl PyWavSinkBackend {
10771194

10781195
impl Drop for PyWavSinkBackend {
10791196
fn drop(&mut self) {
1080-
let _ = Python::try_attach(|py| self.close_no_restore(py));
1197+
if Python::try_attach(|py| self.close_no_restore(py)).is_none() {
1198+
eprintln!(
1199+
"macloop: WavSinkBackend dropped with Python runtime unavailable; sink cleanup skipped"
1200+
);
1201+
}
10811202
}
10821203
}
10831204

@@ -1604,7 +1725,7 @@ impl PyAudioEngineBackend {
16041725
format,
16051726
chunk_frames,
16061727
},
1607-
Box::new(PythonAsrCallback { callback }),
1728+
Box::new(PythonAsrCallback::spawn(callback)),
16081729
) {
16091730
Ok(sink) => Ok(sink),
16101731
Err((err, inputs)) => Err((
@@ -1703,7 +1824,11 @@ impl PyAudioEngineBackend {
17031824

17041825
impl Drop for PyAudioEngineBackend {
17051826
fn drop(&mut self) {
1706-
let _ = Python::try_attach(|py| self.close(py));
1827+
if Python::try_attach(|py| self.close(py)).is_none() {
1828+
eprintln!(
1829+
"macloop: AudioEngineBackend dropped with Python runtime unavailable; native sources may outlive the process"
1830+
);
1831+
}
17071832
}
17081833
}
17091834

tests/test_runtime_helpers.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def close(self) -> None:
200200
assert sink.closed is True
201201

202202

203-
def test_audio_engine_close_suppresses_backend_timeout_after_sink_cleanup(macloop_module) -> None:
203+
def test_audio_engine_close_warns_on_backend_timeout_after_sink_cleanup(macloop_module) -> None:
204204
class TrackingSink:
205205
def __init__(self) -> None:
206206
self.closed = False
@@ -213,7 +213,8 @@ def close(self) -> None:
213213
engine._register_sink(sink)
214214
engine._backend.close = lambda: (_ for _ in ()).throw(TimeoutError("native close timed out"))
215215

216-
engine.close()
216+
with pytest.warns(RuntimeWarning, match="deferred native cleanup"):
217+
engine.close()
217218

218219
assert sink.closed is True
219220
with pytest.raises(RuntimeError, match="audio engine is closed"):

0 commit comments

Comments
 (0)