Skip to content

Commit 540e041

Browse files
committed
Implement epoll for posix
1 parent b5e5d64 commit 540e041

26 files changed

Lines changed: 1756 additions & 383 deletions

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ on:
44
push:
55
branches: [ "**" ]
66
pull_request:
7-
branches: [ master ]
7+
branches: [ master, develop ]
88
workflow_dispatch:
99

1010
jobs:

CMakeLists.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,12 @@ file(GLOB_RECURSE BOOST_COROSIO_HEADERS CONFIGURE_DEPENDS
130130
"${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio/*.hpp"
131131
"${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio.hpp")
132132
file(GLOB_RECURSE BOOST_COROSIO_SOURCES CONFIGURE_DEPENDS
133-
"${CMAKE_CURRENT_SOURCE_DIR}/src/src/*.hpp"
134-
"${CMAKE_CURRENT_SOURCE_DIR}/src/src/*.cpp")
133+
"${CMAKE_CURRENT_SOURCE_DIR}/src/*.hpp"
134+
"${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp")
135135

136136
source_group("" FILES "include/boost/corosio.hpp")
137137
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/include/boost/corosio" PREFIX "include" FILES ${BOOST_COROSIO_HEADERS})
138-
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/src/src" PREFIX "src" FILES ${BOOST_COROSIO_SOURCES})
138+
source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}/src" PREFIX "src" FILES ${BOOST_COROSIO_SOURCES})
139139

140140
function(boost_corosio_setup_properties target)
141141
target_compile_features(${target} PUBLIC cxx_std_20)

include/boost/corosio/acceptor.hpp

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -267,17 +267,6 @@ class acceptor : public io_object
267267
BOOST_COROSIO_DECL
268268
void cancel();
269269

270-
/** Return the execution context.
271-
272-
@return Reference to the execution context that owns this acceptor.
273-
*/
274-
auto
275-
context() const noexcept ->
276-
capy::execution_context&
277-
{
278-
return *ctx_;
279-
}
280-
281270
struct acceptor_impl : io_object_impl
282271
{
283272
virtual void accept(
@@ -294,8 +283,6 @@ class acceptor : public io_object
294283
{
295284
return *static_cast<acceptor_impl*>(impl_);
296285
}
297-
298-
capy::execution_context* ctx_;
299286
};
300287

301288
} // namespace corosio
Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
#include <boost/corosio/acceptor.hpp>
1111

1212
#ifdef _WIN32
13-
#include "src/detail/win_iocp_sockets.hpp"
13+
#include "detail/win_iocp_sockets.hpp"
14+
#else
15+
#include "detail/posix_sockets.hpp"
1416
#endif
1517

1618
#include <boost/corosio/detail/except.hpp>
@@ -25,7 +27,8 @@ namespace {
2527
using acceptor_service = detail::win_iocp_sockets;
2628
using acceptor_impl_type = detail::win_acceptor_impl;
2729
#else
28-
#error "Unsupported platform"
30+
using acceptor_service = detail::posix_sockets;
31+
using acceptor_impl_type = detail::posix_acceptor_impl;
2932
#endif
3033

3134
} // namespace

src/detail/posix_op.hpp

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
//
2+
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/corosio
8+
//
9+
10+
#ifndef BOOST_COROSIO_DETAIL_POSIX_OP_HPP
11+
#define BOOST_COROSIO_DETAIL_POSIX_OP_HPP
12+
13+
#include <boost/corosio/detail/config.hpp>
14+
#include <boost/corosio/io_object.hpp>
15+
#include <boost/capy/ex/any_dispatcher.hpp>
16+
#include <boost/capy/concept/affine_awaitable.hpp>
17+
#include <boost/capy/ex/any_coro.hpp>
18+
#include <boost/capy/error.hpp>
19+
#include <boost/capy/ex/execution_context.hpp>
20+
#include <boost/system/error_code.hpp>
21+
22+
#include <unistd.h>
23+
#include <errno.h>
24+
25+
#include <atomic>
26+
#include <cstddef>
27+
#include <cstdint>
28+
#include <optional>
29+
#include <stop_token>
30+
31+
#include <netinet/in.h>
32+
#include <sys/socket.h>
33+
#include <sys/uio.h>
34+
35+
namespace boost {
36+
namespace corosio {
37+
namespace detail {
38+
39+
/** Base class for POSIX async operations.
40+
41+
This class is analogous to overlapped_op on Windows.
42+
It stores the coroutine handle, dispatcher, and result
43+
pointers needed to complete an async operation.
44+
*/
45+
struct posix_op : capy::execution_context::handler
46+
{
47+
struct canceller
48+
{
49+
posix_op* op;
50+
void operator()() const noexcept { op->request_cancel(); }
51+
};
52+
53+
capy::any_coro h;
54+
capy::any_dispatcher d;
55+
system::error_code* ec_out = nullptr;
56+
std::size_t* bytes_out = nullptr;
57+
58+
int fd = -1; // Socket file descriptor
59+
std::uint32_t events = 0; // Requested epoll events (EPOLLIN/EPOLLOUT)
60+
int error = 0; // errno on completion
61+
std::size_t bytes_transferred = 0;
62+
63+
std::atomic<bool> cancelled{false};
64+
std::optional<std::stop_callback<canceller>> stop_cb;
65+
66+
posix_op()
67+
{
68+
data_ = this;
69+
}
70+
71+
void reset() noexcept
72+
{
73+
fd = -1;
74+
events = 0;
75+
error = 0;
76+
bytes_transferred = 0;
77+
cancelled.store(false, std::memory_order_relaxed);
78+
}
79+
80+
void operator()() override
81+
{
82+
stop_cb.reset();
83+
84+
if (ec_out)
85+
{
86+
if (cancelled.load(std::memory_order_acquire))
87+
*ec_out = make_error_code(system::errc::operation_canceled);
88+
else if (error != 0)
89+
*ec_out = system::error_code(error, system::system_category());
90+
else if (is_read_operation() && bytes_transferred == 0)
91+
{
92+
// EOF: 0 bytes transferred with no error indicates end of stream
93+
*ec_out = make_error_code(capy::error::eof);
94+
}
95+
}
96+
97+
if (bytes_out)
98+
*bytes_out = bytes_transferred;
99+
100+
d(h).resume();
101+
}
102+
103+
// Returns true if this is a read operation (for EOF detection)
104+
virtual bool is_read_operation() const noexcept { return false; }
105+
106+
void destroy() override
107+
{
108+
stop_cb.reset();
109+
}
110+
111+
void request_cancel() noexcept
112+
{
113+
cancelled.store(true, std::memory_order_release);
114+
}
115+
116+
void start(std::stop_token token)
117+
{
118+
cancelled.store(false, std::memory_order_release);
119+
stop_cb.reset();
120+
121+
if (token.stop_possible())
122+
stop_cb.emplace(token, canceller{this});
123+
}
124+
125+
void complete(int err, std::size_t bytes) noexcept
126+
{
127+
error = err;
128+
bytes_transferred = bytes;
129+
}
130+
131+
/** Called when epoll signals the fd is ready.
132+
Derived classes override this to perform the actual I/O.
133+
Sets error and bytes_transferred appropriately.
134+
*/
135+
virtual void perform_io() noexcept {}
136+
};
137+
138+
inline posix_op*
139+
get_posix_op(capy::execution_context::handler* h) noexcept
140+
{
141+
return static_cast<posix_op*>(h->data());
142+
}
143+
144+
//------------------------------------------------------------------------------
145+
146+
/** Connect operation state. */
147+
struct posix_connect_op : posix_op
148+
{
149+
void perform_io() noexcept override
150+
{
151+
// For connect, check SO_ERROR to see if connection succeeded
152+
int err = 0;
153+
socklen_t len = sizeof(err);
154+
if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
155+
err = errno;
156+
complete(err, 0);
157+
}
158+
};
159+
160+
//------------------------------------------------------------------------------
161+
162+
/** Read operation state with buffer descriptors. */
163+
struct posix_read_op : posix_op
164+
{
165+
static constexpr std::size_t max_buffers = 16;
166+
iovec iovecs[max_buffers];
167+
int iovec_count = 0;
168+
169+
bool is_read_operation() const noexcept override { return true; }
170+
171+
void reset() noexcept
172+
{
173+
posix_op::reset();
174+
iovec_count = 0;
175+
}
176+
177+
void perform_io() noexcept override
178+
{
179+
ssize_t n = ::readv(fd, iovecs, iovec_count);
180+
if (n >= 0)
181+
complete(0, static_cast<std::size_t>(n));
182+
else
183+
complete(errno, 0);
184+
}
185+
};
186+
187+
//------------------------------------------------------------------------------
188+
189+
/** Write operation state with buffer descriptors. */
190+
struct posix_write_op : posix_op
191+
{
192+
static constexpr std::size_t max_buffers = 16;
193+
iovec iovecs[max_buffers];
194+
int iovec_count = 0;
195+
196+
void reset() noexcept
197+
{
198+
posix_op::reset();
199+
iovec_count = 0;
200+
}
201+
202+
void perform_io() noexcept override
203+
{
204+
ssize_t n = ::writev(fd, iovecs, iovec_count);
205+
if (n >= 0)
206+
complete(0, static_cast<std::size_t>(n));
207+
else
208+
complete(errno, 0);
209+
}
210+
};
211+
212+
//------------------------------------------------------------------------------
213+
214+
/** Accept operation state. */
215+
struct posix_accept_op : posix_op
216+
{
217+
int accepted_fd = -1;
218+
io_object::io_object_impl* peer_impl = nullptr;
219+
io_object::io_object_impl** impl_out = nullptr;
220+
221+
// Function to create peer impl - set by posix_sockets
222+
using create_peer_fn = io_object::io_object_impl* (*)(void*, int);
223+
create_peer_fn create_peer = nullptr;
224+
void* service_ptr = nullptr;
225+
226+
void reset() noexcept
227+
{
228+
posix_op::reset();
229+
accepted_fd = -1;
230+
peer_impl = nullptr;
231+
impl_out = nullptr;
232+
// Don't reset create_peer and service_ptr - they're set once
233+
}
234+
235+
void perform_io() noexcept override
236+
{
237+
sockaddr_in addr{};
238+
socklen_t addrlen = sizeof(addr);
239+
int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
240+
&addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
241+
242+
if (new_fd >= 0)
243+
{
244+
accepted_fd = new_fd;
245+
if (create_peer && service_ptr)
246+
peer_impl = create_peer(service_ptr, new_fd);
247+
complete(0, 0);
248+
}
249+
else
250+
{
251+
complete(errno, 0);
252+
}
253+
}
254+
255+
void operator()() override
256+
{
257+
stop_cb.reset();
258+
259+
bool success = (error == 0 && !cancelled.load(std::memory_order_acquire));
260+
261+
if (ec_out)
262+
{
263+
if (cancelled.load(std::memory_order_acquire))
264+
*ec_out = make_error_code(system::errc::operation_canceled);
265+
else if (error != 0)
266+
*ec_out = system::error_code(error, system::system_category());
267+
}
268+
269+
if (success && accepted_fd >= 0 && peer_impl)
270+
{
271+
// Pass impl to awaitable for assignment to peer socket
272+
if (impl_out)
273+
*impl_out = peer_impl;
274+
peer_impl = nullptr;
275+
}
276+
else
277+
{
278+
// Cleanup on failure
279+
if (accepted_fd >= 0)
280+
{
281+
::close(accepted_fd);
282+
accepted_fd = -1;
283+
}
284+
285+
if (peer_impl)
286+
{
287+
peer_impl->release();
288+
peer_impl = nullptr;
289+
}
290+
291+
if (impl_out)
292+
*impl_out = nullptr;
293+
}
294+
295+
d(h).resume();
296+
}
297+
};
298+
299+
} // namespace detail
300+
} // namespace corosio
301+
} // namespace boost
302+
303+
#endif

0 commit comments

Comments
 (0)