Skip to content

Commit 8fe9da2

Browse files
committed
Allow simultaneous calls on same Context.thread
If multiple IPC requests happen at the same time specifying same Context.thread to run the requests on, queue the requests to execute in the order they are received instead of raising a "thread busy" exception. This change has no effect on C++ clients using libmultiprocess as a client library, since the libmultiprocess client only makes blocking calls and creates a server thread for every client thread, so it's not possible for there to be multiple calls on the same server thread. But this change may be useful for rust and python clients as discussed bitcoin/bitcoin#33923
1 parent d0fc108 commit 8fe9da2

6 files changed

Lines changed: 74 additions & 57 deletions

File tree

include/mp/proxy-io.h

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,20 @@ template <>
8282
struct ProxyServer<Thread> final : public Thread::Server
8383
{
8484
public:
85-
ProxyServer(ThreadContext& thread_context, std::thread&& thread);
85+
ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
8686
~ProxyServer();
8787
kj::Promise<void> getName(GetNameContext context) override;
88+
89+
//! Run a callback function returning T on this thread.
90+
template<typename T, typename Fn>
91+
kj::Promise<T> post(Fn&& fn);
92+
93+
EventLoopRef m_loop;
8894
ThreadContext& m_thread_context;
8995
std::thread m_thread;
96+
//! Promise signaled when m_thread_context.waiter is idle and there is no
97+
//! post() callback function waiting to execute.
98+
kj::Promise<void> m_thread_ready{kj::READY_NOW};
9099
};
91100

92101
//! Handler for kj::TaskSet failed task events.
@@ -322,7 +331,12 @@ class EventLoop
322331
//! thread is blocked waiting for server response, this is what allows the
323332
//! client to run the request in the same thread, the same way code would run in a
324333
//! single process, with the callback sharing the same thread stack as the original
325-
//! call.)
334+
//! call.) To support this, the clientInvoke function calls Waiter::wait() to
335+
//! block the client IPC thread while initial request is in progress. Then if
336+
//! there is a callback, it is executed with Waiter::post().
337+
//!
338+
//! The Waiter class is also used server-side by `ProxyServer<Thread>::post()`
339+
//! to execute IPC calls on worker threads.
326340
struct Waiter
327341
{
328342
Waiter() = default;
@@ -675,6 +689,37 @@ struct ThreadContext
675689
bool loop_thread = false;
676690
};
677691

692+
template<typename T, typename Fn>
693+
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
694+
{
695+
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is idle again.
696+
auto ret = m_thread_ready.then([this, fn = std::move(fn), ready_fulfiller = kj::mv(ready.fulfiller)]() mutable {
697+
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
698+
bool posted = m_thread_context.waiter->post([this, fn = std::move(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
699+
m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable { ready_fulfiller->fulfill(); });
700+
std::optional<T> result;
701+
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result.emplace(fn()); })};
702+
m_loop->sync([&result, &exception, result_fulfiller = kj::mv(result_fulfiller)]() mutable {
703+
KJ_IF_MAYBE(e, exception) {
704+
assert(!result);
705+
result_fulfiller->reject(kj::mv(*e));
706+
} else {
707+
assert(result);
708+
result_fulfiller->fulfill(kj::mv(*result));
709+
}
710+
});
711+
});
712+
// Assert that calling Waiter::post did not fail. It could only return
713+
// false if a new function was posted before the previous one finished
714+
// executing, but new functions are only posted when m_thread_ready is
715+
// is signaled, so this should never happen.
716+
assert(posted);
717+
return kj::mv(result.promise);
718+
});
719+
m_thread_ready = kj::mv(ready.promise);
720+
return ret;
721+
}
722+
678723
//! Given stream file descriptor, make a new ProxyClient object to send requests
679724
//! over the stream. Also create a new Connection object embedded in the
680725
//! client that is freed when the client is closed.

include/mp/type-context.h

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ void CustomBuildField(TypeList<>,
2626
// future calls over this connection can reuse it.
2727
auto [callback_thread, _]{SetThread(
2828
GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
29-
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
29+
[&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};
3030

3131
// Call remote ThreadMap.makeThread function so server will create a
3232
// dedicated worker thread to run function calls from this thread. Store the
@@ -61,11 +61,10 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
6161
{
6262
const auto& params = server_context.call_context.getParams();
6363
Context::Reader context_arg = Accessor::get(params);
64-
auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
6564
auto& server = server_context.proxy_server;
6665
int req = server_context.req;
67-
auto invoke = [fulfiller = kj::mv(future.fulfiller),
68-
call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
66+
auto invoke = [call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
67+
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
6968
const auto& params = call_context.getParams();
7069
Context::Reader context_arg = Accessor::get(params);
7170
ServerContext server_context{server, call_context, req};
@@ -125,18 +124,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
125124
});
126125
fn.invoke(server_context, args...);
127126
}
128-
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
129-
server.m_context.loop->sync([&] {
130-
auto fulfiller_dispose = kj::mv(fulfiller);
131-
fulfiller_dispose->fulfill(kj::mv(call_context));
132-
});
133-
}))
134-
{
135-
server.m_context.loop->sync([&]() {
136-
auto fulfiller_dispose = kj::mv(fulfiller);
137-
fulfiller_dispose->reject(kj::mv(*exception));
138-
});
139-
}
127+
return call_context;
140128
};
141129

142130
// Lookup Thread object specified by the client. The specified thread should
@@ -149,23 +137,16 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
149137
// `invoke` lambda above which will invoke the function on that
150138
// thread.
151139
KJ_IF_MAYBE (thread_server, perhaps) {
152-
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
140+
auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
153141
MP_LOG(*server.m_context.loop, Log::Debug)
154142
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
155-
if (!thread.m_thread_context.waiter->post(std::move(invoke))) {
156-
MP_LOG(*server.m_context.loop, Log::Error)
157-
<< "IPC server error request #" << req
158-
<< " {" << thread.m_thread_context.thread_name << "}" << ", thread busy";
159-
throw std::runtime_error("thread busy");
160-
}
143+
return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
161144
} else {
162145
MP_LOG(*server.m_context.loop, Log::Error)
163146
<< "IPC server error request #" << req << ", missing thread to execute request";
164147
throw std::runtime_error("invalid thread handle");
165148
}
166-
})
167-
// Wait for the invocation to finish before returning to the caller.
168-
.then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
149+
});
169150
}
170151
} // namespace mp
171152

src/mp/proxy.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,8 @@ ProxyClient<Thread>::~ProxyClient()
362362
}
363363
}
364364

365-
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
366-
: m_thread_context(thread_context), m_thread(std::move(thread))
365+
ProxyServer<Thread>::ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread)
366+
: m_loop{*connection.m_loop}, m_thread_context(thread_context), m_thread(std::move(thread))
367367
{
368368
assert(m_thread_context.waiter.get() != nullptr);
369369
}
@@ -418,7 +418,7 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
418418
// is just waiter getting set to null.)
419419
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
420420
});
421-
auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
421+
auto thread_server = kj::heap<ProxyServer<Thread>>(m_connection, *thread_context.get_future().get(), std::move(thread));
422422
auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
423423
context.getResults().setResult(kj::mv(thread_client));
424424
return kj::READY_NOW;

test/mp/test/foo.capnp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
3333
passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32);
3434
callFn @17 () -> ();
3535
callFnAsync @18 (context :Proxy.Context) -> ();
36+
callIntFnAsync @21 (context :Proxy.Context, arg :Int32) -> (result :Int32);
3637
}
3738

3839
interface FooCallback $Proxy.wrap("mp::test::FooCallback") {

test/mp/test/foo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ class FooImplementation
8383
std::shared_ptr<FooCallback> m_callback;
8484
void callFn() { assert(m_fn); m_fn(); }
8585
void callFnAsync() { assert(m_fn); m_fn(); }
86+
int callIntFnAsync(int arg) { assert(m_int_fn); return m_int_fn(arg); }
8687
std::function<void()> m_fn;
88+
std::function<int(int)> m_int_fn;
8789
};
8890

8991
} // namespace test

test/mp/test/test.cpp

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
317317
signal.set_value();
318318
}
319319

320-
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
320+
KJ_TEST("Make simultaneous IPC calls on single remote thread")
321321
{
322322
TestSetup setup;
323323
ProxyClient<messages::FooInterface>* foo = setup.client.get();
@@ -336,51 +336,39 @@ KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
336336
request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
337337
});
338338

339-
setup.server->m_impl->m_fn = [&] {
340-
try
341-
{
342-
signal.get_future().get();
343-
}
344-
catch (const std::future_error& e)
345-
{
346-
KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
347-
}
339+
// Call callIntFnAsync 3 times with n=100, 200, 300
340+
std::atomic<int> expected = 100;
341+
342+
setup.server->m_impl->m_int_fn = [&](int n) {
343+
assert(n == expected);
344+
expected += 100;
345+
return n;
348346
};
349347

350348
auto client{foo->m_client};
351-
bool caught_thread_busy = false;
352-
// NOTE: '3' was chosen because it was the lowest number
353-
// of simultaneous calls required to reliably catch a "thread busy" error
354349
std::atomic<size_t> running{3};
355350
foo->m_context.loop->sync([&]
356351
{
357352
for (size_t i = 0; i < running; i++)
358353
{
359-
auto request{client.callFnAsyncRequest()};
354+
auto request{client.callIntFnAsyncRequest()};
360355
auto context{request.initContext()};
361356
context.setCallbackThread(*callback_thread);
362357
context.setThread(*request_thread);
358+
request.setArg(100 * (i+1));
363359
foo->m_context.loop->m_task_set->add(request.send().then(
364-
[&](auto&& results) {
365-
running -= 1;
366-
tc.waiter->m_cv.notify_all();
367-
},
368-
[&](kj::Exception&& e) {
369-
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
370-
"remote exception: std::exception: thread busy");
371-
caught_thread_busy = true;
360+
[&running, &tc, i](auto&& results) {
361+
assert(results.getResult() == 100 * (i+1));
372362
running -= 1;
373-
signal.set_value();
374363
tc.waiter->m_cv.notify_all();
375-
}
376-
));
364+
}));
377365
}
378366
});
379367
{
380368
Lock lock(tc.waiter->m_mutex);
381369
tc.waiter->wait(lock, [&running] { return running == 0; });
382370
}
383-
KJ_EXPECT(caught_thread_busy);
371+
KJ_EXPECT(expected == 400);
384372
}
385373

386374
} // namespace test

0 commit comments

Comments
 (0)