Skip to content

Commit 3f4071c

Browse files
authored
feat(starfish-core,iota-core): emit misbehavior counts to iota-core (#11534)
# Description of change Stacks on top of #11531. Wires the producer-side misbehavior store (landed in #10088 + #11531) into the consumer in `iota-core` so that the `Scoreboard` / `ReportAggregator` (from #10779) finally receives per-authority misbehavior signal from Starfish. Each `CommittedSubDag` now carries a per-authority absolute snapshot of `persisted + in_memory` counts from `MisbehaviorStore` at commit time. Consumers diff against their own last-seen state if they want deltas — that responsibility doesn't belong in Starfish, and the sum is invariant across the eviction-time move between buckets, so the snapshot is race-free relative to flush. Producer (`starfish-core`): - `MisbehaviorStore::snapshot_totals()` — returns the per-authority absolute totals. - `CommittedSubDag` gets a new `misbehavior_counts: Vec<MisbehaviorCountsV1>` field, threaded at all three production `CommittedSubDag::new` call sites (`commit_solidifier`, `commit_observer`, `commit_syncer/fast`) via the `DagState`-owned `MisbehaviorStore`. - `CommittedSubDag` is **not** serialized over the wire (local `CommitConsumer` channel only), so the added `Vec` is an in-process cost only. Consumer (`iota-core`): - Overrides `ConsensusOutputAPI::misbehavior_counts()` on `starfish_core::CommittedSubDag` to transpose the per-authority struct-of-fields snapshot into the four per-field vecs expected by `ConsensusOutputMisbehaviorCounts`. Downstream wiring in `consensus_handler` and the report aggregator was already in place and waiting for non-empty data. Also folds in a standalone fix: `starfish-core` now declares its `iota-sdk-types` `serde` feature explicitly. Workspace builds were masking the missing feature via unification through `iota-types`; `cargo check -p starfish-core` alone fails without it. ## Links to any relevant issues fixes iotaledger/iota-private#406 Part of iotaledger/iota-private#173 ## How the change has been tested - [x] Basic tests (linting, compilation, formatting, unit/integration tests) - [x] Patch-specific tests (correctness, functionality coverage) - [x] I have added tests that prove my fix is effective or that my feature works - [x] I have checked that new and existing unit tests pass locally with my changes Local verification: - `cargo ci-clippy` — clean. - `cargo nextest run -p starfish-core --lib` — new `test_snapshot_totals_sums_persisted_and_in_memory` passes; existing misbehavior tests pass. - `cargo nextest run -p iota-core --lib consensus_output_api::tests` — new transpose + empty-snapshot tests pass.
1 parent 98bbb57 commit 3f4071c

13 files changed

Lines changed: 182 additions & 8 deletions

File tree

crates/iota-core/src/consensus_handler.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,7 @@ mod tests {
883883
leader_header.timestamp_ms(),
884884
CommitRef::new(10, CommitDigest::MIN),
885885
vec![],
886+
vec![],
886887
);
887888

888889
// Test that the consensus handler respects backpressure.

crates/iota-core/src/consensus_types/consensus_output_api.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use std::{collections::BTreeMap, fmt::Display};
66

77
use iota_types::{digests::ConsensusCommitDigest, messages_consensus::ConsensusTransaction};
8+
use itertools::Itertools as _;
89

910
use crate::consensus_types::AuthorityIndex;
1011
/// A list of tuples of:
@@ -135,4 +136,90 @@ impl ConsensusOutputAPI for starfish_core::CommittedSubDag {
135136
});
136137
num_of_committed_headers.into_iter().collect()
137138
}
139+
140+
fn misbehavior_counts(&self) -> ConsensusOutputMisbehaviorCounts {
141+
let (faulty_blocks_provable, faulty_blocks_unprovable, missing_proposals, equivocations) =
142+
self.misbehavior_counts
143+
.iter()
144+
.map(|counts| match counts {
145+
starfish_core::MisbehaviorCounts::V1(v1) => (
146+
v1.faulty_blocks_provable,
147+
v1.faulty_blocks_unprovable,
148+
v1.missing_proposals,
149+
v1.equivocations,
150+
),
151+
})
152+
.multiunzip();
153+
ConsensusOutputMisbehaviorCounts {
154+
faulty_blocks_provable,
155+
faulty_blocks_unprovable,
156+
missing_proposals,
157+
equivocations,
158+
}
159+
}
160+
}
161+
162+
#[cfg(test)]
163+
mod tests {
164+
use starfish_core::{
165+
BlockRef, CommitDigest, CommitRef, CommittedSubDag, MisbehaviorCounts, MisbehaviorCountsV1,
166+
VerifiedBlockHeader,
167+
};
168+
169+
use super::*;
170+
171+
#[test]
172+
fn test_misbehavior_counts_transposes_per_authority_to_per_field_vecs() {
173+
let counts = vec![
174+
MisbehaviorCounts::V1(MisbehaviorCountsV1 {
175+
faulty_blocks_provable: 1,
176+
faulty_blocks_unprovable: 2,
177+
missing_proposals: 3,
178+
equivocations: 4,
179+
}),
180+
MisbehaviorCounts::V1(MisbehaviorCountsV1 {
181+
faulty_blocks_provable: 10,
182+
faulty_blocks_unprovable: 20,
183+
missing_proposals: 30,
184+
equivocations: 40,
185+
}),
186+
MisbehaviorCounts::default(),
187+
];
188+
189+
let subdag = CommittedSubDag::new(
190+
BlockRef::MIN,
191+
Vec::<VerifiedBlockHeader>::new(),
192+
vec![],
193+
vec![],
194+
0,
195+
CommitRef::new(1, CommitDigest::MIN),
196+
vec![],
197+
counts,
198+
);
199+
200+
let out = subdag.misbehavior_counts();
201+
assert_eq!(out.faulty_blocks_provable, vec![1, 10, 0]);
202+
assert_eq!(out.faulty_blocks_unprovable, vec![2, 20, 0]);
203+
assert_eq!(out.missing_proposals, vec![3, 30, 0]);
204+
assert_eq!(out.equivocations, vec![4, 40, 0]);
205+
}
206+
207+
#[test]
208+
fn test_misbehavior_counts_empty_snapshot_produces_empty_vecs() {
209+
let subdag = CommittedSubDag::new(
210+
BlockRef::MIN,
211+
Vec::<VerifiedBlockHeader>::new(),
212+
vec![],
213+
vec![],
214+
0,
215+
CommitRef::new(1, CommitDigest::MIN),
216+
vec![],
217+
vec![],
218+
);
219+
let out = subdag.misbehavior_counts();
220+
assert!(out.faulty_blocks_provable.is_empty());
221+
assert!(out.faulty_blocks_unprovable.is_empty());
222+
assert!(out.missing_proposals.is_empty());
223+
assert!(out.equivocations.is_empty());
224+
}
138225
}

crates/iota-core/src/unit_tests/starfish_manager_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ async fn test_starfish_consensus_handler_handles_older_commits() {
201201
1000 + commit_idx * 1000,
202202
StarfishCommitRef::new(commit_idx as u32, StarfishCommitDigest::MIN),
203203
vec![],
204+
vec![],
204205
)
205206
})
206207
.collect();

crates/starfish/core/src/commit.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::{
2525
},
2626
context::Context,
2727
leader_scoring::ReputationScores,
28+
misbehavior_store::MisbehaviorCounts,
2829
storage::Store,
2930
transaction_ref::{GenericTransactionRef, GenericTransactionRefAPI as _, TransactionRef},
3031
};
@@ -578,6 +579,10 @@ pub struct CommittedSubDag {
578579
pub base: SubDagBase,
579580
/// All the committed blocks that are part of this sub-dag
580581
pub transactions: Vec<VerifiedTransactions>,
582+
/// Absolute per-authority misbehavior counts (`persisted + in_memory`)
583+
/// snapshotted from `MisbehaviorStore` at emission. Indexed by
584+
/// `AuthorityIndex`; consumers diff for deltas.
585+
pub misbehavior_counts: Vec<MisbehaviorCounts>,
581586
}
582587

583588
impl CommittedSubDag {
@@ -590,6 +595,7 @@ impl CommittedSubDag {
590595
timestamp_ms: BlockTimestampMs,
591596
commit_ref: CommitRef,
592597
reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
598+
misbehavior_counts: Vec<MisbehaviorCounts>,
593599
) -> Self {
594600
Self {
595601
base: SubDagBase {
@@ -601,6 +607,7 @@ impl CommittedSubDag {
601607
reputation_scores_desc,
602608
},
603609
transactions,
610+
misbehavior_counts,
604611
}
605612
}
606613

crates/starfish/core/src/commit_observer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ impl CommitObserver {
247247
commit.timestamp_ms(),
248248
commit.reference(),
249249
reputation_scores,
250+
self.dag_state.read().misbehavior_store().snapshot_totals(),
250251
))
251252
}
252253

crates/starfish/core/src/commit_solidifier.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ impl CommitSolidifier {
198198
.map(|tx| tx.expect("Transaction must exist since we checked"))
199199
.collect();
200200

201+
// Delayed subdags see current store state, not state at leader-round
202+
// commit time. Safe: counts are absolute and consumers merge-max.
203+
let misbehavior_counts = dag_state.misbehavior_store().snapshot_totals();
201204
Ok(CommittedSubDag::new(
202205
subdag.leader,
203206
subdag.base.headers.clone(),
@@ -206,6 +209,7 @@ impl CommitSolidifier {
206209
subdag.timestamp_ms,
207210
subdag.commit_ref,
208211
subdag.reputation_scores_desc.clone(),
212+
misbehavior_counts,
209213
))
210214
} else {
211215
Err(missing)

crates/starfish/core/src/commit_syncer/fast.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,10 @@ impl<C: NetworkClient> FastCommitSyncer<C> {
662662
// For fast commit sync, we use block headers refs and reputation scores from
663663
// the commit.
664664
let mut committed_subdags = Vec::new();
665+
// Replayed commits share one snapshot of current store state; downstream
666+
// consumers merge-max against their last-seen, so repeating absolute
667+
// totals across the batch is a no-op after the first.
668+
let misbehavior_counts = inner.dag_state.read().misbehavior_store().snapshot_totals();
665669
for commit in &commits {
666670
// Get block headers from the commit
667671
let committed_header_refs = commit.block_headers().to_vec();
@@ -684,6 +688,7 @@ impl<C: NetworkClient> FastCommitSyncer<C> {
684688
commit.timestamp_ms(),
685689
commit.reference(),
686690
reputation_scores,
691+
misbehavior_counts.clone(),
687692
));
688693
}
689694

crates/starfish/core/src/core.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2678,7 +2678,12 @@ mod test {
26782678
// Record traversed headers and sequenced transactions
26792679
while let Some(sub_dag) = commit_receiver_own.recv().await {
26802680
let sub_dag_leader_round = sub_dag.leader.round;
2681-
let CommittedSubDag { base, transactions } = sub_dag;
2681+
let CommittedSubDag {
2682+
base,
2683+
transactions,
2684+
misbehavior_counts,
2685+
} = sub_dag;
2686+
assert_eq!(misbehavior_counts.len(), committee_size);
26822687

26832688
for block_ref in &base.committed_header_refs {
26842689
existing_headers.insert(*block_ref);

crates/starfish/core/src/dag_state.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2503,7 +2503,6 @@ impl DagState {
25032503
self.pending_acknowledgments = acknowledgments.into_iter().collect::<BTreeSet<_>>();
25042504
}
25052505

2506-
#[cfg(test)]
25072506
pub(crate) fn misbehavior_store(&self) -> &MisbehaviorStore {
25082507
&self.misbehavior_store
25092508
}

crates/starfish/core/src/leader_schedule.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,7 @@ mod tests {
841841
context.clock.timestamp_utc_ms(),
842842
CommitRef::new(1, CommitDigest::MIN),
843843
vec![],
844+
vec![],
844845
)
845846
.base,
846847
];
@@ -944,6 +945,7 @@ mod tests {
944945
context.clock.timestamp_utc_ms(),
945946
last_commit.reference(),
946947
vec![],
948+
vec![],
947949
)
948950
.base,
949951
];

0 commit comments

Comments
 (0)