Skip to content

Commit 7eb1b72

Browse files
committed
experimental: add ChainNotifications integration tests for mempool delivery
Two new tests against the multiprocess regtest node: - chain_handle_notifications_delivers_mempool_added: registers a ChainNotifications::Server via handleNotifications, injects a self-transfer through bitcoin-cli on a blocking task, and waits for the corresponding transactionAddedToMempool callback. - chain_request_mempool_transactions_replays_current_mempool: seeds the mempool with a self-transfer, then calls requestMempoolTransactions and confirms the existing entry is replayed through the same notifications interface (no separate getrawmempool call needed). Together these cover the two paths electrs needs to drop its bitcoind JSON-RPC mempool plumbing on the IPC backend.
1 parent 62845e2 commit 7eb1b72

1 file changed

Lines changed: 193 additions & 1 deletion

File tree

tests/test.rs

Lines changed: 193 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use bitcoin_capnp_types::mining_capnp;
1+
use std::cell::RefCell;
2+
use std::rc::Rc;
3+
4+
use bitcoin_capnp_types::{chain_capnp::chain_notifications, mining_capnp};
25

36
#[path = "util/bitcoin_core.rs"]
47
mod bitcoin_core_util;
@@ -572,3 +575,192 @@ async fn chain_wait_for_notifications_unblocks_on_new_block() {
572575
})
573576
.await;
574577
}
578+
579+
/// Minimal `ChainNotifications::Server` used by the notification tests
580+
/// below. Records every `transactionAddedToMempool` /
581+
/// `transactionRemovedFromMempool` event in interior-mutable buffers so the
582+
/// test body can poll them. Other notification methods accept and ignore
583+
/// the call (they're delivered too, but the tests don't assert on them).
584+
#[derive(Default)]
585+
struct RecordingNotifications {
586+
added: Rc<RefCell<Vec<Vec<u8>>>>,
587+
removed: Rc<RefCell<Vec<(Vec<u8>, i32)>>>,
588+
}
589+
590+
impl chain_notifications::Server for RecordingNotifications {
591+
fn destroy(
592+
self: Rc<Self>,
593+
_: chain_notifications::DestroyParams,
594+
_: chain_notifications::DestroyResults,
595+
) -> impl std::future::Future<Output = Result<(), capnp::Error>> + 'static {
596+
std::future::ready(Ok(()))
597+
}
598+
599+
fn transaction_added_to_mempool(
600+
self: Rc<Self>,
601+
params: chain_notifications::TransactionAddedToMempoolParams,
602+
_: chain_notifications::TransactionAddedToMempoolResults,
603+
) -> impl std::future::Future<Output = Result<(), capnp::Error>> + 'static {
604+
let added = self.added.clone();
605+
async move {
606+
let p = params.get()?;
607+
let tx = p.get_tx()?.to_vec();
608+
added.borrow_mut().push(tx);
609+
Ok(())
610+
}
611+
}
612+
613+
fn transaction_removed_from_mempool(
614+
self: Rc<Self>,
615+
params: chain_notifications::TransactionRemovedFromMempoolParams,
616+
_: chain_notifications::TransactionRemovedFromMempoolResults,
617+
) -> impl std::future::Future<Output = Result<(), capnp::Error>> + 'static {
618+
let removed = self.removed.clone();
619+
async move {
620+
let p = params.get()?;
621+
let tx = p.get_tx()?.to_vec();
622+
let reason = p.get_reason();
623+
removed.borrow_mut().push((tx, reason));
624+
Ok(())
625+
}
626+
}
627+
628+
fn block_connected(
629+
self: Rc<Self>,
630+
_: chain_notifications::BlockConnectedParams,
631+
_: chain_notifications::BlockConnectedResults,
632+
) -> impl std::future::Future<Output = Result<(), capnp::Error>> + 'static {
633+
std::future::ready(Ok(()))
634+
}
635+
636+
fn block_disconnected(
637+
self: Rc<Self>,
638+
_: chain_notifications::BlockDisconnectedParams,
639+
_: chain_notifications::BlockDisconnectedResults,
640+
) -> impl std::future::Future<Output = Result<(), capnp::Error>> + 'static {
641+
std::future::ready(Ok(()))
642+
}
643+
644+
fn updated_block_tip(
645+
self: Rc<Self>,
646+
_: chain_notifications::UpdatedBlockTipParams,
647+
_: chain_notifications::UpdatedBlockTipResults,
648+
) -> impl std::future::Future<Output = Result<(), capnp::Error>> + 'static {
649+
std::future::ready(Ok(()))
650+
}
651+
652+
fn chain_state_flushed(
653+
self: Rc<Self>,
654+
_: chain_notifications::ChainStateFlushedParams,
655+
_: chain_notifications::ChainStateFlushedResults,
656+
) -> impl std::future::Future<Output = Result<(), capnp::Error>> + 'static {
657+
std::future::ready(Ok(()))
658+
}
659+
}
660+
661+
/// Verify `Chain.handleNotifications` delivers `transactionAddedToMempool`
662+
/// callbacks when a new transaction enters the node's mempool. This is the
663+
/// path electrs (when configured with the IPC backend) needs to use to
664+
/// retire its periodic `getrawmempool` polling.
665+
#[tokio::test]
666+
#[serial_test::serial]
667+
async fn chain_handle_notifications_delivers_mempool_added() {
668+
let wallet = bitcoin_test_wallet();
669+
ensure_wallet_loaded_and_funded(&wallet);
670+
671+
with_chain_client(|_init, thread, chain| async move {
672+
let recorder = Rc::new(RecordingNotifications::default());
673+
let recorder_for_assert = recorder.clone();
674+
let notifications: chain_notifications::Client =
675+
capnp_rpc::new_client_from_rc(recorder.clone());
676+
677+
// Register the handler. We must keep the returned Handler client
678+
// alive for the duration of the subscription.
679+
let mut req = chain.handle_notifications_request();
680+
req.get().get_context().unwrap().set_thread(thread.clone());
681+
req.get().set_notifications(notifications.clone());
682+
let resp = req.send().promise.await.unwrap();
683+
let _handler = resp.get().unwrap().get_result().unwrap();
684+
685+
// Drain any prior mempool state so the subsequent self-transfer is
686+
// unambiguously the trigger.
687+
recorder_for_assert.added.borrow_mut().clear();
688+
689+
// Inject a fresh transaction into the node's mempool from a
690+
// blocking task (bitcoin-cli is sync) and wait for the notification
691+
// to arrive.
692+
let inject_wallet = wallet.clone();
693+
let inject = tokio::task::spawn_blocking(move || {
694+
create_mempool_self_transfer(&inject_wallet);
695+
});
696+
697+
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(15);
698+
loop {
699+
if !recorder_for_assert.added.borrow().is_empty() {
700+
break;
701+
}
702+
if std::time::Instant::now() >= deadline {
703+
panic!(
704+
"transactionAddedToMempool was not delivered within 15s; recorded={}",
705+
recorder_for_assert.added.borrow().len()
706+
);
707+
}
708+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
709+
}
710+
inject.await.unwrap();
711+
})
712+
.await;
713+
}
714+
715+
/// Verify `Chain.requestMempoolTransactions` replays the current mempool
716+
/// contents through the supplied `ChainNotifications` handler. This is the
717+
/// "give me what's already in the mempool" primer electrs needs at startup
718+
/// before switching to the live notification stream.
719+
#[tokio::test]
720+
#[serial_test::serial]
721+
async fn chain_request_mempool_transactions_replays_current_mempool() {
722+
let wallet = bitcoin_test_wallet();
723+
ensure_wallet_loaded_and_funded(&wallet);
724+
725+
// Seed the mempool with a single transaction synchronously (outside
726+
// the runtime) so it's already there when we ask for the snapshot.
727+
let _seed = create_mempool_self_transfer(&wallet);
728+
assert!(
729+
mempool_tx_count() >= 1,
730+
"expected at least one tx in the node's mempool before request"
731+
);
732+
733+
with_chain_client(|_init, thread, chain| async move {
734+
let recorder = Rc::new(RecordingNotifications::default());
735+
let notifications: chain_notifications::Client =
736+
capnp_rpc::new_client_from_rc(recorder.clone());
737+
738+
let mut req = chain.request_mempool_transactions_request();
739+
req.get().get_context().unwrap().set_thread(thread.clone());
740+
req.get().set_notifications(notifications);
741+
let resp = tokio::time::timeout(
742+
std::time::Duration::from_secs(15),
743+
req.send().promise,
744+
)
745+
.await
746+
.expect("requestMempoolTransactions timed out");
747+
resp.expect("requestMempoolTransactions failed");
748+
749+
// The replay is fire-and-forget on the server side; give the
750+
// delivered callbacks a brief moment to land on our LocalSet
751+
// before asserting.
752+
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
753+
loop {
754+
if !recorder.added.borrow().is_empty() {
755+
break;
756+
}
757+
if std::time::Instant::now() >= deadline {
758+
panic!(
759+
"requestMempoolTransactions delivered no transactionAddedToMempool callbacks"
760+
);
761+
}
762+
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
763+
}
764+
})
765+
.await;
766+
}

0 commit comments

Comments
 (0)