Skip to content

Commit a49204f

Browse files
committed
fix: release rgb_send_lock on all channel open failure paths
- FundingGenerationReady: release lock unconditionally after funding_transaction_generated (not just on error) - ChannelPending: release lock before spawn_blocking, handle JoinError - ChannelReady: handle JoinError instead of unwrap - ChannelClosed: release lock when channel_funding_txo is None - Add 30s timeout on the lock as safety net for cases where no LDK event fires (e.g. peer disconnects before accept_channel exchange)
1 parent c2d0849 commit a49204f

3 files changed

Lines changed: 100 additions & 29 deletions

File tree

src/ldk.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -690,8 +690,8 @@ async fn handle_ldk_events(
690690
{
691691
tracing::error!(
692692
"ERROR: Channel went away before we could fund it. The peer disconnected or refused the channel.");
693-
*unlocked_state.rgb_send_lock.lock().unwrap() = false;
694693
}
694+
*unlocked_state.rgb_send_lock.lock().unwrap() = None;
695695
}
696696
Event::FundingTxBroadcastSafe { .. } => {
697697
// We don't use the manual broadcasting feature, so this event should never be seen.
@@ -1055,21 +1055,28 @@ async fn handle_ldk_events(
10551055
is_channel_rgb(&channel_id, &PathBuf::from(&static_state.ldk_data_dir));
10561056
tracing::info!("Initiator of the channel (colored: {})", is_chan_colored);
10571057

1058-
let _txid = tokio::task::spawn_blocking(move || {
1058+
*unlocked_state.rgb_send_lock.lock().unwrap() = None;
1059+
1060+
let txid_result = tokio::task::spawn_blocking(move || {
10591061
if is_chan_colored {
10601062
state_copy.rgb_send_end(psbt_str_copy).map(|r| r.txid)
10611063
} else {
10621064
state_copy.rgb_send_btc_end(psbt_str_copy)
10631065
}
10641066
})
1065-
.await
1066-
.unwrap()
1067-
.map_err(|e| {
1068-
tracing::error!("Error completing channel opening: {e:?}");
1069-
ReplayEvent()
1070-
})?;
1067+
.await;
10711068

1072-
*unlocked_state.rgb_send_lock.lock().unwrap() = false;
1069+
match txid_result {
1070+
Ok(Ok(_txid)) => {}
1071+
Ok(Err(e)) => {
1072+
tracing::error!("Error completing channel opening: {e:?}");
1073+
return Err(ReplayEvent());
1074+
}
1075+
Err(e) => {
1076+
tracing::error!("Channel opening task panicked: {e:?}");
1077+
return Err(ReplayEvent());
1078+
}
1079+
}
10731080
} else {
10741081
// acceptor
10751082
let consignment_path = static_state
@@ -1102,20 +1109,25 @@ async fn handle_ldk_events(
11021109
hex_str(&counterparty_node_id.serialize()),
11031110
);
11041111

1105-
tokio::task::spawn_blocking(move || {
1112+
match tokio::task::spawn_blocking(move || {
11061113
unlocked_state.rgb_refresh(false).unwrap();
11071114
unlocked_state.rgb_refresh(true).unwrap()
11081115
})
11091116
.await
1110-
.unwrap();
1117+
{
1118+
Ok(_) => {}
1119+
Err(e) => {
1120+
tracing::error!("RGB refresh task panicked on channel ready: {e:?}");
1121+
}
1122+
}
11111123
}
11121124
Event::ChannelClosed {
11131125
channel_id,
11141126
reason,
11151127
user_channel_id: _,
11161128
counterparty_node_id,
11171129
channel_capacity_sats: _,
1118-
channel_funding_txo: _,
1130+
channel_funding_txo,
11191131
last_local_balance_msat: _,
11201132
} => {
11211133
tracing::info!(
@@ -1127,6 +1139,10 @@ async fn handle_ldk_events(
11271139
reason
11281140
);
11291141

1142+
if channel_funding_txo.is_none() {
1143+
*unlocked_state.rgb_send_lock.lock().unwrap() = None;
1144+
}
1145+
11301146
unlocked_state.delete_channel_id(channel_id);
11311147
}
11321148
Event::DiscardFunding { channel_id, .. } => {
@@ -1137,7 +1153,7 @@ async fn handle_ldk_events(
11371153
channel_id
11381154
);
11391155

1140-
*unlocked_state.rgb_send_lock.lock().unwrap() = false;
1156+
*unlocked_state.rgb_send_lock.lock().unwrap() = None;
11411157

11421158
unlocked_state.delete_channel_id(channel_id);
11431159
}
@@ -2105,7 +2121,7 @@ pub(crate) async fn start_ldk(
21052121
taker_swaps,
21062122
router: Arc::clone(&router),
21072123
output_sweeper: Arc::clone(&output_sweeper),
2108-
rgb_send_lock: Arc::new(Mutex::new(false)),
2124+
rgb_send_lock: Arc::new(Mutex::new(None)),
21092125
channel_ids_map,
21102126
proxy_endpoint: proxy_endpoint.to_string(),
21112127
});

src/routes.rs

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,25 @@ const OPENCHANNEL_MIN_RGB_AMT: u64 = 1;
107107

108108
pub const DUST_LIMIT_MSAT: u64 = 546000;
109109

110+
const RGB_SEND_LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
111+
112+
fn is_rgb_send_locked(lock: &Option<std::time::Instant>) -> bool {
113+
match lock {
114+
None => false,
115+
Some(t) => {
116+
if t.elapsed() > RGB_SEND_LOCK_TIMEOUT {
117+
tracing::warn!(
118+
"rgb_send_lock timed out after {}s, releasing",
119+
t.elapsed().as_secs()
120+
);
121+
false
122+
} else {
123+
true
124+
}
125+
}
126+
}
127+
}
128+
110129
const INVOICE_MIN_MSAT: u64 = HTLC_MIN_MSAT;
111130

112131
pub(crate) const DEFAULT_FINAL_CLTV_EXPIRY_DELTA: u32 = 14;
@@ -1936,8 +1955,14 @@ pub(crate) async fn issue_asset_cfa(
19361955
let guard = state.check_unlocked().await?;
19371956
let unlocked_state = guard.as_ref().unwrap();
19381957

1939-
if *unlocked_state.rgb_send_lock.lock().unwrap() {
1940-
return Err(APIError::OpenChannelInProgress);
1958+
{
1959+
let mut lock = unlocked_state.rgb_send_lock.lock().unwrap();
1960+
if is_rgb_send_locked(&lock) {
1961+
return Err(APIError::OpenChannelInProgress);
1962+
}
1963+
if lock.is_some() {
1964+
*lock = None;
1965+
}
19411966
}
19421967

19431968
let file_path = payload.file_digest.map(|d: String| {
@@ -1971,8 +1996,14 @@ pub(crate) async fn issue_asset_nia(
19711996
let guard = state.check_unlocked().await?;
19721997
let unlocked_state = guard.as_ref().unwrap();
19731998

1974-
if *unlocked_state.rgb_send_lock.lock().unwrap() {
1975-
return Err(APIError::OpenChannelInProgress);
1999+
{
2000+
let mut lock = unlocked_state.rgb_send_lock.lock().unwrap();
2001+
if is_rgb_send_locked(&lock) {
2002+
return Err(APIError::OpenChannelInProgress);
2003+
}
2004+
if lock.is_some() {
2005+
*lock = None;
2006+
}
19762007
}
19772008

19782009
let asset = unlocked_state.rgb_issue_asset_nia(
@@ -1997,8 +2028,14 @@ pub(crate) async fn issue_asset_uda(
19972028
let guard = state.check_unlocked().await?;
19982029
let unlocked_state = guard.as_ref().unwrap();
19992030

2000-
if *unlocked_state.rgb_send_lock.lock().unwrap() {
2001-
return Err(APIError::OpenChannelInProgress);
2031+
{
2032+
let mut lock = unlocked_state.rgb_send_lock.lock().unwrap();
2033+
if is_rgb_send_locked(&lock) {
2034+
return Err(APIError::OpenChannelInProgress);
2035+
}
2036+
if lock.is_some() {
2037+
*lock = None;
2038+
}
20022039
}
20032040

20042041
let rgb_media_dir = unlocked_state.rgb_get_media_dir();
@@ -2998,8 +3035,14 @@ pub(crate) async fn open_channel(
29983035
let guard = state.check_unlocked().await?;
29993036
let unlocked_state = guard.as_ref().unwrap();
30003037

3001-
if *unlocked_state.rgb_send_lock.lock().unwrap() {
3002-
return Err(APIError::OpenChannelInProgress);
3038+
{
3039+
let mut lock = unlocked_state.rgb_send_lock.lock().unwrap();
3040+
if is_rgb_send_locked(&lock) {
3041+
return Err(APIError::OpenChannelInProgress);
3042+
}
3043+
if lock.is_some() {
3044+
*lock = None;
3045+
}
30033046
}
30043047

30053048
let temporary_channel_id = if let Some(tmp_chan_id_str) = payload.temporary_channel_id {
@@ -3183,7 +3226,7 @@ pub(crate) async fn open_channel(
31833226
None
31843227
};
31853228

3186-
*unlocked_state.rgb_send_lock.lock().unwrap() = true;
3229+
*unlocked_state.rgb_send_lock.lock().unwrap() = Some(std::time::Instant::now());
31873230
tracing::debug!("RGB send lock set to true");
31883231

31893232
let temporary_channel_id = unlocked_state
@@ -3199,7 +3242,7 @@ pub(crate) async fn open_channel(
31993242
payload.push_asset_amount,
32003243
)
32013244
.map_err(|e| {
3202-
*unlocked_state.rgb_send_lock.lock().unwrap() = false;
3245+
*unlocked_state.rgb_send_lock.lock().unwrap() = None;
32033246
tracing::debug!("RGB send lock set to false (open channel failure: {e:?})");
32043247
match e {
32053248
LDKAPIError::APIMisuseError { err }
@@ -3370,8 +3413,14 @@ pub(crate) async fn rgb_invoice(
33703413
let guard = state.check_unlocked().await?;
33713414
let unlocked_state = guard.as_ref().unwrap();
33723415

3373-
if *unlocked_state.rgb_send_lock.lock().unwrap() {
3374-
return Err(APIError::OpenChannelInProgress);
3416+
{
3417+
let mut lock = unlocked_state.rgb_send_lock.lock().unwrap();
3418+
if is_rgb_send_locked(&lock) {
3419+
return Err(APIError::OpenChannelInProgress);
3420+
}
3421+
if lock.is_some() {
3422+
*lock = None;
3423+
}
33753424
}
33763425

33773426
let assignment = payload.assignment.unwrap_or(Assignment::Any).into();
@@ -3683,8 +3732,14 @@ pub(crate) async fn send_rgb(
36833732
let guard = state.check_unlocked().await?;
36843733
let unlocked_state = guard.as_ref().unwrap();
36853734

3686-
if *unlocked_state.rgb_send_lock.lock().unwrap() {
3687-
return Err(APIError::OpenChannelInProgress);
3735+
{
3736+
let mut lock = unlocked_state.rgb_send_lock.lock().unwrap();
3737+
if is_rgb_send_locked(&lock) {
3738+
return Err(APIError::OpenChannelInProgress);
3739+
}
3740+
if lock.is_some() {
3741+
*lock = None;
3742+
}
36883743
}
36893744

36903745
let recipient_map: HashMap<String, Vec<RgbLibRecipient>> = payload

src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub(crate) struct UnlockedAppState {
108108
pub(crate) rgb_wallet_wrapper: Arc<RgbLibWalletWrapper>,
109109
pub(crate) router: Arc<Router>,
110110
pub(crate) output_sweeper: Arc<OutputSweeper>,
111-
pub(crate) rgb_send_lock: Arc<Mutex<bool>>,
111+
pub(crate) rgb_send_lock: Arc<Mutex<Option<std::time::Instant>>>,
112112
pub(crate) channel_ids_map: Arc<Mutex<ChannelIdsMap>>,
113113
pub(crate) proxy_endpoint: String,
114114
}

0 commit comments

Comments
 (0)