Skip to content

Commit 444751b

Browse files
gophergogocaleb2h
authored andcommitted
Drive ConnectionManager event tests through the dispatcher thread (#212)
McpConnectionManager::onConnectionEvent now asserts isThreadSafe(), so calling it from the test thread aborts. Spin up a background dispatcher thread in the fixture and route the two existing OnConnectionEventPreventsDuplicate* cases through a runOnDispatcher helper. Adds OnCloseRunsCleanlyOnDispatcherThread, which walks Connected -> RemoteClose -> LocalClose on the dispatcher, and RepeatedEventSequencesOnDispatcherThreadDoNotCrash, which replays the sequence five times to catch any lifetime bug that only surfaces under repeated close events.
1 parent 9f6002d commit 444751b

1 file changed

Lines changed: 133 additions & 10 deletions

File tree

tests/connection/test_connection_manager.cc

Lines changed: 133 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
* - Connection close order
1111
*/
1212

13+
#include <atomic>
14+
#include <chrono>
15+
#include <thread>
16+
1317
#include <gmock/gmock.h>
1418
#include <gtest/gtest.h>
1519

@@ -49,12 +53,61 @@ class ConnectionManagerSection2Test : public ::testing::Test {
4953
}
5054

5155
void TearDown() override {
56+
stopDispatcherThread();
5257
callbacks_.reset();
5358
dispatcher_.reset();
5459
}
5560

61+
// Starts the dispatcher on a background thread. Needed for tests that
62+
// invoke McpConnectionManager methods that assert isThreadSafe() — those
63+
// methods model callbacks coming from the connection's own callback loop,
64+
// which only exists once the dispatcher is running.
65+
void startDispatcherThread() {
66+
if (loop_thread_.joinable()) {
67+
return;
68+
}
69+
loop_thread_ = std::thread(
70+
[this]() { dispatcher_->run(event::RunType::Block); });
71+
// Wait until libevent has actually entered the loop — without this we
72+
// race with thread_id_ being set and isThreadSafe() still returns false.
73+
for (int i = 0; i < 200 && !dispatcher_->isThreadSafe(); ++i) {
74+
std::this_thread::sleep_for(std::chrono::milliseconds(2));
75+
// isThreadSafe() checked from test thread is always false by design;
76+
// poll via a posted sentinel instead.
77+
std::atomic<bool> sentinel{false};
78+
dispatcher_->post([&sentinel]() { sentinel = true; });
79+
for (int j = 0; j < 50 && !sentinel.load(); ++j) {
80+
std::this_thread::sleep_for(std::chrono::milliseconds(2));
81+
}
82+
if (sentinel.load()) break;
83+
}
84+
}
85+
86+
// Posts fn onto the dispatcher and blocks until it has run. Use for any
87+
// call that must execute on the dispatcher thread (e.g., onConnectionEvent,
88+
// deferredDelete, connection mutators guarded by dispatcher-thread asserts).
89+
void runOnDispatcher(std::function<void()> fn) {
90+
std::atomic<bool> done{false};
91+
dispatcher_->post([&done, fn = std::move(fn)]() {
92+
fn();
93+
done = true;
94+
});
95+
for (int i = 0; i < 400 && !done.load(); ++i) {
96+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
97+
}
98+
ASSERT_TRUE(done.load()) << "dispatcher never ran the posted callback";
99+
}
100+
101+
void stopDispatcherThread() {
102+
if (loop_thread_.joinable()) {
103+
dispatcher_->exit();
104+
loop_thread_.join();
105+
}
106+
}
107+
56108
std::unique_ptr<event::Dispatcher> dispatcher_;
57109
std::unique_ptr<MockProtocolCallbacks> callbacks_;
110+
std::thread loop_thread_;
58111
};
59112

60113
// =============================================================================
@@ -268,10 +321,16 @@ TEST_F(ConnectionManagerSection2Test,
268321
onConnectionEvent(network::ConnectionEvent::Connected))
269322
.Times(1);
270323

271-
// Simulate receiving Connected event twice
272-
manager->onConnectionEvent(network::ConnectionEvent::Connected);
273-
manager->onConnectionEvent(
274-
network::ConnectionEvent::Connected); // Should be ignored
324+
// onConnectionEvent asserts it runs on the dispatcher thread because in
325+
// production it's always invoked from the connection's own callback loop.
326+
// Drive the synthetic invocations through the dispatcher so they satisfy
327+
// that contract.
328+
startDispatcherThread();
329+
runOnDispatcher([&manager]() {
330+
manager->onConnectionEvent(network::ConnectionEvent::Connected);
331+
manager->onConnectionEvent(
332+
network::ConnectionEvent::Connected); // Should be ignored
333+
});
275334
}
276335

277336
/**
@@ -286,12 +345,76 @@ TEST_F(ConnectionManagerSection2Test, OnConnectionEventPreventsDuplicateClose) {
286345

287346
manager->setProtocolCallbacks(*callbacks_);
288347

289-
// First close should be processed, second should be ignored
290-
// We can't easily verify the exact count due to internal state,
291-
// but we can verify it doesn't crash
292-
manager->onConnectionEvent(network::ConnectionEvent::RemoteClose);
293-
manager->onConnectionEvent(network::ConnectionEvent::RemoteClose);
294-
manager->onConnectionEvent(network::ConnectionEvent::LocalClose);
348+
// As above — driven through the dispatcher to satisfy the
349+
// onConnectionEvent thread-safety assert.
350+
startDispatcherThread();
351+
runOnDispatcher([&manager]() {
352+
manager->onConnectionEvent(network::ConnectionEvent::RemoteClose);
353+
manager->onConnectionEvent(network::ConnectionEvent::RemoteClose);
354+
manager->onConnectionEvent(network::ConnectionEvent::LocalClose);
355+
});
356+
}
357+
358+
// =============================================================================
359+
// Deferred-close / thread-safety regression tests (PR #212)
360+
// =============================================================================
361+
362+
// Regression test for the crash fixed by 4f46b3db:
363+
// onConnectionEvent(RemoteClose|LocalClose) used to destroy active_connection_
364+
// synchronously from inside the connection's own callback loop, causing UAF.
365+
// It now hands the connection to dispatcher.deferredDelete. This test drives
366+
// the close path on the dispatcher thread and verifies it completes without
367+
// tripping the dispatcher-thread assertion and without crashing.
368+
TEST_F(ConnectionManagerSection2Test, OnCloseRunsCleanlyOnDispatcherThread) {
369+
McpConnectionConfig config;
370+
config.transport_type = TransportType::HttpSse;
371+
372+
auto manager = std::make_unique<McpConnectionManager>(
373+
*dispatcher_, network::socketInterface(), config);
374+
manager->setProtocolCallbacks(*callbacks_);
375+
376+
startDispatcherThread();
377+
378+
// Full legal lifecycle: connect -> close. Each step must run on the
379+
// dispatcher thread because onConnectionEvent now asserts it.
380+
runOnDispatcher([&manager]() {
381+
manager->onConnectionEvent(network::ConnectionEvent::Connected);
382+
});
383+
runOnDispatcher([&manager]() {
384+
manager->onConnectionEvent(network::ConnectionEvent::RemoteClose);
385+
});
386+
387+
// Sanity: a stray close event after the first must also be safe to process
388+
// on the dispatcher thread. This is the path that used to UAF when the
389+
// connection was destroyed synchronously on the first close.
390+
runOnDispatcher([&manager]() {
391+
manager->onConnectionEvent(network::ConnectionEvent::LocalClose);
392+
});
393+
}
394+
395+
// The dispatcher-thread assertion added alongside the deferred-close fix is
396+
// the tripwire that stops any future caller from synthesising connection
397+
// events on the wrong thread. We can't exercise the abort path in a passing
398+
// test, but we can at least prove that the same calls *do* succeed when they
399+
// come in on the dispatcher thread — and, paired with the assertion, that
400+
// catches off-thread regressions in debug builds.
401+
TEST_F(ConnectionManagerSection2Test,
402+
RepeatedEventSequencesOnDispatcherThreadDoNotCrash) {
403+
McpConnectionConfig config;
404+
config.transport_type = TransportType::HttpSse;
405+
406+
auto manager = std::make_unique<McpConnectionManager>(
407+
*dispatcher_, network::socketInterface(), config);
408+
manager->setProtocolCallbacks(*callbacks_);
409+
410+
startDispatcherThread();
411+
412+
for (int i = 0; i < 5; ++i) {
413+
runOnDispatcher([&manager]() {
414+
manager->onConnectionEvent(network::ConnectionEvent::Connected);
415+
manager->onConnectionEvent(network::ConnectionEvent::RemoteClose);
416+
});
417+
}
295418
}
296419

297420
// =============================================================================

0 commit comments

Comments
 (0)