Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use aptos_types::{
epoch_state::EpochState,
on_chain_config::{
AnchorElectionMode, DagConsensusConfigV1,
LeaderReputationType::{ProposerAndVoter, ProposerAndVoterV2},
LeaderReputationType::{ProposerAndVoter, ProposerAndVoterV2, ProposerAndVoterV3},
OnChainJWKConsensusConfig, OnChainRandomnessConfig, ProposerAndVoterConfig,
ValidatorTxnConfig,
},
Expand Down Expand Up @@ -479,6 +479,24 @@ impl DagBootstrapper {
self.build_leader_reputation_components(config),
)
},
// V3 is V2 plus the latency-weighted gate. The DAG path does not yet
// wire `LatencyWeightedHeuristic` into its anchor-election plumbing,
// so we use the V3 base config and ignore the latency-weighted toggle.
// TODO(consensus): plumb LatencyWeightedHeuristic into DAG when needed.
ProposerAndVoterV3(config) => {
let base = &config.base;
let commit_events = self
.storage
.get_latest_k_committed_events(
std::cmp::max(
base.proposer_window_num_validators_multiplier,
base.voter_window_num_validators_multiplier,
) as u64
* self.epoch_state.verifier.len() as u64,
)
.expect("Failed to read commit events from storage");
(commit_events, self.build_leader_reputation_components(base))
},
ProposerAndVoter(_) => unreachable!("unsupported mode"),
};

Expand Down
74 changes: 38 additions & 36 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
liveness::{
cached_proposer_election::CachedProposerElection,
leader_reputation::{
extract_epoch_to_proposers, AptosDBBackend, LeaderReputation,
extract_epoch_to_proposers, AptosDBBackend, LatencyWeightedHeuristic, LeaderReputation,
ProposerAndVoterHeuristic, ReputationHeuristic,
},
proposal_generator::{
Expand Down Expand Up @@ -101,11 +101,10 @@ use aptos_types::{
epoch_state::EpochState,
jwks::SupportedOIDCProviders,
on_chain_config::{
ChunkyDKGConfigMoveStruct, ChunkyDKGConfigSeqNum, Features, LeaderReputationType,
OnChainChunkyDKGConfig, OnChainConfigPayload, OnChainConfigProvider,
OnChainConsensusConfig, OnChainExecutionConfig, OnChainJWKConsensusConfig,
OnChainRandomnessConfig, ProposerElectionType, RandomnessConfigMoveStruct,
RandomnessConfigSeqNum, ValidatorSet,
ChunkyDKGConfigMoveStruct, ChunkyDKGConfigSeqNum, Features, OnChainChunkyDKGConfig,
OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig,
OnChainExecutionConfig, OnChainJWKConsensusConfig, OnChainRandomnessConfig,
ProposerElectionType, RandomnessConfigMoveStruct, RandomnessConfigSeqNum, ValidatorSet,
},
randomness::{RandKeys, WvufPP, WVUF},
secret_sharing::SecretShareConfig,
Expand Down Expand Up @@ -320,37 +319,40 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
Arc::new(RotatingProposer::new(vec![proposer], *contiguous_rounds))
},
ProposerElectionType::LeaderReputation(leader_reputation_type) => {
let (
heuristic,
window_size,
weight_by_voting_power,
use_history_from_previous_epoch_max_count,
) = match &leader_reputation_type {
LeaderReputationType::ProposerAndVoter(proposer_and_voter_config)
| LeaderReputationType::ProposerAndVoterV2(proposer_and_voter_config) => {
let proposer_window_size = proposers.len()
* proposer_and_voter_config.proposer_window_num_validators_multiplier;
let voter_window_size = proposers.len()
* proposer_and_voter_config.voter_window_num_validators_multiplier;
let heuristic: Box<dyn ReputationHeuristic> =
Box::new(ProposerAndVoterHeuristic::new(
self.author,
proposer_and_voter_config.active_weight,
proposer_and_voter_config.inactive_weight,
proposer_and_voter_config.failed_weight,
proposer_and_voter_config.failure_threshold_percent,
voter_window_size,
proposer_window_size,
leader_reputation_type.use_reputation_window_from_stale_end(),
));
(
heuristic,
std::cmp::max(proposer_window_size, voter_window_size),
proposer_and_voter_config.weight_by_voting_power,
proposer_and_voter_config.use_history_from_previous_epoch_max_count,
)
},
// Pull base parameters and the latency-weighted gate out of the on-chain
// config in a version-agnostic way. V3 carries the latency settings; V1/V2
// return false for `use_latency_weighted` so behavior is unchanged for them.
let params = leader_reputation_type.proposer_and_voter_params();
let proposer_and_voter_config = params.base;
let proposer_window_size = proposers.len()
* proposer_and_voter_config.proposer_window_num_validators_multiplier;
let voter_window_size = proposers.len()
* proposer_and_voter_config.voter_window_num_validators_multiplier;
let inner_heuristic = ProposerAndVoterHeuristic::new(
self.author,
proposer_and_voter_config.active_weight,
proposer_and_voter_config.inactive_weight,
proposer_and_voter_config.failed_weight,
proposer_and_voter_config.failure_threshold_percent,
voter_window_size,
proposer_window_size,
leader_reputation_type.use_reputation_window_from_stale_end(),
);
let heuristic: Box<dyn ReputationHeuristic> = if params.use_latency_weighted {
// Decode milli-multiplier (1000 = 1.0×) deterministically.
let multiplier = params.latency_weight_multiplier_milli as f64 / 1000.0;
Box::new(LatencyWeightedHeuristic::new(
inner_heuristic,
proposer_and_voter_config.active_weight,
multiplier,
))
} else {
Box::new(inner_heuristic)
};
let window_size = std::cmp::max(proposer_window_size, voter_window_size);
let weight_by_voting_power = proposer_and_voter_config.weight_by_voting_power;
let use_history_from_previous_epoch_max_count =
proposer_and_voter_config.use_history_from_previous_epoch_max_count;

let seek_len = onchain_config.leader_reputation_exclude_round() as usize
+ onchain_config.max_failed_authors_to_store()
Expand Down
211 changes: 211 additions & 0 deletions consensus/src/liveness/leader_reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,217 @@ impl ReputationHeuristic for ProposerAndVoterHeuristic {
}
}

/// Reputation heuristic that layers a latency penalty on top of `ProposerAndVoterHeuristic`.
///
/// The inner heuristic classifies every candidate first. A candidate whose inner weight equals
/// `active_weight` may then have that weight scaled *down* according to how slow its observed
/// round times are relative to its peers; every other candidate keeps the inner weight
/// untouched. Healthy validators are never boosted — they keep the base active weight.
///
/// ## Round-time observations
///
/// For every pair of consecutive committed blocks (same epoch) in the history window we take
/// the difference of their proposed times and attribute it as follows:
/// * Clean pairs (no failed proposers recorded on the newer block) are split 50/50 between
///   the two adjacent proposers.
/// * Timeout-spanning gaps are divided equally among the failed proposers
///   (via `failed_proposer_indices`); the adjacent healthy proposers get nothing.
///
/// Each validator's observations are averaged (integer mean).
///
/// ## Penalty formula
///
/// The **median** of all per-validator means is the reference point. For a validator with
/// enough observations:
///
///     ratio  = clamp(val_mean / median_mean, 1.0, MAX_LATENCY_RATIO)
///     factor = 1.0 / ratio^multiplier
///     weight = active_weight * factor        (truncated to u64)
///
/// Properties:
/// * Validators at or below the median: `factor = 1.0` (no penalty, no boost). This keeps the
///   heuristic robust to small natural variation between healthy validators, even at high
///   multipliers.
/// * Validators above the median: penalized by `(val_mean / median_mean)^-multiplier`, so the
///   slowest validator receives the lowest weight.
/// * The ratio is clamped at `MAX_LATENCY_RATIO` to bound the suppression of a single
///   anomalously slow validator.
///
/// ## Carry-forward for validators with no observations
///
/// A validator with `< MIN_OBSERVATIONS` round-time samples in the window does NOT fall back
/// to the base active weight. Instead the **last computed factor** for that validator
/// (stored in `last_factor`) is applied. This avoids an oscillation where a suppressed slow
/// validator pops back to full weight as soon as the suppression itself denies it fresh
/// observations:
///
/// * fresh observations    → recompute factor, store it
/// * no fresh observations → carry-forward the previous factor
/// * never seen before     → factor = 1.0 (benefit of doubt for newly-rotated-in validators)
///
/// The carry-forward state lives exactly as long as this heuristic instance.
/// NOTE(review): the original note here states the heuristic is reconstructed on every epoch
/// change, which is what scopes this state to one epoch — confirm at the construction site.
/// NOTE(review): because of `last_factor`, the output of `get_weights` depends on the calls
/// previously made on this instance, not only on its arguments. If every node must derive
/// identical weights, confirm that a node which (re)starts mid-epoch cannot diverge.
pub struct LatencyWeightedHeuristic {
/// Base classifier; its weights are the starting point that `get_weights` adjusts.
inner: ProposerAndVoterHeuristic,
/// Weight value that marks a candidate as "active" in the inner heuristic's output; only
/// candidates whose inner weight equals this value are latency-scaled.
/// NOTE(review): the comparison is by value, so an inactive/failed weight configured equal
/// to this value would be scaled as well — confirm the configured weights are distinct.
active_weight: u64,
/// Exponent applied to the clamped latency ratio; larger values penalize slow validators
/// more aggressively.
multiplier: f64,
/// Carry-forward state: last computed weight factor per author. Wrapped in a mutex because
/// `get_weights` takes `&self`; it is read and written only inside `get_weights`.
last_factor: Mutex<HashMap<Author, f64>>,
}

/// Minimum number of round-time samples a validator needs inside the history window before
/// its latency factor is recomputed. With fewer samples the carry-forward factor is applied
/// instead (or 1.0 for validators that have never been scored).
///
/// NOTE(review): a clean pair contributes one sample to *both* adjacent proposers, so a single
/// committed block in the middle of the window can already yield two samples — confirm that
/// 2 is the intended threshold.
const MIN_OBSERVATIONS: usize = 2;

/// Hard ceiling on the per-validator scaling ratio (`val_mean / median_mean`), used to bound
/// how aggressively a single anomalously slow validator can be suppressed. The smallest
/// possible factor is `MAX_LATENCY_RATIO^-multiplier`; e.g. with multiplier = 2.0 and
/// MAX_LATENCY_RATIO = 10 the minimum factor is 1/100 = 0.01.
const MAX_LATENCY_RATIO: f64 = 10.0;

impl LatencyWeightedHeuristic {
    /// Wraps `inner` with latency-based down-weighting of active validators.
    ///
    /// * `inner` - base heuristic whose weights are adjusted.
    /// * `active_weight` - the weight value `inner` assigns to active validators; only
    ///   candidates carrying exactly this weight are scaled.
    /// * `multiplier` - exponent applied to the latency ratio. Any value that is not a finite,
    ///   strictly positive number (zero, negative, NaN, ±infinity) is replaced by `1.0`.
    pub fn new(inner: ProposerAndVoterHeuristic, active_weight: u64, multiplier: f64) -> Self {
        // An infinite exponent would drive the factor of every above-median validator to 0
        // and bypass the `MAX_LATENCY_RATIO` bound, so it is rejected together with NaN and
        // non-positive values.
        let multiplier = if multiplier.is_finite() && multiplier > 0.0 {
            multiplier
        } else {
            1.0
        };
        Self {
            inner,
            active_weight,
            multiplier,
            last_factor: Mutex::new(HashMap::new()),
        }
    }

    /// Compute per-proposer round-time observations from the history.
    ///
    /// History is ordered newest-first: `history[0]` is the latest block.
    /// For each consecutive pair `(newer, older)` within the same epoch we compute
    /// `interval = newer.proposed_time() - older.proposed_time()` (saturating) and attribute
    /// it as follows:
    /// * If `newer.failed_proposer_indices()` is empty the pair represents a clean
    ///   consecutive-round commit: split the interval 50/50 between `newer.proposer()` and
    ///   `older.proposer()`. Both contributed to closing the round (the older proposed it,
    ///   the newer aggregated votes and built the next proposal), so each absorbs half.
    /// * If `newer.failed_proposer_indices()` is non-empty the gap absorbed one or more
    ///   timeouts: divide the full interval equally among the failed proposers (resolved via
    ///   `epoch_to_candidates`) and attribute none of it to `newer`/`older`. The healthy
    ///   adjacent proposers should not be penalized for absorbing someone else's timeout.
    ///
    /// Pairs that cross an epoch boundary, have a zero interval, or whose per-proposer share
    /// rounds down to zero contribute nothing. Failed indices that do not resolve to a
    /// candidate (unknown epoch or out-of-range index) are skipped.
    fn compute_round_times(
        history: &[NewBlockEvent],
        epoch_to_candidates: &HashMap<u64, Vec<Author>>,
    ) -> HashMap<Author, Vec<u64>> {
        let mut round_times: HashMap<Author, Vec<u64>> = HashMap::new();

        // `windows(2)` yields nothing for histories shorter than two events.
        for pair in history.windows(2) {
            let (newer, older) = (&pair[0], &pair[1]);

            // Only compute within the same epoch to avoid epoch-boundary outliers.
            if newer.epoch() != older.epoch() {
                continue;
            }
            let interval = newer.proposed_time().saturating_sub(older.proposed_time());
            if interval == 0 {
                continue;
            }

            let failed = newer.failed_proposer_indices();
            if failed.is_empty() {
                // Successful pair: 50/50 split between the two adjacent proposers.
                let half = interval / 2;
                if half > 0 {
                    round_times.entry(newer.proposer()).or_default().push(half);
                    round_times.entry(older.proposer()).or_default().push(half);
                }
                continue;
            }

            // Timeout-spanning pair: attribute the full gap to the failed proposer(s).
            let Some(candidates) = epoch_to_candidates.get(&newer.epoch()) else {
                continue;
            };
            let per_failure = interval / failed.len() as u64;
            if per_failure == 0 {
                continue;
            }
            for &idx in failed {
                if let Some(author) = candidates.get(idx as usize) {
                    round_times.entry(*author).or_default().push(per_failure);
                }
            }
        }

        round_times
    }
}

impl ReputationHeuristic for LatencyWeightedHeuristic {
    /// Returns one weight per candidate of `epoch`, in candidate order.
    ///
    /// Starts from the inner heuristic's weights and scales down only those entries that carry
    /// the active weight, using each validator's mean round time relative to the median of
    /// all observed means. Validators without enough fresh observations reuse the factor
    /// remembered from an earlier call (1.0 if there is none).
    ///
    /// Panics if `epoch_to_candidates` has no entry for `epoch`.
    ///
    /// NOTE(review): the factor is computed with `f64::powf` and also depends on factors
    /// remembered from earlier calls on this instance. If all nodes must agree on these
    /// weights, confirm that both are acceptable.
    fn get_weights(
        &self,
        epoch: u64,
        epoch_to_candidates: &HashMap<u64, Vec<Author>>,
        history: &[NewBlockEvent],
    ) -> Vec<u64> {
        let inner_weights = self.inner.get_weights(epoch, epoch_to_candidates, history);

        // Integer mean round time per validator, kept only when there are enough samples.
        let mut mean_by_author: HashMap<Author, u64> = HashMap::new();
        for (author, samples) in Self::compute_round_times(history, epoch_to_candidates) {
            if samples.len() < MIN_OBSERVATIONS {
                continue;
            }
            let total: u64 = samples.iter().sum();
            mean_by_author.insert(author, total / samples.len() as u64);
        }

        // The median of the observed means is the reference: at or below it there is no
        // penalty, above it the penalty grows with the ratio. Using the median keeps small
        // differences among healthy validators from being amplified by a large multiplier.
        let reference_mean = compute_median(&mean_by_author);

        let mut carried = self.last_factor.lock();
        let candidates = &epoch_to_candidates[&epoch];

        let mut weights = Vec::with_capacity(candidates.len().min(inner_weights.len()));
        for (author, &base) in candidates.iter().zip(inner_weights.iter()) {
            // Anything the inner classifier did not mark as active (inactive / failed) is
            // passed through unchanged.
            if base != self.active_weight {
                weights.push(base);
                continue;
            }

            // A fresh factor exists only when both the median and this validator's own mean
            // are available and non-zero.
            let fresh_factor = match (reference_mean, mean_by_author.get(author)) {
                (Some(median), Some(&val_mean)) if median > 0 && val_mean > 0 => {
                    // Clamping the ratio to >= 1.0 makes the penalty one-sided; clamping to
                    // MAX_LATENCY_RATIO bounds how far a single outlier can be suppressed.
                    let ratio = (val_mean as f64 / median as f64).clamp(1.0, MAX_LATENCY_RATIO);
                    Some(1.0 / ratio.powf(self.multiplier))
                },
                _ => None,
            };

            let factor = match fresh_factor {
                Some(f) => {
                    // Remember it for later calls in which this validator has no samples.
                    carried.insert(*author, f);
                    f
                },
                // Carry-forward, or benefit of the doubt for validators never scored before.
                None => carried.get(author).copied().unwrap_or(1.0),
            };

            weights.push((self.active_weight as f64 * factor) as u64);
        }
        weights
    }
}

/// Median of the observed per-validator means. `None` if there are no observations.
///
/// For an even number of values the result is the midpoint of the two middle values, rounded
/// down. The midpoint is computed as `lo + (hi - lo) / 2`, which cannot overflow even when
/// both values are close to `u64::MAX`.
fn compute_median(means: &HashMap<Author, u64>) -> Option<u64> {
    let mut vals: Vec<u64> = means.values().copied().collect();
    if vals.is_empty() {
        return None;
    }
    vals.sort_unstable();

    // For odd lengths both indices point at the single middle element; for even lengths they
    // select the two middle elements.
    let lo = vals[(vals.len() - 1) / 2];
    let hi = vals[vals.len() / 2];
    Some(lo + (hi - lo) / 2)
}

/// Committed history based proposer election implementation that could help bias towards
/// successful leaders to help improve performance.
pub struct LeaderReputation {
Expand Down
Loading
Loading