Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions capnp/proxy.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ interface ThreadMap $count(0) {
# execute on. Clients create and name threads and pass the thread handle as
# a call parameter.
makeThread @0 (name :Text) -> (result :Thread);
# Pre-allocate a pool of server threads for implicit dispatch. When a
# request arrives with no context.thread set, the server round-robins
# across the pool instead of returning an error.
makePool @1 (count :UInt32) -> ();
}

interface Thread {
Expand Down
155 changes: 153 additions & 2 deletions tests/test.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use bitcoin_capnp_types::mining_capnp;
use bitcoin_capnp_types::mining_capnp::{self, mining};

#[path = "util/bitcoin_core.rs"]
mod bitcoin_core_util;
#[path = "util/bitcoin_core_wallet.rs"]
mod bitcoin_core_wallet_util;

use bitcoin_core_util::{
destroy_template, make_block_template, mempool_tx_count, with_init_client, with_mining_client,
destroy_template, make_block_template, mempool_tx_count, with_init_client,
with_init_client_and_thread_map, with_mining_client,
};
use bitcoin_core_wallet_util::{
bitcoin_test_wallet, create_mempool_self_transfer, ensure_wallet_loaded_and_funded,
Expand Down Expand Up @@ -40,6 +41,156 @@ async fn integration() {
.await;
}

/// makeEcho without setting context.thread.
///
/// On an upstream Bitcoin Core (libmultiprocess without `ThreadMap.makePool`)
/// the server-side dispatcher rejects requests whose context has no thread
/// handle — `m_threads.getLocalServer()` returns null and the handler throws
/// "invalid thread handle". This test pins that behavior so we notice if the
/// pooled-threads fork ever changes it (e.g., once `makePool` is wired up and
/// the server falls back to a pool instead of erroring).
#[tokio::test]
#[serial_test::parallel]
async fn make_echo_without_thread_errors() {
with_init_client(|client, _thread| async move {
let echo = client.make_echo_request();
// Intentionally do NOT call set_thread on the context — leave the
// thread capability null.
let result = echo.send().promise.await;
match &result {
Ok(_) => {}
Err(e) => eprintln!("makeEcho without thread errored as expected: {e}"),
}
assert!(
result.is_err(),
"makeEcho with no context.thread should be rejected by the server"
);
})
.await;
}

/// Create a server-side thread pool via `ThreadMap.makePool`, then issue an
/// echo round-trip without setting `context.thread`. The server's dispatcher
/// should round-robin the call onto a pool thread and return the echoed text.
///
/// This requires a Bitcoin Core built against the pooled-threads libmultiprocess
/// fork — upstream rejects ordinal @1 on `ThreadMap` as "method not implemented".
#[tokio::test]
#[serial_test::parallel]
async fn make_echo_via_pool() {
with_init_client_and_thread_map(|client, thread_map, _thread| async move {
// Pre-allocate two pool threads on the server.
let mut pool_req = thread_map.make_pool_request();
pool_req.get().set_count(2);
pool_req
.send()
.promise
.await
.expect("makePool should succeed on a pool-capable server");

// Obtain an Echo capability. makeEcho itself dispatches via Context,
// so leave thread unset here too — pool should service it.
let make_echo = client.make_echo_request();
let echo_response = make_echo
.send()
.promise
.await
.expect("makeEcho without thread should be dispatched via the pool");
let echo_client = echo_response.get().unwrap().get_result().unwrap();

// Now call echo() on the Echo capability — again without setting
// context.thread — and assert the round-trip succeeds.
let mut echo_call = echo_client.echo_request();
echo_call.get().set_echo("Hello from pool");
let echo_resp = echo_call
.send()
.promise
.await
.expect("echo without thread should be dispatched via the pool");
let text = echo_resp
.get()
.unwrap()
.get_result()
.unwrap()
.to_string()
.unwrap();
assert_eq!("Hello from pool", text);
})
.await;
}

/// Drive the Mining interface entirely through a server-side thread pool.
///
/// Creates a 4-thread pool, then obtains a Mining capability without setting
/// `context.thread`. Fires two batches of concurrent requests — 4 calls each,
/// none with a thread set — and awaits them with `try_join_all`. Every call
/// should be round-robin dispatched onto pool threads and succeed.
///
/// Requires a Bitcoin Core built against the pooled-threads libmultiprocess
/// fork (ordinal @1 on `ThreadMap`).
#[tokio::test]
#[serial_test::parallel]
async fn mining_via_pool_concurrent_queries() {
with_init_client_and_thread_map(|client, thread_map, _thread| async move {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@plebhash @Shourya742 can you guys have a look at the result here and provide feedback on bitcoin-core/libmultiprocess#283 if any?

// Pre-allocate 4 server threads. With 4 concurrent in-flight requests
// per batch, each pool thread gets exactly one — exercising every
// entry in the round-robin cursor.
let mut pool_req = thread_map.make_pool_request();
pool_req.get().set_count(4);
pool_req
.send()
.promise
.await
.expect("makePool should succeed on a pool-capable server");

// Obtain Mining via the pool (no thread set on makeMining's context).
let make_mining = client.make_mining_request();
let mining_resp = make_mining
.send()
.promise
.await
.expect("makeMining without thread should be dispatched via the pool");
let mining: mining::Client = mining_resp.get().unwrap().get_result().unwrap();

// Batch 1: four concurrent is_test_chain calls.
let test_chain_futs: Vec<_> = (0..4)
.map(|_| mining.is_test_chain_request().send().promise)
.collect();
let test_chain_results = futures::future::try_join_all(test_chain_futs)
.await
.expect("concurrent is_test_chain should all dispatch via pool");
for resp in &test_chain_results {
assert!(
resp.get().unwrap().get_result(),
"regtest is a test chain — all four pool threads should agree"
);
}

// Batch 2: four concurrent get_tip calls. The node isn't mining during
// tests, so each call should report a well-formed tip — but we don't
// assert hash equality across calls in case a parallel test ever does
// advance the tip.
let get_tip_futs: Vec<_> = (0..4)
.map(|_| mining.get_tip_request().send().promise)
.collect();
let get_tip_results = futures::future::try_join_all(get_tip_futs)
.await
.expect("concurrent get_tip should all dispatch via pool");
for resp in &get_tip_results {
let results = resp.get().unwrap();
assert!(results.get_has_result(), "node should have a tip");
let tip = results.get_result().unwrap();
assert_eq!(
tip.get_hash().unwrap().len(),
32,
"block hash must be 32 bytes"
);
assert!(tip.get_height() >= 0, "height must be non-negative");
}
})
.await;
}

/// Calling the deprecated makeMiningOld2 (@2) should return an error from the
/// server. Cap'n Proto requires sequential ordinals so this placeholder cannot
/// be removed, but the server intentionally rejects it.
Expand Down
23 changes: 20 additions & 3 deletions tests/util/bitcoin_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,29 @@ where
let rpc_system = RpcSystem::new(Box::new(rpc_network), None);
LocalSet::new()
.run_until(async move {
let (client, thread) = bootstrap(rpc_system).await;
let (client, _thread_map, thread) = bootstrap(rpc_system).await;
f(client, thread).await;
})
.await;
}

/// Like `with_init_client` but also surfaces the `ThreadMap` capability so the
/// caller can drive thread-pool setup (`make_pool`) on the connection.
pub async fn with_init_client_and_thread_map<F, Fut>(f: F)
where
F: FnOnce(init::Client, thread_map::Client, thread::Client) -> Fut,
Fut: Future<Output = ()>,
{
let rpc_network = connect_unix_stream(unix_socket_path()).await;
let rpc_system = RpcSystem::new(Box::new(rpc_network), None);
LocalSet::new()
.run_until(async move {
let (client, thread_map, thread) = bootstrap(rpc_system).await;
f(client, thread_map, thread).await;
})
.await;
}

pub async fn with_mining_client<F, Fut>(f: F)
where
F: FnOnce(init::Client, thread::Client, mining::Client) -> Fut,
Expand Down Expand Up @@ -117,7 +134,7 @@ pub async fn connect_unix_stream(
/// Bootstrap an Init client, spawn the RPC system, and create a thread handle.
pub async fn bootstrap(
mut rpc_system: RpcSystem<capnp_rpc::rpc_twoparty_capnp::Side>,
) -> (init::Client, thread::Client) {
) -> (init::Client, thread_map::Client, thread::Client) {
ensure_bootstrap_chain_ready();

let client: init::Client = rpc_system.bootstrap(Side::Server);
Expand All @@ -140,7 +157,7 @@ pub async fn bootstrap(
.await
.unwrap();
let thread: thread::Client = thread_reponse.get().unwrap().get_result().unwrap();
(client, thread)
(client, thread_map, thread)
}

/// Obtain a Mining client from an Init client.
Expand Down
Loading