Skip to content

Commit 3222d08

Browse files
committed
fix(node/bft): stop proposing when a node starts syncing again
1 parent 2f784b6 commit 3222d08

2 files changed

Lines changed: 205 additions & 3 deletions

File tree

node/bft/src/primary.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,10 @@ impl<N: Network> proposal_task::BatchPropose for Primary<N> {
413413
self.sync.wait_for_synced_if_syncing()
414414
}
415415

416+
fn is_synced(&self) -> bool {
417+
self.sync.is_synced()
418+
}
419+
416420
/// Proposes the batch for the current round.
417421
///
418422
/// This method performs the following steps:

node/bft/src/primary/proposal_task.rs

Lines changed: 201 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ pub(super) trait BatchPropose: Send + Sync {
3737
/// once sync completes.
3838
fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>>;
3939

40+
/// Returns `true` if the node is currently synced with the network.
41+
fn is_synced(&self) -> bool;
42+
4043
/// Attempts to propose a batch.
4144
///
4245
/// Returns `Ok(true)` when a batch was successfully proposed, `Ok(false)` to retry, and
@@ -164,14 +167,22 @@ impl<N: Network> ProposalTask<N> {
164167
/// Calls `propose_batch()` with CREATE_BATCH_INTERVAL retries until it returns `Ok(true)`
165168
/// (batch submitted to the network).
166169
///
167-
/// Returns `true` if the batch was submitted, `false` if the round changed (caller should restart).
170+
/// Returns `true` if the batch was submitted, `false` if the round changed or the node started
171+
/// syncing (caller should restart; Stage 1 will then await sync completion).
168172
async fn propose<P: BatchPropose>(primary: &P, round: u64) -> bool {
169173
let mut attempt = 1u32;
170174
loop {
171175
if primary.current_round() != round {
172176
return false;
173177
}
174178

179+
// Bail out if sync started mid-Stage-2; otherwise propose_batch may spin at the
180+
// CREATE_BATCH_INTERVAL cadence on Ok(false) paths (e.g. previous round has not
181+
// reached quorum, not enough connected validators, cached batch rebroadcast).
182+
if !primary.is_synced() {
183+
return false;
184+
}
185+
175186
if attempt > 1 {
176187
sleep(CREATE_BATCH_INTERVAL).await;
177188
debug!("Retrying batch proposal for round {round} (attempt #{attempt})");
@@ -194,7 +205,9 @@ impl<N: Network> ProposalTask<N> {
194205
/// Stage 3: Wait for the proposed batch to collect enough signatures.
195206
///
196207
/// Periodically rebroadcasts the batch to non-signers (via `propose_batch`) at most once per
197-
/// MAX_BATCH_DELAY until the round advances. Returns when the round changes.
208+
/// MAX_BATCH_DELAY until the round advances. Returns when the round changes or when the node
209+
/// starts syncing — in the latter case the outer loop restarts and Stage 1's sync gate takes
210+
/// over.
198211
async fn wait_for_signatures<P: BatchPropose>(primary: &P, ready_rx: &mut watch::Receiver<bool>, round: u64) {
199212
loop {
200213
if primary.current_round() != round {
@@ -212,6 +225,13 @@ impl<N: Network> ProposalTask<N> {
212225
return;
213226
}
214227

228+
// A node cannot rebroadcast its proposed batch while it is syncing — its previous
229+
// certificates may be stale and peers won't sign it anyway. Bail out so the outer
230+
// loop falls back through Stage 1, which awaits sync completion before proposing.
231+
if !primary.is_synced() {
232+
return;
233+
}
234+
215235
// Rebroadcast to non-signers (`propose_batch` handles this internally).
216236
match primary.propose_batch().await {
217237
Ok(_) => {}
@@ -248,7 +268,7 @@ mod tests {
248268
use std::{
249269
sync::{
250270
Arc,
251-
atomic::{AtomicU32, Ordering},
271+
atomic::{AtomicBool, AtomicU32, Ordering},
252272
},
253273
time::Duration,
254274
};
@@ -273,6 +293,10 @@ mod tests {
273293
None
274294
}
275295

296+
fn is_synced(&self) -> bool {
297+
true
298+
}
299+
276300
async fn propose_batch(&self) -> Result<bool> {
277301
self.propose_count.fetch_add(1, Ordering::SeqCst);
278302
self.proposed_notify.notify_one();
@@ -302,6 +326,10 @@ mod tests {
302326
None
303327
}
304328

329+
fn is_synced(&self) -> bool {
330+
true
331+
}
332+
305333
async fn propose_batch(&self) -> Result<bool> {
306334
self.propose_count.fetch_add(1, Ordering::SeqCst);
307335
Ok(true)
@@ -326,6 +354,10 @@ mod tests {
326354
None
327355
}
328356

357+
fn is_synced(&self) -> bool {
358+
true
359+
}
360+
329361
async fn propose_batch(&self) -> Result<bool> {
330362
let count = self.propose_count.fetch_add(1, Ordering::SeqCst) + 1;
331363
if count <= self.retries_before_success {
@@ -426,6 +458,10 @@ mod tests {
426458
None
427459
}
428460

461+
fn is_synced(&self) -> bool {
462+
true
463+
}
464+
429465
async fn propose_batch(&self) -> Result<bool> {
430466
self.propose_count.fetch_add(1, Ordering::SeqCst);
431467
self.proposed_notify.notify_one();
@@ -512,4 +548,166 @@ mod tests {
512548
// Stage 3 may make additional rebroadcast calls after success, so use >.
513549
assert!(propose_count.load(Ordering::SeqCst) > RETRIES, "expected at least {} total attempts", RETRIES + 1);
514550
}
551+
552+
/// While the node is syncing, Stage 3 must not rebroadcast the proposed batch — its previous
553+
/// certificates may be stale and peers will not sign it. Once sync completes, rebroadcast
554+
/// should resume.
555+
#[test_log::test(tokio::test)]
556+
async fn test_proposal_task_pauses_rebroadcast_while_syncing() {
557+
/// Starts out synced for Stage 1/2. Every `propose_batch` call then flips the node to "syncing",
558+
/// so Stage 3's rebroadcast loop must pause. The held `sync_release` `Notify` lets the
559+
/// test resume sync on demand to assert that rebroadcast comes back.
560+
struct SyncTogglingProposer {
561+
propose_count: Arc<AtomicU32>,
562+
proposed_notify: Arc<Notify>,
563+
is_syncing: Arc<AtomicBool>,
564+
sync_release: Arc<Notify>,
565+
}
566+
567+
#[async_trait::async_trait]
568+
impl BatchPropose for SyncTogglingProposer {
569+
fn current_round(&self) -> u64 {
570+
1
571+
}
572+
573+
fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
574+
if self.is_syncing.load(Ordering::SeqCst) {
575+
let release = self.sync_release.clone();
576+
Some(Box::pin(async move { release.notified().await }))
577+
} else {
578+
None
579+
}
580+
}
581+
582+
fn is_synced(&self) -> bool {
583+
!self.is_syncing.load(Ordering::SeqCst)
584+
}
585+
586+
async fn propose_batch(&self) -> Result<bool> {
587+
self.propose_count.fetch_add(1, Ordering::SeqCst);
588+
self.proposed_notify.notify_one();
589+
// Transition to syncing once the Stage 2 proposal has gone out.
590+
self.is_syncing.store(true, Ordering::SeqCst);
591+
Ok(true)
592+
}
593+
}
594+
595+
// Default starts ready — Stage 1 completes after MIN_BATCH_DELAY without a signal.
596+
let task = ProposalTask::<MainnetV0>::default();
597+
598+
let proposed_notify = Arc::new(Notify::new());
599+
let propose_count = Arc::new(AtomicU32::new(0));
600+
let is_syncing = Arc::new(AtomicBool::new(false));
601+
let sync_release = Arc::new(Notify::new());
602+
603+
let proposer = SyncTogglingProposer {
604+
propose_count: propose_count.clone(),
605+
proposed_notify: proposed_notify.clone(),
606+
is_syncing: is_syncing.clone(),
607+
sync_release: sync_release.clone(),
608+
};
609+
610+
tokio::spawn(task.run(proposer));
611+
612+
// Wait for Stage 2 to make its single propose call.
613+
tokio::time::timeout(Duration::from_secs(10), proposed_notify.notified())
614+
.await
615+
.expect("Stage 2 did not call propose_batch within 10 seconds");
616+
assert_eq!(propose_count.load(Ordering::SeqCst), 1, "expected exactly one Stage 2 call");
617+
618+
// Stage 3 sleeps MAX_BATCH_DELAY before each rebroadcast attempt; wait past that to give
619+
// the sync gate a chance to fire. Without the gate, propose_count would increment here.
620+
tokio::time::sleep(MAX_BATCH_DELAY + Duration::from_secs(1)).await;
621+
assert_eq!(propose_count.load(Ordering::SeqCst), 1, "Stage 3 rebroadcast fired while the node was syncing",);
622+
623+
// Release sync. Stage 3 should resume rebroadcasting after the next MAX_BATCH_DELAY tick.
624+
is_syncing.store(false, Ordering::SeqCst);
625+
sync_release.notify_waiters();
626+
627+
tokio::time::timeout(MAX_BATCH_DELAY + Duration::from_secs(5), proposed_notify.notified())
628+
.await
629+
.expect("Stage 3 did not resume rebroadcast after sync completed");
630+
assert!(propose_count.load(Ordering::SeqCst) >= 2, "expected rebroadcast after sync completed");
631+
}
632+
633+
/// Stage 2 retries `propose_batch` every CREATE_BATCH_INTERVAL (250ms) while it returns
634+
/// `Ok(false)`. If sync starts mid-retry, the loop must bail out so the outer loop can fall
635+
/// back through Stage 1's sync gate — otherwise the node spins, calling `propose_batch` four
636+
/// times per second (which on a real primary triggers the cached-batch rebroadcast).
637+
#[test_log::test(tokio::test)]
638+
async fn test_proposal_task_bails_stage_2_when_syncing_starts() {
639+
/// Always returns `Ok(false)` from `propose_batch`, so Stage 2 enters its retry loop.
640+
/// The test flips `syncing` to true after a few calls; subsequent calls should stop.
641+
struct AlwaysFalseProposer {
642+
propose_count: Arc<AtomicU32>,
643+
syncing: Arc<AtomicBool>,
644+
sync_release: Arc<Notify>,
645+
}
646+
647+
#[async_trait::async_trait]
648+
impl BatchPropose for AlwaysFalseProposer {
649+
fn current_round(&self) -> u64 {
650+
1
651+
}
652+
653+
fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
654+
if self.syncing.load(Ordering::SeqCst) {
655+
let release = self.sync_release.clone();
656+
Some(Box::pin(async move { release.notified().await }))
657+
} else {
658+
None
659+
}
660+
}
661+
662+
fn is_synced(&self) -> bool {
663+
!self.syncing.load(Ordering::SeqCst)
664+
}
665+
666+
async fn propose_batch(&self) -> Result<bool> {
667+
self.propose_count.fetch_add(1, Ordering::SeqCst);
668+
// Never succeed — Stage 2 will keep retrying every CREATE_BATCH_INTERVAL.
669+
Ok(false)
670+
}
671+
}
672+
673+
let task = ProposalTask::<MainnetV0>::default();
674+
let propose_count = Arc::new(AtomicU32::new(0));
675+
let syncing = Arc::new(AtomicBool::new(false));
676+
let sync_release = Arc::new(Notify::new());
677+
678+
let proposer = AlwaysFalseProposer {
679+
propose_count: propose_count.clone(),
680+
syncing: syncing.clone(),
681+
sync_release: sync_release.clone(),
682+
};
683+
684+
tokio::spawn(task.run(proposer));
685+
686+
// Let Stage 2 spin for a few CREATE_BATCH_INTERVAL ticks (250ms each). After MIN_BATCH_DELAY
687+
// (1s) for Stage 1 to release plus a couple of retry cycles, we should see >= 2 calls.
688+
tokio::time::sleep(Duration::from_millis(2000)).await;
689+
let pre_sync_calls = propose_count.load(Ordering::SeqCst);
690+
assert!(pre_sync_calls >= 2, "Stage 2 should have retried at least twice while synced (got {pre_sync_calls})");
691+
692+
// Start syncing. Stage 2 must bail out, and Stage 1 must then block on the sync gate.
693+
syncing.store(true, Ordering::SeqCst);
694+
695+
// Allow Stage 2 to notice and bail, and Stage 1 to install its sync wait.
696+
tokio::time::sleep(Duration::from_millis(500)).await;
697+
let after_bail_calls = propose_count.load(Ordering::SeqCst);
698+
699+
// From this point, no further propose_batch calls should happen until sync releases.
700+
tokio::time::sleep(Duration::from_secs(2)).await;
701+
assert_eq!(
702+
propose_count.load(Ordering::SeqCst),
703+
after_bail_calls,
704+
"propose_batch called while syncing — Stage 2 did not bail (or Stage 1 missed the gate)",
705+
);
706+
707+
// Release sync; the outer loop should drive Stage 2 again.
708+
syncing.store(false, Ordering::SeqCst);
709+
sync_release.notify_waiters();
710+
tokio::time::sleep(Duration::from_secs(2)).await;
711+
assert!(propose_count.load(Ordering::SeqCst) > after_bail_calls, "Stage 2 did not resume after sync completed",);
712+
}
515713
}

0 commit comments

Comments
 (0)