diff --git a/Cargo.lock b/Cargo.lock index f853ade5..ced553bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,9 +302,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" +checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" [[package]] name = "bimap" @@ -384,6 +384,12 @@ dependencies = [ "serde", ] +[[package]] +name = "cancellation" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7a879c84c21f354f13535f87ad119ac3be22ebb9097b552a0af6a78f86628c4" + [[package]] name = "cast" version = "0.3.0" @@ -401,9 +407,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.48" +version = "1.2.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c481bdbf0ed3b892f6f806287d72acd515b352a4ec27a208489b8c1bc839633a" +checksum = "90583009037521a116abf44494efecd645ba48b6622457080f080b85544e2215" dependencies = [ "find-msvc-tools", "shlex", @@ -496,14 +502,18 @@ dependencies = [ name = "charon-core" version = "0.1.0" dependencies = [ + "cancellation", "charon-build-proto", "chrono", + "crossbeam", "hex", "libp2p", "prost 0.14.1", "prost-types 0.14.1", + "rand 0.8.5", "serde", "serde_json", + "thiserror 2.0.17", ] [[package]] @@ -764,6 +774,19 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name 
= "crossbeam-channel" version = "0.5.15" @@ -792,6 +815,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1766,9 +1798,9 @@ checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" [[package]] name = "icu_properties" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" dependencies = [ "icu_collections", "icu_locale_core", @@ -1780,9 +1812,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" [[package]] name = "icu_provider" @@ -2244,9 +2276,9 @@ dependencies = [ [[package]] name = "libp2p-identity" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3104e13b51e4711ff5738caa1fb54467c8604c2e94d607e27745bcf709068774" +checksum = "f0c7892c221730ba55f7196e98b0b8ba5e04b4155651736036628e9f73ed6fc3" dependencies = [ "asn1_der", "bs58", @@ -2828,9 +2860,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "mio" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" +checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" 
dependencies = [ "libc", "wasi 0.11.1+wasi-snapshot-preview1", @@ -3808,9 +3840,9 @@ checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" [[package]] name = "reqwest" -version = "0.12.24" +version = "0.12.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" +checksum = "b6eff9328d40131d43bd911d42d79eb6a47312002a4daefc9e37f17e74a7701a" dependencies = [ "base64", "bytes", @@ -4667,9 +4699,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf146f99d442e8e68e585f5d798ccd3cad9a7835b917e09728880a862706456" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "bitflags 2.10.0", "bytes", diff --git a/Cargo.toml b/Cargo.toml index eafea64e..8eedcf24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,20 +24,22 @@ publish = false [workspace.dependencies] axum = "0.8.6" +blst = "0.3.13" +cancellation = "0.1.0" chrono = { version = "0.4", features = ["serde"] } +crossbeam = "0.8.4" hex = { version = "^0.4.3" } -serde = { version = "1.0", features = ["derive"] } -serde_json = { version = "^1.0" } -tokio = { version = "1", features = ["full"] } -libp2p = { version = "0.56", features = ["full", "secp256k1"] } prost = "0.14" -prost-types = "0.14" prost-build = "0.14" -blst = "0.3.13" +prost-types = "0.14" +rand = { version = "0.8", features = ["std_rng"] } rand_core = "0.6" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "^1.0" } thiserror = "2.0.12" -rand = {version = "0.8", features = ["std_rng"]} -uuid = {version = "1.16", features = ["serde", "v4"] } +tokio = { version = "1", features = ["full"] } +libp2p = { version = "0.56", features = ["full", "secp256k1"] } +uuid = { version = "1.16", features = ["serde", "v4"] } serde_with = { version = "3", features = ["hex", 
"base64"] } base64 = { version = "0.22.1" } sha3 = { version = "0.10.8" } diff --git a/crates/charon-core/Cargo.toml b/crates/charon-core/Cargo.toml index 1a0b0ffd..ad68e518 100644 --- a/crates/charon-core/Cargo.toml +++ b/crates/charon-core/Cargo.toml @@ -7,8 +7,19 @@ license.workspace = true publish.workspace = true [dependencies] +cancellation.workspace = true +chrono.workspace = true +crossbeam.workspace = true +hex.workspace = true serde.workspace = true serde_json.workspace = true +thiserror.workspace = true +libp2p.workspace = true +prost.workspace = true +prost-types.workspace = true + +[dev-dependencies] +rand.workspace = true libp2p.workspace = true prost.workspace = true prost-types.workspace = true @@ -19,4 +30,4 @@ chrono.workspace = true charon-build-proto.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/crates/charon-core/src/lib.rs b/crates/charon-core/src/lib.rs index 2d964756..4729d561 100644 --- a/crates/charon-core/src/lib.rs +++ b/crates/charon-core/src/lib.rs @@ -4,6 +4,7 @@ //! This crate provides the fundamental building blocks, data structures, and //! core algorithms used throughout the Charon system. +pub mod qbft; /// Types for the Charon core. 
pub mod types; diff --git a/crates/charon-core/src/qbft/fake_clock.rs b/crates/charon-core/src/qbft/fake_clock.rs new file mode 100644 index 00000000..04e143f5 --- /dev/null +++ b/crates/charon-core/src/qbft/fake_clock.rs @@ -0,0 +1,153 @@ +use crossbeam::channel as mpmc; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + thread, + time::{Duration, Instant}, +}; + +#[derive(Clone)] +pub struct FakeClock { + inner: Arc>, +} + +struct FakeClockInner { + start: Instant, + now: Instant, + last_id: usize, + clients: HashMap, Instant)>, +} + +impl FakeClock { + pub fn new(now: Instant) -> Self { + Self { + inner: Arc::new(Mutex::new(FakeClockInner { + start: now, + now, + last_id: 1, + clients: Default::default(), + })), + } + } + + pub fn new_timer( + &self, + duration: Duration, + ) -> ( + mpmc::Receiver, + Box, + ) { + let (tx, rx) = mpmc::bounded::(1); + + let client_id = { + let mut inner = self.inner.lock().unwrap(); + let id = inner.last_id; + let deadline = inner.now + duration; + + inner.last_id += 1; + inner.clients.insert(id, (tx, deadline)); + + id + }; + + let inner = Arc::clone(&self.inner); + let cancel = Box::new(move || { + let mut inner = inner.lock().unwrap(); + inner.clients.remove(&client_id); + }); + + (rx, cancel) + } + + pub fn advance(&self, duration: Duration) { + // Advance time and collect expired senders under lock, but perform sends + // without holding lock. 
+ let mut expired = vec![]; + + let now = { + let mut inner = self.inner.lock().unwrap(); + inner.now += duration; + let now = inner.now; + + for (&id, (ch, deadline)) in inner.clients.iter() { + if *deadline <= now { + expired.push((id, ch.clone())); + } + } + + for (id, _) in expired.iter() { + inner.clients.remove(id); + } + + now + }; + + for (_, ch) in expired { + let _ = ch.send(now); + } + } + + pub fn elapsed(&self) -> Duration { + let inner = self.inner.lock().unwrap(); + inner.now - inner.start + } + + pub fn cancel(&self) { + let mut inner = self.inner.lock().unwrap(); + inner.clients.clear(); + } +} + +impl Drop for FakeClock { + fn drop(&mut self) { + self.cancel(); + } +} + +#[test] +fn multiple_threads_timers() { + let clock = FakeClock::new(Instant::now()); + + let start = Instant::now(); + thread::scope(|s| { + let c1 = clock.clone(); + let (ch_1, _) = c1.new_timer(Duration::from_secs(5)); + s.spawn(move || { + let _ = ch_1.recv(); + }); + + let c2 = clock.clone(); + let (ch_2, _) = c2.new_timer(Duration::from_secs(5)); + s.spawn(move || { + let _ = ch_2.recv(); + }); + + clock.advance(Duration::from_secs(6)); + }); + + println!("start={:?}, clock={:?}", start.elapsed(), clock.elapsed()); +} + +#[test] +fn multiple_threads_cancellation() { + let clock = FakeClock::new(Instant::now()); + + let start = Instant::now(); + thread::scope(|s| { + let c1 = clock.clone(); + let (ch_1, _) = c1.new_timer(Duration::from_secs(5)); + s.spawn(move || { + let _ = ch_1.recv(); + }); + + let c2 = clock.clone(); + let (ch_2, _) = c2.new_timer(Duration::from_secs(5)); + s.spawn(move || { + let _ = ch_2.recv(); + }); + + clock.cancel(); + }); + + println!("start={:?}, clock={:?}", start.elapsed(), clock.elapsed()); +} diff --git a/crates/charon-core/src/qbft/internal_test.rs b/crates/charon-core/src/qbft/internal_test.rs new file mode 100644 index 00000000..b7770251 --- /dev/null +++ b/crates/charon-core/src/qbft/internal_test.rs @@ -0,0 +1,790 @@ +use 
crate::qbft::{self, fake_clock::FakeClock, *}; +use cancellation::CancellationTokenSource; +use crossbeam::channel as mpmc; +use std::{collections::HashMap, sync::Arc, thread, time::Duration}; + +const WRITE_CHAN_ERR: &str = "Failed to write to channel"; +const READ_CHAN_ERR: &str = "Failed to read from channel"; + +#[derive(Default, Debug)] +struct Test { + /// Consensus instance, only affects leader election. + pub instance: i64, + /// Results in 1s round timeout, otherwise exponential (1s,2s,4s...) + pub const_period: bool, + /// Delays start of certain processes + pub start_delay: HashMap, + /// Delays input value availability of certain processes + pub value_delay: HashMap, + /// [0..1] - probability of dropped messages per processes + pub drop_prob: HashMap, + /// Add random delays to broadcast of messages. + pub bcast_jitter_ms: i32, + /// Only broadcast commits after this round. + pub commits_after: i32, + /// Deterministic consensus at specific round + pub decide_round: i32, + /// If prepared value decided, as opposed to leader's value. + pub prepared_val: i32, + /// Non-deterministic consensus at random round. 
+ pub random_round: bool, +} + +fn test_qbft(test: Test) { + const N: usize = 4; + const MAX_ROUND: usize = 50; + const FIFO_LIMIT: usize = 100; + + let start_time = time::Instant::now(); + let clock = FakeClock::new(start_time); + + let cts = CancellationTokenSource::new(); + let mut receives = HashMap::< + i64, + ( + mpmc::Sender>, + mpmc::Receiver>, + ), + >::new(); + let (broadcast_tx, broadcast_rx) = mpmc::unbounded::>(); + let (result_chan_tx, result_chan_rx) = mpmc::bounded::>>(N); + let (run_chan_tx, run_chan_rx) = mpmc::bounded::>(N); + + let is_leader = Box::new(make_is_leader(N as i64)); + + let defs = Arc::new(Definition { + is_leader: is_leader.clone(), + new_timer: { + let clock = clock.clone(); + + Box::new(move |round| { + let d: Duration = if test.const_period { + Duration::from_secs(1) + } else { + // If not constant periods, then exponential. + Duration::from_secs(u64::pow(2, (round as u32) - 1)) + }; + + clock.new_timer(d) + }) + }, + decide: { + let result_chan_tx = result_chan_tx.clone(); + Box::new(move |_, _, _, q_commit| { + result_chan_tx.send(q_commit.clone()).expect(WRITE_CHAN_ERR); + }) + }, + compare: Box::new(|_, _, _, _, return_err, _| { + return_err.send(Ok(())).expect(WRITE_CHAN_ERR); + }), + nodes: N as i64, + fifo_limit: FIFO_LIMIT as i64, + log_round_change: { + let clock = clock.clone(); + + Box::new(move |_, process, round, new_round, upon_rule, _| { + println!( + "{:?} - {}@{} change to {} ~= {}", + clock.elapsed(), + process, + round, + new_round, + upon_rule, + ); + }) + }, + log_unjust: Box::new(|_, _, msg| { + println!("Unjust: {:?}", msg); + }), + log_upon_rule: { + let clock = clock.clone(); + Box::new(move |_, process, round, msg, upon_rule| { + println!( + "{:?} {} => {}@{} -> {}@{} ~= {}", + clock.elapsed(), + msg.source(), + msg.type_(), + msg.round(), + process, + round, + upon_rule, + ); + }) + }, + }); + + thread::scope(|s| { + for i in 1..=N as i64 { + let (sender, receiver) = mpmc::bounded::>(1000); + let 
broadcast_tx = broadcast_tx.clone(); + receives.insert(i, (sender.clone(), receiver.clone())); + + let trans = Transport { + broadcast: { + let clock = clock.clone(); + + Box::new( + move |_, type_, instance, source, round, value, pr, pv, justification| { + if round > MAX_ROUND as i64 { + return Err(QbftError::MaxRoundReached); + } + + if type_ == MSG_COMMIT && round <= test.commits_after.into() { + println!( + "{:?} {} dropping commit for round {}", + clock.elapsed(), + source, + round + ); + return Ok(()); + } + + println!("{:?} {} => {}@{}", clock.elapsed(), source, type_, round); + + let msg = new_msg( + type_, + *instance, + source, + round, + *value, + *value, + pr, + *pv, + justification, + ); + sender.send(msg.clone()).expect(WRITE_CHAN_ERR); + + bcast( + broadcast_tx.clone(), + msg.clone(), + test.bcast_jitter_ms, + clock.clone(), + ); // TODO: Add clock + + Ok(()) + }, + ) + }, + receive: receiver.clone(), + }; + + let token = cts.token(); + let clock = clock.clone(); + let receiver = receiver.clone(); + let start_delay = test.start_delay.get(&i).copied(); + let value_delay = test.value_delay.get(&i).copied(); + let decide_round = test.decide_round; + let run_chan_tx = run_chan_tx.clone(); + let defs = defs.clone(); + let is_leader = is_leader.clone(); + + s.spawn(move || { + if let Some(delay) = start_delay { + println!("{:?} Node {} start delay {:?}", clock.elapsed(), i, delay); + let (delay_ch, _) = clock.new_timer(delay); + _ = delay_ch.recv(); + println!("{:?} Node {} starting", clock.elapsed(), i); + } + + // Drain any buffered messages + while !receiver.is_empty() { + _ = receiver.recv().expect(READ_CHAN_ERR); + } + + let (v_chan_tx, v_chan_rx) = mpmc::bounded::(1); + let (_, vs_chan_rx) = mpmc::bounded::(1); + + if let Some(delay) = value_delay { + s.spawn(move || { + let (delay_ch, cancel) = clock.new_timer(delay); + _ = delay_ch.recv(); + _ = v_chan_tx.send(i); + + cancel(); + }); + } else if decide_round != 1 { + s.spawn(move || { + _ = 
v_chan_tx.send(i); + }); + } else if is_leader(&test.instance, 1, i) { + s.spawn(move || { + _ = v_chan_tx.send(i); + }); + } + + run_chan_tx + .send(qbft::run( + token, + &defs, + &trans, + &test.instance, + i, + v_chan_rx, + vs_chan_rx, + )) + .expect(WRITE_CHAN_ERR); + }); + } + + let mut results = HashMap::>::new(); + let mut count = 0; + let mut decided = false; + let mut done = 0; + + loop { + mpmc::select! { + recv(broadcast_rx) -> msg => { + let msg = msg.expect(READ_CHAN_ERR); + for (target, (out_tx, _)) in receives.iter() { + if *target == msg.source() { + continue; // Do not broadcast to self, we sent to self already. + } + + if let Some(p) = test.drop_prob.get(&msg.source()) { + if rand::random::() < *p { + println!("{:?} {} => {}@{} => {} (dropped)", clock.elapsed(), msg.source(), msg.type_(), msg.round(), target); + continue; // Drop + } + } + + out_tx.send(msg.clone()).expect(WRITE_CHAN_ERR); + + if rand::random::() < 0.1 { // Send 10% messages twice + out_tx.send(msg.clone()).expect(WRITE_CHAN_ERR); + } + } + } + + recv(result_chan_rx) -> res => { + let q_commit = res.expect(READ_CHAN_ERR); + + for commit in q_commit.clone() { + for (_, previous) in results.iter() { + assert_eq!(previous.value(), commit.value(), "commit values"); + } + + if !test.random_round { + assert_eq!(i64::from(test.decide_round), commit.round(), "wrong decide round"); + + if test.prepared_val != 0 { // Check prepared value if set + assert_eq!(i64::from(test.prepared_val), commit.value(), "wrong prepared value"); + } else { // Otherwise check that leader value was used. 
+ assert!(is_leader(&test.instance, commit.round(), commit.value()), "not leader"); + } + } + + results.insert(commit.source(), commit); + } + + count += 1; + if count != N { + continue; + } + + let round = q_commit[0].round(); + println!("Got all results in round {} after {:?}: {:?}", round, clock.elapsed(), results); + + // Trigger shutdown + decided = true; + + clock.cancel(); + cts.cancel(); + } + + recv(run_chan_rx) -> res => { + let err = res.expect(READ_CHAN_ERR); + + if err.is_err() { + if !decided { + panic!("unexpected run error"); + } + } + + done += 1; + if done == N { + return; + } + } + + default => { + thread::sleep(time::Duration::from_micros(1)); + clock.advance(Duration::from_millis(1)); + } + } + } + }); +} + +/// Construct a leader election function. +fn make_is_leader(n: i64) -> impl Fn(&i64, i64, i64) -> bool + Clone { + move |instance: &i64, round: i64, process: i64| -> bool { (instance + round) % n == process } +} + +/// Returns a new message to be broadcast. +#[allow(clippy::too_many_arguments)] +fn new_msg( + type_: MessageType, + instance: i64, + source: i64, + round: i64, + value: i64, + value_source: i64, + pr: i64, + pv: i64, + justify: Option<&Vec>>, +) -> Msg { + let msgs = match justify { + None => vec![], + Some(justify) => justify + .iter() + .map(|j| { + let mut j = j + .as_any() + .downcast_ref::() + .expect("Expected `TestMsg` instance") + .clone(); + j.justify = None; + j + }) + .collect(), + }; + + Arc::new(TestMsg { + msg_type: type_, + instance, + peer_idx: source, + round, + value, + value_source, + pr, + pv, + justify: Some(msgs), + }) +} + +// Delays the message broadcast by between 1x and 2x jitter_ms and drops +// messages. 
+fn bcast( + broadcast: mpmc::Sender>, + msg: Msg, + jitter_ms: i32, + clock: FakeClock, +) { + if jitter_ms == 0 { + broadcast.send(msg.clone()).expect(WRITE_CHAN_ERR); + return; + } + + thread::spawn(move || { + let delta_ms = (f64::from(jitter_ms) * rand::random::()) as i32; + let delay = Duration::from_millis((jitter_ms + delta_ms) as u64); + println!( + "{:?} {} => {}@{} (bcast delay {:?})", + clock.elapsed(), + msg.source(), + msg.type_(), + msg.round(), + delay + ); + let (delay_ch, _) = clock.new_timer(delay); + _ = delay_ch.recv(); + + _ = broadcast.send(msg); + }); +} + +#[derive(Clone, Debug)] +struct TestMsg { + msg_type: MessageType, + instance: i64, + peer_idx: i64, + round: i64, + value: i64, + value_source: i64, + pr: i64, + pv: i64, + justify: Option>, +} + +impl SomeMsg for TestMsg { + fn type_(&self) -> MessageType { + self.msg_type + } + + fn instance(&self) -> i64 { + self.instance + } + + fn source(&self) -> i64 { + self.peer_idx + } + + fn round(&self) -> i64 { + self.round + } + + fn value(&self) -> i64 { + self.value + } + + fn value_source(&self) -> Result { + Ok(self.value_source) + } + + fn prepared_round(&self) -> i64 { + self.pr + } + + fn prepared_value(&self) -> i64 { + self.pv + } + + fn justification(&self) -> Vec> { + match self.justify { + None => vec![], + Some(ref j) => j + .iter() + .map(|j| Arc::new(j.clone()) as Msg) + .collect(), + } + } + + fn as_any(&self) -> &dyn any::Any { + self + } +} + +#[test] +#[ignore = "flaky"] +fn happy_0() { + test_qbft(Test { + instance: 0, + decide_round: 1, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn happy_1() { + test_qbft(Test { + instance: 1, + decide_round: 1, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn prepare_round_1_decide_round_2() { + test_qbft(Test { + instance: 0, + commits_after: 1, + decide_round: 2, + prepared_val: 1, + ..Default::default() + }); +} + +#[test] +#[ignore = "wrong prepared value"] +fn 
prepare_round_2_decide_round_3() { + test_qbft(Test { + instance: 0, + commits_after: 2, + value_delay: HashMap::from([(1, Duration::from_millis(200))]), + decide_round: 3, + prepared_val: 2, + const_period: true, + ..Default::default() + }); +} + +#[test] +#[ignore = "wrong decide round"] +fn leader_late_xp() { + test_qbft(Test { + instance: 0, + start_delay: HashMap::from([(1, Duration::from_millis(200))]), + decide_round: 2, + ..Default::default() + }); +} + +#[test] +#[ignore = "wrong decide round"] +fn leader_down_const() { + test_qbft(Test { + instance: 3, + start_delay: HashMap::from([ + (1, Duration::from_millis(50)), + (2, Duration::from_millis(100)), + ]), + decide_round: 4, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn very_late_exp() { + test_qbft(Test { + instance: 3, + start_delay: HashMap::from([(1, Duration::from_secs(5)), (2, Duration::from_secs(10))]), + decide_round: 4, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn very_late_const() { + test_qbft(Test { + instance: 1, + start_delay: HashMap::from([(1, Duration::from_secs(5)), (2, Duration::from_secs(10))]), + const_period: true, + random_round: true, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn stagger_start_exp() { + test_qbft(Test { + instance: 0, + start_delay: HashMap::from([ + (1, Duration::from_secs(0)), + (2, Duration::from_secs(1)), + (3, Duration::from_secs(2)), + (4, Duration::from_secs(3)), + ]), + random_round: true, // Takes 1 or 2 rounds. + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn stagger_start_const() { + test_qbft(Test { + instance: 0, + start_delay: HashMap::from([ + (1, Duration::from_secs(0)), + (2, Duration::from_secs(1)), + (3, Duration::from_secs(2)), + (4, Duration::from_secs(3)), + ]), + const_period: true, + random_round: true, // Takes 1 or 2 rounds. 
+ ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn very_delayed_value_exp() { + test_qbft(Test { + instance: 3, + value_delay: HashMap::from([(1, Duration::from_secs(5)), (2, Duration::from_secs(10))]), + decide_round: 4, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn very_delayed_value_const() { + test_qbft(Test { + instance: 1, + value_delay: HashMap::from([(1, Duration::from_secs(5)), (2, Duration::from_secs(10))]), + const_period: true, + random_round: true, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn stagger_delayed_value_exp() { + test_qbft(Test { + instance: 0, + value_delay: HashMap::from([ + (1, Duration::from_secs(0)), + (2, Duration::from_secs(1)), + (3, Duration::from_secs(2)), + (4, Duration::from_secs(3)), + ]), + random_round: true, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn stagger_delayed_value_const() { + test_qbft(Test { + instance: 0, + value_delay: HashMap::from([ + (1, Duration::from_secs(0)), + (2, Duration::from_secs(1)), + (3, Duration::from_secs(2)), + (4, Duration::from_secs(3)), + ]), + const_period: true, + random_round: true, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn round1_leader_no_value_round2_leader_offline() { + test_qbft(Test { + instance: 0, + value_delay: HashMap::from([(1, Duration::from_secs(1))]), + start_delay: HashMap::from([(2, Duration::from_secs(2))]), + const_period: true, + decide_round: 3, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn jitter_500ms_exp() { + test_qbft(Test { + instance: 3, + bcast_jitter_ms: 500, + random_round: true, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn jitter_200ms_const() { + test_qbft(Test { + instance: 3, + bcast_jitter_ms: 200, // 0.2-0.4s network delay * 3msgs/round == 0.6-1.2s delay per 1s round + const_period: true, + random_round: true, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn drop_10_percent_const() { 
+ test_qbft(Test { + instance: 1, + drop_prob: HashMap::from([(1, 0.1), (2, 0.1), (3, 0.1), (4, 0.1)]), + const_period: true, + random_round: true, + ..Default::default() + }); +} + +#[test] +#[ignore = "flaky"] +fn drop_30_percent_const() { + test_qbft(Test { + instance: 1, + drop_prob: HashMap::from([(1, 0.3), (2, 0.3), (3, 0.3), (4, 0.3)]), + const_period: true, + random_round: true, + ..Default::default() + }); +} + +fn noop_definition() -> Definition { + Definition { + is_leader: Box::new(|_, _, _| false), + new_timer: Box::new(|_| (mpmc::never(), Box::new(|| {}))), + decide: Box::new(|_, _, _, _| {}), + compare: Box::new(|_, _, _, _, _, _| {}), + nodes: 0, + fifo_limit: 0, + log_round_change: Box::new(|_, _, _, _, _, _| {}), + log_unjust: Box::new(|_, _, _| {}), + log_upon_rule: Box::new(|_, _, _, _, _| {}), + } +} + +fn noop_transport() -> Transport { + Transport { + broadcast: Box::new(|_, _, _, _, _, _, _, _, _| Ok(())), + receive: mpmc::never(), + } +} + +#[test] +#[ignore = "flaky"] +fn duplicate_pre_prepare_rules() { + let cts = CancellationTokenSource::new(); + let ct = &cts.token().clone(); + + const NO_LEADER: i64 = 1; + const LEADER: i64 = 2; + + let new_preprepare = |round: i64| -> Msg { + new_msg( + MSG_PRE_PREPARE, + 0, + LEADER, + round, + 0, + 0, + 0, + 0, + // Justification not required since nodes and quorum both 0. 
+ None, + ) + }; + + let mut def = noop_definition(); + def.is_leader = Box::new(|_, _, process| process == LEADER); + def.log_upon_rule = Box::new(move |_, _, round, msg, upon_rule| { + println!("UponRule: rule={} round={} ", upon_rule, msg.round()); + + assert!(upon_rule == UPON_JUSTIFIED_PRE_PREPARE); + + if msg.round() == 1 { + return; + } + + if msg.round() == 2 { + cts.cancel(); + return; + } + + panic!("unexpected round {}", round); + }); + def.compare = Box::new(|_, _, _, _, return_err, _| { + _ = return_err.send(Ok(())); + }); + + let (r_chan_tx, r_chan_rx) = mpmc::bounded::>(2); + r_chan_tx.send(new_preprepare(1)).expect(WRITE_CHAN_ERR); + r_chan_tx.send(new_preprepare(2)).expect(WRITE_CHAN_ERR); + + let mut transport = noop_transport(); + transport.receive = r_chan_rx; + + let (ch, input_value_ch) = mpmc::bounded::(1); + ch.send(1).expect(WRITE_CHAN_ERR); + let (ch, input_value_source_ch) = mpmc::bounded::(1); + ch.send(2).expect(WRITE_CHAN_ERR); + + let res = qbft::run( + ct, + &def, + &transport, + &0, + NO_LEADER, + input_value_ch, + input_value_source_ch, + ); + + assert!(res.is_ok()); +} diff --git a/crates/charon-core/src/qbft/mod.rs b/crates/charon-core/src/qbft/mod.rs new file mode 100644 index 00000000..a686f1f6 --- /dev/null +++ b/crates/charon-core/src/qbft/mod.rs @@ -0,0 +1,1305 @@ +//! Package `qbft` is an implementation of ["The Istanbul BFT Consensus Algorithm"](https://arxiv.org/pdf/2002.03613.pdf) by Henrique Moniz +//! as referenced by the [QBFT spec](https://github.com/ConsenSys/qbft-formal-spec-and-verification). +//! +//! ## Features +//! +//! - Simple API, just a single function: `qbft::run`. +//! - Consensus on arbitrary data. +//! - Transport abstracted and not provided. +//! - Decoupled from process authentication and message signing (not provided). +//! - No domain-specific dependencies. +//! - Explicit justifications. 
+ +// TODO: Remove these checks +#![allow(missing_docs)] +#![allow(clippy::type_complexity)] +#![allow(clippy::collapsible_if)] +#![allow(clippy::cast_sign_loss)] +#![allow(clippy::cast_precision_loss)] +#![allow(clippy::cast_possible_wrap)] +#![allow(clippy::cast_possible_truncation)] +#![allow(clippy::arithmetic_side_effects)] + +use cancellation::CancellationToken; +use crossbeam::channel as mpmc; +use std::{ + any, + cell::{Cell, RefCell}, + collections::{HashMap, HashSet}, + fmt::{self, Display}, + hash::Hash, + sync, thread, time, +}; + +type Result = std::result::Result; + +#[derive(Debug, thiserror::Error)] +pub enum QbftError { + #[error("Timeout")] + TimeoutError, + + #[error("Compare leader value with local value failed")] + CompareError, + + #[error("Maximum round reached")] + MaxRoundReached, + + #[error("Zero input value not supported")] + ZeroInputValue, + + #[error("Failed to read from channel: {0}")] + ChannelError(#[from] mpmc::RecvError), +} + +/// Abstracts the transport layer between processes in the consensus system. +pub struct Transport +where + V: PartialEq, +{ + /// Broadcast sends a message with the provided fields to all other + /// processes in the system (including this process). + /// + /// Note that an error exits the algorithm. + pub broadcast: Box< + dyn Fn( + /* ct */ &CancellationToken, + /* type_ */ MessageType, + /* instance */ &I, + /* source */ i64, + /* round */ i64, + /* value */ &V, + /* pr */ i64, + /* pv */ &V, + /* justification */ Option<&Vec>>, + ) -> Result<()> + + Send + + Sync, + >, + + /// Receive returns a stream of messages received + /// from other processes in the system (including this process). + pub receive: mpmc::Receiver>, +} + +/// Defines the consensus system parameters that are external to the qbft +/// algorithm. This remains constant across multiple instances of consensus +/// (calls to `run`). +pub struct Definition +where + V: PartialEq, +{ + /// A deterministic leader election function. 
+ pub is_leader: + Box bool + Send + Sync>, + + /// Returns a new timer channel and stop function for the round + pub new_timer: Box< + dyn Fn(/* round */ i64) -> (mpmc::Receiver, Box) + + Send + + Sync, + >, + + /// Called when leader proposes value and we compare it with our local + /// value. It's an opt-in feature that should instantly return `None` on + /// `return_err` channel if it is not turned on. + pub compare: Box< + dyn Fn( + /* ct */ &CancellationToken, + /* qcommit */ &Msg, + /* input_value_source_ch */ &mpmc::Receiver, + /* input_value_source */ &C, + /* return_err */ &mpmc::Sender>, + /* return_value */ &mpmc::Sender, + ) + Send + + Sync, + >, + + /// Called when consensus has been reached on a value. + pub decide: Box< + dyn Fn( + /* ct */ &CancellationToken, + /* instance */ &I, + /* value */ &V, + /* qcommit */ &Vec>, + ) + Send + + Sync, + >, + + /// Allows debug logging of triggered upon rules on message receipt. + /// It includes the rule that triggered it and all received round messages. + pub log_upon_rule: Box< + dyn Fn( + /* instance */ &I, + /* process */ i64, + /* round */ i64, + /* msg */ &Msg, + /* upon_rule */ UponRule, + ) + Send + + Sync, + >, + /// Allows debug logging of round changes. + pub log_round_change: Box< + dyn Fn( + /* instance */ &I, + /* process */ i64, + /* round */ i64, + /* new_round */ i64, + /* upon_rule */ UponRule, + /* msgs */ &Vec>, + ) + Send + + Sync, + >, + + /// Allows debug logging of unjust messages. + pub log_unjust: + Box) + Send + Sync>, + + /// Total number of nodes/processes participating in consensus. + pub nodes: i64, + + /// Limits the amount of message buffered for each peer. + pub fifo_limit: i64, +} + +impl Definition +where + V: PartialEq, +{ + /// Quorum count for the system. + /// See IBFT 2.0 paper for correct formula: + pub fn quorum(&self) -> i64 { + (self.nodes as u64 * 2).div_ceil(3) as i64 + } + + /// Maximum number of faulty/byzantine nodes supported in the system. 
+ /// See IBFT 2.0 paper for correct formula: + pub fn faulty(&self) -> i64 { + (self.nodes - 1) / 3 + } +} + +/// Defines the QBFT message types +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct MessageType(i64); + +// NOTE: message type ordering MUST not change, since it breaks backwards +// compatibility. +pub const MSG_UNKNOWN: MessageType = MessageType(0); +pub const MSG_PRE_PREPARE: MessageType = MessageType(1); +pub const MSG_PREPARE: MessageType = MessageType(2); +pub const MSG_COMMIT: MessageType = MessageType(3); +pub const MSG_ROUND_CHANGE: MessageType = MessageType(4); +pub const MSG_DECIDED: MessageType = MessageType(5); + +const MSG_SENTINEL: MessageType = MessageType(6); // intentionally not public + +impl MessageType { + pub fn valid(&self) -> bool { + self.0 > MSG_UNKNOWN.0 && self.0 < MSG_SENTINEL.0 + } +} + +impl Display for MessageType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self.0 { + 0 => "unknown", + 1 => "pre_prepare", + 2 => "prepare", + 3 => "commit", + 4 => "round_change", + 5 => "decided", + _ => panic!("bug: invalid message type"), + }; + write!(f, "{}", s) + } +} + +/// Defines the inter process messages. +pub trait SomeMsg: Send + Sync + fmt::Debug +where + V: PartialEq, +{ + /// Type of the message. + fn type_(&self) -> MessageType; + /// Consensus instance. + fn instance(&self) -> I; + /// Process that sent the message. + fn source(&self) -> i64; + /// The round the message pertains to. + fn round(&self) -> i64; + /// The value being proposed, usually a hash. + fn value(&self) -> V; + /// Usually the value that was hashed and is returned in `value`. + fn value_source(&self) -> Result; + /// The justified prepared round. + fn prepared_round(&self) -> i64; + /// The justified prepared value. + fn prepared_value(&self) -> V; + /// Set of messages that explicitly justifies this message. + fn justification(&self) -> Vec>; + + /// Cast as `Any` to allow downcasting. 
+ fn as_any(&self) -> &dyn any::Any; +} + +/// Alias for any `Msg` implementation tracked by reference counting. +pub type Msg = sync::Arc>; + +/// Defines the event based rules that are triggered when messages are received. +#[derive(PartialEq, Eq, Hash, Clone, Copy)] +pub struct UponRule(i64); + +pub const UPON_NOTHING: UponRule = UponRule(0); +pub const UPON_JUSTIFIED_PRE_PREPARE: UponRule = UponRule(1); +pub const UPON_QUORUM_PREPARES: UponRule = UponRule(2); +pub const UPON_QUORUM_COMMITS: UponRule = UponRule(3); +pub const UPON_UNJUST_QUORUM_ROUND_CHANGES: UponRule = UponRule(4); +pub const UPON_F_PLUS1_ROUND_CHANGES: UponRule = UponRule(5); +pub const UPON_QUORUM_ROUND_CHANGES: UponRule = UponRule(6); +pub const UPON_JUSTIFIED_DECIDED: UponRule = UponRule(7); +pub const UPON_ROUND_TIMEOUT: UponRule = UponRule(8); // This is not triggered by a message, but by a timer. + +impl Display for UponRule { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self.0 { + 0 => "nothing", + 1 => "justified_pre_prepare", + 2 => "quorum_prepares", + 3 => "quorum_commits", + 4 => "unjust_quorum_round_changes", + 5 => "f_plus_1_round_changes", + 6 => "quorum_round_changes", + 7 => "justified_decided", + 8 => "round_timeout", + _ => panic!("bug: invalid upon rule"), + }; + write!(f, "{}", s) + } +} + +/// Defines the key used to deduplicate upon rules. +#[derive(Eq, Hash, PartialEq)] +struct DedupKey { + upon_rule: UponRule, + round: i64, +} + +/// Executes the consensus algorithm until the context is closed. +/// The generic type `I` is the instance of consensus and can be anything. +/// The generic type `V` is the arbitrary data value being proposed; it only +/// requires an Equal method. The generic type `C` is the compare value, used to +/// compare leader's proposed value with local value and can be anything. 
+pub fn run( + ct: &CancellationToken, + d: &Definition, + t: &Transport, + instance: &I, + process: i64, + mut input_value_ch: mpmc::Receiver, + input_value_source_ch: mpmc::Receiver, +) -> Result<()> +where + V: PartialEq + Eq + Hash + Default, + C: Clone + Send + Sync + Default, +{ + // === State === + let round: Cell = Cell::new(1); + let input_value: RefCell = RefCell::new(Default::default()); + let mut input_value_source: C = Default::default(); + let ppj_cache: RefCell>>> = RefCell::new(None); // Cached pre-prepare justification for the current round (`None` value is unset). + let prepared_round: Cell = Cell::new(0); + let prepared_value: RefCell = RefCell::new(Default::default()); + let mut compare_failure_round: i64 = 0; + let prepared_justification: RefCell>>> = RefCell::new(None); + let mut q_commit: Option>> = None; + let buffer: RefCell>>> = RefCell::new(HashMap::new()); + let dedup_rules: RefCell> = RefCell::new(HashMap::new()); + let mut timer_chan: mpmc::Receiver; + let mut stop_timer: Box; + + // === Helpers == + + // Broadcasts a non-ROUND-CHANGE message for current round. + let broadcast_msg = + |type_: MessageType, value: &V, justification: Option<&Vec>>| { + (t.broadcast)( + ct, + type_, + instance, + process, + round.get(), + value, + 0, + &Default::default(), + justification, + ) + }; + // Broadcasts a ROUND-CHANGE message with current state. + let broadcast_round_change = || { + (t.broadcast)( + ct, + MSG_ROUND_CHANGE, + instance, + process, + round.get(), + &Default::default(), + prepared_round.get(), + &prepared_value.borrow(), + prepared_justification.borrow().as_ref(), + ) + }; + + // Broadcasts a PRE-PREPARE message with current state + // and our own input value if present, otherwise it caches the justification + // to be used when the input value becomes available. 
+ let broadcast_own_pre_prepare = |justification: Vec>| { + if ppj_cache.borrow().is_some() { + panic!("bug: justification cache must be none") + } + + if *input_value.borrow() == Default::default() { + // Can't broadcast a pre-prepare yet, need to wait for an input value. + ppj_cache.replace(Some(justification)); + return Ok(()); + } + + broadcast_msg(MSG_PRE_PREPARE, &input_value.borrow(), Some(&justification)) + }; + + // Adds a message to each process' FIFO queue + let buffer_msg = |msg: &Msg| { + let mut b = buffer.borrow_mut(); + let fifo = b.entry(msg.source()).or_default(); + + fifo.push(msg.clone()); + if fifo.len() as i64 > d.fifo_limit { + fifo.drain(0..(fifo.len() - d.fifo_limit as usize)); + } + }; + + // Returns true if the rule has been already executed since last round + // change. + let is_duplicated_rule = |upon_rule: UponRule, round: i64| { + let k = DedupKey { upon_rule, round }; + dedup_rules.borrow_mut().insert(k, true).is_some() + }; + + // Updates round and clears the rule dedup state. + let change_round = |new_round: i64, rule: UponRule| { + if round.get() == new_round { + return; + } + + (d.log_round_change)( + instance, + process, + round.get(), + new_round, + rule, + &extract_round_messages(&buffer.borrow(), round.get()), + ); + + round.set(new_round); + dedup_rules.replace(HashMap::new()); + ppj_cache.replace(None); + }; + + // Algorithm 1:11 + { + if (d.is_leader)(instance, round.get(), process) { + // Note round==1 at this point. + broadcast_own_pre_prepare(vec![])?; // Empty justification since round==1 + } + + (timer_chan, stop_timer) = (d.new_timer)(round.get()); + } + + while !ct.is_canceled() { + mpmc::select! 
{ + recv(input_value_ch) -> result => { + let iv = result?; + input_value.replace(iv); + + if *input_value.borrow() == Default::default() { + return Err(QbftError::ZeroInputValue); + } + + if let Some(ppj) = ppj_cache.borrow().as_ref() { + // Broadcast the pre-prepare now that we have a input value using the cached + // justification. + broadcast_msg(MSG_PRE_PREPARE, &input_value.borrow(), Some(ppj))?; + } + + // Don't read from this channel again. + input_value_ch = mpmc::never(); + }, + + recv(t.receive) -> result => { + let msg = result?; + if let Some(v) = q_commit.as_ref() { + if !v.is_empty() { + if msg.source() != process && msg.type_() == MSG_ROUND_CHANGE { + // Algorithm 3:17 + broadcast_msg(MSG_DECIDED, &v[0].value(), Some(v))?; + } + + continue; + } + } + + // Drop unjust messages + if !is_justified(d, instance, &msg, compare_failure_round) { + (d.log_unjust)(instance, process, msg); + continue; + } + + buffer_msg(&msg); + + let (rule, justification) = + classify(d, instance, round.get(), process, &buffer.borrow(), &msg); + if rule == UPON_NOTHING || is_duplicated_rule(rule, msg.round()) { + // Do nothing more if no rule or duplicate rule was triggered + continue; + } + + (d.log_upon_rule)(instance, process, round.get(), &msg, rule); + + match rule { + // Algorithm 2:1 + UPON_JUSTIFIED_PRE_PREPARE => { + change_round(msg.round(), rule); + + stop_timer(); + (timer_chan, stop_timer) = (d.new_timer)(round.get()); + + let compare_result = compare( + ct, + d, + &msg, + &input_value_source_ch, + input_value_source.clone(), + &timer_chan, + ); + + match compare_result { + Ok(v) => { + input_value_source = v; + broadcast_msg(MSG_PREPARE, &msg.value(), None)?; + } + Err(qbft_err) => { + match qbft_err { + QbftError::CompareError => { + compare_failure_round = msg.round(); + } + QbftError::TimeoutError => { + // As compare function is blocking on waiting local data, round + // might timeout in the meantime. If + // this happens, we trigger round change. 
+ // Algorithm 3:1 + change_round(round.get() + 1, UPON_ROUND_TIMEOUT); + stop_timer(); + + (timer_chan, stop_timer) = (d.new_timer)(round.get()); + + broadcast_round_change()?; + } + _ => panic!("bug: expected only {} or {} error", QbftError::CompareError, QbftError::TimeoutError) + } + } + } + } + UPON_QUORUM_PREPARES => { + // Algorithm 2:4 + // Only applicable to current round + prepared_round.set(round.get()); /* == msg.round() */ + prepared_value.replace(msg.value()); + prepared_justification.replace(justification); + + broadcast_msg(MSG_COMMIT, &prepared_value.borrow(), None)?; + } + UPON_QUORUM_COMMITS | UPON_JUSTIFIED_DECIDED => { + // Algorithm 2:8 + change_round(msg.round(), rule); + q_commit = justification; + stop_timer(); + + timer_chan = mpmc::never(); + + let justification = q_commit.as_ref() + .expect("Rules `UPON_QUORUM_COMMITS` and `UPON_JUSTIFIED_DECIDED` always include a justification"); + (d.decide)(ct, instance, &msg.value(), justification); + } + UPON_F_PLUS1_ROUND_CHANGES => { + // Algorithm 3:5 + + let justification = justification.expect( + "Rule `UPON_F_PLUS1_ROUND_CHANGES` always includes a justification", + ); + + // Only applicable to future rounds + change_round( + next_min_round(d, &justification, round.get() /* < msg.round() */), + rule, + ); + + stop_timer(); + (timer_chan, stop_timer) = (d.new_timer)(round.get()); + + broadcast_round_change()?; + } + UPON_QUORUM_ROUND_CHANGES => { + // Algorithm 3:11 + + let justification = justification + .expect("Rule `UPON_QUORUM_ROUND_CHANGES` always includes a justification"); + + // Only applicable to current round (round > 1) + match get_single_justified_pr_pv(d, &justification) { + Some((pr, pv)) if compare_failure_round != pr => { + broadcast_msg(MSG_PRE_PREPARE, &pv, Some(&justification))? 
+ } + _ => broadcast_own_pre_prepare(justification)?, + } + } + UPON_UNJUST_QUORUM_ROUND_CHANGES => { + // Ignore bug or byzantine + } + _ => panic!("bug: invalid rule"), + } + }, + + recv(timer_chan) -> result => { + result?; + + change_round(round.get() + 1, UPON_ROUND_TIMEOUT); + stop_timer(); + + (timer_chan, stop_timer) = (d.new_timer)(round.get()); + + broadcast_round_change()?; + } + + default => { + if ct.is_canceled() { + break; + } + } + } + } + + Ok(()) +} + +fn compare( + ct: &CancellationToken, + d: &Definition, + msg: &Msg, + input_value_source_ch: &mpmc::Receiver, + input_value_source: C, + timer_chan: &mpmc::Receiver, +) -> Result +where + V: PartialEq, + C: Clone + Send + Sync, +{ + let (compare_err_tx, compare_err_rx) = mpmc::bounded::>(1); + let (compare_value_tx, compare_value_rx) = mpmc::bounded::(1); + + // d.Compare has 2 roles: + // 1. Read from the `input_value_source_ch` (if `input_value_source` is empty). + // If it read from the channel, it returns the value on `compare_value` + // channel. + // 2. Compare the value read from `input_value_source_ch` (or + // `input_value_source` if it is not empty) to the value proposed by the + // leader. + // If comparison or any other unexpected error occurs, the error is returned on + // `compare_err` channel. + + thread::scope(|s| { + let mut result = input_value_source.clone(); + let compare = &d.compare; + + s.spawn(move || { + (compare)( + ct, + msg, + input_value_source_ch, + &input_value_source, + &compare_err_tx, + &compare_value_tx, + ); + }); + + loop { + mpmc::select! { + recv(compare_err_rx) -> msg => { + let err = msg?; + + return match err { + Ok(_) => Ok(result), + Err(_) => Err(QbftError::CompareError), + }; + }, + + recv(compare_value_rx) -> msg => { + let value = msg?; + + result = value; + }, + + recv(timer_chan) -> msg => { + msg?; + + return Err(QbftError::TimeoutError); + } + } + } + }) +} + +/// Returns all messages from the provided round. 
+fn extract_round_messages( + buffer: &HashMap>>, + round: i64, +) -> Vec> +where + V: PartialEq, +{ + let mut resp = vec![]; + + for msgs in buffer.values() { + for msg in msgs { + if msg.round() == round { + resp.push(msg.clone()); + } + } + } + + resp +} + +/// Returns the rule triggered upon receipt of the last message and its +/// justifications. +fn classify( + d: &Definition, + instance: &I, + round: i64, + process: i64, + buffer: &HashMap>>, + msg: &Msg, +) -> (UponRule, Option>>) +where + V: Eq + Hash + Default, +{ + match msg.type_() { + MSG_DECIDED => (UPON_JUSTIFIED_DECIDED, Some(msg.justification())), + MSG_PRE_PREPARE => { + if msg.round() < round { + (UPON_NOTHING, None) + } else { + (UPON_JUSTIFIED_PRE_PREPARE, None) + } + } + MSG_PREPARE => { + // Ignore other rounds, since PREPARE isn't justified. + if msg.round() != round { + return (UPON_NOTHING, None); + } + + let prepares = + filter_by_round_and_value(&flatten(buffer), MSG_PREPARE, msg.round(), msg.value()); + + if prepares.len() as i64 >= d.quorum() { + (UPON_QUORUM_PREPARES, Some(prepares)) + } else { + (UPON_NOTHING, None) + } + } + MSG_COMMIT => { + // Ignore other rounds, since COMMIT isn't justified. + if msg.round() != round { + return (UPON_NOTHING, None); + } + + let commits = + filter_by_round_and_value(&flatten(buffer), MSG_COMMIT, msg.round(), msg.value()); + if commits.len() as i64 >= d.quorum() { + (UPON_QUORUM_COMMITS, Some(commits)) + } else { + (UPON_NOTHING, None) + } + } + MSG_ROUND_CHANGE => { + // Only ignore old rounds. + if msg.round() < round { + return (UPON_NOTHING, None); + } + + let all = flatten(buffer); + + if msg.round() > round { + // Jump ahead if we received F+1 higher ROUND-CHANGEs. 
+ if let Some(frc) = get_fplus1_round_changes(d, &all, round) { + return (UPON_F_PLUS1_ROUND_CHANGES, Some(frc)); + } + + return (UPON_NOTHING, None); + } + + /* else msg.round() == round */ + + let qrc = filter_round_change(&all, msg.round()); + if (qrc.len() as i64) < d.quorum() { + return (UPON_NOTHING, None); + } + + let Some(qrc) = get_justified_qrc(d, &all, msg.round()) else { + return (UPON_UNJUST_QUORUM_ROUND_CHANGES, None); + }; + + if !(d.is_leader)(instance, msg.round(), process) { + return (UPON_NOTHING, None); + } + + (UPON_QUORUM_ROUND_CHANGES, Some(qrc)) + } + _ => { + panic!("bug: invalid type"); + } + } +} + +/// Implements algorithm 3:6 and returns the next minimum round from received +/// round change messages. +fn next_min_round(d: &Definition, frc: &Vec>, round: i64) -> i64 +where + V: PartialEq, +{ + // Get all RoundChange messages with round (rj) higher than current round (ri) + if (frc.len() as i64) < d.faulty() + 1 { + panic!("bug: Frc too short"); + } + + // Get the smallest round in the set. + let mut rmin = i64::MAX; + + for msg in frc { + if msg.type_() != MSG_ROUND_CHANGE { + panic!("bug: Frc contain non-round change"); + } else if msg.round() <= round { + panic!("bug: Frc round not in future"); + } + + if rmin > msg.round() { + rmin = msg.round(); + } + } + + rmin +} + +/// Returns true if message is justified or if it does not need justification. +fn is_justified( + d: &Definition, + instance: &I, + msg: &Msg, + compare_failure_round: i64, +) -> bool +where + V: Eq + Hash + Default, +{ + match msg.type_() { + MSG_PRE_PREPARE => is_justified_pre_prepare(d, instance, msg, compare_failure_round), + MSG_PREPARE => true, + MSG_COMMIT => true, + MSG_ROUND_CHANGE => is_justified_round_change(d, msg), + MSG_DECIDED => is_justified_decided(d, msg), + _ => panic!("bug: invalid message type"), + } +} + +/// Returns true if the ROUND_CHANGE message's prepared round and value is +/// justified. 
+fn is_justified_round_change(d: &Definition, msg: &Msg) -> bool +where + V: PartialEq + Default, +{ + if msg.type_() != MSG_ROUND_CHANGE { + panic!("bug: not a round change message"); + } + + // ROUND-CHANGE justification contains quorum PREPARE messages that justifies Pr + // and Pv. + let prepares = msg.justification(); + let pr = msg.prepared_round(); + let pv = msg.prepared_value(); + + if prepares.is_empty() { + return pr == 0 && pv == Default::default(); + } + + // No need to check for all possible combinations, since justified should only + // contain a one. + + if (prepares.len() as i64) < d.quorum() { + return false; + } + + let mut uniq = uniq_source::(vec![]); + for prepare in prepares { + if !uniq(&prepare) { + return false; + } + + if prepare.type_() != MSG_PREPARE { + return false; + } + + if prepare.round() != pr { + return false; + } + + if prepare.value() != pv { + return false; + } + } + + true +} + +/// Returns true if the decided message is justified by quorum COMMIT messages +/// of identical round and value. +fn is_justified_decided(d: &Definition, msg: &Msg) -> bool +where + V: PartialEq, +{ + if msg.type_() != MSG_DECIDED { + panic!("bug: not a decided message"); + } + + let v = msg.value(); + let commits = filter_msgs( + &msg.justification(), + MSG_COMMIT, + msg.round(), + Some(&v), + None, + None, + ); + + (commits.len() as i64) >= d.quorum() +} + +/// Returns true if the PRE-PREPARE message is justified. +fn is_justified_pre_prepare( + d: &Definition, + instance: &I, + msg: &Msg, + compare_failure_round: i64, +) -> bool +where + V: Eq + Hash + Default, +{ + if msg.type_() != MSG_PRE_PREPARE { + panic!("bug: not a preprepare message"); + } + + if !(d.is_leader)(instance, msg.round(), msg.source()) { + return false; + } + + // Justified if PrePrepare is the first round OR if comparison failed previous + // round. 
+ if msg.round() == 1 || (msg.round() == compare_failure_round + 1) { + return true; + } + + let Some(pv) = contains_justified_qrc(d, &msg.justification(), msg.round()) else { + return false; + }; + + if pv == Default::default() { + return true; // New value being proposed + } + + msg.value() == pv // Ensure Pv is being proposed +} + +/// Implements algorithm 4:1 and returns true and pv if the messages contains a +/// justified quorum ROUND_CHANGEs (Qrc). +fn contains_justified_qrc( + d: &Definition, + justification: &Vec>, + round: i64, +) -> Option +where + V: Eq + Hash + Default, +{ + let qrc = filter_round_change(justification, round); + if (qrc.len() as i64) < d.quorum() { + return None; + } + // No need to calculate J1 or J2 for all possible combinations, + // since justification should only contain one. + + // J1: If qrc contains quorum ROUND-CHANGEs with null pv and null pr. + let mut all_null = true; + + for rc in qrc.iter() { + if rc.prepared_round() != 0 || rc.prepared_value() != Default::default() { + all_null = false; + break; + } + } + + if all_null { + return Some(Default::default()); + } + + // J2: if the justification has a quorum of valid PREPARE messages + // with pr and pv equaled to highest pr and pv in Qrc (other than null). + + // Get pr and pv from quorum PREPARES + let (pr, pv) = get_single_justified_pr_pv(d, justification)?; + + let mut found = false; + + for rc in qrc { + // Ensure no ROUND-CHANGE with higher pr + if rc.prepared_round() > pr { + return None; + } + // Ensure at least one ROUND-CHANGE with pr and pv + if rc.prepared_round() == pr && rc.prepared_value() == pv { + found = true; + } + } + + if found { Some(pv) } else { None } +} + +/// Extracts the single justified Pr and Pv from quorum PREPARES in list of +/// messages. It expects only one possible combination. 
+fn get_single_justified_pr_pv( + d: &Definition, + msgs: &Vec>, +) -> Option<(i64, V)> +where + V: Eq + Hash + Default, +{ + let mut pr: i64 = 0; + let mut pv: V = Default::default(); + let mut count: i64 = 0; + let mut uniq = uniq_source::(vec![]); + + for msg in msgs { + if msg.type_() != MSG_PREPARE { + continue; + } + + if !uniq(msg) { + return None; + } + + if count == 0 { + pr = msg.round(); + pv = msg.value(); + } else if pr != msg.round() || pv != msg.value() { + return None; + } + + count += 1; + } + + if count >= d.quorum() { + Some((pr, pv)) + } else { + None + } +} + +/// Implements algorithm 4:1 and returns a justified quorum ROUND_CHANGEs (Qrc) +fn get_justified_qrc( + d: &Definition, + all: &Vec>, + round: i64, +) -> Option>> +where + V: Eq + Hash + Default, +{ + if let (qrc, true) = quorum_null_prepared(d, all, round) { + // Return any quorum null pv ROUND_CHANGE messages as Qrc. + return Some(qrc); + } + + let round_changes = filter_round_change(all, round); + + for prepares in get_prepare_quorums(d, all) { + // See if we have quorum ROUND-CHANGE with HIGHEST_PREPARED(qrc) == + // prepares.Round. + let mut qrc: Vec> = vec![]; + let mut has_highest_prepared = false; + let pr = prepares[0].round(); + let pv = prepares[0].value(); + let mut uniq = uniq_source::(vec![]); + + for rc in round_changes.iter() { + if rc.prepared_round() > pr { + continue; + } + + if !uniq(rc) { + continue; + } + + if rc.prepared_round() == pr && rc.prepared_value() == pv { + has_highest_prepared = true; + } + + qrc.push(rc.clone()); + } + + if (qrc.len() as i64) >= d.quorum() && has_highest_prepared { + qrc.extend(prepares.into_iter()); + return Some(qrc); + } + } + + None +} + +/// Returns true and Faulty+1 ROUND-CHANGE messages (Frc) with the rounds higher +/// than the provided round. It returns the highest round per process in order +/// to jump furthest. 
+fn get_fplus1_round_changes( + d: &Definition, + all: &Vec>, + round: i64, +) -> Option>> +where + V: PartialEq, +{ + let mut highest_by_source = HashMap::>::new(); + + for msg in all { + if msg.type_() != MSG_ROUND_CHANGE { + continue; + } + + if msg.round() <= round { + continue; + } + + if let Some(highest) = highest_by_source.get(&msg.source()) { + if highest.round() > msg.round() { + continue; + } + } + + highest_by_source.insert(msg.source(), msg.clone()); + + if (highest_by_source.len() as i64) == d.faulty() + 1 { + break; + } + } + + if (highest_by_source.len() as i64) < d.faulty() + 1 { + return None; + } + + let resp = highest_by_source.into_values().collect::>(); + + Some(resp) +} + +/// Defines the round and value of set of identical PREPARE messages. +#[derive(Eq, Hash, PartialEq)] +struct PreparedKey +where + V: Eq + Hash, +{ + round: i64, + value: V, +} + +fn get_prepare_quorums( + d: &Definition, + all: &Vec>, +) -> Vec>> +where + V: Eq + Hash, +{ + let mut sets = HashMap::, HashMap>>::new(); + + for msg in all { + if msg.type_() != MSG_PREPARE { + continue; + } + + let key = PreparedKey { + round: msg.round(), + value: msg.value(), + }; + + sets.entry(key) + .or_default() + .insert(msg.source(), msg.clone()); + } + + let mut quorums = vec![]; + + for (_, msgs) in sets { + if (msgs.len() as i64) < d.quorum() { + continue; + } + + let mut quorum = vec![]; + for (_, msg) in msgs { + quorum.push(msg); + } + + quorums.push(quorum); + } + + quorums +} + +/// Implements condition J1 and returns Qrc and true if a quorum +/// of round changes messages (Qrc) for the round have null prepared round and +/// value. 
+fn quorum_null_prepared( + d: &Definition, + all: &Vec>, + round: i64, +) -> (Vec>, bool) +where + V: PartialEq + Default, +{ + let null_pr = Default::default(); + let null_pv = Some(&Default::default()); + + let justification = filter_msgs(all, MSG_ROUND_CHANGE, round, None, Some(null_pr), null_pv); + + ( + justification.clone(), + justification.len() as i64 >= d.quorum(), + ) +} + +/// Returns the messages matching the type and value. +fn filter_by_round_and_value( + msgs: &Vec>, + message_type: MessageType, + round: i64, + value: V, +) -> Vec> +where + V: PartialEq, +{ + filter_msgs(msgs, message_type, round, Some(&value), None, None) +} + +/// Returns all round change messages for the provided round. +fn filter_round_change(msgs: &Vec>, round: i64) -> Vec> +where + V: PartialEq, +{ + filter_msgs::(msgs, MSG_ROUND_CHANGE, round, None, None, None) +} + +/// Returns one message per process matching the provided type and round and +/// optional value, pr, pv. +fn filter_msgs( + msgs: &Vec>, + message_type: MessageType, + round: i64, + value: Option<&V>, + pr: Option, + pv: Option<&V>, +) -> Vec> +where + V: PartialEq, +{ + let mut resp = Vec::new(); + let mut uniq = uniq_source::(vec![]); + + for msg in msgs { + if message_type != msg.type_() { + continue; + } + + if round != msg.round() { + continue; + } + + if let Some(value) = value + && msg.value() != *value + { + continue; + } + + if let Some(pv) = pv + && msg.prepared_value() != *pv + { + continue; + } + + if let Some(pr) = pr + && pr != msg.prepared_round() + { + continue; + } + + if uniq(msg) { + resp.push(msg.clone()); + } + } + + resp +} + +/// Produce a vector containing all the buffered messages as well as all their +/// justifications. 
+fn flatten(buffer: &HashMap>>) -> Vec> +where + V: PartialEq, +{ + let mut resp: Vec> = Vec::new(); + + for msgs in buffer.values() { + for msg in msgs { + resp.push(msg.clone()); + for j in msg.justification() { + resp.push(j.clone()); + if !j.justification().is_empty() { + panic!("bug: nested justifications"); + } + } + } + } + + resp +} + +/// Builds a stateful predicate that returns true the first time it is called +/// with a message from a given source and false for every later message from +/// that source. Sources of the messages in `vec` count as already seen. +fn uniq_source(vec: Vec>) -> Box) -> bool> +where + V: PartialEq, +{ + let mut s = vec.iter().map(|msg| msg.source()).collect::>(); + Box::new(move |msg: &Msg| { + let source = msg.source(); + if s.contains(&source) { + false + } else { + s.insert(source); + true + } + }) +} + +#[cfg(test)] +mod fake_clock; +#[cfg(test)] +mod internal_test; diff --git a/crates/charon-crypto/Cargo.toml b/crates/charon-crypto/Cargo.toml index ef662003..a615d7f5 100644 --- a/crates/charon-crypto/Cargo.toml +++ b/crates/charon-crypto/Cargo.toml @@ -7,12 +7,12 @@ license.workspace = true publish.workspace = true [dependencies] -rand.workspace = true blst.workspace = true +hex.workspace = true +rand.workspace = true rand_core.workspace = true serde.workspace = true thiserror.workspace = true -hex.workspace = true [lints.rust] # Allow unsafe code for blst C bindings (overrides workspace forbid) diff --git a/flake.nix b/flake.nix index 24ad2b8d..8da70392 100644 --- a/flake.nix +++ b/flake.nix @@ -17,6 +17,9 @@ shellHook = '' chmod +x .githooks/* && git config --local core.hooksPath .githooks/ ''; + + LD_LIBRARY_PATH = "${pkgs.openssl}/lib"; + PKG_CONFIG_PATH = "${pkgs.openssl.dev}/lib/pkgconfig"; }; }