Skip to content

Commit 82d1bf4

Browse files
committed
Better handling of decoder spawn
1 parent c99f8e3 commit 82d1bf4

3 files changed

Lines changed: 153 additions & 123 deletions

File tree

smelter-api/src/input/moq_into.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl TryFrom<MoqInput> for core::RegisterInputOptions {
2525

2626
let input_options = core::MoqServerInputOptions {
2727
broadcast_path,
28-
decoders: core::MoqServerInputDecoders { h264 },
28+
decoders: core::MoqServerInputDecoders { h264, aac: None },
2929
queue_options: core::QueueInputOptions {
3030
required: required.unwrap_or(false),
3131
video_side_channel: side_channel.video.unwrap_or(false),

smelter-core/src/pipeline/moq/connection.rs

Lines changed: 150 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use tracing::{debug, info, trace, warn};
1111
use crate::utils::{H264AvcDecoderConfig, H264AvccToAnnexB};
1212
use crate::{
1313
MediaKind, PipelineCtx, PipelineEvent,
14-
codecs::{AudioCodec, FdkAacDecoderOptions, VideoCodec, VideoDecoderOptions},
14+
codecs::{
15+
AudioCodec, AudioDecoderOptions, FdkAacDecoderOptions, VideoCodec, VideoDecoderOptions,
16+
},
1517
error::DecoderInitError,
1618
pipeline::{
1719
decoder::{
@@ -77,13 +79,6 @@ pub(crate) fn spawn_broadcast_handler(
7779
}
7880
};
7981

80-
if let Some(v) = &discovered.video {
81-
info!(input_id = %input_id_str, track = %v.name, "Discovered MoQ video track");
82-
}
83-
if let Some(a) = &discovered.audio {
84-
info!(input_id = %input_id_str, track = %a.name, "Discovered MoQ audio track");
85-
}
86-
8782
let has_video = discovered.video.is_some();
8883
let has_audio = discovered.audio.is_some();
8984

@@ -93,20 +88,70 @@ pub(crate) fn spawn_broadcast_handler(
9388
offset: QueueTrackOffset::Pts(ctx.queue_ctx.effective_last_pts() + MOQ_BUFFER),
9489
});
9590

91+
if let Some(v) = &discovered.video {
92+
info!(input_id = %input_id_str, track = %v.name, "Discovered MoQ video track");
93+
}
94+
if let Some(a) = &discovered.audio {
95+
info!(input_id = %input_id_str, track = %a.name, "Discovered MoQ audio track");
96+
}
97+
98+
let video_decoder_handle = match &discovered.video {
99+
Some(video) => {
100+
match process_video_config(
101+
&ctx,
102+
&input_ref,
103+
&decoders,
104+
video,
105+
// TODO: (@jbrs) Handle it nicer in the future
106+
video_sender.expect("MUST be initialized only if video is present"),
107+
) {
108+
Ok(handle) => Some(handle),
109+
Err(err) => {
110+
warn!(
111+
"MoQ video config error: {}",
112+
ErrorStack::new(&err).into_string()
113+
);
114+
None
115+
}
116+
}
117+
}
118+
None => None,
119+
};
120+
121+
let audio_decoder_handle = match &discovered.audio {
122+
Some(audio) => {
123+
match process_audio_config(
124+
&ctx,
125+
&input_ref,
126+
audio,
127+
audio_sender.expect("MUST be initialized only if audio is present"),
128+
) {
129+
Ok(handle) => Some(handle),
130+
Err(err) => {
131+
warn!(
132+
"MoQ audio config error: {}",
133+
ErrorStack::new(&err).into_string()
134+
);
135+
None
136+
}
137+
}
138+
}
139+
None => None,
140+
};
141+
96142
let video_fut = async {
97-
if let Some(video) = discovered.video {
98-
let avcc = video.description.clone();
143+
if let Some(video) = discovered.video
144+
&& let Some(decoder_handle) = video_decoder_handle
145+
{
99146
match broadcast.subscribe_track(&Track::new(&video.name)) {
100147
Ok(track) => {
101148
let consumer =
102149
ContainerConsumer::new(track, video.container).with_latency(MOQ_BUFFER);
103150
if let Err(err) = read_video_track(
104151
ctx.clone(),
105152
input_ref.clone(),
106-
decoders.clone(),
107153
consumer,
108-
avcc,
109-
video_sender,
154+
decoder_handle,
110155
)
111156
.await
112157
{
@@ -124,8 +169,9 @@ pub(crate) fn spawn_broadcast_handler(
124169
};
125170

126171
let audio_fut = async {
127-
if let Some(audio) = discovered.audio {
128-
let asc = audio.description.clone();
172+
if let Some(audio) = discovered.audio
173+
&& let Some(decoder_handle) = audio_decoder_handle
174+
{
129175
match broadcast.subscribe_track(&Track::new(&audio.name)) {
130176
Ok(track) => {
131177
let consumer =
@@ -134,8 +180,7 @@ pub(crate) fn spawn_broadcast_handler(
134180
ctx.clone(),
135181
input_ref.clone(),
136182
consumer,
137-
asc,
138-
audio_sender,
183+
decoder_handle,
139184
)
140185
.await
141186
{
@@ -224,6 +269,83 @@ async fn read_catalog(
224269
Ok(DiscoveredTracks { video, audio })
225270
}
226271

272+
fn process_video_config(
273+
ctx: &Arc<PipelineCtx>,
274+
input_ref: &Ref<InputId>,
275+
decoders: &MoqServerInputDecoders,
276+
video: &DiscoveredVideo,
277+
frame_sender: QueueSender<Frame>,
278+
) -> Result<DecoderThreadHandle, MoqConnectionError> {
279+
// TODO: (@jbrs) Proper error
280+
let avcc_bytes = video
281+
.description
282+
.clone()
283+
.ok_or(MoqConnectionError::UnsupportedVideoCodec)?;
284+
let h264_config = H264AvcDecoderConfig::parse(avcc_bytes).unwrap();
285+
let options = VideoDecoderThreadOptions {
286+
ctx: ctx.clone(),
287+
transformer: Some(H264AvccToAnnexB::new(h264_config)),
288+
frame_sender,
289+
input_buffer_size: MOQ_MAX_BUFFER,
290+
};
291+
292+
let h264_decoder =
293+
decoders
294+
.h264
295+
.unwrap_or_else(|| match ctx.graphics_context.has_vulkan_decoder_support() {
296+
true => VideoDecoderOptions::VulkanH264,
297+
false => VideoDecoderOptions::FfmpegH264,
298+
});
299+
300+
match h264_decoder {
301+
VideoDecoderOptions::FfmpegH264 => {
302+
VideoDecoderThread::<ffmpeg_h264::FfmpegH264Decoder, _>::spawn(
303+
input_ref.clone(),
304+
options,
305+
)
306+
.map_err(MoqConnectionError::InitVideoDecoder)
307+
}
308+
VideoDecoderOptions::VulkanH264 => {
309+
VideoDecoderThread::<vulkan_h264::VulkanH264Decoder, _>::spawn(
310+
input_ref.clone(),
311+
options,
312+
)
313+
.map_err(MoqConnectionError::InitVideoDecoder)
314+
}
315+
_ => Err(MoqConnectionError::UnsupportedVideoCodec),
316+
}
317+
}
318+
319+
fn process_audio_config(
320+
ctx: &Arc<PipelineCtx>,
321+
input_ref: &Ref<InputId>,
322+
audio: &DiscoveredAudio,
323+
samples_sender: QueueSender<InputAudioSamples>,
324+
) -> Result<DecoderThreadHandle, MoqConnectionError> {
325+
let aac_decoder_options = {
326+
let asc = audio
327+
.description
328+
.as_ref()
329+
.expect("process_audio_config called with description present")
330+
.clone();
331+
AudioDecoderOptions::FdkAac(FdkAacDecoderOptions { asc: Some(asc) })
332+
};
333+
334+
match aac_decoder_options {
335+
AudioDecoderOptions::FdkAac(decoder_options) => {
336+
let options = AudioDecoderThreadOptions {
337+
ctx: ctx.clone(),
338+
decoder_options,
339+
samples_sender,
340+
input_buffer_size: MOQ_MAX_BUFFER,
341+
};
342+
AudioDecoderThread::<FdkAacDecoder>::spawn(input_ref.clone(), options)
343+
.map_err(MoqConnectionError::InitAudioDecoder)
344+
}
345+
_ => Err(MoqConnectionError::UnsupportedAudioCodec),
346+
}
347+
}
348+
227349
#[derive(thiserror::Error, Debug)]
228350
enum MoqConnectionError {
229351
#[error("MoQ track error")]
@@ -235,20 +357,20 @@ enum MoqConnectionError {
235357
#[error("Catalog track produced no frames")]
236358
CatalogEmpty,
237359

238-
#[error("Failed to read catalog")]
239-
CatalogError(#[source] moq_mux::Error),
240-
241360
#[error("Catalog contains no recognizable video or audio tracks")]
242361
CatalogNoTracks,
243362

244363
#[error("Failed to initialize H264 decoder")]
245-
InitH264Decoder(#[source] DecoderInitError),
364+
InitVideoDecoder(#[source] DecoderInitError),
246365

247-
#[error("Invalid video decoder provided, expected H264 decoder")]
248-
InvalidVideoDecoder,
366+
#[error("Unsupported video codec, H264 expected.")]
367+
UnsupportedVideoCodec,
249368

250369
#[error("Failed to initialize AAC decoder")]
251-
InitAacDecoder(#[source] DecoderInitError),
370+
InitAudioDecoder(#[source] DecoderInitError),
371+
372+
#[error("Unsupported audio codec, AAC expected.")]
373+
UnsupportedAudioCodec,
252374

253375
#[error("Decoder channel closed")]
254376
ChannelClosed,
@@ -257,25 +379,14 @@ enum MoqConnectionError {
257379
ContainerError(#[source] moq_mux::Error),
258380
}
259381

260-
// TODO: This is absolutely terrible and cannot be squashed in one function like that
261382
async fn read_video_track(
262383
ctx: Arc<PipelineCtx>,
263384
input_ref: Ref<InputId>,
264-
decoders: MoqServerInputDecoders,
265385
mut consumer: ContainerConsumer<Hang>,
266-
avcc_from_catalog: Option<Bytes>,
267-
frame_sender: Option<QueueSender<Frame>>,
386+
decoder_handle: DecoderThreadHandle,
268387
) -> Result<(), MoqConnectionError> {
269-
let Some(frame_sender) = frame_sender else {
270-
return Ok(());
271-
};
272-
273-
let mut decoder_handle: Option<DecoderThreadHandle> = None;
274-
let mut pending_frame_sender = Some(frame_sender);
275388
let mut first_pts: Option<Duration> = None;
276389

277-
let h264_config = H264AvcDecoderConfig::parse(avcc_from_catalog.unwrap()).unwrap();
278-
279390
while let Some(frame) = consumer
280391
.read()
281392
.await
@@ -287,48 +398,6 @@ async fn read_video_track(
287398
trace!(?pts, "MoQ video frame");
288399
let payload = frame.payload;
289400

290-
if decoder_handle.is_none() {
291-
let frame_sender = pending_frame_sender
292-
.take()
293-
.expect("frame_sender consumed only once");
294-
let options = VideoDecoderThreadOptions {
295-
ctx: ctx.clone(),
296-
transformer: Some(H264AvccToAnnexB::new(h264_config.clone())),
297-
frame_sender,
298-
input_buffer_size: MOQ_MAX_BUFFER,
299-
};
300-
301-
let h264_decoder = decoders.h264.unwrap_or_else(|| {
302-
match ctx.graphics_context.has_vulkan_decoder_support() {
303-
true => VideoDecoderOptions::VulkanH264,
304-
false => VideoDecoderOptions::FfmpegH264,
305-
}
306-
});
307-
308-
let handle = match h264_decoder {
309-
VideoDecoderOptions::FfmpegH264 => {
310-
VideoDecoderThread::<ffmpeg_h264::FfmpegH264Decoder, _>::spawn(
311-
input_ref.clone(),
312-
options,
313-
)
314-
.map_err(MoqConnectionError::InitH264Decoder)?
315-
}
316-
VideoDecoderOptions::VulkanH264 => {
317-
VideoDecoderThread::<vulkan_h264::VulkanH264Decoder, _>::spawn(
318-
input_ref.clone(),
319-
options,
320-
)
321-
.map_err(MoqConnectionError::InitH264Decoder)?
322-
}
323-
_ => {
324-
return Err(MoqConnectionError::InvalidVideoDecoder);
325-
}
326-
};
327-
328-
decoder_handle = Some(handle);
329-
}
330-
331-
let handle = decoder_handle.as_ref().unwrap();
332401
let chunk = EncodedInputChunk {
333402
data: payload,
334403
pts,
@@ -342,7 +411,7 @@ async fn read_video_track(
342411
.into_event(&input_ref, StatsTrackKind::Video),
343412
);
344413

345-
handle
414+
decoder_handle
346415
.chunk_sender
347416
.send(PipelineEvent::Data(chunk))
348417
.map_err(|_| MoqConnectionError::ChannelClosed)?;
@@ -355,15 +424,8 @@ async fn read_audio_track(
355424
ctx: Arc<PipelineCtx>,
356425
input_ref: Ref<InputId>,
357426
mut consumer: ContainerConsumer<Hang>,
358-
asc_from_catalog: Option<Bytes>,
359-
samples_sender: Option<QueueSender<InputAudioSamples>>,
427+
decoder_handle: DecoderThreadHandle,
360428
) -> Result<(), MoqConnectionError> {
361-
let Some(samples_sender) = samples_sender else {
362-
return Ok(());
363-
};
364-
365-
let mut decoder_handle: Option<DecoderThreadHandle> = None;
366-
let mut pending_samples_sender = Some(samples_sender);
367429
let mut first_pts: Option<Duration> = None;
368430

369431
while let Some(frame) = consumer
@@ -377,39 +439,6 @@ async fn read_audio_track(
377439
trace!(?pts, "MoQ audio frame");
378440
let payload = frame.payload;
379441

380-
if decoder_handle.is_none() {
381-
let samples_sender = pending_samples_sender
382-
.take()
383-
.expect("samples_sender consumed only once");
384-
385-
let asc = if let Some(ref asc) = asc_from_catalog {
386-
asc.clone()
387-
} else {
388-
let asc = payload.clone();
389-
let options = AudioDecoderThreadOptions {
390-
ctx: ctx.clone(),
391-
decoder_options: FdkAacDecoderOptions { asc: Some(asc) },
392-
samples_sender,
393-
input_buffer_size: MOQ_MAX_BUFFER,
394-
};
395-
let handle = AudioDecoderThread::<FdkAacDecoder>::spawn(input_ref.clone(), options)
396-
.map_err(MoqConnectionError::InitAacDecoder)?;
397-
decoder_handle = Some(handle);
398-
continue;
399-
};
400-
401-
let options = AudioDecoderThreadOptions {
402-
ctx: ctx.clone(),
403-
decoder_options: FdkAacDecoderOptions { asc: Some(asc) },
404-
samples_sender,
405-
input_buffer_size: MOQ_MAX_BUFFER,
406-
};
407-
let handle = AudioDecoderThread::<FdkAacDecoder>::spawn(input_ref.clone(), options)
408-
.map_err(MoqConnectionError::InitAacDecoder)?;
409-
decoder_handle = Some(handle);
410-
}
411-
412-
let handle = decoder_handle.as_ref().unwrap();
413442
let chunk = EncodedInputChunk {
414443
data: payload,
415444
pts,
@@ -423,7 +452,7 @@ async fn read_audio_track(
423452
.into_event(&input_ref, StatsTrackKind::Audio),
424453
);
425454

426-
handle
455+
decoder_handle
427456
.chunk_sender
428457
.send(PipelineEvent::Data(chunk))
429458
.map_err(|_| MoqConnectionError::ChannelClosed)?;

0 commit comments

Comments
 (0)