Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ proposals:
voter_window_num_validators_multiplier: 1
weight_by_voting_power: true
use_history_from_previous_epoch_max_count: 5
failure_window_num_validators_multiplier: 0
max_failed_authors_to_store: 10
quorum_store_enabled: true
vtxn:
Expand Down
1 change: 1 addition & 0 deletions aptos-move/aptos-release-builder/data/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ proposals:
voter_window_num_validators_multiplier: 1
weight_by_voting_power: true
use_history_from_previous_epoch_max_count: 5
failure_window_num_validators_multiplier: 0
max_failed_authors_to_store: 10
- Execution:
V1:
Expand Down
10 changes: 10 additions & 0 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ pub struct ConsensusConfig {
// Number of tokio worker threads to use for the Consensus runtime.
// If set to 0, it will be minimum of num_cpus/2 and DEFAULT_WORKER_THREADS.
pub num_tokio_worker_threads: u16,
/// When true, scale active-validator weights in LeaderReputation by their historical
/// round-time performance so that validators that commit quorums faster are elected
/// as leaders proportionally more often.
pub use_latency_weighted_leader: bool,
/// Exponent for latency-weighted leader selection. The weight for an active validator is
/// `active_weight * (max_median / median_rt)^multiplier`. Higher values more aggressively
/// favor low-latency proposers. Default 1 gives linear scaling.
pub latency_weight_multiplier: f64,
}

/// Deprecated
Expand Down Expand Up @@ -405,6 +413,8 @@ impl Default for ConsensusConfig {
enable_optimistic_proposal_rx: true,
enable_optimistic_proposal_tx: true,
num_tokio_worker_threads: 0,
use_latency_weighted_leader: false,
latency_weight_multiplier: 1.0,
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,12 +406,21 @@ impl DagBootstrapper {
)
})
.collect();
let failure_window_multiplier = if config.failure_window_num_validators_multiplier > 0 {
config.failure_window_num_validators_multiplier
} else {
config.proposer_window_num_validators_multiplier
};
let metadata_adapter = Arc::new(MetadataBackendAdapter::new(
num_validators
* std::cmp::max(
* [
config.proposer_window_num_validators_multiplier,
config.voter_window_num_validators_multiplier,
),
failure_window_multiplier,
]
.into_iter()
.max()
.unwrap(),
epoch_to_validator_map,
));
let heuristic: Box<dyn ReputationHeuristic> = Box::new(ProposerAndVoterHeuristic::new(
Expand All @@ -422,6 +431,7 @@ impl DagBootstrapper {
config.failure_threshold_percent,
num_validators * config.voter_window_num_validators_multiplier,
num_validators * config.proposer_window_num_validators_multiplier,
num_validators * failure_window_multiplier,
false,
));

Expand Down
48 changes: 35 additions & 13 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::{
liveness::{
cached_proposer_election::CachedProposerElection,
leader_reputation::{
extract_epoch_to_proposers, AptosDBBackend, LeaderReputation,
ProposerAndVoterHeuristic, ReputationHeuristic,
extract_epoch_to_proposers, AptosDBBackend, LatencyWeightedHeuristic,
LeaderReputation, ProposerAndVoterHeuristic, ReputationHeuristic,
},
proposal_generator::{
ChainHealthBackoffConfig, PipelineBackpressureConfig, ProposalGenerator,
Expand Down Expand Up @@ -329,20 +329,42 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
* proposer_and_voter_config.proposer_window_num_validators_multiplier;
let voter_window_size = proposers.len()
* proposer_and_voter_config.voter_window_num_validators_multiplier;
let failure_window_size = if proposer_and_voter_config
.failure_window_num_validators_multiplier
> 0
{
proposers.len()
* proposer_and_voter_config.failure_window_num_validators_multiplier
} else {
proposer_window_size
};
let inner_heuristic = ProposerAndVoterHeuristic::new(
self.author,
proposer_and_voter_config.active_weight,
proposer_and_voter_config.inactive_weight,
proposer_and_voter_config.failed_weight,
proposer_and_voter_config.failure_threshold_percent,
voter_window_size,
proposer_window_size,
failure_window_size,
leader_reputation_type.use_reputation_window_from_stale_end(),
);
let heuristic: Box<dyn ReputationHeuristic> =
Box::new(ProposerAndVoterHeuristic::new(
self.author,
proposer_and_voter_config.active_weight,
proposer_and_voter_config.inactive_weight,
proposer_and_voter_config.failed_weight,
proposer_and_voter_config.failure_threshold_percent,
voter_window_size,
proposer_window_size,
leader_reputation_type.use_reputation_window_from_stale_end(),
));
if self.config.use_latency_weighted_leader {
Box::new(LatencyWeightedHeuristic::new(
inner_heuristic,
proposer_and_voter_config.active_weight,
self.config.latency_weight_multiplier,
))
} else {
Box::new(inner_heuristic)
};
(
heuristic,
std::cmp::max(proposer_window_size, voter_window_size),
[proposer_window_size, voter_window_size, failure_window_size]
.into_iter()
.max()
.unwrap(),
proposer_and_voter_config.weight_by_voting_power,
proposer_and_voter_config.use_history_from_previous_epoch_max_count,
)
Expand Down
131 changes: 130 additions & 1 deletion consensus/src/liveness/leader_reputation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,23 @@ pub struct NewBlockEventAggregation {
// depending on how many failures we have.
voter_window_size: usize,
proposer_window_size: usize,
// Separate window for counting failed proposals. When larger than proposer_window_size,
// failures are remembered for longer, preventing oscillation between failed/inactive/active.
failure_window_size: usize,
reputation_window_from_stale_end: bool,
}

impl NewBlockEventAggregation {
pub fn new(
voter_window_size: usize,
proposer_window_size: usize,
failure_window_size: usize,
reputation_window_from_stale_end: bool,
) -> Self {
Self {
voter_window_size,
proposer_window_size,
failure_window_size,
reputation_window_from_stale_end,
}
}
Expand Down Expand Up @@ -433,7 +438,7 @@ impl NewBlockEventAggregation {
Self::history_iter(
history,
epoch_to_candidates,
self.proposer_window_size,
self.failure_window_size,
self.reputation_window_from_stale_end,
)
.fold(HashMap::new(), |mut map, meta| {
Expand Down Expand Up @@ -501,6 +506,7 @@ impl ProposerAndVoterHeuristic {
failure_threshold_percent: u32,
voter_window_size: usize,
proposer_window_size: usize,
failure_window_size: usize,
reputation_window_from_stale_end: bool,
) -> Self {
Self {
Expand All @@ -512,6 +518,7 @@ impl ProposerAndVoterHeuristic {
aggregation: NewBlockEventAggregation::new(
voter_window_size,
proposer_window_size,
failure_window_size,
reputation_window_from_stale_end,
),
}
Expand Down Expand Up @@ -552,6 +559,128 @@ impl ReputationHeuristic for ProposerAndVoterHeuristic {
}
}

/// A heuristic that wraps `ProposerAndVoterHeuristic` and additionally scales the weight of
/// active validators by their historical round-time performance, so that validators whose
/// blocks followed the previous block quickly are selected as leader more often.
///
/// For every proposer we take the median interval between consecutive history entries of
/// the same epoch (difference of their `proposed_time()`, attributed to the proposer of the
/// newer entry). A validator whose base weight equals `active_weight` then receives:
///
/// active_weight * (max_median_round_time / validator_median_round_time)^multiplier
///
/// The validator with the largest median is the reference: it keeps `active_weight`, and
/// every validator with a smaller median gets a weight above `active_weight`.
///
/// Validators whose base weight differs from `active_weight` (inactive / failed) keep their
/// base weights unchanged.
/// If any candidate of the epoch has fewer than two recorded intervals, the base weights of
/// the wrapped heuristic are returned unchanged for all candidates.
pub struct LatencyWeightedHeuristic {
// Wrapped heuristic that produces the base weights.
inner: ProposerAndVoterHeuristic,
// Base weight value identifying an active validator; only entries equal to it are rescaled.
active_weight: u64,
// Exponent applied to the median ratio; `new` falls back to 1.0 for values that are not
// strictly positive.
multiplier: f64,
}

impl LatencyWeightedHeuristic {
/// Wraps `inner` so that the weights of active validators are rescaled by round time.
///
/// * `inner` - heuristic that supplies the base weights.
/// * `active_weight` - base weight value that marks a validator as active; `get_weights`
///   only rescales entries equal to this value.
/// * `multiplier` - exponent applied to the median ratio. Values that are NaN, infinite,
///   zero or negative are replaced with `1.0` (linear scaling).
pub fn new(inner: ProposerAndVoterHeuristic, active_weight: u64, multiplier: f64) -> Self {
    // `multiplier > 0.0` alone already rejects NaN, but it lets `+inf` through. An infinite
    // exponent turns every ratio above 1.0 into `+inf`, and the float-to-integer cast in
    // `get_weights` would then saturate the weight at `u64::MAX`.
    let multiplier = if multiplier.is_finite() && multiplier > 0.0 {
        multiplier
    } else {
        1.0
    };
    Self {
        inner,
        active_weight,
        multiplier,
    }
}

/// Collects, per proposer, the intervals between adjacent history entries.
///
/// The caller is expected to pass `history` ordered newest-first (`history[0]` being the
/// latest block). For each adjacent pair the interval is
/// `newer.proposed_time() - older.proposed_time()` and is credited to the proposer of the
/// newer entry. Pairs that straddle an epoch boundary are ignored, and intervals that are
/// zero (including differences that would be negative, which saturate to zero) are dropped.
fn compute_round_times(history: &[NewBlockEvent]) -> HashMap<Author, Vec<u64>> {
    let mut intervals_by_proposer: HashMap<Author, Vec<u64>> = HashMap::new();
    for pair in history.windows(2) {
        if let [newer, older] = pair {
            // Skip pairs from different epochs to avoid epoch-boundary outliers.
            if newer.epoch() == older.epoch() {
                let gap = newer.proposed_time().saturating_sub(older.proposed_time());
                if gap != 0 {
                    intervals_by_proposer
                        .entry(newer.proposer())
                        .or_default()
                        .push(gap);
                }
            }
        }
    }
    intervals_by_proposer
}

/// Returns the median of `values`, or `0` when `values` is empty.
///
/// The slice is sorted in place as a side effect. For an even number of elements the
/// result is the mean of the two middle elements, rounded down.
fn median(values: &mut [u64]) -> u64 {
    if values.is_empty() {
        return 0;
    }
    values.sort_unstable();
    let mid = values.len() / 2;
    if values.len() % 2 == 0 {
        let (lo, hi) = (values[mid - 1], values[mid]);
        // `lo + (hi - lo) / 2` equals `(lo + hi) / 2` but cannot overflow; `hi >= lo`
        // holds because the slice has just been sorted.
        lo + (hi - lo) / 2
    } else {
        values[mid]
    }
}
}

impl ReputationHeuristic for LatencyWeightedHeuristic {
    /// Returns one weight per candidate of `epoch`, starting from the weights of the wrapped
    /// heuristic and rescaling those equal to `active_weight` by round-time performance.
    ///
    /// The result is exactly the wrapped heuristic's output when any candidate has fewer
    /// than two recorded intervals in `history`, or when no candidate has a usable median.
    /// Otherwise a candidate whose base weight equals `active_weight` receives
    /// `active_weight * (max_median / median)^multiplier`, where `max_median` is the largest
    /// median among the candidates of `epoch`. Authors that appear in `history` but are not
    /// candidates of `epoch` do not influence the reference.
    ///
    /// # Panics
    ///
    /// Panics if `epoch_to_candidates` has no entry for `epoch`.
    fn get_weights(
        &self,
        epoch: u64,
        epoch_to_candidates: &HashMap<u64, Vec<Author>>,
        history: &[NewBlockEvent],
    ) -> Vec<u64> {
        // Minimum number of intervals a candidate needs before its median is trusted.
        const MIN_OBSERVATIONS: usize = 2;

        let base_weights = self.inner.get_weights(epoch, epoch_to_candidates, history);
        let candidates = &epoch_to_candidates[&epoch];

        let mut round_times = Self::compute_round_times(history);

        // All-or-nothing fallback: a single candidate with too few intervals (for example
        // one that proposed no block in the window) disables latency scaling for everyone.
        let enough_data = candidates.iter().all(|author| {
            round_times
                .get(author)
                .map_or(false, |times| times.len() >= MIN_OBSERVATIONS)
        });
        if !enough_data {
            return base_weights;
        }

        // Median round time per candidate of this epoch. Restricting the map to the
        // candidates keeps validators from earlier epochs out of the reference below.
        let medians: HashMap<Author, u64> = candidates
            .iter()
            .filter_map(|author| {
                let times = round_times.get_mut(author)?;
                let median_rt = Self::median(times);
                (median_rt > 0).then_some((*author, median_rt))
            })
            .collect();

        // The slowest (highest) median is the reference: that candidate keeps
        // `active_weight`, and every faster candidate is scaled up relative to it.
        let max_median = match medians.values().copied().max() {
            Some(slowest) => slowest,
            None => return base_weights,
        };

        // Assumes the wrapped heuristic returns one weight per candidate, in candidate
        // order; `zip` silently truncates otherwise. TODO confirm against the inner impl.
        candidates
            .iter()
            .zip(base_weights.iter())
            .map(|(author, &base)| {
                // Only adjust the weight for validators that received the active weight.
                if base != self.active_weight {
                    return base;
                }
                match medians.get(author) {
                    Some(&median_rt) => {
                        // NOTE(review): the std docs describe `f64::powf` precision as
                        // platform-dependent. If all validators must derive identical
                        // weights, confirm this cannot make them disagree.
                        let ratio = (max_median as f64 / median_rt as f64).powf(self.multiplier);
                        // `ratio >= 1.0` here; the float-to-int cast saturates at `u64::MAX`.
                        (self.active_weight as f64 * ratio) as u64
                    },
                    None => base,
                }
            })
            .collect()
    }
}

/// Committed history based proposer election implementation that could help bias towards
/// successful leaders to help improve performance.
pub struct LeaderReputation {
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/liveness/leader_reputation_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ fn test_aggregation_counting() {
let mut example1 = Example1::new(5);
let validators0 = example1.validators0.clone();
let epoch_to_validators = HashMap::from([(0u64, validators0.clone())]);
let aggregation = NewBlockEventAggregation::new(2, 5, false);
let aggregation = NewBlockEventAggregation::new(2, 5, 5, false);

example1.step1();

Expand Down Expand Up @@ -244,7 +244,7 @@ fn test_proposer_and_voter_heuristic() {
let validators0 = example1.validators0.clone();
let epoch_to_validators0 = HashMap::from([(0u64, validators0.clone())]);
let heuristic =
ProposerAndVoterHeuristic::new(example1.validators0[0], 100, 10, 1, 49, 2, 5, false);
ProposerAndVoterHeuristic::new(example1.validators0[0], 100, 10, 1, 49, 2, 5, 5, false);

example1.step1();
assert_eq!(
Expand Down Expand Up @@ -334,6 +334,7 @@ fn test_api(use_root_hash: bool) {
10,
proposers.len(),
proposers.len(),
proposers.len(),
false,
)),
4,
Expand Down
34 changes: 30 additions & 4 deletions testsuite/forge-cli/src/suites/realistic_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ use aptos_forge::{
EmitJobMode, EmitJobRequest, ForgeConfig, NetworkTest, NodeResourceOverride,
};
use aptos_sdk::types::on_chain_config::{
BlockGasLimitType, FeatureFlag, Features, OnChainChunkyDKGConfig, OnChainConsensusConfig,
OnChainExecutionConfig, OnChainRandomnessConfig, TransactionShufflerType,
BlockGasLimitType, ConsensusAlgorithmConfig, FeatureFlag, Features, LeaderReputationType,
OnChainChunkyDKGConfig, OnChainConsensusConfig, OnChainExecutionConfig,
OnChainRandomnessConfig, ProposerAndVoterConfig, ProposerElectionType,
TransactionShufflerType,
};
use aptos_testcases::{
load_vs_perf_benchmark::{LoadVsPerfBenchmark, TransactionWorkload, Workloads},
Expand Down Expand Up @@ -444,16 +446,40 @@ pub(crate) fn realistic_env_max_load_test(
.with_genesis_helm_config_fn(Arc::new(move |helm_values| {
// No epoch change so measurements are stable.
helm_values["chain"]["epoch_duration_secs"] = (24 * 3600).into();
// Set failure_window_num_validators_multiplier to 50 so failed validators
// stay penalized even after restart, preventing failed→inactive→active oscillation.
let mut consensus_config = OnChainConsensusConfig::default_for_genesis();
if let OnChainConsensusConfig::V5 {
alg: ConsensusAlgorithmConfig::JolteonV2 { ref mut main, .. },
..
} = consensus_config
{
main.proposer_election_type = ProposerElectionType::LeaderReputation(
LeaderReputationType::ProposerAndVoterV2(ProposerAndVoterConfig {
active_weight: 1000,
inactive_weight: 10,
failed_weight: 1,
failure_threshold_percent: 10,
proposer_window_num_validators_multiplier: 10,
voter_window_num_validators_multiplier: 1,
weight_by_voting_power: true,
use_history_from_previous_epoch_max_count: 5,
failure_window_num_validators_multiplier: 50,
}),
);
}
helm_values["chain"]["on_chain_consensus_config"] =
serde_yaml::to_value(OnChainConsensusConfig::default_for_genesis())
.expect("must serialize");
serde_yaml::to_value(consensus_config).expect("must serialize");
helm_values["chain"]["on_chain_execution_config"] =
serde_yaml::to_value(OnChainExecutionConfig::default_for_genesis())
.expect("must serialize");
}))
.with_validator_override_node_config_fn(Arc::new(|config, _| {
// Allow validator-PFN connections
config.base.enable_validator_pfn_connections = true;
// Enable latency-weighted leader selection
config.consensus.use_latency_weighted_leader = true;
config.consensus.latency_weight_multiplier = 1.0;
}))
.with_fullnode_override_node_config_fn(Arc::new(|config, _| {
// Increase the consensus observer fallback thresholds
Expand Down
Loading
Loading