Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/op-rbuilder/src/args/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ pub struct FlashblocksArgs {
default_value = "false"
)]
pub flashblocks_continuous_build: bool,

/// Number of forced `SharedBest::take()` misses for test isolation.
/// Consumed only under `#[cfg(test)]`; else always 0.
#[arg(skip)]
pub initial_force_take_miss_count: u64,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this not be refactored such that the test instance can be configured with a test SharedBest instance with test config?

}

impl Default for FlashblocksArgs {
Expand Down
6 changes: 6 additions & 0 deletions crates/op-rbuilder/src/builder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub struct FlashblocksConfig {

/// Enable continuous building between scheduler triggers.
pub continuous_build: bool,

/// Initial number of forced `SharedBest::take()` misses for test isolation.
/// Only consumed under `#[cfg(test)]`.
pub initial_force_take_miss_count: u64,
}

impl Default for FlashblocksConfig {
Expand All @@ -83,6 +87,7 @@ impl Default for FlashblocksConfig {
p2p_max_peer_count: 50,
ws_subscriber_limit: None,
continuous_build: false,
initial_force_take_miss_count: 0,
}
}
}
Expand Down Expand Up @@ -128,6 +133,7 @@ impl TryFrom<OpRbuilderArgs> for FlashblocksConfig {
p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count,
ws_subscriber_limit: args.flashblocks.ws_subscriber_limit,
continuous_build: args.flashblocks.flashblocks_continuous_build,
initial_force_take_miss_count: args.flashblocks.initial_force_take_miss_count,
})
}
}
Expand Down
7 changes: 6 additions & 1 deletion crates/op-rbuilder/src/builder/continuous/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use crate::{
use alloy_primitives::B256;
use reth_node_api::PayloadBuilderError;
use reth_revm::{State, database::StateProviderDatabase};
#[cfg(test)]
use std::sync::Arc;
use std::{ops::ControlFlow, time::Instant};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, field, metadata::Level, span};
Expand Down Expand Up @@ -127,7 +129,10 @@ where
)
};

let candidate_slot = SharedBest::new();
let candidate_slot = SharedBest::new(
#[cfg(test)]
Arc::clone(&self.force_take_miss_counter),
);
let base_ctx = base_state.ctx.clone();
let base_fb_state = base_state.fb_state.clone();
let base_info = base_state.info.clone();
Expand Down
5 changes: 0 additions & 5 deletions crates/op-rbuilder/src/builder/continuous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,3 @@ mod transition;
mod types;

pub(crate) use types::{BuildState, JobDeps};

#[cfg(test)]
pub(crate) mod test_hooks {
pub(crate) use super::shared_best::test_hooks::force_next_take_misses;
}
58 changes: 23 additions & 35 deletions crates/op-rbuilder/src/builder/continuous/shared_best.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::types::BestCandidate;
#[cfg(test)]
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use tracing::warn;

Expand Down Expand Up @@ -59,60 +61,46 @@ impl<T: CandidateCounters> CandidateSlot<T> {
/// build task. The build task writes on each improvement; the main loop takes
/// on trigger to publish without awaiting task completion.
#[derive(Clone)]
pub(super) struct SharedBest(CandidateSlot<BestCandidate>);
pub(super) struct SharedBest {
slot: CandidateSlot<BestCandidate>,
#[cfg(test)]
force_take_miss: Arc<AtomicU64>,
}

impl SharedBest {
pub(super) fn new() -> Self {
Self(CandidateSlot::new())
pub(super) fn new(#[cfg(test)] force_take_miss: Arc<AtomicU64>) -> Self {
Self {
slot: CandidateSlot::new(),
#[cfg(test)]
force_take_miss,
}
}

/// Take the current candidate (if any).
pub(super) fn take(&self) -> Option<BestCandidate> {
#[cfg(test)]
if test_hooks::should_force_take_miss() {
if self
.force_take_miss
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
count.checked_sub(1)
})
.is_ok()
{
return None;
}
self.0.take()
self.slot.take()
}

pub(super) fn store(&self, candidate: BestCandidate) {
self.0.store(candidate);
self.slot.store(candidate);
}

pub(super) fn refresh_metrics(&self, candidates_evaluated: u64, candidates_improved: u64) {
self.0
self.slot
.refresh_metrics(candidates_evaluated, candidates_improved);
}
}

#[cfg(test)]
pub(crate) mod test_hooks {
use std::sync::atomic::{AtomicU64, Ordering};

static FORCE_TAKE_MISS_COUNT: AtomicU64 = AtomicU64::new(0);

pub(crate) struct ForceTakeMissGuard;

impl Drop for ForceTakeMissGuard {
fn drop(&mut self) {
FORCE_TAKE_MISS_COUNT.store(0, Ordering::Release);
}
}

pub(crate) fn force_next_take_misses(count: u64) -> ForceTakeMissGuard {
FORCE_TAKE_MISS_COUNT.store(count, Ordering::Release);
ForceTakeMissGuard
}

pub(super) fn should_force_take_miss() -> bool {
FORCE_TAKE_MISS_COUNT
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
count.checked_sub(1)
})
.is_ok()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 0 additions & 3 deletions crates/op-rbuilder/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ pub use context::OpPayloadJobCtx;
pub use service::FlashblocksServiceBuilder;
pub use state_root::StateRootCalculator;

#[cfg(test)]
pub(crate) use continuous::test_hooks as continuous_test_hooks;

/// Configuration values that are applicable to any type of block builder.
#[derive(Debug, Clone)]
pub struct BuilderConfig {
Expand Down
10 changes: 10 additions & 0 deletions crates/op-rbuilder/src/builder/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ pub(crate) struct OpPayloadBuilderInner<Pool, Client, BuilderTx> {
pool_change_epoch: Arc<AtomicU64>,
/// Task executor used to offload blocking work.
executor: Runtime,
/// Per-builder counter of remaining forced `SharedBest::take()` misses.
#[cfg(test)]
pub(crate) force_take_miss_counter: Arc<AtomicU64>,
}

impl<Pool, Client, BuilderTx> OpPayloadBuilderInner<Pool, Client, BuilderTx> {
Expand Down Expand Up @@ -405,6 +408,11 @@ where
disable_state_root: config.flashblocks_config.disable_state_root,
enable_incremental_state_root: config.flashblocks_config.enable_incremental_state_root,
});
#[cfg(test)]
let force_take_miss_counter = Arc::new(AtomicU64::new(
config.flashblocks_config.initial_force_take_miss_count,
));

Self {
inner: Arc::new(OpPayloadBuilderInner {
builder_ctx,
Expand All @@ -418,6 +426,8 @@ where
task_metrics,
pool_change_epoch,
executor,
#[cfg(test)]
force_take_miss_counter,
}),
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/op-rbuilder/src/tests/flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,15 @@ async fn smoke_continuous_resolve_after_publish_preserves_published_state(
flashblocks_addr: "127.0.0.1".into(),
flashblocks_block_time: 200,
flashblocks_continuous_build: true,
// Force the first SharedBest::take() on this builder instance to miss and trigger fallback path.
initial_force_take_miss_count: 1,
..Default::default()
},
..Default::default()
})]
async fn smoke_continuous_trigger_miss_fallback_publishes_candidate(
rbuilder: LocalInstance,
) -> eyre::Result<()> {
let _force_miss = crate::builder::continuous_test_hooks::force_next_take_misses(1);
let driver = rbuilder.driver().await?;
let flashblocks_listener = rbuilder.spawn_flashblocks_listener();

Expand Down
Loading