Skip to content

Commit 95cc247

Browse files
authored
feat: add parsig db (#278)
* feat: add deadline.rs * feat: deadline tests * fix: remove comments * wip: add parasig db and some tests * refactor: remove old app/deadline * feat: add rust docs * feat: add clone box and clone eq * feat: finish tests * fix: typo * fix: review comments * fix: build test issues * fix: review comments * fix: linter
1 parent 2fc04ce commit 95cc247

15 files changed

Lines changed: 889 additions & 435 deletions

File tree

Cargo.lock

Lines changed: 170 additions & 254 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ cancellation = "0.1.0"
3838
chrono = { version = "0.4", features = ["serde"] }
3939
clap = { version = "4.5.53", features = ["derive", "env", "cargo"] }
4040
crossbeam = "0.8.4"
41+
dyn-clone = "1.0"
42+
dyn-eq = "0.1.3"
4143
either = "1.13"
4244
futures = "0.3"
4345
futures-timer = "3.0"

crates/core/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ cancellation.workspace = true
1212
chrono.workspace = true
1313
crossbeam.workspace = true
1414
futures.workspace = true
15+
dyn-clone.workspace = true
16+
dyn-eq.workspace = true
1517
hex.workspace = true
1618
libp2p.workspace = true
19+
vise.workspace = true
1720
pluto-eth2api.workspace = true
1821
prost.workspace = true
1922
prost-types.workspace = true
@@ -38,6 +41,8 @@ prost-types.workspace = true
3841
hex.workspace = true
3942
chrono.workspace = true
4043
test-case.workspace = true
44+
pluto-eth2util.workspace = true
45+
pluto-testutil.workspace = true
4146
tokio = { workspace = true, features = ["test-util"] }
4247
wiremock.workspace = true
4348
pluto-ssz.workspace = true

crates/core/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,10 @@ pub mod version;
2222

2323
/// Duty deadline tracking and notification.
2424
pub mod deadline;
25+
26+
/// parsigdb
27+
pub mod parsigdb;
28+
29+
/// Test utilities.
30+
#[cfg(test)]
31+
pub mod testutils;
Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use tracing::{debug, warn};
55

66
use crate::{
77
deadline::Deadliner,
8-
parasigdb::metrics::PARASIG_DB_METRICS,
8+
parsigdb::metrics::PARSIG_DB_METRICS,
9+
signeddata::SignedDataError,
910
types::{Duty, DutyType, ParSignedData, ParSignedDataSet, PubKey},
1011
};
1112
use chrono::{DateTime, Utc};
@@ -55,9 +56,9 @@ pub type ThreshSub = Arc<
5556

5657
/// Helper to create an internal subscriber from a closure.
5758
///
58-
/// The closure receives owned copies of the duty and data set. Since the closure
59-
/// is `Fn` (can be called multiple times), you need to clone any captured Arc values
60-
/// before the `async move` block.
59+
/// The closure receives owned copies of the duty and data set. Since the
60+
/// closure is `Fn` (can be called multiple times), you need to clone any
61+
/// captured Arc values before the `async move` block.
6162
///
6263
/// # Example
6364
/// ```ignore
@@ -88,8 +89,8 @@ where
8889
/// Helper to create a threshold subscriber from a closure.
8990
///
9091
/// The closure receives owned copies of the duty and data. Since the closure
91-
/// is `Fn` (can be called multiple times), you need to clone any captured Arc values
92-
/// before the `async move` block.
92+
/// is `Fn` (can be called multiple times), you need to clone any captured Arc
93+
/// values before the `async move` block.
9394
///
9495
/// # Example
9596
/// ```ignore
@@ -128,6 +129,10 @@ pub enum MemDBError {
128129
/// Share index of the mismatched signature
129130
share_idx: u64,
130131
},
132+
133+
/// Signed data error.
134+
#[error("signed data error: {0}")]
135+
SignedDataError(#[from] SignedDataError),
131136
}
132137

133138
type Result<T> = std::result::Result<T, MemDBError>;
@@ -186,8 +191,8 @@ impl MemDB {
186191
impl MemDB {
187192
/// Registers a subscriber for internally generated partial signed data.
188193
///
189-
/// The subscriber will be called when the node generates partial signed data
190-
/// that needs to be exchanged with peers.
194+
/// The subscriber will be called when the node generates partial signed
195+
/// data that needs to be exchanged with peers.
191196
pub async fn subscribe_internal(&self, sub: InternalSub) -> Result<()> {
192197
let mut inner = self.inner.lock().await;
193198
inner.internal_subs.push(sub);
@@ -204,11 +209,13 @@ impl MemDB {
204209
Ok(())
205210
}
206211

207-
/// Stores internally generated partial signed data and notifies subscribers.
212+
/// Stores internally generated partial signed data and notifies
213+
/// subscribers.
208214
///
209-
/// This is called when the node generates partial signed data that needs to be
210-
/// stored and exchanged with peers. It first stores the data (via `store_external`),
211-
/// then calls all internal subscribers to trigger peer exchange.
215+
/// This is called when the node generates partial signed data that needs to
216+
/// be stored and exchanged with peers. It first stores the data (via
217+
/// `store_external`), then calls all internal subscribers to trigger
218+
/// peer exchange.
212219
pub async fn store_internal(&self, duty: &Duty, signed_set: &ParSignedDataSet) -> Result<()> {
213220
self.store_external(duty, signed_set).await?;
214221

@@ -226,9 +233,10 @@ impl MemDB {
226233

227234
/// Stores externally received partial signed data and checks for threshold.
228235
///
229-
/// This is called when the node receives partial signed data from peers. It stores
230-
/// the data, checks if enough matching signatures have been collected to meet the
231-
/// threshold, and calls threshold subscribers when the threshold is reached.
236+
/// This is called when the node receives partial signed data from peers. It
237+
/// stores the data, checks if enough matching signatures have been
238+
/// collected to meet the threshold, and calls threshold subscribers
239+
/// when the threshold is reached.
232240
pub async fn store_external(&self, duty: &Duty, signed_data: &ParSignedDataSet) -> Result<()> {
233241
let _ = self.deadliner.add(duty.clone()).await;
234242

@@ -239,7 +247,7 @@ impl MemDB {
239247
.store(
240248
Key {
241249
duty: duty.clone(),
242-
pub_key: pub_key.clone(),
250+
pub_key: *pub_key,
243251
},
244252
par_signed.clone(),
245253
)
@@ -257,7 +265,7 @@ impl MemDB {
257265
continue;
258266
};
259267

260-
output.insert(pub_key.clone(), psigs);
268+
output.insert(*pub_key, psigs);
261269
}
262270

263271
if output.is_empty() {
@@ -278,17 +286,15 @@ impl MemDB {
278286

279287
/// Trims expired duties from the database.
280288
///
281-
/// This method runs in a loop, listening for expired duties from the deadliner
282-
/// and removing their associated data from the database. It should be spawned
283-
/// as a background task and will run until the cancellation token is triggered.
289+
/// This method runs in a loop, listening for expired duties from the
290+
/// deadliner and removing their associated data from the database. It
291+
/// should be spawned as a background task and will run until the
292+
/// cancellation token is triggered.
284293
pub async fn trim(&self) {
285-
let deadliner_rx = self.deadliner.c();
286-
if deadliner_rx.is_none() {
294+
let Some(mut deadliner_rx) = self.deadliner.c() else {
287295
warn!("Deadliner channel is not available");
288296
return;
289-
}
290-
291-
let mut deadliner_rx = deadliner_rx.unwrap();
297+
};
292298

293299
loop {
294300
tokio::select! {
@@ -345,14 +351,10 @@ impl MemDB {
345351
.push(k.clone());
346352

347353
if k.duty.duty_type == DutyType::Exit {
348-
PARASIG_DB_METRICS.exit_total[&k.pub_key.to_string()].inc();
354+
PARSIG_DB_METRICS.exit_total[&k.pub_key.to_string()].inc();
349355
}
350356

351-
let result = inner
352-
.entries
353-
.get(&k)
354-
.map(|entries| entries.clone())
355-
.unwrap_or_default();
357+
let result = inner.entries.get(&k).cloned().unwrap_or_default();
356358

357359
Ok(Some(result))
358360
}
@@ -381,11 +383,11 @@ async fn get_threshold_matching(
381383
let mut sigs_by_msg_root: HashMap<[u8; 32], Vec<ParSignedData>> = HashMap::new();
382384

383385
for sig in sigs {
384-
let root = sig.signed_data.message_root();
385-
sigs_by_msg_root
386-
.entry(root)
387-
.or_insert_with(Vec::new)
388-
.push(sig.clone());
386+
let root = sig
387+
.signed_data
388+
.message_root()
389+
.map_err(MemDBError::SignedDataError)?;
390+
sigs_by_msg_root.entry(root).or_default().push(sig.clone());
389391
}
390392

391393
// Return the first set that has exactly threshold number of signatures

0 commit comments

Comments
 (0)