Skip to content

Commit a38b3a5

Browse files
authored
Merge pull request #121 from itzmanish/feat/remote-manager-rewrite
refactor: simplified remote manager
2 parents f9f51dc + 9074c26 commit a38b3a5

10 files changed

Lines changed: 762 additions & 647 deletions

File tree

Cargo.lock

Lines changed: 10 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dev/sub

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ ADDR="${ADDR:-$HOST:$PORT}"
2020
NAME="${NAME:-bbb}"
2121

2222
# Combine the host and name into a URL.
23-
URL="${URL:-"https://$ADDR/$NAME"}"
23+
URL="${URL:-"https://$ADDR"}"
2424

2525
cargo run --bin moq-sub -- --name "$NAME" "$URL" "$@" | ffplay -

moq-relay-ietf/Cargo.toml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ url = "2"
3333

3434
# Async stuff
3535
tokio = { version = "1", features = ["full"] }
36-
# tokio-util = "0.7"
36+
tokio-util = "0.7"
3737
futures = "0.3"
3838
async-trait = "0.1"
3939

@@ -70,9 +70,6 @@ thiserror = "2.0.17"
7070
metrics = "0.24"
7171
metrics-exporter-prometheus = { version = "0.16", optional = true }
7272

73-
# misc
74-
#once_cell = "1.21.3"
75-
7673
[features]
7774
default = []
7875
metrics-prometheus = ["dep:metrics-exporter-prometheus"]

moq-relay-ietf/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,6 @@ pub use coordinator::*;
4848
pub use local::*;
4949
pub use producer::*;
5050
pub use relay::*;
51-
pub use remote::*;
51+
pub use remote::RemoteManager;
5252
pub use session::*;
5353
pub use web::*;

moq-relay-ietf/src/producer.rs

Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ use moq_transport::{
99

1010
use crate::{
1111
metrics::{GaugeGuard, TimingGuard},
12-
Locals, RemotesConsumer,
12+
Locals, RemoteManager,
1313
};
1414

1515
/// Producer of tracks to a remote Subscriber
1616
#[derive(Clone)]
1717
pub struct Producer {
1818
publisher: Publisher,
1919
locals: Locals,
20-
remotes: Option<RemotesConsumer>,
20+
remotes: RemoteManager,
2121
/// The resolved scope identity for this session, if any.
2222
/// Produced by `Coordinator::resolve_scope()` from the connection path.
2323
/// Passed to locals/remotes to isolate namespace lookups.
@@ -28,7 +28,7 @@ impl Producer {
2828
pub fn new(
2929
publisher: Publisher,
3030
locals: Locals,
31-
remotes: Option<RemotesConsumer>,
31+
remotes: RemoteManager,
3232
scope: Option<String>,
3333
) -> Self {
3434
Self {
@@ -46,7 +46,6 @@ impl Producer {
4646

4747
/// Run the producer to serve subscribe requests.
4848
pub async fn run(self) -> Result<(), SessionError> {
49-
//let mut tasks = FuturesUnordered::new();
5049
let mut tasks: FuturesUnordered<futures::future::BoxFuture<'static, ()>> =
5150
FuturesUnordered::new();
5251

@@ -122,40 +121,40 @@ impl Producer {
122121
}
123122
}
124123

125-
if let Some(remotes) = self.remotes {
126-
// Check remote tracks second, and serve from remote if possible
127-
match remotes.route(self.scope.as_deref(), &namespace).await {
128-
Ok(remote) => {
129-
if let Some(remote) = remote {
130-
if let Some(track) = remote.subscribe(&namespace, &track_name)? {
131-
let ns = namespace.to_utf8_path();
132-
tracing::info!(namespace = %ns, track = %track_name, source = "remote", "serving subscribe from remote: {:?}", track.info);
133-
// Update label to indicate remote source, timing recorded on drop
134-
timing_guard.set_label("source", "remote");
135-
// Track active tracks - decrements when serve completes
136-
let _track_guard = GaugeGuard::new("moq_relay_active_tracks");
137-
return Ok(subscribed.serve(track.reader).await?);
138-
}
139-
}
140-
}
141-
Err(e) => {
142-
// Route error = infrastructure failure (couldn't reach coordinator/upstream)
143-
// This is different from "not found" - we don't know if the track exists
124+
// Check remote tracks second, and serve from remote if possible
125+
match self
126+
.remotes
127+
.subscribe(self.scope.as_deref(), &namespace, &track_name)
128+
.await
129+
{
130+
Ok(track) => {
131+
if let Some(track) = track {
144132
let ns = namespace.to_utf8_path();
145-
tracing::error!(namespace = %ns, track = %track_name, error = %e, "failed to route to remote: {}", e);
146-
timing_guard.set_label("source", "route_error");
147-
metrics::counter!("moq_relay_subscribe_route_errors_total").increment(1);
148-
149-
// Return an internal error rather than "not found" since we couldn't check
150-
// TODO: Consider returning a more specific error to the subscriber
151-
let err = ServeError::internal_ctx(format!(
152-
"route error for namespace '{}': {}",
153-
namespace, e
154-
));
155-
subscribed.close(err.clone())?;
156-
return Err(err.into());
133+
tracing::info!(namespace = %ns, track = %track_name, source = "remote", "serving subscribe from remote: {:?}", track.info);
134+
// Update label to indicate remote source, timing recorded on drop
135+
timing_guard.set_label("source", "remote");
136+
// Track active tracks - decrements when serve completes
137+
let _track_guard = GaugeGuard::new("moq_relay_active_tracks");
138+
return Ok(subscribed.serve(track).await?);
157139
}
158140
}
141+
Err(e) => {
142+
// Route error = infrastructure failure (couldn't reach coordinator/upstream)
143+
// This is different from "not found" - we don't know if the track exists
144+
let ns = namespace.to_utf8_path();
145+
tracing::error!(namespace = %ns, track = %track_name, error = %e, "failed to route to remote: {}", e);
146+
timing_guard.set_label("source", "route_error");
147+
metrics::counter!("moq_relay_subscribe_route_errors_total").increment(1);
148+
149+
// Return an internal error rather than "not found" since we couldn't check
150+
// TODO: Consider returning a more specific error to the subscriber
151+
let err = ServeError::internal_ctx(format!(
152+
"route error for namespace '{}': {}",
153+
namespace, e
154+
));
155+
subscribed.close(err.clone())?;
156+
return Err(err.into());
157+
}
159158
}
160159

161160
// Track not found - we checked all sources and the track doesn't exist

0 commit comments

Comments
 (0)