Skip to content

Commit 0174450

Browse files
committed
Prevent crash on unclean disconnect if abandoned IPC call returns interface pointer
This is a fix for bitcoin/bitcoin#34250, which reports that a bitcoin node crashes if a Rust Stratum v2 mining client calls BlockTemplate.waitNext() and disconnects without waiting for a response from the call, and if mempool fees have increased so that the call returns a non-null BlockTemplate interface pointer. The node would crash in this case while trying to call MakeProxyServer on the returned BlockTemplate pointer, which would fail because MakeProxyServer would try to use a reference to the Connection object that had been deleted as a result of the disconnect. The fix works by: - Adding a Connection::m_canceler member variable and using it to cancel any IPC response promises that are pending when the connection is destroyed. - Updating ProxyServer<Thread>::post to use promise.attach(), as described at https://capnproto.org/cxxrpc.html#cancellation, to detect cancellation and set a ServerContext::request_canceled variable. - Updating ServerCall to check the ServerContext::request_canceled status after any C++ server method returns, and throw an exception if it is set. - Updating the type-context.h PassField() function to deal with the exception by catching and logging it.
1 parent ddb5f74 commit 0174450

7 files changed

Lines changed: 251 additions & 16 deletions

File tree

include/mp/proxy-io.h

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ struct ServerInvokeContext : InvokeContext
4848
ProxyServer& proxy_server;
4949
CallContext& call_context;
5050
int req;
51+
//! For IPC methods that execute asynchronously, not on the event-loop
52+
//! thread: lock preventing the event-loop thread from freeing the params or
53+
//! results structs if the request is canceled while the worker thread is
54+
//! reading params (`call_context.getParams()`) or writing results
55+
//! (`call_context.getResults()`).
56+
Lock* cancel_lock{nullptr};
57+
//! For IPC methods that execute asynchronously, not on the event-loop
58+
//! thread, this is set to true if the IPC call was canceled by the client
59+
//! or canceled by a disconnection. If the call runs on the event-loop
60+
//! thread, it can't be canceled. This should be accessed with cancel_lock
61+
//! held if it is not null, since in the asynchronous case it is accessed
62+
//! from multiple threads.
63+
bool request_canceled{false};
5164

5265
ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
5366
: InvokeContext{*proxy_server.m_context.connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
@@ -421,18 +434,21 @@ class Connection
421434
template <typename F>
422435
void onDisconnect(F&& f)
423436
{
424-
// Add disconnect handler to local TaskSet to ensure it is cancelled and
437+
// Add disconnect handler to local TaskSet to ensure it is canceled and
425438
// will never run after connection object is destroyed. But when disconnect
426439
// handler fires, do not call the function f right away, instead add it
427440
// to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
428-
// error in cases where f deletes this Connection object.
441+
// error in the typical case where f deletes this Connection object.
429442
m_on_disconnect.add(m_network.onDisconnect().then(
430443
[f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
431444
}
432445

433446
EventLoopRef m_loop;
434447
kj::Own<kj::AsyncIoStream> m_stream;
435448
LoggingErrorHandler m_error_handler{*m_loop};
449+
//! TaskSet used to cancel the m_network.onDisconnect() handler for remote
450+
//! disconnections, if the connection is closed locally first by deleting
451+
//! this Connection object.
436452
kj::TaskSet m_on_disconnect{m_error_handler};
437453
::capnp::TwoPartyVatNetwork m_network;
438454
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
@@ -445,6 +461,11 @@ class Connection
445461
//! ThreadMap.makeThread) used to service requests to clients.
446462
::capnp::CapabilityServerSet<Thread> m_threads;
447463

464+
//! Canceler for canceling promises that we want to discard when the
465+
//! connection is destroyed. This is used to interrupt method calls that are
466+
//! still executing at time of disconnection.
467+
kj::Canceler m_canceler;
468+
448469
//! Cleanup functions to run if connection is broken unexpectedly. List
449470
//! will be empty if all ProxyClient are destroyed cleanly before the
450471
//! connection is destroyed.
@@ -696,9 +717,17 @@ template<typename T, typename Fn>
696717
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
697718
{
698719
auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
699-
auto ret = m_thread_ready.then([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller)]() mutable {
720+
auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
721+
CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
722+
// Keep a reference to the ProxyServer<Thread> instance by assigning it to
723+
// the self variable. ProxyServer instances are reference-counted and if the
724+
// client drops its reference, this variable keeps the instance alive until
725+
// the thread finishes executing. The self variable needs to be destroyed on
726+
// the event loop thread so it is freed in a sync() call below.
727+
auto self = thisCap();
728+
auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
700729
auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
701-
bool posted = m_thread_context.waiter->post([this, fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller)]() mutable {
730+
bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
702731
// Fulfill ready.promise now, as soon as the Waiter starts executing
703732
// this lambda, so the next ProxyServer<Thread>::post() call can
704733
// immediately call waiter->post(). It is important to do this
@@ -712,8 +741,15 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
712741
ready_fulfiller = nullptr;
713742
});
714743
std::optional<T> result_value;
715-
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn()); })};
716-
m_loop->sync([&result_value, &exception, result_fulfiller = kj::mv(result_fulfiller)]() mutable {
744+
kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
745+
m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
746+
// Destroy CancelMonitor here before fulfilling or rejecting the
747+
// promise so it doesn't get triggered when the promise is
748+
// destroyed.
749+
cancel_monitor_ptr = nullptr;
750+
// Send results to the fulfiller. Technically it would be ok to
751+
// skip this if promise was canceled, but it's simpler to just
752+
// do it unconditionally.
717753
KJ_IF_MAYBE(e, exception) {
718754
assert(!result_value);
719755
result_fulfiller->reject(kj::mv(*e));
@@ -723,6 +759,11 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
723759
result_value.reset();
724760
}
725761
result_fulfiller = nullptr;
762+
// Use evalLater to destroy the ProxyServer<Thread> self
763+
// reference, if it is the last reference, because the
764+
// ProxyServer<Thread> destructor needs to join the thread,
765+
// which can't happen until this sync() block has exited.
766+
m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
726767
});
727768
});
728769
// Assert that calling Waiter::post did not fail. It could only return
@@ -731,7 +772,7 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
731772
// signaled, so this should never happen.
732773
assert(posted);
733774
return kj::mv(result.promise);
734-
});
775+
}).attach(kj::heap<CancelProbe>(cancel_monitor));
735776
m_thread_ready = kj::mv(ready.promise);
736777
return ret;
737778
}

include/mp/proxy-types.h

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -445,9 +445,36 @@ struct ServerCall
445445
template <typename ServerContext, typename... Args>
446446
decltype(auto) invoke(ServerContext& server_context, TypeList<>, Args&&... args) const
447447
{
448-
return ProxyServerMethodTraits<typename decltype(server_context.call_context.getParams())::Reads>::invoke(
449-
server_context,
450-
std::forward<Args>(args)...);
448+
// If cancel_lock is set, release it while executing the method, and
449+
// reacquire it afterwards. The lock is needed to prevent params and
450+
// response structs from being deleted by the event loop thread if the
451+
// request is canceled, so it is only needed before and after method
452+
// execution. It is important to release the lock during execution
453+
// because the method can take arbitrarily long to return and the event
454+
// loop will need the lock itself in on_cancel if the call is canceled.
455+
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.unlock();
456+
return TryFinally(
457+
[&]() -> decltype(auto) {
458+
return ProxyServerMethodTraits<
459+
typename decltype(server_context.call_context.getParams())::Reads
460+
>::invoke(server_context, std::forward<Args>(args)...);
461+
},
462+
[&] {
463+
if (server_context.cancel_lock) server_context.cancel_lock->m_lock.lock();
464+
// If the IPC request was canceled, throw InterruptException
465+
// because there is no point continuing and trying to fill the
466+
// call_context.getResults() struct. It's also important to stop
467+
// executing because the connection may have been destroyed as
468+
// described in https://github.com/bitcoin/bitcoin/issues/34250
469+
// and there could be invalid references to the destroyed
470+
// Connection object if this continued.
471+
// If the IPC method itself threw an exception, the
472+
// InterruptException thrown below will take precedence over it.
473+
// Since the call has been canceled that exception can't be
474+
// returned to the caller, so it needs to be discarded like
475+
// other result values.
476+
if (server_context.request_canceled) throw InterruptException{"canceled"};
477+
});
451478
}
452479
};
453480

include/mp/proxy.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ struct ProxyServerBase : public virtual Interface_::Server
153153
ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection);
154154
virtual ~ProxyServerBase();
155155
void invokeDestroy();
156+
using Interface_::Server::thisCap;
156157

157158
/**
158159
* Implementation pointer that may or may not be owned and deleted when this

include/mp/type-context.h

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <mp/proxy-io.h>
99
#include <mp/util.h>
1010

11+
#include <kj/string.h>
12+
1113
namespace mp {
1214
template <typename Output>
1315
void CustomBuildField(TypeList<>,
@@ -63,7 +65,14 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
6365
Context::Reader context_arg = Accessor::get(params);
6466
auto& server = server_context.proxy_server;
6567
int req = server_context.req;
66-
auto invoke = [call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
68+
// Keep a reference to the ProxyServer instance by assigning it to the self
69+
// variable. ProxyServer instances are reference-counted and if the client
70+
// drops its reference and the IPC call is canceled, this variable keeps the
71+
// instance alive until the method finishes executing. The self variable
72+
// needs to be destroyed on the event loop thread so it is freed in a sync()
73+
// call below.
74+
auto self = server.thisCap();
75+
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
6776
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
6877
const auto& params = call_context.getParams();
6978
Context::Reader context_arg = Accessor::get(params);
@@ -89,8 +98,35 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
8998
auto& thread_context = g_thread_context;
9099
auto& request_threads = thread_context.request_threads;
91100
ConnThread request_thread;
92-
bool inserted;
101+
bool inserted{false};
102+
Mutex cancel_mutex;
103+
Lock cancel_lock{cancel_mutex};
104+
server_context.cancel_lock = &cancel_lock;
93105
server.m_context.loop->sync([&] {
106+
// Detect request being canceled before it executes.
107+
if (cancel_monitor.m_canceled) {
108+
server_context.request_canceled = true;
109+
return;
110+
}
111+
// Detect request being canceled while it executes.
112+
assert(!cancel_monitor.m_on_cancel);
113+
cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
114+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
115+
// Lock cancel_mutex here to block the event loop
116+
// thread and prevent it from deleting the request's
117+
// params and response structs while the execution
118+
// thread is accessing them. Because this lock is
119+
// released before the event loop thread does delete
120+
// the structs, the mutex does not provide any
121+
// protection from the event loop deleting the
122+
// structs _before_ the execution thread acquires
123+
// it. So in addition to locking the mutex, the
124+
// execution thread always checks request_canceled
125+
// as well before accessing the structs.
126+
// Use a named guard rather than a temporary: `Lock{cancel_mutex};` would
// be destroyed at the end of its own statement, releasing the mutex
// before request_canceled is written below and racing with the
// execution thread, which reads the flag while holding the lock.
Lock cancel_lock{cancel_mutex};
127+
server_context.request_canceled = true;
128+
};
129+
// Update request_threads map if not canceled.
94130
std::tie(request_thread, inserted) = SetThread(
95131
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
96132
[&] { return context_arg.getCallbackThread(); });
@@ -102,13 +138,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
102138
// recursive call (IPC call calling back to the caller which
103139
// makes another IPC call), so avoid modifying the map.
104140
const bool erase_thread{inserted};
105-
KJ_DEFER(if (erase_thread) {
141+
KJ_DEFER(
142+
// Release the cancel lock before calling loop->sync and
143+
// waiting for the event loop thread, because if a
144+
// cancellation happened, it needs to run the on_cancel
145+
// callback above. It's safe to release cancel_lock at
146+
// this point because the fn.invoke() call below will be
147+
// finished and no longer accessing the params or
148+
// results structs.
149+
cancel_lock.m_lock.unlock();
106150
// Erase the request_threads entry on the event loop
107151
// thread with loop->sync(), so if the connection is
108152
// broken there is not a race between this thread and
109153
// the disconnect handler trying to destroy the thread
110154
// client object.
111155
server.m_context.loop->sync([&] {
156+
auto self_dispose{kj::mv(self)};
157+
if (erase_thread) {
112158
// Look up the thread again without using existing
113159
// iterator since entry may no longer be there after
114160
// a disconnect. Destroy node after releasing
@@ -120,9 +166,22 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
120166
Lock lock(thread_context.waiter->m_mutex);
121167
removed = request_threads.extract(server.m_context.connection);
122168
}
169+
}
123170
});
124-
});
125-
fn.invoke(server_context, args...);
171+
);
172+
if (server_context.request_canceled) {
173+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
174+
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
175+
try {
176+
fn.invoke(server_context, args...);
177+
} catch (const InterruptException& e) {
178+
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
179+
}
180+
})) {
181+
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
182+
throw kj::mv(*exception);
183+
}
184+
// End of scope: if KJ_DEFER was reached, it runs here
126185
}
127186
return call_context;
128187
};
@@ -131,7 +190,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
131190
// be a local Thread::Server object, but it needs to be looked up
132191
// asynchronously with getLocalServer().
133192
auto thread_client = context_arg.getThread();
134-
return server.m_context.connection->m_threads.getLocalServer(thread_client)
193+
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
135194
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
136195
// Assuming the thread object is found, pass it a pointer to the
137196
// `invoke` lambda above which will invoke the function on that
@@ -147,6 +206,12 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
147206
throw std::runtime_error("invalid thread handle");
148207
}
149208
});
209+
// Use connection m_canceler object to cancel the result promise if the
210+
// connection is destroyed. (By default Cap'n Proto does not cancel requests
211+
// on disconnect, since it's possible clients might want to make requests
212+
// and immediately disconnect without waiting for results, but not want the
213+
// requests to be canceled.)
214+
return server.m_context.connection->m_canceler.wrap(kj::mv(result));
150215
}
151216
} // namespace mp
152217

0 commit comments

Comments
 (0)