Skip to content

Commit 82e6bac

Browse files
committed
fix: audio loopback
1 parent d239b8c commit 82e6bac

5 files changed

Lines changed: 71 additions & 32 deletions

File tree

pulsebeam/src/audio_selector.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
//! Room-level Top-N audio selector.
32
//!
43
//! A single async task per room that:
@@ -50,12 +49,14 @@ use std::{
5049
use futures_concurrency::stream::StreamGroup;
5150
use futures_concurrency::stream::stream_group::Key;
5251
use futures_lite::stream::Stream as _;
53-
use pulsebeam_runtime::sync::spmc;
52+
use pulsebeam_runtime::sync::{Arc, spmc};
5453
use tokio::{sync::mpsc, time::Instant};
5554

5655
use crate::{
57-
controller::MAX_SEND_AUDIO_SLOTS, entity::TrackId, rtp::RtpPacket, rtp::monitor::StreamState,
58-
track::TrackReceiver,
56+
controller::MAX_SEND_AUDIO_SLOTS,
57+
entity::TrackId,
58+
rtp::{AudioRtpPacket, RtpPacket, monitor::StreamState},
59+
track::{TrackMeta, TrackReceiver},
5960
};
6061

6162
/// Number of output slots produced by the selector; matches the controller's
@@ -84,15 +85,15 @@ pub struct AudioSelectorSubscription {
8485
/// slots. Slot 0 is the loudest speaker, slot 1 the second-loudest, etc.
8586
/// (assignments shift after each re-rank, but the slot ordering is stable
8687
/// within a re-rank window).
87-
pub receivers: Vec<spmc::Receiver<RtpPacket>>,
88+
pub receivers: Vec<spmc::Receiver<AudioRtpPacket>>,
8889
}
8990

9091
/// Room-side handle: send track commands and create per-participant subscriptions.
9192
pub struct AudioSelectorHandle {
9293
/// Send [`AudioSelectorCmd`]s to the background task.
9394
pub cmd_tx: mpsc::Sender<AudioSelectorCmd>,
9495
/// One prototype receiver per slot; cloned for every subscriber.
95-
receivers: Vec<spmc::Receiver<RtpPacket>>,
96+
receivers: Vec<spmc::Receiver<AudioRtpPacket>>,
9697
}
9798

9899
impl AudioSelectorHandle {
@@ -129,7 +130,7 @@ pub fn create(
129130
for _ in 0..SELECTOR_SLOTS {
130131
// 256-packet ring per slot. At 50 pkt/s that is 5 s of runway;
131132
// enough to absorb downstream stalls of up to ~5 s without dropping audio.
132-
let (tx, rx) = spmc::channel::<RtpPacket>(256);
133+
let (tx, rx) = spmc::channel::<AudioRtpPacket>(256);
133134
senders.push(tx);
134135
receivers.push(rx);
135136
}
@@ -162,6 +163,7 @@ struct InputTrackMeta {
162163
/// Shared speech-intensity state maintained by the upstream `StreamMonitor`.
163164
/// Atomic loads — zero overhead on the hot path.
164165
state: StreamState,
166+
meta: Arc<TrackMeta>,
165167
}
166168

167169
/// One `spmc` receiver wrapped as a `Stream<Item=(TrackId, RtpPacket)>` so it
@@ -213,7 +215,7 @@ impl futures_lite::stream::Stream for InputStream {
213215
/// One of the N output slots produced by the selector.
214216
struct OutputSlot {
215217
/// Broadcast channel; all participant subscriptions share the same ring.
216-
sender: spmc::Sender<RtpPacket>,
218+
sender: spmc::Sender<AudioRtpPacket>,
217219
/// Which input track is currently assigned to this slot.
218220
track_id: Option<TrackId>,
219221
}
@@ -310,12 +312,13 @@ impl TopNAudioSelector {
310312
}
311313
let sim = track.lowest_quality();
312314
let state = sim.state.clone();
315+
let meta = track.meta.clone();
313316
let receiver = sim.channel.clone();
314317
let key = self.inputs.insert(InputStream {
315318
track_id: id,
316319
receiver,
317320
});
318-
self.tracks.insert(id, InputTrackMeta { key, state });
321+
self.tracks.insert(id, InputTrackMeta { key, state, meta });
319322
}
320323
AudioSelectorCmd::RemoveTrack(id) => {
321324
self.remove_track(id);
@@ -342,9 +345,19 @@ impl TopNAudioSelector {
342345
/// only occurs in the ≤200 ms window between a speaker becoming active and
343346
/// the next re-rank pass assigning them a slot.
344347
fn forward(&mut self, track_id: TrackId, packet: RtpPacket) {
345-
if let Some(slot) = self.slots.iter_mut().find(|s| s.track_id == Some(track_id)) {
346-
slot.sender.send(packet);
347-
}
348+
let Some(slot) = self.slots.iter_mut().find(|s| s.track_id == Some(track_id)) else {
349+
return;
350+
};
351+
352+
let Some(track) = self.tracks.get(&track_id) else {
353+
return;
354+
};
355+
356+
slot.sender.send(AudioRtpPacket {
357+
participant_id: track.meta.origin_participant,
358+
track_id,
359+
packet,
360+
});
348361
}
349362

350363
// ── Re-ranking ────────────────────────────────────────────────────────────

pulsebeam/src/participant/core.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl ParticipantCore {
111111
udp_batcher,
112112
tcp_batcher,
113113
upstream: UpstreamAllocator::new(),
114-
downstream: DownstreamAllocator::new(manual_sub),
114+
downstream: DownstreamAllocator::new(participant_id, manual_sub),
115115
slot_meta: HashMap::new(),
116116
disconnect_reason: None,
117117
events: Vec::with_capacity(32),

pulsebeam/src/participant/downstream/audio.rs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ use std::task::{Context, Poll};
55

66
use crate::audio_selector::AudioSelectorSubscription;
77
use crate::controller::MAX_SEND_AUDIO_SLOTS;
8+
use crate::entity::ParticipantId;
89
use crate::rtp;
9-
use crate::rtp::RtpPacket;
1010
use crate::rtp::timeline::Timeline;
11+
use crate::rtp::{AudioRtpPacket, RtpPacket};
1112
use str0m::media::Mid;
1213
use str0m::rtp::Ssrc;
1314

@@ -18,6 +19,7 @@ use str0m::rtp::Ssrc;
1819
/// every audio `Mid` negotiated with the client. The allocator pairs each
1920
/// slot index with the matching subscription receiver index.
2021
pub struct AudioAllocator {
22+
participant_id: ParticipantId,
2123
/// One stream per negotiated audio Mid. Each stream wraps one selector
2224
/// output receiver + one `AudioSlot` (timeline rewriter).
2325
///
@@ -37,8 +39,9 @@ pub struct AudioAllocator {
3739
}
3840

3941
impl AudioAllocator {
40-
pub fn new() -> Self {
42+
pub fn new(participant_id: ParticipantId) -> Self {
4143
Self {
44+
participant_id,
4245
slots: SlotGroup::with_capacity(MAX_SEND_AUDIO_SLOTS),
4346
pending_sub: None,
4447
}
@@ -67,7 +70,11 @@ impl AudioAllocator {
6770
/// regardless of the Top-N ranking. Returns `false` if `slot_index` has
6871
/// not yet been registered via [`add_slot`].
6972
#[allow(unused)]
70-
pub fn pin_slot(&mut self, slot_index: usize, receiver: spmc::Receiver<RtpPacket>) -> bool {
73+
pub fn pin_slot(
74+
&mut self,
75+
slot_index: usize,
76+
receiver: spmc::Receiver<AudioRtpPacket>,
77+
) -> bool {
7178
let Some(mut stream) = self.slots.get_mut(slot_index) else {
7279
return false;
7380
};
@@ -113,9 +120,20 @@ impl AudioAllocator {
113120
#[inline]
114121
pub(super) fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<(Mid, RtpPacket)>> {
115122
use futures_lite::stream::Stream as _;
116-
match Pin::new(&mut self.slots).poll_next(cx) {
117-
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
118-
Poll::Ready(None) | Poll::Pending => Poll::Pending,
123+
124+
// Drop packets that originate from this participant to avoid sending
125+
// the participant's own audio back to them (loopback).
126+
loop {
127+
match Pin::new(&mut self.slots).poll_next(cx) {
128+
Poll::Ready(Some((mid, packet))) => {
129+
if packet.participant_id == self.participant_id {
130+
// Keep draining until we find a packet from another participant.
131+
continue;
132+
}
133+
return Poll::Ready(Some((mid, packet.packet)));
134+
}
135+
Poll::Ready(None) | Poll::Pending => return Poll::Pending,
136+
}
119137
}
120138
}
121139
}
@@ -132,7 +150,7 @@ impl AudioAllocator {
132150
/// the session lifetime; only the underlying receiver is swapped on
133151
/// subscription/pin changes.
134152
struct AudioInputStream {
135-
receiver: Option<spmc::Receiver<RtpPacket>>,
153+
receiver: Option<spmc::Receiver<AudioRtpPacket>>,
136154
slot: AudioSlot,
137155
}
138156

@@ -147,14 +165,14 @@ impl AudioInputStream {
147165
/// Swap in a new receiver and reset the timeline so the SSRC-change
148166
/// detection in `AudioSlot::process` triggers a clean rebase on the
149167
/// first packet from the new source.
150-
fn set_receiver(&mut self, receiver: spmc::Receiver<RtpPacket>) {
168+
fn set_receiver(&mut self, receiver: spmc::Receiver<AudioRtpPacket>) {
151169
self.receiver = Some(receiver);
152170
self.slot.last_ssrc = None;
153171
}
154172
}
155173

156174
impl futures_lite::stream::Stream for AudioInputStream {
157-
type Item = (Mid, RtpPacket);
175+
type Item = (Mid, AudioRtpPacket);
158176

159177
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
160178
let this = self.get_mut();
@@ -203,11 +221,12 @@ impl AudioSlot {
203221
}
204222
}
205223

206-
fn process(&mut self, packet: RtpPacket) -> (Mid, RtpPacket) {
207-
if self.last_ssrc != Some(packet.ssrc) {
208-
self.last_ssrc = Some(packet.ssrc);
209-
self.timeline.rebase(&packet);
224+
fn process(&mut self, mut p: AudioRtpPacket) -> (Mid, AudioRtpPacket) {
225+
if self.last_ssrc != Some(p.packet.ssrc) {
226+
self.last_ssrc = Some(p.packet.ssrc);
227+
self.timeline.rebase(&p.packet);
210228
}
211-
(self.mid, self.timeline.rewrite(packet))
229+
p.packet = self.timeline.rewrite(p.packet);
230+
(self.mid, p)
212231
}
213232
}

pulsebeam/src/participant/downstream/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod audio;
22
mod video;
33

44
use crate::audio_selector::AudioSelectorSubscription;
5+
use crate::entity::ParticipantId;
56
use crate::participant::downstream::audio::AudioAllocator;
67
use crate::participant::downstream::video::{SlotConfig, VideoAllocator};
78
use crate::rtp::RtpPacket;
@@ -26,10 +27,10 @@ pub struct DownstreamAllocator {
2627
}
2728

2829
impl DownstreamAllocator {
29-
pub fn new(manual_sub: bool) -> Self {
30+
pub fn new(participant_id: ParticipantId, manual_sub: bool) -> Self {
3031
Self {
3132
available_bandwidth: MIN_BANDWIDTH,
32-
audio: AudioAllocator::new(),
33+
audio: AudioAllocator::new(participant_id),
3334
video: VideoAllocator::new(manual_sub),
3435
dirty_allocation: false,
3536
}

pulsebeam/src/rtp/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use str0m::rtp::rtcp::SenderInfo;
1212
use str0m::rtp::{ExtensionValues, SeqNo, Ssrc};
1313
use tokio::time::Instant;
1414

15+
use crate::entity::{ParticipantId, TrackId};
16+
1517
/// Pool capacity: 16 384 slots × 2 048 bytes ≈ 32 MB resident.
1618
///
1719
/// Trade memory for zero jemalloc allocations on the hot path. RTP payload pool and
@@ -41,16 +43,20 @@ pub enum Codec {
4143
Opus,
4244
}
4345

46+
#[derive(Clone, Debug)]
47+
pub struct AudioRtpPacket {
48+
pub participant_id: ParticipantId,
49+
pub track_id: TrackId,
50+
pub packet: RtpPacket,
51+
}
52+
4453
/// Unified internal RTP packet representation used across the SFU.
4554
/// This struct is designed for mutability and composition in middleware.
4655
/// Only the fields actually consumed by the forwarding pipeline are kept here;
4756
/// redundant header data (sequence_number, timestamp, csrc list, etc.) is dropped
4857
/// at ingress so every ring-slot stays as small as possible.
4958
#[derive(Debug, Clone, PartialEq, Eq)]
5059
pub struct RtpPacket {
51-
// NOTE: `payload` is a `PoolBuf` — a custom Arc-like handle backed by
52-
// `rtp_payload_pool()`. `.clone()` is a single atomic increment with no
53-
// heap allocation, making SPMC fan-out essentially free.
5460
pub ssrc: Ssrc,
5561
pub marker: bool,
5662
pub ext_vals: ExtensionValues,

0 commit comments

Comments
 (0)