Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/boost/corosio/detail/except.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,33 @@

namespace boost::corosio::detail {

/// Throw `std::logic_error` with a default message.
[[noreturn]] BOOST_COROSIO_DECL void throw_logic_error();

/** Throw `std::logic_error` with the given message.

@param what Null-terminated message string.

@throws std::logic_error Always.
*/
[[noreturn]] BOOST_COROSIO_DECL void throw_logic_error(char const* what);

/** Throw `std::system_error` from @p ec.

@param ec Error code used to construct the exception.

@throws std::system_error Always.
*/
[[noreturn]] BOOST_COROSIO_DECL void
throw_system_error(std::error_code const& ec);

/** Throw `std::system_error` from @p ec with the given context.

@param ec Error code used to construct the exception.
@param what Null-terminated context string.

@throws std::system_error Always.
*/
[[noreturn]] BOOST_COROSIO_DECL void
throw_system_error(std::error_code const& ec, char const* what);

Expand Down
59 changes: 48 additions & 11 deletions include/boost/corosio/detail/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,61 @@ namespace boost::corosio::detail {

class scheduler_op;

/** Define the abstract interface for the event loop scheduler.

Concrete backends (epoll, IOCP, kqueue, select) derive from
this to implement the reactor/proactor event loop. The
@ref io_context delegates all scheduling operations here.

@see io_context, native_scheduler
*/
struct BOOST_COROSIO_DECL scheduler
{
virtual ~scheduler() = default;
virtual ~scheduler() = default;

/// Post a coroutine handle for deferred execution.
virtual void post(std::coroutine_handle<>) const = 0;
virtual void post(scheduler_op*) const = 0;

virtual void work_started() noexcept = 0;
/// Post a scheduler operation for deferred execution.
virtual void post(scheduler_op*) const = 0;

/// Increment the outstanding work count.
virtual void work_started() noexcept = 0;

/// Decrement the outstanding work count.
virtual void work_finished() noexcept = 0;

/// Check if the calling thread is running the event loop.
virtual bool running_in_this_thread() const noexcept = 0;
virtual void stop() = 0;
virtual bool stopped() const noexcept = 0;
virtual void restart() = 0;
virtual std::size_t run() = 0;
virtual std::size_t run_one() = 0;
virtual std::size_t wait_one(long usec) = 0;
virtual std::size_t poll() = 0;
virtual std::size_t poll_one() = 0;

/// Signal the event loop to stop.
virtual void stop() = 0;

/// Check if the event loop has been stopped.
virtual bool stopped() const noexcept = 0;

/// Reset the stopped state so `run()` can be called again.
virtual void restart() = 0;

/// Run the event loop, blocking until all work completes.
virtual std::size_t run() = 0;

/// Run one handler, blocking until one completes.
virtual std::size_t run_one() = 0;

/** Run one handler, blocking up to @p usec microseconds.

@param usec Maximum wait time in microseconds.

@return The number of handlers executed (0 or 1).
*/
virtual std::size_t wait_one(long usec) = 0;

/// Run all ready handlers without blocking.
virtual std::size_t poll() = 0;

/// Run at most one ready handler without blocking.
virtual std::size_t poll_one() = 0;
};

} // namespace boost::corosio::detail
Expand Down
36 changes: 36 additions & 0 deletions include/boost/corosio/detail/timer_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,26 @@ class BOOST_COROSIO_DECL timer_service final
using clock_type = std::chrono::steady_clock;
using time_point = clock_type::time_point;

/// Type-erased callback for earliest-expiry-changed notifications.
class callback
{
void* ctx_ = nullptr;
void (*fn_)(void*) = nullptr;

public:
/// Construct an empty callback.
callback() = default;

/// Construct a callback with the given context and function.
callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}

/// Return true if the callback is non-empty.
explicit operator bool() const noexcept
{
return fn_ != nullptr;
}

/// Invoke the callback.
void operator()() const
{
if (fn_)
Expand Down Expand Up @@ -126,49 +133,78 @@ class BOOST_COROSIO_DECL timer_service final
(std::numeric_limits<std::int64_t>::max)()};

public:
/// Construct the timer service bound to a scheduler.
inline timer_service(capy::execution_context&, scheduler& sched)
: sched_(&sched)
{
}

/// Return the associated scheduler.
inline scheduler& get_scheduler() noexcept
{
return *sched_;
}

/// Destroy the timer service.
~timer_service() override = default;

timer_service(timer_service const&) = delete;
timer_service& operator=(timer_service const&) = delete;

/// Register a callback invoked when the earliest expiry changes.
inline void set_on_earliest_changed(callback cb)
{
on_earliest_changed_ = cb;
}

/// Return true if no timers are in the heap.
inline bool empty() const noexcept
{
return cached_nearest_ns_.load(std::memory_order_acquire) ==
(std::numeric_limits<std::int64_t>::max)();
}

/// Return the nearest timer expiry without acquiring the mutex.
inline time_point nearest_expiry() const noexcept
{
auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
return time_point(time_point::duration(ns));
}

/// Cancel all pending timers and free cached resources.
inline void shutdown() override;

/// Construct a new timer implementation.
inline io_object::implementation* construct() override;

/// Destroy a timer implementation, cancelling pending waiters.
inline void destroy(io_object::implementation* p) override;

/// Cancel and recycle a timer implementation.
inline void destroy_impl(implementation& impl);

/// Create or recycle a waiter node.
inline waiter_node* create_waiter();

/// Return a waiter node to the cache or free list.
inline void destroy_waiter(waiter_node* w);

/// Update the timer expiry, cancelling existing waiters.
inline std::size_t update_timer(implementation& impl, time_point new_time);

/// Insert a waiter into the timer's waiter list and the heap.
inline void insert_waiter(implementation& impl, waiter_node* w);

/// Cancel all waiters on a timer.
inline std::size_t cancel_timer(implementation& impl);

/// Cancel a single waiter ( stop_token callback path ).
inline void cancel_waiter(waiter_node* w);

/// Cancel one waiter on a timer.
inline std::size_t cancel_one_waiter(implementation& impl);

/// Complete all waiters whose timers have expired.
inline std::size_t process_expired();

private:
Expand Down
1 change: 1 addition & 0 deletions include/boost/corosio/io/io_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class BOOST_COROSIO_DECL io_object
io_object(io_object const&) = delete;
io_object& operator=(io_object const&) = delete;

/// The platform I/O handle owned by this object.
handle h_;
};

Expand Down
30 changes: 25 additions & 5 deletions include/boost/corosio/io/io_signal_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,35 @@ class BOOST_COROSIO_DECL io_signal_set : public io_object
};

public:
/** Define backend hooks for signal set wait and cancel.

Platform backends derive from this to implement
signal delivery notification.
*/
struct implementation : io_object::implementation
{
/** Initiate an asynchronous wait for a signal.

@param h Coroutine handle to resume on completion.
@param ex Executor for dispatching the completion.
@param token Stop token for cancellation.
@param ec Output error code.
@param signo Output signal number.

@return Coroutine handle to resume immediately.
*/
virtual std::coroutine_handle<> wait(
std::coroutine_handle<>,
capy::executor_ref,
std::stop_token,
std::error_code*,
int*) = 0;
std::coroutine_handle<> h,
capy::executor_ref ex,
std::stop_token token,
std::error_code* ec,
int* signo) = 0;

/** Cancel all pending wait operations.

Cancelled waiters complete with an error that
compares equal to `capy::cond::canceled`.
*/
virtual void cancel() = 0;
};

Expand Down
15 changes: 14 additions & 1 deletion include/boost/corosio/io/io_timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,28 @@ class BOOST_COROSIO_DECL io_timer : public io_object
};

public:
/** Backend interface for timer wait operations.

Holds per-timer state (expiry, heap position) and provides
the virtual `wait` entry point that concrete timer services
override.
*/
struct implementation : io_object::implementation
{
/// Sentinel value indicating the timer is not in the heap.
static constexpr std::size_t npos =
(std::numeric_limits<std::size_t>::max)();

/// The absolute expiry time point.
std::chrono::steady_clock::time_point expiry_{};
std::size_t heap_index_ = npos;

/// Index in the timer service's min-heap, or `npos`.
std::size_t heap_index_ = npos;

/// True if `wait()` has been called since last cancel.
bool might_have_pending_waits_ = false;

/// Initiate an asynchronous wait for the timer to expire.
virtual std::coroutine_handle<> wait(
std::coroutine_handle<>,
capy::executor_ref,
Expand Down
7 changes: 7 additions & 0 deletions include/boost/corosio/native/native.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
// Official repository: https://github.com/cppalliance/corosio
//

/** @file native.hpp

Include all native (devirtualized) public headers:
I/O context, sockets, acceptor, resolver, signal set,
timer, and cancellation helpers.
*/

#ifndef BOOST_COROSIO_NATIVE_NATIVE_HPP
#define BOOST_COROSIO_NATIVE_NATIVE_HPP

Expand Down
11 changes: 9 additions & 2 deletions include/boost/corosio/native/native_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@ namespace boost::corosio::detail {

class timer_service;

// Intermediary between public scheduler and concrete backends,
// holds cached service pointers behind the compilation firewall
/** Cache service pointers for native backend schedulers.

Sits between @ref scheduler and the concrete backend schedulers,
storing service pointers that would otherwise require a virtual
call or service lookup on every timer operation.

@see scheduler
*/
struct native_scheduler : scheduler
{
/// Store the timer service pointer, set during construction.
timer_service* timer_svc_ = nullptr;
};

Expand Down
Loading
Loading