Skip to content

Commit 53dc5b9

Browse files
committed
util: Add Windows support
Add Windows-specific code to support building and running on Windows: - util.h: Guard ProcessId/SocketId/SocketError type aliases with WIN32 ifdefs so they use SOCKET/uintptr_t on Windows and int on Unix. Add winsock2.h include on Windows. - util.cpp: Guard Unix-specific system headers with WIN32 ifdefs. Add Windows-specific includes (windows.h, winsock2.h). Guard MaxFd() with #ifndef WIN32. Add GetCurrentThreadId() branch in ThreadName(). Add win32Socketpair() forward-declare. Add Windows branch in SocketPair() using win32Socketpair(). Add CommandLineFromArgv() helper needed to construct CreateProcess command lines. Add Windows branch in SpawnProcess() using named pipes and WSADuplicateSocket to pass socket to child. Add Windows branch in StartSpawned() reading socket from named pipe. Add Windows branch in WaitProcess() using WaitForSingleObject/GetExitCodeProcess. - proxy-io.h: Add Windows branch in StreamSocketId() using getWin32Handle(). - proxy.cpp: Add SocketOutputStream class on Windows (analogous to FdOutputStream but using SOCKET/send()). Add Windows branch in EventLoop constructor to create m_post_writer using SocketOutputStream.
1 parent 43dfe34 commit 53dc5b9

5 files changed

Lines changed: 191 additions & 7 deletions

File tree

include/mp/proxy-io.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ std::string LongThreadName(const char* exe_name);
214214
inline SocketId StreamSocketId(const Stream& stream)
215215
{
216216
if (stream) KJ_IF_MAYBE(fd, stream->getFd()) return *fd;
217+
#ifdef WIN32
218+
if (stream) KJ_IF_MAYBE(handle, stream->getWin32Handle()) return reinterpret_cast<SocketId>(*handle);
219+
#endif
217220
throw std::logic_error("Stream socket unset");
218221
}
219222

include/mp/util.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
#include <variant>
2424
#include <vector>
2525

26+
#ifdef WIN32
27+
#include <winsock2.h>
28+
#endif
29+
2630
namespace mp {
2731

2832
//! Generic utility functions used by capnp code.
@@ -254,12 +258,20 @@ std::string LogEscape(const kj::StringTree& string, size_t max_size);
254258

255259
using Stream = kj::Own<kj::AsyncIoStream>;
256260

261+
#ifdef WIN32
262+
using ProcessId = uintptr_t;
263+
using SocketId = uintptr_t;
264+
constexpr SocketId SocketError{INVALID_SOCKET};
265+
#else
257266
using ProcessId = int;
258267
using SocketId = int;
259268
constexpr SocketId SocketError{-1};
269+
#endif
260270

261271
//! Information about parent process passed to child process as a command-line
262272
//! argument. On unix this is the child socket fd number formatted as a string.
273+
//! On windows, this is a path to a named pipe the parent process will write
274+
//! WSADuplicateSocket info to.
263275
using ConnectInfo = std::string;
264276

265277
//! Callback type used by SpawnProcess below.

src/mp/proxy.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,40 @@ void Connection::removeSyncCleanup(CleanupIt it)
194194
m_sync_cleanup_fns.erase(it);
195195
}
196196

197+
#ifdef WIN32
198+
//! Synchronous socket output stream. Cap'n Proto library only provides limited
199+
//! support for synchronous IO. It provides `FdOutputStream` which wraps unix
200+
//! file descriptors and calls write() internally, and `HandleOutStream` which
201+
//! wraps windows HANDLE values and calls WriteFile() internally. This class
202+
//! just provides analogous functionality wrapping SOCKET values and calls
203+
//! send() internally.
204+
class SocketOutputStream : public kj::OutputStream {
205+
public:
206+
explicit SocketOutputStream(SOCKET socket) : m_socket(socket) {}
207+
208+
void write(const void* buffer, size_t size) override;
209+
210+
private:
211+
SOCKET m_socket;
212+
};
213+
214+
static constexpr size_t WRITE_CLAMP_SIZE = 1u << 30; // 1GB clamp for Windows, like FdOutputStream
215+
216+
void SocketOutputStream::write(const void* buffer, size_t size) {
217+
const char* pos = reinterpret_cast<const char*>(buffer);
218+
219+
while (size > 0) {
220+
int n = send(m_socket, pos, static_cast<int>(kj::min(size, WRITE_CLAMP_SIZE)), 0);
221+
222+
KJ_WIN32(n != SOCKET_ERROR, "send() failed");
223+
KJ_ASSERT(n > 0, "send() returned zero.");
224+
225+
pos += n;
226+
size -= n;
227+
}
228+
}
229+
#endif
230+
197231
void EventLoop::addAsyncCleanup(std::function<void()> fn)
198232
{
199233
const Lock lock(m_mutex);
@@ -229,6 +263,10 @@ EventLoop::EventLoop(const char* exe_name, LogOptions log_opts, void* context)
229263
m_post_stream = kj::mv(pipe.ends[1]);
230264
KJ_IF_MAYBE(fd, m_post_stream->getFd()) {
231265
m_post_writer = kj::heap<kj::FdOutputStream>(*fd);
266+
#ifdef WIN32
267+
} else KJ_IF_MAYBE(handle, m_post_stream->getWin32Handle()) {
268+
m_post_writer = kj::heap<SocketOutputStream>(reinterpret_cast<SOCKET>(*handle));
269+
#endif
232270
} else {
233271
throw std::logic_error("Could not get file descriptor for new pipe.");
234272
}

src/mp/util.cpp

Lines changed: 125 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,24 @@
1515
#include <pthread.h>
1616
#include <sstream>
1717
#include <string>
18-
#include <fcntl.h>
19-
#include <sys/resource.h>
20-
#include <sys/socket.h>
21-
#include <sys/wait.h>
2218
#include <system_error>
2319
#include <thread> // NOLINT(misc-include-cleaner) // IWYU pragma: keep
2420
#include <unistd.h>
2521
#include <utility>
2622
#include <vector>
2723

24+
#ifdef WIN32
25+
#include <atomic>
26+
#include <windows.h>
27+
#include <winsock2.h>
28+
#else
29+
#include <fcntl.h>
30+
#include <sys/resource.h>
31+
#include <sys/socket.h>
32+
#include <sys/types.h>
33+
#include <sys/wait.h>
34+
#endif
35+
2836
#ifdef __linux__
2937
#include <sys/syscall.h>
3038
#endif
@@ -33,6 +41,11 @@
3341
#include <pthread_np.h>
3442
#endif // HAVE_PTHREAD_GETTHREADID_NP
3543

44+
#ifdef WIN32
45+
// Forward-declare internal capnp function.
46+
namespace kj { namespace _ { int win32Socketpair(SOCKET socks[2]); } }
47+
#endif
48+
3649
namespace fs = std::filesystem;
3750

3851
namespace mp {
@@ -49,6 +62,7 @@ std::vector<char*> MakeArgv(const std::vector<std::string>& args)
4962
return argv;
5063
}
5164

65+
#ifndef WIN32
5266
//! Return highest possible file descriptor.
5367
size_t MaxFd()
5468
{
@@ -59,6 +73,7 @@ size_t MaxFd()
5973
return 1023;
6074
}
6175
}
76+
#endif
6277

6378
} // namespace
6479

@@ -80,6 +95,8 @@ std::string ThreadName(const char* exe_name)
8095
// the former are shorter and are the same as what gdb prints "LWP ...".
8196
#ifdef __linux__
8297
buffer << syscall(SYS_gettid);
98+
#elif defined(WIN32)
99+
buffer << GetCurrentThreadId();
83100
#elif defined(HAVE_PTHREAD_THREADID_NP)
84101
uint64_t tid = 0;
85102
pthread_threadid_np(nullptr, &tid);
@@ -117,10 +134,56 @@ std::string LogEscape(const kj::StringTree& string, size_t max_size)
117134
return result;
118135
}
119136

137+
//! Generate command line that the executable being invoked will split up using
138+
//! the CommandLineToArgvW function, which expects arguments with spaces to be
139+
//! quoted, quote characters to be backslash-escaped, and backslashes to also be
140+
//! backslash-escaped, but only if they precede a quote character.
141+
std::string CommandLineFromArgv(const std::vector<std::string>& argv)
142+
{
143+
std::string out;
144+
for (const auto& arg : argv) {
145+
if (!out.empty()) out += " ";
146+
if (!arg.empty() && arg.find_first_of(" \t\"") == std::string::npos) {
147+
// Argument has no quotes or spaces so escaping not necessary.
148+
out += arg;
149+
} else {
150+
out += '"'; // Start with a quote
151+
for (size_t i = 0; i < arg.size(); ++i) {
152+
if (arg[i] == '\\') {
153+
// Count consecutive backslashes
154+
size_t backslash_count = 0;
155+
while (i < arg.size() && arg[i] == '\\') {
156+
++backslash_count;
157+
++i;
158+
}
159+
if (i < arg.size() && arg[i] == '"') {
160+
// Backslashes before a quote need to be doubled
161+
out.append(backslash_count * 2 + 1, '\\');
162+
out.push_back('"');
163+
} else {
164+
// Otherwise, backslashes remain as-is
165+
out.append(backslash_count, '\\');
166+
--i; // Compensate for the outer loop's increment
167+
}
168+
} else if (arg[i] == '"') {
169+
// Escape double quotes with a backslash
170+
out.push_back('\\');
171+
out.push_back('"');
172+
} else {
173+
out.push_back(arg[i]);
174+
}
175+
}
176+
out += '"'; // End with a quote
177+
}
178+
}
179+
return out;
180+
}
181+
120182
std::tuple<ProcessId, SocketId> SpawnProcess(ConnectInfoToArgsFn&& connect_info_to_args)
121183
{
122184
auto fds{SocketPair()};
123185

186+
#ifndef WIN32
124187
// Evaluate the callback and build the argv array before forking.
125188
//
126189
// The parent process may be multi-threaded and holding internal library
@@ -177,17 +240,66 @@ std::tuple<ProcessId, SocketId> SpawnProcess(ConnectInfoToArgsFn&& connect_info_
177240
_exit(127);
178241
}
179242
return {pid, fds[1]};
243+
#else
244+
// Create windows pipe to send socket over to child process.
245+
static std::atomic<int> counter{1};
246+
ConnectInfo pipe_path{"\\\\.\\pipe\\mp-" + std::to_string(GetCurrentProcessId()) + "-" + std::to_string(counter.fetch_add(1))};
247+
HANDLE pipe{CreateNamedPipeA(pipe_path.c_str(), PIPE_ACCESS_OUTBOUND, PIPE_TYPE_MESSAGE | PIPE_WAIT, 1, 0, 0, 0, nullptr)};
248+
KJ_WIN32(pipe != INVALID_HANDLE_VALUE, "CreateNamedPipe failed");
249+
250+
// Start child process
251+
std::string cmd{CommandLineFromArgv(connect_info_to_args(pipe_path))};
252+
STARTUPINFOA si{};
253+
si.cb = sizeof(si);
254+
PROCESS_INFORMATION pi{};
255+
KJ_WIN32(CreateProcessA(nullptr, const_cast<char*>(cmd.c_str()), nullptr, nullptr, TRUE, 0, nullptr, nullptr, &si, &pi), "CreateProcess failed");
256+
CloseHandle(pi.hThread); // not needed
257+
258+
// Duplicate socket for the child (now that we know its PID)
259+
WSAPROTOCOL_INFO info{};
260+
KJ_WINSOCK(WSADuplicateSocket(fds[0], pi.dwProcessId, &info), "WSADuplicateSocket failed");
261+
262+
// Send socket to the child via the pipe
263+
KJ_WIN32(ConnectNamedPipe(pipe, nullptr) || GetLastError() == ERROR_PIPE_CONNECTED, "ConnectNamedPipe failed");
264+
DWORD wr;
265+
KJ_WIN32(WriteFile(pipe, &info, sizeof(info), &wr, nullptr) && wr == sizeof(info), "WriteFile(pipe) failed");
266+
CloseHandle(pipe);
267+
268+
return {reinterpret_cast<ProcessId>(pi.hProcess), fds[1]};
269+
#endif
180270
}
181271

182272
SocketId StartSpawned(const ConnectInfo& connect_info)
183273
{
274+
#ifndef WIN32
184275
return std::stoi(connect_info);
276+
#else
277+
HANDLE pipe = CreateFileA(connect_info.c_str(), GENERIC_READ, 0, nullptr, OPEN_EXISTING, 0, nullptr);
278+
KJ_WIN32(pipe != INVALID_HANDLE_VALUE, "CreateFile(pipe) failed");
279+
280+
WSAPROTOCOL_INFO info{};
281+
DWORD rd;
282+
KJ_WIN32(ReadFile(pipe, &info, sizeof(info), &rd, nullptr) && rd == sizeof(info), "ReadFile(pipe) failed");
283+
CloseHandle(pipe);
284+
285+
WSADATA dontcare;
286+
KJ_WIN32(WSAStartup(MAKEWORD(2, 2), &dontcare) != 0, "WSAStartup() failed");
287+
288+
SOCKET socket{WSASocketA(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, &info, 0, WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT)};
289+
KJ_WINSOCK(socket, "WSASocket(FROM_PROTOCOL_INFO) failed");
290+
return socket;
291+
#endif
185292
}
186293

187294
std::array<SocketId, 2> SocketPair()
188295
{
296+
#ifdef WIN32
297+
SOCKET pair[2];
298+
KJ_WINSOCK(kj::_::win32Socketpair(pair));
299+
#else
189300
int pair[2];
190301
KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, pair));
302+
#endif
191303
return {pair[0], pair[1]};
192304
}
193305

@@ -205,11 +317,20 @@ void ExecProcess(const std::vector<std::string>& args)
205317

206318
int WaitProcess(ProcessId pid)
207319
{
320+
#ifndef WIN32
208321
int status;
209322
if (::waitpid(pid, &status, /*options=*/0) != pid) {
210323
throw std::system_error(errno, std::system_category(), "waitpid");
211324
}
212325
return status;
326+
#else
327+
HANDLE handle{reinterpret_cast<HANDLE>(pid)};
328+
DWORD result{WaitForSingleObject(handle, INFINITE)};
329+
KJ_WIN32(result != WAIT_OBJECT_0, "WaitForSingleObject(child) failed");
330+
KJ_WIN32(GetExitCodeProcess(handle, &result), "GetExitCodeProcess failed");
331+
CloseHandle(handle);
332+
return result;
333+
#endif
213334
}
214335

215336
} // namespace mp

test/mp/test/spawn_tests.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,25 @@
99
#include <chrono>
1010
#include <compare>
1111
#include <condition_variable>
12-
#include <csignal>
1312
#include <cstdlib>
1413
#include <mutex>
1514
#include <string>
16-
#include <sys/wait.h>
1715
#include <thread>
1816
#include <tuple>
19-
#include <unistd.h>
2017
#include <utility>
2118
#include <vector>
2219

20+
#ifndef WIN32
21+
#include <csignal>
22+
#include <sys/wait.h>
23+
#include <unistd.h>
24+
#endif
25+
2326
namespace mp {
2427
namespace test {
2528
namespace {
2629

30+
#ifndef WIN32
2731
constexpr auto FAILURE_TIMEOUT = std::chrono::seconds{30};
2832

2933
// Poll for child process exit using waitpid(..., WNOHANG) until the child exits
@@ -44,14 +48,19 @@ static bool WaitPidWithTimeout(ProcessId pid, std::chrono::milliseconds timeout,
4448
}
4549
return false;
4650
}
51+
#endif // !WIN32
4752

4853
} // namespace
4954

55+
#ifndef WIN32
5056
KJ_TEST("SpawnProcess does not run callback in child")
5157
{
5258
// This test is designed to fail deterministically if fd_to_args is invoked
5359
// in the post-fork child: a mutex held by another parent thread at fork
5460
// time appears locked forever in the child.
61+
//
62+
// This test is Unix-only: Windows uses CreateProcess (not fork), so the
63+
// inherited-locked-mutex hazard does not apply there.
5564
std::mutex target_mutex;
5665
std::mutex control_mutex;
5766
std::condition_variable control_cv;
@@ -113,5 +122,6 @@ KJ_TEST("SpawnProcess does not run callback in child")
113122
KJ_EXPECT(exited, "Timeout waiting for child process to exit");
114123
KJ_EXPECT(WIFEXITED(status) && WEXITSTATUS(status) == 0);
115124
}
125+
#endif // !WIN32
116126
} // namespace test
117127
} // namespace mp

0 commit comments

Comments
 (0)