Skip to content

Commit d1e708a

Browse files
Refactor playback prefetching to use tokio mpsc and futures
Co-authored-by: richiemcilroy1 <richiemcilroy1@gmail.com>
1 parent 23f8516 commit d1e708a

File tree

2 files changed

+107
-48
lines changed

2 files changed

+107
-48
lines changed

apps/desktop/core

54.8 MB
Binary file not shown.

crates/editor/src/playback.rs

Lines changed: 107 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ use cpal::{
99
BufferSize, SampleFormat, SupportedBufferSize,
1010
traits::{DeviceTrait, HostTrait, StreamTrait},
1111
};
12-
use std::{collections::VecDeque, sync::Arc, time::Duration};
13-
use tokio::{sync::watch, time::Instant};
12+
use futures::stream::{FuturesUnordered, StreamExt};
13+
use std::{collections::{HashSet, VecDeque}, sync::Arc, time::Duration};
14+
use tokio::{sync::{mpsc as tokio_mpsc, watch}, time::Instant};
1415
use tracing::{error, info, trace, warn};
1516

1617
use crate::{
@@ -20,8 +21,8 @@ use crate::{
2021
segments::get_audio_segments,
2122
};
2223

23-
const PREFETCH_BUFFER_SIZE: usize = 12;
24-
const PREFETCH_AHEAD_FRAMES: u32 = 16;
24+
const PREFETCH_BUFFER_SIZE: usize = 16;
25+
const PARALLEL_DECODE_TASKS: usize = 4;
2526

2627
#[derive(Debug)]
2728
pub enum PlaybackStartError {
@@ -79,6 +80,97 @@ impl Playback {
7980
event_rx,
8081
};
8182

83+
let (prefetch_tx, mut prefetch_rx) = tokio_mpsc::channel::<PrefetchedFrame>(PREFETCH_BUFFER_SIZE * 2);
84+
let (frame_request_tx, mut frame_request_rx) = watch::channel(self.start_frame_number);
85+
86+
let prefetch_stop_rx = stop_rx.clone();
87+
let prefetch_project = self.project.clone();
88+
let prefetch_segment_medias = self.segment_medias.clone();
89+
let prefetch_duration = if let Some(timeline) = &self.project.borrow().timeline {
90+
timeline.duration()
91+
} else {
92+
f64::MAX
93+
};
94+
95+
tokio::spawn(async move {
96+
let mut next_prefetch_frame = *frame_request_rx.borrow();
97+
let mut in_flight: FuturesUnordered<_> = FuturesUnordered::new();
98+
let mut in_flight_frames: HashSet<u32> = HashSet::new();
99+
100+
loop {
101+
if *prefetch_stop_rx.borrow() {
102+
break;
103+
}
104+
105+
if let Ok(true) = frame_request_rx.has_changed() {
106+
let requested = *frame_request_rx.borrow_and_update();
107+
if requested > next_prefetch_frame {
108+
next_prefetch_frame = requested;
109+
in_flight_frames.retain(|&f| f >= requested);
110+
}
111+
}
112+
113+
while in_flight.len() < PARALLEL_DECODE_TASKS {
114+
let frame_num = next_prefetch_frame;
115+
let prefetch_time = frame_num as f64 / fps_f64;
116+
117+
if prefetch_time >= prefetch_duration {
118+
break;
119+
}
120+
121+
if in_flight_frames.contains(&frame_num) {
122+
next_prefetch_frame += 1;
123+
continue;
124+
}
125+
126+
let project = prefetch_project.borrow().clone();
127+
128+
if let Some((segment_time, segment)) = project.get_segment_time(prefetch_time) {
129+
if let Some(segment_media) = prefetch_segment_medias.get(segment.recording_clip as usize) {
130+
let clip_offsets = project
131+
.clips
132+
.iter()
133+
.find(|v| v.index == segment.recording_clip)
134+
.map(|v| v.offsets)
135+
.unwrap_or_default();
136+
137+
let decoders = segment_media.decoders.clone();
138+
let hide_camera = project.camera.hide;
139+
let segment_index = segment.recording_clip;
140+
141+
in_flight_frames.insert(frame_num);
142+
143+
in_flight.push(async move {
144+
let result = decoders
145+
.get_frames(segment_time as f32, !hide_camera, clip_offsets)
146+
.await;
147+
(frame_num, segment_index, result)
148+
});
149+
}
150+
}
151+
152+
next_prefetch_frame += 1;
153+
}
154+
155+
tokio::select! {
156+
biased;
157+
158+
Some((frame_num, segment_index, result)) = in_flight.next() => {
159+
in_flight_frames.remove(&frame_num);
160+
if let Some(segment_frames) = result {
161+
let _ = prefetch_tx.send(PrefetchedFrame {
162+
frame_number: frame_num,
163+
segment_frames,
164+
segment_index,
165+
}).await;
166+
}
167+
}
168+
169+
_ = tokio::time::sleep(Duration::from_millis(1)), if in_flight.is_empty() => {}
170+
}
171+
}
172+
});
173+
82174
tokio::spawn(async move {
83175
let start = Instant::now();
84176

@@ -100,41 +192,16 @@ impl Playback {
100192
let frame_duration = Duration::from_secs_f64(1.0 / fps_f64);
101193
let mut frame_number = self.start_frame_number;
102194
let mut prefetch_buffer: VecDeque<PrefetchedFrame> = VecDeque::with_capacity(PREFETCH_BUFFER_SIZE);
103-
let mut prefetch_frame = self.start_frame_number;
104-
let mut last_rendered_frame: Option<u32> = None;
105-
let max_frame_skip = 2u32;
195+
let max_frame_skip = 3u32;
106196

107197
'playback: loop {
108-
while prefetch_buffer.len() < PREFETCH_BUFFER_SIZE && !*stop_rx.borrow() {
109-
let prefetch_time = prefetch_frame as f64 / fps_f64;
110-
if prefetch_time >= duration {
111-
break;
112-
}
113-
114-
let project = self.project.borrow().clone();
115-
if let Some((segment_time, segment)) = project.get_segment_time(prefetch_time) {
116-
if let Some(segment_media) = self.segment_medias.get(segment.recording_clip as usize) {
117-
let clip_offsets = project
118-
.clips
119-
.iter()
120-
.find(|v| v.index == segment.recording_clip)
121-
.map(|v| v.offsets)
122-
.unwrap_or_default();
123-
124-
if let Some(segment_frames) = segment_media
125-
.decoders
126-
.get_frames(segment_time as f32, !project.camera.hide, clip_offsets)
127-
.await
128-
{
129-
prefetch_buffer.push_back(PrefetchedFrame {
130-
frame_number: prefetch_frame,
131-
segment_frames,
132-
segment_index: segment.recording_clip,
133-
});
134-
}
198+
while let Ok(prefetched) = prefetch_rx.try_recv() {
199+
if prefetched.frame_number >= frame_number {
200+
prefetch_buffer.push_back(prefetched);
201+
if prefetch_buffer.len() > PREFETCH_BUFFER_SIZE {
202+
prefetch_buffer.pop_front();
135203
}
136204
}
137-
prefetch_frame = prefetch_frame.saturating_add(1);
138205
}
139206

140207
let frame_offset = frame_number.saturating_sub(self.start_frame_number) as f64;
@@ -156,10 +223,10 @@ impl Playback {
156223

157224
let project = self.project.borrow().clone();
158225

159-
let prefetched = prefetch_buffer.iter().position(|p| p.frame_number == frame_number);
226+
let prefetched_idx = prefetch_buffer.iter().position(|p| p.frame_number == frame_number);
160227

161-
let segment_frames_opt = if let Some(idx) = prefetched {
162-
let prefetched = prefetch_buffer.remove(idx);
228+
let segment_frames_opt = if let Some(idx) = prefetched_idx {
229+
let prefetched = prefetch_buffer.remove(idx).unwrap();
163230
Some((prefetched.segment_frames, prefetched.segment_index))
164231
} else {
165232
let Some((segment_time, segment)) = project.get_segment_time(playback_time) else {
@@ -207,8 +274,6 @@ impl Playback {
207274
self.renderer
208275
.render_frame(segment_frames, uniforms, segment_media.cursor.clone())
209276
.await;
210-
211-
last_rendered_frame = Some(frame_number);
212277
}
213278

214279
event_tx.send(PlaybackEvent::Frame(frame_number)).ok();
@@ -228,14 +293,8 @@ impl Playback {
228293
trace!("Limiting frame skip to {} (was {} behind)", max_frame_skip, frames_behind);
229294
}
230295

231-
while let Some(front) = prefetch_buffer.front() {
232-
if front.frame_number < frame_number {
233-
prefetch_buffer.pop_front();
234-
} else {
235-
break;
236-
}
237-
}
238-
prefetch_frame = prefetch_frame.max(frame_number);
296+
prefetch_buffer.retain(|p| p.frame_number >= frame_number);
297+
let _ = frame_request_tx.send(frame_number);
239298
}
240299
}
241300

0 commit comments

Comments
 (0)