Skip to content

Commit d5bc804

Browse files
committed
Add makePool method on ThreadMap
This patch introduces a pool of threads to the `Connection` class, and allows this pool to be populated with the thread map via `makePool`. When a client thread is not set in a request context, it is delegated to the pool. This is unable to handle the guarentees with server-invoked callbacks that the current API offers, but these callbacks are not yet present in the interface. The pool is implemented as round-robin as it is simplest, but perhaps the pool could be a queue of requests with work-stealing for threads that are available. This was raised to me by Rust users, as they did not particularly care where work is executed on the server-side, but they have to set the thread regardless. ref: https://github.com/2140-dev/bitcoin-capnp-types/blob/master/tests/util/bitcoin_core.rs#L149
1 parent 3c69d12 commit d5bc804

6 files changed

Lines changed: 133 additions & 8 deletions

File tree

include/mp/proxy-io.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,11 @@ class Connection
477477
//! ThreadMap.makeThread) used to service requests to clients.
478478
::capnp::CapabilityServerSet<Thread> m_threads;
479479

480+
//! Thread pool populated by ThreadMap.makePool(). When a request arrives
481+
//! with no context.thread set, PassField round-robins across these threads.
482+
std::vector<Thread::Client> m_thread_pool;
483+
size_t m_thread_pool_index{0};
484+
480485
//! Canceler for canceling promises that we want to discard when the
481486
//! connection is destroyed. This is used to interrupt method calls that are
482487
//! still executing at time of disconnection.

include/mp/proxy.capnp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ interface ThreadMap $count(0) {
4545
# execute on. Clients create and name threads and pass the thread handle as
4646
# a call parameter.
4747
makeThread @0 (name :Text) -> (result :Thread);
48+
# Pre-allocate a pool of server threads for implicit dispatch. When a
49+
# request arrives with no context.thread set, the server dispatches it
50+
# through this pool via a shared work queue.
51+
makePool @1 (name :Text, count :UInt32) -> ();
4852
}
4953

5054
interface Thread {

include/mp/type-context.h

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -201,20 +201,38 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
201201
const auto& params = server_context.call_context.getParams();
202202
Context::Reader context_arg = Accessor::get(params);
203203
auto thread_client = context_arg.getThread();
204-
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
205-
.then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
206-
// Assuming the thread object is found, pass it a pointer to the
207-
// `invoke` lambda above which will invoke the function on that
208-
// thread.
204+
auto* connection = server.m_context.connection;
205+
auto result = connection->m_threads.getLocalServer(thread_client)
206+
.then([&loop, invoke = kj::mv(invoke), req, connection](const kj::Maybe<Thread::Server&>& perhaps) mutable {
207+
// If the client specified a thread, dispatch to it directly.
209208
KJ_IF_MAYBE (thread_server, perhaps) {
210209
auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
211210
MP_LOG(loop, Log::Debug)
212211
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
213212
return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
214213
} else {
215-
MP_LOG(loop, Log::Error)
216-
<< "IPC server error request #" << req << ", missing thread to execute request";
217-
throw std::runtime_error("invalid thread handle");
214+
// No thread specified — fall back to the connection's thread
215+
// pool (populated by ThreadMap.makePool). Error if no pool.
216+
auto& pool = connection->m_thread_pool;
217+
if (pool.empty()) {
218+
MP_LOG(loop, Log::Error)
219+
<< "IPC server error request #" << req << ", no thread specified and no pool configured";
220+
throw std::runtime_error("no thread specified and no pool configured");
221+
}
222+
size_t idx = connection->m_thread_pool_index++ % pool.size();
223+
return connection->m_threads.getLocalServer(pool[idx])
224+
.then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& pool_perhaps) mutable {
225+
KJ_IF_MAYBE (pt, pool_perhaps) {
226+
auto& pool_thread = static_cast<ProxyServer<Thread>&>(*pt);
227+
MP_LOG(loop, Log::Debug)
228+
<< "IPC server post request #" << req << " {" << pool_thread.m_thread_context.thread_name << "}";
229+
return pool_thread.template post<typename ServerContext::CallContext>(std::move(invoke));
230+
} else {
231+
MP_LOG(loop, Log::Error)
232+
<< "IPC server error request #" << req << ", pool thread not found";
233+
throw std::runtime_error("pool thread not found");
234+
}
235+
});
218236
}
219237
});
220238
// Use connection m_canceler object to cancel the result promise if the

include/mp/type-threadmap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ struct ProxyServer<ThreadMap> final : public virtual ThreadMap::Server
1414
public:
1515
ProxyServer(Connection& connection);
1616
kj::Promise<void> makeThread(MakeThreadContext context) override;
17+
kj::Promise<void> makePool(MakePoolContext context) override;
1718
Connection& m_connection;
1819
};
1920

src/mp/proxy.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <kj/function.h>
2727
#include <kj/memory.h>
2828
#include <kj/string.h>
29+
#include <cstdint>
2930
#include <map>
3031
#include <memory>
3132
#include <optional>
@@ -36,6 +37,7 @@
3637
#include <tuple>
3738
#include <unistd.h>
3839
#include <utility>
40+
#include <vector>
3941

4042
namespace mp {
4143

@@ -415,6 +417,28 @@ kj::Promise<void> ProxyServer<Thread>::getName(GetNameContext context)
415417

416418
ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(connection) {}
417419

420+
kj::Promise<void> ProxyServer<ThreadMap>::makePool(MakePoolContext context)
421+
{
422+
EventLoop& loop{*m_connection.m_loop};
423+
const auto& params = context.getParams();
424+
const std::string pool_name = params.getName();
425+
const uint32_t count = params.getCount();
426+
for (uint32_t i = 0; i < count; ++i) {
427+
const std::string thread_name = pool_name + "/pool/" + std::to_string(i);
428+
std::promise<ThreadContext*> thread_context;
429+
std::thread thread([&loop, &thread_context, thread_name]() {
430+
g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (" + thread_name + ")";
431+
g_thread_context.waiter = std::make_unique<Waiter>();
432+
Lock lock(g_thread_context.waiter->m_mutex);
433+
thread_context.set_value(&g_thread_context);
434+
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
435+
});
436+
auto thread_server = kj::heap<ProxyServer<Thread>>(m_connection, *thread_context.get_future().get(), std::move(thread));
437+
m_connection.m_thread_pool.push_back(m_connection.m_threads.add(kj::mv(thread_server)));
438+
}
439+
return kj::READY_NOW;
440+
}
441+
418442
kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
419443
{
420444
EventLoop& loop{*m_connection.m_loop};

test/mp/test/test.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
#include <kj/async.h>
1919
#include <kj/async-io.h>
2020
#include <kj/common.h>
21+
#include <kj/exception.h>
2122
#include <kj/debug.h>
2223
#include <kj/memory.h>
24+
#include <kj/string.h>
2325
#include <kj/test.h>
2426
#include <memory>
2527
#include <mp/proxy.h>
@@ -481,5 +483,76 @@ KJ_TEST("Make simultaneous IPC calls on single remote thread")
481483
KJ_EXPECT(expected == 400);
482484
}
483485

486+
KJ_TEST("Call async IPC method dispatched to pool thread")
487+
{
488+
TestSetup setup;
489+
ProxyClient<messages::FooInterface>* foo = setup.client.get();
490+
491+
// Set up the thread map exchange so the client has the server's ThreadMap,
492+
// then call makePool to pre-allocate two server threads.
493+
foo->initThreadMap();
494+
setup.server->m_impl->m_int_fn = [](int n) { return n * 2; };
495+
496+
ThreadContext& tc{g_thread_context};
497+
std::atomic<size_t> running{3};
498+
std::promise<void> pool_ready;
499+
foo->m_context.loop->sync([&] {
500+
auto pool_req = foo->m_context.connection->m_thread_map.makePoolRequest();
501+
pool_req.setName("test");
502+
pool_req.setCount(2);
503+
foo->m_context.loop->m_task_set->add(
504+
pool_req.send().then([&](auto&&) { pool_ready.set_value(); }));
505+
});
506+
pool_ready.get_future().get();
507+
508+
// Send three callIntFnAsync requests with no context.thread set.
509+
// The server should dispatch each to a pool thread.
510+
auto client{foo->m_client};
511+
foo->m_context.loop->sync([&] {
512+
for (size_t i = 0; i < running; ++i) {
513+
auto request{client.callIntFnAsyncRequest()};
514+
request.initContext(); // context present but thread unset
515+
request.setArg(static_cast<int32_t>(i + 1));
516+
foo->m_context.loop->m_task_set->add(request.send().then(
517+
[&running, &tc, i](auto&& results) {
518+
assert(results.getResult() == static_cast<int32_t>((i + 1) * 2));
519+
running -= 1;
520+
tc.waiter->m_cv.notify_all();
521+
}));
522+
}
523+
});
524+
{
525+
Lock lock(tc.waiter->m_mutex);
526+
tc.waiter->wait(lock, [&running] { return running == 0; });
527+
}
528+
}
529+
530+
KJ_TEST("Call async IPC method without thread or pool errors correctly")
531+
{
532+
TestSetup setup;
533+
ProxyClient<messages::FooInterface>* foo = setup.client.get();
534+
setup.server->m_impl->m_fn = [] {};
535+
536+
// Send a callFnAsync request with no context.thread and no pool configured.
537+
// The server should throw the "no thread specified and no pool configured" error.
538+
std::promise<void> done;
539+
bool error_thrown{false};
540+
foo->m_context.loop->sync([&] {
541+
auto request{foo->m_client.callFnAsyncRequest()};
542+
request.initContext();
543+
foo->m_context.loop->m_task_set->add(
544+
request.send().then(
545+
[&](auto&&) { done.set_value(); },
546+
[&](kj::Exception&& e) {
547+
error_thrown = true;
548+
KJ_EXPECT(std::string_view{e.getDescription().cStr()}.find(
549+
"no thread specified and no pool configured") != std::string_view::npos);
550+
done.set_value();
551+
}));
552+
});
553+
done.get_future().get();
554+
KJ_EXPECT(error_thrown);
555+
}
556+
484557
} // namespace test
485558
} // namespace mp

0 commit comments

Comments
 (0)