Conversation
Could you merge
Found a few issues while testing with pluto, so converted to draft |
Code Review

Summary

This PR adds the parsigex protocol implementation.

Findings

[Critical] Cargo.toml workspace path typo — build error
Impact: The workspace will fail to build because the path points to a non-existent directory.

[High] Max message size mismatch: parsigex uses 16 MB, Go uses 128 MB
Impact: Any cluster where a Go peer sends a message larger than 16 MB will see the Rust node reject it.

[High] Handler only processes one inbound stream at a time — silent drop under concurrency
Impact: If a second inbound stream is fully negotiated while the first is still being read/verified, the second stream is silently dropped.

[Medium]
| Component | Go | Rust | Match | Notes |
|---|---|---|---|---|
| Protocol ID | /charon/parsigex/2.0.0 | /charon/parsigex/2.0.0 | yes | |
| Wire format | protobuf length-delimited | protobuf length-delimited | yes | |
| Max message size | 128 MB | 16 MB | no | See High finding above |
| DutyType integer mapping | 0–13 | 0–13 | yes | |
| Empty data set rejection | yes | yes | yes | InvalidParSignedDataSetFields |
| Duty validity check on recv | yes | yes | yes | duty_gater |
| Per-entry signature verification | yes | yes | yes | verifier callback |
| Concurrent inbound streams | yes (goroutine per stream) | no (single slot) | no | See High finding above |
| BuilderProposer rejected | yes (deprecated) | yes (DeprecatedBuilderProposer) | yes | |
Tests
Tests were not run locally. The example (crates/parsigex/examples/parsigex.rs) provides an integration-level smoke test for the broadcast path but requires a live multi-node setup. Unit tests for encode_message/decode_message round-trips and codec edge cases (empty set, unknown duty type, oversized message) appear absent — these would be worth adding.
Open Questions
- Is the 16 MB cap intentional (memory-safety limit) or an oversight vs. Go's 128 MB?
- Was `.with_quic_enabled(true)` on the relay node builder intentional? The other three `Node::new_*` builders in the same file were not changed.
- `SignedDataError::Custom` is only added in `signeddata.rs` but never constructed in this PR — is it intended for follow-up work, or can it be deferred?
```rust
    targeted = targeted.saturating_add(1);
}

if targeted == 0 {
```
Do you think we should send a BroadcastFailed here? Otherwise the request will be "pending" forever — notifying BroadcastError alone isn't enough to mark a request_id as done/failed.
Agree, this should only happen when we cannot connect to any of the cluster peers. We should run the example with a single node running (it will fail to send to the other two) and see what happens with the pending requests.
```rust
    Closed,
    /// Broadcast failed for a peer.
    #[error("broadcast to peer {peer} failed: {source}")]
    BroadcastPeer {
    },
    /// Peer is not currently connected.
    #[error("peer {0} is not connected")]
    PeerNotConnected(PeerId),
```
```toml
anyhow.workspace = true
alloy.workspace = true
clap.workspace = true
rand.workspace = true
libp2p.workspace = true
k256.workspace = true
prost.workspace = true
prost-types.workspace = true
hex.workspace = true
chrono.workspace = true
test-case.workspace = true
pluto-eth2util.workspace = true
pluto-cluster.workspace = true
pluto-p2p.workspace = true
pluto-testutil.workspace = true
pluto-tracing.workspace = true
```
are these newly added dependencies in use?
None of the added deps are used:
- anyhow
- clap
- k256
- pluto-cluster
- pluto-p2p
- pluto-tracing
```rust
    /// Invalid partial signed proto.
    #[error("invalid partial signed proto")]
    InvalidParSignedProto,
```
```rust
impl Behaviour {
    /// Creates a behaviour and a clonable broadcast handle.
    pub fn new(config: Config, peer_id: PeerId) -> (Self, Handle) {
        debug_assert_eq!(config.peer_id, peer_id);
```
Do we need this? Or can we remove the `peer_id` param and use the one from the config directly?
Agree on this, unnecessary duplication that does not perform any additional checks.
```rust
    timeout: Duration,
    verifier: Verifier,
    duty_gater: DutyGater,
    _peer: PeerId,
```
```rust
/// Encodes a protobuf message to bytes.
pub fn encode_protobuf<M: Message>(message: &M) -> Vec<u8> {
    let mut buf = Vec::with_capacity(message.encoded_len());
    message
        .encode(&mut buf)
        .expect("vec-backed protobuf encoding cannot fail");
    buf
}

/// Decodes a protobuf message from bytes.
pub fn decode_protobuf<M: Message + Default>(
    bytes: &[u8],
) -> std::result::Result<M, prost::DecodeError> {
    M::decode(bytes)
}
```
varex83agent left a comment
Review: feat: add parsigex
This PR ports the Charon parsigex libp2p protocol to Rust. The architecture is sound — the Behaviour/Handler split, the per-stream one-shot send/receive design, and the duty codec fallback order (Attestation before VersionedAttestation, SignedAggregateAndProof before versioned) all match Go correctly.
There are 3 bugs that must be fixed before merge:
- The codec is JSON-only — Go writes SSZ-first since v0.17, causing a hard interoperability failure for all major duty types (`crates/core/src/parsigex_codec.rs`)
- `DutySentinel` encodes as `14` but `TryFrom<i32>` has no arm for `14`, making the round-trip fail with `InvalidDuty` (`crates/core/src/types.rs`)
- Broadcast with no connected peers silently drops the `request_id` without emitting a terminal event, leaving callers blocked forever (`crates/parsigex/src/behaviour.rs:304`)
Additionally there are 8 major issues: the peerinfo max-message-size regression (64 KiB → 128 MiB), pending_broadcasts leaking on connection close, unbounded tokio::spawn in notify_subscribers, FuturesUnordered with no concurrency cap, VerifyError being silently discarded, SignedDataError::Custom not being Send + Sync, a roundabout error conversion through serde_json::Error::io, and a panic!-eligible .expect() in non-test code. Inline comments are filed for each issue individually.
```rust
    Serialize(#[from] serde_json::Error),
}

pub(crate) fn serialize_signed_data(data: &dyn SignedData) -> Result<Vec<u8>, ParSigExCodecError> {
```
Bug: JSON-only codec — Go uses SSZ-first serialization since v0.17, causing a hard interoperability failure
serialize_signed_data and deserialize_signed_data use serde_json exclusively. Go's marshal() (charon/core/proto.go:266) tries ssz.Marshaler first, falling back to JSON. Go's unmarshal() tries SSZ then JSON. Since v0.17, sszMarshallingEnabled = true by default.
Affected types that Go writes as SSZ: VersionedSignedProposal, Attestation, VersionedAttestation, SignedAggregateAndProof, VersionedSignedAggregateAndProof, SignedSyncMessage, SignedSyncContributionAndProof.
A Go node will write SSZ; the Rust node will fail to decode it, returning InvalidPayload and silently dropping the partial signature. This is a complete interoperability failure for those duty types.
Fix: implement SSZ-first serialization/deserialization for all SignedData types that implement SSZ in Go, matching the marshal/unmarshal logic in charon/core/proto.go:264-304.
```rust
    DutyType::PrepareSyncContribution => 11,
    DutyType::SyncContribution => 12,
    DutyType::InfoSync => 13,
    DutyType::DutySentinel(_) => 14,
```
Bug: DutySentinel serializes as 14 but TryFrom<i32> has no arm for 14 — round-trip is broken
From<&DutyType> for i32 maps DutySentinel(_) => 14 (here), but TryFrom<i32> for DutyType falls through to _ => Err(ParSigExCodecError::InvalidDuty) at line 115 — 14 is never matched.
In Go, dutySentinel = 14 is an unexported constant used only as a loop bound and is never transmitted on the wire. The Rust DutySentinel variant should not be serializable to the wire format at all.
Fix: guard against encoding DutySentinel before it reaches From<&DutyType> for i32 (return an error), and change TryFrom<i32> to return Err(UnsupportedDutyType) (not InvalidDuty) for value 14 to distinguish "recognised but unsupported" from "not a valid duty type".
Generally agree, but I suggest replacing the From with TryFrom. Also, look into the possibility of dropping the DutySentinel if it's not needed (even if we deviate from Charon, it's most likely an artifact of Go not having reasonable union types)
```rust
    targeted = targeted.saturating_add(1);
}

if targeted == 0 {
```
Bug: broadcast with no connected peers silently drops the request_id — no completion event is ever emitted
When targeted == 0 (all known peers are disconnected), the function returns early. It has already emitted individual BroadcastError events for each disconnected peer, but never emits BroadcastFailed or BroadcastComplete. The request_id returned by Handle::broadcast() is permanently orphaned — any caller awaiting a terminal event for it will block forever.
Go's Broadcast returns nil synchronously in this scenario, which is an implicit success. In async Rust, the caller receives Ok(id) from Handle::broadcast() before the swarm processes the command, so the silent drop is observable as a hang.
Fix: emit Event::BroadcastFailed { request_id } before returning when targeted == 0.
```rust
    Ok(self.connection_handler_for_peer(peer))
}

fn on_swarm_event(&mut self, _event: FromSwarm) {}
```
Major: on_swarm_event ignores ConnectionClosed — pending_broadcasts leaks on mid-flight disconnect
If a connection is torn down while a PendingBroadcast has remaining > 0, the Handler is dropped and its active_futures are abandoned. No FromHandler::OutboundError event is produced for in-flight sends, so finish_broadcast_result is never called and the PendingBroadcast entry leaks in pending_broadcasts forever.
Fix: handle FromSwarm::ConnectionClosed — iterate pending_broadcasts and call finish_broadcast_result(request_id, true) for any broadcast targeting the disconnected peer:
```rust
fn on_swarm_event(&mut self, event: FromSwarm) {
    if let FromSwarm::ConnectionClosed(e) = event {
        if e.remaining_established == 0 {
            // drain any pending broadcasts that included this peer
        }
    }
}
```

```rust
    /// Each subscriber is invoked in a spawned task since `poll()` is
    /// synchronous. This matches Go's intended behaviour (see Go TODO to call
    /// subscribers async).
    fn notify_subscribers(&self, duty: Duty, data_set: ParSignedDataSet) {
```
Major: notify_subscribers spawns an unbounded number of Tokio tasks — DoS amplification and head-of-line blocking
Every verified inbound message calls tokio::spawn with no bound. Under sustained inbound traffic, the number of live tasks grows unboundedly, consuming memory proportional to message rate.
Additionally, all subscriber callbacks are awaited sequentially inside a single task. If any subscriber is slow (e.g., blocked on a back-pressured channel), all subsequent subscribers in the list are stalled for that entire duration.
Fix: feed messages through a bounded tokio::sync::mpsc channel to a dedicated delivery task. Back-pressure is then applied at the receive side rather than accumulating as spawned tasks:
```rust
// In Behaviour:
dispatch_tx: mpsc::Sender<(Duty, ParSignedDataSet)>, // bounded capacity

// In a dedicated spawned task:
while let Some((duty, set)) = rx.recv().await {
    for sub in subs.read().await.iter() {
        sub(duty.clone(), set.clone()).await;
    }
}
```

Agree on the fact that sequential execution of subscriber callbacks is not good. Still, I believe that we can drop the entire subscribe mechanism given how Rust's libp2p already uses events.
```rust
impl Behaviour {
    /// Creates a behaviour and a clonable broadcast handle.
    pub fn new(config: Config, peer_id: PeerId) -> (Self, Handle) {
```
Minor: redundant peer_id parameter — only used in a debug_assert, never stored
pub fn new(config: Config, peer_id: PeerId) -> (Self, Handle) accepts peer_id solely for debug_assert_eq!(config.peer_id, peer_id). The value is never stored; in release builds the assertion is compiled out and a mismatching peer_id has no effect.
Config already carries peer_id. Remove the constructor parameter and use config.peer_id directly everywhere the local peer_id was referenced.
```rust
    timeout: Duration,
    verifier: Verifier,
    duty_gater: DutyGater,
    _peer: PeerId,
```
Minor: _peer: PeerId is accepted but never stored or used
The parameter is discarded immediately. Remove it from Handler::new and update the call-site in behaviour.rs (Handler::new(self.config.timeout, …, peer) → drop peer).
```rust
pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/charon/parsigex/2.0.0");

/// Returns the supported protocols in precedence order.
pub fn protocols() -> Vec<StreamProtocol> {
```
Minor: protocols() is never called — dead public API
pub fn protocols() -> Vec<StreamProtocol> is exported but no consumer uses it (nothing in p2p.rs, behaviour.rs, or the examples references it). Either wire it into the swarm setup where a protocol list is needed, or remove it to avoid a dead_code lint in non-doc builds.
```rust
};
use pluto_p2p::proto;

use super::{Error, Result as ParasigexResult};
```
nit: Result as ParasigexResult — "Parasigex" is a misspelling. The crate is named parsigex. Rename the alias to ParsigexResult (or just use Result with a qualified path).
```rust
    DutyType::SyncMessage => deserialize_json!(SignedSyncMessage),
    DutyType::PrepareSyncContribution => deserialize_json!(SyncCommitteeSelection),
    DutyType::SyncContribution => deserialize_json!(SignedSyncContributionAndProof),
    DutyType::Unknown | DutyType::InfoSync | DutyType::DutySentinel(_) => {
```
nit: DutyType::InfoSync rejection is correct (matches Go's default branch in ParSignedDataFromProto) but has no comment explaining the intent. A future maintainer might assume this is a missing case. Add:
```rust
// InfoSync is not used in parsigex in Go (handled via a separate channel);
// Unknown and DutySentinel are sentinel/invalid values that are never transmitted.
DutyType::Unknown | DutyType::InfoSync | DutyType::DutySentinel(_) => {
    Err(ParSigExCodecError::UnsupportedDutyType)
}
```
/claude-review |
Claude finished @varex83's task in 12m 36s

PR #291 — parsigex Review

Verdict: REQUEST_CHANGES — 3 bugs + 7 major findings

The architecture is sound: the Behaviour/Handler split mirrors Charon, the duty-codec fallback order (Attestation before VersionedAttestation, SignedAggregateAndProof before versioned) matches Go, and the protocol ID matches.

Bugs (must fix)

1. When `targeted == 0`, no terminal event is emitted and the `request_id` is orphaned:

```rust
if targeted == 0 {
    self.pending_events
        .push_back(ToSwarm::GenerateEvent(Event::BroadcastFailed { request_id }));
    return;
}
```

2. `DutySentinel` encodes as 14 but `TryFrom<i32>` has no arm for 14, breaking the round-trip.
Fix: reject `DutySentinel` on encode and return a distinct error for 14 on decode.

3. `SignedDataError::Custom` is not `Send + Sync`:

```rust
// Fix:
Custom(Box<dyn std::error::Error + Send + Sync>),
```

Major Findings

4. `pending_broadcasts` leaks on mid-flight disconnect.
Fix: handle `FromSwarm::ConnectionClosed`.

5. Every inbound stream pushes a … Fix: check …

6. Every verified inbound message unconditionally calls `tokio::spawn`. Fix: use a bounded channel with a dedicated delivery task.

7. AGENTS.md prohibits …

```rust
pub fn encode_protobuf<M: Message>(message: &M) -> Vec<u8> {
    message.encode_to_vec()
}
```

8. …

9. Converting …

10. A …

Minor Findings

Nits

Parity Matrix
```rust
let share_idx =
    i32::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?;
let signature = data.signed_data.signature().map_err(|err| {
    ParSigExCodecError::Serialize(serde_json::Error::io(std::io::Error::other(
```
Agree on this, the error has nothing to do with IO. Prefer to add another variant to ParSigExCodecError like ParSigExCodecError::InvalidSignature
```rust
use chrono::{DateTime, Utc};
use libp2p::{PeerId, swarm::Stream};
use pluto_core::version::{self, SemVer, SemVerError};
use pluto_p2p::proto::MAX_MESSAGE_SIZE;
```

```rust
use super::{Error, Result as ParasigexResult};

/// Encodes a protobuf message to bytes.
pub fn encode_protobuf<M: Message>(message: &M) -> Vec<u8> {
```
Can be fully replaced with message.encode_to_vec()
```rust
/// Outbound open info that carries the request context through stream
/// negotiation.
```
The comment does not give any useful context. What is the request id? What is the payload?
```rust
verifier: Verifier,
duty_gater: DutyGater,
pending_open: VecDeque<PendingOpen>,
active_futures: FuturesUnordered<ActiveFuture>,
```
Not really important since we're gating the connections to the cluster known peers, which is always a reasonably bounded number.
```rust
for (pub_key, par_sig) in data_set.inner() {
    verifier(duty.clone(), *pub_key, par_sig.clone())
        .await
        .map_err(|_| Failure::InvalidPartialSignature)?;
```
It would be nice if we included the error in the InvalidPartialSignature variant so that it can be logged at a higher level. We should not discard the error.
Summary
Implements the partial signature exchange (parsigex) libp2p protocol — the mechanism by which distributed validator nodes share the partial BLS signatures they produce for each duty before threshold-combining them into a full aggregate.
New crate: `pluto-parsigex`

- `Behaviour` — `NetworkBehaviour` that manages per-peer connections and routes inbound/outbound events
- `Handler` — `ConnectionHandler`; opens an outbound substream per broadcast, accepts inbound substreams for receives
- `protocol.rs` — protocol ID (`/charon/parsigex/2.0.0`)
- `error.rs` — `Failure` (handler ↔ behaviour), `VerifyError` (caller-supplied verifier), `Error` (public)
- `Handle` — `broadcast(duty, data_set)` fans a signed set out to all connected peers; `subscribe(cb)` registers a callback for verified inbound sets

Receive pipeline (handler side): decode into `(Duty, ParSignedDataSet)` → `DutyGater` → `Verifier` → `Event::Received` to the behaviour on success, `Event::Error` on any failure

Core domain types (`crates/core`)

New types in `types.rs` with full protobuf round-trip support:

- `DutyType` — enum covering all validator duties (attester, proposer, builder, randao, exit, sync, …)
- `SlotNumber` / `Duty` — slot + duty type pair that identifies a signing round
- `PubKey` — 48-byte BLS public key newtype
- `ParSignedData` — a `Box<dyn SignedData>` paired with a share index
- `ParSignedDataSet` — `HashMap<PubKey, ParSignedData>` (one partial sig per validator key)

New `parsigex_codec.rs` — JSON serialise/deserialise for every `SignedData` variant, with type-erased dispatch via `Any::downcast_ref`. Fallback order for `Attester` and `Aggregator` duties matches Go's Charon for wire compatibility.

Other changes

- `crates/p2p` — wires the parsigex `Behaviour` into the combined pluto swarm behaviour
- `crates/peerinfo` — removes the local `MAX_MESSAGE_SIZE` constant in favour of the shared one from `pluto-p2p`

Test plan

- `cargo test --workspace --all-features`
- `cargo clippy --workspace --all-targets --all-features -- -D warnings`
- `cargo +nightly fmt --all --check`
- ran the `parsigex` example end-to-end against a local cluster