|
6 | 6 | mod sequencer_consensus_context_test; |
7 | 7 |
|
8 | 8 | use std::cmp::max; |
9 | | -use std::collections::{BTreeMap, HashMap}; |
| 9 | +use std::collections::{BTreeMap, HashMap, VecDeque}; |
10 | 10 | use std::sync::{Arc, Mutex}; |
11 | 11 | use std::time::Duration; |
12 | 12 |
|
@@ -37,7 +37,11 @@ use apollo_protobuf::consensus::{ |
37 | 37 | TransactionBatch, |
38 | 38 | Vote, |
39 | 39 | }; |
40 | | -use apollo_state_sync_types::communication::{SharedStateSyncClient, StateSyncClientError}; |
| 40 | +use apollo_state_sync_types::communication::{ |
| 41 | + SharedStateSyncClient, |
| 42 | + StateSyncClientError, |
| 43 | + StateSyncClientResult, |
| 44 | +}; |
41 | 45 | use apollo_state_sync_types::errors::StateSyncError; |
42 | 46 | use apollo_state_sync_types::state_sync_types::SyncBlock; |
43 | 47 | use apollo_time::time::Clock; |
@@ -305,39 +309,40 @@ impl SequencerConsensusContext { |
305 | 309 | self.fee_proposals_window = self.fee_proposals_window.split_off(&cutoff); |
306 | 310 | } |
307 | 311 |
|
308 | | - /// SNIP-35: bootstrap the fee_proposals window from state_sync on startup so `fee_actual` is |
309 | | - /// available immediately. Pre-V0_14_3 blocks return `Ok` with `fee_proposal_fri == None` and |
310 | | - /// contribute nothing; an `Err` from state_sync means the block could not be retrieved, so we |
311 | | - /// log and stop bootstrapping. |
312 | | - // TODO(AndrewL): On `Err`, state_sync may simply not have downloaded the block yet rather than |
313 | | - // the block being absent. Returning a partial window here is non-deterministic across nodes |
314 | | - // (different local sync progress → different windows → different `fee_actual`). Resolving this |
315 | | - // requires a startup gate that guarantees state_sync is caught up to `height - 1` before |
316 | | - // consensus enters `set_height_and_round`, since blocking inside this call to retry is unsafe |
317 | | - // (peers may already be voting at this height). Track and fix in a follow-up PR. |
318 | | - async fn bootstrap_fee_proposals_window(&mut self, height: BlockNumber) { |
| 312 | + /// Fill `[start_height - FEE_PROPOSAL_WINDOW_SIZE, start_height)` (clamped at 0) from local |
| 313 | + /// state_sync storage. Must be called before `run_consensus` so the window is populated before |
| 314 | + /// voting begins. `BlockNotFound` heights are re-queued and retried after a sleep, with no |
| 315 | + /// retry limit — joining consensus with a partial window would make this node disagree with |
| 316 | + /// caught-up peers on `fee_actual`. Other state_sync errors propagate. |
| 317 | + pub async fn initialize_fee_proposals_window( |
| 318 | + &mut self, |
| 319 | + start_height: BlockNumber, |
| 320 | + ) -> StateSyncClientResult<()> { |
| 321 | + const STATE_SYNC_RETRY_INTERVAL: Duration = Duration::from_millis(500); |
319 | 322 | let window_size = |
320 | 323 | u64::try_from(FEE_PROPOSAL_WINDOW_SIZE).expect("FEE_PROPOSAL_WINDOW_SIZE fits in u64"); |
321 | | - let start = height.0.saturating_sub(window_size); |
322 | | - for h in start..height.0 { |
323 | | - let block_number = BlockNumber(h); |
| 324 | + let window_end_height = start_height.0; |
| 325 | + let window_start_height = window_end_height.saturating_sub(window_size); |
| 326 | + let mut pending_heights: VecDeque<BlockNumber> = |
| 327 | + (window_start_height..window_end_height).map(BlockNumber).collect(); |
| 328 | + while let Some(block_number) = pending_heights.pop_front() { |
324 | 329 | match self.deps.state_sync_client.get_block(block_number).await { |
325 | | - Ok(block) => { |
326 | | - self.record_fee_proposal( |
327 | | - block_number, |
328 | | - block.block_header_without_hash.fee_proposal_fri, |
329 | | - ); |
330 | | - } |
331 | | - Err(e) => { |
| 330 | + Ok(block) => self.record_fee_proposal( |
| 331 | + block_number, |
| 332 | + block.block_header_without_hash.fee_proposal_fri, |
| 333 | + ), |
| 334 | + Err(StateSyncClientError::StateSyncError(StateSyncError::BlockNotFound(_))) => { |
332 | 335 | warn!( |
333 | | - "SNIP-35 bootstrap failed at block {h}: {e:?}. Window has {} / \ |
334 | | - {FEE_PROPOSAL_WINDOW_SIZE} entries.", |
335 | | - self.fee_proposals_window.len() |
| 336 | + "State sync not ready for height {block_number}; re-queueing after \ |
| 337 | + {STATE_SYNC_RETRY_INTERVAL:?}" |
336 | 338 | ); |
337 | | - break; |
| 339 | + pending_heights.push_back(block_number); |
| 340 | + tokio::time::sleep(STATE_SYNC_RETRY_INTERVAL).await; |
338 | 341 | } |
| 342 | + Err(e) => return Err(e), |
339 | 343 | } |
340 | 344 | } |
| 345 | + Ok(()) |
341 | 346 | } |
342 | 347 |
|
343 | 348 | async fn start_stream(&mut self, stream_id: HeightAndRound) -> StreamSender { |
@@ -954,11 +959,6 @@ impl ConsensusContext for SequencerConsensusContext { |
954 | 959 | let gas_price_u64 = u64::try_from(self.l2_gas_price.0).unwrap_or(u64::MAX); |
955 | 960 | CONSENSUS_L2_GAS_PRICE.set_lossy(gas_price_u64); |
956 | 961 | } |
957 | | - // SNIP-35: on first height, bootstrap the window so fee_actual is available |
958 | | - // immediately. |
959 | | - if self.current_height.is_none() && self.fee_proposals_window.is_empty() { |
960 | | - self.bootstrap_fee_proposals_window(height).await; |
961 | | - } |
962 | 962 | self.current_height = Some(height); |
963 | 963 | self.current_round = round; |
964 | 964 | self.prune_fee_proposals_window(height); |
|
0 commit comments