Skip to content

Commit a03aa97

Browse files
committed
revert changes to get_random_peers, leave them for a future PR
1 parent 973a23b commit a03aa97

2 files changed

Lines changed: 38 additions & 43 deletions

File tree

protocols/gossipsub/src/behaviour.rs

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,15 @@ pub enum Event {
152152
/// A new partial message has been received.
153153
#[cfg(feature = "partial_messages")]
154154
Partial {
155-
/// The topic of the partial message.
155+
/// Topic on which the partiall was published.
156156
topic_hash: TopicHash,
157157
/// The peer that forwarded us this message.
158158
peer_id: PeerId,
159-
/// The group ID that identifies the complete logical message.
159+
/// Identifier grouping the partials that belong to the same full message.
160160
group_id: Vec<u8>,
161-
/// The partial message data.
161+
/// /// Payload of the partial.
162162
message: Option<Vec<u8>>,
163-
/// The partial message metadata, what peer has and wants.
163+
/// Metadata associated with this partial (e.g., available or requested parts).
164164
metadata: Option<Vec<u8>>,
165165
},
166166
/// A remote subscribed to a topic.
@@ -859,6 +859,8 @@ where
859859
}
860860

861861
#[cfg(feature = "partial_messages")]
862+
/// Report an invalid partial message from a peer, originating at the application layer.
863+
/// This triggers penalties for the peer that sent the invalid partial.
862864
pub fn report_invalid_partial(&mut self, peer_id: PeerId, topic_hash: &TopicHash) {
863865
if let PeerScoreState::Active(peer_score) = &mut self.peer_score {
864866
peer_score.reject_invalid_partial(peer_id, topic_hash);
@@ -1164,11 +1166,11 @@ where
11641166
&self.connected_peers,
11651167
topic_hash,
11661168
mesh_n - added_peers.len(),
1167-
|peer_id| {
1168-
!added_peers.contains(peer_id)
1169-
&& !self.explicit_peers.contains(peer_id)
1170-
&& !self.peer_score.below_threshold(peer_id, |_| 0.0).0
1171-
&& !self.backoffs.is_backoff_with_slack(topic_hash, peer_id)
1169+
|peer| {
1170+
!added_peers.contains(peer)
1171+
&& !self.explicit_peers.contains(peer)
1172+
&& !self.peer_score.below_threshold(peer, |_| 0.0).0
1173+
&& !self.backoffs.is_backoff_with_slack(topic_hash, peer)
11721174
},
11731175
);
11741176

@@ -1654,6 +1656,7 @@ where
16541656
tracing::debug!(peer=%peer_id, "Completed GRAFT handling for peer");
16551657
}
16561658

1659+
/// Handle received `Extensions` message.
16571660
fn handle_extensions(&mut self, peer_id: &PeerId, extensions: Extensions) {
16581661
let Some(peer) = self.connected_peers.get_mut(peer_id) else {
16591662
tracing::error!(
@@ -2355,17 +2358,13 @@ where
23552358
);
23562359
// not enough peers - get mesh_n - current_length more
23572360
let desired_peers = mesh_n - peers.len();
2358-
let peer_list = get_random_peers(
2359-
&self.connected_peers,
2360-
topic_hash,
2361-
desired_peers,
2362-
|peer_id| {
2363-
!peers.contains(peer_id)
2364-
&& !explicit_peers.contains(peer_id)
2365-
&& !backoffs.is_backoff_with_slack(topic_hash, peer_id)
2366-
&& scores.get(peer_id).map(|r| r.score).unwrap_or_default() >= 0.0
2367-
},
2368-
);
2361+
let peer_list =
2362+
get_random_peers(&self.connected_peers, topic_hash, desired_peers, |peer| {
2363+
!peers.contains(peer)
2364+
&& !explicit_peers.contains(peer)
2365+
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
2366+
&& scores.get(peer).map(|r| r.score).unwrap_or_default() >= 0.0
2367+
});
23692368
for peer in &peer_list {
23702369
let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new);
23712370
current_topic.push(topic_hash.clone());
@@ -2703,7 +2702,7 @@ where
27032702
}
27042703
self.failed_messages.shrink_to_fit();
27052704

2706-
// Flush stale IDONTWANTs and partial messages.
2705+
// Flush stale IDONTWANTs.
27072706
for peer in self.connected_peers.values_mut() {
27082707
while let Some((_front, instant)) = peer.dont_send.front() {
27092708
if (*instant + IDONTWANT_TIMEOUT) >= Instant::now() {
@@ -2778,27 +2777,23 @@ where
27782777
)
27792778
};
27802779
// get gossip_lazy random peers
2781-
let to_msg_peers = get_random_peers_dynamic(
2782-
self.connected_peers.iter(),
2783-
topic_hash,
2784-
n_map,
2785-
|peer_id| {
2786-
let filter = !peers.contains(peer_id)
2787-
&& !self.explicit_peers.contains(peer_id)
2780+
let to_msg_peers =
2781+
get_random_peers_dynamic(&self.connected_peers, topic_hash, n_map, |peer| {
2782+
let filter = !peers.contains(peer)
2783+
&& !self.explicit_peers.contains(peer)
27882784
&& !self
27892785
.peer_score
2790-
.below_threshold(peer_id, |ts| ts.gossip_threshold)
2786+
.below_threshold(peer, |ts| ts.gossip_threshold)
27912787
.0;
27922788
// Don't send IHAVE to peers that requested partial messages -
27932789
// they receive metadata via partial message gossip instead.
27942790
#[cfg(feature = "partial_messages")]
27952791
let filter = filter
27962792
&& !self
27972793
.partial_messages_extension
2798-
.requests_partial(peer_id, topic_hash);
2794+
.requests_partial(peer, topic_hash);
27992795
filter
2800-
},
2801-
);
2796+
});
28022797

28032798
tracing::debug!("Gossiping IHAVE to {} peers", to_msg_peers.len());
28042799

@@ -3817,17 +3812,17 @@ fn peer_removed_from_mesh(
38173812
/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
38183813
/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
38193814
/// that gets as input the number of filtered peers.
3820-
fn get_random_peers_dynamic<'a>(
3821-
peers: impl IntoIterator<Item = (&'a PeerId, &'a PeerDetails)>,
3815+
fn get_random_peers_dynamic(
3816+
connected_peers: &HashMap<PeerId, PeerDetails>,
38223817
topic_hash: &TopicHash,
38233818
// maps the number of total peers to the number of selected peers
38243819
n_map: impl Fn(usize) -> usize,
3825-
f: impl Fn(&PeerId) -> bool,
3820+
mut f: impl FnMut(&PeerId) -> bool,
38263821
) -> BTreeSet<PeerId> {
3827-
let mut gossip_peers = peers
3828-
.into_iter()
3822+
let mut gossip_peers = connected_peers
3823+
.iter()
38293824
.filter(|(_, p)| p.topics.contains(topic_hash))
3830-
.filter(|(peer_id, _peer)| f(peer_id))
3825+
.filter(|(peer_id, _)| f(peer_id))
38313826
.filter(|(_, p)| p.kind.is_gossipsub())
38323827
.map(|(peer_id, _)| *peer_id)
38333828
.collect::<Vec<PeerId>>();
@@ -3850,13 +3845,13 @@ fn get_random_peers_dynamic<'a>(
38503845

38513846
/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
38523847
/// filtered by the function `f`.
3853-
fn get_random_peers<'a>(
3854-
peers: impl IntoIterator<Item = (&'a PeerId, &'a PeerDetails)>,
3848+
fn get_random_peers(
3849+
connected_peers: &HashMap<PeerId, PeerDetails>,
38553850
topic_hash: &TopicHash,
38563851
n: usize,
3857-
f: impl Fn(&PeerId) -> bool,
3852+
f: impl FnMut(&PeerId) -> bool,
38583853
) -> BTreeSet<PeerId> {
3859-
get_random_peers_dynamic(peers, topic_hash, |_| n, f)
3854+
get_random_peers_dynamic(connected_peers, topic_hash, |_| n, f)
38603855
}
38613856

38623857
/// Validates the combination of signing, privacy and message validation to ensure the

protocols/gossipsub/src/types.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ pub struct RawMessage {
151151
pub validated: bool,
152152
}
153153

154-
impl std::fmt::Debug for RawMessage {
154+
impl fmt::Debug for RawMessage {
155155
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156156
f.debug_struct("RawMessage")
157157
.field("source", &self.source)

0 commit comments

Comments
 (0)