Skip to content

Commit a82dcc5

Browse files
benthecarmanclaude
andcommitted
Add SubscribeEvents server-streaming RPC
Implement the SubscribeEvents RPC end-to-end: a Stream variant in GrpcBody that delivers multiple gRPC-framed messages from an mpsc channel, a handler in service.rs that bridges the broadcast channel to the streaming response, a client EventStream type that reads gRPC frames incrementally from the HTTP/2 body, and e2e test assertions that verify payment events arrive via the stream. Also regenerates proto code to include SubscribeEventsRequest. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 39852b9 commit a82dcc5

10 files changed

Lines changed: 331 additions & 1417 deletions

File tree

e2e-tests/Cargo.lock

Lines changed: 37 additions & 1140 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

e2e-tests/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,4 @@ ldk-server-client = { path = "../ldk-server-client" }
1111
ldk-server-protos = { path = "../ldk-server-protos", features = ["serde"] }
1212
serde_json = "1.0"
1313
hex-conservative = { version = "0.2", features = ["std"] }
14-
lapin = { version = "2.4.0", features = ["rustls"], default-features = false }
15-
prost = { version = "0.11.6", default-features = false, features = ["std"] }
16-
futures-util = "0.3"
1714
ldk-node = { git = "https://github.com/lightningdevkit/ldk-node", rev = "d1bbf978c8b7abe87ae2e40793556c1fe4e7ea49" }

e2e-tests/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ fn main() {
2121
"-p",
2222
"ldk-server",
2323
"--features",
24-
"events-rabbitmq,experimental-lsps2-support",
24+
"experimental-lsps2-support",
2525
"-p",
2626
"ldk-server-cli",
2727
])

e2e-tests/src/lib.rs

Lines changed: 6 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,12 @@ impl TestBitcoind {
8787
/// Handle to a running ldk-server child process.
8888
pub struct LdkServerHandle {
8989
child: Option<Child>,
90-
pub rest_port: u16,
90+
pub grpc_port: u16,
9191
pub p2p_port: u16,
9292
pub storage_dir: PathBuf,
9393
pub api_key: String,
9494
pub tls_cert_path: PathBuf,
9595
pub node_id: String,
96-
pub exchange_name: String,
9796
client: LdkServerClient,
9897
}
9998

@@ -103,19 +102,17 @@ impl LdkServerHandle {
103102
pub async fn start(bitcoind: &TestBitcoind) -> Self {
104103
#[allow(deprecated)]
105104
let storage_dir = tempfile::tempdir().unwrap().into_path();
106-
let rest_port = find_available_port();
105+
let grpc_port = find_available_port();
107106
let p2p_port = find_available_port();
108107

109108
let (rpc_host, rpc_port_num, rpc_user, rpc_password) = bitcoind.rpc_details();
110109
let rpc_address = format!("{rpc_host}:{rpc_port_num}");
111110

112-
let exchange_name = format!("e2e_test_exchange_{rest_port}");
113-
114111
let config_content = format!(
115112
r#"[node]
116113
network = "regtest"
117114
listening_addresses = ["127.0.0.1:{p2p_port}"]
118-
rest_service_address = "127.0.0.1:{rest_port}"
115+
grpc_service_address = "127.0.0.1:{grpc_port}"
119116
alias = "e2e-test-node"
120117
121118
[storage.disk]
@@ -126,10 +123,6 @@ rpc_address = "{rpc_address}"
126123
rpc_user = "{rpc_user}"
127124
rpc_password = "{rpc_password}"
128125
129-
[rabbitmq]
130-
connection_string = "amqp://guest:guest@localhost:5672/%2f"
131-
exchange_name = "{exchange_name}"
132-
133126
[liquidity.lsps2_service]
134127
advertise_service = false
135128
channel_opening_fee_ppm = 10000
@@ -191,18 +184,17 @@ client_trusts_lsp = true
191184
// Read TLS cert
192185
let tls_cert_pem = std::fs::read(&tls_cert_path).unwrap();
193186

194-
let base_url = format!("127.0.0.1:{rest_port}");
187+
let base_url = format!("127.0.0.1:{grpc_port}");
195188
let client = LdkServerClient::new(base_url, api_key.clone(), &tls_cert_pem).unwrap();
196189

197190
let mut handle = Self {
198191
child: Some(child),
199-
rest_port,
192+
grpc_port,
200193
p2p_port,
201194
storage_dir,
202195
api_key,
203196
tls_cert_path,
204197
node_id: String::new(),
205-
exchange_name,
206198
client,
207199
};
208200

@@ -222,7 +214,7 @@ client_trusts_lsp = true
222214
}
223215

224216
pub fn base_url(&self) -> String {
225-
format!("127.0.0.1:{}", self.rest_port)
217+
format!("127.0.0.1:{}", self.grpc_port)
226218
}
227219
}
228220

@@ -428,100 +420,3 @@ pub async fn setup_funded_channel(
428420
open_resp.user_channel_id
429421
}
430422

431-
/// RabbitMQ event consumer for verifying events published by ldk-server.
432-
pub struct RabbitMqEventConsumer {
433-
_connection: lapin::Connection,
434-
channel: lapin::Channel,
435-
consumer: lapin::Consumer,
436-
}
437-
438-
impl RabbitMqEventConsumer {
439-
/// Connect to RabbitMQ and create an exclusive queue bound to the given exchange.
440-
pub async fn new(exchange_name: &str) -> Self {
441-
use lapin::options::{
442-
BasicConsumeOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
443-
};
444-
use lapin::types::FieldTable;
445-
use lapin::{ConnectionProperties, ExchangeKind};
446-
447-
let connection = lapin::Connection::connect(
448-
"amqp://guest:guest@localhost:5672/%2f",
449-
ConnectionProperties::default(),
450-
)
451-
.await
452-
.expect("Failed to connect to RabbitMQ");
453-
454-
let channel = connection.create_channel().await.expect("Failed to create channel");
455-
456-
// Declare exchange (idempotent — may already exist from the server)
457-
channel
458-
.exchange_declare(
459-
exchange_name,
460-
ExchangeKind::Fanout,
461-
ExchangeDeclareOptions { durable: true, ..Default::default() },
462-
FieldTable::default(),
463-
)
464-
.await
465-
.expect("Failed to declare exchange");
466-
467-
// Create exclusive auto-delete queue with server-generated name
468-
let queue = channel
469-
.queue_declare(
470-
"",
471-
QueueDeclareOptions { exclusive: true, auto_delete: true, ..Default::default() },
472-
FieldTable::default(),
473-
)
474-
.await
475-
.expect("Failed to declare queue");
476-
let queue_name = queue.name().to_string();
477-
478-
channel
479-
.queue_bind(
480-
&queue_name,
481-
exchange_name,
482-
"",
483-
QueueBindOptions::default(),
484-
FieldTable::default(),
485-
)
486-
.await
487-
.expect("Failed to bind queue");
488-
489-
let consumer = channel
490-
.basic_consume(
491-
&queue_name,
492-
&format!("consumer_{}", queue_name),
493-
BasicConsumeOptions::default(),
494-
FieldTable::default(),
495-
)
496-
.await
497-
.expect("Failed to start consumer");
498-
499-
Self { _connection: connection, channel, consumer }
500-
}
501-
502-
/// Consume up to `count` events, waiting up to `timeout` for each.
503-
pub async fn consume_events(
504-
&mut self, count: usize, timeout: Duration,
505-
) -> Vec<ldk_server_protos::events::EventEnvelope> {
506-
use futures_util::StreamExt;
507-
use lapin::options::BasicAckOptions;
508-
use prost::Message;
509-
510-
let mut events = Vec::new();
511-
for _ in 0..count {
512-
match tokio::time::timeout(timeout, self.consumer.next()).await {
513-
Ok(Some(Ok(delivery))) => {
514-
let event = ldk_server_protos::events::EventEnvelope::decode(&*delivery.data)
515-
.expect("Failed to decode event");
516-
self.channel
517-
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
518-
.await
519-
.expect("Failed to ack");
520-
events.push(event);
521-
},
522-
_ => break,
523-
}
524-
}
525-
events
526-
}
527-
}

0 commit comments

Comments
 (0)