Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
functionality using `Behaviour::set_status` to explicitly set `Status::{Enable,Disable}` to enable or disable
protocol advertisement.
See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154).
- Expire external address when a relay listener is closed without a replacement reservation.
See [PR 6285](https://github.com/libp2p/rust-libp2p/pull/6285).

## 0.21.1
- reduce allocations by replacing `get_or_insert` with `get_or_insert_with`
Expand Down
49 changes: 47 additions & 2 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ use futures::{
ready,
stream::StreamExt,
};
use libp2p_core::{Endpoint, Multiaddr, multiaddr::Protocol, transport::PortUse};
use libp2p_core::{
Endpoint, Multiaddr,
multiaddr::Protocol,
transport::{ListenerId, PortUse},
};
use libp2p_identity::PeerId;
use libp2p_swarm::{
ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm, ListenerClosed},
dial_opts::DialOpts,
dummy,
};
Expand Down Expand Up @@ -100,6 +104,12 @@ pub struct Behaviour {
/// `/p2p-circuit` address we reserved on it.
reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,

/// Stores the [`ListenerId`] to the [`ConnectionId`] of the relay connection
/// that the listener's reservation is on. This allows us to expire external addresses
/// when a listener closes, but only if no other listener still holds a
/// reservation for the same address.
listener_id_to_connection_id: HashMap<ListenerId, ConnectionId>,

/// Queue of actions to return when polled.
queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Infallible>>>,

Expand All @@ -114,6 +124,7 @@ pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
from_transport,
directly_connected_peers: Default::default(),
reservation_addresses: Default::default(),
listener_id_to_connection_id: Default::default(),
queued_actions: Default::default(),
pending_handler_commands: Default::default(),
};
Expand Down Expand Up @@ -148,6 +159,8 @@ impl Behaviour {
unreachable!("`on_connection_closed` for unconnected peer.")
}
};
self.listener_id_to_connection_id
.retain(|_, cid| *cid != connection_id);
if let Some((addr, ReservationStatus::Confirmed)) =
self.reservation_addresses.remove(&connection_id)
{
Expand All @@ -156,6 +169,32 @@ impl Behaviour {
}
}
}

fn on_listener_closed(&mut self, ListenerClosed { listener_id, .. }: ListenerClosed) {
let Some(connection_id) = self.listener_id_to_connection_id.remove(&listener_id) else {
return;
};

// Only expire the external address if no other listener still holds
// a reservation on the same connection. During reservation replacement the new listener is
// registered before the old one closes, so there will still be another entry, so we should
// not emit the event to expire the address address.
let listenr_and_connection = self
.listener_id_to_connection_id
.values()
.any(|cid| *cid == connection_id);

if listenr_and_connection {
return;
}

if let Some((addr, ReservationStatus::Confirmed)) =
self.reservation_addresses.remove(&connection_id)
{
self.queued_actions
.push_back(ToSwarm::ExternalAddrExpired(addr));
}
}
}

impl NetworkBehaviour for Behaviour {
Expand Down Expand Up @@ -222,6 +261,7 @@ impl NetworkBehaviour for Behaviour {
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::ListenerClosed(listener_closed) => self.on_listener_closed(listener_closed),
FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
self.reservation_addresses.remove(&connection_id);
self.pending_handler_commands.remove(&connection_id);
Expand Down Expand Up @@ -285,6 +325,7 @@ impl NetworkBehaviour for Behaviour {

let action = match ready!(self.from_transport.poll_next_unpin(cx)) {
Some(transport::TransportToBehaviourMsg::ListenReq {
listener_id,
relay_peer_id,
relay_addr,
to_listener,
Expand All @@ -295,6 +336,8 @@ impl NetworkBehaviour for Behaviour {
.and_then(|cs| cs.first())
{
Some(connection_id) => {
self.listener_id_to_connection_id
.insert(listener_id, *connection_id);
self.reservation_addresses.insert(
*connection_id,
(
Expand All @@ -319,6 +362,8 @@ impl NetworkBehaviour for Behaviour {
.build();
let relayed_connection_id = opts.connection_id();

self.listener_id_to_connection_id
.insert(listener_id, relayed_connection_id);
self.reservation_addresses.insert(
relayed_connection_id,
(
Expand Down
2 changes: 2 additions & 0 deletions protocols/relay/src/priv_client/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl libp2p_core::Transport for Transport {
let (to_listener, from_behaviour) = mpsc::channel(0);
self.pending_to_behaviour
.push_back(TransportToBehaviourMsg::ListenReq {
listener_id,
relay_peer_id,
relay_addr,
to_listener,
Expand Down Expand Up @@ -459,6 +460,7 @@ pub(crate) enum TransportToBehaviourMsg {
},
/// Listen for incoming relayed connections via relay node.
ListenReq {
listener_id: ListenerId,
relay_peer_id: PeerId,
relay_addr: Multiaddr,
to_listener: mpsc::Sender<ToListenerMsg>,
Expand Down
74 changes: 74 additions & 0 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,80 @@ async fn new_reservation_to_same_relay_replaces_old() {
}
}

#[tokio::test]
async fn closing_relay_listener_expires_external_address() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
let mut relay = build_relay();
let relay_peer_id = *relay.local_peer_id();

relay.listen_on(relay_addr.clone()).unwrap();
relay.add_external_address(relay_addr.clone());
tokio::spawn(async move {
relay.collect::<Vec<_>>().await;
});

let mut client = build_client();
let client_peer_id = *client.local_peer_id();
let client_addr = relay_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit);
let client_addr_with_peer_id = client_addr.clone().with(Protocol::P2p(client_peer_id));

let listener_id = client.listen_on(client_addr.clone()).unwrap();

// Wait for connection to relay.
assert!(wait_for_dial(&mut client, relay_peer_id).await);

// Wait for reservation to be accepted.
wait_for_reservation(
&mut client,
client_addr_with_peer_id.clone(),
relay_peer_id,
false, // No renewal.
)
.await;

// Now remove the relay listener.
assert!(client.remove_listener(listener_id));

// Expect the listener to close and the external address to expire.
let mut listener_closed = false;
let mut external_addr_expired = false;
loop {
match client.select_next_some().await {
SwarmEvent::ListenerClosed {
listener_id: closed_id,
addresses,
..
} => {
assert_eq!(closed_id, listener_id);
assert_eq!(addresses, vec![client_addr_with_peer_id.clone()]);
listener_closed = true;
if external_addr_expired {
break;
}
}
SwarmEvent::ExternalAddrExpired { address } => {
assert_eq!(address, client_addr_with_peer_id);
external_addr_expired = true;
if listener_closed {
break;
}
}
SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {}
SwarmEvent::ExpiredListenAddr { .. } => {}
e => panic!("{e:?}"),
}
}

assert!(listener_closed);
assert!(external_addr_expired);
}

#[tokio::test]
async fn connect() {
let _ = tracing_subscriber::fmt()
Expand Down
Loading