Skip to content

Commit 20c513c

Browse files
authored
feat: ordererd submission (#1345)
1 parent 3a48d4c commit 20c513c

1 file changed

Lines changed: 15 additions & 13 deletions

File tree

ipc/provider/src/checkpoint.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use crate::config::Subnet;
66
use crate::manager::{BottomUpCheckpointRelayer, EthSubnetManager};
77
use crate::observe::CheckpointSubmitted;
88
use anyhow::{anyhow, Result};
9-
use futures_util::future::try_join_all;
109
use fvm_shared::address::Address;
1110
use fvm_shared::clock::ChainEpoch;
1211
use ipc_api::checkpoint::{BottomUpCheckpointBundle, QuorumReachedEvent};
@@ -17,6 +16,7 @@ use std::fmt::{Display, Formatter};
1716
use std::sync::{Arc, RwLock};
1817
use std::time::Duration;
1918
use tokio::sync::Semaphore;
19+
use tokio::time::timeout;
2020

2121
/// Tracks the config required for bottom up checkpoint submissions
2222
/// parent/child subnet and checkpoint period.
@@ -129,7 +129,7 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan
129129
}
130130
}
131131

132-
/// Checks if the relayer has already submitted at the next submission epoch, if not it submits it.
132+
/// Checks if the relayer has already submitted at the next submission epoch, if not it submitts the bottom up checkpoint.
133133
async fn submit_next_epoch(&self, submitter: Address) -> Result<()> {
134134
let last_checkpoint_epoch = self
135135
.parent_handler
@@ -155,7 +155,6 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan
155155
);
156156

157157
let mut count = 0;
158-
let mut all_submit_tasks = vec![];
159158

160159
for h in start..=finalized_height {
161160
let events = self.child_handler.quorum_reached_events(h).await?;
@@ -199,12 +198,12 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan
199198
.clone()
200199
.acquire_owned()
201200
.await
202-
.unwrap();
203-
all_submit_tasks.push(tokio::task::spawn(async move {
201+
.expect("Semaphore is not poisoned");
202+
203+
let fut = async move {
204204
let height = event.height;
205205
let hash = bundle.checkpoint.block_hash.clone();
206-
207-
let result =
206+
let result: std::result::Result<(), anyhow::Error> =
208207
Self::submit_checkpoint(parent_handler_clone, submitter, bundle, event)
209208
.await
210209
.inspect(|_| {
@@ -221,16 +220,19 @@ impl<T: BottomUpCheckpointRelayer + Send + Sync + 'static> BottomUpCheckpointMan
221220

222221
drop(submission_permit);
223222
result
224-
}));
223+
};
224+
// TODO reevaluate the 30 seconds in practice, tentatively significantly to generous
225+
timeout(Duration::from_secs(30), fut)
226+
.await
227+
.map_err(|_elapsed| {
228+
anyhow!("Timeout was reached at checkpoint with index {count}")
229+
})??;
225230

226231
count += 1;
227-
tracing::debug!("This round has asynchronously submitted {count} checkpoints",);
232+
tracing::debug!("This round has submitted {count} checkpoints",);
228233
}
229234
}
230-
231-
tracing::debug!("Waiting for all submissions to finish");
232-
// Return error if any of the submit task failed.
233-
try_join_all(all_submit_tasks).await?;
235+
tracing::debug!("Submissions complete");
234236

235237
Ok(())
236238
}

0 commit comments

Comments
 (0)