Skip to content

Commit aff37a9

Browse files
committed
proxy, refactor: Change ConnectStream and ServeStream to accept stream objects
Instead of accepting raw file descriptor integers and wrapping them internally, ConnectStream and ServeStream now accept kj::Own<kj::AsyncIoStream> directly. This removes the assumption that the transport is always a local unix fd, making the API easier to adapt to other I/O types (e.g. Windows handles). The Stream type alias (kj::Own<kj::AsyncIoStream>) is added as a convenience, along with StreamSocketId() to extract the underlying fd from a Stream when needed. Callers are updated to wrap their fd with wrapSocketFd() before calling.
1 parent 34b0b24 commit aff37a9

5 files changed

Lines changed: 27 additions & 14 deletions

File tree

example/calculator.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <functional>
1212
#include <iostream>
1313
#include <kj/async.h>
14+
#include <kj/async-io.h>
1415
#include <kj/common.h>
1516
#include <kj/memory.h>
1617
#include <memory>
@@ -53,7 +54,8 @@ int main(int argc, char** argv)
5354
mp::SocketId socket{mp::StartSpawned(argv[1])};
5455
mp::EventLoop loop("mpcalculator", LogPrint);
5556
std::unique_ptr<Init> init = std::make_unique<InitImpl>();
56-
mp::ServeStream<InitInterface>(loop, socket, *init);
57+
mp::Stream stream{loop.m_io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
58+
mp::ServeStream<InitInterface>(loop, kj::mv(stream), *init);
5759
loop.loop();
5860
return 0;
5961
}

example/example.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
#include <future>
1212
#include <iostream>
1313
#include <kj/async.h>
14+
#include <kj/async-io.h>
1415
#include <kj/common.h>
16+
#include <kj/memory.h>
1517
#include <memory>
1618
#include <mp/proxy-io.h>
1719
#include <mp/util.h>
@@ -32,7 +34,7 @@ static auto Spawn(mp::EventLoop& loop, const std::string& process_argv0, const s
3234
path.append(new_exe_name);
3335
return {path.string(), std::move(info)};
3436
});
35-
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, socket), pid);
37+
return std::make_tuple(mp::ConnectStream<InitInterface>(loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(socket)), pid);
3638
}
3739

3840
static void LogPrint(mp::LogMessage log_data)

example/printer.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <fstream>
1212
#include <iostream>
1313
#include <kj/async.h>
14+
#include <kj/async-io.h>
1415
#include <kj/common.h>
1516
#include <kj/memory.h>
1617
#include <memory>
@@ -47,7 +48,8 @@ int main(int argc, char** argv)
4748
mp::SocketId socket{mp::StartSpawned(argv[1])};
4849
mp::EventLoop loop("mpprinter", LogPrint);
4950
std::unique_ptr<Init> init = std::make_unique<InitImpl>();
50-
mp::ServeStream<InitInterface>(loop, socket, *init);
51+
mp::Stream stream{loop.m_io_context.lowLevelProvider->wrapSocketFd(socket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
52+
mp::ServeStream<InitInterface>(loop, std::move(stream), *init);
5153
loop.loop();
5254
return 0;
5355
}

include/mp/proxy-io.h

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@ class Logger
211211

212212
std::string LongThreadName(const char* exe_name);
213213

214+
inline SocketId StreamSocketId(const Stream& stream)
215+
{
216+
if (stream) KJ_IF_MAYBE(fd, stream->getFd()) return *fd;
217+
throw std::logic_error("Stream socket unset");
218+
}
219+
214220
//! Event loop implementation.
215221
//!
216222
//! Cap'n Proto threading model is very simple: all I/O operations are
@@ -795,17 +801,15 @@ kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
795801
return ret;
796802
}
797803

798-
//! Given stream file descriptor, make a new ProxyClient object to send requests
799-
//! over the stream. Also create a new Connection object embedded in the
800-
//! client that is freed when the client is closed.
804+
//! Given a stream, make a new ProxyClient object to send requests over it.
805+
//! Also create a new Connection object embedded in the client that is freed
806+
//! when the client is closed.
801807
template <typename InitInterface>
802-
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
808+
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, kj::Own<kj::AsyncIoStream> stream)
803809
{
804810
typename InitInterface::Client init_client(nullptr);
805811
std::unique_ptr<Connection> connection;
806812
loop.sync([&] {
807-
auto stream =
808-
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
809813
connection = std::make_unique<Connection>(loop, kj::mv(stream));
810814
init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
811815
Connection* connection_ptr = connection.get();
@@ -853,13 +857,12 @@ void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitIm
853857
}));
854858
}
855859

856-
//! Given stream file descriptor and an init object, handle requests on the
857-
//! stream by calling methods on the Init object.
860+
//! Given a stream and an init object, handle requests on the stream by calling
861+
//! methods on the Init object.
858862
template <typename InitInterface, typename InitImpl>
859-
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
863+
void ServeStream(EventLoop& loop, kj::Own<kj::AsyncIoStream> stream, InitImpl& init)
860864
{
861-
_Serve<InitInterface>(
862-
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
865+
_Serve<InitInterface>(loop, kj::mv(stream), init);
863866
}
864867

865868
//! Given listening socket file descriptor and an init object, handle incoming

include/mp/util.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include <cstring>
1313
#include <exception>
1414
#include <functional>
15+
#include <kj/async-io.h>
16+
#include <kj/memory.h>
1517
#include <kj/string-tree.h>
1618
#include <mutex>
1719
#include <string>
@@ -250,6 +252,8 @@ std::string ThreadName(const char* exe_name);
250252
//! errors in python unit tests.
251253
std::string LogEscape(const kj::StringTree& string, size_t max_size);
252254

255+
using Stream = kj::Own<kj::AsyncIoStream>;
256+
253257
using ProcessId = int;
254258
using SocketId = int;
255259
constexpr SocketId SocketError{-1};

0 commit comments

Comments
 (0)