Skip to content

Commit 6a39d2d

Browse files
committed
Fix epoll async cancellation, TLS test corruption, and SIGPIPE handling
This commit addresses several correctness and portability issues in the epoll backend and TLS implementations.

epoll completion/cancellation race:
- Add atomic `registered` flag to epoll_op as a claim token
- Scheduler and cancel() now race to atomically exchange the flag
- Whoever wins owns the completion, preventing double-completion bugs
- cancel() properly unregisters fd from epoll and posts to completion queue
- close_socket() calls cancel() first to ensure pending ops complete

SIGPIPE handling:
- Replace writev() with sendmsg() using MSG_NOSIGNAL flag
- Matches Boost.Asio's per-syscall approach instead of global signal handler
- Avoids process-wide side effects from signal(SIGPIPE, SIG_IGN)

Empty buffer handling:
- Extend checks to handle single zero-length buffers, not just empty iovec

WolfSSL deferred context initialization:
- Cache separate client_ctx_ and server_ctx_ in wolfssl_native_context
- Defer SSL object creation until handshake when role is known
- Works with standard WolfSSL builds without --enable-opensslextra
- Single tls::context can now be shared across client and server streams

TLS test lambda capture fix:
- Store coroutine lambdas in named variables before invocation
- Fixes dangling reference when temporary lambda is destroyed before coroutine completes, which caused client/server to share captures

Add high-level overview comments to epoll implementation files explaining architecture, completion/cancellation race handling, and design rationale.
1 parent e9d9abd commit 6a39d2d

14 files changed

Lines changed: 1250 additions & 1064 deletions

File tree

include/boost/corosio/tls/openssl_stream.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ class BOOST_COROSIO_DECL openssl_stream : public tls_stream
5757
@param ctx The TLS context containing configuration.
5858
*/
5959
openssl_stream( io_stream& stream, tls::context ctx );
60+
61+
/** Destructor.
62+
63+
Releases the underlying OpenSSL resources.
64+
*/
65+
~openssl_stream();
6066
};
6167

6268
} // namespace corosio

include/boost/corosio/tls/wolfssl_stream.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ class BOOST_COROSIO_DECL
5959
@param ctx The TLS context containing configuration.
6060
*/
6161
wolfssl_stream(io_stream& stream, tls::context ctx);
62+
63+
/** Destructor.
64+
65+
Releases the underlying WolfSSL resources.
66+
*/
67+
~wolfssl_stream();
6268
};
6369

6470
} // namespace corosio

src/corosio/src/detail/epoll/op.hpp

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,23 +31,57 @@
3131
#include <atomic>
3232
#include <cstddef>
3333
#include <cstdint>
34+
#include <memory>
3435
#include <optional>
3536
#include <stop_token>
3637

3738
#include <netinet/in.h>
3839
#include <sys/socket.h>
3940
#include <sys/uio.h>
4041

42+
/*
43+
epoll Operation State
44+
=====================
45+
46+
Each async I/O operation has a corresponding epoll_op-derived struct that
47+
holds the operation's state while it's in flight. The socket impl owns
48+
fixed slots for each operation type (conn_, rd_, wr_), so only one
49+
operation of each type can be pending per socket at a time.
50+
51+
Completion vs Cancellation Race
52+
-------------------------------
53+
The `registered` atomic handles the race between epoll signaling readiness
54+
and cancel() being called. Whoever atomically exchanges it from true to
55+
false "claims" the operation and is responsible for completing it. The
56+
loser sees false and does nothing. This avoids double-completion bugs
57+
without requiring a mutex in the hot path.
58+
59+
Impl Lifetime Management
60+
------------------------
61+
When cancel() posts an op to the scheduler's ready queue, the socket impl
62+
might be destroyed before the scheduler processes the op. The `impl_ptr`
63+
member holds a shared_ptr to the impl, keeping it alive until the op
64+
completes. This is set by cancel() in sockets.hpp and cleared in operator()
65+
after the coroutine is resumed. Without this, closing a socket with pending
66+
operations causes use-after-free.
67+
68+
EOF Detection
69+
-------------
70+
For reads, 0 bytes with no error means EOF. But an empty user buffer also
71+
returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases
72+
so we don't spuriously report EOF when the user just passed an empty buffer.
73+
74+
SIGPIPE Prevention
75+
------------------
76+
Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
77+
SIGPIPE when the peer has closed. This is the same approach Boost.Asio
78+
uses on Linux.
79+
*/
80+
4181
namespace boost {
4282
namespace corosio {
4383
namespace detail {
4484

45-
/** Base class for epoll async operations.
46-
47-
This class is analogous to overlapped_op on Windows.
48-
It stores the coroutine handle, executor, and result
49-
pointers needed to complete an async operation.
50-
*/
5185
struct epoll_op : scheduler_op
5286
{
5387
struct canceller
@@ -67,8 +101,13 @@ struct epoll_op : scheduler_op
67101
std::size_t bytes_transferred = 0;
68102

69103
std::atomic<bool> cancelled{false};
104+
std::atomic<bool> registered{false};
70105
std::optional<std::stop_callback<canceller>> stop_cb;
71106

107+
// Prevents use-after-free when socket is closed with pending ops.
108+
// See "Impl Lifetime Management" in file header.
109+
std::shared_ptr<void> impl_ptr;
110+
72111
epoll_op()
73112
{
74113
data_ = this;
@@ -81,6 +120,8 @@ struct epoll_op : scheduler_op
81120
errn = 0;
82121
bytes_transferred = 0;
83122
cancelled.store(false, std::memory_order_relaxed);
123+
registered.store(false, std::memory_order_relaxed);
124+
impl_ptr.reset();
84125
}
85126

86127
void operator()() override
@@ -94,23 +135,24 @@ struct epoll_op : scheduler_op
94135
else if (errn != 0)
95136
*ec_out = make_err(errn);
96137
else if (is_read_operation() && bytes_transferred == 0)
97-
{
98-
// EOF: 0 bytes transferred with no error indicates end of stream
99138
*ec_out = capy::error::eof;
100-
}
101139
}
102140

103141
if (bytes_out)
104142
*bytes_out = bytes_transferred;
105143

106-
d.dispatch(h).resume();
144+
auto saved_d = d;
145+
auto saved_h = std::move(h);
146+
impl_ptr.reset();
147+
saved_d.dispatch(saved_h).resume();
107148
}
108149

109150
virtual bool is_read_operation() const noexcept { return false; }
110151

111152
void destroy() override
112153
{
113154
stop_cb.reset();
155+
impl_ptr.reset();
114156
}
115157

116158
void request_cancel() noexcept
@@ -133,10 +175,6 @@ struct epoll_op : scheduler_op
133175
bytes_transferred = bytes;
134176
}
135177

136-
/** Called when epoll signals the fd is ready.
137-
Derived classes override this to perform the actual I/O.
138-
Sets error and bytes_transferred appropriately.
139-
*/
140178
virtual void perform_io() noexcept {}
141179
};
142180

@@ -148,12 +186,11 @@ get_epoll_op(scheduler_op* h) noexcept
148186

149187
//------------------------------------------------------------------------------
150188

151-
/** Connect operation state. */
152189
struct epoll_connect_op : epoll_op
153190
{
154191
void perform_io() noexcept override
155192
{
156-
// For connect, check SO_ERROR to see if connection succeeded
193+
// connect() completion status is retrieved via SO_ERROR, not return value
157194
int err = 0;
158195
socklen_t len = sizeof(err);
159196
if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
@@ -164,17 +201,13 @@ struct epoll_connect_op : epoll_op
164201

165202
//------------------------------------------------------------------------------
166203

167-
/** Read operation state with buffer descriptors. */
168204
struct epoll_read_op : epoll_op
169205
{
170206
static constexpr std::size_t max_buffers = 16;
171207
iovec iovecs[max_buffers];
172208
int iovec_count = 0;
173-
174-
// True when 0 bytes is due to empty buffer, not EOF
175209
bool empty_buffer_read = false;
176210

177-
// EOF only applies when we actually tried to read something
178211
bool is_read_operation() const noexcept override
179212
{
180213
return !empty_buffer_read;
@@ -199,7 +232,6 @@ struct epoll_read_op : epoll_op
199232

200233
//------------------------------------------------------------------------------
201234

202-
/** Write operation state with buffer descriptors. */
203235
struct epoll_write_op : epoll_op
204236
{
205237
static constexpr std::size_t max_buffers = 16;
@@ -214,7 +246,11 @@ struct epoll_write_op : epoll_op
214246

215247
void perform_io() noexcept override
216248
{
217-
ssize_t n = ::writev(fd, iovecs, iovec_count);
249+
msghdr msg{};
250+
msg.msg_iov = iovecs;
251+
msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
252+
253+
ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
218254
if (n >= 0)
219255
complete(0, static_cast<std::size_t>(n));
220256
else
@@ -224,7 +260,6 @@ struct epoll_write_op : epoll_op
224260

225261
//------------------------------------------------------------------------------
226262

227-
/** Accept operation state. */
228263
struct epoll_accept_op : epoll_op
229264
{
230265
int accepted_fd = -1;
@@ -241,7 +276,6 @@ struct epoll_accept_op : epoll_op
241276
accepted_fd = -1;
242277
peer_impl = nullptr;
243278
impl_out = nullptr;
244-
// Don't reset create_peer and service_ptr - they're set once
245279
}
246280

247281
void perform_io() noexcept override
@@ -302,7 +336,10 @@ struct epoll_accept_op : epoll_op
302336
*impl_out = nullptr;
303337
}
304338

305-
d.dispatch(h).resume();
339+
auto saved_d = d;
340+
auto saved_h = std::move(h);
341+
impl_ptr.reset();
342+
saved_d.dispatch(saved_h).resume();
306343
}
307344
};
308345

src/corosio/src/detail/epoll/scheduler.cpp

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,47 @@
2929
#include <sys/socket.h>
3030
#include <unistd.h>
3131

32+
/*
33+
epoll Scheduler
34+
===============
35+
36+
The scheduler is the heart of the I/O event loop. It multiplexes I/O
37+
readiness notifications from epoll with a completion queue for operations
38+
that finished synchronously or were cancelled.
39+
40+
Event Loop Structure (do_one)
41+
-----------------------------
42+
1. Check completion queue first (mutex-protected)
43+
2. If empty, call epoll_wait with calculated timeout
44+
3. Process timer expirations
45+
4. For each ready fd, claim the operation and perform I/O
46+
5. Push completed operations to completion queue
47+
6. Pop one and invoke its handler
48+
49+
The completion queue exists because handlers must run outside the epoll
50+
processing loop. This allows handlers to safely start new operations
51+
on the same fd without corrupting iteration state.
52+
53+
Wakeup Mechanism
54+
----------------
55+
An eventfd allows other threads (or cancel/post calls) to wake the
56+
event loop from epoll_wait. We distinguish wakeup events from I/O by
57+
storing nullptr in epoll_event.data.ptr for the eventfd.
58+
59+
Work Counting
60+
-------------
61+
outstanding_work_ tracks pending operations. When it hits zero, run()
62+
returns. This is how io_context knows there's nothing left to do.
63+
Each operation increments the count on start and decrements it on completion.
64+
65+
Timer Integration
66+
-----------------
67+
Timers are handled by timer_service. The scheduler adjusts epoll_wait
68+
timeout to wake in time for the nearest timer expiry. When a new timer
69+
is scheduled earlier than current, timer_service calls wakeup() to
70+
re-evaluate the timeout.
71+
*/
72+
3273
namespace boost {
3374
namespace corosio {
3475
namespace detail {
@@ -84,7 +125,6 @@ epoll_scheduler(
84125
detail::throw_system_error(make_err(errn), "eventfd");
85126
}
86127

87-
// data.ptr = nullptr distinguishes wakeup events from I/O completions
88128
epoll_event ev{};
89129
ev.events = EPOLLIN;
90130
ev.data.ptr = nullptr;
@@ -119,16 +159,13 @@ shutdown()
119159
std::unique_lock lock(mutex_);
120160
shutdown_ = true;
121161

122-
// Drain all completed operations without invoking handlers
123162
while (auto* h = completed_ops_.pop())
124163
{
125164
lock.unlock();
126165
h->destroy();
127166
lock.lock();
128167
}
129168

130-
// Reset outstanding work count - any pending I/O operations
131-
// will be cleaned up when their owning objects are destroyed
132169
outstanding_work_.store(0, std::memory_order_release);
133170
}
134171

@@ -357,7 +394,6 @@ void
357394
epoll_scheduler::
358395
unregister_fd(int fd) const
359396
{
360-
// EPOLL_CTL_DEL ignores the event parameter (can be NULL on Linux 2.6.9+)
361397
::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
362398
}
363399

@@ -379,12 +415,10 @@ void
379415
epoll_scheduler::
380416
wakeup() const
381417
{
382-
// Write cannot fail: eventfd counter won't overflow with single increments
383418
std::uint64_t val = 1;
384419
[[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
385420
}
386421

387-
// RAII guard - work_finished called even if handler throws
388422
struct work_guard
389423
{
390424
epoll_scheduler const* self;
@@ -459,30 +493,30 @@ do_one(long timeout_us)
459493
{
460494
if (errno == EINTR)
461495
{
462-
// EINTR: retry for infinite waits, return for timed waits
463496
if (timeout_us < 0)
464497
continue;
465498
return 0;
466499
}
467500
detail::throw_system_error(make_err(errno), "epoll_wait");
468501
}
469502

470-
// May dispatch timer handlers inline
471503
timer_svc_->process_expired();
472504

473505
for (int i = 0; i < nfds; ++i)
474506
{
475507
if (events[i].data.ptr == nullptr)
476508
{
477-
// Drain eventfd; read cannot fail since epoll signaled readiness
478509
std::uint64_t val;
479510
[[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
480511
continue;
481512
}
482513

483514
auto* op = static_cast<epoll_op*>(events[i].data.ptr);
484515

485-
// One-shot: unregister before I/O
516+
bool was_registered = op->registered.exchange(false, std::memory_order_acq_rel);
517+
if (!was_registered)
518+
continue;
519+
486520
unregister_fd(op->fd);
487521

488522
if (events[i].events & (EPOLLERR | EPOLLHUP))
@@ -521,7 +555,6 @@ do_one(long timeout_us)
521555
return 1;
522556
}
523557

524-
// Finite timeout: return on timeout; infinite: keep looping
525558
if (timeout_us >= 0)
526559
return 0;
527560
}

0 commit comments

Comments
 (0)