Skip to content

Commit df1207b

Browse files
committed
Await on the background processing task's JoinHandle
Previously, we used to a channel to indicate that the background processor task has been stopped. Here, we rather just await the task's `JoinHandle` which is more robust in that it avoids a race condition.
1 parent c7889de commit df1207b

File tree

2 files changed

+35
-47
lines changed

2 files changed

+35
-47
lines changed

src/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,12 +1591,12 @@ fn build_with_store_internal(
15911591
};
15921592

15931593
let (stop_sender, _) = tokio::sync::watch::channel(());
1594-
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
1594+
let background_processor_task = Mutex::new(None);
15951595

15961596
Ok(Node {
15971597
runtime,
15981598
stop_sender,
1599-
event_handling_stopped_sender,
1599+
background_processor_task,
16001600
config,
16011601
wallet,
16021602
chain_source,

src/lib.rs

Lines changed: 33 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ uniffi::include_scaffolding!("ldk_node");
180180
pub struct Node {
181181
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
182182
stop_sender: tokio::sync::watch::Sender<()>,
183-
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
183+
background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
184184
config: Arc<Config>,
185185
wallet: Arc<Wallet>,
186186
chain_source: Arc<ChainSource>,
@@ -579,8 +579,7 @@ impl Node {
579579
};
580580

581581
let background_stop_logger = Arc::clone(&self.logger);
582-
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
583-
runtime.spawn(async move {
582+
let handle = runtime.spawn(async move {
584583
process_events_async(
585584
background_persister,
586585
|e| background_event_handler.handle_event(e),
@@ -601,19 +600,8 @@ impl Node {
601600
panic!("Failed to process events");
602601
});
603602
log_debug!(background_stop_logger, "Events processing stopped.",);
604-
605-
match event_handling_stopped_sender.send(()) {
606-
Ok(_) => (),
607-
Err(e) => {
608-
log_error!(
609-
background_stop_logger,
610-
"Failed to send 'events handling stopped' signal. This should never happen: {}",
611-
e
612-
);
613-
debug_assert!(false);
614-
},
615-
}
616603
});
604+
*self.background_processor_task.lock().unwrap() = Some(handle);
617605

618606
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
619607
let mut stop_liquidity_handler = self.stop_sender.subscribe();
@@ -678,39 +666,39 @@ impl Node {
678666
// Stop any runtime-dependant chain sources.
679667
self.chain_source.stop();
680668

681-
// Wait until event handling stopped, at least until a timeout is reached.
682-
let event_handling_stopped_logger = Arc::clone(&self.logger);
683-
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
684-
685-
let timeout_res = tokio::task::block_in_place(move || {
686-
runtime.block_on(async {
687-
tokio::time::timeout(
688-
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
689-
event_handling_stopped_receiver.changed(),
690-
)
691-
.await
692-
})
693-
});
669+
// Wait until background processing stopped, at least until a timeout is reached.
670+
if let Some(background_processor_task) =
671+
self.background_processor_task.lock().unwrap().take()
672+
{
673+
let abort_handle = background_processor_task.abort_handle();
674+
let timeout_res = tokio::task::block_in_place(move || {
675+
runtime.block_on(async {
676+
tokio::time::timeout(
677+
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
678+
background_processor_task,
679+
)
680+
.await
681+
})
682+
});
694683

695-
match timeout_res {
696-
Ok(stop_res) => match stop_res {
697-
Ok(()) => {},
684+
match timeout_res {
685+
Ok(stop_res) => match stop_res {
686+
Ok(()) => {},
687+
Err(e) => {
688+
abort_handle.abort();
689+
log_error!(
690+
self.logger,
691+
"Stopping event handling failed. This should never happen: {}",
692+
e
693+
);
694+
panic!("Stopping event handling failed. This should never happen.");
695+
},
696+
},
698697
Err(e) => {
699-
log_error!(
700-
event_handling_stopped_logger,
701-
"Stopping event handling failed. This should never happen: {}",
702-
e
703-
);
704-
panic!("Stopping event handling failed. This should never happen.");
698+
abort_handle.abort();
699+
log_error!(self.logger, "Stopping event handling timed out: {}", e);
705700
},
706-
},
707-
Err(e) => {
708-
log_error!(
709-
event_handling_stopped_logger,
710-
"Stopping event handling timed out: {}",
711-
e
712-
);
713-
},
701+
}
714702
}
715703

716704
#[cfg(tokio_unstable)]

0 commit comments

Comments
 (0)