From 40e6f3bf6ccdcf36174660218b0d3eb70a221673 Mon Sep 17 00:00:00 2001 From: Shashwat-Nautiyal Date: Sat, 18 Apr 2026 08:45:55 +0530 Subject: [PATCH 1/4] fix/gossipsub-batch-subscription --- protocols/gossipsub/CHANGELOG.md | 3 +- protocols/gossipsub/src/behaviour.rs | 48 ++++++----- .../src/behaviour/tests/subscription.rs | 80 ++++++++++++++++--- protocols/gossipsub/src/queue.rs | 5 +- protocols/gossipsub/src/types.rs | 17 ++++ 5 files changed, 117 insertions(+), 36 deletions(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index e3f24272bcd..4d22d59a00b 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,6 @@ ## 0.50.0 - +- Send all topic subscriptions in a single hello RPC when connecting to a new peer, aligning with the GossipSub spec and other implementations (Go, Nim, JS). + See [PR 6385](https://github.com/libp2p/rust-libp2p/pull/6385). - Raise MSRV to 1.88.0. See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273). diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 4451cfb9bbf..210dc0a4c51 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -3203,28 +3203,34 @@ where } tracing::debug!(peer=%peer_id, "New peer connected"); - // We need to send our subscriptions to the newly-connected node. - for topic_hash in self.mesh.clone().into_keys() { - #[cfg(not(feature = "partial_messages"))] - let (requests_partial, supports_partial) = (false, false); - #[cfg(feature = "partial_messages")] - let Some(SubscriptionOpts { - requests_partial, - supports_partial, - }) = self.partial_messages_extension.opts(&topic_hash) - else { - tracing::error!("Partial subscription options should exist for subscribed topic"); - return; - }; + // Send all our topic subscriptions to the newly-connected peer in a single hello RPC. + let topics: Vec<_> = self + .mesh + .keys() + .cloned() + .filter_map(|topic_hash| { + #[cfg(not(feature = "partial_messages"))] + let (requests_partial, supports_partial) = (false, false); + #[cfg(feature = "partial_messages")] + let (requests_partial, supports_partial) = { + let Some(SubscriptionOpts { + requests_partial, + supports_partial, + }) = self.partial_messages_extension.opts(&topic_hash) + else { + tracing::error!( + "Partial subscription options should exist for subscribed topic" + ); + return None; + }; + (requests_partial, supports_partial) + }; + Some((topic_hash, requests_partial, supports_partial)) + }) + .collect(); - self.send_message( - peer_id, - RpcOut::Subscribe { - topic: topic_hash.clone(), - requests_partial, - supports_partial, - }, - ); + if !topics.is_empty() { + self.send_message(peer_id, RpcOut::SubscribeMany(topics)); } } diff --git a/protocols/gossipsub/src/behaviour/tests/subscription.rs b/protocols/gossipsub/src/behaviour/tests/subscription.rs index 9722083e90f..a5f01543e54 100644 --- a/protocols/gossipsub/src/behaviour/tests/subscription.rs +++ b/protocols/gossipsub/src/behaviour/tests/subscription.rs @@ -54,13 +54,17 @@ fn test_subscribe() { "Subscribe should add a new entry to the mesh[topic] hashmap" ); - // collect all the subscriptions + // collect all the subscriptions (hello RPC on connection is now a single SubscribeMany) let subscriptions = queues .into_values() .fold(0, |mut collected_subscriptions, mut queue| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() { - collected_subscriptions += 1 + match queue.try_pop() { + Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1, + Some(RpcOut::SubscribeMany(topics)) => { + collected_subscriptions += topics.len() + } + _ => {} } } collected_subscriptions @@ -114,19 +118,23 @@ fn test_unsubscribe() { "should be able to unsubscribe successfully from each topic", ); - // collect all the subscriptions + // collect all the subscriptions (hello RPC on connection is now a single SubscribeMany) let subscriptions = queues .into_values() .fold(0, |mut collected_subscriptions, mut queue| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() { - collected_subscriptions += 1 + match queue.try_pop() { + Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1, + Some(RpcOut::SubscribeMany(topics)) => { + collected_subscriptions += topics.len() + } + _ => {} } } collected_subscriptions }); - // we sent a unsubscribe to all known peers, for two topics + // we sent subscriptions to all known peers for two topics (20 peers × 2 topics) assert_eq!(subscriptions, 40); // check we clean up internal structures @@ -303,15 +311,14 @@ fn test_peer_added_on_connection() { .to_subscribe(true) .create_network(); - // check that our subscriptions are sent to each of the peers - // collect all the SendEvents + // check that our subscriptions are sent to each of the peers as a single hello RPC let subscriptions = queues.into_iter().fold( HashMap::>::new(), |mut collected_subscriptions, (peer, mut queue)| { while !queue.is_empty() { - if let Some(RpcOut::Subscribe { topic, .. }) = queue.try_pop() { - let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default(); - peer_subs.push(topic.into_string()); + if let Some(RpcOut::SubscribeMany(topics)) = queue.try_pop() { + let peer_subs: Vec = + topics.into_iter().map(|(t, _, _)| t.into_string()).collect(); collected_subscriptions.insert(peer, peer_subs); } } @@ -319,7 +326,7 @@ fn test_peer_added_on_connection() { }, ); - // check that there are two subscriptions sent to each peer + // check that there are two subscriptions sent to each peer in a single RPC for peer_subs in subscriptions.values() { assert!(peer_subs.contains(&String::from("topic1"))); assert!(peer_subs.contains(&String::from("topic2"))); @@ -339,6 +346,53 @@ fn test_peer_added_on_connection() { } } +/// Test that on new connection the hello RPC is a single batched message, not one per topic. +#[test] +fn test_hello_rpc_is_single_batched_message() { + let topic_names = vec![ + String::from("alpha"), + String::from("beta"), + String::from("gamma"), + ]; + let (_, _, queues, topic_hashes) = DefaultBehaviourTestBuilder::default() + .peer_no(5) + .topics(topic_names) + .to_subscribe(true) + .create_network(); + + for (_, mut queue) in queues { + let mut subscribe_many_count = 0; + let mut individual_subscribe_count = 0; + + while !queue.is_empty() { + match queue.try_pop() { + Some(RpcOut::SubscribeMany(topics)) => { + subscribe_many_count += 1; + // All topics must be present in the single hello packet. + let sent: Vec<_> = topics.into_iter().map(|(t, _, _)| t).collect(); + for topic_hash in &topic_hashes { + assert!( + sent.contains(topic_hash), + "hello RPC must include all subscribed topics" + ); + } + } + Some(RpcOut::Subscribe { .. }) => individual_subscribe_count += 1, + _ => {} + } + } + + assert_eq!( + subscribe_many_count, 1, + "exactly one batched hello RPC should be sent per peer" + ); + assert_eq!( + individual_subscribe_count, 0, + "no individual Subscribe RPCs should be sent on connection" + ); + } +} + /// Test subscription handling #[test] fn test_handle_received_subscriptions() { diff --git a/protocols/gossipsub/src/queue.rs b/protocols/gossipsub/src/queue.rs index a127a98491f..465c10a5a05 100644 --- a/protocols/gossipsub/src/queue.rs +++ b/protocols/gossipsub/src/queue.rs @@ -62,7 +62,10 @@ impl Queue { /// which will only happen for control and non priority messages. pub(crate) fn try_push(&mut self, message: RpcOut) -> Result<(), Box> { match message { - RpcOut::Extensions(_) | RpcOut::Subscribe { .. } | RpcOut::Unsubscribe(_) => { + RpcOut::Extensions(_) + | RpcOut::Subscribe { .. } + | RpcOut::SubscribeMany(_) + | RpcOut::Unsubscribe(_) => { self.priority .try_push(message) .expect("Shared is unbounded"); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index aa0ae92d8e0..f390967c237 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -358,6 +358,8 @@ pub enum RpcOut { requests_partial: bool, supports_partial: bool, }, + /// Subscribe to multiple topics in a single RPC (hello packet on new connection). + SubscribeMany(Vec<(TopicHash, bool, bool)>), /// Unsubscribe a topic. Unsubscribe(TopicHash), /// Send a GRAFT control message. @@ -392,6 +394,7 @@ impl RpcOut { matches!( self, RpcOut::Subscribe { .. } + | RpcOut::SubscribeMany(_) | RpcOut::Unsubscribe(_) | RpcOut::Graft(_) | RpcOut::Prune(_) @@ -431,6 +434,20 @@ impl From for proto::RPC { control: None, partial: None, }, + RpcOut::SubscribeMany(topics) => proto::RPC { + publish: Vec::new(), + subscriptions: topics + .into_iter() + .map(|(topic, requests_partial, supports_partial)| proto::SubOpts { + subscribe: Some(true), + topic_id: Some(topic.into_string()), + requestsPartial: Some(requests_partial), + supportsPartial: Some(supports_partial), + }) + .collect(), + control: None, + partial: None, + }, RpcOut::Unsubscribe(topic) => proto::RPC { publish: Vec::new(), subscriptions: vec![proto::SubOpts { From 9fbc5d6d67bbeeea778ccd4c524dff93dcfa30b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 21 Apr 2026 14:04:41 +0100 Subject: [PATCH 2/4] Update protocols/gossipsub/src/behaviour/tests/subscription.rs --- protocols/gossipsub/src/behaviour/tests/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/behaviour/tests/subscription.rs b/protocols/gossipsub/src/behaviour/tests/subscription.rs index a5f01543e54..aa474948f8b 100644 --- a/protocols/gossipsub/src/behaviour/tests/subscription.rs +++ b/protocols/gossipsub/src/behaviour/tests/subscription.rs @@ -118,7 +118,7 @@ fn test_unsubscribe() { "should be able to unsubscribe successfully from each topic", ); - // collect all the subscriptions (hello RPC on connection is now a single SubscribeMany) + // collect all the subscriptions let subscriptions = queues .into_values() .fold(0, |mut collected_subscriptions, mut queue| { From 859639b2ecd8dd1868251ecc2a3ea9013e6f0db8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 21 Apr 2026 14:04:48 +0100 Subject: [PATCH 3/4] Update protocols/gossipsub/src/behaviour/tests/subscription.rs --- protocols/gossipsub/src/behaviour/tests/subscription.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/behaviour/tests/subscription.rs b/protocols/gossipsub/src/behaviour/tests/subscription.rs index aa474948f8b..dfa412e4b32 100644 --- a/protocols/gossipsub/src/behaviour/tests/subscription.rs +++ b/protocols/gossipsub/src/behaviour/tests/subscription.rs @@ -54,7 +54,7 @@ fn test_subscribe() { "Subscribe should add a new entry to the mesh[topic] hashmap" ); - // collect all the subscriptions (hello RPC on connection is now a single SubscribeMany) + // collect all the subscriptions let subscriptions = queues .into_values() .fold(0, |mut collected_subscriptions, mut queue| { From cdc88d35df65aa8bc3c373e54cf9df3dad9262b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Tue, 21 Apr 2026 14:16:35 +0100 Subject: [PATCH 4/4] cargo fmt --- .../gossipsub/src/behaviour/tests/subscription.rs | 14 ++++++-------- protocols/gossipsub/src/types.rs | 14 ++++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/protocols/gossipsub/src/behaviour/tests/subscription.rs b/protocols/gossipsub/src/behaviour/tests/subscription.rs index dfa412e4b32..605f2691555 100644 --- a/protocols/gossipsub/src/behaviour/tests/subscription.rs +++ b/protocols/gossipsub/src/behaviour/tests/subscription.rs @@ -61,9 +61,7 @@ fn test_subscribe() { while !queue.is_empty() { match queue.try_pop() { Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1, - Some(RpcOut::SubscribeMany(topics)) => { - collected_subscriptions += topics.len() - } + Some(RpcOut::SubscribeMany(topics)) => collected_subscriptions += topics.len(), _ => {} } } @@ -125,9 +123,7 @@ fn test_unsubscribe() { while !queue.is_empty() { match queue.try_pop() { Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1, - Some(RpcOut::SubscribeMany(topics)) => { - collected_subscriptions += topics.len() - } + Some(RpcOut::SubscribeMany(topics)) => collected_subscriptions += topics.len(), _ => {} } } @@ -317,8 +313,10 @@ fn test_peer_added_on_connection() { |mut collected_subscriptions, (peer, mut queue)| { while !queue.is_empty() { if let Some(RpcOut::SubscribeMany(topics)) = queue.try_pop() { - let peer_subs: Vec = - topics.into_iter().map(|(t, _, _)| t.into_string()).collect(); + let peer_subs: Vec = topics + .into_iter() + .map(|(t, _, _)| t.into_string()) + .collect(); collected_subscriptions.insert(peer, peer_subs); } } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index f390967c237..4c470f3e3e3 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -438,12 +438,14 @@ impl From for proto::RPC { publish: Vec::new(), subscriptions: topics .into_iter() - .map(|(topic, requests_partial, supports_partial)| proto::SubOpts { - subscribe: Some(true), - topic_id: Some(topic.into_string()), - requestsPartial: Some(requests_partial), - supportsPartial: Some(supports_partial), - }) + .map( + |(topic, requests_partial, supports_partial)| proto::SubOpts { + subscribe: Some(true), + topic_id: Some(topic.into_string()), + requestsPartial: Some(requests_partial), + supportsPartial: Some(supports_partial), + }, + ) .collect(), control: None, partial: None,