diff --git a/aptos-move/aptos-release-builder/data/example-release-with-randomness-framework/release.yaml b/aptos-move/aptos-release-builder/data/example-release-with-randomness-framework/release.yaml index 59a4fdfbd33..00534d58993 100644 --- a/aptos-move/aptos-release-builder/data/example-release-with-randomness-framework/release.yaml +++ b/aptos-move/aptos-release-builder/data/example-release-with-randomness-framework/release.yaml @@ -34,6 +34,7 @@ proposals: voter_window_num_validators_multiplier: 1 weight_by_voting_power: true use_history_from_previous_epoch_max_count: 5 + failure_window_num_validators_multiplier: 0 max_failed_authors_to_store: 10 quorum_store_enabled: true vtxn: diff --git a/aptos-move/aptos-release-builder/data/example.yaml b/aptos-move/aptos-release-builder/data/example.yaml index 456fd49f518..9a6abe19270 100644 --- a/aptos-move/aptos-release-builder/data/example.yaml +++ b/aptos-move/aptos-release-builder/data/example.yaml @@ -62,6 +62,7 @@ proposals: voter_window_num_validators_multiplier: 1 weight_by_voting_power: true use_history_from_previous_epoch_max_count: 5 + failure_window_num_validators_multiplier: 0 max_failed_authors_to_store: 10 - Execution: V1: diff --git a/config/src/config/consensus_config.rs b/config/src/config/consensus_config.rs index de42cd7ea73..ae967f6dd12 100644 --- a/config/src/config/consensus_config.rs +++ b/config/src/config/consensus_config.rs @@ -112,6 +112,14 @@ pub struct ConsensusConfig { // Number of tokio worker theads to use for the Consensus runtime. // If set to 0, it will be minimum of num_cpus/2 and DEFAULT_WORKER_THREADS. pub num_tokio_worker_threads: u16, + /// When true, scale active-validator weights in LeaderReputation by their historical + /// round-time performance so that validators that commit quorums faster are elected + /// as leaders proportionally more often. + pub use_latency_weighted_leader: bool, + /// Exponent for latency-weighted leader selection. The weight for an active validator is + /// `active_weight * (max_median / median_rt)^multiplier`. Higher values more aggressively + /// favor low-latency proposers. Default 1 gives linear scaling. + pub latency_weight_multiplier: f64, } /// Deprecated @@ -405,6 +413,8 @@ impl Default for ConsensusConfig { enable_optimistic_proposal_rx: true, enable_optimistic_proposal_tx: true, num_tokio_worker_threads: 0, + use_latency_weighted_leader: false, + latency_weight_multiplier: 1.0, } } } diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 462e1d45dd7..ccc9e67d083 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -406,12 +406,21 @@ impl DagBootstrapper { ) }) .collect(); + let failure_window_multiplier = if config.failure_window_num_validators_multiplier > 0 { + config.failure_window_num_validators_multiplier + } else { + config.proposer_window_num_validators_multiplier + }; let metadata_adapter = Arc::new(MetadataBackendAdapter::new( num_validators - * std::cmp::max( + * [ config.proposer_window_num_validators_multiplier, config.voter_window_num_validators_multiplier, - ), + failure_window_multiplier, + ] + .into_iter() + .max() + .unwrap(), epoch_to_validator_map, )); let heuristic: Box = Box::new(ProposerAndVoterHeuristic::new( @@ -422,6 +431,7 @@ impl DagBootstrapper { config.failure_threshold_percent, num_validators * config.voter_window_num_validators_multiplier, num_validators * config.proposer_window_num_validators_multiplier, + num_validators * failure_window_multiplier, false, )); diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index ba1f18a83e2..7102e36f1cb 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -14,8 +14,8 @@ use crate::{ liveness::{ cached_proposer_election::CachedProposerElection, leader_reputation::{ - extract_epoch_to_proposers, AptosDBBackend, LeaderReputation, - ProposerAndVoterHeuristic, ReputationHeuristic, + extract_epoch_to_proposers, AptosDBBackend, LatencyWeightedHeuristic, + LeaderReputation, ProposerAndVoterHeuristic, ReputationHeuristic, }, proposal_generator::{ ChainHealthBackoffConfig, PipelineBackpressureConfig, ProposalGenerator, @@ -329,20 +329,42 @@ impl EpochManager

{ * proposer_and_voter_config.proposer_window_num_validators_multiplier; let voter_window_size = proposers.len() * proposer_and_voter_config.voter_window_num_validators_multiplier; + let failure_window_size = if proposer_and_voter_config + .failure_window_num_validators_multiplier + > 0 + { + proposers.len() + * proposer_and_voter_config.failure_window_num_validators_multiplier + } else { + proposer_window_size + }; + let inner_heuristic = ProposerAndVoterHeuristic::new( + self.author, + proposer_and_voter_config.active_weight, + proposer_and_voter_config.inactive_weight, + proposer_and_voter_config.failed_weight, + proposer_and_voter_config.failure_threshold_percent, + voter_window_size, + proposer_window_size, + failure_window_size, + leader_reputation_type.use_reputation_window_from_stale_end(), + ); let heuristic: Box = - Box::new(ProposerAndVoterHeuristic::new( - self.author, - proposer_and_voter_config.active_weight, - proposer_and_voter_config.inactive_weight, - proposer_and_voter_config.failed_weight, - proposer_and_voter_config.failure_threshold_percent, - voter_window_size, - proposer_window_size, - leader_reputation_type.use_reputation_window_from_stale_end(), - )); + if self.config.use_latency_weighted_leader { + Box::new(LatencyWeightedHeuristic::new( + inner_heuristic, + proposer_and_voter_config.active_weight, + self.config.latency_weight_multiplier, + )) + } else { + Box::new(inner_heuristic) + }; ( heuristic, - std::cmp::max(proposer_window_size, voter_window_size), + [proposer_window_size, voter_window_size, failure_window_size] + .into_iter() + .max() + .unwrap(), proposer_and_voter_config.weight_by_voting_power, proposer_and_voter_config.use_history_from_previous_epoch_max_count, ) diff --git a/consensus/src/liveness/leader_reputation.rs b/consensus/src/liveness/leader_reputation.rs index e9f7100a80e..b5620c7b73a 100644 --- a/consensus/src/liveness/leader_reputation.rs +++ b/consensus/src/liveness/leader_reputation.rs @@ -231,6 +231,9 @@ pub struct NewBlockEventAggregation { // dependig on how many failures we have. voter_window_size: usize, proposer_window_size: usize, + // Separate window for counting failed proposals. When larger than proposer_window_size, + // failures are remembered for longer, preventing oscillation between failed/inactive/active. + failure_window_size: usize, reputation_window_from_stale_end: bool, } @@ -238,11 +241,13 @@ impl NewBlockEventAggregation { pub fn new( voter_window_size: usize, proposer_window_size: usize, + failure_window_size: usize, reputation_window_from_stale_end: bool, ) -> Self { Self { voter_window_size, proposer_window_size, + failure_window_size, reputation_window_from_stale_end, } } @@ -433,7 +438,7 @@ impl NewBlockEventAggregation { Self::history_iter( history, epoch_to_candidates, - self.proposer_window_size, + self.failure_window_size, self.reputation_window_from_stale_end, ) .fold(HashMap::new(), |mut map, meta| { @@ -501,6 +506,7 @@ impl ProposerAndVoterHeuristic { failure_threshold_percent: u32, voter_window_size: usize, proposer_window_size: usize, + failure_window_size: usize, reputation_window_from_stale_end: bool, ) -> Self { Self { @@ -512,6 +518,7 @@ impl ProposerAndVoterHeuristic { aggregation: NewBlockEventAggregation::new( voter_window_size, proposer_window_size, + failure_window_size, reputation_window_from_stale_end, ), } @@ -552,6 +559,128 @@ impl ReputationHeuristic for ProposerAndVoterHeuristic { } } +/// A heuristic that wraps `ProposerAndVoterHeuristic` but additionally scales active-validator +/// weights by their historical round-time performance. Validators that have been proposing +/// fast rounds (i.e. collected quorum quickly) get a proportionally higher chance of being +/// selected as leader, while validators with slow round times get a lower chance. +/// +/// Concretely, for every active validator we compute the median interval between consecutive +/// committed blocks in the history window. The weight is then: +/// +/// active_weight * (max_median_round_time_us / validator_median_round_time_us)^multiplier +/// +/// Inactive / failed validators keep their base weights unchanged. +/// If there is not enough history to compute a median the base active weight is used. +pub struct LatencyWeightedHeuristic { + inner: ProposerAndVoterHeuristic, + active_weight: u64, + multiplier: f64, +} + +impl LatencyWeightedHeuristic { + pub fn new(inner: ProposerAndVoterHeuristic, active_weight: u64, multiplier: f64) -> Self { + Self { + inner, + active_weight, + multiplier: if multiplier > 0.0 { multiplier } else { 1.0 }, + } + } + + /// Compute per-proposer round times from the history. + /// + /// History is ordered newest-first: `history[0]` is the latest block. + /// The round time for block `i` is `history[i].proposed_time() - history[i+1].proposed_time()`. + fn compute_round_times(history: &[NewBlockEvent]) -> HashMap> { + let mut round_times: HashMap> = HashMap::new(); + for i in 0..history.len().saturating_sub(1) { + let newer = &history[i]; + let older = &history[i + 1]; + // Only compute within the same epoch to avoid epoch-boundary outliers. + if newer.epoch() != older.epoch() { + continue; + } + let interval = newer.proposed_time().saturating_sub(older.proposed_time()); + if interval > 0 { + round_times.entry(newer.proposer()).or_default().push(interval); + } + } + round_times + } + + fn median(values: &mut Vec) -> u64 { + if values.is_empty() { + return 0; + } + values.sort_unstable(); + let mid = values.len() / 2; + if values.len() % 2 == 0 { + (values[mid - 1] + values[mid]) / 2 + } else { + values[mid] + } + } +} + +impl ReputationHeuristic for LatencyWeightedHeuristic { + fn get_weights( + &self, + epoch: u64, + epoch_to_candidates: &HashMap>, + history: &[NewBlockEvent], + ) -> Vec { + let base_weights = self.inner.get_weights(epoch, epoch_to_candidates, history); + + let mut round_times = Self::compute_round_times(history); + + // Compute per-validator median round time. + let mut medians: HashMap = HashMap::new(); + for (author, times) in round_times.iter_mut() { + let m = Self::median(times); + if m > 0 { + medians.insert(*author, m); + } + } + + // We need at least a handful of data points to trust the medians. + // Require each candidate to have at least 2 observations; otherwise fall back to base. + let min_observations = 2usize; + let candidates = &epoch_to_candidates[&epoch]; + let enough_data = candidates.iter().all(|author| { + round_times + .get(author) + .map_or(false, |v| v.len() >= min_observations) + }); + + if !enough_data || medians.is_empty() { + return base_weights; + } + + // The slowest (highest) median round time is the reference — we scale all others + // relative to it so that the fastest proposer keeps `active_weight` and slower + // ones get a proportionally smaller share. + let max_median = *medians.values().max().expect("medians is non-empty"); + + epoch_to_candidates[&epoch] + .iter() + .zip(base_weights.iter()) + .map(|(author, &base)| { + // Only adjust the weight for validators that received the active weight. + if base != self.active_weight { + return base; + } + match medians.get(author) { + Some(&median_rt) if median_rt > 0 => { + // Scale by (max_median / median_rt)^multiplier. + let ratio = (max_median as f64 / median_rt as f64).powf(self.multiplier); + (self.active_weight as f64 * ratio) as u64 + }, + _ => base, + } + }) + .collect() + } +} + /// Committed history based proposer election implementation that could help bias towards /// successful leaders to help improve performance. pub struct LeaderReputation { diff --git a/consensus/src/liveness/leader_reputation_test.rs b/consensus/src/liveness/leader_reputation_test.rs index 2fa87634d58..e1fe5f24dd4 100644 --- a/consensus/src/liveness/leader_reputation_test.rs +++ b/consensus/src/liveness/leader_reputation_test.rs @@ -145,7 +145,7 @@ fn test_aggregation_counting() { let mut example1 = Example1::new(5); let validators0 = example1.validators0.clone(); let epoch_to_validators = HashMap::from([(0u64, validators0.clone())]); - let aggregation = NewBlockEventAggregation::new(2, 5, false); + let aggregation = NewBlockEventAggregation::new(2, 5, 5, false); example1.step1(); @@ -244,7 +244,7 @@ fn test_proposer_and_voter_heuristic() { let validators0 = example1.validators0.clone(); let epoch_to_validators0 = HashMap::from([(0u64, validators0.clone())]); let heuristic = - ProposerAndVoterHeuristic::new(example1.validators0[0], 100, 10, 1, 49, 2, 5, false); + ProposerAndVoterHeuristic::new(example1.validators0[0], 100, 10, 1, 49, 2, 5, 5, false); example1.step1(); assert_eq!( @@ -334,6 +334,7 @@ fn test_api(use_root_hash: bool) { 10, proposers.len(), proposers.len(), + proposers.len(), false, )), 4, diff --git a/testsuite/forge-cli/src/suites/realistic_environment.rs b/testsuite/forge-cli/src/suites/realistic_environment.rs index 166ada4ff01..b016225514f 100644 --- a/testsuite/forge-cli/src/suites/realistic_environment.rs +++ b/testsuite/forge-cli/src/suites/realistic_environment.rs @@ -19,8 +19,10 @@ use aptos_forge::{ EmitJobMode, EmitJobRequest, ForgeConfig, NetworkTest, NodeResourceOverride, }; use aptos_sdk::types::on_chain_config::{ - BlockGasLimitType, FeatureFlag, Features, OnChainChunkyDKGConfig, OnChainConsensusConfig, - OnChainExecutionConfig, OnChainRandomnessConfig, TransactionShufflerType, + BlockGasLimitType, ConsensusAlgorithmConfig, FeatureFlag, Features, LeaderReputationType, + OnChainChunkyDKGConfig, OnChainConsensusConfig, OnChainExecutionConfig, + OnChainRandomnessConfig, ProposerAndVoterConfig, ProposerElectionType, + TransactionShufflerType, }; use aptos_testcases::{ load_vs_perf_benchmark::{LoadVsPerfBenchmark, TransactionWorkload, Workloads}, @@ -444,9 +446,30 @@ pub(crate) fn realistic_env_max_load_test( .with_genesis_helm_config_fn(Arc::new(move |helm_values| { // No epoch change so measurements are stable. helm_values["chain"]["epoch_duration_secs"] = (24 * 3600).into(); + // Set failure_window_num_validators_multiplier to 50 so failed validators + // stay penalized even after restart, preventing failed→inactive→active oscillation. + let mut consensus_config = OnChainConsensusConfig::default_for_genesis(); + if let OnChainConsensusConfig::V5 { + alg: ConsensusAlgorithmConfig::JolteonV2 { ref mut main, .. }, + .. + } = consensus_config + { + main.proposer_election_type = ProposerElectionType::LeaderReputation( + LeaderReputationType::ProposerAndVoterV2(ProposerAndVoterConfig { + active_weight: 1000, + inactive_weight: 10, + failed_weight: 1, + failure_threshold_percent: 10, + proposer_window_num_validators_multiplier: 10, + voter_window_num_validators_multiplier: 1, + weight_by_voting_power: true, + use_history_from_previous_epoch_max_count: 5, + failure_window_num_validators_multiplier: 50, + }), + ); + } helm_values["chain"]["on_chain_consensus_config"] = - serde_yaml::to_value(OnChainConsensusConfig::default_for_genesis()) - .expect("must serialize"); + serde_yaml::to_value(consensus_config).expect("must serialize"); helm_values["chain"]["on_chain_execution_config"] = serde_yaml::to_value(OnChainExecutionConfig::default_for_genesis()) .expect("must serialize"); @@ -454,6 +477,9 @@ pub(crate) fn realistic_env_max_load_test( .with_validator_override_node_config_fn(Arc::new(|config, _| { // Allow validator-PFN connections config.base.enable_validator_pfn_connections = true; + // Enable latency-weighted leader selection + config.consensus.use_latency_weighted_leader = true; + config.consensus.latency_weight_multiplier = 1.0; })) .with_fullnode_override_node_config_fn(Arc::new(|config, _| { // Increase the consensus observer fallback thresholds diff --git a/types/src/on_chain_config/consensus_config.rs b/types/src/on_chain_config/consensus_config.rs index 57c1e01e846..b56ba8f9959 100644 --- a/types/src/on_chain_config/consensus_config.rs +++ b/types/src/on_chain_config/consensus_config.rs @@ -480,6 +480,7 @@ impl Default for ConsensusConfigV1 { voter_window_num_validators_multiplier: 1, weight_by_voting_power: true, use_history_from_previous_epoch_max_count: 5, + failure_window_num_validators_multiplier: 0, }), ), } @@ -553,6 +554,12 @@ pub struct ProposerAndVoterConfig { // representing a number of historical epochs (beyond the current one) // to consider. pub use_history_from_previous_epoch_max_count: u32, + // Window into history considered for failure statistics, multiplier + // on top of number of validators. When 0, falls back to proposer_window_num_validators_multiplier. + // A larger failure window keeps failed validators penalized for longer, + // preventing oscillation between failed/inactive/active states. + #[serde(default)] + pub failure_window_num_validators_multiplier: usize, } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -583,6 +590,7 @@ impl Default for DagConsensusConfigV1 { voter_window_num_validators_multiplier: 1, weight_by_voting_power: true, use_history_from_previous_epoch_max_count: 5, + failure_window_num_validators_multiplier: 0, }), ), }