diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 8c4bb4ae4bf..191e4132f20 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,6 @@ ## 0.50.0 - +- Send all topic subscriptions in a single hello RPC when connecting to a new peer, aligning with the GossipSub spec and other implementations (Go, Nim, JS). + See [PR 6385](https://github.com/libp2p/rust-libp2p/pull/6385). - Raise MSRV to 1.88.0. See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273). diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 34a92a51752..ec7a8090b2e 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3211,28 +3211,34 @@ where } tracing::debug!(peer=%peer_id, "New peer connected"); - // We need to send our subscriptions to the newly-connected node. - for topic_hash in self.mesh.clone().into_keys() { - #[cfg(not(feature = "partial_messages"))] - let (requests_partial, supports_partial) = (false, false); - #[cfg(feature = "partial_messages")] - let Some(SubscriptionOpts { - requests_partial, - supports_partial, - }) = self.partial_messages_extension.opts(&topic_hash) - else { - tracing::error!("Partial subscription options should exist for subscribed topic"); - return; - }; + // Send all our topic subscriptions to the newly-connected peer in a single hello RPC. + let topics: Vec<_> = self + .mesh + .keys() + .cloned() + .filter_map(|topic_hash| { + #[cfg(not(feature = "partial_messages"))] + let (requests_partial, supports_partial) = (false, false); + #[cfg(feature = "partial_messages")] + let (requests_partial, supports_partial) = { + let Some(SubscriptionOpts { + requests_partial, + supports_partial, + }) = self.partial_messages_extension.opts(&topic_hash) + else { + tracing::error!( + "Partial subscription options should exist for subscribed topic" + ); + return None; + }; + (requests_partial, supports_partial) + }; + Some((topic_hash, requests_partial, supports_partial)) + }) + .collect(); - self.send_message( - peer_id, - RpcOut::Subscribe { - topic: topic_hash.clone(), - requests_partial, - supports_partial, - }, - ); + if !topics.is_empty() { + self.send_message(peer_id, RpcOut::SubscribeMany(topics)); } } diff --git a/protocols/gossipsub/src/behaviour/tests/subscription.rs b/protocols/gossipsub/src/behaviour/tests/subscription.rs index 9722083e90f..605f2691555 100644 --- a/protocols/gossipsub/src/behaviour/tests/subscription.rs +++ b/protocols/gossipsub/src/behaviour/tests/subscription.rs @@ -59,8 +59,10 @@ fn test_subscribe() { .into_values() .fold(0, |mut collected_subscriptions, mut queue| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() { - collected_subscriptions += 1 + match queue.try_pop() { + Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1, + Some(RpcOut::SubscribeMany(topics)) => collected_subscriptions += topics.len(), + _ => {} } } collected_subscriptions @@ -119,14 +121,16 @@ fn test_unsubscribe() { .into_values() .fold(0, |mut collected_subscriptions, mut queue| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() { - collected_subscriptions += 1 + match queue.try_pop() { + Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1, + Some(RpcOut::SubscribeMany(topics)) => collected_subscriptions += topics.len(), + _ => {} } } collected_subscriptions }); - // we sent a unsubscribe to all known peers, for two topics + // we sent subscriptions to all known peers for two topics (20 peers × 2 topics) assert_eq!(subscriptions, 40); // check we clean up internal structures @@ -303,15 +307,16 @@ fn test_peer_added_on_connection() { .to_subscribe(true) .create_network(); - // check that our subscriptions are sent to each of the peers - // collect all the SendEvents + // check that our subscriptions are sent to each of the peers as a single hello RPC let subscriptions = queues.into_iter().fold( HashMap::>::new(), |mut collected_subscriptions, (peer, mut queue)| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe { topic, .. }) = queue.try_pop() { - let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default(); - peer_subs.push(topic.into_string()); + if let Some(RpcOut::SubscribeMany(topics)) = queue.try_pop() { + let peer_subs: Vec = topics + .into_iter() + .map(|(t, _, _)| t.into_string()) + .collect(); collected_subscriptions.insert(peer, peer_subs); } } @@ -319,7 +324,7 @@ fn test_peer_added_on_connection() { }, ); - // check that there are two subscriptions sent to each peer + // check that there are two subscriptions sent to each peer in a single RPC for peer_subs in subscriptions.values() { assert!(peer_subs.contains(&String::from("topic1"))); assert!(peer_subs.contains(&String::from("topic2"))); @@ -339,6 +344,53 @@ fn test_peer_added_on_connection() { } } +/// Test that on new connection the hello RPC is a single batched message, not one per topic. +#[test] +fn test_hello_rpc_is_single_batched_message() { + let topic_names = vec![ + String::from("alpha"), + String::from("beta"), + String::from("gamma"), + ]; + let (_, _, queues, topic_hashes) = DefaultBehaviourTestBuilder::default() + .peer_no(5) + .topics(topic_names) + .to_subscribe(true) + .create_network(); + + for (_, mut queue) in queues { + let mut subscribe_many_count = 0; + let mut individual_subscribe_count = 0; + + while !queue.is_empty() { + match queue.try_pop() { + Some(RpcOut::SubscribeMany(topics)) => { + subscribe_many_count += 1; + // All topics must be present in the single hello packet. + let sent: Vec<_> = topics.into_iter().map(|(t, _, _)| t).collect(); + for topic_hash in &topic_hashes { + assert!( + sent.contains(topic_hash), + "hello RPC must include all subscribed topics" + ); + } + } + Some(RpcOut::Subscribe { .. }) => individual_subscribe_count += 1, + _ => {} + } + } + + assert_eq!( + subscribe_many_count, 1, + "exactly one batched hello RPC should be sent per peer" + ); + assert_eq!( + individual_subscribe_count, 0, + "no individual Subscribe RPCs should be sent on connection" + ); + } +} + /// Test subscription handling #[test] fn test_handle_received_subscriptions() { diff --git a/protocols/gossipsub/src/queue.rs b/protocols/gossipsub/src/queue.rs index a127a98491f..465c10a5a05 100644 --- a/protocols/gossipsub/src/queue.rs +++ b/protocols/gossipsub/src/queue.rs @@ -62,7 +62,10 @@ impl Queue { /// which will only happen for control and non priority messages. pub(crate) fn try_push(&mut self, message: RpcOut) -> Result<(), Box> { match message { - RpcOut::Extensions(_) | RpcOut::Subscribe { .. } | RpcOut::Unsubscribe(_) => { + RpcOut::Extensions(_) + | RpcOut::Subscribe { .. } + | RpcOut::SubscribeMany(_) + | RpcOut::Unsubscribe(_) => { self.priority .try_push(message) .expect("Shared is unbounded"); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index aa0ae92d8e0..4c470f3e3e3 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -358,6 +358,8 @@ pub enum RpcOut { requests_partial: bool, supports_partial: bool, }, + /// Subscribe to multiple topics in a single RPC (hello packet on new connection). + SubscribeMany(Vec<(TopicHash, bool, bool)>), /// Unsubscribe a topic. Unsubscribe(TopicHash), /// Send a GRAFT control message. @@ -392,6 +394,7 @@ impl RpcOut { matches!( self, RpcOut::Subscribe { .. } + | RpcOut::SubscribeMany(_) | RpcOut::Unsubscribe(_) | RpcOut::Graft(_) | RpcOut::Prune(_) @@ -431,6 +434,22 @@ impl From for proto::RPC { control: None, partial: None, }, + RpcOut::SubscribeMany(topics) => proto::RPC { + publish: Vec::new(), + subscriptions: topics + .into_iter() + .map( + |(topic, requests_partial, supports_partial)| proto::SubOpts { + subscribe: Some(true), + topic_id: Some(topic.into_string()), + requestsPartial: Some(requests_partial), + supportsPartial: Some(supports_partial), + }, + ) + .collect(), + control: None, + partial: None, + }, RpcOut::Unsubscribe(topic) => proto::RPC { publish: Vec::new(), subscriptions: vec![proto::SubOpts {