Skip to content

Commit 765a60b

Browse files
authored
Merge branch 'main' into ma/binary-upgrade-test-scenarios
2 parents 8dec211 + 5996442 commit 765a60b

40 files changed

Lines changed: 2183 additions & 236 deletions

File tree

.config/nextest.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ report-name = "nextest"
6868
filter = """
6969
test(test_success_with_async_delay_with_epochs) |
7070
test(test_success_with_async_delay_2_with_epochs) |
71-
test(test_vid2_success) |
7271
test(test_combined_network_half_dc) |
7372
test(test_staggered_restart_first_block) |
7473
test(/test_staggered_restart$/) |

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/espresso/node/src/consensus_handle.rs

Lines changed: 57 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,4 @@
1-
use std::{
2-
collections::HashMap,
3-
sync::{
4-
Arc,
5-
atomic::{AtomicBool, Ordering},
6-
},
7-
};
1+
use std::{collections::HashMap, sync::Arc};
82

93
use async_broadcast::{InactiveReceiver, Sender};
104
use async_lock::RwLock;
@@ -13,8 +7,12 @@ use futures::{FutureExt, StreamExt, future::BoxFuture, stream::BoxStream};
137
use hotshot::{traits::NodeImplementation, types::SystemContextHandle};
148
use hotshot_new_protocol::{
159
client::ClientApi,
16-
consensus::ConsensusOutput,
10+
consensus::{ConsensusOutput, PreCutoverSeed},
1711
coordinator::{Coordinator, CoordinatorOutput, error::Severity},
12+
cutover::{
13+
CutoverGate, extract_pre_cutover_seed, forward_legacy_epoch_changes,
14+
forward_legacy_timeout_votes,
15+
},
1816
network::Network,
1917
state::UpdateLeaf,
2018
storage::NewProtocolStorage,
@@ -78,15 +76,21 @@ where
7876
cert2: cert2.clone(),
7977
})
8078
},
81-
ConsensusOutput::ViewChanged(view, _epoch) => {
82-
Some(CoordinatorEvent::ViewChanged { view_number: *view })
83-
},
8479
ConsensusOutput::ProposalValidated { proposal, sender } => {
8580
Some(CoordinatorEvent::QuorumProposal {
8681
proposal: proposal.clone(),
8782
sender: sender.clone(),
8883
})
8984
},
85+
ConsensusOutput::BlockPayloadReconstructed {
86+
view,
87+
header,
88+
payload,
89+
} => Some(CoordinatorEvent::BlockPayloadReconstructed {
90+
view: *view,
91+
header: header.clone(),
92+
payload: payload.clone(),
93+
}),
9094
_ => None,
9195
}
9296
}
@@ -116,7 +120,7 @@ pub struct ConsensusHandle<T: NodeType, I: NodeImplementation<T>> {
116120
client_api: ClientApi<T>,
117121
coordinator_task: AbortOnDropHandle<()>,
118122
epoch_height: u64,
119-
new_protocol_active: AtomicBool,
123+
cutover_gate: CutoverGate,
120124
legacy_event_rx: InactiveReceiver<Event<T>>,
121125
event_rx: InactiveReceiver<CoordinatorEvent<T>>,
122126
}
@@ -147,17 +151,32 @@ where
147151
let coordinator_task =
148152
AbortOnDropHandle::new(spawn(run_coordinator(coordinator, event_tx)));
149153

154+
spawn(forward_legacy_timeout_votes(
155+
legacy_event_rx.clone(),
156+
client_api.clone(),
157+
));
158+
spawn(forward_legacy_epoch_changes(
159+
legacy_event_rx.clone(),
160+
client_api.clone(),
161+
epoch_height,
162+
));
163+
150164
Self {
151165
legacy_handle,
152166
client_api,
153167
coordinator_task,
154168
epoch_height,
155-
new_protocol_active: AtomicBool::new(false),
169+
cutover_gate: CutoverGate::new(),
156170
legacy_event_rx,
157171
event_rx: event_rx.deactivate(),
158172
}
159173
}
160174

175+
pub async fn extract_pre_cutover_seed(&self) -> Option<PreCutoverSeed<T>> {
176+
let legacy = self.legacy_handle.read().await;
177+
extract_pre_cutover_seed(&legacy).await
178+
}
179+
161180
pub fn legacy_consensus(&self) -> Arc<RwLock<SystemContextHandle<T, I>>> {
162181
self.legacy_handle.clone()
163182
}
@@ -166,7 +185,11 @@ where
166185
&self.client_api
167186
}
168187

169-
async fn new_protocol_at(&self, view: ViewNumber) -> bool {
188+
/// Whether `view` is at or past the new-protocol upgrade boundary,
189+
/// according to the legacy upgrade lock. This is a stateless version
190+
/// check — use it when routing per-view queries like `state(view)`.
191+
/// For "should we route to the coordinator?" use [`cutover_active`](Self::cutover_active).
192+
async fn at_or_past_cutover(&self, view: ViewNumber) -> bool {
170193
self.legacy_handle
171194
.read()
172195
.await
@@ -176,16 +199,16 @@ where
176199
>= NEW_PROTOCOL_VERSION
177200
}
178201

179-
async fn new_protocol(&self) -> bool {
180-
if self.new_protocol_active.load(Ordering::Relaxed) {
202+
/// Whether the cutover has happened — the gate has latched on this
203+
/// node. Stateful: the first call after legacy crosses the cutover
204+
/// view triggers seed extraction + dispatch. Use this for "should
205+
/// we route to the coordinator?" decisions.
206+
pub async fn cutover_active(&self) -> bool {
207+
if self.cutover_gate.is_active() {
181208
return true;
182209
}
183-
let view = self.legacy_handle.read().await.cur_view().await;
184-
let active = self.new_protocol_at(view).await;
185-
if active {
186-
self.new_protocol_active.store(true, Ordering::Relaxed);
187-
}
188-
active
210+
let legacy = self.legacy_handle.read().await;
211+
self.cutover_gate.check(&legacy, &self.client_api).await
189212
}
190213

191214
pub fn event_stream(&self) -> BoxStream<'static, CoordinatorEvent<T>> {
@@ -200,7 +223,7 @@ where
200223
}
201224

202225
pub async fn current_view(&self) -> ViewNumber {
203-
if self.new_protocol().await {
226+
if self.cutover_active().await {
204227
return self
205228
.client_api
206229
.current_view()
@@ -211,7 +234,7 @@ where
211234
}
212235

213236
pub async fn decided_leaf(&self) -> Leaf2<T> {
214-
if self.new_protocol().await {
237+
if self.cutover_active().await {
215238
return self
216239
.client_api
217240
.decided_leaf()
@@ -222,7 +245,7 @@ where
222245
}
223246

224247
pub async fn decided_state(&self) -> Option<Arc<T::ValidatedState>> {
225-
if self.new_protocol().await {
248+
if self.cutover_active().await {
226249
return self
227250
.client_api
228251
.decided_state()
@@ -233,7 +256,7 @@ where
233256
}
234257

235258
pub async fn state(&self, view: ViewNumber) -> Option<Arc<T::ValidatedState>> {
236-
if self.new_protocol_at(view).await {
259+
if self.at_or_past_cutover(view).await {
237260
return self
238261
.client_api
239262
.state(view)
@@ -244,7 +267,7 @@ where
244267
}
245268

246269
pub async fn state_and_delta(&self, view: ViewNumber) -> StateAndDelta<T> {
247-
if self.new_protocol_at(view).await {
270+
if self.at_or_past_cutover(view).await {
248271
return self
249272
.client_api
250273
.state_and_delta(view)
@@ -262,7 +285,7 @@ where
262285
}
263286

264287
pub async fn undecided_leaves(&self) -> Vec<Leaf2<T>> {
265-
if self.new_protocol().await {
288+
if self.cutover_active().await {
266289
return self
267290
.client_api
268291
.undecided_leaves()
@@ -280,7 +303,7 @@ where
280303
}
281304

282305
pub async fn current_epoch(&self) -> Option<EpochNumber> {
283-
if self.new_protocol().await {
306+
if self.cutover_active().await {
284307
return self
285308
.client_api
286309
.current_epoch()
@@ -291,7 +314,7 @@ where
291314
}
292315

293316
pub async fn epoch_height(&self) -> u64 {
294-
if self.new_protocol().await {
317+
if self.cutover_active().await {
295318
return self.epoch_height;
296319
}
297320
self.legacy_handle.read().await.epoch_height
@@ -372,7 +395,7 @@ where
372395
) -> anyhow::Result<
373396
BoxFuture<'static, anyhow::Result<SignedProposal<T, QuorumProposalWrapper<T>>>>,
374397
> {
375-
if self.new_protocol_at(view).await {
398+
if self.at_or_past_cutover(view).await {
376399
let client_api = self.client_api.clone();
377400
return Ok(async move {
378401
client_api
@@ -395,7 +418,7 @@ where
395418

396419
pub async fn submit_transaction(&self, tx: T::Transaction) -> anyhow::Result<()> {
397420
let view = self.current_view().await;
398-
if self.new_protocol_at(view).await {
421+
if self.at_or_past_cutover(view).await {
399422
return self
400423
.client_api
401424
.submit_transaction(tx)
@@ -417,7 +440,7 @@ where
417440
delta: Option<Arc<<T::ValidatedState as ValidatedState<T>>::Delta>>,
418441
) -> anyhow::Result<()> {
419442
let view = leaf.view_number();
420-
if self.new_protocol_at(view).await {
443+
if self.at_or_past_cutover(view).await {
421444
return self
422445
.client_api
423446
.update_leaf(UpdateLeaf {
@@ -441,7 +464,7 @@ where
441464
}
442465

443466
pub async fn start_consensus(&self) {
444-
if self.new_protocol().await {
467+
if self.cutover_active().await {
445468
// New protocol consensus is already running via the coordinator task.
446469
// Don't start legacy HotShot consensus tasks.
447470
return;

crates/espresso/node/src/context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,8 @@ async fn handle_events<N, P>(
565565
{
566566
tracing::warn!("Failed to handle legacy external message: {:?}", err);
567567
}
568+
// Check if we're ready to start the new protocol
569+
consensus_handle.cutover_active().await;
568570
},
569571
CoordinatorEvent::ExternalMessageReceived { data, .. } => {
570572
if let Err(err) = external_event_handler.handle_event(data).await {

crates/hotshot/new-protocol/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ time = { workspace = true }
3737
tokio = { workspace = true }
3838
tracing = { workspace = true }
3939
tracing-subscriber = { workspace = true }
40+
url = { workspace = true }
41+
vec1 = { workspace = true }
4042
versions = { workspace = true }
4143

4244
[lints]

crates/hotshot/new-protocol/src/block.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ impl<T: NodeType> BlockBuilder<T> {
152152
// sleep so the coordinator's event queue doesnot overflow
153153
// because if there are no transactions then the block production is way too fast
154154
if buffer.is_empty() {
155-
sleep(Duration::from_millis(100)).await;
155+
sleep(Duration::from_millis(500)).await;
156156
}
157157
let (hashes, txs): (Vec<_>, Vec<_>) = buffer.into_iter().unzip();
158158
let manifest = DedupManifest {

crates/hotshot/new-protocol/src/client.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@ use committable::Commitment;
55
use hotshot_types::{
66
data::{EpochNumber, Leaf2, ViewNumber},
77
message::Proposal as SignedProposal,
8+
simple_vote::TimeoutVote2,
89
traits::{leaf_fetcher_network::LeafFetcherNetwork, node_implementation::NodeType},
910
utils::StateAndDelta,
1011
};
1112
use tokio::sync::{mpsc, oneshot};
1213

13-
use crate::{coordinator::error::CoordinatorError, message::Proposal, state::UpdateLeaf};
14+
use crate::{
15+
consensus::PreCutoverSeed, coordinator::error::CoordinatorError, message::Proposal,
16+
state::UpdateLeaf,
17+
};
1418

1519
#[derive(Clone)]
1620
pub struct ClientApi<T: NodeType> {
@@ -112,6 +116,28 @@ impl<T: NodeType> ClientApi<T> {
112116
.await?
113117
}
114118

119+
/// Forward a legacy `TimeoutVote2` into the new-protocol timeout collectors.
120+
pub async fn submit_timeout_vote(&self, vote: TimeoutVote2<T>) -> Result<(), QueryError> {
121+
let (respond, rx) = oneshot::channel();
122+
self.call(ClientRequest::SubmitTimeoutVote { vote, respond }, rx)
123+
.await
124+
}
125+
126+
/// Refresh the coordinator network's peer set for `epoch`.
127+
pub async fn bump_network_epoch(&self, epoch: EpochNumber) -> Result<(), QueryError> {
128+
let (respond, rx) = oneshot::channel();
129+
self.call(ClientRequest::BumpNetworkEpoch { epoch, respond }, rx)
130+
.await
131+
}
132+
133+
/// Bridge legacy state into the coordinator at the cutover.
134+
/// Idempotent at the consensus layer.
135+
pub async fn seed_pre_cutover(&self, seed: PreCutoverSeed<T>) -> Result<(), QueryError> {
136+
let (respond, rx) = oneshot::channel();
137+
self.call(ClientRequest::SeedPreCutover { seed, respond }, rx)
138+
.await
139+
}
140+
115141
async fn call<A>(
116142
&self,
117143
request: ClientRequest<T>,
@@ -192,6 +218,18 @@ pub(crate) enum ClientRequest<T: NodeType> {
192218
recipient: T::SignatureKey,
193219
respond: oneshot::Sender<Result<(), QueryError>>,
194220
},
221+
SeedPreCutover {
222+
seed: PreCutoverSeed<T>,
223+
respond: oneshot::Sender<()>,
224+
},
225+
SubmitTimeoutVote {
226+
vote: TimeoutVote2<T>,
227+
respond: oneshot::Sender<()>,
228+
},
229+
BumpNetworkEpoch {
230+
epoch: EpochNumber,
231+
respond: oneshot::Sender<()>,
232+
},
195233
}
196234

197235
#[derive(Debug, thiserror::Error)]

0 commit comments

Comments
 (0)