Skip to content

Commit ca2ca66

Browse files
authored
Merge pull request #825 from RustAudio/perf/optimize-pipeline
perf: improve mixer and sample rate conversion
2 parents 600962d + a9b19a8 commit ca2ca66

4 files changed

Lines changed: 50 additions & 57 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4848
- `Blue` noise generator uses uniform instead of Gaussian noise for better performance.
4949
- `Gaussian` noise generator has standard deviation of 0.6 for perceptual equivalence.
5050
- `Velvet` noise generator takes density in Hz as `usize` instead of `f32`.
51-
- Upgrade `cpal` to v0.17.
51+
- Upgraded `cpal` to v0.17.
5252
- Clarified `Source::current_span_len()` contract documentation.
53+
- Improved queue, mixer and sample rate conversion performance.
5354

5455
## Version [0.21.1] (2025-07-14)
5556

src/conversions/sample_rate.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::common::{ChannelCount, SampleRate};
22
use crate::{math, Sample};
33
use num_rational::Ratio;
4+
use std::collections::VecDeque;
45
use std::mem;
56

67
/// Iterator that converts from a certain sample rate to another.
@@ -27,7 +28,7 @@ where
2728
/// This counter is incremented (modulo `to`) every time the iterator is called.
2829
next_output_span_pos_in_chunk: u32,
2930
/// The buffer containing the samples waiting to be output.
30-
output_buffer: Vec<I::Item>,
31+
output_buffer: VecDeque<I::Item>,
3132
}
3233

3334
impl<I> SampleRateConverter<I>
@@ -81,7 +82,10 @@ where
8182
next_output_span_pos_in_chunk: 0,
8283
current_span: first_samples,
8384
next_frame: next_samples,
84-
output_buffer: Vec::with_capacity(num_channels.get() as usize - 1),
85+
// Capacity: worst case is upsampling where we buffer multiple frames worth of samples.
86+
output_buffer: VecDeque::with_capacity(
87+
(to as f32 / from as f32).ceil() as usize * num_channels.get() as usize,
88+
),
8589
}
8690
}
8791

@@ -126,8 +130,8 @@ where
126130
}
127131

128132
// Short circuit if there are some samples waiting.
129-
if !self.output_buffer.is_empty() {
130-
return Some(self.output_buffer.remove(0));
133+
if let Some(sample) = self.output_buffer.pop_front() {
134+
return Some(sample);
131135
}
132136

133137
// The span we are going to return from this function will be a linear interpolation
@@ -172,7 +176,7 @@ where
172176
if off == 0 {
173177
result = Some(sample);
174178
} else {
175-
self.output_buffer.push(sample);
179+
self.output_buffer.push_back(sample);
176180
}
177181
}
178182

@@ -183,14 +187,10 @@ where
183187
result
184188
} else {
185189
// draining `self.current_span`
186-
if !self.current_span.is_empty() {
187-
let r = Some(self.current_span.remove(0));
188-
mem::swap(&mut self.output_buffer, &mut self.current_span);
189-
self.current_span.clear();
190-
r
191-
} else {
192-
None
193-
}
190+
let mut current_span = self.current_span.drain(..);
191+
let r = current_span.next()?;
192+
self.output_buffer.extend(current_span);
193+
Some(r)
194194
}
195195
}
196196

src/mixer.rs

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
use crate::common::{ChannelCount, SampleRate};
44
use crate::source::{SeekError, Source, UniformSourceIterator};
55
use crate::Sample;
6-
use std::sync::atomic::{AtomicBool, Ordering};
7-
use std::sync::{Arc, Mutex};
6+
use std::sync::Arc;
87
use std::time::Duration;
98

9+
#[cfg(feature = "crossbeam-channel")]
10+
use crossbeam_channel::{unbounded as channel, Receiver, Sender};
11+
#[cfg(not(feature = "crossbeam-channel"))]
12+
use std::sync::mpsc::{channel, Receiver, Sender};
13+
1014
/// Builds a new mixer.
1115
///
1216
/// You can choose the characteristics of the output thanks to this constructor. All the sounds
@@ -19,9 +23,10 @@ use std::time::Duration;
1923
/// As a result, input sources added to the mixer later might not be forwarded to the sink.
2024
/// Add `Zero` source to prevent detaching the mixer from sink.
2125
pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSource) {
26+
let (tx, rx) = channel();
27+
2228
let input = Mixer(Arc::new(Inner {
23-
has_pending: AtomicBool::new(false),
24-
pending_sources: Mutex::new(Vec::new()),
29+
pending_tx: tx,
2530
channels,
2631
sample_rate,
2732
}));
@@ -31,7 +36,7 @@ pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSo
3136
input: input.clone(),
3237
sample_count: 0,
3338
still_pending: vec![],
34-
still_current: vec![],
39+
pending_rx: rx,
3540
};
3641

3742
(input, output)
@@ -42,8 +47,7 @@ pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSo
4247
pub struct Mixer(Arc<Inner>);
4348

4449
struct Inner {
45-
has_pending: AtomicBool,
46-
pending_sources: Mutex<Vec<Box<dyn Source + Send>>>,
50+
pending_tx: Sender<Box<dyn Source + Send>>,
4751
channels: ChannelCount,
4852
sample_rate: SampleRate,
4953
}
@@ -57,12 +61,8 @@ impl Mixer {
5761
{
5862
let uniform_source =
5963
UniformSourceIterator::new(source, self.0.channels, self.0.sample_rate);
60-
self.0
61-
.pending_sources
62-
.lock()
63-
.unwrap()
64-
.push(Box::new(uniform_source) as Box<_>);
65-
self.0.has_pending.store(true, Ordering::SeqCst); // TODO: can we relax this ordering?
64+
// Ignore send errors (channel dropped means MixerSource was dropped)
65+
let _ = self.0.pending_tx.send(Box::new(uniform_source));
6666
}
6767
}
6868

@@ -80,8 +80,8 @@ pub struct MixerSource {
8080
// A temporary vec used in start_pending_sources.
8181
still_pending: Vec<Box<dyn Source + Send>>,
8282

83-
// A temporary vec used in sum_current_sources.
84-
still_current: Vec<Box<dyn Source + Send>>,
83+
// Receiver for pending sources from the channel.
84+
pending_rx: Receiver<Box<dyn Source + Send>>,
8585
}
8686

8787
impl Source for MixerSource {
@@ -118,9 +118,7 @@ impl Iterator for MixerSource {
118118

119119
#[inline]
120120
fn next(&mut self) -> Option<Self::Item> {
121-
if self.input.0.has_pending.load(Ordering::SeqCst) {
122-
self.start_pending_sources();
123-
}
121+
self.start_pending_sources();
124122

125123
self.sample_count += 1;
126124

@@ -145,9 +143,7 @@ impl MixerSource {
145143
// in-step with the modulo of the samples produced so far. Otherwise, the
146144
// sound will play on the wrong channels, e.g. left / right will be reversed.
147145
fn start_pending_sources(&mut self) {
148-
let mut pending = self.input.0.pending_sources.lock().unwrap(); // TODO: relax ordering?
149-
150-
for source in pending.drain(..) {
146+
while let Ok(source) = self.pending_rx.try_recv() {
151147
let in_step = self
152148
.sample_count
153149
.is_multiple_of(source.channels().get() as usize);
@@ -158,24 +154,19 @@ impl MixerSource {
158154
self.still_pending.push(source);
159155
}
160156
}
161-
std::mem::swap(&mut self.still_pending, &mut pending);
162-
163-
let has_pending = !pending.is_empty();
164-
self.input
165-
.0
166-
.has_pending
167-
.store(has_pending, Ordering::SeqCst); // TODO: relax ordering?
168157
}
169158

170159
fn sum_current_sources(&mut self) -> Sample {
171160
let mut sum = 0.0;
172-
for mut source in self.current_sources.drain(..) {
173-
if let Some(value) = source.next() {
174-
sum += value;
175-
self.still_current.push(source);
161+
self.current_sources.retain_mut(|source| {
162+
match source.next() {
163+
Some(value) => {
164+
sum += value;
165+
true // Keep this source
166+
}
167+
None => false, // Remove exhausted source
176168
}
177-
}
178-
std::mem::swap(&mut self.still_current, &mut self.current_sources);
169+
});
179170

180171
sum
181172
}

src/queue.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Queue that plays sounds one after the other.
22
3+
use std::collections::VecDeque;
34
use std::sync::atomic::{AtomicBool, Ordering};
45
use std::sync::{Arc, Mutex};
56
use std::time::Duration;
@@ -26,7 +27,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
2627
///
2728
pub fn queue(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput>, SourcesQueueOutput) {
2829
let input = Arc::new(SourcesQueueInput {
29-
next_sounds: Mutex::new(Vec::new()),
30+
next_sounds: Mutex::new(VecDeque::new()),
3031
keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
3132
});
3233

@@ -48,7 +49,7 @@ type SignalDone = Option<Sender<()>>;
4849

4950
/// The input of the queue.
5051
pub struct SourcesQueueInput {
51-
next_sounds: Mutex<Vec<(Sound, SignalDone)>>,
52+
next_sounds: Mutex<VecDeque<(Sound, SignalDone)>>,
5253

5354
// See constructor.
5455
keep_alive_if_empty: AtomicBool,
@@ -64,7 +65,7 @@ impl SourcesQueueInput {
6465
self.next_sounds
6566
.lock()
6667
.unwrap()
67-
.push((Box::new(source) as Box<_>, None));
68+
.push_back((Box::new(source) as Box<_>, None));
6869
}
6970

7071
/// Adds a new source to the end of the queue.
@@ -81,7 +82,7 @@ impl SourcesQueueInput {
8182
self.next_sounds
8283
.lock()
8384
.unwrap()
84-
.push((Box::new(source) as Box<_>, Some(tx)));
85+
.push_back((Box::new(source) as Box<_>, Some(tx)));
8586
rx
8687
}
8788

@@ -175,7 +176,7 @@ impl Source for SourcesQueueOutput {
175176
// - After append: the appended source while playing
176177
// - With keep_alive: Zero (silence) while playing
177178
self.current.channels()
178-
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().first() {
179+
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
179180
// Current source exhausted, peek at next queued source
180181
// This is critical: UniformSourceIterator queries metadata during append,
181182
// before any samples are pulled. We must report the next source's metadata.
@@ -194,7 +195,7 @@ impl Source for SourcesQueueOutput {
194195
if !self.current.is_exhausted() {
195196
// Current source is active (producing samples)
196197
self.current.sample_rate()
197-
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().first() {
198+
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
198199
// Current source exhausted, peek at next queued source
199200
// This prevents wrong resampling setup in UniformSourceIterator
200201
next.sample_rate()
@@ -282,7 +283,9 @@ impl SourcesQueueOutput {
282283
let (next, signal_after_end) = {
283284
let mut next = self.input.next_sounds.lock().unwrap();
284285

285-
if next.is_empty() {
286+
if let Some(next) = next.pop_front() {
287+
next
288+
} else {
286289
let channels = self.current.channels();
287290
let silence = Box::new(Zero::new_samples(
288291
channels,
@@ -295,8 +298,6 @@ impl SourcesQueueOutput {
295298
} else {
296299
return Err(());
297300
}
298-
} else {
299-
next.remove(0)
300301
}
301302
};
302303

0 commit comments

Comments
 (0)