@@ -9,15 +9,17 @@ use pluto_core::{
99 corepb:: v1:: core as pbcore,
1010 types:: { Duty , DutyType , SlotNumber } ,
1111} ;
12+ use pluto_eth2api:: spec:: phase0;
1213use prost:: bytes:: Bytes ;
1314use test_case:: test_case;
14- use tokio:: sync:: mpsc;
15+ use tokio:: { sync:: mpsc, task :: JoinSet } ;
1516use tokio_util:: sync:: CancellationToken ;
1617
1718use super :: {
1819 Peer ,
1920 component:: { self , Config , Consensus } ,
2021} ;
22+ use crate :: timer:: { RoundTimer , RoundTimerFunc , RoundTimerFuture , TimerType } ;
2123
2224#[ test_case( 2 , 3 ; "two_of_three" ) ]
2325#[ test_case( 3 , 4 ; "three_of_four" ) ]
@@ -26,8 +28,77 @@ use super::{
2628#[ tokio:: test]
2729async fn qbft_consensus ( threshold : usize , cluster_nodes : usize ) {
2830 assert ! ( threshold <= cluster_nodes) ;
31+ run_qbft_consensus ( threshold, false , unsigned_value) . await ;
32+ }
33+
34+ #[ tokio:: test]
35+ async fn qbft_consensus_attester_compare_enabled ( ) {
36+ run_qbft_consensus ( 3 , true , |_| attester_value ( 0 ) ) . await ;
37+ }
38+
39+ #[ tokio:: test]
40+ async fn qbft_consensus_attester_compare_mismatch_does_not_decide ( ) {
41+ let threshold = 3 ;
42+ let ( sniffed_tx, _sniffed_rx) = mpsc:: unbounded_channel ( ) ;
43+ let active_nodes =
44+ in_memory_network ( threshold, true , Some ( Duration :: from_millis ( 20 ) ) , sniffed_tx) ;
45+ let ( decided_tx, mut decided_rx) = mpsc:: unbounded_channel ( ) ;
46+ let duty = Duty :: new ( SlotNumber :: new ( 1 ) , DutyType :: Attester ) ;
47+ let ct = CancellationToken :: new ( ) ;
48+ let start_ct = CancellationToken :: new ( ) ;
49+ let mut expired_txs = Vec :: with_capacity ( active_nodes. len ( ) ) ;
50+ let mut start_tasks = Vec :: with_capacity ( active_nodes. len ( ) ) ;
51+
52+ for ( node_idx, node) in active_nodes. iter ( ) . enumerate ( ) {
53+ let decided_tx = decided_tx. clone ( ) ;
54+ node. subscribe ( move |duty, value| {
55+ let _ = decided_tx. send ( ( node_idx, duty, value) ) ;
56+ Ok ( ( ) )
57+ } ) ;
58+
59+ let ( expired_tx, expired_rx) = mpsc:: channel ( 1 ) ;
60+ expired_txs. push ( expired_tx) ;
61+ start_tasks. push ( Arc :: clone ( node) . start ( start_ct. clone ( ) , expired_rx) ) ;
62+ }
63+ drop ( decided_tx) ;
64+
65+ let mut tasks = JoinSet :: new ( ) ;
66+ for ( node_idx, node) in active_nodes. iter ( ) . enumerate ( ) {
67+ let node = Arc :: clone ( node) ;
68+ let duty = duty. clone ( ) ;
69+ let value = attester_value ( node_idx) ;
70+ let ct = ct. clone ( ) ;
71+ tasks. spawn ( async move { node. propose ( & ct, duty, value) . await } ) ;
72+ }
73+
74+ tokio:: time:: timeout ( Duration :: from_millis ( 150 ) , decided_rx. recv ( ) )
75+ . await
76+ . expect_err ( "mismatched attester compare unexpectedly decided" ) ;
77+
78+ ct. cancel ( ) ;
79+ tokio:: time:: timeout ( Duration :: from_secs ( 1 ) , async {
80+ while let Some ( result) = tasks. join_next ( ) . await {
81+ assert ! ( result. expect( "mismatched compare task panicked" ) . is_err( ) ) ;
82+ }
83+ } )
84+ . await
85+ . expect ( "mismatched compare tasks did not stop after cancellation" ) ;
86+ assert ! ( decided_rx. try_recv( ) . is_err( ) ) ;
87+
88+ start_ct. cancel ( ) ;
89+ drop ( expired_txs) ;
90+ for task in start_tasks {
91+ task. await . unwrap ( ) ;
92+ }
93+ }
94+
95+ async fn run_qbft_consensus (
96+ threshold : usize ,
97+ compare_attestations : bool ,
98+ value : fn ( usize ) -> pbcore:: UnsignedDataSet ,
99+ ) {
29100 let ( sniffed_tx, mut sniffed_rx) = mpsc:: unbounded_channel ( ) ;
30- let active_nodes = in_memory_network ( threshold, sniffed_tx) ;
101+ let active_nodes = in_memory_network ( threshold, compare_attestations , None , sniffed_tx) ;
31102 let ( decided_tx, mut decided_rx) = mpsc:: unbounded_channel ( ) ;
32103 let duty = Duty :: new ( SlotNumber :: new ( 1 ) , DutyType :: Attester ) ;
33104 let ct = CancellationToken :: new ( ) ;
@@ -48,24 +119,24 @@ async fn qbft_consensus(threshold: usize, cluster_nodes: usize) {
48119 }
49120 drop ( decided_tx) ;
50121
51- let mut tasks = Vec :: with_capacity ( active_nodes . len ( ) ) ;
122+ let mut tasks = JoinSet :: new ( ) ;
52123 for ( node_idx, node) in active_nodes. iter ( ) . enumerate ( ) {
53124 let node = Arc :: clone ( node) ;
54125 let duty = duty. clone ( ) ;
55- let value = unsigned_value ( node_idx) ;
126+ let value = value ( node_idx) ;
56127 let ct = ct. clone ( ) ;
57- tasks. push ( tokio:: spawn (
58- async move { node. propose ( & ct, duty, value) . await } ,
59- ) ) ;
128+ tasks. spawn ( async move { node. propose ( & ct, duty, value) . await } ) ;
60129 }
61130
62131 let mut decided = Vec :: with_capacity ( active_nodes. len ( ) ) ;
63132 for _ in 0 ..active_nodes. len ( ) {
64133 decided. push ( recv_one ( & mut decided_rx) . await ) ;
65134 }
66135
67- for task in tasks {
68- task. await . unwrap ( ) . unwrap ( ) ;
136+ while let Some ( result) = tasks. join_next ( ) . await {
137+ result
138+ . expect ( "consensus task panicked" )
139+ . expect ( "consensus task failed" ) ;
69140 }
70141
71142 decided. sort_by_key ( |( node_idx, ..) | * node_idx) ;
@@ -112,8 +183,60 @@ fn unsigned_value(seed: usize) -> pbcore::UnsignedDataSet {
112183 pbcore:: UnsignedDataSet { set }
113184}
114185
186+ fn attester_value ( seed : usize ) -> pbcore:: UnsignedDataSet {
187+ let mut set = BTreeMap :: new ( ) ;
188+ set. insert ( pubkey ( 1 ) , attestation_json_bytes ( & attestation_data ( seed) ) ) ;
189+ pbcore:: UnsignedDataSet { set }
190+ }
191+
192+ fn attestation_json_bytes ( data : & phase0:: AttestationData ) -> Bytes {
193+ let value = serde_json:: json!( {
194+ "attestation_data" : data,
195+ "attestation_duty" : {
196+ "slot" : "1" ,
197+ "validator_index" : "1" ,
198+ "committee_index" : "2" ,
199+ "committee_length" : "8" ,
200+ "committees_at_slot" : "1" ,
201+ "validator_committee_index" : "1" ,
202+ } ,
203+ } ) ;
204+ Bytes :: from ( serde_json:: to_vec ( & value) . expect ( "test attestation json serializes" ) )
205+ }
206+
207+ fn attestation_data ( seed : usize ) -> phase0:: AttestationData {
208+ let seed = u8:: try_from ( seed) . expect ( "test attestation seed fits u8" ) ;
209+ let source_epoch = u64:: from ( seed)
210+ . checked_add ( 4 )
211+ . expect ( "test source epoch fits u64" ) ;
212+ let source_root = seed. checked_add ( 5 ) . expect ( "test source root byte fits u8" ) ;
213+ let target_epoch = u64:: from ( seed)
214+ . checked_add ( 6 )
215+ . expect ( "test target epoch fits u64" ) ;
216+ let target_root = seed. checked_add ( 7 ) . expect ( "test target root byte fits u8" ) ;
217+ phase0:: AttestationData {
218+ slot : 1 ,
219+ index : 2 ,
220+ beacon_block_root : [ 3 ; 32 ] ,
221+ source : phase0:: Checkpoint {
222+ epoch : source_epoch,
223+ root : [ source_root; 32 ] ,
224+ } ,
225+ target : phase0:: Checkpoint {
226+ epoch : target_epoch,
227+ root : [ target_root; 32 ] ,
228+ } ,
229+ }
230+ }
231+
232+ fn pubkey ( seed : u8 ) -> String {
233+ format ! ( "0x{}" , hex:: encode( [ seed; 48 ] ) )
234+ }
235+
115236fn in_memory_network (
116237 count : usize ,
238+ compare_attestations : bool ,
239+ round_timeout : Option < Duration > ,
117240 sniffed_tx : mpsc:: UnboundedSender < ( usize , usize ) > ,
118241) -> Vec < Arc < Consensus > > {
119242 let peers = ( 0 ..count)
@@ -156,7 +279,11 @@ fn in_memory_network(
156279 . expect ( "test peer index fits u8" ) ,
157280 ) ,
158281 broadcaster,
159- compare_attestations : false ,
282+ compare_attestations,
283+ timer_func : match round_timeout {
284+ Some ( timeout) => short_timer_func ( timeout) ,
285+ None => crate :: timer:: get_round_timer_func ( ) ,
286+ } ,
160287 sniffer : {
161288 let sniffed_tx = sniffed_tx. clone ( ) ;
162289 Arc :: new ( move |instance| {
@@ -172,3 +299,27 @@ fn in_memory_network(
172299
173300 nodes. lock ( ) . unwrap ( ) . clone ( )
174301}
302+
303+ fn short_timer_func ( timeout : Duration ) -> RoundTimerFunc {
304+ Box :: new ( move |_| Box :: new ( ShortRoundTimer { timeout } ) )
305+ }
306+
307+ struct ShortRoundTimer {
308+ timeout : Duration ,
309+ }
310+
311+ impl RoundTimer for ShortRoundTimer {
312+ fn timer_type ( & self ) -> TimerType {
313+ TimerType :: Increasing
314+ }
315+
316+ fn timer ( & self , _round : i64 ) -> crate :: timer:: Result < RoundTimerFuture > {
317+ let deadline = tokio:: time:: Instant :: now ( )
318+ . checked_add ( self . timeout )
319+ . expect ( "test timer deadline fits Instant" ) ;
320+ Ok ( Box :: pin ( async move {
321+ tokio:: time:: sleep_until ( deadline) . await ;
322+ deadline
323+ } ) )
324+ }
325+ }
0 commit comments