Skip to content

Commit 2975fac

Browse files
committed
Add windows support
Add support for running on windows. These changes make the libmultiprocess API more generic, using stream types instead of file descriptors. All features are supported, including spawning processes with socket connections to the parent process. These changes were originally made in bitcoin/bitcoin#32387
1 parent 75c2a27 commit 2975fac

8 files changed

Lines changed: 300 additions & 101 deletions

File tree

example/calculator.cpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,19 @@
66
#include <init.capnp.h>
77
#include <init.capnp.proxy.h> // NOLINT(misc-include-cleaner) // IWYU pragma: keep
88

9-
#include <charconv>
10-
#include <cstring>
9+
#include <cstring> // IWYU pragma: keep
1110
#include <fstream>
1211
#include <functional>
1312
#include <iostream>
1413
#include <kj/async.h>
14+
#include <kj/async-io.h>
1515
#include <kj/common.h>
1616
#include <kj/memory.h>
1717
#include <memory>
1818
#include <mp/proxy-io.h>
19+
#include <mp/util.h>
1920
#include <stdexcept>
2021
#include <string>
21-
#include <system_error>
2222
#include <utility>
2323

2424
class CalculatorImpl : public Calculator
@@ -51,14 +51,11 @@ int main(int argc, char** argv)
5151
std::cout << "Usage: mpcalculator <fd>\n";
5252
return 1;
5353
}
54-
int fd;
55-
if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) {
56-
std::cerr << argv[1] << " is not a number or is larger than an int\n";
57-
return 1;
58-
}
54+
mp::SocketId socket{mp::StartSpawned(argv[1])};
5955
mp::EventLoop loop("mpcalculator", LogPrint);
6056
std::unique_ptr<Init> init = std::make_unique<InitImpl>();
61-
mp::ServeStream<InitInterface>(loop, fd, *init);
57+
mp::Stream stream{loop.m_io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
58+
mp::ServeStream<InitInterface>(loop, kj::mv(stream), *init);
6259
loop.loop();
6360
return 0;
6461
}

example/example.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,38 @@
55
#include <init.capnp.h>
66
#include <init.capnp.proxy.h>
77

8+
#include <array>
89
#include <cstring> // IWYU pragma: keep
910
#include <filesystem>
1011
#include <fstream>
1112
#include <future>
1213
#include <iostream>
1314
#include <kj/async.h>
15+
#include <kj/async-io.h>
1416
#include <kj/common.h>
17+
#include <kj/memory.h>
1518
#include <memory>
1619
#include <mp/proxy-io.h>
1720
#include <mp/util.h>
1821
#include <stdexcept>
1922
#include <string>
2023
#include <thread>
2124
#include <tuple>
25+
#include <utility>
2226
#include <vector>
2327

2428
namespace fs = std::filesystem;
2529

2630
static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const std::string& new_exe_name)
2731
{
28-
int pid;
29-
const int fd = mp::SpawnProcess(pid, [&](int fd) -> std::vector<std::string> {
32+
auto pair{mp::SocketPair()};
33+
mp::ProcessId pid{mp::SpawnProcess(pair[0], [&](mp::ConnectInfo info) -> std::vector<std::string> {
3034
fs::path path = process_argv0;
3135
path.remove_filename();
3236
path.append(new_exe_name);
33-
return {path.string(), std::to_string(fd)};
34-
});
35-
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, fd), pid);
37+
return {path.string(), std::move(info)};
38+
})};
39+
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(pair[1])), pid);
3640
}
3741

3842
static void LogPrint(mp::LogMessage log_data)

example/printer.cpp

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,19 @@
77
#include <init.capnp.h>
88
#include <init.capnp.proxy.h> // NOLINT(misc-include-cleaner) // IWYU pragma: keep
99

10-
#include <charconv>
11-
#include <cstring>
10+
#include <cstring> // IWYU pragma: keep
1211
#include <fstream>
1312
#include <iostream>
1413
#include <kj/async.h>
14+
#include <kj/async-io.h>
1515
#include <kj/common.h>
1616
#include <kj/memory.h>
1717
#include <memory>
1818
#include <mp/proxy-io.h>
19+
#include <mp/util.h>
1920
#include <stdexcept>
2021
#include <string>
21-
#include <system_error>
22+
#include <utility>
2223

2324
class PrinterImpl : public Printer
2425
{
@@ -44,14 +45,11 @@ int main(int argc, char** argv)
4445
std::cout << "Usage: mpprinter <fd>\n";
4546
return 1;
4647
}
47-
int fd;
48-
if (std::from_chars(argv[1], argv[1] + strlen(argv[1]), fd).ec != std::errc{}) {
49-
std::cerr << argv[1] << " is not a number or is larger than an int\n";
50-
return 1;
51-
}
48+
mp::SocketId socket{mp::StartSpawned(argv[1])};
5249
mp::EventLoop loop("mpprinter", LogPrint);
5350
std::unique_ptr<Init> init = std::make_unique<InitImpl>();
54-
mp::ServeStream<InitInterface>(loop, fd, *init);
51+
mp::Stream stream{loop.m_io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
52+
mp::ServeStream<InitInterface>(loop, std::move(stream), *init);
5553
loop.loop();
5654
return 0;
5755
}

include/mp/proxy-io.h

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,17 @@ class Logger
210210

211211
std::string LongThreadName(const char* exe_name);
212212

213+
using Stream = kj::Own<kj::AsyncIoStream>;
214+
215+
inline SocketId StreamSocketId(const Stream& stream)
216+
{
217+
if (stream) KJ_IF_MAYBE(fd, stream->getFd()) return *fd;
218+
#ifdef WIN32
219+
if (stream) KJ_IF_MAYBE(handle, stream->getWin32Handle()) return reinterpret_cast<SocketId>(*handle);
220+
#endif
221+
throw std::logic_error("Stream socket unset");
222+
}
223+
213224
//! Event loop implementation.
214225
//!
215226
//! Cap'n Proto threading model is very simple: all I/O operations are
@@ -308,11 +319,12 @@ class EventLoop
308319
//! Callback functions to run on async thread.
309320
std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
310321

311-
//! Pipe read handle used to wake up the event loop thread.
312-
int m_wait_fd = -1;
322+
//! Socket pair used to post and wait for wakeups to the event loop thread.
323+
kj::Own<kj::AsyncIoStream> m_wait_stream;
324+
kj::Own<kj::AsyncIoStream> m_post_stream;
313325

314-
//! Pipe write handle used to wake up the event loop thread.
315-
int m_post_fd = -1;
326+
//! Synchronous writer used to write to m_post_stream.
327+
kj::Own<kj::OutputStream> m_post_writer;
316328

317329
//! Number of clients holding references to ProxyServerBase objects that
318330
//! reference this event loop.
@@ -797,13 +809,11 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
797809
//! over the stream. Also create a new Connection object embedded in the
798810
//! client that is freed when the client is closed.
799811
template <typename InitInterface>
800-
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
812+
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, kj::Own<kj::AsyncIoStream> stream)
801813
{
802814
typename InitInterface::Client init_client(nullptr);
803815
std::unique_ptr<Connection> connection;
804816
loop.sync([&] {
805-
auto stream =
806-
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
807817
connection = std::make_unique<Connection>(loop, kj::mv(stream));
808818
init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
809819
Connection* connection_ptr = connection.get();
@@ -854,10 +864,9 @@ void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitIm
854864
//! Given stream file descriptor and an init object, handle requests on the
855865
//! stream by calling methods on the Init object.
856866
template <typename InitInterface, typename InitImpl>
857-
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
867+
void ServeStream(EventLoop& loop, kj::Own<kj::AsyncIoStream> stream, InitImpl& init)
858868
{
859-
_Serve<InitInterface>(
860-
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
869+
_Serve<InitInterface>(loop, kj::mv(stream), init);
861870
}
862871

863872
//! Given listening socket file descriptor and an init object, handle incoming

include/mp/util.h

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#ifndef MP_UTIL_H
66
#define MP_UTIL_H
77

8+
#include <array>
89
#include <capnp/schema.h>
910
#include <cassert>
1011
#include <cstddef>
@@ -20,6 +21,10 @@
2021
#include <variant>
2122
#include <vector>
2223

24+
#ifdef WIN32
25+
#include <winsock2.h>
26+
#endif
27+
2328
namespace mp {
2429

2530
//! Generic utility functions used by capnp code.
@@ -249,25 +254,46 @@ std::string ThreadName(const char* exe_name);
249254
//! errors in python unit tests.
250255
std::string LogEscape(const kj::StringTree& string, size_t max_size);
251256

257+
#ifdef WIN32
258+
using ProcessId = uintptr_t;
259+
using SocketId = uintptr_t;
260+
constexpr SocketId SocketError{INVALID_SOCKET};
261+
#else
262+
using ProcessId = int;
263+
using SocketId = int;
264+
constexpr SocketId SocketError{-1};
265+
#endif
266+
267+
//! Information about parent process passed to child process. On unix this is
268+
//! just the inherited int file descriptor formatted as a string. On windows,
269+
//! this is a path to a named pipe the parent process will write
270+
//! WSADuplicateSocket info to.
271+
using ConnectInfo = std::string;
272+
252273
//! Callback type used by SpawnProcess below.
253-
using FdToArgsFn = std::function<std::vector<std::string>(int fd)>;
274+
using ConnectInfoToArgsFn = std::function<std::vector<std::string>(const ConnectInfo&)>;
275+
276+
//! Create a socket pair that can be used to communicate within a process or
277+
//! between parent and child processes.
278+
std::array<SocketId, 2> SocketPair();
279+
280+
//! Spawn a new process that communicates with the current process over provided
281+
//! socket argument. Calls connect_info_to_args callback with a connection
282+
//! string that needs to be passed to the child process, and executes the
283+
//! argv command line it returns. Returns child process id.
284+
ProcessId SpawnProcess(SocketId socket, ConnectInfoToArgsFn&& connect_info_to_args);
254285

255-
//! Spawn a new process that communicates with the current process over a socket
256-
//! pair. Returns pid through an output argument, and file descriptor for the
257-
//! local side of the socket.
258-
//! The fd_to_args callback is invoked in the parent process before fork().
259-
//! It must not rely on child pid/state, and must return the command line
260-
//! arguments that should be used to execute the process. Embed the remote file
261-
//! descriptor number in whatever format the child process expects.
262-
int SpawnProcess(int& pid, FdToArgsFn&& fd_to_args);
286+
//! Initialize spawned child process using the ConnectInfo string passed to it,
287+
//! returning a socket id for communicating with the parent process.
288+
SocketId StartSpawned(const ConnectInfo& connect_info);
263289

264290
//! Call execvp with vector args.
265291
//! Not safe to call in a post-fork child of a multi-threaded process.
266292
//! Currently only used by mpgen at build time.
267293
void ExecProcess(const std::vector<std::string>& args);
268294

269295
//! Wait for a process to exit and return its exit code.
270-
int WaitProcess(int pid);
296+
int WaitProcess(ProcessId pid);
271297

272298
inline char* CharCast(char* c) { return c; }
273299
inline char* CharCast(unsigned char* c) { return (char*)c; }

0 commit comments

Comments
 (0)