Commit a294c33

Merge pull request #4190 from ProvableHQ/feat/batch-triggers
[Feature] Batch Triggers
2 parents 4c9a024 + f1af61f commit a294c33

19 files changed

Lines changed: 1148 additions & 390 deletions

.circleci/config.yml

Lines changed: 2 additions & 2 deletions
@@ -799,7 +799,7 @@ workflows:
     filters:
       branches:
         only:
-          - ci/reenable-check-logs
+          - fix/batch-triggers
           - canary
           - testnet
           - mainnet
@@ -809,7 +809,7 @@ workflows:
     filters:
       branches:
         only:
-          - ci/reenable-check-logs
+          - fix/batch-triggers
           - canary
           - testnet
           - mainnet

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default.

node/bft/README.md

Lines changed: 38 additions & 20 deletions
@@ -10,26 +10,44 @@ The `snarkos-node-bft` crate provides a node implementation for a BFT-based memo

 The primary is the coordinator, responsible for advancing rounds and broadcasting the anchor.

-#### Triggering Round Advancement
-
-Each round runs until one of two conditions is met:
-1. The coinbase target has been reached, or
-2. The round has reached its timeout (currently set to 10 seconds)
-
-#### Advancing Rounds
-
-As described in the paper [Bullshark: The Partially Synchronous Version](https://arxiv.org/abs/2209.05633),
-the BFT generally advances rounds when `n − f` vertices are delivered, however:
-```
-The problem in advancing rounds whenever n − f vertices are delivered is that parties
-might not vote for the anchor even if the party that broadcast it is just slightly slower
-than the fastest n − f parties. To deal with this, the BFT integrates timeouts into
-the DAG construction. If the first n − f vertices a party p gets in an even-numbered round r
-do not include the anchor of round r, then p sets a timer and waits for the anchor
-until the timer expires. Similarly, in an odd-numbered round, parties wait for either
-f + 1 vertices that vote for the anchor, or 2f + 1 vertices that do not, or a timeout.
-```
-Note that in this quote `2f + 1` should really be `n - f`.
+#### Round Advancement
+
+A round advances once a quorum (`n - f`) of validators have submitted certificates for that round
+and the following round-type-specific conditions are met:
+
+- **Even rounds**: the elected leader's certificate is present among the quorum, confirming the
+  leader was reachable. If the leader's certificate is absent, the node waits up to
+  `MAX_LEADER_CERTIFICATE_DELAY` before advancing without it.
+- **Odd rounds**: at least `f + 1` certificates from the current round reference the previous
+  even round's leader certificate (availability threshold), or `n - f` do not (non-leader
+  quorum). If neither threshold is reached, the node again falls back to the timeout.
+
+In both cases the timeout is `MAX_LEADER_CERTIFICATE_DELAY` (currently 5 seconds), reset at the
+start of each round. This follows the [Bullshark](https://arxiv.org/abs/2209.05633) protocol.
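The round-advancement rule described in the new README text can be sketched as a pure function. This is a minimal illustration, not the code from `bft.rs`: the `RoundView` struct, its field names, and `can_advance` are hypothetical, and the sketch counts validators, whereas the real implementation may weigh them differently.

```rust
/// Snapshot of what a node has observed for the current round.
/// Hypothetical type for this sketch; not part of snarkOS.
pub struct RoundView {
    /// Total number of validators (`n`).
    pub n: usize,
    /// Maximum number of faulty validators tolerated (`f`).
    pub f: usize,
    /// Certificates received for the current round.
    pub certificates: usize,
    /// Even rounds: whether the elected leader's certificate was received.
    pub leader_present: bool,
    /// Odd rounds: certificates that reference the previous leader certificate.
    pub votes_for_leader: usize,
    /// Whether the leader-certificate timer has expired.
    pub timer_expired: bool,
}

/// Returns `true` if the round may advance under the README's rule.
pub fn can_advance(round: u64, view: &RoundView) -> bool {
    let quorum = view.n.saturating_sub(view.f);
    // A quorum of certificates is required in every case.
    if view.certificates < quorum {
        return false;
    }
    // Once the timer has expired, quorum alone is sufficient.
    if view.timer_expired {
        return true;
    }
    if round % 2 == 0 {
        // Even round: the leader's certificate must be among those received.
        view.leader_present
    } else {
        // Odd round: f + 1 votes for the leader, or n - f votes that skip it.
        let votes_against = view.certificates.saturating_sub(view.votes_for_leader);
        view.votes_for_leader >= view.f + 1 || votes_against >= quorum
    }
}
```

With `n = 4` and `f = 1`, an odd round holding three certificates advances as soon as two of them reference the leader, while an even round without the leader's certificate waits for the timer.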
+
+#### Batch Proposal
+
+Batch proposals are driven by a dedicated **`ProposalTask`** that runs in a loop and is the only place that calls `Primary::propose_batch()`.
+This keeps proposal on a single execution path and avoids concurrent proposal attempts. Each loop iteration covers one full round and proceeds through three stages:
+
+**Stage 1: Wait until ready to propose**
+
+The task blocks until all of the following conditions are satisfied:
+1. The node is synced. If it is currently syncing, the task waits via `wait_for_synced_if_syncing()` before continuing.
+2. `MIN_BATCH_DELAY` has elapsed since the start of the round, enforcing a minimum inter-proposal interval.
+3. One of two events fires:
+   - **Ready signal**: `ProposalTask::signal()` is called from `try_increment_to_the_next_round()` when the primary successfully advances to a new round (e.g. after a leader certificate is committed). This is delivered via a `watch` channel.
+   - **`MAX_BATCH_DELAY` timeout**: If no signal arrives within `MAX_BATCH_DELAY` of the round start, the task proceeds anyway. This handles the case where the elected leader's certificate never arrives.
+
+A short `CREATE_BATCH_INTERVAL` heartbeat keeps the round-change check alive while waiting.
+
+**Stage 2: Propose**
+
+The task calls `propose_batch()` in a loop until it returns `Ok(true)` (batch submitted). On `Ok(false)` or a transient error, it retries every `CREATE_BATCH_INTERVAL`. If the round advances during retries, the task restarts from Stage 1.
+
+**Stage 3: Wait for signatures**
+
+Once the batch is broadcast, the task calls `propose_batch()` every `MAX_BATCH_DELAY` to rebroadcast to any validators that have not yet signed. It exits this stage as soon as the round advances (detected either via the ready signal or by polling `current_round()`).
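The Stage 1 readiness check can be summarized as a small predicate. The sketch below is an assumption-laden illustration rather than the actual `ProposalTask` code: the function `ready_to_propose` is hypothetical, and the two constant values are placeholders, since this diff does not show the real values of `MIN_BATCH_DELAY` and `MAX_BATCH_DELAY`.

```rust
use std::time::Duration;

// Placeholder values for illustration only; the real constants are not shown in this diff.
pub const MIN_BATCH_DELAY: Duration = Duration::from_secs(1);
pub const MAX_BATCH_DELAY: Duration = Duration::from_secs(3);

/// Returns `true` once all Stage 1 conditions hold.
///
/// * `is_synced`: whether the node has finished syncing.
/// * `elapsed`: time since the start of the current round.
/// * `signaled`: whether the ready signal has fired for this round.
pub fn ready_to_propose(is_synced: bool, elapsed: Duration, signaled: bool) -> bool {
    // Conditions 1 and 2: the node is synced and the minimum delay has passed.
    if !is_synced || elapsed < MIN_BATCH_DELAY {
        return false;
    }
    // Condition 3: either the ready signal fired, or the maximum delay was reached.
    signaled || elapsed >= MAX_BATCH_DELAY
}
```

In the task described above, a predicate like this would be re-evaluated on each `CREATE_BATCH_INTERVAL` heartbeat. A signal that arrives before `MIN_BATCH_DELAY` has elapsed does not trigger a proposal on its own.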

 ### Ledger Advancement

node/bft/src/bft.rs

Lines changed: 67 additions & 30 deletions
@@ -14,7 +14,7 @@
 // limitations under the License.

 use crate::{
-    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
+    MAX_LEADER_CERTIFICATE_DELAY,
     helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now},
     primary::{Primary, PrimaryCallback},
     sync::SyncCallback,
@@ -42,6 +42,8 @@ use colored::Colorize;
 use indexmap::{IndexMap, IndexSet};
 #[cfg(feature = "locktick")]
 use locktick::parking_lot::RwLock;
+#[cfg(feature = "locktick")]
+use locktick::tokio::Mutex;
 #[cfg(not(feature = "locktick"))]
 use parking_lot::RwLock;
 use std::{
@@ -52,6 +54,8 @@ use std::{
         atomic::{AtomicI64, Ordering},
     },
 };
+#[cfg(not(feature = "locktick"))]
+use tokio::sync::Mutex;
 use tokio::sync::{OnceCell, oneshot};

 #[derive(Clone)]
@@ -66,6 +70,12 @@ pub struct BFT<N: Network> {
     leader_certificate_timer: Arc<AtomicI64>,
     /// The consensus sender.
     consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
+    /// Ensures only one call to `commit_leader_certificate` runs at a time.
+    ///
+    /// Without this, a second certificate crossing the availability threshold while the consensus
+    /// callback for a prior commit is still in-flight would re-walk already-committed rounds
+    /// (because `last_committed_round` hasn't been updated yet), causing duplicate subdag commits.
+    commit_lock: Arc<Mutex<()>>,
 }

 impl<N: Network> BFT<N> {
@@ -98,6 +108,7 @@ impl<N: Network> BFT<N> {
             leader_certificate: Default::default(),
             leader_certificate_timer: Default::default(),
             consensus_sender: Default::default(),
+            commit_lock: Default::default(),
         })
     }

@@ -207,7 +218,13 @@ impl<N: Network> BFT<N> {
 #[async_trait::async_trait]
 impl<N: Network> PrimaryCallback<N> for BFT<N> {
     /// Notification that a new round has started.
-    fn update_to_next_round(&self, current_round: u64) -> bool {
+    ///
+    /// # Arguments
+    /// * `current_round` - the round the caller is in (to avoid race conditions)
+    ///
+    /// # Returns
+    /// `true` if the BFT moved to the next round.
+    fn try_advance_to_next_round(&self, current_round: u64) -> bool {
         // Ensure the current round is at least the storage round (this is a sanity check).
         let storage_round = self.storage().current_round();
         if current_round < storage_round {
@@ -482,7 +499,7 @@ impl<N: Network> BFT<N> {
     ///
     /// This is always true for a new BFT instance.
     fn is_timer_expired(&self) -> bool {
-        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
+        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY.as_secs() as i64 <= now()
     }

     /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
@@ -580,21 +597,40 @@ impl<N: Network> BFT<N> {
         #[cfg(debug_assertions)]
         trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());

+        // Serialize all commits so that `last_committed_round` is up-to-date before the next call
+        // re-walks the DAG, preventing duplicate subdag commits.
+        let _commit_guard = self.commit_lock.lock().await;
+
         // Fetch the leader round.
         let latest_leader_round = leader_certificate.round();
+
         // Determine the list of all previous leader certificates since the last committed round.
         // The order of the leader certificates is from **newest** to **oldest**.
         let mut leader_certificates = vec![leader_certificate.clone()];
+        // Whether the consensus callback should be skipped (true when the round is already committed).
+        // When `latest_leader_round == last_committed_round` the round was already committed by a
+        // concurrent call that beat us to the lock, or by a prior session whose DAG state was
+        // reconstructed without populating `recently_committed`. In both cases we still re-run
+        // DFS + GC to ensure `recently_committed` and `gc_round` are populated, but we must NOT
+        // send a duplicate subdag to the consensus callback.
+        let skip_consensus;
         {
-            // Retrieve the leader round and the latest round we committed.
-            let leader_round = leader_certificate.round();
-
             // Read-lock the DAG.
             // We need to hold the lock, so we do not later fail to re-acquire it.
             let dag = self.dag.read();

+            // Re-check under the lock: another call may have committed this round while we were waiting.
+            if latest_leader_round < dag.last_committed_round() {
+                trace!("Skipping already-committed leader round {latest_leader_round}");
+                return Ok(());
+            }
+            skip_consensus = latest_leader_round == dag.last_committed_round();
+
+            #[cfg(debug_assertions)]
+            trace!("Attempting to commit leader certificate for round {}...", latest_leader_round);
+
             let mut current_certificate = leader_certificate;
-            for round in (dag.last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2) {
+            for round in (dag.last_committed_round() + 2..=latest_leader_round.saturating_sub(2)).rev().step_by(2) {
                 // Retrieve the previous committee for the leader round.
                 let previous_committee_lookback =
                     self.ledger().get_committee_lookback_for_round(round).with_context(|| {
@@ -721,25 +757,28 @@ impl<N: Network> BFT<N> {
                 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
             );

-            // Trigger consensus.
-            if let Some(consensus_sender) = self.consensus_sender.get() {
-                // Initialize a callback sender and receiver.
-                let (callback_sender, callback_receiver) = oneshot::channel();
-                // Send the subdag and transmissions to consensus.
-                consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
-                // Await the callback to continue.
-                match callback_receiver.await {
-                    Ok(Ok(_)) => (),
-                    Ok(Err(err)) => {
-                        let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
-                        error!("{}", &flatten_error(err));
-                        return Ok(());
-                    }
-                    Err(err) => {
-                        let err: anyhow::Error = err.into();
-                        let err = err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
-                        error!("{}", flatten_error(err));
-                        return Ok(());
+            // Trigger consensus (skipped if the round was already committed by a prior call).
+            if !skip_consensus {
+                if let Some(consensus_sender) = self.consensus_sender.get() {
+                    // Initialize a callback sender and receiver.
+                    let (callback_sender, callback_receiver) = oneshot::channel();
+                    // Send the subdag and transmissions to consensus.
+                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
+                    // Await the callback to continue.
+                    match callback_receiver.await {
+                        Ok(Ok(_)) => (),
+                        Ok(Err(err)) => {
+                            let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
+                            error!("{}", &flatten_error(err));
+                            return Ok(());
+                        }
+                        Err(err) => {
+                            let err: anyhow::Error = err.into();
+                            let err =
+                                err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
+                            error!("{}", flatten_error(err));
+                            return Ok(());
+                        }
                     }
                 }
             }
@@ -881,7 +920,7 @@ impl<N: Network> BFT<N> {
 mod tests {
     use crate::{
         BFT,
-        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
+        MAX_LEADER_CERTIFICATE_DELAY,
         PrimaryCallback,
         helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
         sync::SyncCallback,
@@ -1114,9 +1153,7 @@ mod tests {
             assert!(!result);
         }
         // Wait for the timer to expire.
-        let leader_certificate_timeout =
-            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
-        std::thread::sleep(leader_certificate_timeout);
+        std::thread::sleep(MAX_LEADER_CERTIFICATE_DELAY);
         // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
         let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
         if bft_timer.is_timer_expired() {
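
The `skip_consensus` logic added to `node/bft/src/bft.rs` above amounts to a three-way comparison between the leader round and the DAG's last committed round. The sketch below isolates that decision. The `CommitAction` enum and the `commit_action` function are hypothetical names introduced for illustration; the real code makes this decision inline while holding `commit_lock` and the DAG read lock.

```rust
use std::cmp::Ordering;

/// What a commit attempt should do for a given leader round.
/// Hypothetical type for this sketch; not part of snarkOS.
#[derive(Debug, PartialEq, Eq)]
pub enum CommitAction {
    /// The round is older than the last committed round: return early.
    Skip,
    /// The round equals the last committed round: re-walk the DAG,
    /// but do not send the subdag to consensus again.
    RewalkWithoutConsensus,
    /// The round is newer: commit and notify consensus.
    Commit,
}

/// Maps the round comparison onto the action described in the diff's comments.
pub fn commit_action(latest_leader_round: u64, last_committed_round: u64) -> CommitAction {
    match latest_leader_round.cmp(&last_committed_round) {
        Ordering::Less => CommitAction::Skip,
        Ordering::Equal => CommitAction::RewalkWithoutConsensus,
        Ordering::Greater => CommitAction::Commit,
    }
}
```

Because the comparison in the diff runs only after `commit_lock` is acquired, a call that waited on the lock sees the `last_committed_round` written by the call ahead of it, which is how the diff's comments say duplicate subdag commits are avoided.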

node/bft/src/gateway.rs

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,7 @@
 use crate::helpers::Telemetry;
 use crate::{
     CONTEXT,
-    MAX_BATCH_DELAY_IN_MS,
+    MAX_BATCH_DELAY,
     MEMORY_POOL_PORT,
     Worker,
     events::{DisconnectReason, EventCodec, PrimaryPing},
@@ -99,9 +99,9 @@ use tokio_stream::StreamExt;
 use tokio_util::codec::Framed;

 /// The maximum interval of events to cache.
-const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
+const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds
 /// The maximum interval of requests to cache.
-const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
+const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds

 /// The maximum number of connection attempts in an interval.
 #[cfg(not(test))]

node/bft/src/helpers/pending.rs

Lines changed: 3 additions & 3 deletions
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use crate::{Gateway, MAX_FETCH_TIMEOUT_IN_MS};
+use crate::{Gateway, MAX_FETCH_TIMEOUT};
 use snarkos_node_bft_ledger_service::LedgerService;
 use snarkos_node_network::PeerPoolHandling;
 use snarkvm::{
@@ -36,8 +36,8 @@ use time::OffsetDateTime;
 use tokio::sync::oneshot;

 /// The maximum number of seconds to wait before expiring a callback.
-/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT_IN_MS` when converting to seconds.
-pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS.div_ceil(1000) as i64;
+/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT` when converting to seconds.
+pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT.as_secs() as i64;

 /// Returns the maximum number of redundant requests for the number of validators in the specified round.
 pub fn max_redundant_requests<N: Network>(ledger: Arc<dyn LedgerService<N>>, round: u64) -> Result<usize> {
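
One detail in the `pending.rs` change above is worth illustrating: the removed line rounded up with `div_ceil(1000)`, while the added line uses `Duration::as_secs()`, which returns only the whole-second part and drops any fractional remainder. Whether that matters depends on the value of `MAX_FETCH_TIMEOUT`, which this diff does not show. The sketch below shows how a round-up conversion could be written; `as_secs_ceil` is a hypothetical helper, not something defined in snarkOS.

```rust
use std::time::Duration;

/// Converts a `Duration` to whole seconds, rounding up any fractional part.
/// Hypothetical helper for illustration; usable in `const` contexts.
pub const fn as_secs_ceil(duration: Duration) -> i64 {
    let secs = duration.as_secs() as i64;
    // `subsec_nanos` is the fractional part; any remainder rounds up.
    if duration.subsec_nanos() > 0 { secs + 1 } else { secs }
}
```

For a duration of 12,500 ms, `as_secs()` yields 12 while `as_secs_ceil` yields 13. For a whole number of seconds the two agree.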

node/bft/src/helpers/timestamp.rs

Lines changed: 5 additions & 5 deletions
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use crate::MAX_TIMESTAMP_DELTA_IN_SECS;
+use crate::MAX_TIMESTAMP_DELTA;
 use snarkvm::prelude::{Result, bail};

 use time::OffsetDateTime;
@@ -36,7 +36,7 @@ pub fn to_utc_datetime(timestamp: i64) -> OffsetDateTime {
 /// Sanity checks the timestamp for liveness.
 pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> {
     // Ensure the timestamp is within range.
-    if timestamp > (now() + MAX_TIMESTAMP_DELTA_IN_SECS) {
+    if timestamp > (now() + MAX_TIMESTAMP_DELTA.as_secs() as i64) {
         bail!("Timestamp {timestamp} is too far in the future")
     }
     Ok(())
@@ -45,17 +45,17 @@ pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> {
 #[cfg(test)]
 mod prop_tests {
     use super::*;
-    use crate::MAX_TIMESTAMP_DELTA_IN_SECS;
+    use crate::MAX_TIMESTAMP_DELTA;

     use proptest::prelude::*;
     use test_strategy::proptest;

     fn any_valid_timestamp() -> BoxedStrategy<i64> {
-        (Just(now()), 0..MAX_TIMESTAMP_DELTA_IN_SECS).prop_map(|(now, delta)| now + delta).boxed()
+        (Just(now()), 0..(MAX_TIMESTAMP_DELTA.as_secs() as i64)).prop_map(|(now, delta)| now + delta).boxed()
     }

     fn any_invalid_timestamp() -> BoxedStrategy<i64> {
-        (Just(now()), MAX_TIMESTAMP_DELTA_IN_SECS..).prop_map(|(now, delta)| now + delta).boxed()
+        (Just(now()), (MAX_TIMESTAMP_DELTA.as_secs() as i64)..).prop_map(|(now, delta)| now + delta).boxed()
     }

     #[proptest]
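
The liveness check in `timestamp.rs` can be exercised in isolation by passing the current time as a parameter. The sketch below is a standalone approximation under stated assumptions: the function name `check_timestamp_for_liveness_at`, the explicit `now` parameter, the `String` error type, and the 10-second value of `MAX_TIMESTAMP_DELTA` are all placeholders chosen for illustration, not values taken from snarkOS.

```rust
use std::time::Duration;

// Placeholder value for illustration only; the real constant is not shown in this diff.
pub const MAX_TIMESTAMP_DELTA: Duration = Duration::from_secs(10);

/// Rejects timestamps that lie more than `MAX_TIMESTAMP_DELTA` past `now`.
/// Both arguments are Unix timestamps in seconds.
pub fn check_timestamp_for_liveness_at(timestamp: i64, now: i64) -> Result<(), String> {
    // Saturating addition avoids overflow for extreme `now` values.
    let limit = now.saturating_add(MAX_TIMESTAMP_DELTA.as_secs() as i64);
    if timestamp > limit {
        return Err(format!("Timestamp {timestamp} is too far in the future"));
    }
    Ok(())
}
```

Because the comparison is strict, a timestamp exactly `MAX_TIMESTAMP_DELTA` ahead of `now` is accepted, and only later timestamps are rejected.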
