Skip to content

Commit 7bc24d8

Browse files
fix: gossipsub-batch-subscription
Previously, when a new peer connected, the GossipSub behaviour sent one RpcOut::Subscribe message per subscribed topic. This meant N separate protobuf frames were written to the wire for N topics. This PR introduces ```RpcOut::SubscribeMany```, a batched variant that carries all subscriptions as a ```Vec<(TopicHash, bool, bool)>```, and updates ```on_connection_established``` to use it. The result is a single ```proto::RPC```with multiple SubOpts entries on the wire, which is both spec-compliant and more efficient. The receiving side was already correct as ```handle_received_subscriptions``` already iterates a ```Vec<proto::SubOpts>```, so no changes are needed there and backward compatibility is fully preserved. Fixes #6271 Pull-Request: #6385.
1 parent fa4cf62 commit 7bc24d8

5 files changed

Lines changed: 115 additions & 34 deletions

File tree

protocols/gossipsub/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## 0.50.0
2-
2+
- Send all topic subscriptions in a single hello RPC when connecting to a new peer, aligning with the GossipSub spec and other implementations (Go, Nim, JS).
3+
See [PR 6385](https://github.com/libp2p/rust-libp2p/pull/6385).
34
- Raise MSRV to 1.88.0.
45
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).
56

protocols/gossipsub/src/behaviour.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3211,28 +3211,34 @@ where
32113211
}
32123212

32133213
tracing::debug!(peer=%peer_id, "New peer connected");
3214-
// We need to send our subscriptions to the newly-connected node.
3215-
for topic_hash in self.mesh.clone().into_keys() {
3216-
#[cfg(not(feature = "partial_messages"))]
3217-
let (requests_partial, supports_partial) = (false, false);
3218-
#[cfg(feature = "partial_messages")]
3219-
let Some(SubscriptionOpts {
3220-
requests_partial,
3221-
supports_partial,
3222-
}) = self.partial_messages_extension.opts(&topic_hash)
3223-
else {
3224-
tracing::error!("Partial subscription options should exist for subscribed topic");
3225-
return;
3226-
};
3214+
// Send all our topic subscriptions to the newly-connected peer in a single hello RPC.
3215+
let topics: Vec<_> = self
3216+
.mesh
3217+
.keys()
3218+
.cloned()
3219+
.filter_map(|topic_hash| {
3220+
#[cfg(not(feature = "partial_messages"))]
3221+
let (requests_partial, supports_partial) = (false, false);
3222+
#[cfg(feature = "partial_messages")]
3223+
let (requests_partial, supports_partial) = {
3224+
let Some(SubscriptionOpts {
3225+
requests_partial,
3226+
supports_partial,
3227+
}) = self.partial_messages_extension.opts(&topic_hash)
3228+
else {
3229+
tracing::error!(
3230+
"Partial subscription options should exist for subscribed topic"
3231+
);
3232+
return None;
3233+
};
3234+
(requests_partial, supports_partial)
3235+
};
3236+
Some((topic_hash, requests_partial, supports_partial))
3237+
})
3238+
.collect();
32273239

3228-
self.send_message(
3229-
peer_id,
3230-
RpcOut::Subscribe {
3231-
topic: topic_hash.clone(),
3232-
requests_partial,
3233-
supports_partial,
3234-
},
3235-
);
3240+
if !topics.is_empty() {
3241+
self.send_message(peer_id, RpcOut::SubscribeMany(topics));
32363242
}
32373243
}
32383244

protocols/gossipsub/src/behaviour/tests/subscription.rs

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,10 @@ fn test_subscribe() {
5959
.into_values()
6060
.fold(0, |mut collected_subscriptions, mut queue| {
6161
while !queue.is_empty() {
62-
if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() {
63-
collected_subscriptions += 1
62+
match queue.try_pop() {
63+
Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1,
64+
Some(RpcOut::SubscribeMany(topics)) => collected_subscriptions += topics.len(),
65+
_ => {}
6466
}
6567
}
6668
collected_subscriptions
@@ -119,14 +121,16 @@ fn test_unsubscribe() {
119121
.into_values()
120122
.fold(0, |mut collected_subscriptions, mut queue| {
121123
while !queue.is_empty() {
122-
if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() {
123-
collected_subscriptions += 1
124+
match queue.try_pop() {
125+
Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1,
126+
Some(RpcOut::SubscribeMany(topics)) => collected_subscriptions += topics.len(),
127+
_ => {}
124128
}
125129
}
126130
collected_subscriptions
127131
});
128132

129-
// we sent a unsubscribe to all known peers, for two topics
133+
// we sent subscriptions to all known peers for two topics (20 peers × 2 topics)
130134
assert_eq!(subscriptions, 40);
131135

132136
// check we clean up internal structures
@@ -303,23 +307,24 @@ fn test_peer_added_on_connection() {
303307
.to_subscribe(true)
304308
.create_network();
305309

306-
// check that our subscriptions are sent to each of the peers
307-
// collect all the SendEvents
310+
// check that our subscriptions are sent to each of the peers as a single hello RPC
308311
let subscriptions = queues.into_iter().fold(
309312
HashMap::<libp2p_identity::PeerId, Vec<String>>::new(),
310313
|mut collected_subscriptions, (peer, mut queue)| {
311314
while !queue.is_empty() {
312-
if let Some(RpcOut::Subscribe { topic, .. }) = queue.try_pop() {
313-
let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default();
314-
peer_subs.push(topic.into_string());
315+
if let Some(RpcOut::SubscribeMany(topics)) = queue.try_pop() {
316+
let peer_subs: Vec<String> = topics
317+
.into_iter()
318+
.map(|(t, _, _)| t.into_string())
319+
.collect();
315320
collected_subscriptions.insert(peer, peer_subs);
316321
}
317322
}
318323
collected_subscriptions
319324
},
320325
);
321326

322-
// check that there are two subscriptions sent to each peer
327+
// check that there are two subscriptions sent to each peer in a single RPC
323328
for peer_subs in subscriptions.values() {
324329
assert!(peer_subs.contains(&String::from("topic1")));
325330
assert!(peer_subs.contains(&String::from("topic2")));
@@ -339,6 +344,53 @@ fn test_peer_added_on_connection() {
339344
}
340345
}
341346

347+
/// Test that on new connection the hello RPC is a single batched message, not one per topic.
348+
#[test]
349+
fn test_hello_rpc_is_single_batched_message() {
350+
let topic_names = vec![
351+
String::from("alpha"),
352+
String::from("beta"),
353+
String::from("gamma"),
354+
];
355+
let (_, _, queues, topic_hashes) = DefaultBehaviourTestBuilder::default()
356+
.peer_no(5)
357+
.topics(topic_names)
358+
.to_subscribe(true)
359+
.create_network();
360+
361+
for (_, mut queue) in queues {
362+
let mut subscribe_many_count = 0;
363+
let mut individual_subscribe_count = 0;
364+
365+
while !queue.is_empty() {
366+
match queue.try_pop() {
367+
Some(RpcOut::SubscribeMany(topics)) => {
368+
subscribe_many_count += 1;
369+
// All topics must be present in the single hello packet.
370+
let sent: Vec<_> = topics.into_iter().map(|(t, _, _)| t).collect();
371+
for topic_hash in &topic_hashes {
372+
assert!(
373+
sent.contains(topic_hash),
374+
"hello RPC must include all subscribed topics"
375+
);
376+
}
377+
}
378+
Some(RpcOut::Subscribe { .. }) => individual_subscribe_count += 1,
379+
_ => {}
380+
}
381+
}
382+
383+
assert_eq!(
384+
subscribe_many_count, 1,
385+
"exactly one batched hello RPC should be sent per peer"
386+
);
387+
assert_eq!(
388+
individual_subscribe_count, 0,
389+
"no individual Subscribe RPCs should be sent on connection"
390+
);
391+
}
392+
}
393+
342394
/// Test subscription handling
343395
#[test]
344396
fn test_handle_received_subscriptions() {

protocols/gossipsub/src/queue.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ impl Queue {
6262
/// which will only happen for control and non priority messages.
6363
pub(crate) fn try_push(&mut self, message: RpcOut) -> Result<(), Box<RpcOut>> {
6464
match message {
65-
RpcOut::Extensions(_) | RpcOut::Subscribe { .. } | RpcOut::Unsubscribe(_) => {
65+
RpcOut::Extensions(_)
66+
| RpcOut::Subscribe { .. }
67+
| RpcOut::SubscribeMany(_)
68+
| RpcOut::Unsubscribe(_) => {
6669
self.priority
6770
.try_push(message)
6871
.expect("Shared is unbounded");

protocols/gossipsub/src/types.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,8 @@ pub enum RpcOut {
358358
requests_partial: bool,
359359
supports_partial: bool,
360360
},
361+
/// Subscribe to multiple topics in a single RPC (hello packet on new connection).
362+
SubscribeMany(Vec<(TopicHash, bool, bool)>),
361363
/// Unsubscribe a topic.
362364
Unsubscribe(TopicHash),
363365
/// Send a GRAFT control message.
@@ -392,6 +394,7 @@ impl RpcOut {
392394
matches!(
393395
self,
394396
RpcOut::Subscribe { .. }
397+
| RpcOut::SubscribeMany(_)
395398
| RpcOut::Unsubscribe(_)
396399
| RpcOut::Graft(_)
397400
| RpcOut::Prune(_)
@@ -431,6 +434,22 @@ impl From<RpcOut> for proto::RPC {
431434
control: None,
432435
partial: None,
433436
},
437+
RpcOut::SubscribeMany(topics) => proto::RPC {
438+
publish: Vec::new(),
439+
subscriptions: topics
440+
.into_iter()
441+
.map(
442+
|(topic, requests_partial, supports_partial)| proto::SubOpts {
443+
subscribe: Some(true),
444+
topic_id: Some(topic.into_string()),
445+
requestsPartial: Some(requests_partial),
446+
supportsPartial: Some(supports_partial),
447+
},
448+
)
449+
.collect(),
450+
control: None,
451+
partial: None,
452+
},
434453
RpcOut::Unsubscribe(topic) => proto::RPC {
435454
publish: Vec::new(),
436455
subscriptions: vec![proto::SubOpts {

0 commit comments

Comments
 (0)