Skip to content

Commit 92fed35

Browse files
committed
Allow simultaneous calls on same Context.thread
If multiple IPC requests happen at the same time specifying same Context.thread to run the requests on, queue the requests to execute in the order they are received instead of raising a "thread busy" exception. This change has no effect on C++ clients using libmultiprocess as a client library, since the libmultiprocess client only makes blocking calls and creates a server thread for every client thread, so it's not possible for there to be multiple calls on the same server thread. But this change may be useful for rust and python clients as discussed bitcoin/bitcoin#33923
1 parent 558ae11 commit 92fed35

4 files changed

Lines changed: 37 additions & 33 deletions

File tree

include/mp/proxy-io.h

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ struct ProxyServer<Thread> final : public Thread::Server
9393
EventLoopRef m_loop;
9494
ThreadContext& m_thread_context;
9595
std::thread m_thread;
96+
//! Promise signaled when m_thread_context.waiter is idle and there is no
97+
//! post() callback function waiting to execute.
98+
kj::Promise<void> m_thread_ready{kj::READY_NOW};
9699
};
97100

98101
//! Handler for kj::TaskSet failed task events.
@@ -689,9 +692,14 @@ struct ThreadContext
689692
template<typename T, typename Fn>
690693
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
691694
{
692-
{
695+
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is idle again.
696+
auto ret = m_thread_ready.then([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller)]() mutable {
693697
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
694-
bool posted = m_thread_context.waiter->post([this, fn = std::forward<Fn>(fn), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
698+
bool posted = m_thread_context.waiter->post([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
699+
m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
700+
ready_fulfiller->fulfill();
701+
ready_fulfiller = nullptr;
702+
});
695703
std::optional<T> result_value;
696704
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn()); })};
697705
m_loop->sync([&result_value, &exception, result_fulfiller = kj::mv(result_fulfiller)]() mutable {
@@ -706,9 +714,15 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
706714
result_fulfiller = nullptr;
707715
});
708716
});
709-
if (!posted) throw std::runtime_error("thread busy");
717+
// Assert that calling Waiter::post did not fail. It could only return
718+
// false if a new function was posted before the previous one finished
719+
// executing, but new functions are only posted when m_thread_ready is
720+
// is signaled, so this should never happen.
721+
assert(posted);
710722
return kj::mv(result.promise);
711-
}
723+
});
724+
m_thread_ready = kj::mv(ready.promise);
725+
return ret;
712726
}
713727

714728
//! Given stream file descriptor, make a new ProxyClient object to send requests

test/mp/test/foo.capnp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ interface FooInterface $Proxy.wrap("mp::test::FooImplementation") {
3333
passFn @16 (context :Proxy.Context, fn :FooFn) -> (result :Int32);
3434
callFn @17 () -> ();
3535
callFnAsync @18 (context :Proxy.Context) -> ();
36+
callIntFnAsync @21 (context :Proxy.Context, arg :Int32) -> (result :Int32);
3637
}
3738

3839
interface FooCallback $Proxy.wrap("mp::test::FooCallback") {

test/mp/test/foo.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ class FooImplementation
8383
std::shared_ptr<FooCallback> m_callback;
8484
void callFn() { assert(m_fn); m_fn(); }
8585
void callFnAsync() { assert(m_fn); m_fn(); }
86+
int callIntFnAsync(int arg) { assert(m_int_fn); return m_int_fn(arg); }
8687
std::function<void()> m_fn;
88+
std::function<int(int)> m_int_fn;
8789
};
8890

8991
} // namespace test

test/mp/test/test.cpp

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
#include <atomic>
99
#include <capnp/capability.h>
1010
#include <capnp/rpc.h>
11+
#include <cassert>
1112
#include <condition_variable>
13+
#include <cstdint>
1214
#include <cstring>
1315
#include <functional>
1416
#include <future>
1517
#include <kj/async.h>
1618
#include <kj/async-io.h>
1719
#include <kj/common.h>
1820
#include <kj/debug.h>
19-
#include <kj/exception.h>
2021
#include <kj/memory.h>
21-
#include <kj/string.h>
2222
#include <kj/test.h>
2323
#include <memory>
2424
#include <mp/proxy.h>
@@ -31,7 +31,6 @@
3131
#include <stdexcept>
3232
#include <string>
3333
#include <string_view>
34-
#include <system_error>
3534
#include <thread>
3635
#include <type_traits>
3736
#include <utility>
@@ -317,7 +316,7 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
317316
signal.set_value();
318317
}
319318

320-
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
319+
KJ_TEST("Make simultaneous IPC calls on single remote thread")
321320
{
322321
TestSetup setup;
323322
ProxyClient<messages::FooInterface>* foo = setup.client.get();
@@ -336,51 +335,39 @@ KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
336335
request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
337336
});
338337

339-
setup.server->m_impl->m_fn = [&] {
340-
try
341-
{
342-
signal.get_future().get();
343-
}
344-
catch (const std::future_error& e)
345-
{
346-
KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
347-
}
338+
// Call callIntFnAsync 3 times with n=100, 200, 300
339+
std::atomic<int> expected = 100;
340+
341+
setup.server->m_impl->m_int_fn = [&](int n) {
342+
assert(n == expected);
343+
expected += 100;
344+
return n;
348345
};
349346

350347
auto client{foo->m_client};
351-
bool caught_thread_busy = false;
352-
// NOTE: '3' was chosen because it was the lowest number
353-
// of simultaneous calls required to reliably catch a "thread busy" error
354348
std::atomic<size_t> running{3};
355349
foo->m_context.loop->sync([&]
356350
{
357351
for (size_t i = 0; i < running; i++)
358352
{
359-
auto request{client.callFnAsyncRequest()};
353+
auto request{client.callIntFnAsyncRequest()};
360354
auto context{request.initContext()};
361355
context.setCallbackThread(*callback_thread);
362356
context.setThread(*request_thread);
357+
request.setArg(100 * (i+1));
363358
foo->m_context.loop->m_task_set->add(request.send().then(
364-
[&](auto&& results) {
365-
running -= 1;
366-
tc.waiter->m_cv.notify_all();
367-
},
368-
[&](kj::Exception&& e) {
369-
KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
370-
"remote exception: std::exception: thread busy");
371-
caught_thread_busy = true;
359+
[&running, &tc, i](auto&& results) {
360+
assert(results.getResult() == static_cast<int32_t>(100 * (i+1)));
372361
running -= 1;
373-
signal.set_value();
374362
tc.waiter->m_cv.notify_all();
375-
}
376-
));
363+
}));
377364
}
378365
});
379366
{
380367
Lock lock(tc.waiter->m_mutex);
381368
tc.waiter->wait(lock, [&running] { return running == 0; });
382369
}
383-
KJ_EXPECT(caught_thread_busy);
370+
KJ_EXPECT(expected == 400);
384371
}
385372

386373
} // namespace test

0 commit comments

Comments
 (0)