Skip to content

Commit 695d2fa

Browse files
committed
Add quitter<T> stop-aware task type
quitter<T> is a coroutine task type identical to task<T> except it provides transparent cancellation. When the stop token is triggered, transform_awaiter::await_resume intercepts the stop before the coroutine body sees the result, throwing a sentinel exception that unwinds through RAII destructors to final_suspend. The coroutine author writes zero cancellation-handling code. New files: - quitter.hpp: quitter<T> class template - detail/stop_requested_exception.hpp: sentinel exception type - test/unit/quitter.cpp: 17 tests covering normal completion, exception propagation, stop interception at initial_suspend and during I/O, chain propagation, RAII verification, when_all/when_any integration with quitter children, timer cancellation, echo server shutdown, task/quitter mixing, and move semantics - example/quitter-shutdown: 4 concurrent workers doing simulated I/O, Ctrl+C stops all workers, RAII cleanup runs, shutdown latency printed (~100us) Modified: - run_async.hpp: fix task frame leak in make_trampoline by moving destroy before invoke (prevents leak when invoke throws) - capy.hpp: add quitter.hpp include
1 parent 721b47e commit 695d2fa

8 files changed

Lines changed: 1404 additions & 2 deletions

File tree

example/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ add_subdirectory(mock-stream-testing)
1717
add_subdirectory(parallel-fetch)
1818
add_subdirectory(parallel-tasks)
1919
add_subdirectory(producer-consumer)
20+
add_subdirectory(quitter-shutdown)
2021
add_subdirectory(strand-serialization)
2122
add_subdirectory(stream-pipeline)
2223
add_subdirectory(timeout-cancellation)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Copyright (c) 2026 Michael Vandeberg
3+
#
4+
# Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
#
7+
# Official repository: https://github.com/cppalliance/capy
8+
#
9+
10+
file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS *.cpp *.hpp
11+
CMakeLists.txt
12+
Jamfile)
13+
14+
source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES})
15+
16+
add_executable(capy_example_quitter_shutdown ${PFILES})
17+
18+
set_property(TARGET capy_example_quitter_shutdown
19+
PROPERTY FOLDER "examples")
20+
21+
target_link_libraries(capy_example_quitter_shutdown
22+
Boost::capy)
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
//
2+
// Copyright (c) 2026 Michael Vandeberg
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/capy
8+
//
9+
10+
/* Quitter Shutdown Example
11+
12+
Demonstrates quitter<T> for responsive application shutdown.
13+
14+
Four workers simulate a batch file-processing pipeline: each
15+
"downloads" data (delay), "transforms" it, and "writes" the
16+
result (delay). Workers are quitter<> coroutines — their
17+
bodies contain zero cancellation-handling code.
18+
19+
Press Ctrl+C to request shutdown. Every in-flight worker
20+
exits at its next co_await, RAII cleanup runs (each worker
21+
holds a resource_guard that logs its cleanup), and the
22+
application prints a summary and exits.
23+
24+
Contrast with task<>:
25+
With task<>, every co_await that touches I/O needs:
26+
auto [ec] = co_await delay(dur);
27+
if(ec) co_return; // <-- cancellation boilerplate
28+
This is repeated at every suspension point.
29+
30+
With quitter<>, the promise intercepts the stop token
31+
automatically. The worker body is pure business logic.
32+
*/
33+
34+
#include <boost/capy.hpp>
35+
36+
#include <atomic>
37+
#include <chrono>
38+
#include <csignal>
39+
#include <iostream>
40+
#include <latch>
41+
#include <sstream>
42+
#include <stop_token>
43+
44+
namespace capy = boost::capy;
45+
using namespace std::chrono_literals;
46+
47+
// Global stop source wired to Ctrl+C.
48+
static std::stop_source g_stop;
49+
static std::atomic<std::chrono::steady_clock::time_point>
50+
g_stop_time{std::chrono::steady_clock::time_point{}};
51+
52+
extern "C" void signal_handler(int)
53+
{
54+
g_stop_time.store(std::chrono::steady_clock::now(),
55+
std::memory_order_relaxed);
56+
g_stop.request_stop();
57+
}
58+
59+
// RAII resource that logs construction and destruction.
60+
// Simulates holding a file handle, socket, or temp buffer
61+
// that must be released on shutdown.
62+
struct resource_guard
63+
{
64+
int id;
65+
std::atomic<int>& cleanup_count;
66+
67+
resource_guard(int id_, std::atomic<int>& count)
68+
: id(id_)
69+
, cleanup_count(count)
70+
{
71+
std::ostringstream oss;
72+
oss << " [worker " << id << "] acquired resources\n";
73+
std::cout << oss.str();
74+
}
75+
76+
~resource_guard()
77+
{
78+
++cleanup_count;
79+
std::ostringstream oss;
80+
oss << " [worker " << id << "] released resources "
81+
<< "(cleanup)\n";
82+
std::cout << oss.str();
83+
}
84+
85+
resource_guard(resource_guard const&) = delete;
86+
resource_guard& operator=(resource_guard const&) = delete;
87+
};
88+
89+
// A single worker: download → transform → write, repeated.
90+
// No cancellation code. quitter handles it.
91+
capy::quitter<> worker(
92+
int id,
93+
std::atomic<int>& items_processed,
94+
std::atomic<int>& cleanup_count)
95+
{
96+
resource_guard guard(id, cleanup_count);
97+
98+
for(int item = 0; ; ++item)
99+
{
100+
// Simulate download (200-400ms depending on worker)
101+
auto download_time = 200ms + 50ms * id;
102+
(void) co_await capy::delay(download_time);
103+
104+
// Simulate transform (CPU work — no co_await needed)
105+
{
106+
std::ostringstream oss;
107+
oss << " [worker " << id << "] processing item "
108+
<< item << "\n";
109+
std::cout << oss.str();
110+
}
111+
112+
// Simulate write (100ms)
113+
(void) co_await capy::delay(100ms);
114+
115+
++items_processed;
116+
}
117+
118+
// Never reached — the loop is infinite.
119+
// quitter exits at the next co_await after stop is requested.
120+
}
121+
122+
int main()
123+
{
124+
std::signal(SIGINT, signal_handler);
125+
#ifdef SIGTERM
126+
std::signal(SIGTERM, signal_handler);
127+
#endif
128+
129+
constexpr int num_workers = 4;
130+
capy::thread_pool pool(num_workers);
131+
std::latch done(num_workers);
132+
133+
std::atomic<int> items_processed{0};
134+
std::atomic<int> cleanup_count{0};
135+
136+
std::cout << "Starting " << num_workers
137+
<< " workers. Press Ctrl+C to quit.\n\n";
138+
139+
for(int i = 0; i < num_workers; ++i)
140+
{
141+
capy::run_async(
142+
pool.get_executor(),
143+
g_stop.get_token(),
144+
[&]() { done.count_down(); },
145+
[&](std::exception_ptr) { done.count_down(); })(
146+
worker(i, items_processed, cleanup_count));
147+
}
148+
149+
done.wait();
150+
151+
auto stop_at = g_stop_time.load(std::memory_order_relaxed);
152+
auto now = std::chrono::steady_clock::now();
153+
154+
std::cout << "\nShutdown complete.\n"
155+
<< " Items processed: " << items_processed << "\n"
156+
<< " Workers cleaned up: " << cleanup_count
157+
<< "/" << num_workers << "\n";
158+
159+
if(stop_at != std::chrono::steady_clock::time_point{})
160+
{
161+
auto us = std::chrono::duration_cast<
162+
std::chrono::microseconds>(now - stop_at).count();
163+
std::cout << " Shutdown latency: " << us << " us\n";
164+
}
165+
}

include/boost/capy.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <boost/capy/error.hpp>
2323
#include <boost/capy/io_result.hpp>
2424
#include <boost/capy/io_task.hpp>
25+
#include <boost/capy/quitter.hpp>
2526
#include <boost/capy/task.hpp>
2627

2728
// Algorithms
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
//
2+
// Copyright (c) 2026 Michael Vandeberg
3+
//
4+
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5+
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6+
//
7+
// Official repository: https://github.com/cppalliance/capy
8+
//
9+
10+
#ifndef BOOST_CAPY_DETAIL_STOP_REQUESTED_EXCEPTION_HPP
11+
#define BOOST_CAPY_DETAIL_STOP_REQUESTED_EXCEPTION_HPP
12+
13+
namespace boost {
14+
namespace capy {
15+
namespace detail {
16+
17+
/* Lightweight sentinel thrown inside quitter<T> when the stop token
18+
is triggered. Not derived from std::exception. Never escapes the
19+
coroutine — unhandled_exception() catches it and sets the stopped
20+
flag. The cost is one throw+catch per cancellation per coroutine
21+
lifetime. */
22+
struct stop_requested_exception {};
23+
24+
} // namespace detail
25+
} // namespace capy
26+
} // namespace boost
27+
28+
#endif

include/boost/capy/ex/run_async.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,16 @@ make_trampoline(Ex, Handlers, Alloc)
283283
// promise_type ctor steals the parameters
284284
auto& p = co_await get_promise_awaiter<
285285
typename run_async_trampoline<Ex, Handlers, Alloc>::promise_type>{};
286-
286+
287+
// Guard ensures the task frame is destroyed even when invoke_
288+
// throws (e.g. default_handler rethrows an unhandled exception).
289+
struct frame_guard
290+
{
291+
std::coroutine_handle<>& h;
292+
~frame_guard() { h.destroy(); }
293+
} guard{p.task_h_};
294+
287295
p.invoke_(p.task_promise_, p.handlers_);
288-
p.task_h_.destroy();
289296
}
290297

291298
} // namespace detail

0 commit comments

Comments
 (0)