Skip to content

Commit a4792e0

Browse files
authored
refactor(qbft): introduce QbftTypes and some function structs (#431)
* test: update qbft test * fix: compare run on retached thread * fix: removed hard coded salt in tests * fix: return error on Context cancelled * fix: hash from string, not magic number * fix: one shot cancel when parent is cancelled * fix: fix make_is_leader test * fix: minors naming and comments * fix: linter * fix: early cancel on the loop * fix: context cancel in compare * fix: validate definition * fix: add check pr < r * fix: test and document on run * fix: compare callback failed should timeout * fix: test use timeout channel instead of sleep * fix: add comment on cancellation poll interval * fix: use enum for invalid defnintion error * fix: add more test on check valid round * fix: using test-case * fix: small fixes * fix: linter * fix: remove unnecessary filter * fix: make_is_leader now 0-based * fix: add cancelled state for fake clock * fix: flanky tests by ordering and add small settle window time * fix: harden fake-clock to avoid flanky scheduling * fix: lint * refactor: introduce QbftTypes * refactor: group QBFT callbacks into typed request/logger structs * refactor: add ClientRecord * refactor: simplify internal_tests * refactor: simplify qbft mod.rs * refactor: remove skip clippy on qbft mod
1 parent 81bd360 commit a4792e0

4 files changed

Lines changed: 1040 additions & 770 deletions

File tree

crates/core/src/qbft/callbacks.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use super::{MessageType, Msg, QbftTypes, Result, UponRule};
2+
use cancellation::CancellationToken;
3+
use crossbeam::channel as mpmc;
4+
use std::time;
5+
6+
pub(super) type BroadcastFn<T> =
7+
dyn for<'a> Fn(BroadcastRequest<'a, T>) -> Result<()> + Send + Sync;
8+
pub(super) type CompareFn<T> = dyn for<'a> Fn(CompareRequest<'a, T>) + Send + Sync + 'static;
9+
pub(super) type UponRuleLoggerFn<T> = dyn for<'a> Fn(UponRuleLog<'a, T>) + Send + Sync;
10+
pub(super) type RoundChangeLoggerFn<T> = dyn for<'a> Fn(RoundChangeLog<'a, T>) + Send + Sync;
11+
pub(super) type UnjustLoggerFn<T> = dyn for<'a> Fn(UnjustLog<'a, T>) + Send + Sync;
12+
pub(super) type LeaderFn<T> = dyn for<'a> Fn(LeaderRequest<'a, T>) -> bool + Send + Sync;
13+
pub(super) type DecideFn<T> = dyn for<'a> Fn(DecideRequest<'a, T>) + Send + Sync;
14+
15+
/// Input passed to `Transport::broadcast`.
16+
pub struct BroadcastRequest<'a, T: QbftTypes> {
17+
/// Parent cancellation token.
18+
pub ct: &'a CancellationToken,
19+
/// Message type to broadcast.
20+
pub type_: MessageType,
21+
/// Consensus instance identifier.
22+
pub instance: &'a T::Instance,
23+
/// Sending process.
24+
pub source: i64,
25+
/// Message round.
26+
pub round: i64,
27+
/// Proposal value.
28+
pub value: &'a T::Value,
29+
/// Prepared round carried by ROUND-CHANGE messages.
30+
pub prepared_round: i64,
31+
/// Prepared value carried by ROUND-CHANGE messages.
32+
pub prepared_value: &'a T::Value,
33+
/// Optional justification piggybacked on the message.
34+
pub justification: Option<&'a Vec<Msg<T>>>,
35+
}
36+
37+
/// Input passed to `Definition::compare`.
38+
pub struct CompareRequest<'a, T: QbftTypes> {
39+
/// Compare-scoped cancellation token.
40+
pub ct: &'a CancellationToken,
41+
/// Proposed commit quorum message.
42+
pub qcommit: &'a Msg<T>,
43+
/// Channel carrying the local compare value if it was not cached yet.
44+
pub input_value_source_ch: &'a mpmc::Receiver<T::Compare>,
45+
/// Cached local compare value.
46+
pub input_value_source: &'a T::Compare,
47+
/// Channel used by the callback to return compare status.
48+
pub return_err: &'a mpmc::Sender<Result<()>>,
49+
/// Channel used by the callback to cache the local compare value.
50+
pub return_value: &'a mpmc::Sender<T::Compare>,
51+
}
52+
53+
/// Timer returned by `Definition::new_timer`.
54+
pub struct Timer {
55+
/// Channel that fires when the timer expires.
56+
pub receive: mpmc::Receiver<time::Instant>,
57+
/// Stops the timer.
58+
pub stop: Box<dyn Fn() + Send + Sync>,
59+
}
60+
61+
/// Input passed to `Definition::is_leader`.
62+
pub struct LeaderRequest<'a, T: QbftTypes> {
63+
/// Consensus instance identifier.
64+
pub instance: &'a T::Instance,
65+
/// Round being evaluated.
66+
pub round: i64,
67+
/// Process being evaluated.
68+
pub process: i64,
69+
}
70+
71+
/// Input passed to `Definition::decide`.
72+
pub struct DecideRequest<'a, T: QbftTypes> {
73+
/// Parent cancellation token.
74+
pub ct: &'a CancellationToken,
75+
/// Consensus instance identifier.
76+
pub instance: &'a T::Instance,
77+
/// Decided value.
78+
pub value: &'a T::Value,
79+
/// Commit quorum justifying the decision.
80+
pub qcommit: &'a Vec<Msg<T>>,
81+
}
82+
83+
/// Input passed to `QbftLogger::upon_rule`.
84+
pub struct UponRuleLog<'a, T: QbftTypes> {
85+
/// Consensus instance identifier.
86+
pub instance: &'a T::Instance,
87+
/// Local process.
88+
pub process: i64,
89+
/// Current local round.
90+
pub round: i64,
91+
/// Message that triggered classification.
92+
pub msg: &'a Msg<T>,
93+
/// Rule that fired.
94+
pub upon_rule: UponRule,
95+
}
96+
97+
/// Input passed to `QbftLogger::round_change`.
98+
pub struct RoundChangeLog<'a, T: QbftTypes> {
99+
/// Consensus instance identifier.
100+
pub instance: &'a T::Instance,
101+
/// Local process.
102+
pub process: i64,
103+
/// Previous local round.
104+
pub round: i64,
105+
/// New local round.
106+
pub new_round: i64,
107+
/// Rule that caused the round change.
108+
pub upon_rule: UponRule,
109+
/// Messages from the previous round.
110+
pub msgs: &'a Vec<Msg<T>>,
111+
}
112+
113+
/// Input passed to `QbftLogger::unjust`.
114+
pub struct UnjustLog<'a, T: QbftTypes> {
115+
/// Consensus instance identifier.
116+
pub instance: &'a T::Instance,
117+
/// Local process.
118+
pub process: i64,
119+
/// Rejected message.
120+
pub msg: Msg<T>,
121+
}

crates/core/src/qbft/fake_clock.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#![allow(clippy::arithmetic_side_effects)]
2+
13
use crossbeam::channel as mpmc;
24
use std::{
35
collections::BTreeMap,
@@ -28,7 +30,13 @@ struct FakeClockInner {
2830
now: Instant,
2931
last_id: usize,
3032
cancelled: bool,
31-
clients: BTreeMap<usize, (mpmc::Sender<Instant>, Instant, TimerPriority)>,
33+
clients: BTreeMap<usize, ClientRecord>,
34+
}
35+
36+
struct ClientRecord {
37+
sender: mpmc::Sender<Instant>,
38+
deadline: Instant,
39+
priority: TimerPriority,
3240
}
3341

3442
impl FakeClock {
@@ -80,7 +88,14 @@ impl FakeClock {
8088
let deadline = inner.now + duration;
8189

8290
inner.last_id += 1;
83-
inner.clients.insert(id, (tx, deadline, priority));
91+
inner.clients.insert(
92+
id,
93+
ClientRecord {
94+
sender: tx,
95+
deadline,
96+
priority,
97+
},
98+
);
8499

85100
id
86101
};
@@ -124,9 +139,9 @@ impl FakeClock {
124139
inner.now += duration;
125140
let now = inner.now;
126141

127-
for (&id, (ch, deadline, priority)) in &inner.clients {
128-
if *deadline <= now {
129-
expired.push((id, *deadline, *priority, ch.clone()));
142+
for (&id, record) in &inner.clients {
143+
if record.deadline <= now {
144+
expired.push((id, record.deadline, record.priority, record.sender.clone()));
130145
}
131146
}
132147

0 commit comments

Comments
 (0)