Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/service/gossip_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct GossipCache {
execution_payload: Option<Duration>,
/// Timeout for payload attestation messages.
payload_attestation_message: Option<Duration>,
/// Timeout for proposer preferences.
proposer_preferences: Option<Duration>,
}

#[derive(Default)]
Expand Down Expand Up @@ -87,6 +89,8 @@ pub struct GossipCacheBuilder {
execution_payload: Option<Duration>,
/// Timeout for payload attestation messages.
payload_attestation_message: Option<Duration>,
/// Timeout for proposer preferences.
proposer_preferences: Option<Duration>,
}

#[allow(dead_code)]
Expand Down Expand Up @@ -181,6 +185,12 @@ impl GossipCacheBuilder {
self
}

/// Timeout for proposer preferences messages.
pub fn proposer_preferences_timeout(mut self, timeout: Duration) -> Self {
self.proposer_preferences = Some(timeout);
self
}

pub fn build(self) -> GossipCache {
let GossipCacheBuilder {
default_timeout,
Expand All @@ -200,6 +210,7 @@ impl GossipCacheBuilder {
execution_payload_bid,
execution_payload,
payload_attestation_message,
proposer_preferences,
} = self;
GossipCache {
expirations: DelayQueue::default(),
Expand All @@ -220,6 +231,7 @@ impl GossipCacheBuilder {
execution_payload_bid: execution_payload_bid.or(default_timeout),
execution_payload: execution_payload.or(default_timeout),
payload_attestation_message: payload_attestation_message.or(default_timeout),
proposer_preferences: proposer_preferences.or(default_timeout),
}
}
}
Expand Down Expand Up @@ -250,6 +262,7 @@ impl GossipCache {
GossipKind::ExecutionPayloadBid => self.execution_payload_bid,
GossipKind::ExecutionPayload => self.execution_payload,
GossipKind::PayloadAttestationMessage => self.payload_attestation_message,
GossipKind::ProposerPreferences => self.proposer_preferences,
};
let Some(expire_timeout) = expire_timeout else {
return;
Expand Down
6 changes: 5 additions & 1 deletion src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,11 @@ impl<P: Preset> Network<P> {

/// Publishes message on the pubsub (gossipsub) behaviour, choosing the encoding.
pub fn publish(&mut self, message: PubsubMessage<P>) {
for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) {
self.publish_with_digest(message, self.enr_fork_id.fork_digest);
}

pub fn publish_with_digest(&mut self, message: PubsubMessage<P>, digest: ForkDigest) {
for topic in message.topics(GossipEncoding::default(), digest) {
let message_data = match message.encode(GossipEncoding::default()) {
Ok(message) => message,
Err(encoding_error) => {
Expand Down
38 changes: 37 additions & 1 deletion src/types/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use types::{
gloas::containers::{
DataColumnSidecar as GloasDataColumnSidecar, PayloadAttestationMessage,
SignedBeaconBlock as GloasSignedBeaconBlock, SignedExecutionPayloadBid,
SignedExecutionPayloadEnvelope,
SignedExecutionPayloadEnvelope, SignedProposerPreferences,
},
nonstandard::Phase,
phase0::{
Expand Down Expand Up @@ -83,6 +83,8 @@ pub enum PubsubMessage<P: Preset> {
ExecutionPayload(Arc<SignedExecutionPayloadEnvelope<P>>),
/// Gossipsub message providing notification of a payload attestation message.
PayloadAttestationMessage(Arc<PayloadAttestationMessage>),
/// Gossipsub message providing notification of a proposer preference.
ProposerPreferences(Arc<SignedProposerPreferences>),
}

// Implements the `DataTransform` trait of gossipsub to employ snappy compression
Expand Down Expand Up @@ -188,6 +190,7 @@ impl<P: Preset> PubsubMessage<P> {
PubsubMessage::ExecutionPayloadBid(_) => GossipKind::ExecutionPayloadBid,
PubsubMessage::ExecutionPayload(_) => GossipKind::ExecutionPayload,
PubsubMessage::PayloadAttestationMessage(_) => GossipKind::PayloadAttestationMessage,
PubsubMessage::ProposerPreferences(_) => GossipKind::ProposerPreferences,
}
}

Expand Down Expand Up @@ -625,6 +628,31 @@ impl<P: Preset> PubsubMessage<P> {
)),
}
}
GossipKind::ProposerPreferences => {
match fork_context.get_fork_from_context_bytes(gossip_topic.fork_digest) {
Some(Phase::Gloas) => {
let signed_proposer_preferences =
SignedProposerPreferences::from_ssz_default(data)
.map_err(|e| format!("{:?}", e))?;
Ok(PubsubMessage::ProposerPreferences(Arc::new(
signed_proposer_preferences,
)))
}
Some(
Phase::Phase0
| Phase::Altair
| Phase::Bellatrix
| Phase::Capella
| Phase::Deneb
| Phase::Electra
| Phase::Fulu,
)
| None => Err(format!(
"proposer_preferences topic invalid for given fork digest {:?}",
gossip_topic.fork_digest
)),
}
}
}
}
}
Expand Down Expand Up @@ -655,6 +683,7 @@ impl<P: Preset> PubsubMessage<P> {
PubsubMessage::ExecutionPayloadBid(data) => data.to_ssz(),
PubsubMessage::ExecutionPayload(data) => data.to_ssz(),
PubsubMessage::PayloadAttestationMessage(data) => data.to_ssz(),
PubsubMessage::ProposerPreferences(data) => data.to_ssz(),
}
}
}
Expand Down Expand Up @@ -745,6 +774,13 @@ impl<P: Preset> std::fmt::Display for PubsubMessage<P> {
data.data.slot, data.data.beacon_block_root, data.validator_index
)
}
PubsubMessage::ProposerPreferences(data) => {
write!(
f,
"Proposer Preference: slot: {}, validator_index: {}",
data.message.proposal_slot, data.message.validator_index,
)
}
}
}
}
12 changes: 11 additions & 1 deletion src/types/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update
pub const EXECUTION_PAYLOAD_BID_TOPIC: &str = "execution_payload_bid";
pub const EXECUTION_PAYLOAD_TOPIC: &str = "execution_payload";
pub const PAYLOAD_ATTESTATION_MESSAGE_TOPIC: &str = "payload_attestation_message";
pub const PROPOSER_PREFERENCES_TOPIC: &str = "proposer_preferences";

#[derive(Debug)]
pub struct TopicConfig {
Expand Down Expand Up @@ -108,6 +109,7 @@ pub fn core_topics_to_subscribe(
topics.push(GossipKind::ExecutionPayloadBid);
topics.push(GossipKind::ExecutionPayload);
topics.push(GossipKind::PayloadAttestationMessage);
topics.push(GossipKind::ProposerPreferences);
}

topics
Expand Down Expand Up @@ -137,7 +139,8 @@ pub fn is_fork_non_core_topic(topic: &GossipTopic, _phase: Phase) -> bool {
| GossipKind::LightClientOptimisticUpdate
| GossipKind::ExecutionPayloadBid
| GossipKind::ExecutionPayload
| GossipKind::PayloadAttestationMessage => false,
| GossipKind::PayloadAttestationMessage
| GossipKind::ProposerPreferences => false,
}
}

Expand Down Expand Up @@ -204,6 +207,8 @@ pub enum GossipKind {
ExecutionPayload,
/// Topic for publishing payload attestation messages.
PayloadAttestationMessage,
/// Topic for publishing proposer preferences.
ProposerPreferences,
}

impl std::fmt::Display for GossipKind {
Expand Down Expand Up @@ -288,6 +293,7 @@ impl GossipTopic {
EXECUTION_PAYLOAD_BID_TOPIC => GossipKind::ExecutionPayloadBid,
EXECUTION_PAYLOAD_TOPIC => GossipKind::ExecutionPayload,
PAYLOAD_ATTESTATION_MESSAGE_TOPIC => GossipKind::PayloadAttestationMessage,
PROPOSER_PREFERENCES_TOPIC => GossipKind::ProposerPreferences,
topic => match subnet_topic_index(topic) {
Some(kind) => kind,
None => return Err(format!("Unknown topic: {}", topic)),
Expand Down Expand Up @@ -358,6 +364,7 @@ impl std::fmt::Display for GossipTopic {
GossipKind::ExecutionPayloadBid => EXECUTION_PAYLOAD_BID_TOPIC.into(),
GossipKind::ExecutionPayload => EXECUTION_PAYLOAD_TOPIC.into(),
GossipKind::PayloadAttestationMessage => PAYLOAD_ATTESTATION_MESSAGE_TOPIC.into(),
GossipKind::ProposerPreferences => PROPOSER_PREFERENCES_TOPIC.into(),
};
write!(
f,
Expand Down Expand Up @@ -433,6 +440,7 @@ mod tests {
ExecutionPayloadBid,
ExecutionPayload,
PayloadAttestationMessage,
ProposerPreferences,
]
.iter()
{
Expand Down Expand Up @@ -535,6 +543,7 @@ mod tests {
"payload_attestation_message",
PayloadAttestationMessage.as_ref()
);
assert_eq!("proposer_preferences", ProposerPreferences.as_ref());
}

fn get_chain_config() -> ChainConfig {
Expand Down Expand Up @@ -620,6 +629,7 @@ mod tests {
GossipKind::ExecutionPayloadBid,
GossipKind::ExecutionPayload,
GossipKind::PayloadAttestationMessage,
GossipKind::ProposerPreferences,
];
for subnet in s {
expected_topics.push(GossipKind::DataColumnSidecar(subnet));
Expand Down