Skip to content

Commit 37ae246

Browse files
fix/gossipsub-batch-subscription
1 parent 91e8931 commit 37ae246

4 files changed

Lines changed: 115 additions & 35 deletions

File tree

protocols/gossipsub/src/behaviour.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3203,28 +3203,34 @@ where
32033203
}
32043204

32053205
tracing::debug!(peer=%peer_id, "New peer connected");
3206-
// We need to send our subscriptions to the newly-connected node.
3207-
for topic_hash in self.mesh.clone().into_keys() {
3208-
#[cfg(not(feature = "partial_messages"))]
3209-
let (requests_partial, supports_partial) = (false, false);
3210-
#[cfg(feature = "partial_messages")]
3211-
let Some(SubscriptionOpts {
3212-
requests_partial,
3213-
supports_partial,
3214-
}) = self.partial_messages_extension.opts(&topic_hash)
3215-
else {
3216-
tracing::error!("Partial subscription options should exist for subscribed topic");
3217-
return;
3218-
};
3206+
// Send all our topic subscriptions to the newly-connected peer in a single hello RPC.
3207+
let topics: Vec<_> = self
3208+
.mesh
3209+
.keys()
3210+
.cloned()
3211+
.filter_map(|topic_hash| {
3212+
#[cfg(not(feature = "partial_messages"))]
3213+
let (requests_partial, supports_partial) = (false, false);
3214+
#[cfg(feature = "partial_messages")]
3215+
let (requests_partial, supports_partial) = {
3216+
let Some(SubscriptionOpts {
3217+
requests_partial,
3218+
supports_partial,
3219+
}) = self.partial_messages_extension.opts(&topic_hash)
3220+
else {
3221+
tracing::error!(
3222+
"Partial subscription options should exist for subscribed topic"
3223+
);
3224+
return None;
3225+
};
3226+
(requests_partial, supports_partial)
3227+
};
3228+
Some((topic_hash, requests_partial, supports_partial))
3229+
})
3230+
.collect();
32193231

3220-
self.send_message(
3221-
peer_id,
3222-
RpcOut::Subscribe {
3223-
topic: topic_hash.clone(),
3224-
requests_partial,
3225-
supports_partial,
3226-
},
3227-
);
3232+
if !topics.is_empty() {
3233+
self.send_message(peer_id, RpcOut::SubscribeMany(topics));
32283234
}
32293235
}
32303236

protocols/gossipsub/src/behaviour/tests/subscription.rs

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,17 @@ fn test_subscribe() {
5454
"Subscribe should add a new entry to the mesh[topic] hashmap"
5555
);
5656

57-
// collect all the subscriptions
57+
// collect all the subscriptions (hello RPC on connection is now a single SubscribeMany)
5858
let subscriptions = queues
5959
.into_values()
6060
.fold(0, |mut collected_subscriptions, mut queue| {
6161
while !queue.is_empty() {
62-
if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() {
63-
collected_subscriptions += 1
62+
match queue.try_pop() {
63+
Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1,
64+
Some(RpcOut::SubscribeMany(topics)) => {
65+
collected_subscriptions += topics.len()
66+
}
67+
_ => {}
6468
}
6569
}
6670
collected_subscriptions
@@ -114,19 +118,23 @@ fn test_unsubscribe() {
114118
"should be able to unsubscribe successfully from each topic",
115119
);
116120

117-
// collect all the subscriptions
121+
// collect all the subscriptions (hello RPC on connection is now a single SubscribeMany)
118122
let subscriptions = queues
119123
.into_values()
120124
.fold(0, |mut collected_subscriptions, mut queue| {
121125
while !queue.is_empty() {
122-
if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() {
123-
collected_subscriptions += 1
126+
match queue.try_pop() {
127+
Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1,
128+
Some(RpcOut::SubscribeMany(topics)) => {
129+
collected_subscriptions += topics.len()
130+
}
131+
_ => {}
124132
}
125133
}
126134
collected_subscriptions
127135
});
128136

129-
// we sent a unsubscribe to all known peers, for two topics
137+
// we sent subscriptions to all known peers for two topics (20 peers × 2 topics)
130138
assert_eq!(subscriptions, 40);
131139

132140
// check we clean up internal structures
@@ -303,23 +311,22 @@ fn test_peer_added_on_connection() {
303311
.to_subscribe(true)
304312
.create_network();
305313

306-
// check that our subscriptions are sent to each of the peers
307-
// collect all the SendEvents
314+
// check that our subscriptions are sent to each of the peers as a single hello RPC
308315
let subscriptions = queues.into_iter().fold(
309316
HashMap::<libp2p_identity::PeerId, Vec<String>>::new(),
310317
|mut collected_subscriptions, (peer, mut queue)| {
311318
while !queue.is_empty() {
312-
if let Some(RpcOut::Subscribe { topic, .. }) = queue.try_pop() {
313-
let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default();
314-
peer_subs.push(topic.into_string());
319+
if let Some(RpcOut::SubscribeMany(topics)) = queue.try_pop() {
320+
let peer_subs: Vec<String> =
321+
topics.into_iter().map(|(t, _, _)| t.into_string()).collect();
315322
collected_subscriptions.insert(peer, peer_subs);
316323
}
317324
}
318325
collected_subscriptions
319326
},
320327
);
321328

322-
// check that there are two subscriptions sent to each peer
329+
// check that there are two subscriptions sent to each peer in a single RPC
323330
for peer_subs in subscriptions.values() {
324331
assert!(peer_subs.contains(&String::from("topic1")));
325332
assert!(peer_subs.contains(&String::from("topic2")));
@@ -339,6 +346,53 @@ fn test_peer_added_on_connection() {
339346
}
340347
}
341348

349+
/// Test that on new connection the hello RPC is a single batched message, not one per topic.
350+
#[test]
351+
fn test_hello_rpc_is_single_batched_message() {
352+
let topic_names = vec![
353+
String::from("alpha"),
354+
String::from("beta"),
355+
String::from("gamma"),
356+
];
357+
let (_, _, queues, topic_hashes) = DefaultBehaviourTestBuilder::default()
358+
.peer_no(5)
359+
.topics(topic_names)
360+
.to_subscribe(true)
361+
.create_network();
362+
363+
for (_, mut queue) in queues {
364+
let mut subscribe_many_count = 0;
365+
let mut individual_subscribe_count = 0;
366+
367+
while !queue.is_empty() {
368+
match queue.try_pop() {
369+
Some(RpcOut::SubscribeMany(topics)) => {
370+
subscribe_many_count += 1;
371+
// All topics must be present in the single hello packet.
372+
let sent: Vec<_> = topics.into_iter().map(|(t, _, _)| t).collect();
373+
for topic_hash in &topic_hashes {
374+
assert!(
375+
sent.contains(topic_hash),
376+
"hello RPC must include all subscribed topics"
377+
);
378+
}
379+
}
380+
Some(RpcOut::Subscribe { .. }) => individual_subscribe_count += 1,
381+
_ => {}
382+
}
383+
}
384+
385+
assert_eq!(
386+
subscribe_many_count, 1,
387+
"exactly one batched hello RPC should be sent per peer"
388+
);
389+
assert_eq!(
390+
individual_subscribe_count, 0,
391+
"no individual Subscribe RPCs should be sent on connection"
392+
);
393+
}
394+
}
395+
342396
/// Test subscription handling
343397
#[test]
344398
fn test_handle_received_subscriptions() {

protocols/gossipsub/src/queue.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ impl Queue {
6262
/// which will only happen for control and non priority messages.
6363
pub(crate) fn try_push(&mut self, message: RpcOut) -> Result<(), Box<RpcOut>> {
6464
match message {
65-
RpcOut::Extensions(_) | RpcOut::Subscribe { .. } | RpcOut::Unsubscribe(_) => {
65+
RpcOut::Extensions(_)
66+
| RpcOut::Subscribe { .. }
67+
| RpcOut::SubscribeMany(_)
68+
| RpcOut::Unsubscribe(_) => {
6669
self.priority
6770
.try_push(message)
6871
.expect("Shared is unbounded");

protocols/gossipsub/src/types.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,8 @@ pub enum RpcOut {
358358
requests_partial: bool,
359359
supports_partial: bool,
360360
},
361+
/// Subscribe to multiple topics in a single RPC (hello packet on new connection).
362+
SubscribeMany(Vec<(TopicHash, bool, bool)>),
361363
/// Unsubscribe a topic.
362364
Unsubscribe(TopicHash),
363365
/// Send a GRAFT control message.
@@ -392,6 +394,7 @@ impl RpcOut {
392394
matches!(
393395
self,
394396
RpcOut::Subscribe { .. }
397+
| RpcOut::SubscribeMany(_)
395398
| RpcOut::Unsubscribe(_)
396399
| RpcOut::Graft(_)
397400
| RpcOut::Prune(_)
@@ -431,6 +434,20 @@ impl From<RpcOut> for proto::RPC {
431434
control: None,
432435
partial: None,
433436
},
437+
RpcOut::SubscribeMany(topics) => proto::RPC {
438+
publish: Vec::new(),
439+
subscriptions: topics
440+
.into_iter()
441+
.map(|(topic, requests_partial, supports_partial)| proto::SubOpts {
442+
subscribe: Some(true),
443+
topic_id: Some(topic.into_string()),
444+
requestsPartial: Some(requests_partial),
445+
supportsPartial: Some(supports_partial),
446+
})
447+
.collect(),
448+
control: None,
449+
partial: None,
450+
},
434451
RpcOut::Unsubscribe(topic) => proto::RPC {
435452
publish: Vec::new(),
436453
subscriptions: vec![proto::SubOpts {

0 commit comments

Comments
 (0)