Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions crates/dkg/examples/bcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ use pluto_p2p::{
config::P2PConfig,
gater, k1,
p2p::{Node, NodeType},
p2p_context::P2PContext,
relay::{MutableRelayReservation, RelayRouter},
};
use pluto_tracing::TracingConfig;
Expand Down Expand Up @@ -579,13 +580,14 @@ async fn main() -> Result<()> {
disable_reuse_port: args.disable_reuse_port,
};

let p2p_context = P2PContext::new(known_peers.clone());
let mut component = None;
let mut node: Node<ExampleBehaviour> = Node::new(
p2p_config,
key.clone(),
NodeType::QUIC,
args.filter_private_addrs,
known_peers,
p2p_context,
|builder, keypair, relay_client| {
let p2p_context = builder.p2p_context();
let local_peer_id = keypair.public().to_peer_id();
Expand All @@ -594,15 +596,12 @@ async fn main() -> Result<()> {
bcast::Behaviour::new(cluster_peers.clone(), p2p_context.clone(), key.clone());
component = Some(c);

builder
.with_p2p_context(p2p_context.clone())
.with_gater(conn_gater)
.with_inner(ExampleBehaviour {
relay: relay_client,
relay_reservation: MutableRelayReservation::new(relays.clone()),
relay_router: RelayRouter::new(relays.clone(), p2p_context, local_peer_id),
bcast: bcast_behaviour,
})
builder.with_gater(conn_gater).with_inner(ExampleBehaviour {
relay: relay_client,
relay_reservation: MutableRelayReservation::new(relays.clone()),
relay_router: RelayRouter::new(relays.clone(), p2p_context, local_peer_id),
bcast: bcast_behaviour,
})
},
)?;

Expand Down
8 changes: 2 additions & 6 deletions crates/dkg/src/bcast/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,8 @@ mod tests {
key,
NodeType::TCP,
false,
peer_ids.clone(),
move |builder, _keypair| {
builder
.with_p2p_context(p2p_context.clone())
.with_inner(behaviour)
},
p2p_context.clone(),
move |builder, _keypair| builder.with_inner(behaviour),
)?;
let addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", ports[index]).parse()?;
nodes.push(LocalNode {
Expand Down
4 changes: 2 additions & 2 deletions crates/dkg/src/nodesigs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,8 @@ mod tests {
key.clone(),
NodeType::TCP,
false,
peer_ids.clone(),
move |builder, _| builder.with_p2p_context(p2p_context).with_inner(behaviour),
p2p_context,
move |builder, _| builder.with_inner(behaviour),
)?;

let addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/{}", ports[index]).parse()?;
Expand Down
3 changes: 2 additions & 1 deletion crates/p2p/examples/bootnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use pluto_p2p::{
config::P2PConfig,
gater, k1,
p2p::{Node, NodeType},
p2p_context::P2PContext,
relay::{MutableRelayReservation, RelayRouter},
};
use pluto_tracing::TracingConfig;
Expand Down Expand Up @@ -143,7 +144,7 @@ pub async fn main() -> Result<()> {
pk,
NodeType::QUIC,
false,
known_peers.clone(),
P2PContext::new(known_peers.clone()),
|builder, keypair, relay_client| {
let p2p_context = builder.p2p_context();
let local_peer_id = keypair.public().to_peer_id();
Expand Down
5 changes: 3 additions & 2 deletions crates/p2p/examples/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use pluto_p2p::{
behaviours::pluto::PlutoBehaviourEvent,
config::P2PConfig,
p2p::{Node, NodeType},
p2p_context::P2PContext,
};
use tokio::signal;

Expand Down Expand Up @@ -73,13 +74,13 @@ async fn main() -> Result<()> {

// Create node with composed behaviour
// No known cluster peers in this example
let known_peers: Vec<libp2p::PeerId> = vec![];
let p2p_context = P2PContext::default();
let mut p2p = Node::new(
P2PConfig::default(),
key.clone(),
NodeType::QUIC,
false,
known_peers,
p2p_context,
|builder, keypair, relay_client| {
builder
.with_user_agent("pluto-p2p-example/1.0.0")
Expand Down
3 changes: 2 additions & 1 deletion crates/p2p/examples/quic_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use pluto_p2p::{
behaviours::pluto::PlutoBehaviourEvent,
config::P2PConfig,
p2p::{Node, NodeType},
p2p_context::P2PContext,
quic_upgrade::QuicUpgradeEvent,
};
use tokio::signal;
Expand Down Expand Up @@ -112,7 +113,7 @@ async fn main() -> Result<()> {
key,
NodeType::QUIC, // Enable QUIC transport
false, // Don't filter private addresses (for local testing)
known_peers,
P2PContext::new(known_peers),
|builder, _keypair, relay_client| {
builder
.with_user_agent("quic-upgrade-example/1.0.0")
Expand Down
28 changes: 7 additions & 21 deletions crates/p2p/src/behaviours/pluto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub struct PlutoBehaviour<B: NetworkBehaviour> {

impl<B: NetworkBehaviour> PlutoBehaviour<B> {
/// Returns a new builder for configuring a PlutoBehaviour.
pub fn builder() -> PlutoBehaviourBuilder<B> {
PlutoBehaviourBuilder::default()
pub fn builder(p2p_context: P2PContext) -> PlutoBehaviourBuilder<B> {
PlutoBehaviourBuilder::new(p2p_context)
}
}

Expand Down Expand Up @@ -96,25 +96,20 @@ pub struct PlutoBehaviourBuilder<B> {
inner: Option<B>,
}

impl<B> Default for PlutoBehaviourBuilder<B> {
fn default() -> Self {
impl<B: NetworkBehaviour> PlutoBehaviourBuilder<B> {
/// Creates a new builder with default configuration and the provided shared
/// P2P context.
pub fn new(p2p_context: P2PContext) -> Self {
Self {
gater: None,
identify_protocol: DEFAULT_IDENTIFY_PROTOCOL.clone(),
user_agent: DEFAULT_USER_AGENT.clone(),
autonat_config: autonat::Config::default(),
p2p_context: P2PContext::default(),
p2p_context,
quic_enabled: false,
inner: None,
}
}
}

impl<B: NetworkBehaviour> PlutoBehaviourBuilder<B> {
/// Creates a new builder with default configuration.
pub fn new() -> Self {
Self::default()
}

/// Returns the cloned P2P context.
pub fn p2p_context(&self) -> P2PContext {
Expand Down Expand Up @@ -167,14 +162,6 @@ impl<B: NetworkBehaviour> PlutoBehaviourBuilder<B> {
self
}

/// Sets the global context.
///
/// The global context is used to store the peer store.
pub fn with_p2p_context(mut self, p2p_context: P2PContext) -> Self {
self.p2p_context = p2p_context;
self
}

/// Sets whether QUIC is enabled.
///
/// When enabled, the behaviour will periodically attempt to upgrade
Expand All @@ -191,7 +178,6 @@ impl<B: NetworkBehaviour> PlutoBehaviourBuilder<B> {
/// * `key` - The keypair for this node, used for identify and autonat
pub fn build(self, key: &Keypair) -> PlutoBehaviour<B> {
let local_peer_id = key.public().to_peer_id();
self.p2p_context.set_local_peer_id(local_peer_id);

let identify_config = identify::Config::new(self.identify_protocol, key.public())
.with_agent_version(self.user_agent)
Expand Down
79 changes: 50 additions & 29 deletions crates/p2p/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
//! secret_key,
//! NodeType::QUIC,
//! false, // filter_private_addrs
//! vec![], // known_peers
//! |builder, _p2p_ctx, _keypair, relay_client| {
//! P2PContext::default(),
//! |builder, _keypair, relay_client| {
//! builder
//! .with_user_agent("my-app/1.0.0")
//! .with_inner(relay_client)
Expand All @@ -42,8 +42,8 @@
//! secret_key,
//! NodeType::QUIC,
//! false, // filter_private_addrs
//! vec![], // known_peers
//! |builder, _p2p_ctx, keypair, relay_client| {
//! P2PContext::default(),
//! |builder, keypair, relay_client| {
//! builder
//! .with_user_agent("my-app/1.0.0")
//! .with_inner(MyBehaviour {
Expand All @@ -67,8 +67,8 @@
//! secret_key,
//! NodeType::TCP,
//! false, // filter_private_addrs
//! vec![], // known_peers
//! |builder, _p2p_ctx, keypair| {
//! P2PContext::default(),
//! |builder, keypair| {
//! builder.with_inner(
//! relay::Behaviour::new(keypair.public().to_peer_id(), relay_config)
//! )
Expand Down Expand Up @@ -148,6 +148,15 @@ pub enum P2PError {
/// Failed to parse IP address.
#[error("Failed to parse IP address: {0}")]
FailedToParseIpAddress(#[from] std::net::AddrParseError),

/// The provided P2P context is already bound to a different local peer ID.
#[error("P2P context local peer ID mismatch: expected {expected}, got {actual}")]
LocalPeerIdMismatch {
/// Local peer ID derived from the node keypair.
expected: Box<PeerId>,
/// Local peer ID already bound in the shared P2P context.
actual: Box<PeerId>,
},
}

impl P2PError {
Expand Down Expand Up @@ -183,18 +192,18 @@ pub struct Node<B: NetworkBehaviour> {
impl<B: NetworkBehaviour> Node<B> {
/// Creates a new client node with relay client support.
///
/// The `behaviour_fn` receives a default `PlutoBehaviourBuilder`, the P2P
/// context, keypair, and relay client. It should configure the builder
/// (e.g., set user agent, inner behaviour) and return it. The builder
/// will then be finalized internally.
/// The `behaviour_fn` receives a `PlutoBehaviourBuilder`, keypair, and
/// relay client. It should configure the builder (e.g., set user agent,
/// inner behaviour) and return it. The builder will then be finalized
/// internally.
///
/// # Arguments
///
/// * `cfg` - P2P configuration for addresses and networking
/// * `key` - Secret key for node identity
/// * `node_type` - Transport type (TCP or QUIC)
/// * `filter_private_addrs` - Whether to filter private addresses
/// * `known_peers` - List of known cluster peer IDs for metrics tracking
/// * `p2p_context` - Shared P2P runtime context for this node
/// * `behaviour_fn` - Closure that configures and returns the behaviour
/// builder
///
Expand All @@ -206,8 +215,8 @@ impl<B: NetworkBehaviour> Node<B> {
/// secret_key,
/// NodeType::QUIC,
/// false,
/// vec![peer1, peer2], // known cluster peers
/// |builder, _p2p_ctx, _keypair, relay_client| {
/// P2PContext::new(vec![peer1, peer2]),
/// |builder, _keypair, relay_client| {
/// builder
/// .with_user_agent("my-app/1.0.0")
/// .with_inner(MyBehaviour { relay_client, peerinfo: ... })
Expand All @@ -219,7 +228,7 @@ impl<B: NetworkBehaviour> Node<B> {
key: k256::SecretKey,
node_type: NodeType,
filter_private_addrs: bool,
known_peers: impl IntoIterator<Item = PeerId>,
p2p_context: P2PContext,
behaviour_fn: F,
) -> Result<Self>
where
Expand All @@ -230,7 +239,7 @@ impl<B: NetworkBehaviour> Node<B> {
) -> PlutoBehaviourBuilder<B>,
{
let keypair = utils::keypair_from_secret_key(key)?;
let p2p_context = P2PContext::new(known_peers);
Self::bind_local_peer_id(&p2p_context, keypair.public().to_peer_id())?;

let mut node = match node_type {
NodeType::TCP => Self::build_tcp_client(keypair, p2p_context, behaviour_fn),
Expand All @@ -247,22 +256,22 @@ impl<B: NetworkBehaviour> Node<B> {
/// Server nodes (like relay servers) don't include relay client support
/// since they are expected to be publicly reachable.
///
/// The `behaviour_fn` receives a default `PlutoBehaviourBuilder`, the P2P
/// context, and keypair. It should configure the builder (e.g., set user
/// agent, inner behaviour) and return it.
/// The `behaviour_fn` receives a `PlutoBehaviourBuilder` and keypair. It
/// should configure the builder (e.g., set user agent, inner behaviour)
/// and return it.
pub fn new_server<F>(
cfg: P2PConfig,
key: k256::SecretKey,
node_type: NodeType,
filter_private_addrs: bool,
known_peers: impl IntoIterator<Item = PeerId>,
p2p_context: P2PContext,
behaviour_fn: F,
) -> Result<Self>
where
F: FnOnce(PlutoBehaviourBuilder<B>, &Keypair) -> PlutoBehaviourBuilder<B>,
{
let keypair = utils::keypair_from_secret_key(key)?;
let p2p_context = P2PContext::new(known_peers);
Self::bind_local_peer_id(&p2p_context, keypair.public().to_peer_id())?;

let mut node = match node_type {
NodeType::TCP => Self::build_tcp_server(keypair, p2p_context, behaviour_fn),
Expand Down Expand Up @@ -317,6 +326,22 @@ impl<B: NetworkBehaviour> Node<B> {
Ok(())
}

fn bind_local_peer_id(p2p_context: &P2PContext, local_peer_id: PeerId) -> Result<()> {
match p2p_context.local_peer_id() {
Some(existing_peer_id) if existing_peer_id != local_peer_id => {
Err(P2PError::LocalPeerIdMismatch {
expected: Box::new(local_peer_id),
actual: Box::new(existing_peer_id),
})
}
Some(_) => Ok(()),
None => {
p2p_context.set_local_peer_id(local_peer_id);
Ok(())
}
}
}

fn build_quic_client<F>(
keypair: Keypair,
p2p_context: P2PContext,
Expand All @@ -339,9 +364,8 @@ impl<B: NetworkBehaviour> Node<B> {
.with_relay_client(noise::Config::new, yamux_config)
.map_err(P2PError::failed_to_build_swarm)?
.with_behaviour(|key, relay_client| {
let builder = PlutoBehaviourBuilder::default()
.with_p2p_context(p2p_context.clone())
.with_quic_enabled(true);
let builder =
PlutoBehaviourBuilder::new(p2p_context.clone()).with_quic_enabled(true);
behaviour_fn(builder, key, relay_client).build(key)
})
.map_err(P2PError::failed_to_build_swarm)?
Expand Down Expand Up @@ -376,8 +400,7 @@ impl<B: NetworkBehaviour> Node<B> {
.with_relay_client(noise::Config::new, yamux_config)
.map_err(P2PError::failed_to_build_swarm)?
.with_behaviour(|key, relay_client| {
let builder =
PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone());
let builder = PlutoBehaviourBuilder::new(p2p_context.clone());
behaviour_fn(builder, key, relay_client).build(key)
})
.map_err(P2PError::failed_to_build_swarm)?
Expand Down Expand Up @@ -407,8 +430,7 @@ impl<B: NetworkBehaviour> Node<B> {
.with_dns()
.map_err(P2PError::failed_to_build_swarm)?
.with_behaviour(|key| {
let builder =
PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone());
let builder = PlutoBehaviourBuilder::new(p2p_context.clone());
behaviour_fn(builder, key).build(key)
})
.map_err(P2PError::failed_to_build_swarm)?
Expand Down Expand Up @@ -438,8 +460,7 @@ impl<B: NetworkBehaviour> Node<B> {
.with_dns()
.map_err(P2PError::failed_to_build_swarm)?
.with_behaviour(|key| {
let builder =
PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone());
let builder = PlutoBehaviourBuilder::new(p2p_context.clone());
behaviour_fn(builder, key).build(key)
})
.map_err(P2PError::failed_to_build_swarm)?
Expand Down
Loading
Loading