Skip to content

Commit 79b2e13

Browse files
committed
test: add dedicated ListenConnections coverage
Add a separate listen_tests.cpp file with reusable UnixListener, ClientSetup, and ListenSetup helpers for exercising ListenConnections() with real Unix domain sockets. The new test covers the baseline behavior that ListenConnections() accepts an incoming connection and serves requests over it. Keeping this coverage separate from the existing general proxy tests makes the socket listener setup easier to review and provides a clearer place to extend listener-specific behavior in follow-up commits.
1 parent 91c474f commit 79b2e13

4 files changed

Lines changed: 183 additions & 1 deletion

File tree

include/mp/proxy-io.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ class EventLoop
356356

357357
//! Hook called on the worker thread just before returning results.
358358
std::function<void()> testing_hook_async_request_done;
359+
360+
//! Hook called on the server thread when the client has connected.
361+
std::function<void()> testing_hook_connected;
359362
};
360363

361364
//! Single element task queue used to handle recursive capnp calls. (If the
@@ -842,6 +845,7 @@ void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init
842845
});
843846
auto it = loop.m_incoming_connections.begin();
844847
MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
848+
if (loop.testing_hook_connected) loop.testing_hook_connected();
845849
it->onDisconnect([&loop, it] {
846850
MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
847851
loop.m_incoming_connections.erase(it);

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ if(BUILD_TESTING AND TARGET CapnProto::kj-test)
2626
${MP_PROXY_HDRS}
2727
mp/test/foo-types.h
2828
mp/test/foo.h
29+
mp/test/listen_tests.cpp
2930
mp/test/spawn_tests.cpp
3031
mp/test/test.cpp
3132
)

test/mp/test/listen_tests.cpp

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright (c) The Bitcoin Core developers
2+
// Distributed under the MIT software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
5+
#include <mp/test/foo.capnp.h>
6+
#include <mp/test/foo.capnp.proxy.h>
7+
8+
#include <chrono>
9+
#include <condition_variable>
10+
#include <cstdlib>
11+
#include <cstring>
12+
#include <future>
13+
#include <kj/async.h>
14+
#include <kj/common.h>
15+
#include <kj/debug.h>
16+
#include <kj/memory.h>
17+
#include <kj/test.h>
18+
#include <memory>
19+
#include <mp/proxy-io.h>
20+
#include <mutex>
21+
#include <ratio> // IWYU pragma: keep
22+
#include <stdexcept>
23+
#include <string>
24+
#include <sys/socket.h>
25+
#include <sys/un.h>
26+
#include <thread>
27+
#include <unistd.h>
28+
29+
namespace mp {
30+
namespace test {
31+
namespace {
32+
33+
class UnixListener
34+
{
35+
public:
36+
UnixListener()
37+
{
38+
char dir_template[] = "/tmp/mptest-listener-XXXXXX";
39+
char* dir = mkdtemp(dir_template);
40+
KJ_REQUIRE(dir != nullptr);
41+
m_dir = dir;
42+
m_path = m_dir + "/socket";
43+
44+
m_fd = socket(AF_UNIX, SOCK_STREAM, 0);
45+
KJ_REQUIRE(m_fd >= 0);
46+
47+
sockaddr_un addr{};
48+
addr.sun_family = AF_UNIX;
49+
KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path));
50+
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
51+
KJ_REQUIRE(bind(m_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0);
52+
KJ_REQUIRE(listen(m_fd, SOMAXCONN) == 0);
53+
}
54+
55+
~UnixListener()
56+
{
57+
if (m_fd >= 0) close(m_fd);
58+
if (!m_path.empty()) unlink(m_path.c_str());
59+
if (!m_dir.empty()) rmdir(m_dir.c_str());
60+
}
61+
62+
int release()
63+
{
64+
int fd = m_fd;
65+
m_fd = -1;
66+
return fd;
67+
}
68+
69+
int Connect() const
70+
{
71+
int fd = socket(AF_UNIX, SOCK_STREAM, 0);
72+
KJ_REQUIRE(fd >= 0);
73+
74+
sockaddr_un addr{};
75+
addr.sun_family = AF_UNIX;
76+
KJ_REQUIRE(m_path.size() < sizeof(addr.sun_path));
77+
std::strncpy(addr.sun_path, m_path.c_str(), sizeof(addr.sun_path) - 1);
78+
KJ_REQUIRE(connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0);
79+
return fd;
80+
}
81+
82+
private:
83+
int m_fd{-1};
84+
std::string m_dir;
85+
std::string m_path;
86+
};
87+
88+
class ClientSetup
89+
{
90+
public:
91+
explicit ClientSetup(int fd)
92+
: thread([this, fd] {
93+
EventLoop loop("mptest-client", [](mp::LogMessage log) {
94+
KJ_LOG(INFO, log.level, log.message);
95+
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
96+
});
97+
client_promise.set_value(ConnectStream<messages::FooInterface>(loop, fd));
98+
loop.loop();
99+
})
100+
{
101+
client = client_promise.get_future().get();
102+
}
103+
104+
~ClientSetup()
105+
{
106+
client.reset();
107+
thread.join();
108+
}
109+
110+
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
111+
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
112+
113+
//! Thread variable should be after other struct members so the thread does
114+
//! not start until the other members are initialized.
115+
std::thread thread;
116+
};
117+
118+
class ListenSetup
119+
{
120+
public:
121+
ListenSetup()
122+
: thread([this] {
123+
EventLoop loop("mptest-server", [this](mp::LogMessage log) {
124+
KJ_LOG(INFO, log.level, log.message);
125+
if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
126+
});
127+
loop.testing_hook_connected = [&] {
128+
std::lock_guard<std::mutex> lock(counter_mutex);
129+
++connected_count;
130+
counter_cv.notify_all();
131+
};
132+
FooImplementation foo;
133+
ListenConnections<messages::FooInterface>(loop, listener.release(), foo);
134+
ready_promise.set_value();
135+
loop.loop();
136+
})
137+
{
138+
ready_promise.get_future().get();
139+
}
140+
141+
~ListenSetup()
142+
{
143+
thread.join();
144+
}
145+
146+
void WaitForConnectedCount(size_t expected_count)
147+
{
148+
std::unique_lock<std::mutex> lock(counter_mutex);
149+
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
150+
const bool matched = counter_cv.wait_until(lock, deadline, [&] {
151+
return connected_count >= expected_count;
152+
});
153+
KJ_REQUIRE(matched);
154+
}
155+
156+
UnixListener listener;
157+
std::promise<void> ready_promise;
158+
std::mutex counter_mutex;
159+
std::condition_variable counter_cv;
160+
size_t connected_count{0};
161+
//! Thread variable should be after other struct members so the thread does
162+
//! not start until the other members are initialized.
163+
std::thread thread;
164+
165+
};
166+
167+
KJ_TEST("ListenConnections accepts incoming connections")
168+
{
169+
ListenSetup setup;
170+
auto client = std::make_unique<ClientSetup>(setup.listener.Connect());
171+
172+
setup.WaitForConnectedCount(1);
173+
KJ_EXPECT(client->client->add(1, 2) == 3);
174+
}
175+
176+
} // namespace
177+
} // namespace test
178+
} // namespace mp

test/mp/test/test.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
#include <chrono>
1313
#include <condition_variable>
1414
#include <cstdint>
15-
#include <cstring>
1615
#include <functional>
1716
#include <future>
1817
#include <kj/async.h>

0 commit comments

Comments
 (0)