@@ -10,6 +10,17 @@ use std::ops;
1010use std:: sync:: Arc ;
1111use std:: sync:: Weak ;
1212
13+ /// Cache key for upstream relay-to-relay connections.
14+ ///
15+ /// Keyed by both URL and destination address so that connections are
16+ /// reused only when both match. This matters when a [`Coordinator`]
17+ /// returns the same URL for different namespaces (e.g. a shared relay
18+ /// hostname) but distinguishes destinations via [`NamespaceOrigin::addr`].
19+ /// Without the address in the key, all namespaces that share a URL
20+ /// would be routed through a single cached connection to whichever
21+ /// upstream host was contacted first.
22+ type RemoteCacheKey = ( Url , Option < SocketAddr > ) ;
23+
1324use futures:: stream:: FuturesUnordered ;
1425use futures:: FutureExt ;
1526use futures:: StreamExt ;
@@ -44,7 +55,7 @@ impl Remotes {
4455
4556#[ derive( Default ) ]
4657struct RemotesState {
47- lookup : HashMap < Url , RemoteConsumer > ,
58+ lookup : HashMap < RemoteCacheKey , RemoteConsumer > ,
4859 requested : VecDeque < RemoteProducer > ,
4960}
5061
@@ -83,6 +94,7 @@ impl RemotesProducer {
8394 tokio:: select! {
8495 Some ( mut remote) = self . next( ) => {
8596 let url = remote. url. clone( ) ;
97+ let cache_key = ( url. clone( ) , remote. addr) ;
8698
8799 // Spawn a task to serve the remote
88100 tasks. push( async move {
@@ -96,16 +108,16 @@ impl RemotesProducer {
96108 tracing:: warn!( remote_url = %remote_url, error = %err, "failed serving remote: {:?}, error: {}" , info, err) ;
97109 }
98110
99- url
111+ cache_key
100112 } ) ;
101113 }
102114
103115 // Handle finished remote producers
104116 res = tasks. next( ) , if !tasks. is_empty( ) => {
105- let url = res. unwrap( ) ;
117+ let cache_key = res. unwrap( ) ;
106118
107119 if let Some ( mut state) = self . state. lock_mut( ) {
108- state. lookup. remove( & url ) ;
120+ state. lookup. remove( & cache_key ) ;
109121 }
110122 } ,
111123 else => return Ok ( ( ) ) ,
@@ -145,9 +157,11 @@ impl RemotesConsumer {
145157 // Always fetch the origin instead of using the (potentially invalid) cache.
146158 let ( origin, client) = self . coordinator . lookup ( scope, namespace) . await ?;
147159
160+ let cache_key = ( origin. url ( ) , origin. addr ( ) ) ;
161+
148162 // Check if we already have a remote for this origin
149163 let state = self . state . lock ( ) ;
150- if let Some ( remote) = state. lookup . get ( & origin . url ( ) ) . cloned ( ) {
164+ if let Some ( remote) = state. lookup . get ( & cache_key ) . cloned ( ) {
151165 return Ok ( Some ( remote) ) ;
152166 }
153167
@@ -168,8 +182,8 @@ impl RemotesConsumer {
168182 let ( writer, reader) = remote. produce ( ) ;
169183 state. requested . push_back ( writer) ;
170184
171- // Insert the remote into our Map
172- state. lookup . insert ( origin . url ( ) , reader. clone ( ) ) ;
185+ // Insert the remote into our Map, keyed by both URL and destination address
186+ state. lookup . insert ( cache_key , reader. clone ( ) ) ;
173187
174188 Ok ( Some ( reader) )
175189 }
0 commit comments