Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dev/sub
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ ADDR="${ADDR:-$HOST:$PORT}"
NAME="${NAME:-bbb}"

# Combine the host and name into a URL.
URL="${URL:-"https://$ADDR/$NAME"}"
URL="${URL:-"https://$ADDR"}"

cargo run --bin moq-sub -- --name "$NAME" "$URL" "$@" | ffplay -
5 changes: 1 addition & 4 deletions moq-relay-ietf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ url = "2"

# Async stuff
tokio = { version = "1", features = ["full"] }
# tokio-util = "0.7"
tokio-util = "0.7"
futures = "0.3"
async-trait = "0.1"

Expand Down Expand Up @@ -70,9 +70,6 @@ thiserror = "2.0.17"
metrics = "0.24"
metrics-exporter-prometheus = { version = "0.16", optional = true }

# misc
#once_cell = "1.21.3"

[features]
default = []
metrics-prometheus = ["dep:metrics-exporter-prometheus"]
2 changes: 1 addition & 1 deletion moq-relay-ietf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ pub use coordinator::*;
pub use local::*;
pub use producer::*;
pub use relay::*;
pub use remote::*;
pub use remote::RemoteManager;
pub use session::*;
pub use web::*;
69 changes: 34 additions & 35 deletions moq-relay-ietf/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use moq_transport::{

use crate::{
metrics::{GaugeGuard, TimingGuard},
Locals, RemotesConsumer,
Locals, RemoteManager,
};

/// Producer of tracks to a remote Subscriber
#[derive(Clone)]
pub struct Producer {
publisher: Publisher,
locals: Locals,
remotes: Option<RemotesConsumer>,
remotes: RemoteManager,
/// The resolved scope identity for this session, if any.
/// Produced by `Coordinator::resolve_scope()` from the connection path.
/// Passed to locals/remotes to isolate namespace lookups.
Expand All @@ -28,7 +28,7 @@ impl Producer {
pub fn new(
publisher: Publisher,
locals: Locals,
remotes: Option<RemotesConsumer>,
remotes: RemoteManager,
scope: Option<String>,
) -> Self {
Self {
Expand All @@ -46,7 +46,6 @@ impl Producer {

/// Run the producer to serve subscribe requests.
pub async fn run(self) -> Result<(), SessionError> {
//let mut tasks = FuturesUnordered::new();
let mut tasks: FuturesUnordered<futures::future::BoxFuture<'static, ()>> =
FuturesUnordered::new();

Expand Down Expand Up @@ -122,40 +121,40 @@ impl Producer {
}
}

if let Some(remotes) = self.remotes {
// Check remote tracks second, and serve from remote if possible
match remotes.route(self.scope.as_deref(), &namespace).await {
Ok(remote) => {
if let Some(remote) = remote {
if let Some(track) = remote.subscribe(&namespace, &track_name)? {
let ns = namespace.to_utf8_path();
tracing::info!(namespace = %ns, track = %track_name, source = "remote", "serving subscribe from remote: {:?}", track.info);
// Update label to indicate remote source, timing recorded on drop
timing_guard.set_label("source", "remote");
// Track active tracks - decrements when serve completes
let _track_guard = GaugeGuard::new("moq_relay_active_tracks");
return Ok(subscribed.serve(track.reader).await?);
}
}
}
Err(e) => {
// Route error = infrastructure failure (couldn't reach coordinator/upstream)
// This is different from "not found" - we don't know if the track exists
// Check remote tracks second, and serve from remote if possible
match self
.remotes
.subscribe(self.scope.as_deref(), &namespace, &track_name)
.await
{
Ok(track) => {
if let Some(track) = track {
let ns = namespace.to_utf8_path();
tracing::error!(namespace = %ns, track = %track_name, error = %e, "failed to route to remote: {}", e);
timing_guard.set_label("source", "route_error");
metrics::counter!("moq_relay_subscribe_route_errors_total").increment(1);

// Return an internal error rather than "not found" since we couldn't check
// TODO: Consider returning a more specific error to the subscriber
let err = ServeError::internal_ctx(format!(
"route error for namespace '{}': {}",
namespace, e
));
subscribed.close(err.clone())?;
return Err(err.into());
tracing::info!(namespace = %ns, track = %track_name, source = "remote", "serving subscribe from remote: {:?}", track.info);
// Update label to indicate remote source, timing recorded on drop
timing_guard.set_label("source", "remote");
// Track active tracks - decrements when serve completes
let _track_guard = GaugeGuard::new("moq_relay_active_tracks");
return Ok(subscribed.serve(track).await?);
}
}
Err(e) => {
// Route error = infrastructure failure (couldn't reach coordinator/upstream)
// This is different from "not found" - we don't know if the track exists
let ns = namespace.to_utf8_path();
tracing::error!(namespace = %ns, track = %track_name, error = %e, "failed to route to remote: {}", e);
timing_guard.set_label("source", "route_error");
metrics::counter!("moq_relay_subscribe_route_errors_total").increment(1);

// Return an internal error rather than "not found" since we couldn't check
// TODO: Consider returning a more specific error to the subscriber
let err = ServeError::internal_ctx(format!(
"route error for namespace '{}': {}",
namespace, e
));
subscribed.close(err.clone())?;
return Err(err.into());
}
}

// Track not found - we checked all sources and the track doesn't exist
Expand Down
Loading
Loading