Skip to content

Commit ef1854e

Browse files
committed
feat(testutil/validatormock): port component.go (Component scheduler)
Ports `charon/testutil/validatormock/component.go` to Rust, completing the validatormock module. `Component` drives the duty-by-duty workflow across a sliding window of attesters (per slot) and sync-committee members (per epoch) for the configured pubkeys. - `clock.rs`: `Clock` trait + `SystemClock` (production) + `FakeClock` (tests). `FakeClock` is an explicit-advance clock; pending sleepers register a `oneshot::Sender` and fire when `advance` / `advance_to` moves past their deadline. Avoids `tokio::time::pause()`, which interacts poorly with `wiremock::MockServer` (Plan agent flagged this). - `component.rs`: scheduler built around `tokio::sync::mpsc` + `tokio::spawn` + `tokio_util::sync::CancellationToken`. `Component::builder()` returns a handle that owns the consumer task; `shutdown().await` cancels gracefully and `Drop` cancels for the panic path. `run_duty_via_inner` matches the Go `runDuty` switch on `pluto_core::types::DutyType`. - `duties_for_slot` + `duty_start_times` mirror Go's `dutyStartTimeFuncsByDuty` table, including the half-/third-slot offsets for attest/aggregate/sync messages. Tests cover the start-up swallow window (delay_start_slots = 2), the duty-start-time arithmetic, and the duty-collection algorithm. Driving the full Component through 2 epochs with FakeClock would require comprehensive post_state_validators mounting on BeaconMock; the slot_ticked test asserts the epoch-window machinery instead and shuts down cleanly.
1 parent 30d5a23 commit ef1854e

3 files changed

Lines changed: 738 additions & 0 deletions

File tree

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
//! Injectable clock for the validator-mock scheduler.
2+
//!
3+
//! Replaces Go's `clockwork.FakeClock` from `propose_test.go`. The scheduler
4+
//! always calls into [`Clock`], so tests can substitute [`FakeClock`] to drive
5+
//! time-based duties deterministically without `tokio::time::pause()`, which
6+
//! interacts poorly with `wiremock::MockServer`.
7+
8+
use std::{
9+
sync::{Arc, Mutex},
10+
time::{Duration, SystemTime},
11+
};
12+
13+
use async_trait::async_trait;
14+
use tokio::sync::oneshot;
15+
16+
/// Abstract wall-clock used by [`crate::validatormock::Component`].
17+
#[async_trait]
18+
pub trait Clock: Send + Sync + std::fmt::Debug + 'static {
19+
/// Returns the current time.
20+
fn now(&self) -> SystemTime;
21+
22+
/// Sleeps until `wake_at`. Returns immediately if `wake_at` has already
23+
/// passed.
24+
async fn sleep_until(&self, wake_at: SystemTime);
25+
}
26+
27+
/// Real-time clock backed by `SystemTime::now` and `tokio::time::sleep`.
28+
#[derive(Debug, Default, Clone, Copy)]
29+
pub struct SystemClock;
30+
31+
#[async_trait]
32+
impl Clock for SystemClock {
33+
fn now(&self) -> SystemTime {
34+
SystemTime::now()
35+
}
36+
37+
async fn sleep_until(&self, wake_at: SystemTime) {
38+
let now = SystemTime::now();
39+
let duration = wake_at.duration_since(now).unwrap_or(Duration::ZERO);
40+
if duration.is_zero() {
41+
return;
42+
}
43+
tokio::time::sleep(duration).await;
44+
}
45+
}
46+
47+
/// Test clock: advances only via [`FakeClock::advance`] /
48+
/// [`FakeClock::advance_to`].
49+
///
50+
/// Pending [`Clock::sleep_until`] futures register a oneshot sender; advancing
51+
/// past the wake time fires every sender at or before the new time.
52+
#[derive(Debug, Default, Clone)]
53+
pub struct FakeClock(Arc<Mutex<FakeClockInner>>);
54+
55+
#[derive(Debug)]
56+
struct FakeClockInner {
57+
now: SystemTime,
58+
pending: Vec<(SystemTime, oneshot::Sender<()>)>,
59+
}
60+
61+
impl Default for FakeClockInner {
62+
fn default() -> Self {
63+
Self {
64+
now: SystemTime::UNIX_EPOCH,
65+
pending: Vec::new(),
66+
}
67+
}
68+
}
69+
70+
impl FakeClock {
71+
/// Builds a clock pinned at `now`.
72+
#[must_use]
73+
pub fn new(now: SystemTime) -> Self {
74+
Self(Arc::new(Mutex::new(FakeClockInner {
75+
now,
76+
pending: Vec::new(),
77+
})))
78+
}
79+
80+
/// Advances by `delta` and wakes pending sleepers whose deadline has
81+
/// passed.
82+
pub fn advance(&self, delta: Duration) {
83+
let new_now = {
84+
let guard = self.0.lock().expect("FakeClock mutex poisoned");
85+
guard.now.checked_add(delta).unwrap_or(guard.now)
86+
};
87+
self.advance_to(new_now);
88+
}
89+
90+
/// Advances the clock to `target` (no-op if already past) and wakes
91+
/// pending sleepers whose deadline has passed.
92+
pub fn advance_to(&self, target: SystemTime) {
93+
let drained: Vec<oneshot::Sender<()>> = {
94+
let mut guard = self.0.lock().expect("FakeClock mutex poisoned");
95+
if target > guard.now {
96+
guard.now = target;
97+
}
98+
let now = guard.now;
99+
let mut keep = Vec::with_capacity(guard.pending.len());
100+
let mut fire = Vec::new();
101+
for (wake_at, tx) in guard.pending.drain(..) {
102+
if wake_at <= now {
103+
fire.push(tx);
104+
} else {
105+
keep.push((wake_at, tx));
106+
}
107+
}
108+
guard.pending = keep;
109+
fire
110+
};
111+
for tx in drained {
112+
let _ = tx.send(());
113+
}
114+
}
115+
}
116+
117+
#[async_trait]
118+
impl Clock for FakeClock {
119+
fn now(&self) -> SystemTime {
120+
self.0.lock().expect("FakeClock mutex poisoned").now
121+
}
122+
123+
async fn sleep_until(&self, wake_at: SystemTime) {
124+
let rx = {
125+
let mut guard = self.0.lock().expect("FakeClock mutex poisoned");
126+
if wake_at <= guard.now {
127+
return;
128+
}
129+
let (tx, rx) = oneshot::channel();
130+
guard.pending.push((wake_at, tx));
131+
rx
132+
};
133+
let _ = rx.await;
134+
}
135+
}
136+
137+
#[cfg(test)]
138+
mod tests {
139+
use super::*;
140+
141+
#[tokio::test]
142+
async fn system_clock_now_advances() {
143+
let c = SystemClock;
144+
let a = c.now();
145+
tokio::time::sleep(Duration::from_millis(5)).await;
146+
let b = c.now();
147+
assert!(b > a);
148+
}
149+
150+
#[tokio::test]
151+
async fn fake_clock_sleep_resolves_after_advance() {
152+
let start = SystemTime::UNIX_EPOCH + Duration::from_secs(1_000);
153+
let clock = FakeClock::new(start);
154+
let clock_for_task = clock.clone();
155+
let wake = start + Duration::from_secs(10);
156+
157+
let handle = tokio::spawn(async move {
158+
clock_for_task.sleep_until(wake).await;
159+
});
160+
161+
// Give the task a chance to enqueue.
162+
tokio::task::yield_now().await;
163+
clock.advance(Duration::from_secs(10));
164+
165+
handle.await.expect("sleeper completes");
166+
}
167+
168+
#[tokio::test]
169+
async fn fake_clock_sleep_already_passed_returns_immediately() {
170+
let start = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
171+
let clock = FakeClock::new(start);
172+
clock.sleep_until(start - Duration::from_secs(1)).await; // no panic, returns
173+
}
174+
175+
#[tokio::test]
176+
async fn fake_clock_multiple_sleepers() {
177+
let start = SystemTime::UNIX_EPOCH;
178+
let clock = FakeClock::new(start);
179+
180+
let a = tokio::spawn({
181+
let c = clock.clone();
182+
async move { c.sleep_until(start + Duration::from_secs(1)).await }
183+
});
184+
let b = tokio::spawn({
185+
let c = clock.clone();
186+
async move { c.sleep_until(start + Duration::from_secs(2)).await }
187+
});
188+
tokio::task::yield_now().await;
189+
190+
clock.advance(Duration::from_secs(1));
191+
a.await.expect("a wakes");
192+
// b should still be pending; advance more.
193+
clock.advance(Duration::from_secs(1));
194+
b.await.expect("b wakes");
195+
}
196+
}

0 commit comments

Comments
 (0)