Skip to content

Commit a642144

Browse files
benthecarmanclaude
andcommitted
Add SubscribeEvents server-streaming RPC
Implement the SubscribeEvents RPC end-to-end: a Stream variant in GrpcBody that delivers multiple gRPC-framed messages from an mpsc channel, a handler in service.rs that bridges the broadcast channel to the streaming response, a client EventStream type that reads gRPC frames incrementally from the HTTP/2 body, and e2e test assertions that verify payment events arrive via the stream. Also regenerates proto code to include SubscribeEventsRequest. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent fafd321 commit a642144

10 files changed

Lines changed: 1188 additions & 1776 deletions

File tree

Cargo.lock

Lines changed: 709 additions & 345 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

e2e-tests/Cargo.lock

Lines changed: 38 additions & 1145 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

e2e-tests/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,4 @@ ldk-server-client = { path = "../ldk-server-client" }
1111
ldk-server-grpc = { path = "../ldk-server-grpc", features = ["serde"] }
1212
serde_json = "1.0"
1313
hex-conservative = { version = "0.2", features = ["std"] }
14-
lapin = { version = "2.4.0", features = ["rustls"], default-features = false }
15-
prost = { version = "0.11.6", default-features = false, features = ["std"] }
16-
futures-util = "0.3"
1714
ldk-node = { git = "https://github.com/lightningdevkit/ldk-node", rev = "3aef2b39265ae60b29f4d60de8291895f12eb880" }

e2e-tests/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ fn main() {
2121
"-p",
2222
"ldk-server",
2323
"--features",
24-
"events-rabbitmq,experimental-lsps2-support",
24+
"experimental-lsps2-support",
2525
"-p",
2626
"ldk-server-cli",
2727
])

e2e-tests/src/lib.rs

Lines changed: 7 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -87,26 +87,20 @@ impl TestBitcoind {
8787
/// Handle to a running ldk-server child process.
8888
pub struct LdkServerHandle {
8989
child: Option<Child>,
90-
pub rest_port: u16,
90+
pub grpc_port: u16,
9191
pub p2p_port: u16,
9292
pub storage_dir: PathBuf,
9393
pub api_key: String,
9494
pub tls_cert_path: PathBuf,
9595
pub node_id: String,
96-
pub exchange_name: String,
9796
client: LdkServerClient,
9897
}
9998

99+
#[derive(Default)]
100100
pub struct LdkServerConfig {
101101
pub metrics_auth: Option<(String, String)>,
102102
}
103103

104-
impl Default for LdkServerConfig {
105-
fn default() -> Self {
106-
Self { metrics_auth: None }
107-
}
108-
}
109-
110104
impl LdkServerHandle {
111105
/// Starts a new ldk-server instance against the given bitcoind.
112106
/// Waits until the server is ready to accept requests.
@@ -117,14 +111,12 @@ impl LdkServerHandle {
117111
pub async fn start_with_config(bitcoind: &TestBitcoind, config: LdkServerConfig) -> Self {
118112
#[allow(deprecated)]
119113
let storage_dir = tempfile::tempdir().unwrap().into_path();
120-
let rest_port = find_available_port();
114+
let grpc_port = find_available_port();
121115
let p2p_port = find_available_port();
122116

123117
let (rpc_host, rpc_port_num, rpc_user, rpc_password) = bitcoind.rpc_details();
124118
let rpc_address = format!("{rpc_host}:{rpc_port_num}");
125119

126-
let exchange_name = format!("e2e_test_exchange_{rest_port}");
127-
128120
let metrics_auth_config = if let Some((user, pass)) = config.metrics_auth {
129121
format!("username = \"{}\"\npassword = \"{}\"", user, pass)
130122
} else {
@@ -135,7 +127,7 @@ impl LdkServerHandle {
135127
r#"[node]
136128
network = "regtest"
137129
listening_addresses = ["127.0.0.1:{p2p_port}"]
138-
rest_service_address = "127.0.0.1:{rest_port}"
130+
grpc_service_address = "127.0.0.1:{grpc_port}"
139131
alias = "e2e-test-node"
140132
141133
[storage.disk]
@@ -146,10 +138,6 @@ rpc_address = "{rpc_address}"
146138
rpc_user = "{rpc_user}"
147139
rpc_password = "{rpc_password}"
148140
149-
[rabbitmq]
150-
connection_string = "amqp://guest:guest@localhost:5672/%2f"
151-
exchange_name = "{exchange_name}"
152-
153141
[liquidity.lsps2_service]
154142
advertise_service = false
155143
channel_opening_fee_ppm = 10000
@@ -216,18 +204,17 @@ poll_metrics_interval = 1
216204
// Read TLS cert
217205
let tls_cert_pem = std::fs::read(&tls_cert_path).unwrap();
218206

219-
let base_url = format!("127.0.0.1:{rest_port}");
207+
let base_url = format!("127.0.0.1:{grpc_port}");
220208
let client = LdkServerClient::new(base_url, api_key.clone(), &tls_cert_pem).unwrap();
221209

222210
let mut handle = Self {
223211
child: Some(child),
224-
rest_port,
212+
grpc_port,
225213
p2p_port,
226214
storage_dir,
227215
api_key,
228216
tls_cert_path,
229217
node_id: String::new(),
230-
exchange_name,
231218
client,
232219
};
233220

@@ -247,7 +234,7 @@ poll_metrics_interval = 1
247234
}
248235

249236
pub fn base_url(&self) -> String {
250-
format!("127.0.0.1:{}", self.rest_port)
237+
format!("127.0.0.1:{}", self.grpc_port)
251238
}
252239
}
253240

@@ -452,101 +439,3 @@ pub async fn setup_funded_channel(
452439

453440
open_resp.user_channel_id
454441
}
455-
456-
/// RabbitMQ event consumer for verifying events published by ldk-server.
457-
pub struct RabbitMqEventConsumer {
458-
_connection: lapin::Connection,
459-
channel: lapin::Channel,
460-
consumer: lapin::Consumer,
461-
}
462-
463-
impl RabbitMqEventConsumer {
464-
/// Connect to RabbitMQ and create an exclusive queue bound to the given exchange.
465-
pub async fn new(exchange_name: &str) -> Self {
466-
use lapin::options::{
467-
BasicConsumeOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
468-
};
469-
use lapin::types::FieldTable;
470-
use lapin::{ConnectionProperties, ExchangeKind};
471-
472-
let connection = lapin::Connection::connect(
473-
"amqp://guest:guest@localhost:5672/%2f",
474-
ConnectionProperties::default(),
475-
)
476-
.await
477-
.expect("Failed to connect to RabbitMQ");
478-
479-
let channel = connection.create_channel().await.expect("Failed to create channel");
480-
481-
// Declare exchange (idempotent — may already exist from the server)
482-
channel
483-
.exchange_declare(
484-
exchange_name,
485-
ExchangeKind::Fanout,
486-
ExchangeDeclareOptions { durable: true, ..Default::default() },
487-
FieldTable::default(),
488-
)
489-
.await
490-
.expect("Failed to declare exchange");
491-
492-
// Create exclusive auto-delete queue with server-generated name
493-
let queue = channel
494-
.queue_declare(
495-
"",
496-
QueueDeclareOptions { exclusive: true, auto_delete: true, ..Default::default() },
497-
FieldTable::default(),
498-
)
499-
.await
500-
.expect("Failed to declare queue");
501-
let queue_name = queue.name().to_string();
502-
503-
channel
504-
.queue_bind(
505-
&queue_name,
506-
exchange_name,
507-
"",
508-
QueueBindOptions::default(),
509-
FieldTable::default(),
510-
)
511-
.await
512-
.expect("Failed to bind queue");
513-
514-
let consumer = channel
515-
.basic_consume(
516-
&queue_name,
517-
&format!("consumer_{}", queue_name),
518-
BasicConsumeOptions::default(),
519-
FieldTable::default(),
520-
)
521-
.await
522-
.expect("Failed to start consumer");
523-
524-
Self { _connection: connection, channel, consumer }
525-
}
526-
527-
/// Consume up to `count` events, waiting up to `timeout` for each.
528-
pub async fn consume_events(
529-
&mut self, count: usize, timeout: Duration,
530-
) -> Vec<ldk_server_protos::events::EventEnvelope> {
531-
use futures_util::StreamExt;
532-
use lapin::options::BasicAckOptions;
533-
use prost::Message;
534-
535-
let mut events = Vec::new();
536-
for _ in 0..count {
537-
match tokio::time::timeout(timeout, self.consumer.next()).await {
538-
Ok(Some(Ok(delivery))) => {
539-
let event = ldk_server_protos::events::EventEnvelope::decode(&*delivery.data)
540-
.expect("Failed to decode event");
541-
self.channel
542-
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
543-
.await
544-
.expect("Failed to ack");
545-
events.push(event);
546-
},
547-
_ => break,
548-
}
549-
}
550-
events
551-
}
552-
}

0 commit comments

Comments
 (0)