Skip to content

Commit 8d5f58d

Browse files
committed
Implement CLN ExternalNode using clightningrpc
Co-authored-by: AI (Claude Code) Fix CLN test deadlock by moving synchronous RPC to blocking threads The clightningrpc crate performs synchronous I/O over a Unix socket. With tokio tests configured as worker_threads=1, calling these blocking RPC methods directly on the async runtime blocks the only worker thread, preventing LDK's background tasks (event processing, peer message handling) from making progress. This creates a deadlock: CLN waits for LDK's protocol response while LDK can't process because the thread is blocked waiting for CLN's RPC reply. Fix by introducing a `rpc()` helper that runs each synchronous CLN call inside `tokio::task::spawn_blocking`, ensuring the tokio worker thread remains free for LDK's async tasks. Generated with the assistance of AI (Claude Code).
1 parent 4cce03e commit 8d5f58d

2 files changed

Lines changed: 321 additions & 2 deletions

File tree

tests/common/cln.rs

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use std::str::FromStr;
9+
use std::time::Duration;
10+
11+
use async_trait::async_trait;
12+
use clightningrpc::lightningrpc::LightningRPC;
13+
use clightningrpc::lightningrpc::PayOptions;
14+
use ldk_node::bitcoin::secp256k1::PublicKey;
15+
use ldk_node::lightning::ln::msgs::SocketAddress;
16+
use serde_json::json;
17+
18+
use super::external_node::{ExternalChannel, ExternalNode, TestFailure};
19+
20+
pub(crate) struct TestClnNode {
21+
socket_path: String,
22+
}
23+
24+
impl TestClnNode {
25+
pub(crate) fn new(socket_path: &str) -> Self {
26+
Self { socket_path: socket_path.to_string() }
27+
}
28+
29+
pub(crate) fn from_env() -> Self {
30+
let sock =
31+
std::env::var("CLN_SOCKET_PATH").unwrap_or_else(|_| "/tmp/lightning-rpc".to_string());
32+
Self::new(&sock)
33+
}
34+
35+
/// Wait for CLN to sync to chain tip (blockheight > 0).
36+
pub(crate) async fn wait_for_sync(&self) {
37+
loop {
38+
let info = self.rpc(|c| c.getinfo()).await.unwrap();
39+
if info.blockheight > 0 {
40+
break;
41+
}
42+
tokio::time::sleep(Duration::from_millis(250)).await;
43+
}
44+
}
45+
46+
/// Run a synchronous CLN RPC call on a dedicated blocking thread.
47+
///
48+
/// The `clightningrpc` crate performs synchronous I/O over a Unix socket.
49+
/// Running these calls directly on the tokio runtime would block the worker
50+
/// thread, preventing LDK's background tasks from making progress and
51+
/// causing deadlocks (especially with `worker_threads = 1`).
52+
async fn rpc<F, T>(&self, f: F) -> T
53+
where
54+
F: FnOnce(&LightningRPC) -> T + Send + 'static,
55+
T: Send + 'static,
56+
{
57+
let path = self.socket_path.clone();
58+
tokio::task::spawn_blocking(move || {
59+
let client = LightningRPC::new(&path);
60+
f(&client)
61+
})
62+
.await
63+
.expect("CLN RPC task panicked")
64+
}
65+
66+
fn make_error(&self, detail: String) -> TestFailure {
67+
TestFailure::ExternalNodeError { node: "CLN".to_string(), detail }
68+
}
69+
70+
/// Repeatedly call `splice_update` until `commitments_secured` is true.
71+
/// Returns the final PSBT. Gives up after 10 attempts.
72+
async fn splice_update_loop(
73+
&self, channel_id: &str, mut psbt: String,
74+
) -> Result<String, TestFailure> {
75+
const MAX_ATTEMPTS: u32 = 10;
76+
for _ in 0..MAX_ATTEMPTS {
77+
let ch_id = channel_id.to_string();
78+
let psbt_arg = psbt.clone();
79+
let update_result: serde_json::Value = self
80+
.rpc(move |c| {
81+
c.call("splice_update", &json!({"channel_id": ch_id, "psbt": psbt_arg}))
82+
})
83+
.await
84+
.map_err(|e| self.make_error(format!("splice_update: {}", e)))?;
85+
psbt = update_result["psbt"]
86+
.as_str()
87+
.ok_or_else(|| self.make_error("splice_update did not return psbt".to_string()))?
88+
.to_string();
89+
if update_result["commitments_secured"].as_bool() == Some(true) {
90+
return Ok(psbt);
91+
}
92+
}
93+
Err(self.make_error(format!(
94+
"splice_update did not reach commitments_secured after {} attempts",
95+
MAX_ATTEMPTS
96+
)))
97+
}
98+
}
99+
100+
#[async_trait]
101+
impl ExternalNode for TestClnNode {
102+
fn name(&self) -> &str {
103+
"CLN"
104+
}
105+
106+
async fn get_node_id(&self) -> Result<PublicKey, TestFailure> {
107+
let info = self
108+
.rpc(|c| c.getinfo())
109+
.await
110+
.map_err(|e| self.make_error(format!("getinfo: {}", e)))?;
111+
PublicKey::from_str(&info.id).map_err(|e| self.make_error(format!("parse node id: {}", e)))
112+
}
113+
114+
async fn get_listening_address(&self) -> Result<SocketAddress, TestFailure> {
115+
// CLN binds to 0.0.0.0 inside Docker, but we reach it via the host's
116+
// port-mapped address. Hard-code the host-side address just like LND.
117+
Ok("127.0.0.1:19846".parse().unwrap())
118+
}
119+
120+
async fn get_block_height(&self) -> Result<u64, TestFailure> {
121+
let info = self
122+
.rpc(|c| c.getinfo())
123+
.await
124+
.map_err(|e| self.make_error(format!("getinfo: {}", e)))?;
125+
Ok(info.blockheight as u64)
126+
}
127+
128+
async fn connect_peer(
129+
&self, peer_id: PublicKey, addr: SocketAddress,
130+
) -> Result<(), TestFailure> {
131+
let uri = format!("{}@{}", peer_id, addr);
132+
let _: serde_json::Value = self
133+
.rpc(move |c| c.call("connect", &json!({"id": uri})))
134+
.await
135+
.map_err(|e| self.make_error(format!("connect: {}", e)))?;
136+
Ok(())
137+
}
138+
139+
async fn disconnect_peer(&self, peer_id: PublicKey) -> Result<(), TestFailure> {
140+
let id = peer_id.to_string();
141+
let _: serde_json::Value = self
142+
.rpc(move |c| c.call("disconnect", &json!({"id": id})))
143+
.await
144+
.map_err(|e| self.make_error(format!("disconnect: {}", e)))?;
145+
Ok(())
146+
}
147+
148+
async fn open_channel(
149+
&self, peer_id: PublicKey, _addr: SocketAddress, capacity_sat: u64, push_msat: Option<u64>,
150+
) -> Result<String, TestFailure> {
151+
// Use the generic `call` method to include `push_msat`, which the
152+
// typed `fundchannel` method does not support.
153+
let mut params = json!({
154+
"id": peer_id.to_string(),
155+
"amount": capacity_sat,
156+
});
157+
if let Some(push) = push_msat {
158+
params["push_msat"] = json!(push);
159+
}
160+
161+
let result: serde_json::Value = self
162+
.rpc(move |c| c.call("fundchannel", &params))
163+
.await
164+
.map_err(|e| self.make_error(format!("fundchannel: {}", e)))?;
165+
166+
let channel_id = result["channel_id"]
167+
.as_str()
168+
.ok_or_else(|| self.make_error("fundchannel did not return channel_id".to_string()))?;
169+
Ok(channel_id.to_string())
170+
}
171+
172+
async fn close_channel(&self, channel_id: &str) -> Result<(), TestFailure> {
173+
let ch_id = channel_id.to_string();
174+
self.rpc(move |c| c.close(&ch_id, None, None))
175+
.await
176+
.map_err(|e| self.make_error(format!("close: {}", e)))?;
177+
Ok(())
178+
}
179+
180+
async fn force_close_channel(&self, channel_id: &str) -> Result<(), TestFailure> {
181+
// CLN v23.08 removed the `force` parameter; use `unilateraltimeout: 1`
182+
// to trigger an immediate unilateral close.
183+
let ch_id = channel_id.to_string();
184+
let _: serde_json::Value = self
185+
.rpc(move |c| c.call("close", &json!({"id": ch_id, "unilateraltimeout": 1})))
186+
.await
187+
.map_err(|e| self.make_error(format!("force close: {}", e)))?;
188+
Ok(())
189+
}
190+
191+
async fn create_invoice(
192+
&self, amount_msat: u64, description: &str,
193+
) -> Result<String, TestFailure> {
194+
let desc = description.to_string();
195+
let invoice = self
196+
.rpc(move |c| c.invoice(Some(amount_msat), &desc, &desc, None, None, None))
197+
.await
198+
.map_err(|e| self.make_error(format!("invoice: {}", e)))?;
199+
Ok(invoice.bolt11)
200+
}
201+
202+
async fn pay_invoice(&self, invoice: &str) -> Result<String, TestFailure> {
203+
let inv = invoice.to_string();
204+
let result = self
205+
.rpc(move |c| c.pay(&inv, PayOptions::default()))
206+
.await
207+
.map_err(|e| self.make_error(format!("pay: {}", e)))?;
208+
Ok(result.payment_preimage)
209+
}
210+
211+
async fn send_keysend(
212+
&self, peer_id: PublicKey, amount_msat: u64,
213+
) -> Result<String, TestFailure> {
214+
let dest = peer_id.to_string();
215+
let result: serde_json::Value = self
216+
.rpc(move |c| {
217+
c.call("keysend", &json!({"destination": dest, "amount_msat": amount_msat}))
218+
})
219+
.await
220+
.map_err(|e| self.make_error(format!("keysend: {}", e)))?;
221+
let preimage =
222+
result["payment_preimage"].as_str().filter(|s| !s.is_empty()).ok_or_else(|| {
223+
self.make_error("keysend did not return payment_preimage".to_string())
224+
})?;
225+
Ok(preimage.to_string())
226+
}
227+
228+
async fn get_funding_address(&self) -> Result<String, TestFailure> {
229+
let addr = self
230+
.rpc(|c| c.newaddr(None))
231+
.await
232+
.map_err(|e| self.make_error(format!("newaddr: {}", e)))?;
233+
addr.bech32.ok_or_else(|| self.make_error("no bech32 address returned".to_string()))
234+
}
235+
236+
async fn splice_in(&self, channel_id: &str, amount_sat: u64) -> Result<(), TestFailure> {
237+
// Step 1: splice_init with positive relative_amount
238+
let ch_id = channel_id.to_string();
239+
let amount = amount_sat as i64;
240+
let init_result: serde_json::Value = self
241+
.rpc(move |c| {
242+
c.call("splice_init", &json!({"channel_id": ch_id, "relative_amount": amount}))
243+
})
244+
.await
245+
.map_err(|e| self.make_error(format!("splice_init: {}", e)))?;
246+
let mut psbt = init_result["psbt"]
247+
.as_str()
248+
.ok_or_else(|| self.make_error("splice_init did not return psbt".to_string()))?
249+
.to_string();
250+
251+
// Step 2: splice_update until commitments_secured
252+
psbt = self.splice_update_loop(channel_id, psbt).await?;
253+
254+
// Step 3: splice_signed
255+
let ch_id = channel_id.to_string();
256+
let _: serde_json::Value = self
257+
.rpc(move |c| c.call("splice_signed", &json!({"channel_id": ch_id, "psbt": psbt})))
258+
.await
259+
.map_err(|e| self.make_error(format!("splice_signed: {}", e)))?;
260+
Ok(())
261+
}
262+
263+
async fn splice_out(
264+
&self, channel_id: &str, amount_sat: u64, _address: Option<&str>,
265+
) -> Result<(), TestFailure> {
266+
// CLN splice-out uses negative relative_amount.
267+
// Funds go to CLN's own wallet (address parameter is not used).
268+
let ch_id = channel_id.to_string();
269+
let amount = -(amount_sat as i64);
270+
let init_result: serde_json::Value = self
271+
.rpc(move |c| {
272+
c.call("splice_init", &json!({"channel_id": ch_id, "relative_amount": amount}))
273+
})
274+
.await
275+
.map_err(|e| self.make_error(format!("splice_init: {}", e)))?;
276+
let mut psbt = init_result["psbt"]
277+
.as_str()
278+
.ok_or_else(|| self.make_error("splice_init did not return psbt".to_string()))?
279+
.to_string();
280+
281+
psbt = self.splice_update_loop(channel_id, psbt).await?;
282+
283+
let ch_id = channel_id.to_string();
284+
let _: serde_json::Value = self
285+
.rpc(move |c| c.call("splice_signed", &json!({"channel_id": ch_id, "psbt": psbt})))
286+
.await
287+
.map_err(|e| self.make_error(format!("splice_signed: {}", e)))?;
288+
Ok(())
289+
}
290+
291+
async fn list_channels(&self) -> Result<Vec<ExternalChannel>, TestFailure> {
292+
let response = self
293+
.rpc(|c| c.listpeers(None, None))
294+
.await
295+
.map_err(|e| self.make_error(format!("listpeers: {}", e)))?;
296+
let mut channels = Vec::new();
297+
298+
for peer in response.peers {
299+
let peer_id = match PublicKey::from_str(&peer.id) {
300+
Ok(pk) => pk,
301+
Err(_) => continue,
302+
};
303+
for ch in peer.channels {
304+
channels.push(ExternalChannel {
305+
channel_id: ch.channel_id.clone(),
306+
peer_id,
307+
capacity_sat: ch.total_msat.0 / 1000,
308+
local_balance_msat: ch.to_us_msat.0,
309+
remote_balance_msat: ch.total_msat.0.saturating_sub(ch.to_us_msat.0),
310+
funding_txid: Some(ch.funding_txid),
311+
is_active: ch.state == "CHANNELD_NORMAL",
312+
});
313+
}
314+
}
315+
Ok(channels)
316+
}
317+
}

tests/common/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
pub(crate) mod external_node;
1212
pub(crate) mod logging;
1313

14-
#[cfg(lnd_test)]
15-
pub(crate) mod lnd;
14+
#[cfg(cln_test)]
15+
pub(crate) mod cln;
1616
#[cfg(eclair_test)]
1717
pub(crate) mod eclair;
18+
#[cfg(lnd_test)]
19+
pub(crate) mod lnd;
1820

1921
use std::collections::{HashMap, HashSet};
2022
use std::env;

0 commit comments

Comments
 (0)