Skip to content

Commit 4886f65

Browse files
synchronize audio pipeline stop with video completion (#1316)
* fix: synchronize audio pipeline stop with video completion * fmt
1 parent c1bc7ed commit 4886f65

4 files changed

Lines changed: 44 additions & 42 deletions

File tree

apps/desktop/src-tauri/src/recording.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -594,9 +594,9 @@ pub async fn start_recording(
594594
}
595595
.await;
596596

597-
let actor_done_fut = match spawn_actor_res.flatten() {
598-
Ok(rx) => rx,
599-
Err(err) => {
597+
let actor_done_fut = match spawn_actor_res {
598+
Ok(Ok(rx)) => rx,
599+
Ok(Err(err)) | Err(err) => {
600600
let _ = RecordingEvent::Failed { error: err.clone() }.emit(&app);
601601

602602
let mut dialog = MessageDialogBuilder::new(

crates/recording/src/output_pipeline/core.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,11 @@ impl<TVideo: VideoSource> OutputPipelineBuilder<HasVideo<TVideo>> {
219219
Ok(OutputPipeline {
220220
path,
221221
first_timestamp_rx: first_rx,
222-
stop_token: Some(stop_token.drop_guard()),
222+
stop_token: Some(stop_token.clone().drop_guard()),
223223
video_info: Some(video_info),
224224
done_fut: done_rx,
225225
pause_flag,
226+
cancel_token: stop_token,
226227
})
227228
}
228229
}
@@ -272,10 +273,11 @@ impl OutputPipelineBuilder<NoVideo> {
272273
Ok(OutputPipeline {
273274
path,
274275
first_timestamp_rx: first_rx,
275-
stop_token: Some(stop_token.drop_guard()),
276+
stop_token: Some(stop_token.clone().drop_guard()),
276277
video_info: None,
277278
done_fut: done_rx,
278279
pause_flag,
280+
cancel_token: stop_token,
279281
})
280282
}
281283
}
@@ -559,6 +561,7 @@ pub struct OutputPipeline {
559561
video_info: Option<VideoInfo>,
560562
done_fut: DoneFut,
561563
pause_flag: Arc<AtomicBool>,
564+
cancel_token: CancellationToken,
562565
}
563566

564567
pub struct FinishedOutputPipeline {
@@ -614,6 +617,14 @@ impl OutputPipeline {
614617
pub fn done_fut(&self) -> DoneFut {
615618
self.done_fut.clone()
616619
}
620+
621+
pub fn cancel_token(&self) -> CancellationToken {
622+
self.cancel_token.clone()
623+
}
624+
625+
pub fn cancel(&self) {
626+
self.cancel_token.cancel();
627+
}
617628
}
618629

619630
pub struct ChannelVideoSourceConfig<TVideoFrame> {

crates/recording/src/sources/audio_mixer.rs

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use cap_media_info::AudioInfo;
22
use cap_timestamp::{Timestamp, Timestamps};
33
use futures::channel::{mpsc, oneshot};
4+
#[cfg(not(any(target_os = "macos", windows)))]
5+
use std::time::Instant;
46
use std::{
57
collections::VecDeque,
68
sync::{
79
Arc,
810
atomic::{AtomicBool, Ordering},
911
},
10-
time::{Duration, Instant},
12+
time::Duration,
1113
};
1214
use tracing::{debug, info};
1315

@@ -238,43 +240,10 @@ impl AudioMixer {
238240
fn buffer_sources(&mut self, now: Timestamp) {
239241
for source in &mut self.sources {
240242
let rate = source.info.rate();
241-
let buffer_timeout = source.buffer_timeout;
243+
let _buffer_timeout = source.buffer_timeout;
242244

243-
if let Some(last) = source.buffer_last {
244-
let last_end = last.0 + last.1;
245-
if let Some(elapsed_since_last) = now
246-
.duration_since(self.timestamps)
247-
.checked_sub(last_end.duration_since(self.timestamps))
248-
{
249-
let mut remaining = elapsed_since_last;
250-
251-
while remaining > buffer_timeout {
252-
let chunk_samples = samples_for_timeout(rate, buffer_timeout);
253-
let frame_duration = duration_from_samples(chunk_samples, rate);
254-
255-
let mut frame = ffmpeg::frame::Audio::new(
256-
source.info.sample_format,
257-
chunk_samples,
258-
source.info.channel_layout(),
259-
);
260-
frame.set_rate(source.info.rate() as u32);
261-
262-
for i in 0..frame.planes() {
263-
frame.data_mut(i).fill(0);
264-
}
265-
266-
let timestamp = last_end + (elapsed_since_last - remaining);
267-
source.buffer_last = Some((timestamp, frame_duration));
268-
source.buffer.push_back(AudioFrame::new(frame, timestamp));
269-
270-
if frame_duration.is_zero() {
271-
break;
272-
}
273-
274-
remaining = remaining.saturating_sub(frame_duration);
275-
}
276-
}
277-
}
245+
// Do not inject silence based on wall-clock pacing. We only bridge actual gaps
246+
// when a new frame arrives (below), to keep emission data-driven.
278247

279248
while let Ok(Some(AudioFrame {
280249
inner: frame,

crates/recording/src/studio_recording.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,28 @@ impl Pipeline {
289289
futures.push(system_audio.done_fut());
290290
}
291291

292+
// Ensure non-video pipelines stop promptly when the video pipeline completes
293+
{
294+
let mic_cancel = self.microphone.as_ref().map(|p| p.cancel_token());
295+
let cam_cancel = self.camera.as_ref().map(|p| p.cancel_token());
296+
let sys_cancel = self.system_audio.as_ref().map(|p| p.cancel_token());
297+
298+
let screen_done = self.screen.done_fut();
299+
tokio::spawn(async move {
300+
// When screen (video) finishes, cancel the other pipelines
301+
let _ = screen_done.await;
302+
if let Some(token) = mic_cancel.as_ref() {
303+
token.cancel();
304+
}
305+
if let Some(token) = cam_cancel.as_ref() {
306+
token.cancel();
307+
}
308+
if let Some(token) = sys_cancel.as_ref() {
309+
token.cancel();
310+
}
311+
});
312+
}
313+
292314
tokio::spawn(async move {
293315
while let Some(res) = futures.next().await {
294316
if let Err(err) = res

0 commit comments

Comments
 (0)