diff --git a/capnp/proxy.capnp b/capnp/proxy.capnp index 386f8f7..08cf99b 100644 --- a/capnp/proxy.capnp +++ b/capnp/proxy.capnp @@ -45,6 +45,10 @@ interface ThreadMap $count(0) { # execute on. Clients create and name threads and pass the thread handle as # a call parameter. makeThread @0 (name :Text) -> (result :Thread); + # Pre-allocate a pool of server threads for implicit dispatch. When a + # request arrives with no context.thread set, the server round-robins + # across the pool instead of returning an error. + makePool @1 (count :UInt32) -> (); } interface Thread { diff --git a/tests/test.rs b/tests/test.rs index bf7e0d2..8bc502b 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,4 +1,4 @@ -use bitcoin_capnp_types::mining_capnp; +use bitcoin_capnp_types::mining_capnp::{self, mining}; #[path = "util/bitcoin_core.rs"] mod bitcoin_core_util; @@ -6,7 +6,8 @@ mod bitcoin_core_util; mod bitcoin_core_wallet_util; use bitcoin_core_util::{ - destroy_template, make_block_template, mempool_tx_count, with_init_client, with_mining_client, + destroy_template, make_block_template, mempool_tx_count, with_init_client, + with_init_client_and_thread_map, with_mining_client, }; use bitcoin_core_wallet_util::{ bitcoin_test_wallet, create_mempool_self_transfer, ensure_wallet_loaded_and_funded, @@ -40,6 +41,156 @@ async fn integration() { .await; } +/// makeEcho without setting context.thread. +/// +/// On an upstream Bitcoin Core (libmultiprocess without `ThreadMap.makePool`) +/// the server-side dispatcher rejects requests whose context has no thread +/// handle — `m_threads.getLocalServer()` returns null and the handler throws +/// "invalid thread handle". This test pins that behavior so we notice if the +/// pooled-threads fork ever changes it (e.g., once `makePool` is wired up and +/// the server falls back to a pool instead of erroring). +#[tokio::test] +#[serial_test::parallel] +async fn make_echo_without_thread_errors() { + with_init_client(|client, _thread| async move { + let echo = client.make_echo_request(); + // Intentionally do NOT call set_thread on the context — leave the + // thread capability null. + let result = echo.send().promise.await; + match &result { + Ok(_) => {} + Err(e) => eprintln!("makeEcho without thread errored as expected: {e}"), + } + assert!( + result.is_err(), + "makeEcho with no context.thread should be rejected by the server" + ); + }) + .await; +} + +/// Create a server-side thread pool via `ThreadMap.makePool`, then issue an +/// echo round-trip without setting `context.thread`. The server's dispatcher +/// should round-robin the call onto a pool thread and return the echoed text. +/// +/// This requires a Bitcoin Core built against the pooled-threads libmultiprocess +/// fork — upstream rejects ordinal @1 on `ThreadMap` as "method not implemented". +#[tokio::test] +#[serial_test::parallel] +async fn make_echo_via_pool() { + with_init_client_and_thread_map(|client, thread_map, _thread| async move { + // Pre-allocate two pool threads on the server. + let mut pool_req = thread_map.make_pool_request(); + pool_req.get().set_count(2); + pool_req + .send() + .promise + .await + .expect("makePool should succeed on a pool-capable server"); + + // Obtain an Echo capability. makeEcho itself dispatches via Context, + // so leave thread unset here too — pool should service it. + let make_echo = client.make_echo_request(); + let echo_response = make_echo + .send() + .promise + .await + .expect("makeEcho without thread should be dispatched via the pool"); + let echo_client = echo_response.get().unwrap().get_result().unwrap(); + + // Now call echo() on the Echo capability — again without setting + // context.thread — and assert the round-trip succeeds. + let mut echo_call = echo_client.echo_request(); + echo_call.get().set_echo("Hello from pool"); + let echo_resp = echo_call + .send() + .promise + .await + .expect("echo without thread should be dispatched via the pool"); + let text = echo_resp + .get() + .unwrap() + .get_result() + .unwrap() + .to_string() + .unwrap(); + assert_eq!("Hello from pool", text); + }) + .await; +} + +/// Drive the Mining interface entirely through a server-side thread pool. +/// +/// Creates a 4-thread pool, then obtains a Mining capability without setting +/// `context.thread`. Fires two batches of concurrent requests — 4 calls each, +/// none with a thread set — and awaits them with `try_join_all`. Every call +/// should be round-robin dispatched onto pool threads and succeed. +/// +/// Requires a Bitcoin Core built against the pooled-threads libmultiprocess +/// fork (ordinal @1 on `ThreadMap`). +#[tokio::test] +#[serial_test::parallel] +async fn mining_via_pool_concurrent_queries() { + with_init_client_and_thread_map(|client, thread_map, _thread| async move { + // Pre-allocate 4 server threads. With 4 concurrent in-flight requests + // per batch, each pool thread gets exactly one — exercising every + // entry in the round-robin cursor. + let mut pool_req = thread_map.make_pool_request(); + pool_req.get().set_count(4); + pool_req + .send() + .promise + .await + .expect("makePool should succeed on a pool-capable server"); + + // Obtain Mining via the pool (no thread set on makeMining's context). + let make_mining = client.make_mining_request(); + let mining_resp = make_mining + .send() + .promise + .await + .expect("makeMining without thread should be dispatched via the pool"); + let mining: mining::Client = mining_resp.get().unwrap().get_result().unwrap(); + + // Batch 1: four concurrent is_test_chain calls. + let test_chain_futs: Vec<_> = (0..4) + .map(|_| mining.is_test_chain_request().send().promise) + .collect(); + let test_chain_results = futures::future::try_join_all(test_chain_futs) + .await + .expect("concurrent is_test_chain should all dispatch via pool"); + for resp in &test_chain_results { + assert!( + resp.get().unwrap().get_result(), + "regtest is a test chain — all four pool threads should agree" + ); + } + + // Batch 2: four concurrent get_tip calls. The node isn't mining during + // tests, so each call should report a well-formed tip — but we don't + // assert hash equality across calls in case a parallel test ever does + // advance the tip. + let get_tip_futs: Vec<_> = (0..4) + .map(|_| mining.get_tip_request().send().promise) + .collect(); + let get_tip_results = futures::future::try_join_all(get_tip_futs) + .await + .expect("concurrent get_tip should all dispatch via pool"); + for resp in &get_tip_results { + let results = resp.get().unwrap(); + assert!(results.get_has_result(), "node should have a tip"); + let tip = results.get_result().unwrap(); + assert_eq!( + tip.get_hash().unwrap().len(), + 32, + "block hash must be 32 bytes" + ); + assert!(tip.get_height() >= 0, "height must be non-negative"); + } + }) + .await; +} + /// Calling the deprecated makeMiningOld2 (@2) should return an error from the /// server. Cap'n Proto requires sequential ordinals so this placeholder cannot /// be removed, but the server intentionally rejects it. diff --git a/tests/util/bitcoin_core.rs b/tests/util/bitcoin_core.rs index 2171bf5..74e5b6c 100644 --- a/tests/util/bitcoin_core.rs +++ b/tests/util/bitcoin_core.rs @@ -70,12 +70,29 @@ where let rpc_system = RpcSystem::new(Box::new(rpc_network), None); LocalSet::new() .run_until(async move { - let (client, thread) = bootstrap(rpc_system).await; + let (client, _thread_map, thread) = bootstrap(rpc_system).await; f(client, thread).await; }) .await; } +/// Like `with_init_client` but also surfaces the `ThreadMap` capability so the +/// caller can drive thread-pool setup (`make_pool`) on the connection. +pub async fn with_init_client_and_thread_map(f: F) +where + F: FnOnce(init::Client, thread_map::Client, thread::Client) -> Fut, + Fut: Future, +{ + let rpc_network = connect_unix_stream(unix_socket_path()).await; + let rpc_system = RpcSystem::new(Box::new(rpc_network), None); + LocalSet::new() + .run_until(async move { + let (client, thread_map, thread) = bootstrap(rpc_system).await; + f(client, thread_map, thread).await; + }) + .await; +} + pub async fn with_mining_client(f: F) where F: FnOnce(init::Client, thread::Client, mining::Client) -> Fut, @@ -117,7 +134,7 @@ pub async fn connect_unix_stream( /// Bootstrap an Init client, spawn the RPC system, and create a thread handle. pub async fn bootstrap( mut rpc_system: RpcSystem, -) -> (init::Client, thread::Client) { +) -> (init::Client, thread_map::Client, thread::Client) { ensure_bootstrap_chain_ready(); let client: init::Client = rpc_system.bootstrap(Side::Server); @@ -140,7 +157,7 @@ pub async fn bootstrap( .await .unwrap(); let thread: thread::Client = thread_reponse.get().unwrap().get_result().unwrap(); - (client, thread) + (client, thread_map, thread) } /// Obtain a Mining client from an Init client.