Skip to content

Commit 584a4b0

Browse files
author
anon
committed
feat: add channel state events and e2e assertions
- add channel state events proto and grpc wiring - emit and handle events in server runtime - update docs/client guidance and extend e2e coverage
1 parent d1a0f9d commit 584a4b0

6 files changed

Lines changed: 1106 additions & 33 deletions

File tree

docs/api-guide.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ See [Pagination](#pagination) below for how to page through results.
188188

189189
| RPC | Description |
190190
|-------------------|-------------------------------------------------------------|
191-
| `SubscribeEvents` | **Server-streaming.** Subscribe to real-time payment events |
191+
| `SubscribeEvents` | **Server-streaming.** Subscribe to real-time payment and channel events |
192192

193193
`SubscribeEvents` returns a stream of `EventEnvelope` messages. Each envelope contains one of:
194194

@@ -199,6 +199,7 @@ See [Pagination](#pagination) below for how to page through results.
199199
| `PaymentFailed` | An outbound payment failed |
200200
| `PaymentClaimable` | A hodl invoice payment arrived and is waiting to be claimed or failed |
201201
| `PaymentForwarded` | A payment was routed through this node |
202+
| `ChannelStateChanged` | A channel changed state (pending, ready, open failed, closed) |
202203

203204
Events are broadcast to all connected subscribers. The server uses a bounded broadcast channel
204205
(capacity 1024). A slow subscriber that falls behind will miss events.

e2e-tests/tests/e2e.rs

Lines changed: 332 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ use std::time::Duration;
1212

1313
use e2e_tests::{
1414
find_available_port, mine_and_sync, run_cli, run_cli_raw, setup_funded_channel,
15-
wait_for_onchain_balance, LdkServerConfig, LdkServerHandle, TestBitcoind,
15+
wait_for_onchain_balance, wait_for_usable_channel, LdkServerConfig, LdkServerHandle,
16+
TestBitcoind,
1617
};
1718
use hex_conservative::{DisplayHex, FromHex};
1819
use ldk_node::bitcoin::hashes::{sha256, Hash};
@@ -21,10 +22,12 @@ use ldk_node::lightning::offers::offer::Offer;
2122
use ldk_node::lightning_invoice::Bolt11Invoice;
2223
use ldk_server_client::client::EventStream;
2324
use ldk_server_client::ldk_server_grpc::api::{
24-
Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest,
25+
Bolt11ReceiveRequest, Bolt12ReceiveRequest, OnchainReceiveRequest, OpenChannelRequest,
2526
};
2627
use ldk_server_client::ldk_server_grpc::events::event_envelope::Event;
27-
use ldk_server_client::ldk_server_grpc::events::EventEnvelope;
28+
use ldk_server_client::ldk_server_grpc::events::{
29+
ChannelClosureInitiator, ChannelState, ChannelStateChangeReasonKind, EventEnvelope,
30+
};
2831
use ldk_server_client::ldk_server_grpc::types::{
2932
bolt11_invoice_description, Bolt11InvoiceDescription,
3033
};
@@ -410,6 +413,332 @@ async fn test_cli_open_channel() {
410413
assert!(!output["user_channel_id"].as_str().unwrap().is_empty());
411414
}
412415

416+
#[tokio::test]
417+
async fn test_subscribe_events_channel_state_lifecycle_pending_ready_closed() {
418+
let bitcoind = TestBitcoind::new();
419+
let server_a = LdkServerHandle::start(&bitcoind).await;
420+
let server_b = LdkServerHandle::start(&bitcoind).await;
421+
422+
let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
423+
let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
424+
bitcoind.fund_address(&addr_a, 1.0);
425+
bitcoind.fund_address(&addr_b, 0.1);
426+
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
427+
wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await;
428+
wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await;
429+
430+
let mut events_a = server_a.client().subscribe_events().await.unwrap();
431+
let mut events_b = server_b.client().subscribe_events().await.unwrap();
432+
433+
let open_resp = server_a
434+
.client()
435+
.open_channel(OpenChannelRequest {
436+
node_pubkey: server_b.node_id().to_string(),
437+
address: format!("127.0.0.1:{}", server_b.p2p_port),
438+
channel_amount_sats: 100_000,
439+
push_to_counterparty_msat: None,
440+
channel_config: None,
441+
announce_channel: true,
442+
disable_counterparty_reserve: false,
443+
})
444+
.await
445+
.unwrap();
446+
447+
let pending_a = wait_for_event(&mut events_a, |e| {
448+
matches!(
449+
e,
450+
Event::ChannelStateChanged(channel_event)
451+
if channel_event.user_channel_id == open_resp.user_channel_id
452+
&& channel_event.state == ChannelState::Pending as i32
453+
)
454+
})
455+
.await;
456+
let pending_a = match pending_a.event {
457+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
458+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
459+
};
460+
assert_eq!(pending_a.user_channel_id, open_resp.user_channel_id);
461+
assert_eq!(pending_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
462+
assert!(pending_a.funding_txo.is_some());
463+
assert!(pending_a.reason.is_none());
464+
assert_eq!(pending_a.closure_initiator, ChannelClosureInitiator::Unspecified as i32);
465+
466+
let pending_b = wait_for_event(&mut events_b, |e| {
467+
matches!(
468+
e,
469+
Event::ChannelStateChanged(channel_event)
470+
if channel_event.channel_id == pending_a.channel_id
471+
&& channel_event.state == ChannelState::Pending as i32
472+
)
473+
})
474+
.await;
475+
let pending_b = match pending_b.event {
476+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
477+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
478+
};
479+
assert_eq!(pending_b.channel_id, pending_a.channel_id);
480+
assert_eq!(pending_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
481+
assert!(pending_b.funding_txo.is_some());
482+
assert!(pending_b.reason.is_none());
483+
assert_eq!(pending_b.closure_initiator, ChannelClosureInitiator::Unspecified as i32);
484+
485+
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
486+
wait_for_usable_channel(server_a.client(), &bitcoind, Duration::from_secs(60)).await;
487+
488+
let ready_a = wait_for_event(&mut events_a, |e| {
489+
matches!(
490+
e,
491+
Event::ChannelStateChanged(channel_event)
492+
if channel_event.channel_id == pending_a.channel_id
493+
&& channel_event.state == ChannelState::Ready as i32
494+
)
495+
})
496+
.await;
497+
let ready_a = match ready_a.event {
498+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
499+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
500+
};
501+
assert_eq!(ready_a.channel_id, pending_a.channel_id);
502+
assert_eq!(ready_a.user_channel_id, open_resp.user_channel_id);
503+
assert_eq!(ready_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
504+
assert!(ready_a.funding_txo.is_some());
505+
assert!(ready_a.reason.is_none());
506+
assert_eq!(ready_a.closure_initiator, ChannelClosureInitiator::Unspecified as i32);
507+
508+
let ready_b = wait_for_event(&mut events_b, |e| {
509+
matches!(
510+
e,
511+
Event::ChannelStateChanged(channel_event)
512+
if channel_event.channel_id == pending_a.channel_id
513+
&& channel_event.state == ChannelState::Ready as i32
514+
)
515+
})
516+
.await;
517+
let ready_b = match ready_b.event {
518+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
519+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
520+
};
521+
assert_eq!(ready_b.channel_id, pending_a.channel_id);
522+
assert_eq!(ready_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
523+
assert!(ready_b.funding_txo.is_some());
524+
assert!(ready_b.reason.is_none());
525+
assert_eq!(ready_b.closure_initiator, ChannelClosureInitiator::Unspecified as i32);
526+
527+
run_cli(&server_a, &["close-channel", &open_resp.user_channel_id, server_b.node_id()]);
528+
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
529+
530+
let closed_a = wait_for_event(&mut events_a, |e| {
531+
matches!(
532+
e,
533+
Event::ChannelStateChanged(channel_event)
534+
if channel_event.channel_id == pending_a.channel_id
535+
&& channel_event.state == ChannelState::Closed as i32
536+
)
537+
})
538+
.await;
539+
let closed_a = match closed_a.event {
540+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
541+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
542+
};
543+
assert_eq!(closed_a.user_channel_id, open_resp.user_channel_id);
544+
assert_eq!(closed_a.state, ChannelState::Closed as i32);
545+
assert_eq!(closed_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
546+
assert!(closed_a.funding_txo.is_none());
547+
let reason_a = closed_a.reason.expect("closed event must include closure reason");
548+
assert!(matches!(
549+
ChannelStateChangeReasonKind::from_i32(reason_a.kind),
550+
Some(ChannelStateChangeReasonKind::LocallyInitiatedCooperativeClosure)
551+
| Some(ChannelStateChangeReasonKind::LegacyCooperativeClosure)
552+
));
553+
assert_eq!(closed_a.closure_initiator, ChannelClosureInitiator::Local as i32);
554+
555+
let closed_b = wait_for_event(&mut events_b, |e| {
556+
matches!(
557+
e,
558+
Event::ChannelStateChanged(channel_event)
559+
if channel_event.channel_id == pending_a.channel_id
560+
&& channel_event.state == ChannelState::Closed as i32
561+
)
562+
})
563+
.await;
564+
let closed_b = match closed_b.event {
565+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
566+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
567+
};
568+
assert_eq!(closed_b.channel_id, pending_a.channel_id);
569+
assert_eq!(closed_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
570+
assert!(closed_b.funding_txo.is_none());
571+
let reason_b = closed_b.reason.expect("closed event must include closure reason");
572+
assert!(matches!(
573+
ChannelStateChangeReasonKind::from_i32(reason_b.kind),
574+
Some(ChannelStateChangeReasonKind::CounterpartyInitiatedCooperativeClosure)
575+
| Some(ChannelStateChangeReasonKind::LegacyCooperativeClosure)
576+
));
577+
assert_eq!(closed_b.closure_initiator, ChannelClosureInitiator::Remote as i32);
578+
}
579+
580+
#[tokio::test]
581+
async fn test_subscribe_events_channel_state_lifecycle_pending_ready_force_closed() {
582+
let bitcoind = TestBitcoind::new();
583+
let server_a = LdkServerHandle::start(&bitcoind).await;
584+
let server_b = LdkServerHandle::start(&bitcoind).await;
585+
586+
let addr_a = server_a.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
587+
let addr_b = server_b.client().onchain_receive(OnchainReceiveRequest {}).await.unwrap().address;
588+
bitcoind.fund_address(&addr_a, 1.0);
589+
bitcoind.fund_address(&addr_b, 0.1);
590+
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
591+
wait_for_onchain_balance(server_a.client(), Duration::from_secs(30)).await;
592+
wait_for_onchain_balance(server_b.client(), Duration::from_secs(30)).await;
593+
594+
let mut events_a = server_a.client().subscribe_events().await.unwrap();
595+
let mut events_b = server_b.client().subscribe_events().await.unwrap();
596+
597+
let open_resp = server_a
598+
.client()
599+
.open_channel(OpenChannelRequest {
600+
node_pubkey: server_b.node_id().to_string(),
601+
address: format!("127.0.0.1:{}", server_b.p2p_port),
602+
channel_amount_sats: 100_000,
603+
push_to_counterparty_msat: None,
604+
channel_config: None,
605+
announce_channel: true,
606+
disable_counterparty_reserve: false,
607+
})
608+
.await
609+
.unwrap();
610+
611+
let pending_a = wait_for_event(&mut events_a, |e| {
612+
matches!(
613+
e,
614+
Event::ChannelStateChanged(channel_event)
615+
if channel_event.user_channel_id == open_resp.user_channel_id
616+
&& channel_event.state == ChannelState::Pending as i32
617+
)
618+
})
619+
.await;
620+
let pending_a = match pending_a.event {
621+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
622+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
623+
};
624+
assert_eq!(pending_a.user_channel_id, open_resp.user_channel_id);
625+
assert_eq!(pending_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
626+
assert!(pending_a.funding_txo.is_some());
627+
assert!(pending_a.reason.is_none());
628+
assert_eq!(pending_a.closure_initiator, ChannelClosureInitiator::Unspecified as i32);
629+
630+
let pending_b = wait_for_event(&mut events_b, |e| {
631+
matches!(
632+
e,
633+
Event::ChannelStateChanged(channel_event)
634+
if channel_event.channel_id == pending_a.channel_id
635+
&& channel_event.state == ChannelState::Pending as i32
636+
)
637+
})
638+
.await;
639+
let pending_b = match pending_b.event {
640+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
641+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
642+
};
643+
assert_eq!(pending_b.channel_id, pending_a.channel_id);
644+
assert_eq!(pending_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
645+
assert!(pending_b.funding_txo.is_some());
646+
assert!(pending_b.reason.is_none());
647+
assert_eq!(pending_b.closure_initiator, ChannelClosureInitiator::Unspecified as i32);
648+
649+
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
650+
wait_for_usable_channel(server_a.client(), &bitcoind, Duration::from_secs(60)).await;
651+
652+
let ready_a = wait_for_event(&mut events_a, |e| {
653+
matches!(
654+
e,
655+
Event::ChannelStateChanged(channel_event)
656+
if channel_event.channel_id == pending_a.channel_id
657+
&& channel_event.state == ChannelState::Ready as i32
658+
)
659+
})
660+
.await;
661+
let ready_a = match ready_a.event {
662+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
663+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
664+
};
665+
assert_eq!(ready_a.channel_id, pending_a.channel_id);
666+
assert_eq!(ready_a.user_channel_id, open_resp.user_channel_id);
667+
assert_eq!(ready_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
668+
assert!(ready_a.funding_txo.is_some());
669+
assert!(ready_a.reason.is_none());
670+
assert_eq!(ready_a.closure_initiator, ChannelClosureInitiator::Unspecified as i32);
671+
672+
let ready_b = wait_for_event(&mut events_b, |e| {
673+
matches!(
674+
e,
675+
Event::ChannelStateChanged(channel_event)
676+
if channel_event.channel_id == pending_a.channel_id
677+
&& channel_event.state == ChannelState::Ready as i32
678+
)
679+
})
680+
.await;
681+
let ready_b = match ready_b.event {
682+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
683+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
684+
};
685+
assert_eq!(ready_b.channel_id, pending_a.channel_id);
686+
assert_eq!(ready_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
687+
assert!(ready_b.funding_txo.is_some());
688+
assert!(ready_b.reason.is_none());
689+
assert_eq!(ready_b.closure_initiator, ChannelClosureInitiator::Unspecified as i32);
690+
691+
run_cli(&server_a, &["force-close-channel", &open_resp.user_channel_id, server_b.node_id()]);
692+
mine_and_sync(&bitcoind, &[&server_a, &server_b], 6).await;
693+
694+
let closed_a = wait_for_event(&mut events_a, |e| {
695+
matches!(
696+
e,
697+
Event::ChannelStateChanged(channel_event)
698+
if channel_event.channel_id == pending_a.channel_id
699+
&& channel_event.state == ChannelState::Closed as i32
700+
)
701+
})
702+
.await;
703+
let closed_a = match closed_a.event {
704+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
705+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
706+
};
707+
assert_eq!(closed_a.user_channel_id, open_resp.user_channel_id);
708+
assert_eq!(closed_a.state, ChannelState::Closed as i32);
709+
assert_eq!(closed_a.counterparty_node_id.as_deref(), Some(server_b.node_id()));
710+
assert!(closed_a.funding_txo.is_none());
711+
let reason_a = closed_a.reason.expect("closed event must include closure reason");
712+
assert_eq!(
713+
ChannelStateChangeReasonKind::from_i32(reason_a.kind),
714+
Some(ChannelStateChangeReasonKind::HolderForceClosed)
715+
);
716+
assert_eq!(closed_a.closure_initiator, ChannelClosureInitiator::Local as i32);
717+
718+
let closed_b = wait_for_event(&mut events_b, |e| {
719+
matches!(
720+
e,
721+
Event::ChannelStateChanged(channel_event)
722+
if channel_event.channel_id == pending_a.channel_id
723+
&& channel_event.state == ChannelState::Closed as i32
724+
)
725+
})
726+
.await;
727+
let closed_b = match closed_b.event {
728+
Some(Event::ChannelStateChanged(channel_event)) => channel_event,
729+
other => panic!("expected ChannelStateChanged event, got {other:?}"),
730+
};
731+
assert_eq!(closed_b.channel_id, pending_a.channel_id);
732+
assert_eq!(closed_b.counterparty_node_id.as_deref(), Some(server_a.node_id()));
733+
assert!(closed_b.funding_txo.is_none());
734+
let reason_b = closed_b.reason.expect("closed event must include closure reason");
735+
assert_eq!(
736+
ChannelStateChangeReasonKind::from_i32(reason_b.kind),
737+
Some(ChannelStateChangeReasonKind::CounterpartyForceClosed)
738+
);
739+
assert_eq!(closed_b.closure_initiator, ChannelClosureInitiator::Remote as i32);
740+
}
741+
413742
#[tokio::test]
414743
async fn test_cli_list_channels() {
415744
let bitcoind = TestBitcoind::new();

0 commit comments

Comments
 (0)