Skip to content

Commit f17d881

Browse files
committed
feat(wait): add wait() for socket and acceptor readiness without I/O
Exposes an asio-style wait(wait_type) awaitable on tcp_socket, udp_socket, local_stream_socket, local_datagram_socket, tcp_acceptor, and local_stream_acceptor. The operation suspends until the descriptor is ready in the chosen direction (read / write / error) without transferring any bytes; useful for wrapping nonblocking C libraries (libssh, libpq async, etc.) that own their own recv/send and only need readiness notification. POSIX backends share a new reactor_wait_op<Base> template that parks in three new wait_*_op slots on reactor_descriptor_state. Dispatch in invoke_deferred_io completes the matching slot on each event bit without performing any I/O syscall. wait_type::write completes immediately on a connected socket on every backend, matching asio's IOCP contract. Corosio's reactor backends use edge-triggered EPOLLOUT/EVFILT_WRITE; parking would never fire on an already-writable socket, and backpressure is surfaced through write_some()'s return value as usual. On IOCP, stream-socket wait_read uses a zero-byte WSARecv. All other waits (datagram wait_read, acceptor wait_read, every wait_error) route through a new win_wait_reactor: a dedicated WSAPoll thread woken via a loopback Winsock socket pair, bridging readiness into the IOCP queue via PostQueuedCompletionStatus. The reactor is lazily constructed via std::call_once and stopped early in win_scheduler::shutdown() so parked ops do not block work-counter drain. socket.cancel() / close_socket() / shutdown all route through the same cancel_wait_if_constructed path so reactor-parked ops are cleaned up without forcing reactor construction on hot paths that never used it. Doxygen on every new public symbol; new Antora guide page (4r.wait.adoc) covers the readiness pattern, acceptor semantics, cancellation, and the wait_type::write immediate-ready contract. Tests parameterised across backends cover the no-consume promise on TCP, immediate write completion, local_stream wait_read (POSIX), UDP wait_read (exercises the aux reactor on IOCP), cancellation through the standard reactor (wait_read on TCP), cancellation while parked in the aux reactor (UDP wait_read), and acceptor wait_read. wait_type::error is plumbed but unexercised by tests; kernel error semantics are non-portable and the contract is documented as best-effort.
1 parent 6231492 commit f17d881

34 files changed

Lines changed: 2987 additions & 104 deletions

doc/modules/ROOT/nav.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
** xref:4.guide/4o.file-io.adoc[File I/O]
3939
** xref:4.guide/4p.unix-sockets.adoc[Unix Domain Sockets]
4040
** xref:4.guide/4q.udp.adoc[UDP Sockets]
41+
** xref:4.guide/4r.wait.adoc[Readiness Wait]
4142
* xref:5.testing/5.intro.adoc[Testing]
4243
** xref:5.testing/5a.mocket.adoc[Mock Sockets]
4344
** xref:5.testing/5b.socket-pair.adoc[Socket Pairs]
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
//
2+
// Copyright (c) 2026 Michael Vandeberg
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/corosio
8+
//
9+
10+
= Readiness Wait
11+
12+
The `wait()` method on every socket and acceptor suspends until the
13+
underlying file descriptor becomes ready in a chosen direction, without
14+
transferring any bytes. Use it to integrate with C libraries that own
15+
the I/O on a nonblocking file descriptor and only need notification
16+
that data is available or that the descriptor is writable.
17+
18+
[NOTE]
19+
====
20+
Code snippets assume:
21+
[source,cpp]
22+
----
23+
#include <boost/corosio/tcp_socket.hpp>
24+
#include <boost/corosio/wait_type.hpp>
25+
26+
namespace corosio = boost::corosio;
27+
----
28+
====
29+
30+
== Overview
31+
32+
Three directions are exposed via the `wait_type` enum:
33+
34+
[source,cpp]
35+
----
36+
enum class wait_type { read, write, error };
37+
----
38+
39+
The awaitable yields an `error_code` with no `bytes_transferred`. On
40+
success the socket is observed to be ready; no data has been consumed
41+
from it.
42+
43+
[source,cpp]
44+
----
45+
auto [ec] = co_await sock.wait(corosio::wait_type::read);
46+
if (!ec) {
47+
// sock is readable: a subsequent read_some will return data
48+
// without blocking.
49+
}
50+
----
51+
52+
== Wrapping a Nonblocking C API
53+
54+
The original motivation is libraries such as libssh and libpq that
55+
manage their own buffers on an `O_NONBLOCK` fd. They need a "tell me
56+
when the fd is ready" primitive that does not steal bytes from the
57+
stream.
58+
59+
The typical pattern:
60+
61+
[source,cpp]
62+
----
63+
// pq is some PG connection holding a nonblocking socket fd.
64+
corosio::tcp_socket sock = adopt_fd(ioc, PQsocket(pq));
65+
66+
while (PQisBusy(pq)) {
67+
auto [ec] = co_await sock.wait(corosio::wait_type::read);
68+
if (ec) co_return ec;
69+
if (PQconsumeInput(pq) == 0)
70+
co_return last_pq_error(pq);
71+
}
72+
----
73+
74+
Because `wait()` does not call `recv()`, the C library's next
75+
`PQconsumeInput` (or equivalent) sees all the data the kernel has
76+
delivered.
77+
78+
== Acceptors
79+
80+
`tcp_acceptor` and `local_stream_acceptor` expose the same `wait()`.
81+
For `wait_type::read`, completion signals that a connection is pending
82+
on the listen socket. A subsequent `accept()` will succeed without
83+
blocking:
84+
85+
[source,cpp]
86+
----
87+
auto [wec] = co_await acceptor.wait(corosio::wait_type::read);
88+
if (wec) co_return;
89+
90+
corosio::tcp_socket peer(ioc);
91+
auto [aec] = co_await acceptor.accept(peer);
92+
----
93+
94+
This is useful when application-level conditions must be checked
95+
before consuming the next connection (rate limiting, backpressure
96+
signaling) without holding an `accept()` call open.
97+
98+
== Cancellation
99+
100+
`wait()` honors the stop token of its `co_await` environment and the
101+
`socket.cancel()` / `acceptor.cancel()` non-virtuals, completing with
102+
`capy::cond::canceled`:
103+
104+
[source,cpp]
105+
----
106+
auto waiter = [&]() -> capy::task<> {
107+
auto [ec] = co_await sock.wait(corosio::wait_type::read);
108+
// ec == capy::cond::canceled if sock.cancel() was invoked
109+
};
110+
----
111+
112+
`cancel_after()` composes with `wait()` the same way it does with the
113+
other socket operations.
114+
115+
== `wait_type::write` Semantics
116+
117+
`wait(wait_type::write)` always completes immediately with success on
118+
a connected socket. This matches asio's behavior on the IOCP backend
119+
and gives a consistent contract across all corosio backends. The
120+
intended use is: "I want to know I can write now," not "I want to
121+
park until the send buffer drains after backpressure."
122+
123+
Backpressure on the send path is already surfaced by `write_some()`
124+
returning fewer bytes than requested (or `EAGAIN`-equivalent
125+
behavior); use that signal rather than `wait(wait_type::write)` to
126+
react to a full send buffer.
127+
128+
== Backend Notes
129+
130+
On Linux (epoll) and BSD/macOS (kqueue) the read and error waits
131+
register interest in the fd's read or error event without performing
132+
any I/O syscall. On the select backend the same registration
133+
semantics apply through the select-loop's fd sets. Write waits
134+
short-circuit and never enter the reactor (see above).
135+
136+
On Windows (IOCP), stream-socket `wait_read` uses a zero-byte
137+
`WSARecv`: the kernel signals completion when data is available
138+
without consuming bytes. All other waits (datagram-read,
139+
acceptor-read, error-wait) route through an auxiliary `WSAPoll`-based
140+
reactor that runs on a dedicated thread and bridges into the IOCP
141+
via `PostQueuedCompletionStatus`. The public API is uniform across
142+
platforms.
143+
144+
== See Also
145+
146+
* xref:4d.sockets.adoc[Sockets]
147+
* xref:4e.tcp-acceptor.adoc[Acceptors]
148+
* xref:4q.udp.adoc[UDP Sockets]

include/boost/corosio/local_datagram_socket.hpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <boost/corosio/local_datagram.hpp>
2323
#include <boost/corosio/message_flags.hpp>
2424
#include <boost/corosio/shutdown_type.hpp>
25+
#include <boost/corosio/wait_type.hpp>
2526
#include <boost/capy/ex/executor_ref.hpp>
2627
#include <boost/capy/ex/execution_context.hpp>
2728
#include <boost/capy/ex/io_env.hpp>
@@ -204,6 +205,27 @@ class BOOST_COROSIO_DECL local_datagram_socket : public io_object
204205
std::error_code* ec,
205206
std::size_t* bytes_out) = 0;
206207

208+
/** Initiate an asynchronous wait for socket readiness.
209+
210+
Completes when the socket becomes ready for the
211+
specified direction, or an error condition is
212+
reported. No bytes are transferred.
213+
214+
@param h Coroutine handle to resume on completion.
215+
@param ex Executor for dispatching the completion.
216+
@param w The direction to wait on.
217+
@param token Stop token for cancellation.
218+
@param ec Output error code.
219+
220+
@return Coroutine handle to resume immediately.
221+
*/
222+
virtual std::coroutine_handle<> wait(
223+
std::coroutine_handle<> h,
224+
capy::executor_ref ex,
225+
wait_type w,
226+
std::stop_token token,
227+
std::error_code* ec) = 0;
228+
207229
/// Shut down part or all of the socket.
208230
virtual std::error_code shutdown(shutdown_type what) noexcept = 0;
209231

@@ -346,6 +368,23 @@ class BOOST_COROSIO_DECL local_datagram_socket : public io_object
346368
}
347369
};
348370

371+
/// Represent the awaitable returned by @ref wait.
372+
struct wait_awaitable
373+
: detail::void_op_base<wait_awaitable>
374+
{
375+
local_datagram_socket& s_;
376+
wait_type w_;
377+
378+
wait_awaitable(local_datagram_socket& s, wait_type w) noexcept
379+
: s_(s), w_(w) {}
380+
381+
std::coroutine_handle<> dispatch(
382+
std::coroutine_handle<> h, capy::executor_ref ex) const
383+
{
384+
return s_.get().wait(h, ex, w_, token_, &ec_);
385+
}
386+
};
387+
349388
/** Represent the awaitable returned by @ref send.
350389
351390
Captures the buffer, then dispatches to the backend
@@ -528,6 +567,25 @@ class BOOST_COROSIO_DECL local_datagram_socket : public io_object
528567
return connect_awaitable(*this, ep);
529568
}
530569

570+
/** Wait for the socket to become ready in a given direction.
571+
572+
Suspends until the socket is ready for the requested
573+
direction, or an error condition is reported. No bytes
574+
are transferred.
575+
576+
@param w The wait direction (read, write, or error).
577+
578+
@return An awaitable that completes with `io_result<>`.
579+
580+
@par Preconditions
581+
The socket must be open. This socket must outlive the
582+
returned awaitable.
583+
*/
584+
[[nodiscard]] auto wait(wait_type w)
585+
{
586+
return wait_awaitable(*this, w);
587+
}
588+
531589
/** Send a datagram to the specified destination.
532590
533591
Completes when the entire datagram has been accepted

include/boost/corosio/local_stream_acceptor.hpp

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
#include <boost/corosio/detail/config.hpp>
1414
#include <boost/corosio/detail/except.hpp>
15+
#include <boost/corosio/detail/op_base.hpp>
16+
#include <boost/corosio/wait_type.hpp>
1517
#include <boost/corosio/io/io_object.hpp>
1618
#include <boost/capy/io_result.hpp>
1719
#include <boost/corosio/local_endpoint.hpp>
@@ -75,6 +77,22 @@ enum class bind_option
7577
*/
7678
class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
7779
{
80+
struct wait_awaitable
81+
: detail::void_op_base<wait_awaitable>
82+
{
83+
local_stream_acceptor& acc_;
84+
wait_type w_;
85+
86+
wait_awaitable(local_stream_acceptor& acc, wait_type w) noexcept
87+
: acc_(acc), w_(w) {}
88+
89+
std::coroutine_handle<> dispatch(
90+
std::coroutine_handle<> h, capy::executor_ref ex) const
91+
{
92+
return acc_.get().wait(h, ex, w_, token_, &ec_);
93+
}
94+
};
95+
7896
struct move_accept_awaitable
7997
{
8098
local_stream_acceptor& acc_;
@@ -301,6 +319,27 @@ class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
301319
return accept_awaitable(*this, peer);
302320
}
303321

322+
/** Wait for an incoming connection or readiness condition.
323+
324+
Suspends until the listen socket is ready in the
325+
requested direction. For `wait_type::read`, completion
326+
signals that a subsequent @ref accept will succeed
327+
without blocking. No connection is consumed.
328+
329+
@param w The wait direction.
330+
331+
@return An awaitable that completes with `io_result<>`.
332+
333+
@par Preconditions
334+
The acceptor must be listening.
335+
*/
336+
[[nodiscard]] auto wait(wait_type w)
337+
{
338+
if (!is_open())
339+
detail::throw_logic_error("wait: acceptor not listening");
340+
return wait_awaitable(*this, w);
341+
}
342+
304343
/** Initiate an asynchronous accept, returning the socket.
305344
306345
Completes when a new connection is available. Only one
@@ -433,6 +472,18 @@ class BOOST_COROSIO_DECL local_stream_acceptor : public io_object
433472
std::error_code*,
434473
io_object::implementation**) = 0;
435474

475+
/** Initiate an asynchronous wait for acceptor readiness.
476+
477+
Completes when the listen socket becomes ready for
478+
the specified direction. No connection is consumed.
479+
*/
480+
virtual std::coroutine_handle<> wait(
481+
std::coroutine_handle<> h,
482+
capy::executor_ref ex,
483+
wait_type w,
484+
std::stop_token token,
485+
std::error_code* ec) = 0;
486+
436487
/// Return the cached local endpoint.
437488
virtual corosio::local_endpoint local_endpoint() const noexcept = 0;
438489

0 commit comments

Comments
 (0)