Skip to content

Commit 6c01748

Browse files
committed
fix: include destination address in upstream connection cache key
The relay-to-relay connection cache (RemotesState.lookup) was keyed by URL only. Coordinator implementations that return the same URL for different namespaces (e.g. scope-based URLs with per-host socket_addr overrides) may see subscribes misrouted to a previously cached upstream host instead of the host the coordinator identified. Include the SocketAddr in the cache key so that different upstream hosts get separate connections even when the coordinator returns the same URL. Connection reuse is preserved for same-URL + same-host combinations, which is correct since namespace identity is carried in MoQT SUBSCRIBE messages, not the connection URL.
1 parent 1f03066 commit 6c01748

1 file changed

Lines changed: 21 additions & 7 deletions

File tree

moq-relay-ietf/src/remote.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@ use std::ops;
1010
use std::sync::Arc;
1111
use std::sync::Weak;
1212

13+
/// Cache key for upstream relay-to-relay connections.
14+
///
15+
/// Keyed by both URL and destination address so that connections are
16+
/// reused only when both match. This matters when a [`Coordinator`]
17+
/// returns the same URL for different namespaces (e.g. a shared relay
18+
/// hostname) but distinguishes destinations via [`NamespaceOrigin::addr`].
19+
/// Without the address in the key, all namespaces that share a URL
20+
/// would be routed through a single cached connection to whichever
21+
/// upstream host was contacted first.
22+
type RemoteCacheKey = (Url, Option<SocketAddr>);
23+
1324
use futures::stream::FuturesUnordered;
1425
use futures::FutureExt;
1526
use futures::StreamExt;
@@ -44,7 +55,7 @@ impl Remotes {
4455

4556
#[derive(Default)]
4657
struct RemotesState {
47-
lookup: HashMap<Url, RemoteConsumer>,
58+
lookup: HashMap<RemoteCacheKey, RemoteConsumer>,
4859
requested: VecDeque<RemoteProducer>,
4960
}
5061

@@ -83,6 +94,7 @@ impl RemotesProducer {
8394
tokio::select! {
8495
Some(mut remote) = self.next() => {
8596
let url = remote.url.clone();
97+
let cache_key = (url.clone(), remote.addr);
8698

8799
// Spawn a task to serve the remote
88100
tasks.push(async move {
@@ -96,16 +108,16 @@ impl RemotesProducer {
96108
tracing::warn!(remote_url = %remote_url, error = %err, "failed serving remote: {:?}, error: {}", info, err);
97109
}
98110

99-
url
111+
cache_key
100112
});
101113
}
102114

103115
// Handle finished remote producers
104116
res = tasks.next(), if !tasks.is_empty() => {
105-
let url = res.unwrap();
117+
let cache_key = res.unwrap();
106118

107119
if let Some(mut state) = self.state.lock_mut() {
108-
state.lookup.remove(&url);
120+
state.lookup.remove(&cache_key);
109121
}
110122
},
111123
else => return Ok(()),
@@ -145,9 +157,11 @@ impl RemotesConsumer {
145157
// Always fetch the origin instead of using the (potentially invalid) cache.
146158
let (origin, client) = self.coordinator.lookup(scope, namespace).await?;
147159

160+
let cache_key = (origin.url(), origin.addr());
161+
148162
// Check if we already have a remote for this origin
149163
let state = self.state.lock();
150-
if let Some(remote) = state.lookup.get(&origin.url()).cloned() {
164+
if let Some(remote) = state.lookup.get(&cache_key).cloned() {
151165
return Ok(Some(remote));
152166
}
153167

@@ -168,8 +182,8 @@ impl RemotesConsumer {
168182
let (writer, reader) = remote.produce();
169183
state.requested.push_back(writer);
170184

171-
// Insert the remote into our Map
172-
state.lookup.insert(origin.url(), reader.clone());
185+
// Insert the remote into our Map, keyed by both URL and destination address
186+
state.lookup.insert(cache_key, reader.clone());
173187

174188
Ok(Some(reader))
175189
}

0 commit comments

Comments
 (0)