Skip to content

feat(dkg): implement sync#311

Open
iamquang95 wants to merge 29 commits intomainfrom
iamquang95/dkg/sync
Open

feat(dkg): implement sync#311
iamquang95 wants to merge 29 commits intomainfrom
iamquang95/dkg/sync

Conversation

@iamquang95
Copy link
Copy Markdown
Collaborator

Should address #133

@iamquang95
Copy link
Copy Markdown
Collaborator Author

@claude

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @iamquang95's task in 6m 16s —— View job


Code Review

  • Gather context and understand PR scope
  • Read changed files
  • Compare against Go reference implementation
  • Review sync module files
  • Review p2p proto changes
  • Compile final review

Summary

This PR implements the DKG step-synchronization protocol (/charon/dkg/sync/1.0.0/) in Rust, porting the Go Charon dkg/sync package. The overall structure is sound — Server/Client/Handler/Behaviour decomposition mirrors the Go architecture, and the proto wire format matches. There are a few correctness issues worth addressing before merge, plus some lower-priority observations.


Findings

[High] Step is updated for peers that fail validation

In handle_inbound_stream, after validate_request_with_public_key fails the code sets the server error and response error string, but then falls through to server.update_step(peer_id, message.step) and continues the loop to read the next message.

Evidence: crates/dkg/src/sync/handler.rs:365-416

if let Err(error) = protocol::validate_request_with_public_key(...) {
    // sets server error, response.error
} else {
    server.mark_connected(peer_id).await;
}

// ← this runs even after validation failure
if server.update_step(peer_id, message.step).await? {
    send_inbound_event(...);
}
// loop continues - next read will also come from the invalid peer

A peer that sends an invalid hash signature (wrong cluster) can still advance the step tracker and the loop keeps reading from that peer. The Go reference returns from the handler after an invalid message. The function should break (or return Err) immediately after setting the error and writing the error response. Fix this →


[Medium] Protocol ID has trailing slash — verify against Go

protocol.rs:11:

pub const PROTOCOL_NAME: libp2p::StreamProtocol = StreamProtocol::new("/charon/dkg/sync/1.0.0/");

The trailing / needs to exactly match what the Charon Go server advertises. libp2p protocol negotiation is byte-exact. If the Go side uses /charon/dkg/sync/1.0.0 (no trailing slash), all protocol negotiations will fail silently — the handler is never invoked, connections appear established but no sync traffic flows. Please confirm against the Go source. Fix this →


[Medium] await_all_at_step does not check started — can deadlock before Server::start()

await_all_connected guards against this with an is_started() check (server.rs:74), but await_all_at_step (server.rs:120) has no such guard. If called before start(), it spins on the notify but the started-notify never fires, so cancellation is the only exit. This likely doesn't matter in practice (DKG won't await steps before starting), but it's an asymmetry worth documenting or guarding. Fix this →


[Low] Redundant explicit arms before wildcard in on_connection_event

handler.rs:279-285:

ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
ConnectionEvent::ListenUpgradeError(_) => {}
_ => {}    // ← makes the lines above unreachable

The named arms are unreachable because _ => {} catches everything. Since ConnectionEvent is #[non_exhaustive] you need the wildcard — the named arms can be removed. Fix this →


[Low] inbound_events_tx error logged at error! level, but handler owns both ends

handler.rs:426-428: If inbound_events_tx.send() fails it logs error!. Since the handler holds both the sender and receiver for the channel's lifetime, this can only happen after the handler is dropped — at which point the error is silently swallowed anyway. A debug! level or silent ignore would be less alarming. Fix this →


[Low] run_outbound_stream does not flush on shutdown

handler.rs:336-338: When shutdown=true the final MsgSync is sent and the function returns GracefulShutdown immediately after reading the response. The stream is then dropped. On the write path write_fixed_size_protobuf does flush, so the actual bytes make it to the peer — this is fine. Just confirming this is intentional and the inbound side reads the shutdown flag and terminates gracefully.


Parity Matrix

Component Go (charon/dkg/sync) Rust (pluto/crates/dkg/src/sync) Match Notes
Protocol ID /charon/dkg/sync/1.0.0 /charon/dkg/sync/1.0.0/ Unverified Trailing slash needs confirmation
Wire framing i64 LE length prefix i64 LE length prefix Yes write_fixed_size_protobuf
Proto schema MsgSync, MsgSyncResponse Generated from same .proto Yes Fields match
Step validation (initial) 0 or 1 0 or 1 Yes server.rs:227
Step validation (jump) current+1 or current+2 current+1 or current+2 Yes server.rs:218
Hash sig verification secp256k1 / libp2p sign secp256k1 / libp2p sign Yes protocol.rs:44
Version check major.minor exact full semver == Yes SemVer is major.minor only
Shutdown flow send shutdown flag, wait ACK send shutdown=true, read response Yes handler.rs:296-338
Error propagation (invalid msg) break loop continues loop No High severity, see above

Tests

Tests were not run (no cargo test permission). Unit tests cover:

  • update_step_rules_match_go — validates step acceptance rules
  • sync_round_trip_matches_go_shape — 3-node integration test across real TCP
  • active_client_requests_dial — behaviour poll emits Dial
  • connection_closed_keeps_client_state_until_last_connection — multi-connection tracking

Missing coverage:

  • Validation failure path (hash sig mismatch) — the high-severity bug above is untested
  • relay_reset reconnect path via on_dial_upgrade_error
  • Protocol negotiation failure (Unsupported variant)

Open Questions

  1. Trailing slash in protocol ID — Can you confirm against charon/dkg/sync/? The Go source either has or doesn't have a trailing slash and it must match byte-for-byte.
  2. Wire format — Was the i64 LE framing verified against the Charon Go dkg/sync package? (Other Charon p2p protocols use the standard varint framing; DKG sync is an exception worth calling out explicitly.)
  3. step=0 initial value — The client initializes step: AtomicI64::new(0) (client.rs:79) and begins sending step=0 immediately. The Go sync sends step=0 until the caller advances it. Is step 0 meant to mean "not yet started"? The test in mod.rs starts at step 0 but the example starts advancing from step 1 — this asymmetry should be documented.

@iamquang95
Copy link
Copy Markdown
Collaborator Author

[High] Step is updated for peers that fail validation

Same as charon implementation

[Medium] Protocol ID has trailing slash — verify against Go

Matched with golang: const protocolID = "/charon/dkg/sync/1.0.0/"

[Medium] await_all_at_step does not check started — can deadlock before Server::start()

Matched with charon, also it's not a bug.

@iamquang95
Copy link
Copy Markdown
Collaborator Author

@claude now you will have the charon source code, review it again

@claude
Copy link
Copy Markdown

claude bot commented Apr 6, 2026

Claude finished @iamquang95's task in 3m 23s —— View job


Code Review (with Charon source)

  • Read Charon Go source for dkg/sync
  • Read PR changed files
  • Compare implementations
  • Post updated review

Summary

With access to charon/dkg/sync/, I can confirm several findings from the previous review were incorrect. The three previously flagged high/medium issues all match the Go reference implementation. Two low-severity cosmetic issues remain. One new finding identified.


Corrections to Previous Review

[Retracted — High] Step updated after validation failure

Confirmed correct. Go's handleStream (server.go:283) calls s.updateStep(pID, ...) unconditionally after validReq fails:

if err := s.validReq(pubkey, msg); err != nil {
    s.setErr(...)
    resp.Error = err.Error()
} else if !s.isConnected(pID) {
    s.setConnected(pID)
}

if err := s.updateStep(pID, int(msg.GetStep())); err != nil {  // ← unconditional
    return err
}

Rust handler.rs:403 behaves identically. This was a correct port.


[Retracted — Medium] Protocol ID trailing slash

Confirmed correct. Go uses const protocolID = "/charon/dkg/sync/1.0.0/" (server.go:30) — trailing slash is intentional and matches Rust protocol.rs:11.


[Retracted — Medium] await_all_at_step started guard

Confirmed correct. Go's AwaitAllAtStep (server.go:123) has no started guard either. The Rust implementation matches.


Remaining Valid Findings

[Low] Redundant match arms before wildcard in on_connection_event

handler.rs:279-283:

ConnectionEvent::AddressChange(_)
| ConnectionEvent::LocalProtocolsChange(_)
| ConnectionEvent::RemoteProtocolsChange(_) => {}
ConnectionEvent::ListenUpgradeError(_) => {}
_ => {}    // makes the four named arms above dead code

The named arms are unreachable. Remove them. Fix this →


[Low] inbound_events_tx failure logged at error!

handler.rs:435: The handler owns both ends of the channel; send failure can only happen after the handler is dropped, at which point the error is unactionable. Use debug! or a silent ignore. Fix this →


New Finding

[Low] Relay error detection may miss ErrResourceScopeClosed

Go's isRelayError (client.go:245) checks two error types:

func isRelayError(err error) bool {
    return errors.Is(err, network.ErrReset) ||
        errors.Is(err, network.ErrResourceScopeClosed)   // ← second case
}

Rust (handler.rs:326-328) only checks ConnectionReset:

relay_reset: error.kind() == std::io::ErrorKind::ConnectionReset,

ErrResourceScopeClosed fires when a relay circuit's resource scope is exhausted — a normal occurrence during relay recycling. The equivalent in Rust's libp2p may surface as a different ErrorKind (e.g. BrokenPipe, Other). If missed, the client will not reconnect when it should (instead treating it as a fatal error).

Recommend mapping BrokenPipe and/or wrapping the condition to match what Rust libp2p actually produces for resource scope closure. Fix this →


Informational Note

Rust read_exact vs Go's reader.Read

Go's readSizedProto (server.go:368) uses reader.Read(buf) which is not guaranteed to fill the buffer on a partial TCP read — this is technically a latent Go bug. Rust's read_fixed_size_protobuf (p2p/src/proto.rs:147) calls read_exact which correctly handles partial reads. Rust is more correct here; no change needed.


Parity Matrix (updated)

Component Go Rust Match Notes
Protocol ID /charon/dkg/sync/1.0.0/ /charon/dkg/sync/1.0.0/ Trailing slash is intentional
Wire framing i64 LE prefix i64 LE prefix Rust uses read_exact (more correct)
Step update on invalid msg Unconditional Unconditional Both update step regardless of validation
await_all_at_step started guard None None Matches Go by design
Hash sig verification secp256k1 libp2p secp256k1 libp2p
Relay reconnect (ErrReset)
Relay reconnect (ErrResourceScopeClosed) Unverified Low severity
Shutdown flow Send flag, wait ACK Send flag, wait ACK
clearConnected on exit defer After result Semantically equivalent

@iamquang95
Copy link
Copy Markdown
Collaborator Author

[Low] Relay error detection may miss ErrResourceScopeClosed

Don't have the equivalent error in rust.

Copy link
Copy Markdown
Collaborator

@varex83agent varex83agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: feat(dkg): implement sync

This PR ports the Charon DKG sync protocol to Rust. The architecture is solid — the Arc<Inner> / RwLock<ServerState> / Notify design is correct, the wire format matches Go's i64 LE framing, and the step validation rules are faithful. Test coverage is good with both unit tests mirroring Go's test shapes and an integration round-trip test.

However there are four bugs that need to be fixed before merge:

  1. stop_rx.changed() spurious cancelactivate() sends false on stop_tx, which fires changed() in the running outbound stream and kills it prematurely (handler.rs:299,319 + client.rs:178)
  2. Inbound future replaced without cleanup — dropping the old BoxFuture skips server.clear_connected, leaving the peer permanently in the connected set (handler.rs:257)
  3. No backoff on Reconnectable exit — transitioning to Idle directly can spin-loop under relay churn; the on_dial_upgrade_error path correctly calls schedule_retry() but the stream-exit path does not (handler.rs:226)
  4. NegotiationFailed retries forever — when a peer doesn't support the protocol, retrying with backoff is pointless; the handler should call client.finish(Err(Unsupported)) (handler.rs:125)

Additionally, await_all_shutdown is missing the error-state check that await_all_connected and await_all_at_step both have, and the MAX_MESSAGE_SIZE constant (128 MiB) diverges from Go's 32 MiB cap.

Inline comments are filed for each issue individually.

Comment thread crates/dkg/src/sync/handler.rs Outdated
Comment thread crates/dkg/src/sync/handler.rs
Comment thread crates/dkg/src/sync/handler.rs Outdated
Comment thread crates/dkg/src/sync/handler.rs Outdated
Comment thread crates/dkg/src/sync/server.rs
Comment thread crates/dkg/src/sync/handler.rs Outdated
Comment thread crates/dkg/src/sync/server.rs
Comment thread crates/dkg/src/sync/server.rs
Comment thread crates/dkg/src/sync/mod.rs
Comment thread crates/dkg/examples/sync.rs Outdated
) {
match event {
Either::Left(event) => self.pending_events.push_back(ToSwarm::GenerateEvent(event)),
Either::Right(unreachable) => match unreachable {},
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Either::Right(unreachable) => match unreachable {},
Either::Right(_) => {},

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to keep the code with unreachable. It's cleaner to read and expect this will never happen

Comment thread crates/dkg/src/sync/handler.rs
Comment thread crates/dkg/src/sync/handler.rs Outdated
@iamquang95
Copy link
Copy Markdown
Collaborator Author

/claude-review

@claude
Copy link
Copy Markdown

claude bot commented Apr 16, 2026

Claude finished @iamquang95's task in 20m 15s —— View job


Code Review — PR #311 feat(dkg): implement sync

  • Gather PR context and diff
  • Read changed files (handler.rs, server.rs, client.rs, behaviour.rs, error.rs, protocol.rs, mod.rs, proto.rs)
  • Read Go reference (charon/dkg/sync/server.go, client.go)
  • Run parallel review agents (functional parity, security, Rust style, code quality)
  • Compile and post review findings

Note: Inline GitHub review comments could not be posted (requires gh api POST permission). All findings are listed below.


Summary

All five bugs identified in prior review rounds are confirmed fixed:

  • ✅ Spurious stop_rx cancel on activate() — fixed (value checked before returning Fatal)
  • ✅ Duplicate inbound stream drops old future without cleanup — fixed (rejects duplicate, keeps existing)
  • ✅ Missing backoff on Reconnectable exit — fixed (schedule_retry() with INITIAL_BACKOFF reset)
  • NegotiationFailed retried forever — fixed (client.finish(Err(Error::Unsupported)))
  • await_all_shutdown missing error check — fixed (checks state.err)

Functional parity with the Go reference is excellent. The findings below are all minor or nit — no blockers.


Findings

[Minor] update_step errors exit via ? without calling server.set_err()handler.rs:425

When update_step returns Err (PeerStepBehind, PeerStepAhead, AbnormalInitialStep, StepOverflow), the ? exits the inner async block without ever calling server.set_err(). await_all_connected / await_all_at_step / await_all_shutdown all check state.err first — but since it is never set for step errors, they do not fast-fail. clear_connected fires notify_waiters(), waiters re-check, find no error and connected.len() < all_count, and go back to waiting indefinitely.

This matches Go behaviour (setErr is not called for updateStep errors in Go either), so it is not a porting divergence. But Rust's notify-based design makes this gap more dangerous than Go's polling design — a stuck peer (valid signature, invalid step) blocks all barrier waiters silently rather than returning a diagnostic error. Consider calling server.set_err(...) before ? propagation.


[Minor] update_step ? can bypass set_shutdown on the final shutdown messagehandler.rs:425-444

If a shutdown message arrives with a step that fails update_step (e.g. PeerStepAhead), the ? exits the loop before set_shutdown(peer_id) at line 444. The outbound side has already returned GracefulShutdown and called client.finish(Ok(())), but the inbound server never records the peer as shut down. await_all_shutdown blocks indefinitely. Matches Go behaviour; in practice legitimate peers will not jump >2 steps in a shutdown message, but worth a comment.


[Minor] No per-message read timeout — slow-loris riskhandler.rs:385

read_fixed_size_protobuf calls read_exact on an 8-byte header and then on the payload with no timeout. A peer that sends 7 of the 8 header bytes and goes silent parks this Tokio task indefinitely. libp2p's connection-level idle timeout does not fire during partial reads. Consider wrapping with tokio::time::timeout. Go also lacks this, so it is an improvement opportunity rather than a regression.


[Minor] send_inbound_event logs at error! for an impossible failurehandler.rs:456

Handler owns both inbound_events_tx and inbound_events_rx. An unbounded send fails only when the receiver is dropped — impossible while the handler is alive. Replace with debug! or debug_assert!:

fn send_inbound_event(tx: &mpsc::UnboundedSender<OutEvent>, event: OutEvent) {
    let result = tx.send(event);
    debug_assert!(result.is_ok(), "inbound event receiver dropped while handler alive");
}

[Minor] SeqCst on all atomics is unnecessarily strongclient.rs:100 (and throughout)

SeqCst establishes a global total order across ALL atomic operations on ALL threads. On ARM this requires a full memory barrier per operation. For the independent boolean flags (active, connected, reconnect, shutdown_requested, finished, outbound_claimed), Relaxed is sufficient. For step (producer/consumer), use Release/Acquire. For try_claim_outbound and finished.swap, use AcqRel. Same applies to started in server.rs.


[Minor] release_outbound() in ConnectionClosed can race with handler's re-claimbehaviour.rs:154

If connection closure follows a Reconnectable exit, the handler at handler.rs:226 already called release_outbound() then schedule_retry(). The WaitingRetry timer fires and calls try_claim_outbound() (sets outbound_claimed = true). If ConnectionClosed delivers after the handler re-claimed, this call clears outbound_claimed back to false — potentially allowing a second concurrent handler to claim outbound and open two parallel streams. Both operations are idempotent stores so no corruption occurs in practice, but removing release_outbound() from this path (leaving the handler authoritative over outbound lifecycle) would eliminate the race.


[Minor] TOCTOU race in available_tcp_port() test helpermod.rs:93

The TcpListener is dropped after reading the port, releasing it back to the OS. Between the drop and listen_on, another process or parallel test can claim that port. Use listen_on("/ip4/127.0.0.1/tcp/0") and wait for SwarmEvent::NewListenAddr to get the actually-bound address.


[Minor] Unbounded inbound_events_tx channelhandler.rs:73

Events are produced by handle_inbound_stream at up to 1/period rate and consumed one-per-poll by the swarm. If the swarm is temporarily slow, the queue grows without bound. A bounded channel or try_send with drop-on-full would cap memory use.


[nit] clear_connected called unconditionally even when mark_connected was never called (handler.rs:451). mutate_state always fires notify_waiters() even on a no-op HashSet::remove, causing spurious wakeups on barrier waiters.

[nit] stop_tx.send(false) in activate() sends the same value already in the channel (client.rs:178). watch::send notifies receivers even on no-op — use send_if_modified to avoid the spurious wake.

[nit] done_rx.borrow() at the top of wait_finished loop; prefer borrow_and_update() inside the changed() arm (client.rs:206).

[nit] Type annotation uses libp2p::StreamProtocol full path while constructor uses imported StreamProtocol::new(...). Use consistently (protocol.rs:14).


Verdict: COMMENT — 0 bugs, 7 minor, 4 nits. No blockers; all prior bugs fixed.

Comment thread crates/p2p/src/proto.rs
pub async fn read_fixed_size_delimited<S: AsyncRead + Unpin>(
stream: &mut S,
) -> io::Result<Vec<u8>> {
read_fixed_size_delimited_with_max(stream, MAX_MESSAGE_SIZE).await
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that the earlier version did not have any checks at all, but since we have added this already in newer releases (post-audit), then it makes sense to align the values from the beginning. It will probably be hard to find such the differences in the future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants