Skip to content

Commit 571e742

Browse files
authored
Stop issues (#21)
* Fix engine shutdown ordering and sink stop behavior (#18) (#19) * Fix engine shutdown ordering and sink stop behavior * Use maturin develop in CI test install * Avoid uv editable install in CI sync * Exclude macloop from uv sync in CI * Bypass uv project install in CI commands * Use actual macOS Swift stdlib path in CI * Expose Swift stdlib path to CI build tools * Run maturin directly in wheel build job * Detect Swift stdlib path dynamically in CI * Build separate macOS wheels instead of universal2 * Build x86_64 wheel on Intel macOS runner * Use supported Intel macOS runner label * Publish arm64 macOS wheel and sdist only * Add x86_64 Swift runtime debug job * Restore x86_64 wheel to build matrix * Resolve Swift stdlib from libswiftCore in CI * Resolve Swift runtime from swift target info * Fix workflow YAML for Swift runtime resolver * Make Swift resolver work on macOS bash 3.2 * Move Swift runtime resolution into Python script * Add Swift-aware wheel repair step * Remove temporary Swift debug jobs * Harden macOS wheel CI validation * Fix wheel smoke test import path * Prefer system Swift runtime when available * Harden Swift repair path validation * Remove unused Swift debug script * Enforce real timeout in medium close tests * Test single-engine hot restart shutdown * Fix single-engine sink restart shutdown * Keep ffi sink close backward compatible * Fix stop_twice unit test expectations * Bound wav sink polling on hot reruns * Bound native source shutdown on engine close * Add medium real-source rerun regression * Make Codecov status checks informational
1 parent 791babd commit 571e742

6 files changed

Lines changed: 230 additions & 33 deletions

File tree

codecov.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
coverage:
2+
status:
3+
project:
4+
default:
5+
informational: true
6+
patch:
7+
default:
8+
informational: true

core_engine/src/outputs/wav_file.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,7 @@ impl WavFileOutput {
134134
let stopping = stop_thread.load(Ordering::Relaxed);
135135
let mut drained_any = false;
136136
for (consumer, buffer) in consumers.iter_mut().zip(input_buffers.iter_mut()) {
137-
let drain_limit = if stopping {
138-
consumer.occupied_len() / frame_channels * frame_channels
139-
} else {
140-
usize::MAX
141-
};
137+
let drain_limit = consumer.occupied_len() / frame_channels * frame_channels;
142138
let mut drained = 0_usize;
143139
while drained < drain_limit {
144140
let Some(sample) = consumer.try_pop() else {

macloop/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,8 @@ def close(self) -> None:
375375
sink_err: Optional[Exception] = None
376376
try:
377377
self._backend.close()
378+
except TimeoutError:
379+
backend_err = None
378380
except Exception as exc:
379381
backend_err = exc
380382
finally:

python_ffi/src/lib.rs

Lines changed: 96 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use core_engine::{
99
WavSinkMetricsSnapshot,
1010
};
1111
use numpy::ToPyArray;
12-
use pyo3::exceptions::{PyOSError, PyRuntimeError, PyValueError};
12+
use pyo3::exceptions::{PyOSError, PyRuntimeError, PyTimeoutError, PyValueError};
1313
use pyo3::prelude::*;
1414
use pyo3::types::{PyAny, PyDict, PyList, PyModule};
1515
use stats::{
@@ -30,7 +30,8 @@ const DEFAULT_GARBAGE_CAPACITY: usize = 32;
3030
const DEFAULT_ROUTE_CAPACITY: usize = 4096;
3131
const DEFAULT_MAX_PROCESSORS: usize = 32;
3232
const DEFAULT_MAX_OUTPUTS: usize = 16;
33-
const SCREEN_CAPTURE_START_TIMEOUT: Duration = Duration::from_secs(10);
33+
const NATIVE_SOURCE_START_TIMEOUT: Duration = Duration::from_secs(10);
34+
const NATIVE_SOURCE_STOP_TIMEOUT: Duration = Duration::from_secs(2);
3435

3536
#[derive(Default)]
3637
struct GainProcessorNode {
@@ -95,6 +96,7 @@ impl<T> PendingCleanup<T> {
9596

9697
enum PendingRuntimeCleanup {
9798
AppAudio(PendingCleanup<AppAudioSource>),
99+
Microphone(PendingCleanup<MicrophoneSource>),
98100
SystemAudio(PendingCleanup<SystemAudioSource>),
99101
}
100102

@@ -194,16 +196,24 @@ enum StopStreamRuntimeError {
194196
},
195197
}
196198

199+
fn runtime_stop_priority(runtime: &StreamRuntime) -> u8 {
200+
match runtime {
201+
StreamRuntime::Microphone(_) => 0,
202+
StreamRuntime::AppAudio(_) | StreamRuntime::SystemAudio(_) => 1,
203+
StreamRuntime::Synthetic(_) => 2,
204+
}
205+
}
206+
197207
fn remaining_deadline(deadline: Instant) -> Option<Duration> {
198208
deadline
199209
.checked_duration_since(Instant::now())
200210
.filter(|remaining| !remaining.is_zero())
201211
}
202212

203-
fn startup_timeout_message(action: &str, label: &str) -> String {
213+
fn lifecycle_timeout_message(action: &str, label: &str, timeout: Duration) -> String {
204214
format!(
205-
"timed out {action} {label} before the {}s startup deadline elapsed",
206-
SCREEN_CAPTURE_START_TIMEOUT.as_secs()
215+
"timed out {action} {label} before the {}s lifecycle deadline elapsed",
216+
timeout.as_secs()
207217
)
208218
}
209219

@@ -247,7 +257,7 @@ where
247257
return Err(TimedSourceStartError::Fatal {
248258
source: Some(source),
249259
cleanup: None,
250-
message: startup_timeout_message("starting", label),
260+
message: lifecycle_timeout_message("starting", label, NATIVE_SOURCE_START_TIMEOUT),
251261
});
252262
};
253263

@@ -281,7 +291,7 @@ where
281291
completion,
282292
ready,
283293
}),
284-
message: startup_timeout_message("starting", label),
294+
message: lifecycle_timeout_message("starting", label, NATIVE_SOURCE_START_TIMEOUT),
285295
}),
286296
Err(RecvTimeoutError::Disconnected) => match take_deferred_source_result(&completion) {
287297
Some(Ok(source)) => Ok(source),
@@ -361,6 +371,13 @@ impl PendingRuntimeCleanup {
361371
AppAudioSource::stop,
362372
PendingRuntimeCleanup::AppAudio,
363373
),
374+
Self::Microphone(handle) => progress_pending_cleanup(
375+
handle,
376+
"microphone source",
377+
deadline,
378+
MicrophoneSource::stop,
379+
PendingRuntimeCleanup::Microphone,
380+
),
364381
Self::SystemAudio(handle) => progress_pending_cleanup(
365382
handle,
366383
"system audio source",
@@ -377,10 +394,13 @@ fn pending_cleanup_from_ready_runtime(runtime: StreamRuntime) -> Option<PendingR
377394
StreamRuntime::AppAudio(source) => Some(PendingRuntimeCleanup::AppAudio(
378395
PendingCleanup::ready_to_stop(Ok(source)),
379396
)),
397+
StreamRuntime::Microphone(source) => Some(PendingRuntimeCleanup::Microphone(
398+
PendingCleanup::ready_to_stop(Ok(source)),
399+
)),
380400
StreamRuntime::SystemAudio(source) => Some(PendingRuntimeCleanup::SystemAudio(
381401
PendingCleanup::ready_to_stop(Ok(source)),
382402
)),
383-
StreamRuntime::Microphone(_) | StreamRuntime::Synthetic(_) => None,
403+
StreamRuntime::Synthetic(_) => None,
384404
}
385405
}
386406

@@ -404,8 +424,8 @@ fn cleanup_pending_cleanups_with_deadline(
404424
PendingRuntimeCleanupProgress::Pending(cleanup) => {
405425
if first_error.is_none() {
406426
first_error = Some(format!(
407-
"timed out waiting for cleanup of stream '{stream_id}' before the {}s startup deadline elapsed",
408-
SCREEN_CAPTURE_START_TIMEOUT.as_secs()
427+
"timed out waiting for cleanup of stream '{stream_id}' before the {}s lifecycle deadline elapsed",
428+
NATIVE_SOURCE_STOP_TIMEOUT.as_secs()
409429
));
410430
}
411431
remaining.insert(stream_id, cleanup);
@@ -489,17 +509,36 @@ fn start_stream_state_with_deadline(
489509
message,
490510
}),
491511
},
492-
StreamRuntime::Microphone(mut source) => match source.start() {
493-
Ok(()) => Ok(StreamRuntimeState {
512+
StreamRuntime::Microphone(source) => match start_native_source_with_deadline(
513+
source,
514+
"microphone source",
515+
deadline,
516+
MicrophoneSource::start,
517+
) {
518+
Ok(source) => Ok(StreamRuntimeState {
494519
runtime: StreamRuntime::Microphone(source),
495520
started: true,
496521
}),
497-
Err(message) => Err(StartStreamStateError::Recoverable {
498-
state: StreamRuntimeState {
522+
Err(TimedSourceStartError::Recoverable { source, message }) => {
523+
Err(StartStreamStateError::Recoverable {
524+
state: StreamRuntimeState {
525+
runtime: StreamRuntime::Microphone(source),
526+
started: false,
527+
},
528+
message,
529+
})
530+
}
531+
Err(TimedSourceStartError::Fatal {
532+
source,
533+
cleanup,
534+
message,
535+
}) => Err(StartStreamStateError::Fatal {
536+
state: source.map(|source| StreamRuntimeState {
499537
runtime: StreamRuntime::Microphone(source),
500538
started: false,
501-
},
502-
message: message.to_string(),
539+
}),
540+
cleanup: cleanup.map(PendingRuntimeCleanup::Microphone),
541+
message,
503542
}),
504543
},
505544
StreamRuntime::Synthetic(mut source) => match source.start() {
@@ -533,7 +572,7 @@ where
533572
return Err(TimedSourceStopError::Fatal {
534573
source: Some(source),
535574
cleanup: None,
536-
message: startup_timeout_message("stopping", label),
575+
message: lifecycle_timeout_message("stopping", label, NATIVE_SOURCE_STOP_TIMEOUT),
537576
});
538577
};
539578

@@ -568,7 +607,7 @@ where
568607
completion,
569608
ready,
570609
}),
571-
message: startup_timeout_message("stopping", label),
610+
message: lifecycle_timeout_message("stopping", label, NATIVE_SOURCE_STOP_TIMEOUT),
572611
}),
573612
Err(RecvTimeoutError::Disconnected) => match take_deferred_source_result(&completion) {
574613
Some(Ok(source)) => Ok(source),
@@ -635,11 +674,27 @@ fn stop_stream_runtime_with_deadline(
635674
message,
636675
}),
637676
},
638-
StreamRuntime::Microphone(mut source) => match source.stop() {
639-
Ok(()) => Ok(StreamRuntime::Microphone(source)),
640-
Err(message) => Err(StopStreamRuntimeError::Recoverable {
641-
runtime: StreamRuntime::Microphone(source),
642-
message: message.to_string(),
677+
StreamRuntime::Microphone(source) => match stop_native_source_with_deadline(
678+
source,
679+
"microphone source",
680+
deadline,
681+
MicrophoneSource::stop,
682+
) {
683+
Ok(source) => Ok(StreamRuntime::Microphone(source)),
684+
Err(TimedSourceStopError::Recoverable { source, message }) => {
685+
Err(StopStreamRuntimeError::Recoverable {
686+
runtime: StreamRuntime::Microphone(source),
687+
message,
688+
})
689+
}
690+
Err(TimedSourceStopError::Fatal {
691+
source,
692+
cleanup,
693+
message,
694+
}) => Err(StopStreamRuntimeError::Fatal {
695+
runtime: source.map(StreamRuntime::Microphone),
696+
cleanup: cleanup.map(PendingRuntimeCleanup::Microphone),
697+
message,
643698
}),
644699
},
645700
StreamRuntime::Synthetic(mut source) => match source.stop() {
@@ -1298,15 +1353,18 @@ impl PyAudioEngineBackend {
12981353
let was_poisoned = self.poisoned.is_some();
12991354
let (stop_result, remaining_cleanups) = py.detach(move || {
13001355
let mut first_error: Option<String> = None;
1301-
let deadline = Instant::now() + SCREEN_CAPTURE_START_TIMEOUT;
1356+
let deadline = Instant::now() + NATIVE_SOURCE_STOP_TIMEOUT;
13021357

13031358
let (mut remaining_cleanups, cleanup_error) =
13041359
cleanup_pending_cleanups_with_deadline(pending_cleanups, deadline);
13051360
if let Some(err) = cleanup_error {
13061361
first_error = Some(err);
13071362
}
13081363

1309-
for (stream_id, source) in sources {
1364+
let mut source_entries = sources.into_iter().collect::<Vec<_>>();
1365+
source_entries.sort_by_key(|(_, source)| runtime_stop_priority(&source.runtime));
1366+
1367+
for (stream_id, source) in source_entries {
13101368
if !source.started {
13111369
continue;
13121370
}
@@ -1359,12 +1417,22 @@ impl PyAudioEngineBackend {
13591417
if was_poisoned {
13601418
Ok(())
13611419
} else {
1362-
stop_result.map_err(PyRuntimeError::new_err)
1420+
stop_result.map_err(|message| {
1421+
if Self::lifecycle_timeout_error(&message) {
1422+
PyTimeoutError::new_err(message)
1423+
} else {
1424+
PyRuntimeError::new_err(message)
1425+
}
1426+
})
13631427
}
13641428
}
13651429
}
13661430

13671431
impl PyAudioEngineBackend {
1432+
fn lifecycle_timeout_error(message: &str) -> bool {
1433+
message.contains("timed out") && message.contains("lifecycle deadline")
1434+
}
1435+
13681436
fn rollback_failed_stream_creation(&mut self, stream_id: &str, err: PyErr) -> PyErr {
13691437
match self.controller.remove_stream(&stream_id.to_string()) {
13701438
Ok(()) => err,
@@ -1503,7 +1571,7 @@ impl PyAudioEngineBackend {
15031571
let stream_ids = self.stream_ids_for_routes(&route_ids)?;
15041572
let stream_states = self.take_stream_states(&stream_ids)?;
15051573
let started_states = match py.detach(move || {
1506-
start_stream_states_with_timeout(stream_states, SCREEN_CAPTURE_START_TIMEOUT)
1574+
start_stream_states_with_timeout(stream_states, NATIVE_SOURCE_START_TIMEOUT)
15071575
}) {
15081576
Ok(states) => states,
15091577
Err(StartStreamsError {
@@ -1580,7 +1648,7 @@ impl PyAudioEngineBackend {
15801648
let stream_ids = self.stream_ids_for_routes(&route_ids)?;
15811649
let stream_states = self.take_stream_states(&stream_ids)?;
15821650
let started_states = match py.detach(move || {
1583-
start_stream_states_with_timeout(stream_states, SCREEN_CAPTURE_START_TIMEOUT)
1651+
start_stream_states_with_timeout(stream_states, NATIVE_SOURCE_START_TIMEOUT)
15841652
}) {
15851653
Ok(states) => states,
15861654
Err(StartStreamsError {

0 commit comments

Comments
 (0)