Skip to content

Commit 832d97a

Browse files
apollo_integration_tests: await abort handles when shutting down nodes in integration test (#14078)
Signed-off-by: Dori Medini <dori@starkware.co>
1 parent 432ba54 commit 832d97a

6 files changed

Lines changed: 41 additions & 16 deletions

File tree

crates/apollo_integration_tests/src/bin/sequencer_node_end_to_end_integration_tests/integration_test_positive_flow.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ async fn main() {
5151
.await;
5252

5353
info!("Shutting down nodes.");
54-
integration_test_manager.shutdown_nodes(node_indices);
54+
integration_test_manager.shutdown_nodes(node_indices).await;
5555

5656
info!("Positive flow integration test completed successfully!");
5757
}

crates/apollo_integration_tests/src/bin/sequencer_node_end_to_end_integration_tests/integration_test_restart_flow.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@ async fn main() {
5757
info!("Awaiting transactions while all nodes are up");
5858
integration_test_manager.poll_all_running_nodes_received_more_txs(TIMEOUT).await;
5959

60-
integration_test_manager.shutdown_nodes([RESTART_NODE].into());
60+
integration_test_manager.shutdown_nodes([RESTART_NODE].into()).await;
6161
info!("Awaiting transactions while node {RESTART_NODE} is down");
6262
integration_test_manager.poll_all_running_nodes_received_more_txs(TIMEOUT).await;
6363

@@ -77,7 +77,7 @@ async fn main() {
7777

7878
// Shutdown a second node to test that the restarted node has joined consensus (the network
7979
// can't reach consensus without the restarted node if the second node is down).
80-
integration_test_manager.shutdown_nodes([SHUTDOWN_NODE].into());
80+
integration_test_manager.shutdown_nodes([SHUTDOWN_NODE].into()).await;
8181
// Shutting down a node that's already down results in an error so we remove it from the set
8282
// here.
8383
node_indices.remove(&SHUTDOWN_NODE);
@@ -98,6 +98,6 @@ async fn main() {
9898

9999
integration_test_manager.verify_block_hash_across_all_running_nodes(None).await;
100100

101-
integration_test_manager.shutdown_nodes(node_indices);
101+
integration_test_manager.shutdown_nodes(node_indices).await;
102102
info!("Restart flow integration test completed successfully!");
103103
}

crates/apollo_integration_tests/src/bin/sequencer_node_end_to_end_integration_tests/integration_test_restart_service_multiple_nodes_flow.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,7 @@ async fn main() {
8080
.await;
8181

8282
info!("Shutting down nodes.");
83-
integration_test_manager.shutdown_nodes(node_indices);
83+
integration_test_manager.shutdown_nodes(node_indices).await;
8484

8585
info!("Restart service multiple nodes flow integration test completed successfully!");
8686
}

crates/apollo_integration_tests/src/bin/sequencer_node_end_to_end_integration_tests/integration_test_restart_service_single_node_flow.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -89,6 +89,6 @@ async fn main() {
8989

9090
integration_test_manager.verify_block_hash_across_all_running_nodes(None).await;
9191

92-
integration_test_manager.shutdown_nodes(node_indices);
92+
integration_test_manager.shutdown_nodes(node_indices).await;
9393
info!("Restart service single node flow integration test completed successfully!");
9494
}

crates/apollo_integration_tests/src/bin/sequencer_node_end_to_end_integration_tests/integration_test_revert_flow.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@ async fn main() {
6969
.await;
7070

7171
info!("Shutting down nodes.");
72-
integration_test_manager.shutdown_nodes(node_indices.clone());
72+
integration_test_manager.shutdown_nodes(node_indices.clone()).await;
7373

7474
let expected_block_number_after_revert = REVERT_UP_TO_AND_INCLUDING.prev().unwrap_or_default();
7575
info!(
@@ -98,7 +98,7 @@ async fn main() {
9898
.await;
9999

100100
info!("All nodes reverted to block {expected_block_number_after_revert}. Shutting down nodes.");
101-
integration_test_manager.shutdown_nodes(node_indices.clone());
101+
integration_test_manager.shutdown_nodes(node_indices.clone()).await;
102102

103103
// Restore the tx generator state.
104104
*integration_test_manager.tx_generator_mut() = tx_generator_snapshot;
@@ -129,7 +129,7 @@ async fn main() {
129129
))
130130
.await;
131131

132-
integration_test_manager.shutdown_nodes(node_indices);
132+
integration_test_manager.shutdown_nodes(node_indices).await;
133133

134134
info!("Revert flow integration test completed successfully!");
135135
}

crates/apollo_integration_tests/src/integration_test_manager.rs

Lines changed: 32 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -660,23 +660,48 @@ impl IntegrationTestManager {
660660
});
661661
}
662662

663-
pub fn shutdown_nodes(&mut self, nodes_to_shutdown: HashSet<usize>) {
664-
nodes_to_shutdown.into_iter().for_each(|index| {
663+
pub async fn shutdown_nodes(&mut self, nodes_to_shutdown: HashSet<usize>) {
664+
let mut all_handles: Vec<AbortOnDropHandle<()>> = Vec::new();
665+
666+
for index in nodes_to_shutdown {
665667
let running_node = self
666668
.running_nodes
667669
.remove(&index)
668670
.unwrap_or_else(|| panic!("Node {index} is not in the running map."));
669671
// TODO(Tsabary): should this function call `shutdown_node_services` per node as well?
670-
running_node.executable_handles.values().for_each(|handle| {
672+
let RunningNode { node_setup, executable_handles } = running_node;
673+
let handles: Vec<AbortOnDropHandle<()>> = executable_handles.into_values().collect();
674+
for handle in &handles {
671675
assert!(!handle.is_finished(), "Node {index} should still be running.");
672676
handle.abort();
673-
});
677+
}
674678
assert!(
675-
self.idle_nodes.insert(index, running_node.node_setup).is_none(),
679+
self.idle_nodes.insert(index, node_setup).is_none(),
676680
"Node {index} is already in the idle map."
677681
);
678-
info!("Node {} has been shut down.", index);
679-
});
682+
info!("Node {index} has been aborted and should shut down soon.");
683+
all_handles.extend(handles);
684+
}
685+
686+
// Wait for all tasks to be cancelled. When each task is cancelled its Child process handle
687+
// is dropped, which triggers kill_on_drop and sends SIGKILL to the child process.
688+
let results = tokio::time::timeout(Duration::from_secs(10), join_all(all_handles))
689+
.await
690+
.expect("Node tasks did not cancel within timeout.");
691+
692+
// Assert that all tasks were cancelled. Any shutdown for any other reason should be
693+
// considered a bug.
694+
for result in results {
695+
match result {
696+
Err(e) if e.is_cancelled() => {}
697+
Ok(()) => panic!("A node task exited before shutdown — node may have crashed."),
698+
Err(e) => panic!("A node task failed during shutdown: {e:?}"),
699+
}
700+
}
701+
info!("All nodes have been shut down.");
702+
703+
// Brief pause to let the OS release ports after SIGKILL before new processes bind them.
704+
sleep(Duration::from_millis(100)).await;
680705
}
681706

682707
pub async fn send_deploy_and_invoke_txs_and_verify(&mut self) {

0 commit comments

Comments
 (0)