Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces an experimental MoQ (Media over QUIC) input path (based on moq-lite) by adding a WebTransport server to smelter-core, wiring it into pipeline initialization/config, exposing a new moq_server input type in the public API, and surfacing MoQ in status/stats and integration-test demos.
Changes:
- Add MoQ server + pipeline integration (WebTransport accept loop, origin announce loop, input registration, and broadcast-to-queue decode path for H264/AAC).
- Extend configuration + pipeline options to enable/disable MoQ server and configure port/TLS.
- Add API models/conversions + status/stats reporting for MoQ inputs, plus integration-test/demo support.
Reviewed changes
Copilot reviewed 28 out of 29 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
src/state.rs |
Plumbs MoQ server options into PipelineOptions derived from config. |
src/routes/status.rs |
Adds "moq" protocol label in status output. |
src/routes/register_request.rs |
Adds moq_server input registration route handling via MoqInput. |
src/config.rs |
Adds MoQ env-driven config (port/enable/TLS cert+key). |
smelter-core/src/stats/input/moq.rs |
Implements MoQ input stats state/events (bitrate tracking). |
smelter-core/src/stats/input/mod.rs |
Registers MoQ stats state/event wiring and report mapping. |
smelter-core/src/stats/input_reports.rs |
Adds serializable MoQ stats report structs + enum variant. |
smelter-core/src/protocols/moq.rs |
Defines MoQ input options and MoQ-specific error types. |
smelter-core/src/protocols.rs |
Exposes the new protocols::moq module. |
smelter-core/src/pipeline/moq/state.rs |
Tracks registered MoQ inputs and active broadcast connection handles. |
smelter-core/src/pipeline/moq/server.rs |
Implements MoQ WebTransport server startup, accept loop, and announce loop. |
smelter-core/src/pipeline/moq/moq_input.rs |
Implements MoQ server input registration and teardown hooks. |
smelter-core/src/pipeline/moq/mod.rs |
Declares MoQ pipeline submodules and re-exports. |
smelter-core/src/pipeline/moq/connection.rs |
Implements catalog parsing, track subscription, decoding, queue feeding, and stats events. |
smelter-core/src/pipeline/instance.rs |
Instantiates MoQ pipeline state and starts/stores MoQ server handle. |
smelter-core/src/pipeline/input.rs |
Adds MoQ as a pipeline input variant and external input constructor branch. |
smelter-core/src/pipeline.rs |
Adds PipelineMoqServerOptions + threads MoQ state through PipelineCtx. |
smelter-core/src/input.rs |
Adds MoQ to RegisterInputOptions and InputProtocolKind. |
smelter-core/src/error.rs |
Adds MoQ server init error and MoQ input init error variants. |
smelter-core/Cargo.toml |
Adds MoQ/WebTransport/CMAF-related dependencies to core crate. |
smelter-api/src/input/moq.rs |
Adds public API schema for MoqInput. |
smelter-api/src/input/moq_into.rs |
Adds conversion from API MoqInput into core RegisterInputOptions. |
smelter-api/src/input.rs |
Re-exports the new MoQ input API types. |
integration-tests/src/bin/benchmark/utils.rs |
Updates benchmark pipeline options to include MoQ server option. |
integration-tests/examples/demo/smelter_state.rs |
Adds MoQ input flow to the interactive demo app. |
integration-tests/examples/demo/inputs/moq.rs |
Adds a demo MoQ input builder + registration JSON + publish instructions. |
integration-tests/examples/demo/inputs.rs |
Registers MoQ in demo input selection and handle enum. |
Cargo.toml |
Adds workspace dependency versions for MoQ/WebTransport/TLS helpers. |
Cargo.lock |
Locks newly introduced transitive dependencies. |
Comments suppressed due to low confidence (1)
smelter-core/src/pipeline/moq/connection.rs:332
process_audio_configwill panic if the catalog's AAC rendition hasdescription: None(it calls.expect(...)). Sincedescriptionis optional in the catalog, this should be handled as an error (e.g., return a dedicatedMoqConnectionErroror treat the track as unsupported/missing config) instead of crashing the broadcast task.
let aac_decoder_options = {
let asc = audio
.description
.as_ref()
.expect("process_audio_config called with description present")
.clone();
AudioDecoderOptions::FdkAac(FdkAacDecoderOptions { asc: Some(asc) })
};
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| .description | ||
| .clone() | ||
| .ok_or(MoqConnectionError::UnsupportedVideoCodec)?; | ||
| let h264_config = H264AvcDecoderConfig::parse(avcc_bytes).unwrap(); |
| let video = match catalog.video.renditions.first_key_value() { | ||
| Some((name, config)) if let HangVideoCodec::H264(_) = config.codec => { | ||
| match Hang::try_from(&config.container) { | ||
| Ok(container) => Some(DiscoveredVideo { | ||
| name: name.clone(), | ||
| container, | ||
| description: config.description.clone(), | ||
| }), | ||
| Err(error) => { | ||
| warn!(track=%name, "Unsupported video container, skipping: {error}"); | ||
| None | ||
| } | ||
| } | ||
| } | ||
| Some((name, config)) => { | ||
| warn!(track=%name, codec=%config.codec, "Unsupported video codec, skipping track"); | ||
| None | ||
| } | ||
| None => None, | ||
| }; | ||
|
|
||
| let audio = match catalog.audio.renditions.first_key_value() { | ||
| Some((name, config)) if let HangAudioCodec::AAC(_) = config.codec => { | ||
| match Hang::try_from(&config.container) { | ||
| Ok(container) => Some(DiscoveredAudio { | ||
| name: name.clone(), | ||
| container, | ||
| description: config.description.clone(), | ||
| }), | ||
| Err(error) => { |
There was a problem hiding this comment.
Aware of that, this code is not meant to be production ready for now
| pub fn on_after_registration(&self) -> Result<()> { | ||
| println!("Publish to this input using moq-cli:"); | ||
| println!( | ||
| " moq-cli publish https://localhost:4443/{}", |
| let aac_decoder_options = { | ||
| let asc = audio | ||
| .description | ||
| .as_ref() | ||
| .expect("process_audio_config called with description present") | ||
| .clone(); | ||
| AudioDecoderOptions::FdkAac(FdkAacDecoderOptions { asc: Some(asc) }) | ||
| }; | ||
|
|
||
| match aac_decoder_options { | ||
| AudioDecoderOptions::FdkAac(decoder_options) => { | ||
| let options = AudioDecoderThreadOptions { | ||
| ctx: ctx.clone(), | ||
| decoder_options, | ||
| samples_sender, | ||
| input_buffer_size: MOQ_MAX_BUFFER, | ||
| }; | ||
| AudioDecoderThread::<FdkAacDecoder>::spawn(input_ref.clone(), options) | ||
| .map_err(MoqConnectionError::InitAudioDecoder) | ||
| } | ||
| _ => Err(MoqConnectionError::UnsupportedAudioCodec), | ||
| } |
| // XXX: I am not sure if this is the best solution from semantic POV, using `rtmp::TlsConfig` in | ||
| // MoQ | ||
| tls_config: Option<TlsConfig>, |
There was a problem hiding this comment.
That is a good question
There was a problem hiding this comment.
All stats related code should be put to another PR, remove it from here.
| // XXX: I am not sure if this is the best solution from semantic POV, using `rtmp::TlsConfig` in | ||
| // MoQ | ||
| tls_config: Option<TlsConfig>, |
| Err(_) => true, | ||
| }; | ||
|
|
||
| let moq_tls_cert_file = match env::var("SMELTER_MOQ_TLS_CERT_FILE") { |
There was a problem hiding this comment.
does the moq server even make sense without certs? If no then you can skip enable option, by default do not enable it unless certs are provided
| // HACK: This is needed because even though `.with_certificate()` is sync it runs async code | ||
| // underneath and requires a runtime |
There was a problem hiding this comment.
It's worth preserving the comment that with_certificate requires runtime, but instead of hack, I would just make this function async
| } | ||
| } | ||
|
|
||
| fn start_wt_server( |
| tokio::spawn(async move { | ||
| let session = match request.ok().await { | ||
| Ok(session) => { | ||
| info!("WebTransport session created."); |
There was a problem hiding this comment.
Is it correct to refer to web transport? My understanding is that WebTransport is a browser API for QUICK, but if you run outside of browser it does not have anything to do with it, you just use quick then.
f30de37 to
33d1397
Compare
3730c61 to
f641055
Compare
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
It was a leftover from before branch split
Based on the
moq-litecrate, however adds a few more dependencies for QUIC server and CMAF handling.Currently supports only
H264video andAACaudio.Yet to be done:
/resetmoq-muxversionWhat will NOT be done in the experimental: