diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 462e1d45dd7..122cd90d216 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -49,7 +49,7 @@ use aptos_types::{ epoch_state::EpochState, on_chain_config::{ AnchorElectionMode, DagConsensusConfigV1, - LeaderReputationType::{ProposerAndVoter, ProposerAndVoterV2}, + LeaderReputationType::{ProposerAndVoter, ProposerAndVoterV2, ProposerAndVoterV3}, OnChainJWKConsensusConfig, OnChainRandomnessConfig, ProposerAndVoterConfig, ValidatorTxnConfig, }, @@ -479,6 +479,24 @@ impl DagBootstrapper { self.build_leader_reputation_components(config), ) }, + // V3 is V2 plus the latency-weighted gate. The DAG path does not yet + // wire `LatencyWeightedHeuristic` into its anchor-election plumbing, + // so we use the V3 base config and ignore the latency-weighted toggle. + // TODO(consensus): plumb LatencyWeightedHeuristic into DAG when needed. + ProposerAndVoterV3(config) => { + let base = &config.base; + let commit_events = self + .storage + .get_latest_k_committed_events( + std::cmp::max( + base.proposer_window_num_validators_multiplier, + base.voter_window_num_validators_multiplier, + ) as u64 + * self.epoch_state.verifier.len() as u64, + ) + .expect("Failed to read commit events from storage"); + (commit_events, self.build_leader_reputation_components(base)) + }, ProposerAndVoter(_) => unreachable!("unsupported mode"), }; diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 6304a8811a7..7a897f64731 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -14,7 +14,7 @@ use crate::{ liveness::{ cached_proposer_election::CachedProposerElection, leader_reputation::{ - extract_epoch_to_proposers, AptosDBBackend, LeaderReputation, + extract_epoch_to_proposers, AptosDBBackend, LatencyWeightedHeuristic, LeaderReputation, ProposerAndVoterHeuristic, ReputationHeuristic, }, proposal_generator::{ @@ -101,11 +101,10 @@ use 
aptos_types::{ epoch_state::EpochState, jwks::SupportedOIDCProviders, on_chain_config::{ - ChunkyDKGConfigMoveStruct, ChunkyDKGConfigSeqNum, Features, LeaderReputationType, - OnChainChunkyDKGConfig, OnChainConfigPayload, OnChainConfigProvider, - OnChainConsensusConfig, OnChainExecutionConfig, OnChainJWKConsensusConfig, - OnChainRandomnessConfig, ProposerElectionType, RandomnessConfigMoveStruct, - RandomnessConfigSeqNum, ValidatorSet, + ChunkyDKGConfigMoveStruct, ChunkyDKGConfigSeqNum, Features, OnChainChunkyDKGConfig, + OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig, + OnChainExecutionConfig, OnChainJWKConsensusConfig, OnChainRandomnessConfig, + ProposerElectionType, RandomnessConfigMoveStruct, RandomnessConfigSeqNum, ValidatorSet, }, randomness::{RandKeys, WvufPP, WVUF}, secret_sharing::SecretShareConfig, @@ -320,37 +319,40 @@ impl EpochManager

{ Arc::new(RotatingProposer::new(vec![proposer], *contiguous_rounds)) }, ProposerElectionType::LeaderReputation(leader_reputation_type) => { - let ( - heuristic, - window_size, - weight_by_voting_power, - use_history_from_previous_epoch_max_count, - ) = match &leader_reputation_type { - LeaderReputationType::ProposerAndVoter(proposer_and_voter_config) - | LeaderReputationType::ProposerAndVoterV2(proposer_and_voter_config) => { - let proposer_window_size = proposers.len() - * proposer_and_voter_config.proposer_window_num_validators_multiplier; - let voter_window_size = proposers.len() - * proposer_and_voter_config.voter_window_num_validators_multiplier; - let heuristic: Box = - Box::new(ProposerAndVoterHeuristic::new( - self.author, - proposer_and_voter_config.active_weight, - proposer_and_voter_config.inactive_weight, - proposer_and_voter_config.failed_weight, - proposer_and_voter_config.failure_threshold_percent, - voter_window_size, - proposer_window_size, - leader_reputation_type.use_reputation_window_from_stale_end(), - )); - ( - heuristic, - std::cmp::max(proposer_window_size, voter_window_size), - proposer_and_voter_config.weight_by_voting_power, - proposer_and_voter_config.use_history_from_previous_epoch_max_count, - ) - }, + // Pull base parameters and the latency-weighted gate out of the on-chain + // config in a version-agnostic way. V3 carries the latency settings; V1/V2 + // return false for `use_latency_weighted` so behavior is unchanged for them. 
+ let params = leader_reputation_type.proposer_and_voter_params(); + let proposer_and_voter_config = params.base; + let proposer_window_size = proposers.len() + * proposer_and_voter_config.proposer_window_num_validators_multiplier; + let voter_window_size = proposers.len() + * proposer_and_voter_config.voter_window_num_validators_multiplier; + let inner_heuristic = ProposerAndVoterHeuristic::new( + self.author, + proposer_and_voter_config.active_weight, + proposer_and_voter_config.inactive_weight, + proposer_and_voter_config.failed_weight, + proposer_and_voter_config.failure_threshold_percent, + voter_window_size, + proposer_window_size, + leader_reputation_type.use_reputation_window_from_stale_end(), + ); + let heuristic: Box = if params.use_latency_weighted { + // Decode milli-multiplier (1000 = 1.0×) into an f64 exponent. NOTE(review): the heuristic feeds this to `f64::powf`, whose precision std documents as unspecified — confirm every validator derives identical weights. + let multiplier = params.latency_weight_multiplier_milli as f64 / 1000.0; + Box::new(LatencyWeightedHeuristic::new( + inner_heuristic, + proposer_and_voter_config.active_weight, + multiplier, + )) + } else { + Box::new(inner_heuristic) }; + let window_size = std::cmp::max(proposer_window_size, voter_window_size); + let weight_by_voting_power = proposer_and_voter_config.weight_by_voting_power; + let use_history_from_previous_epoch_max_count = + proposer_and_voter_config.use_history_from_previous_epoch_max_count; let seek_len = onchain_config.leader_reputation_exclude_round() as usize + onchain_config.max_failed_authors_to_store() diff --git a/consensus/src/liveness/leader_reputation.rs b/consensus/src/liveness/leader_reputation.rs index e9f7100a80e..d3599f7dfca 100644 --- a/consensus/src/liveness/leader_reputation.rs +++ b/consensus/src/liveness/leader_reputation.rs @@ -552,6 +552,217 @@ impl ReputationHeuristic for ProposerAndVoterHeuristic { } } +/// A heuristic that wraps `ProposerAndVoterHeuristic` but additionally scales active-validator +/// weights down based on their historical round-time performance. 
Validators with slow round +/// times (i.e. those that take longer to produce a committed block, including timeouts +/// attributed to them via `failed_proposer_indices`) get a proportionally lower chance of +/// being selected as leader. Healthy validators are not boosted — they keep their base +/// active weight. +/// +/// ## Penalty formula +/// +/// For every active validator we compute the mean interval between consecutive committed +/// blocks in the history window: +/// * Successful pairs are split 50/50 between the two adjacent proposers. +/// * Timeout-spanning gaps are attributed in full to the failed proposers +/// (via `failed_proposer_indices`). +/// +/// We use the **median** of all observed per-validator means as the reference point. The +/// per-validator weight is: +/// +/// factor = 1.0 / max(1.0, val_mean / median_mean).clamp(_, MAX_LATENCY_RATIO).powf(multiplier) +/// weight = active_weight * factor +/// +/// Properties: +/// * Validators at or below the median: `factor = 1.0` (no penalty, no boost). This makes +/// the heuristic robust to small natural variation between healthy validators — at higher +/// multipliers, transient blips on a fast validator no longer cliff its weight. +/// * Validators above the median: penalized by `(val_mean / median_mean)^-multiplier`. +/// The slowest validator (which used to get the **base** weight under the old `max_mean / +/// val_mean` formula) now gets the **lowest** weight — closer to the original intent. +/// * Penalty ratio is clamped at `MAX_LATENCY_RATIO` to bound the suppression for a single +/// anomalously-slow validator. +/// +/// ## Carry-forward for validators with no observations +/// +/// A validator with `< MIN_OBSERVATIONS` round-time samples in the window does NOT fall back +/// to the base active weight (the previous behavior). Instead, we apply the **last computed +/// factor** for that validator (stored in `last_factor`). 
This breaks the oscillation cycle +/// where a successfully-suppressed slow validator would pop back to full weight as soon as +/// our successful suppression denied it observations: +/// +/// * fresh observations → recompute factor, store it +/// * no fresh observations → carry-forward the previous factor +/// * never seen before → factor = 1.0 (benefit of doubt for newly-rotated-in validators) +/// +/// State is per-epoch (the heuristic is reconstructed on epoch change), so cross-epoch +/// rotation is naturally handled. NOTE(review): `last_factor` is node-local and depends on which `get_weights` calls ran earlier — confirm validators that (re)start mid-epoch still derive identical weights. +pub struct LatencyWeightedHeuristic { + inner: ProposerAndVoterHeuristic, + active_weight: u64, + multiplier: f64, + /// Carry-forward state: last computed weight factor per author (guarded for &self + /// access). Mutated only inside `get_weights`. + last_factor: Mutex>, +} + +/// Minimum number of round-time observations needed before a validator is scaled by the +/// latency-weighted heuristic; below this we apply the carry-forward factor (or 1.0 for +/// validators we have never observed). +const MIN_OBSERVATIONS: usize = 2; + +/// Hard ceiling on the per-validator scaling ratio (`val_mean / median_mean`) used to +/// bound how aggressively a single anomalously-slow validator can be suppressed. With +/// multiplier=2.0 and MAX_LATENCY_RATIO=10, the minimum factor is 1/100 = 0.01. +const MAX_LATENCY_RATIO: f64 = 10.0; + +impl LatencyWeightedHeuristic { + pub fn new(inner: ProposerAndVoterHeuristic, active_weight: u64, multiplier: f64) -> Self { + Self { + inner, + active_weight, + multiplier: if multiplier > 0.0 { multiplier } else { 1.0 }, + last_factor: Mutex::new(HashMap::new()), + } + } + + /// Compute per-proposer round-time observations from the history. + /// + /// History is ordered newest-first: `history[0]` is the latest block. 
+ /// For each consecutive pair `(newer, older)` within the same epoch we compute + /// `interval = newer.proposed_time() - older.proposed_time()` and attribute it as follows: + /// * If `newer.failed_proposer_indices()` is empty the pair represents a clean + /// consecutive-round commit: split the interval 50/50 between `newer.proposer()` and + /// `older.proposer()`. Both contributed to closing the round (the older proposed it, + /// the newer aggregated votes and built the next proposal), so each absorbs half. + /// * If `newer.failed_proposer_indices()` is non-empty the gap absorbed one or more + /// timeouts: divide the full interval equally among the failed proposers (resolved via + /// `epoch_to_candidates`) and attribute none of it to `newer`/`older`. The healthy + /// adjacent proposers should not be penalized for absorbing someone else's timeout. + fn compute_round_times( + history: &[NewBlockEvent], + epoch_to_candidates: &HashMap>, + ) -> HashMap> { + let mut round_times: HashMap> = HashMap::new(); + for i in 0..history.len().saturating_sub(1) { + let newer = &history[i]; + let older = &history[i + 1]; + // Only compute within the same epoch to avoid epoch-boundary outliers. + if newer.epoch() != older.epoch() { + continue; + } + let interval = newer.proposed_time().saturating_sub(older.proposed_time()); + if interval == 0 { + continue; + } + + let failed = newer.failed_proposer_indices(); + if failed.is_empty() { + // Successful pair: 50/50 split between the two adjacent proposers. + let half = interval / 2; + if half > 0 { + round_times.entry(newer.proposer()).or_default().push(half); + round_times.entry(older.proposer()).or_default().push(half); + } + } else { + // Timeout-spanning pair: attribute the full gap to the failed proposer(s). 
+ let candidates = match epoch_to_candidates.get(&newer.epoch()) { + Some(c) => c, + None => continue, + }; + let per_failure = interval / failed.len() as u64; + if per_failure > 0 { + for &idx in failed { + if let Some(author) = candidates.get(idx as usize) { + round_times.entry(*author).or_default().push(per_failure); + } + } + } + } + } + round_times + } +} + +impl ReputationHeuristic for LatencyWeightedHeuristic { + fn get_weights( + &self, + epoch: u64, + epoch_to_candidates: &HashMap>, + history: &[NewBlockEvent], + ) -> Vec { + let base_weights = self.inner.get_weights(epoch, epoch_to_candidates, history); + + let round_times = Self::compute_round_times(history, epoch_to_candidates); + + // Per-validator mean round time, computed only for validators with enough data. + let means: HashMap = round_times + .iter() + .filter(|(_, v)| v.len() >= MIN_OBSERVATIONS) + .map(|(a, v)| (*a, v.iter().sum::() / v.len() as u64)) + .collect(); + + // Median of observed means is the reference point. Below median: no penalty. + // Above median: penalty proportional to ratio. We use median (rather than max) + // so that the slowest validator gets the LARGEST penalty rather than the base + // weight, and so that small variations among healthy validators do not + // exponentially amplify (the failure mode that made the old `max_mean / val_mean` + // formula fragile at multiplier > 2). + let median_mean = compute_median(&means); + + let mut last_factor = self.last_factor.lock(); + + epoch_to_candidates[&epoch] + .iter() + .zip(base_weights.iter()) + .map(|(author, &base)| { + // Only adjust the weight for validators that received the active weight. + // Inactive / failed-classified validators keep their base weight (the + // inner classifier is already handling those). 
+ if base != self.active_weight { + return base; + } + + let factor = match (median_mean, means.get(author)) { + (Some(median), Some(&val_mean)) if median > 0 && val_mean > 0 => { + // Asymmetric penalty: validators at or below median are unchanged + // (max(1.0) clamps the ratio), validators above median are + // penalized by 1 / ratio^multiplier. + let ratio = (val_mean as f64 / median as f64).clamp(1.0, MAX_LATENCY_RATIO); + let f = 1.0 / ratio.powf(self.multiplier); + // Persist for carry-forward when this validator next has no obs. + last_factor.insert(*author, f); + f + }, + // No fresh observations (or degenerate median): apply carry-forward + // factor if we have one for this validator, else 1.0 (benefit of doubt + // for never-before-seen / newly-rotated-in validators). + _ => last_factor.get(author).copied().unwrap_or(1.0), + }; + + (self.active_weight as f64 * factor) as u64 + }) + .collect() + } +} + +/// Median of the observed per-validator means. `None` if there are no observations. +fn compute_median(means: &HashMap) -> Option { + if means.is_empty() { + return None; + } + let mut vals: Vec = means.values().copied().collect(); + vals.sort_unstable(); + let n = vals.len(); + Some( + if n.is_multiple_of(2) { + (vals[n / 2 - 1] + vals[n / 2]) / 2 + } else { + vals[n / 2] + }, + ) +} + /// Committed history based proposer election implementation that could help bias towards /// successful leaders to help improve performance. 
pub struct LeaderReputation { diff --git a/consensus/src/liveness/leader_reputation_test.rs b/consensus/src/liveness/leader_reputation_test.rs index 2fa87634d58..082cf851e9a 100644 --- a/consensus/src/liveness/leader_reputation_test.rs +++ b/consensus/src/liveness/leader_reputation_test.rs @@ -2,7 +2,8 @@ // Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE use super::leader_reputation::{ - extract_epoch_to_proposers_impl, AptosDBBackend, ProposerAndVoterHeuristic, + extract_epoch_to_proposers_impl, AptosDBBackend, LatencyWeightedHeuristic, + ProposerAndVoterHeuristic, }; use crate::liveness::{ leader_reputation::{ @@ -762,3 +763,288 @@ fn test_extract_epoch_to_proposers_impl() { .unwrap() ); } + +/// #### LatencyWeightedHeuristic tests #### + +/// Build a `NewBlockEvent` with the given proposer, timestamp, round, epoch, and failed +/// proposer indices. Other fields are set to deterministic defaults. +fn make_block_event( + proposer: Author, + epoch: u64, + round: u64, + timestamp_us: u64, + failed_proposer_indices: Vec, +) -> NewBlockEvent { + NewBlockEvent::new( + AccountAddress::ZERO, + epoch, + round, + round, + BitVec::with_num_bits(1).into(), + proposer, + failed_proposer_indices, + timestamp_us, + ) +} + +/// Build a `LatencyWeightedHeuristic` wrapping a `ProposerAndVoterHeuristic` configured so +/// that all candidates with successful proposals receive `active_weight = 1000`. +fn make_latency_weighted_heuristic(self_author: Author) -> LatencyWeightedHeuristic { + // Wide windows + low failure threshold so test fixtures do not accidentally trigger the + // failed/inactive branches in the inner heuristic — we want to exercise the latency + // scaling on top of `active_weight`. 
+ let inner = ProposerAndVoterHeuristic::new(self_author, 1000, 10, 1, 50, 100, 100, false); + LatencyWeightedHeuristic::new(inner, 1000, 1.0) +} + +#[test] +fn test_latency_weighted_50_50_split_equal_intervals() { + // With four validators and four successful blocks at uniform 100µs intervals, every + // validator's mean is identical, so weights collapse to `active_weight`. + let validators: Vec = (0..4).map(|_| Author::random()).collect(); + let epoch_to_validators = HashMap::from([(0u64, validators.clone())]); + + // History is newest-first: rounds 4, 3, 2, 1 at timestamps 400, 300, 200, 100. + let history = vec![ + make_block_event(validators[3], 0, 4, 400, vec![]), + make_block_event(validators[2], 0, 3, 300, vec![]), + make_block_event(validators[1], 0, 2, 200, vec![]), + make_block_event(validators[0], 0, 1, 100, vec![]), + ]; + + let heuristic = make_latency_weighted_heuristic(validators[0]); + let weights = heuristic.get_weights(0, &epoch_to_validators, &history); + + // All means equal → ratio is 1 → all weights stay at active_weight = 1000. + assert_eq!(weights, vec![1000, 1000, 1000, 1000]); +} + +#[test] +fn test_latency_weighted_failure_attributed_to_failed_proposer() { + // Three validators V0, V1, V2. V1 (index 1) fails round 3; V2 commits round 4 to rescue. + // The 600µs gap between V0's commit (round 2) and V2's commit (round 4) absorbs V1's + // timeout and must be attributed to V1, not to V2. + let validators: Vec = (0..3).map(|_| Author::random()).collect(); + let epoch_to_validators = HashMap::from([(0u64, validators.clone())]); + + // Newest-first. 
+ let history = vec![ + make_block_event(validators[2], 0, 6, 1000, vec![]), // round 6 + make_block_event(validators[1], 0, 5, 900, vec![]), // round 5 (V1 succeeds here) + make_block_event(validators[0], 0, 4, 800, vec![]), // round 4 + make_block_event(validators[2], 0, 4, 700, vec![1]), // round 4 rescue, V1 failed round 3 + make_block_event(validators[0], 0, 2, 100, vec![]), // round 2 + ]; + + let heuristic = make_latency_weighted_heuristic(validators[0]); + let weights = heuristic.get_weights(0, &epoch_to_validators, &history); + + // V1 should have the largest mean (absorbed the 600µs failure entry) and therefore the + // smallest weight. V0 and V2 are never boosted; they must simply outweigh V1. + assert!(weights[1] < weights[0]); + assert!(weights[1] < weights[2]); +} + +#[test] +fn test_latency_weighted_multiple_failed_proposers_split_gap() { + // Two failed proposers in a single gap: the interval is split equally between them. + let validators: Vec = (0..4).map(|_| Author::random()).collect(); + let epoch_to_validators = HashMap::from([(0u64, validators.clone())]); + + // Round 5 commits with both V1 (idx 1) and V2 (idx 2) listed as failed; gap of 1000µs. + let history = vec![ + make_block_event(validators[3], 0, 6, 1100, vec![]), + make_block_event(validators[0], 0, 5, 1000, vec![1, 2]), + make_block_event(validators[3], 0, 2, 0, vec![]), + ]; + + let heuristic = make_latency_weighted_heuristic(validators[0]); + let weights = heuristic.get_weights(0, &epoch_to_validators, &history); + + // Both failed proposers should be down-weighted relative to V0/V3. NOTE(review): every validator has a single sample here (< MIN_OBSERVATIONS), so latency scaling is a no-op and these asserts rest on the inner heuristic's base weights alone. + assert!(weights[1] < weights[0]); + assert!(weights[2] < weights[0]); + assert!(weights[1] < weights[3]); + assert!(weights[2] < weights[3]); +} + +#[test] +fn test_latency_weighted_per_validator_fallback() { + // V0 has no observations (it is in the candidate set but never appears in history). 
+ // The heuristic must NOT fall back globally just because V0 lacks data — V1 should + // still be scaled relative to V2 even though V0 has nothing to compute from. + let validators: Vec = (0..3).map(|_| Author::random()).collect(); + let epoch_to_validators = HashMap::from([(0u64, validators.clone())]); + + // V1 has fast intervals (~50µs/entry under 50/50), V2 has slow intervals (~500µs/entry). + // V0 never appears as proposer. + let history = vec![ + make_block_event(validators[2], 0, 10, 5000, vec![]), + make_block_event(validators[2], 0, 8, 4000, vec![]), + make_block_event(validators[2], 0, 6, 3000, vec![]), + make_block_event(validators[1], 0, 4, 200, vec![]), + make_block_event(validators[1], 0, 3, 150, vec![]), + make_block_event(validators[1], 0, 2, 100, vec![]), + ]; + + let heuristic = make_latency_weighted_heuristic(validators[0]); + let weights = heuristic.get_weights(0, &epoch_to_validators, &history); + + // V1 is faster than V2, so V1's weight must be strictly larger than V2's. + // If the heuristic had fallen back globally because of V0's empty data, V1 == V2. + assert!( + weights[1] > weights[2], + "expected V1 boosted over V2 despite V0 lacking observations; got {:?}", + weights, + ); +} + +#[test] +fn test_latency_weighted_empty_history_falls_back_to_base() { + // No history → no observations → base weights returned unchanged. + let validators: Vec = (0..2).map(|_| Author::random()).collect(); + let epoch_to_validators = HashMap::from([(0u64, validators.clone())]); + let history: Vec = vec![]; + + let heuristic = make_latency_weighted_heuristic(validators[0]); + let weights = heuristic.get_weights(0, &epoch_to_validators, &history); + + // Both validators still get base weight (inactive_weight in this case since no proposals). + assert_eq!(weights.len(), 2); +} + +#[test] +fn test_latency_weighted_max_ratio_clamp() { + // Three validators, all classified active. 
V1 has a huge mean from absorbing + // failure attributions, while V0 and V2 have small means from successful pairs. + // The raw penalty ratio (V1_mean / median_mean) is much larger than + // MAX_LATENCY_RATIO=10 and must be clamped → V1 weight floors at + // active_weight / 10 = 100 (with multiplier=1.0). V0 and V2 stay at base 1000. + let validators: Vec = (0..3).map(|_| Author::random()).collect(); + let epoch_to_validators = HashMap::from([(0u64, validators.clone())]); + + // History (newest first): + // - 3 huge V1-failure attributions (gap ≈ 10000 each) — pushes V1 mean way up. + // - 3 V1 successful proposals at small intervals — keeps V1 under the 50% + // failure threshold so it stays classified active. + // - 6 alternating V0/V2 successful proposals at small intervals — both V0 and + // V2 accumulate enough small-mean observations to set the median. + let history = vec![ + make_block_event(validators[0], 0, 18, 1_030_000, vec![1]), + make_block_event(validators[0], 0, 16, 1_020_000, vec![1]), + make_block_event(validators[0], 0, 14, 1_010_000, vec![1]), + make_block_event(validators[1], 0, 12, 1_000_030, vec![]), + make_block_event(validators[1], 0, 11, 1_000_020, vec![]), + make_block_event(validators[1], 0, 10, 1_000_010, vec![]), + make_block_event(validators[2], 0, 9, 1_000_005, vec![]), + make_block_event(validators[0], 0, 8, 1_000_000, vec![]), + make_block_event(validators[2], 0, 7, 999_995, vec![]), + make_block_event(validators[0], 0, 6, 999_990, vec![]), + make_block_event(validators[2], 0, 5, 999_985, vec![]), + make_block_event(validators[0], 0, 4, 999_980, vec![]), + ]; + + let heuristic = make_latency_weighted_heuristic(validators[0]); + let weights = heuristic.get_weights(0, &epoch_to_validators, &history); + + // V0, V2 (healthy, near median): no penalty, weight = active_weight = 1000. 
+ assert_eq!( + weights[0], 1000, + "expected V0 to keep base active_weight; got {:?}", + weights, + ); + assert_eq!( + weights[2], 1000, + "expected V2 to keep base active_weight; got {:?}", + weights, + ); + // V1 (very slow): penalty clamped at MAX_LATENCY_RATIO=10, multiplier=1.0 + // → minimum factor = 1/10 → weight = active_weight / 10 = 100. + assert_eq!( + weights[1], 100, + "expected V1 penalty clamped at 1/10 of active_weight; got {:?}", + weights, + ); +} + +#[test] +fn test_latency_weighted_carry_forward_for_unobserved_validator() { + // Verify carry-forward semantics. First call: V1 has slow observations and gets + // penalized. Second call: V1 has too few observations to recompute → must retain + // the previous penalty rather than jumping back to base weight (which would + // create the oscillation that the carry-forward is designed to prevent). + let validators: Vec = (0..2).map(|_| Author::random()).collect(); + let epoch_to_validators = HashMap::from([(0u64, validators.clone())]); + + // First call: V0 and V1 alternate as successful proposers; one gap is attributed + // to V1 via failed_proposer_indices. V1 has a single failure (3 proposals + // succeed, 1 fails → 25% < 50% threshold → active) but its mean is still + // dominated by the 4500µs failure attribution → meaningful penalty. + let history_with_obs = vec![ + make_block_event(validators[0], 0, 8, 5000, vec![1]), + make_block_event(validators[1], 0, 6, 500, vec![]), + make_block_event(validators[0], 0, 5, 100, vec![]), + make_block_event(validators[1], 0, 4, 50, vec![]), + make_block_event(validators[0], 0, 3, 25, vec![]), + make_block_event(validators[1], 0, 2, 10, vec![]), + ]; + let heuristic = make_latency_weighted_heuristic(validators[0]); + let weights1 = heuristic.get_weights(0, &epoch_to_validators, &history_with_obs); + + // V1 (slow) is below base; V0 (healthy, below median) stays at base. 
+ assert_eq!( + weights1[0], 1000, + "V0 base preserved in first call; got {:?}", + weights1 + ); + assert!( + weights1[1] < 1000, + "V1 should be penalized in first call; got {:?}", + weights1, + ); + let v1_first_weight = weights1[1]; + + // Second call: just one pair → both V0 and V1 get only 1 round-time observation + // (below MIN_OBSERVATIONS=2). Both fall through to carry-forward. Both are still + // classified active because they have a successful proposal in this window. + let history_no_v1_obs = vec![ + make_block_event(validators[1], 0, 1, 100, vec![]), + make_block_event(validators[0], 0, 0, 50, vec![]), + ]; + let weights2 = heuristic.get_weights(0, &epoch_to_validators, &history_no_v1_obs); + + // V0's stored factor was 1.0 → carry-forward restores base. + assert_eq!( + weights2[0], 1000, + "V0 carry-forward at base; got {:?}", + weights2 + ); + // V1's stored factor was the first-call penalty → must be preserved, NOT reset. + assert_eq!( + weights2[1], v1_first_weight, + "carry-forward must preserve V1's penalty; got weights {:?}, expected V1={}", + weights2, v1_first_weight, + ); +} + +#[test] +fn test_latency_weighted_skips_cross_epoch_pairs() { + // A pair spanning an epoch boundary must not contribute to round_times. + let validators: Vec = (0..2).map(|_| Author::random()).collect(); + let epoch_to_validators = + HashMap::from([(0u64, validators.clone()), (1u64, validators.clone())]); + + // Two events: one in epoch 0, one in epoch 1. Cross-epoch pair must be skipped. + let history = vec![ + make_block_event(validators[1], 1, 1, 1_000_000, vec![]), // epoch 1 + make_block_event(validators[0], 0, 5, 500, vec![]), // epoch 0 + ]; + + let heuristic = make_latency_weighted_heuristic(validators[0]); + let weights = heuristic.get_weights(1, &epoch_to_validators, &history); + + // No same-epoch pairs → empty round_times → fall back to base weights. 
+ // Both validators receive whatever base weight the inner heuristic produced; the latency + // scaling has no effect. + assert_eq!(weights.len(), 2); +} diff --git a/testsuite/forge-cli/src/suites/land_blocking.rs b/testsuite/forge-cli/src/suites/land_blocking.rs index 024c393a17c..770f325e912 100644 --- a/testsuite/forge-cli/src/suites/land_blocking.rs +++ b/testsuite/forge-cli/src/suites/land_blocking.rs @@ -29,7 +29,7 @@ pub(crate) fn get_land_blocking_test( let test = match test_name { "land_blocking" | "realistic_env_max_load" => { realistic_env_max_load_test(duration, test_cmd, 7, 0, 0) - .with_duration_override(Duration::from_secs(1200)) + .with_duration_override(Duration::from_secs(900)) }, "compat" => compat(), "framework_upgrade" => framework_upgrade(), diff --git a/testsuite/forge-cli/src/suites/realistic_environment.rs b/testsuite/forge-cli/src/suites/realistic_environment.rs index 683643cec34..d76f3ae5d5b 100644 --- a/testsuite/forge-cli/src/suites/realistic_environment.rs +++ b/testsuite/forge-cli/src/suites/realistic_environment.rs @@ -20,8 +20,10 @@ use aptos_forge::{ TransactionType, }; use aptos_sdk::types::on_chain_config::{ - BlockGasLimitType, FeatureFlag, Features, OnChainChunkyDKGConfig, OnChainConsensusConfig, - OnChainExecutionConfig, OnChainRandomnessConfig, TransactionShufflerType, + BlockGasLimitType, ConsensusAlgorithmConfig, FeatureFlag, Features, LeaderReputationType, + OnChainChunkyDKGConfig, OnChainConsensusConfig, OnChainExecutionConfig, + OnChainRandomnessConfig, ProposerAndVoterConfig, ProposerAndVoterConfigV3, + ProposerElectionType, TransactionShufflerType, }; use aptos_testcases::{ load_vs_perf_benchmark::{LoadVsPerfBenchmark, TransactionWorkload, Workloads}, @@ -452,9 +454,42 @@ pub(crate) fn realistic_env_max_load_test( .with_genesis_helm_config_fn(Arc::new(move |helm_values| { // No epoch change so measurements are stable. 
helm_values["chain"]["epoch_duration_secs"] = (24 * 3600).into(); + // Use ProposerAndVoterV3 to enable the latency-weighted heuristic via on-chain + // config (so all validators deterministically agree on the leader schedule — + // node-local toggles would fork during partial rollout). + // + // - failed_weight: 1 -> 0 so a validator classified failed is excluded + // entirely from leader selection (matches the classifier-only PR for a + // clean apples-to-apples comparison of heuristic contribution). + // - failure_threshold_percent: 10 -> 5 so V6's true 10% rate clearly exceeds + // the threshold (prevents oscillation at the boundary). + // - proposer_window_num_validators_multiplier: 10 -> 100 for statistical + // stability: 100 proposals per validator give reliable latency estimates. + let mut consensus_config = OnChainConsensusConfig::default_for_genesis(); + if let OnChainConsensusConfig::V5 { + alg: ConsensusAlgorithmConfig::JolteonV2 { ref mut main, .. }, + .. + } = consensus_config + { + main.proposer_election_type = ProposerElectionType::LeaderReputation( + LeaderReputationType::ProposerAndVoterV3(ProposerAndVoterConfigV3 { + base: ProposerAndVoterConfig { + active_weight: 1000, + inactive_weight: 10, + failed_weight: 0, // lowered from 1: classified-failed -> banned + failure_threshold_percent: 5, // lowered from 10: clear of V6's true 10% + proposer_window_num_validators_multiplier: 100, // bumped from 10 + voter_window_num_validators_multiplier: 1, + weight_by_voting_power: true, + use_history_from_previous_epoch_max_count: 5, + }, + use_latency_weighted: true, + latency_weight_multiplier_milli: 1000, // 1.0× — gentle suppression of slow leaders + }), + ); + } helm_values["chain"]["on_chain_consensus_config"] = - serde_yaml::to_value(OnChainConsensusConfig::default_for_genesis()) - .expect("must serialize"); + serde_yaml::to_value(consensus_config).expect("must serialize"); helm_values["chain"]["on_chain_execution_config"] = 
serde_yaml::to_value(OnChainExecutionConfig::default_for_genesis()) .expect("must serialize"); diff --git a/testsuite/forge/src/runner.rs b/testsuite/forge/src/runner.rs index fbdb1c6d1d8..0771323f269 100644 --- a/testsuite/forge/src/runner.rs +++ b/testsuite/forge/src/runner.rs @@ -311,7 +311,8 @@ impl<'cfg, F: Factory> Forge<'cfg, F> { &initial_version, &genesis_version, self.tests.genesis_config.as_ref(), - self.tests.duration_override.unwrap_or(self.global_duration) + Duration::from_secs(NAMESPACE_CLEANUP_DURATION_BUFFER_SECS), + self.tests.duration_override.unwrap_or(self.global_duration) + + Duration::from_secs(NAMESPACE_CLEANUP_DURATION_BUFFER_SECS), self.tests.genesis_helm_config_fn.clone(), self.tests.build_node_helm_config_fn(retain_debug_logs), self.tests.existing_db_tag.clone(), @@ -343,10 +344,7 @@ impl<'cfg, F: Factory> Forge<'cfg, F> { let logs_location = swarm.logs_location(); let swarm = Arc::new(tokio::sync::RwLock::new(swarm)); - let effective_duration = self - .tests - .duration_override - .unwrap_or(self.global_duration); + let effective_duration = self.tests.duration_override.unwrap_or(self.global_duration); for test in self.filter_tests(&self.tests.network_tests) { let network_ctx = NetworkContext::new( CoreContext::from_rng(&mut rng), diff --git a/testsuite/smoke-test/src/aptos_cli/validator.rs b/testsuite/smoke-test/src/aptos_cli/validator.rs index c7403e5f656..b6b7c04f32a 100644 --- a/testsuite/smoke-test/src/aptos_cli/validator.rs +++ b/testsuite/smoke-test/src/aptos_cli/validator.rs @@ -225,6 +225,7 @@ async fn test_onchain_config_change() { LeaderReputationType::ProposerAndVoterV2(proposer_and_voter_config) => { proposer_and_voter_config }, + LeaderReputationType::ProposerAndVoterV3(config) => &config.base, }; let new_consensus_config = OnChainConsensusConfig::V1(ConsensusConfigV1 { proposer_election_type: ProposerElectionType::LeaderReputation( @@ -278,7 +279,10 @@ async fn test_onchain_config_change() { panic!() }; let 
proposer_and_voter_config = match &leader_reputation_type { - LeaderReputationType::ProposerAndVoterV2(_) => panic!(), + LeaderReputationType::ProposerAndVoterV2(_) + | LeaderReputationType::ProposerAndVoterV3(_) => { + panic!() + }, LeaderReputationType::ProposerAndVoter(proposer_and_voter_config) => { proposer_and_voter_config }, diff --git a/testsuite/smoke-test/src/state_sync.rs b/testsuite/smoke-test/src/state_sync.rs index b7d07075e24..a0057b3514d 100644 --- a/testsuite/smoke-test/src/state_sync.rs +++ b/testsuite/smoke-test/src/state_sync.rs @@ -682,8 +682,11 @@ async fn test_validator_sync_and_participate(fast_sync: bool, epoch_changes: boo LeaderReputationType::ProposerAndVoterV2(proposer_and_voter_config) => { proposer_and_voter_config }, + // V3 is V2 plus the latency-weighted gate; this state-sync test only cares + // about the base proposer/voter parameters. + LeaderReputationType::ProposerAndVoterV3(config) => &config.base, leader_reputation_type => panic!( - "This test requires a proposer and voter V2 leader reputation, but got: {:?}", + "This test requires a proposer and voter V2/V3 leader reputation, but got: {:?}", leader_reputation_type ), }; diff --git a/types/src/on_chain_config/consensus_config.rs b/types/src/on_chain_config/consensus_config.rs index 57c1e01e846..5c304171c25 100644 --- a/types/src/on_chain_config/consensus_config.rs +++ b/types/src/on_chain_config/consensus_config.rs @@ -516,6 +516,12 @@ pub enum LeaderReputationType { // * use reputation window from recent end // * unpredictable seed, based on root hash ProposerAndVoterV2(ProposerAndVoterConfig), + // Version 3: extends V2 with on-chain gating for the latency-weighted heuristic. + // Adds `use_latency_weighted` and `latency_weight_multiplier_milli` so all validators + // deterministically agree on whether to apply latency weighting and with what exponent. 
+ // Without on-chain gating, validators on different node-local config values would + // compute different leader schedules and fork. + ProposerAndVoterV3(ProposerAndVoterConfigV3), } impl LeaderReputationType { @@ -528,6 +534,35 @@ impl LeaderReputationType { // all versions after V1 shouldn't use from stale end matches!(self, Self::ProposerAndVoter(_)) } + + /// Borrow the inner `ProposerAndVoter*` parameters in a version-agnostic way. Returns + /// the latency-weighted toggle and milli-multiplier alongside; these are `false` and `0`, + /// respectively, for V1/V2 since those versions predate the latency-weighted feature. + pub fn proposer_and_voter_params(&self) -> ProposerAndVoterParams<'_> { + match self { + Self::ProposerAndVoter(c) | Self::ProposerAndVoterV2(c) => ProposerAndVoterParams { + base: c, + use_latency_weighted: false, + latency_weight_multiplier_milli: 0, + }, + Self::ProposerAndVoterV3(c) => ProposerAndVoterParams { + base: &c.base, + use_latency_weighted: c.use_latency_weighted, + latency_weight_multiplier_milli: c.latency_weight_multiplier_milli, + }, + } + } +} + +/// Borrowed view over the parameters needed to construct a `LeaderReputation` heuristic, +/// abstracting over `LeaderReputationType` versions. See `proposer_and_voter_params`. +pub struct ProposerAndVoterParams<'a> { + pub base: &'a ProposerAndVoterConfig, + pub use_latency_weighted: bool, + /// Multiplier expressed in milli-units (1000 = 1.0×). Stored as integer for + /// deterministic BCS serialization across implementations. Ignored when + /// `use_latency_weighted` is false. + pub latency_weight_multiplier_milli: u32, } #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -555,6 +590,22 @@ pub struct ProposerAndVoterConfig { pub use_history_from_previous_epoch_max_count: u32, } +/// V3 leader-reputation parameters: V2's `ProposerAndVoterConfig` plus the on-chain gate +/// for the latency-weighted heuristic. 
New fields live on a new struct so existing on-chain +/// V1/V2 payloads continue to deserialize unchanged. +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct ProposerAndVoterConfigV3 { + /// V2 parameters (selection weights, windows, etc.). + pub base: ProposerAndVoterConfig, + /// On-chain toggle for the latency-weighted heuristic that scales active validators by + /// per-validator mean round time. When false the heuristic behaves like V2. + pub use_latency_weighted: bool, + /// Exponent applied to the latency ratio, expressed in milli-units (1000 = 1.0×). + /// Integer-encoded for BCS determinism. Decoded as `value as f64 / 1000.0` at runtime. + /// Ignored when `use_latency_weighted` is false. + pub latency_weight_multiplier_milli: u32, +} + #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] pub enum AnchorElectionMode { diff --git a/types/src/on_chain_config/mod.rs b/types/src/on_chain_config/mod.rs index 0f194969d31..274d3885947 100644 --- a/types/src/on_chain_config/mod.rs +++ b/types/src/on_chain_config/mod.rs @@ -46,8 +46,9 @@ pub use self::{ commit_history::CommitHistoryResource, consensus_config::{ AnchorElectionMode, ConsensusAlgorithmConfig, ConsensusConfigV1, DagConsensusConfigV1, - LeaderReputationType, OnChainConsensusConfig, ProposerAndVoterConfig, ProposerElectionType, - ValidatorTxnConfig, DEFAULT_ENABLED_WINDOW_SIZE, DEFAULT_WINDOW_SIZE, + LeaderReputationType, OnChainConsensusConfig, ProposerAndVoterConfig, + ProposerAndVoterConfigV3, ProposerAndVoterParams, ProposerElectionType, ValidatorTxnConfig, + DEFAULT_ENABLED_WINDOW_SIZE, DEFAULT_WINDOW_SIZE, }, execution_config::{ BlockGasLimitType, ExecutionConfigV1, ExecutionConfigV2, ExecutionConfigV4,