3030#include < optional>
3131#include < stdexcept>
3232#include < string>
33- #include < sys/socket.h>
3433#include < thread>
3534#include < tuple>
36- #include < unistd.h>
3735#include < utility>
3836
37+ #ifndef WIN32
38+ #include < sys/socket.h>
39+ #include < unistd.h>
40+ #endif
41+
3942namespace mp {
4043
4144thread_local ThreadContext g_thread_context;
@@ -66,10 +69,9 @@ void EventLoopRef::reset(bool relock) MP_NO_TSA
6669 loop->m_num_clients -= 1 ;
6770 if (loop->done ()) {
6871 loop->m_cv .notify_all ();
69- int post_fd{loop->m_post_fd };
7072 loop_lock->unlock ();
7173 char buffer = 0 ;
72- KJ_SYSCALL ( write (post_fd, &buffer, 1 )); // NOLINT(bugprone-suspicious-semicolon)
74+ loop-> m_post_writer -> write (&buffer, 1 );
7375 // By default, do not try to relock `loop_lock` after writing,
7476 // because the event loop could wake up and destroy itself and the
7577 // mutex might no longer exist.
@@ -96,6 +98,20 @@ Connection::~Connection()
9698 // after the calls finish.
9799 m_rpc_system.reset ();
98100
101+ // shutdownWrite is needed on Windows so pending data in the m_stream socket
102+ // will be sent instead of discarded when m_stream is destroyed. On unix,
103+ // this doesn't seem to be needed because data is sent more reliably.
104+ //
105+ // Sending pending data is important if the connection is a socketpair
106+ // because when one side of the socketpair is closed, the other side doesn't
107+ // seem to receive any onDisconnect event. So it is important for the other
108+ // side to instead receive Cap'n Proto "release" messages (see `struct
109+ // Release` in capnp/rpc.capnp) from local Client objects being being
110+ // destroyed so the remote side can free resources and shut down cleanly.
111+ // Without this call, Server objects corresponding to the Client objects on
112+ // the other side of the connection are not freed by Cap'n Proto.
113+ m_stream->shutdownWrite ();
114+
99115 // ProxyClient cleanup handlers are in sync list, and ProxyServer cleanup
100116 // handlers are in the async list.
101117 //
@@ -192,17 +208,59 @@ void EventLoop::addAsyncCleanup(std::function<void()> fn)
192208 startAsyncThread ();
193209}
194210
211+ #ifdef WIN32
212+ // ! Synchronous socket output stream. Cap'n Proto library only provides limited
213+ // ! support for synchronous IO. It provides `FdOutputStream` which wraps unix
214+ // ! file descriptors and calls write() internally, and `HandleOutStream` which
215+ // ! wraps windows HANDLE values and calls WriteFile() internally. This class
216+ // ! just provides analagous functionality wrapping SOCKET values and calls
217+ // ! send() internally.
218+ class SocketOutputStream : public kj ::OutputStream {
219+ public:
220+ explicit SocketOutputStream (SOCKET socket) : m_socket(socket) {}
221+
222+ void write (const void * buffer, size_t size) override ;
223+
224+ private:
225+ SOCKET m_socket;
226+ };
227+
228+ static constexpr size_t WRITE_CLAMP_SIZE = 1u << 30 ; // 1GB clamp for Windows, like FdOutputStream
229+
230+ void SocketOutputStream::write (const void * buffer, size_t size) {
231+ const char * pos = reinterpret_cast <const char *>(buffer);
232+
233+ while (size > 0 ) {
234+ int n = send (m_socket, pos, static_cast <int >(kj::min (size, WRITE_CLAMP_SIZE)), 0 );
235+
236+ KJ_WIN32 (n != SOCKET_ERROR, " send() failed" );
237+ KJ_ASSERT (n > 0 , " send() returned zero." );
238+
239+ pos += n;
240+ size -= n;
241+ }
242+ }
243+ #endif
244+
195245EventLoop::EventLoop (const char * exe_name, LogOptions log_opts, void * context)
196246 : m_exe_name(exe_name),
197247 m_io_context (kj::setupAsyncIo()),
198248 m_task_set(new kj::TaskSet(m_error_handler)),
199249 m_log_opts(std::move(log_opts)),
200250 m_context(context)
201251{
202- int fds[2 ];
203- KJ_SYSCALL (socketpair (AF_UNIX, SOCK_STREAM, 0 , fds));
204- m_wait_fd = fds[0 ];
205- m_post_fd = fds[1 ];
252+ auto pipe = m_io_context.provider ->newTwoWayPipe ();
253+ m_wait_stream = kj::mv (pipe.ends [0 ]);
254+ m_post_stream = kj::mv (pipe.ends [1 ]);
255+ KJ_IF_MAYBE (fd, m_post_stream->getFd ()) {
256+ m_post_writer = kj::heap<kj::FdOutputStream>(*fd);
257+ #ifdef WIN32
258+ } else KJ_IF_MAYBE (handle, m_post_stream->getWin32Handle ()) {
259+ m_post_writer = kj::heap<SocketOutputStream>(reinterpret_cast <SOCKET>(*handle));
260+ #endif
261+ } else {
262+ throw std::logic_error (" Could not get file descriptor for new pipe." );
263+ }
206264}
207265
208266EventLoop::~EventLoop ()
@@ -211,8 +269,8 @@ EventLoop::~EventLoop()
211269 const Lock lock (m_mutex);
212270 KJ_ASSERT (m_post_fn == nullptr );
213271 KJ_ASSERT (!m_async_fns);
214- KJ_ASSERT (m_wait_fd == - 1 );
215- KJ_ASSERT (m_post_fd == - 1 );
272+ KJ_ASSERT (!m_wait_stream );
273+ KJ_ASSERT (!m_post_stream );
216274 KJ_ASSERT (m_num_clients == 0 );
217275
218276 // Spin event loop. wait for any promises triggered by RPC shutdown.
@@ -232,9 +290,7 @@ void EventLoop::loop()
232290 m_async_fns.emplace ();
233291 }
234292
235- kj::Own<kj::AsyncIoStream> wait_stream{
236- m_io_context.lowLevelProvider ->wrapSocketFd (m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
237- int post_fd{m_post_fd};
293+ kj::Own<kj::AsyncIoStream>& wait_stream{m_wait_stream};
238294 char buffer = 0 ;
239295 for (;;) {
240296 const size_t read_bytes = wait_stream->read (&buffer, 0 , 1 ).wait (m_io_context.waitScope );
@@ -246,7 +302,7 @@ void EventLoop::loop()
246302 m_cv.notify_all ();
247303 } else if (done ()) {
248304 // Intentionally do not break if m_post_fn was set, even if done()
249- // would return true, to ensure that the EventLoopRef write(post_fd )
305+ // would return true, to ensure that the EventLoopRef write(post_stream )
250306 // call always succeeds and the loop does not exit between the time
251307 // that the done condition is set and the write call is made.
252308 break ;
@@ -256,10 +312,9 @@ void EventLoop::loop()
256312 m_task_set.reset ();
257313 MP_LOG (*this , Log::Info) << " EventLoop::loop bye." ;
258314 wait_stream = nullptr ;
259- KJ_SYSCALL (::close (post_fd));
260315 const Lock lock (m_mutex);
261- m_wait_fd = - 1 ;
262- m_post_fd = - 1 ;
316+ m_wait_stream = nullptr ;
317+ m_post_stream = nullptr ;
263318 m_async_fns.reset ();
264319 m_cv.notify_all ();
265320}
@@ -274,10 +329,9 @@ void EventLoop::post(kj::Function<void()> fn)
274329 EventLoopRef ref (*this , &lock);
275330 m_cv.wait (lock.m_lock , [this ]() MP_REQUIRES (m_mutex) { return m_post_fn == nullptr ; });
276331 m_post_fn = &fn;
277- int post_fd{m_post_fd};
278332 Unlock (lock, [&] {
279333 char buffer = 0 ;
280- KJ_SYSCALL ( write (post_fd, &buffer, 1 ) );
334+ m_post_writer-> write (&buffer, 1 );
281335 });
282336 m_cv.wait (lock.m_lock , [this , &fn]() MP_REQUIRES (m_mutex) { return m_post_fn != &fn; });
283337}
0 commit comments