Skip to content

Commit 8f852de

Browse files
committed
feat: improve legacy top down, have a simpler storage node runner
1 parent e77672d commit 8f852de

20 files changed

Lines changed: 1205 additions & 309 deletions

File tree

fendermint/vm/interpreter/src/fvm/legacy_topdown.rs

Lines changed: 97 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,21 @@ impl LegacyTopDownHandler {
4747
height: finality.height as u64,
4848
block_hash: finality.block_hash,
4949
};
50+
let quorum_threshold = atomically(|| self.votes.quorum_threshold()).await;
51+
52+
// In a single-validator subnet, self-attestation should not depend on local
53+
// cache catch-up. Otherwise aggressive proposals can be rejected by the same
54+
// node that created them, stalling consensus at one height.
55+
if quorum_threshold == 1 {
56+
let committed_height = atomically(|| {
57+
self.provider
58+
.last_committed_finality()
59+
.map(|f| f.map(|f| f.height).unwrap_or_default())
60+
})
61+
.await;
62+
return prop.height > committed_height;
63+
}
64+
5065
atomically(|| self.provider.check_proposal(&prop)).await
5166
}
5267

@@ -103,37 +118,101 @@ impl LegacyTopDownHandler {
103118
async fn chain_message_from_finality_or_quorum(&self) -> Option<ChainMessage> {
104119
atomically(|| self.votes.pause_votes_until_find_quorum()).await;
105120

106-
let (parent, quorum) = atomically(|| {
121+
let (parent, quorum, quorum_threshold) = atomically(|| {
107122
let parent = self.provider.next_proposal()?;
108123

109124
let quorum = self
110125
.votes
111126
.find_quorum()?
112127
.map(|(height, block_hash)| IPCParentFinality { height, block_hash });
113128

114-
Ok((parent, quorum))
129+
let quorum_threshold = self.votes.quorum_threshold()?;
130+
131+
Ok((parent, quorum, quorum_threshold))
115132
})
116133
.await;
117134

118135
let parent = parent?;
119136

120-
let quorum = if let Some(quorum) = quorum {
121-
quorum
122-
} else {
123-
emit!(
124-
DEBUG,
125-
ParentFinalityMissingQuorum {
126-
block_height: parent.height,
127-
block_hash: &hex::encode(&parent.block_hash),
128-
}
129-
);
130-
return None;
131-
};
132-
133-
let finality = if parent.height <= quorum.height {
134-
parent
137+
// Fast-path for single-validator subnets: quorum threshold is 1, so requiring
138+
// a separate vote-derived quorum can unnecessarily throttle catch-up.
139+
//
140+
// In addition, bypass `next_proposal()` bounds (`max_proposal_range`, `proposal_delay`)
141+
// and favor the freshest finalized parent view queried directly from parent RPC
142+
// (`chain_head - chain_head_delay`). This avoids being throttled by local cache
143+
// catch-up speed when the node is far behind.
144+
let finality = if quorum_threshold == 1 {
145+
let committed_height = atomically(|| {
146+
self.provider
147+
.last_committed_finality()
148+
.map(|f| f.map(|f| f.height).unwrap_or_default())
149+
})
150+
.await;
151+
152+
let remote_finalized = self
153+
.provider
154+
.latest_finalized_parent_view()
155+
.await
156+
.ok()
157+
.flatten();
158+
let candidate = if let Some(remote_finalized) =
159+
remote_finalized.filter(|f| f.height > committed_height)
160+
{
161+
remote_finalized
162+
} else {
163+
let latest_non_null = atomically(|| {
164+
let latest = self.provider.latest_height()?;
165+
let committed = self.provider.last_committed_finality()?;
166+
167+
let (latest, committed_height) = match (latest, committed) {
168+
(Some(latest), Some(committed)) => (latest, committed.height),
169+
_ => return Ok(None),
170+
};
171+
172+
if latest <= committed_height {
173+
return Ok(None);
174+
}
175+
176+
let latest_non_null = self
177+
.provider
178+
.first_non_null_block(latest)?
179+
.filter(|h| *h > committed_height);
180+
181+
let Some(height) = latest_non_null else {
182+
return Ok(None);
183+
};
184+
185+
let Some(block_hash) = self.provider.block_hash(height)? else {
186+
return Ok(None);
187+
};
188+
189+
Ok(Some(IPCParentFinality { height, block_hash }))
190+
})
191+
.await;
192+
193+
latest_non_null.unwrap_or(parent)
194+
};
195+
196+
candidate
135197
} else {
136-
quorum
198+
let quorum = if let Some(quorum) = quorum {
199+
quorum
200+
} else {
201+
emit!(
202+
DEBUG,
203+
ParentFinalityMissingQuorum {
204+
block_height: parent.height,
205+
block_hash: &hex::encode(&parent.block_hash),
206+
}
207+
);
208+
return None;
209+
};
210+
211+
if parent.height <= quorum.height {
212+
parent
213+
} else {
214+
quorum
215+
}
137216
};
138217

139218
Some(ChainMessage::Ipc(IpcMessage::TopDownExec(ParentFinality {

fendermint/vm/interpreter/src/fvm/topdown.rs

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ use crate::types::AppliedMessage;
2020
use ipc_api::cross::IpcEnvelope;
2121
use ipc_observability::emit;
2222

23+
/// Maximum inclusive parent-height span processed per legacy topdown chunk.
24+
const LEGACY_TOPDOWN_CHUNK_SIZE: BlockHeight = 200;
25+
2326
#[derive(Clone, Debug)]
2427
pub struct F3ExecutionCacheRetryConfig {
2528
pub backoff_initial: std::time::Duration,
@@ -399,41 +402,75 @@ where
399402
// be _at least_ 1 height behind.
400403
let (execution_fr, execution_to) = (prev_height + 1, finality.height);
401404

402-
// error happens if we cannot get the validator set from ipc agent after retries
403-
let validator_changes = legacy
404-
.validator_changes_from(execution_fr, execution_to)
405-
.await
406-
.context("failed to fetch validator changes")?;
407-
408-
tracing::debug!(
409-
from = execution_fr,
410-
to = execution_to,
411-
msgs = validator_changes.len(),
412-
"chain interpreter received total validator changes"
413-
);
405+
let mut total_validator_changes = 0usize;
406+
let mut total_topdown_msgs = 0usize;
407+
let mut ret: Option<AppliedMessage> = None;
408+
let mut chunk_start = execution_fr;
409+
while chunk_start <= execution_to {
410+
let chunk_end = chunk_start
411+
.saturating_add(LEGACY_TOPDOWN_CHUNK_SIZE.saturating_sub(1))
412+
.min(execution_to);
413+
414+
// An error is propagated if the validator changes cannot be fetched from the IPC agent after retries.
415+
let validator_changes = legacy
416+
.validator_changes_from(chunk_start, chunk_end)
417+
.await
418+
.context("failed to fetch validator changes")?;
419+
total_validator_changes += validator_changes.len();
420+
421+
tracing::debug!(
422+
chunk_start,
423+
chunk_end,
424+
changes = validator_changes.len(),
425+
"chain interpreter received validator changes chunk"
426+
);
414427

415-
self.inner
416-
.gateway_caller
417-
.store_validator_changes(state, validator_changes)
418-
.context("failed to store validator changes")?;
428+
self.inner
429+
.gateway_caller
430+
.store_validator_changes(state, validator_changes)
431+
.context("failed to store validator changes")?;
432+
433+
// An error is propagated if the cross messages cannot be fetched from the IPC agent after retries.
434+
let msgs = legacy
435+
.top_down_msgs_from(chunk_start, chunk_end)
436+
.await
437+
.context("failed to fetch top down messages")?;
438+
total_topdown_msgs += msgs.len();
439+
440+
tracing::debug!(
441+
chunk_start,
442+
chunk_end,
443+
number_of_messages = msgs.len(),
444+
"chain interpreter received topdown messages chunk",
445+
);
446+
447+
if !msgs.is_empty() {
448+
ret = Some(
449+
self.execute_topdown_msgs(state, msgs)
450+
.await
451+
.context("failed to execute top down messages")?,
452+
);
453+
}
419454

420-
// error happens if we cannot get the cross messages from ipc agent after retries
421-
let msgs = legacy
422-
.top_down_msgs_from(execution_fr, execution_to)
423-
.await
424-
.context("failed to fetch top down messages")?;
455+
chunk_start = chunk_end.saturating_add(1);
456+
}
425457

426458
tracing::debug!(
427-
number_of_messages = msgs.len(),
428459
start = execution_fr,
429460
end = execution_to,
430-
"chain interpreter received topdown msgs",
461+
total_validator_changes,
462+
total_topdown_msgs,
463+
"chain interpreter processed topdown effects in chunks",
431464
);
432465

433-
let ret = self
434-
.execute_topdown_msgs(state, msgs)
435-
.await
436-
.context("failed to execute top down messages")?;
466+
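// Preserve previous behavior: return an AppliedMessage even when there are no topdown msgs. Note: `ret` is overwritten by every chunk that carried messages, so only the last such chunk's AppliedMessage is returned.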
467+
let ret = if let Some(ret) = ret {
468+
ret
469+
} else {
470+
self.execute_topdown_msgs(state, Vec::<IpcEnvelope>::new())
471+
.await
472+
.context("failed to execute empty top down messages batch")?
473+
};
437474

438475
tracing::debug!("chain interpreter applied topdown msgs");
439476

0 commit comments

Comments
 (0)