diff --git a/.codespellrc b/.codespellrc index c6cf5dfe0..86730201c 100644 --- a/.codespellrc +++ b/.codespellrc @@ -1,5 +1,5 @@ [codespell] -builtin = clear,rare,en-GB_to_en-US,names,informal,code +builtin = clear,rare,names,informal,code check-filenames = check-hidden = ignore-words-list = deque,warmup,stdio,copyable,combinate diff --git a/AGENTS.md b/AGENTS.md index 4cbaa5deb..d25a557c3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -103,34 +103,6 @@ All tests should pass. If tests fail, check that: - Build completed without errors - Any changes you have made are correct -## Linting & Validation - -The CI runs two linting tools that you should run before committing: - -### codespell (spelling) - -```bash -codespell -``` - -Config: `.codespellrc` (ignores: build/, .git/, etc.) -Should produce no output if passing. - -### clang-format (code formatting) - -```bash -find src include test benchmark/src -name "*.cpp" -o -name "*.hpp" -o -name "*.cxx" | xargs clang-format --dry-run --Werror -``` - -Config: `.clang-format` (110 column limit, specific style) -Should produce no output if passing. - -**To auto-fix formatting**: - -```bash -find src include test benchmark/src -name "*.cpp" -o -name "*.hpp" -o -name "*.cxx" | xargs clang-format -i -``` - ## Project Structure ### Source Layout @@ -150,10 +122,9 @@ libfork/ │ ├── batteries/ # libfork.batteries — stacks, contexts, adaptors │ │ ├── batteries.cxx # aggregator │ │ └── *.cxx # :partitions -│ ├── schedulers/ # libfork.schedulers — concrete schedulers +│ └── schedulers/ # libfork.schedulers — concrete schedulers │ │ ├── schedulers.cxx # aggregator │ │ └── *.cxx # :partitions -│ └── exception.cpp # terminate_with() implementation ├── test/src/**/ # Test suite (Catch2) — uses `import libfork;` │ └── *.cpp ├── benchmark/src/ # Benchmarking suite (google-benchmark) @@ -189,7 +160,6 @@ All workflows follow this pattern: 1. **Modify source files** in `src/`, `include/`, `test/`, or `benchmark/` 2. **Rebuild**: `cmake --build --preset ` 3. **Test**: `ctest --preset ` -4. **Lint**: Run codespell and clang-format checks #### Adding/removing files from `src/` or `include/` @@ -230,11 +200,3 @@ rm -rf build/ **Problem**: "Could not automatically find libc++.modules.json" **Solution**: Ensure LLVM is installed via Homebrew; toolchain auto-detects the path - -### Linting Failures - -**Problem**: clang-format errors -**Solution**: Run fix command above to auto-format code - -**Problem**: codespell errors -**Solution**: Fix typos or add to ignore list in `.codespellrc` if false positive diff --git a/CMakeLists.txt b/CMakeLists.txt index 6edf24670..c26eaa3e7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,7 +31,7 @@ set(CMAKE_CXX_MODULE_STD 1) add_library(libfork_libfork) add_library(libfork::libfork ALIAS libfork_libfork) -# target_link_libraries(libfork_libfork PRIVATE Threads::Threads) +target_link_libraries(libfork_libfork PUBLIC Threads::Threads) set_property(TARGET libfork_libfork PROPERTY EXPORT_NAME libfork) @@ -84,6 +84,7 @@ target_sources(libfork_libfork src/core/execute.cxx src/core/receiver.cxx src/core/promise.cxx + src/core/stop.cxx # libfork.batteries src/batteries/batteries.cxx src/batteries/deque.cxx diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 95d495b79..6a0f6bcf6 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 4.2.1 FATAL_ERROR) -project(libfork_benchmark LANGUAGES C CXX) +project(libfork_benchmark LANGUAGES CXX) if(NOT CMAKE_BUILD_TYPE STREQUAL "Release") message(WARNING "It is recommended to build benchmarks in Release mode for accurate results.") @@ -17,7 +17,8 @@ target_link_libraries(libfork_benchmark benchmark::benchmark_main ) -# Common headers +# Common components + target_sources(libfork_benchmark PRIVATE FILE_SET HEADERS FILES @@ -28,18 +29,13 @@ target_sources(libfork_benchmark src ) -# Common sources target_sources(libfork_benchmark PRIVATE src/libfork_benchmark/uts/uts.cpp ) - # C lib for UTS -add_library(uts_c OBJECT - src/libfork_benchmark/uts/external/uts.c - src/libfork_benchmark/uts/external/rng/brg_sha1.c -) +add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/external/uts external/uts) target_link_libraries(libfork_benchmark PRIVATE uts_c) diff --git a/benchmark/external/uts/CMakeLists.txt b/benchmark/external/uts/CMakeLists.txt new file mode 100644 index 000000000..89c8b5e6e --- /dev/null +++ b/benchmark/external/uts/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 4.2.1 FATAL_ERROR) + +project(uts_external LANGUAGES C) + +add_library(uts_c) + +target_sources(uts_c + PRIVATE + src/uts.c + src/rng/brg_sha1.c + PUBLIC + FILE_SET HEADERS + BASE_DIRS ${CMAKE_CURRENT_SOURCE_DIR}/include + FILES + include/uts/uts.h + include/uts/rng/rng.h + include/uts/rng/brg_sha1.h + include/uts/rng/brg_types.h +) diff --git a/benchmark/src/libfork_benchmark/uts/external/rng/brg_sha1.h b/benchmark/external/uts/include/uts/rng/brg_sha1.h similarity index 99% rename from benchmark/src/libfork_benchmark/uts/external/rng/brg_sha1.h rename to benchmark/external/uts/include/uts/rng/brg_sha1.h index b01a9f5d8..d30f12c0d 100644 --- a/benchmark/src/libfork_benchmark/uts/external/rng/brg_sha1.h +++ b/benchmark/external/uts/include/uts/rng/brg_sha1.h @@ -33,7 +33,7 @@ #ifndef _SHA1_H #define _SHA1_H -#include "brg_types.h" +#include "uts/rng/brg_types.h" #define SHA1_BLOCK_SIZE 64 #define SHA1_DIGEST_SIZE 20 diff --git a/benchmark/src/libfork_benchmark/uts/external/rng/brg_types.h b/benchmark/external/uts/include/uts/rng/brg_types.h similarity index 100% rename from benchmark/src/libfork_benchmark/uts/external/rng/brg_types.h rename to benchmark/external/uts/include/uts/rng/brg_types.h diff --git a/benchmark/src/libfork_benchmark/uts/external/rng/rng.h b/benchmark/external/uts/include/uts/rng/rng.h similarity index 62% rename from benchmark/src/libfork_benchmark/uts/external/rng/rng.h rename to benchmark/external/uts/include/uts/rng/rng.h index 30d99d819..105c40466 100644 --- a/benchmark/src/libfork_benchmark/uts/external/rng/rng.h +++ b/benchmark/external/uts/include/uts/rng/rng.h @@ -1,6 +1,6 @@ #ifndef _RNG_H #define _RNG_H -#include "brg_sha1.h" +#include "uts/rng/brg_sha1.h" #endif /* _RNG_H */ \ No newline at end of file diff --git a/benchmark/src/libfork_benchmark/uts/external/uts.h b/benchmark/external/uts/include/uts/uts.h similarity index 99% rename from benchmark/src/libfork_benchmark/uts/external/uts.h rename to benchmark/external/uts/include/uts/uts.h index e7c8adee1..e86e68f3e 100644 --- a/benchmark/src/libfork_benchmark/uts/external/uts.h +++ b/benchmark/external/uts/include/uts/uts.h @@ -22,7 +22,7 @@ extern "C" { #endif - #include "rng/rng.h" + #include "uts/rng/rng.h" #define UTS_VERSION "2.1" diff --git a/benchmark/src/libfork_benchmark/uts/external/rng/brg_endian.h b/benchmark/external/uts/src/rng/brg_endian.h similarity index 100% rename from benchmark/src/libfork_benchmark/uts/external/rng/brg_endian.h rename to benchmark/external/uts/src/rng/brg_endian.h diff --git a/benchmark/src/libfork_benchmark/uts/external/rng/brg_sha1.c b/benchmark/external/uts/src/rng/brg_sha1.c similarity index 99% rename from benchmark/src/libfork_benchmark/uts/external/rng/brg_sha1.c rename to benchmark/external/uts/src/rng/brg_sha1.c index 8c032f8bc..f6757bafc 100644 --- a/benchmark/src/libfork_benchmark/uts/external/rng/brg_sha1.c +++ b/benchmark/external/uts/src/rng/brg_sha1.c @@ -37,7 +37,7 @@ #include /* for memcpy() etc. */ #include "brg_endian.h" -#include "brg_sha1.h" +#include "uts/rng/brg_sha1.h" #if defined(__cplusplus) extern "C" { diff --git a/benchmark/src/libfork_benchmark/uts/external/uts.c b/benchmark/external/uts/src/uts.c similarity index 99% rename from benchmark/src/libfork_benchmark/uts/external/uts.c rename to benchmark/external/uts/src/uts.c index 7a110fcf8..507915bea 100644 --- a/benchmark/src/libfork_benchmark/uts/external/uts.c +++ b/benchmark/external/uts/src/uts.c @@ -19,7 +19,7 @@ #include #include -#include "uts.h" +#include "uts/uts.h" /*********************************************************** * tree generation and search parameters * diff --git a/benchmark/src/libfork_benchmark/common.hpp b/benchmark/src/libfork_benchmark/common.hpp index 1750cc226..1c6dcef61 100644 --- a/benchmark/src/libfork_benchmark/common.hpp +++ b/benchmark/src/libfork_benchmark/common.hpp @@ -12,7 +12,15 @@ struct incorrect_result : public std::runtime_error { using std::runtime_error::runtime_error; }; -inline constexpr unsigned bench_max_threads = 12; +inline void bench_thread_args(benchmark::Benchmark *bench, auto make_args) { + unsigned hw = std::max(1U, std::thread::hardware_concurrency()); + for (unsigned t : {1U, 2U, 4U, 6U, 8U, 12U, 16U, 24U, 32U, 48U, 64U, 96U}) { + if (t > hw) { + return; + } + make_args(bench, t); + } +} #define CHECK_RESULT(result, expected) \ do { \ diff --git a/benchmark/src/libfork_benchmark/fib/libfork.cpp b/benchmark/src/libfork_benchmark/fib/libfork.cpp index 7c3188589..e2a5d7948 100644 --- a/benchmark/src/libfork_benchmark/fib/libfork.cpp +++ b/benchmark/src/libfork_benchmark/fib/libfork.cpp @@ -22,12 +22,12 @@ struct fib { std::int64_t lhs = 0; std::int64_t rhs = 0; - using scope = lf::scope; + auto sc = co_await lf::scope(); - co_await scope::fork(&rhs, fib{}, n - 2); - co_await scope::call(&lhs, fib{}, n - 1); + co_await sc.fork(&rhs, fib{}, n - 2); + co_await sc.call(&lhs, fib{}, n - 1); - co_await lf::join(); + co_await sc.join(); co_return lhs + rhs; } @@ -41,6 +41,7 @@ void run(benchmark::State &state) { state.counters["n"] = static_cast(n); state.counters["p"] = static_cast(thread_count(state)); + state.SetComplexityN(static_cast(thread_count(state))); Sch scheduler = make_scheduler(state); @@ -89,9 +90,12 @@ BENCH_ALL(inline_scheduler, adapt_deque>>) BENCHMARK_TEMPLATE(run, __VA_ARGS__) \ ->Name(#mode "/libfork/fib/" #__VA_ARGS__) \ ->Apply([](benchmark::Benchmark *b) -> void { \ - for (unsigned t = 1; t <= bench_max_threads; ++t) { \ + bench_thread_args(b, [](benchmark::Benchmark *b, unsigned t) { \ b->Args({fib_##mode, static_cast(t)}); \ - } \ + }); \ + }) \ + ->Complexity([](benchmark::IterationCount n) -> double { \ + return 1.0 / static_cast(n); \ }) \ ->UseRealTime(); diff --git a/benchmark/src/libfork_benchmark/uts/libfork.cpp b/benchmark/src/libfork_benchmark/uts/libfork.cpp index af63a5db4..00a18e893 100644 --- a/benchmark/src/libfork_benchmark/uts/libfork.cpp +++ b/benchmark/src/libfork_benchmark/uts/libfork.cpp @@ -28,6 +28,8 @@ struct uts_fn { if (num_children > 0) { std::vector cs(static_cast(num_children)); + auto sc = co_await lf::scope(); + for (std::size_t i = 0; i < static_cast(num_children); ++i) { cs[i].child.type = child_type; cs[i].child.height = parent->height + 1; @@ -37,16 +39,14 @@ struct uts_fn { rng_spawn(parent->state.state, cs[i].child.state.state, static_cast(i)); } - using scope = lf::scope; - if (i + 1 == static_cast(num_children)) { - co_await scope::call(&cs[i].res, uts_fn{}, depth + 1, &cs[i].child); + co_await sc.call(&cs[i].res, uts_fn{}, depth + 1, &cs[i].child); } else { - co_await scope::fork(&cs[i].res, uts_fn{}, depth + 1, &cs[i].child); + co_await sc.fork(&cs[i].res, uts_fn{}, depth + 1, &cs[i].child); } } - co_await lf::join(); + co_await sc.join(); for (auto &&elem : cs) { r.maxdepth = std::max(r.maxdepth, elem.res.maxdepth); @@ -69,6 +69,7 @@ void run(benchmark::State &state) { auto expected = expected_result(tree); state.counters["p"] = static_cast(thread_count(state)); + state.SetComplexityN(static_cast(thread_count(state))); Sch scheduler = make_scheduler(state); @@ -90,9 +91,12 @@ void run(benchmark::State &state) { BENCHMARK_TEMPLATE(run, __VA_ARGS__) \ ->Name(#mode "/libfork/uts/" tree_name "/" #__VA_ARGS__) \ ->Apply([](benchmark::Benchmark *b) -> void { \ - for (unsigned t = 1; t <= bench_max_threads; ++t) { \ + bench_thread_args(b, [](benchmark::Benchmark *b, unsigned t) { \ b->Args({tree_id, static_cast(t)}); \ - } \ + }); \ + }) \ + ->Complexity([](benchmark::IterationCount n) -> double { \ + return 1.0 / static_cast(n); \ }) \ ->UseRealTime(); diff --git a/benchmark/src/libfork_benchmark/uts/uts.hpp b/benchmark/src/libfork_benchmark/uts/uts.hpp index a54039bb9..1e5e13596 100644 --- a/benchmark/src/libfork_benchmark/uts/uts.hpp +++ b/benchmark/src/libfork_benchmark/uts/uts.hpp @@ -2,7 +2,7 @@ // Include the C UTS library header first (it defines max/min macros that would // clash with std::max/std::min after import std). -#include "libfork_benchmark/uts/external/uts.h" +#include "uts/uts.h" #undef max #undef min diff --git a/src/core/core.cxx b/src/core/core.cxx index b97552d99..120ced6ca 100644 --- a/src/core/core.cxx +++ b/src/core/core.cxx @@ -22,3 +22,4 @@ export import :schedule; export import :root; export import :execute; export import :receiver; +export import :stop; diff --git a/src/core/frame.cxx b/src/core/frame.cxx index f9022dde6..71ec89343 100644 --- a/src/core/frame.cxx +++ b/src/core/frame.cxx @@ -7,20 +7,15 @@ import std; import libfork.utils; -namespace lf { -// =================== Cancellation =================== // - -struct cancellation { - cancellation *parent = nullptr; - std::atomic stop = 0; -}; +import :stop; -// =================== Frame =================== // +namespace lf { // TODO: remove this and other exports export enum class category : std::uint8_t { call = 0, fork, + root, }; export struct frame_base {}; @@ -38,7 +33,7 @@ struct frame_type : frame_base { uninitialized except; frame_type *parent; - cancellation *cancel; + stop_source::stop_token stop_token; [[no_unique_address]] Checkpoint stack_ckpt; @@ -56,16 +51,9 @@ struct frame_type : frame_base { constexpr frame_type(Checkpoint &&ckpt) noexcept : stack_ckpt(std::move(ckpt)) { joins = k_u16_max; } [[nodiscard]] - constexpr auto is_cancelled() const noexcept -> bool { - // TODO: Should exception trigger cancellation? - for (cancellation *ptr = cancel; ptr != nullptr; ptr = ptr->parent) { - // TODO: if users can't use cancellation outside of fork-join - // then this can be relaxed - if (ptr->stop.load(std::memory_order_acquire) == 1) { - return true; - } - } - return false; + constexpr auto stop_requested() const noexcept -> bool { + // TODO: Should exception trigger stop? + return stop_token.stop_requested(); } [[nodiscard]] diff --git a/src/core/ops.cxx b/src/core/ops.cxx index c178b1f2a..3ea05bccd 100644 --- a/src/core/ops.cxx +++ b/src/core/ops.cxx @@ -8,20 +8,20 @@ import libfork.utils; import :concepts_invocable; import :frame; +import :stop; namespace lf { +// Placeholder types for absent optional fields. +struct no_stop_t {}; +struct no_ret_t {}; + // clang-format off -template +template struct [[nodiscard("You should immediately co_await this!")]] pkg { - R *return_address; - [[no_unique_address]] Fn fn; - [[no_unique_address]] tuple args; -}; - -template -struct [[nodiscard("You should immediately co_await this!")]] pkg { + [[no_unique_address]] std::conditional_t stop_token; + [[no_unique_address]] std::conditional_t, no_ret_t, R *> return_addr; [[no_unique_address]] Fn fn; [[no_unique_address]] tuple args; }; @@ -46,56 +46,161 @@ constexpr auto fwd_fn(auto &&fn) noexcept -> Fn { } } -export template -struct scope { - private: - // Use && for fn/args for zero move/copy + noexcept - // TODO: Is it better to stores values for some types i.e. empty +// =============== Join =============== // + +struct join_type {}; +/** + * @brief Base class shared by scope_ops and child_scope_ops. + * + * Provides a member `join()` so that `co_await sc.join()` works on any scope type. + */ +struct scope_base { + [[nodiscard("You should immediately co_await this!")]] + static constexpr auto join() noexcept -> join_type { + return {}; + } +}; + +// =============== Scope ops (no embedded stop source) =============== // + +template +struct scope_ops : scope_base { + private: template - using call_pkg = pkg; + using call_pkg = pkg; template - using fork_pkg = pkg; + using fork_pkg = pkg; public: + // Default constructible + scope_ops() noexcept = default; + + // Immovable + scope_ops(const scope_ops &) = delete; + scope_ops(scope_ops &&) = delete; + auto operator=(const scope_ops &) -> scope_ops & = delete; + auto operator=(scope_ops &&) -> scope_ops & = delete; + + // === Fork === // + + template Fn> + static constexpr auto fork(R *ret, Fn &&fn, Args &&...args) noexcept -> fork_pkg { + return {.return_addr = ret, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + } template Fn> - static constexpr auto - fork(std::nullptr_t, Fn &&fn, Args &&...args) noexcept -> fork_pkg { - return {.fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + static constexpr auto fork_drop(Fn &&fn, Args &&...args) noexcept -> fork_pkg { + return {.return_addr = {}, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; } template Fn> static constexpr auto fork(Fn &&fn, Args &&...args) noexcept -> fork_pkg { - return {.fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + return {.return_addr = {}, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; } + + // === Call === // + template Fn> - static constexpr auto fork(R *ret, Fn &&fn, Args &&...args) noexcept -> fork_pkg { - return {.return_address = ret, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + static constexpr auto call(R *ret, Fn &&fn, Args &&...args) noexcept -> call_pkg { + return {.return_addr = ret, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; } - template Fn> - static constexpr auto - call(std::nullptr_t, Fn &&fn, Args &&...args) noexcept -> call_pkg { - return {.fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + static constexpr auto call_drop(Fn &&fn, Args &&...args) noexcept -> call_pkg { + return {.return_addr = {}, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; } template Fn> static constexpr auto call(Fn &&fn, Args &&...args) noexcept -> call_pkg { - return {.fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + return {.return_addr = {}, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; } +}; + +// ==== Scope awaitable ==== // + +template +struct scope_awaitable : std::suspend_never { + static constexpr auto await_resume() -> scope_ops { return {}; } +}; + +struct scope_type {}; + +export [[nodiscard("You should immediately co_await this!")]] +constexpr auto scope() noexcept -> scope_type { + return {}; +} + +// =============== Child scope ops (with embedded stop source) =============== // + +/** + * @brief A scope that is a stop_source. + */ +template +struct child_scope_ops : scope_base, stop_source { + private: + template + using call_pkg = pkg; + + template + using fork_pkg = pkg; + + public: + /** + * @brief Construct the scope, chaining its stop source onto the parent's token. + */ + explicit constexpr child_scope_ops(stop_source::stop_token parent) noexcept : stop_source(parent) {} + + // Immovable (stop_source base is immovable) + child_scope_ops(const child_scope_ops &) = delete; + child_scope_ops(child_scope_ops &&) = delete; + auto operator=(const child_scope_ops &) -> child_scope_ops & = delete; + auto operator=(child_scope_ops &&) -> child_scope_ops & = delete; + + // === Fork (binds this scope's stop source as child stop source) === // + template Fn> - static constexpr auto call(R *ret, Fn &&fn, Args &&...args) noexcept -> call_pkg { - return {.return_address = ret, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + constexpr auto fork(R *ret, Fn &&fn, Args &&...args) noexcept -> fork_pkg { + return {.stop_token = token(), .return_addr = ret, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + } + template Fn> + constexpr auto fork_drop(Fn &&fn, Args &&...args) noexcept -> fork_pkg { + return {.stop_token = token(), .return_addr = {}, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + } + template Fn> + constexpr auto fork(Fn &&fn, Args &&...args) noexcept -> fork_pkg { + return {.stop_token = token(), .return_addr = {}, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + } + + // === Call (binds this scope's stop source as child stop source) === // + + template Fn> + constexpr auto call(R *ret, Fn &&fn, Args &&...args) noexcept -> call_pkg { + return {.stop_token = token(), .return_addr = ret, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + } + template Fn> + constexpr auto call_drop(Fn &&fn, Args &&...args) noexcept -> call_pkg { + return {.stop_token = token(), .return_addr = {}, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; + } + template Fn> + constexpr auto call(Fn &&fn, Args &&...args) noexcept -> call_pkg { + return {.stop_token = token(), .return_addr = {}, .fn = LF_FWD(fn), .args = {LF_FWD(args)...}}; } }; -// TODO: do we want join a member of scope? +// =============== child_scope_awaitable =============== // -// =============== Join =============== // +template +struct child_scope_awaitable : std::suspend_never { -struct join_type {}; + stop_source::stop_token parent_stop_token; + + constexpr auto await_resume(this child_scope_awaitable self) -> child_scope_ops { + return child_scope_ops{self.parent_stop_token}; + } +}; + +struct child_scope_type {}; export [[nodiscard("You should immediately co_await this!")]] -constexpr auto join() noexcept -> join_type { +constexpr auto child_scope() noexcept -> child_scope_type { return {}; } diff --git a/src/core/promise.cxx b/src/core/promise.cxx index 0b2deda06..24b1908a4 100644 --- a/src/core/promise.cxx +++ b/src/core/promise.cxx @@ -12,6 +12,7 @@ import libfork.utils; import :concepts_context; import :concepts_invocable; import :frame; +import :stop; import :task; import :thread_locals; import :ops; @@ -28,120 +29,208 @@ template using frame_t = frame_type>; // =============== Final =============== // + +/** + * @brief The full final suspend logic. + * + * The final suspend logic is fully expressed in this function in brief: + * + * - Try to resume parent if a call. + * - Try to resume parent if a fork with no stealing. + * - Try to resume a stolen forked task if last to complete. + * + * This function also handles cancellation (of the parent) by iteratively + * climbing up the parent chain. + * + * This function is split and repeated as two separate functions to allow the + * hot-path code to be inlined more easily into the final suspend. + */ template [[nodiscard]] -LF_FORCE_INLINE constexpr auto final_suspend(frame_t *frame) noexcept -> coro<> { +constexpr auto final_suspend_full(Context &context, frame_t *frame) noexcept -> coro<> { + for (;;) { + // Validate final state + LF_ASSUME(frame); + LF_ASSUME(frame->kind != category::root); + LF_ASSUME(frame->steals == 0); + LF_ASSUME(frame->joins == k_u16_max); + LF_ASSUME(frame->exception_bit == 0); + + // Local copies (before we destroy frame) + category const kind = frame->kind; + + frame_t *parent = not_null(frame->parent); + + // Before resuming the next (or exiting) we should clean-up the current frame. + // Can't use frame from this point onwards + frame->handle().destroy(); + + if (kind == category::call) { + return parent->handle(); + } - // Validate final state - LF_ASSUME(frame->steals == 0); - LF_ASSUME(frame->joins == k_u16_max); - LF_ASSUME(frame->exception_bit == 0); + // Given we are not a call we must be a fork hence, our + // parent can't be a root as they can only call. + LF_ASSUME(kind == category::fork); + LF_ASSUME(parent->kind != category::root); - // Local copies (before we destroy frame) - category const kind = frame->kind; + if (steal_handle last_pushed = context.pop()) { + // No-one stole continuation, we are the exclusive owner of parent -> just keep ripping! + LF_ASSUME(last_pushed == steal_handle{key(), parent}); + // This is not a join point so no state (i.e. counters) is guaranteed. + return parent->handle(); + } - frame_t *parent = not_null(frame->parent); + // An owner is a worker who: + // + // - Created the task. + // - OR had the task submitted to them. + // - OR won the task at a join. + // + // An owner of a task owns the stack the task is on. + // + // As the worker who completed the child task this thread owns the stack the child task was on. + // + // Either: + // + // 1. The parent is on the same stack as the child. + // 2. OR the parent is on a different stack to the child. + // + // Case (1) implies: we owned the parent; forked the child task; then the parent was then stolen. + // Case (2) implies: we stole the parent task; then forked the child; then the parent was stolen. + // + // Case (2) implies that our stack is empty. + + // As soon as we do the `fetch_sub` below the parent task is no longer safe + // to access as it may be resumed and then destroyed by another thread. Hence + // we must make copies on-the-stack of any data we may need if we lose the + // join race. + bool const owner = parent->stack_ckpt == context.stack().checkpoint(); + + // As soon as we do the fetch_sub (if we loose) someone may acquire + // the stack so we must prepare it for release now. + auto release_key = context.stack().prepare_release(); + + // Register with parent we have completed this child task. + if (parent->atomic_joins().fetch_sub(1, std::memory_order_release) == 1) { + // Parent has reached join and we are the last child task to complete. We + // are the exclusive owner of the parent and therefore, we must continue + // parent. As we won the race, acquire all writes before resuming. + std::atomic_thread_fence(std::memory_order_acquire); - // Before resuming the next (or exiting) we should clean-up the current frame. - // Can't use frame from this point onwards - frame->handle().destroy(); + if (!owner) { + // In case of scenario (2) we must acquire the parent's stack. + context.stack().acquire(std::as_const(parent->stack_ckpt)); + } - if (kind == category::call) { - return parent->handle(); - } + // Must reset parent's control block before resuming parent. + parent->reset_counters(); + + if (parent->stop_requested()) [[unlikely]] { + // Don't resume if stopped + if constexpr (LF_COMPILER_EXCEPTIONS) { + if (parent->exception_bit) [[unlikely]] { + std::ignore = extract_exception(parent); + } + } + frame = parent; + continue; + } - LF_ASSUME(kind == category::fork); + if (owner) { + // We were unable to resume the parent and we were its owner, as the + // resuming thread will take ownership of the parent's we must give it up. + context.stack().release(std::move(release_key)); + } - Context &context = get_tls_context(); + return parent->handle(); + } - if (steal_handle last_pushed = context.pop()) { - // No-one stole continuation, we are the exclusive owner of parent -> just keep ripping! - LF_ASSUME(last_pushed == steal_handle{key(), parent}); - // This is not a join point so no state (i.e. counters) is guaranteed. - return parent->handle(); + // We did not win the join-race, we cannot dereference the parent pointer now + // as the frame may now be freed by the winner. Parent has not reached join + // or we are not the last child to complete. We are now out of jobs, we must + // yield to the executor. + + // Else, case (2), our stack has no allocations on it, it may be used later. + return std::noop_coroutine(); } +} - // An owner is a worker who: - // - // - Created the task. - // - OR had the task submitted to them. - // - OR won the task at a join. - // - // An owner of a task owns the stack the task is on. - // - // As the worker who completed the child task this thread owns the stack the child task was on. - // - // Either: - // - // 1. The parent is on the same stack as the child. - // 2. OR the parent is on a different stack to the child. - // - // Case (1) implies: we owned the parent; forked the child task; then the parent was then stolen. - // Case (2) implies: we stole the parent task; then forked the child; then the parent was stolen. - // - // Case (2) implies that our stack is empty. - - // As soon as we do the `fetch_sub` below the parent task is no longer safe - // to access as it may be resumed and then destroyed by another thread. Hence - // we must make copies on-the-stack of any data we may need if we lose the - // join race. - bool const owner = parent->stack_ckpt == context.stack().checkpoint(); +template +[[nodiscard]] +constexpr auto final_suspend_trailing(Context &context, frame_t *parent) noexcept -> coro<> { - // TODO: we could reduce branching if we unconditionally release and also - // drop pre-release function altogether... Need to benchmark with code that - // triggers a lot of stealing. + bool const owner = parent->stack_ckpt == context.stack().checkpoint(); - // As soon as we do the fetch_sub (if we loose) someone may acquire - // the stack so we must prepare it for release now. auto release_key = context.stack().prepare_release(); - // TODO: we could add an `if (owner)` around acquire below, then we could - // define that acquire is always called with null or not-self. - - // Register with parent we have completed this child task. if (parent->atomic_joins().fetch_sub(1, std::memory_order_release) == 1) { - // Parent has reached join and we are the last child task to complete. We - // are the exclusive owner of the parent and therefore, we must continue - // parent. As we won the race, acquire all writes before resuming. + std::atomic_thread_fence(std::memory_order_acquire); if (!owner) { - // In case of scenario (2) we must acquire the parent's stack. context.stack().acquire(std::as_const(parent->stack_ckpt)); } - // Must reset parent's control block before resuming parent. parent->reset_counters(); + if (parent->stop_requested()) [[unlikely]] { + if constexpr (LF_COMPILER_EXCEPTIONS) { + if (parent->exception_bit) [[unlikely]] { + std::ignore = extract_exception(parent); + } + } + return final_suspend_full(context, parent); + } return parent->handle(); } - // We did not win the join-race, we cannot dereference the parent pointer now - // as the frame may now be freed by the winner. Parent has not reached join - // or we are not the last child to complete. We are now out of jobs, we must - // yield to the executor. - if (owner) { - // We were unable to resume the parent and we were its owner, as the - // resuming thread will take ownership of the parent's we must give it up. context.stack().release(std::move(release_key)); } - // Else, case (2), our stack has no allocations on it, it may be used later. return std::noop_coroutine(); } +template +[[nodiscard]] +constexpr auto final_suspend_leading(frame_t *frame) noexcept -> coro<> { + + LF_ASSUME(frame); + LF_ASSUME(frame->steals == 0); + LF_ASSUME(frame->joins == k_u16_max); + LF_ASSUME(frame->exception_bit == 0); + + category const kind = frame->kind; + + frame_t *parent = not_null(frame->parent); + + frame->handle().destroy(); + + if (kind == category::call) { + return parent->handle(); + } + + LF_ASSUME(kind == category::fork); + + Context &context = get_tls_context(); + + if (steal_handle last_pushed = context.pop()) { + LF_ASSUME(last_pushed == steal_handle{key(), parent}); + return parent->handle(); + } + + return final_suspend_trailing(context, parent); +} struct final_awaitable : std::suspend_always { template constexpr static auto await_suspend(coro> handle) noexcept -> coro<> { - return final_suspend(&handle.promise().frame); + return final_suspend_leading(&handle.promise().frame); } }; // =============== Fork/Call =============== // -// TODO: make sure exceptions are cancel-safe (I think now cancellation can leak) - /** * @brief Call inside a catch block, stash current exception in `frame`. */ @@ -177,24 +266,23 @@ struct awaitable : std::suspend_always { } template - LF_FORCE_INLINE constexpr auto + constexpr auto await_suspend(this awaitable self, coro> parent) noexcept -> coro<> { - // TODO: Add tests for exception/cancellation handling in fork/call. + // TODO: test of having a dedicated is_stopped awaitable is quicker if (!self.child) [[unlikely]] { // Noop if an exception was thrown. return parent; } - if (parent.promise().frame.is_cancelled()) [[unlikely]] { - // Noop if canceled, must clean-up the child that will never be resumed. + if (self.child->stop_requested()) [[unlikely]] { + // Noop if stopped, must clean-up the child that will never be resumed. return self.child->handle().destroy(), parent; } // Propagate parent->child relationships self.child->parent = &parent.promise().frame; - self.child->cancel = parent.promise().frame.cancel; if constexpr (Cat == category::call) { // Should be the default @@ -225,6 +313,7 @@ struct awaitable : std::suspend_always { * @brief Pull an exception out of a frame and clean-up the union/allocation. */ template +[[nodiscard]] constexpr auto extract_exception(frame_type *frame) noexcept -> std::exception_ptr { LF_ASSUME(frame->exception_bit); // Should only be called if an exception was thrown. @@ -247,56 +336,28 @@ struct join_awaitable { frame_t *frame; - constexpr auto take_stack_and_reset(this join_awaitable self) noexcept -> void { - stack_t &stack = get_tls_stack(); - LF_ASSUME(self.frame->stack_ckpt != stack.checkpoint()); - stack.acquire(std::as_const(self.frame->stack_ckpt)); - self.frame->reset_counters(); - } - constexpr auto await_ready(this join_awaitable self) noexcept -> bool { - if (not_null(self.frame)->steals == 0) [[likely]] { - // If no steals then we are the only owner of the parent and we are ready - // to join. Therefore, no need to reset the control block. + if (self.frame->stop_requested()) [[unlikely]] { + // Must unconditionally suspended if stopped + return false; + } + // If no steals then we are the only owner of the parent and we are + // ready to join. Therefore, no need to reset the control block. return true; } - - // TODO: benchmark if including the below check (returning false here) in - // multithreaded case helps performance enough to justify the extra - // instructions along the fast path - - // Currently: joins() = k_u16_max - num_joined - // Hence: k_u16_max - joins() = num_joined - - // Could use (relaxed here) + (fence(acquire) in truthy branch) but, it's - // better if we see all the decrements to joins() and avoid suspending the - // coroutine if possible. Cannot fetch_sub() here and write to frame as - // coroutine must be suspended first. - - std::uint32_t steals = self.frame->steals; - std::uint32_t joined = k_u16_max - self.frame->atomic_joins().load(std::memory_order_acquire); - - if (steals == joined) { - // We must reset the control block and take the stack. We should never - // own the stack at this point because we must have stolen the stack. - // For ruther explanation see await_suspend() below. - return self.take_stack_and_reset(), true; - } - return false; } constexpr auto await_suspend(this join_awaitable self, std::coroutine_handle<> task) noexcept -> coro<> { // Currently self.joins = k_u16_max - num_joined + // // We set joins = self->joins - (k_u16_max - num_steals) // = num_steals - num_joined - + // // Hence joined = k_u16_max - num_joined // k_u16_max - joined = num_joined - LF_ASSUME(self.frame); - // Lemma: // // If a thread is at a join and steals have occurred then the @@ -308,21 +369,39 @@ struct join_awaitable { // that had it's continuation stolen, where it would have had to release // the stack, because the parent was at not at the join. + LF_ASSUME(self.frame); + std::uint32_t steals = self.frame->steals; std::uint32_t offset = k_u16_max - steals; std::uint32_t joined = self.frame->atomic_joins().fetch_sub(offset, std::memory_order_release); + // If this was a stop: + // + // steals = 0, joins = k_u16_max then: + // + // steals = 0 + // offset = k_u16_max + // joined = k_u16_max, (self.frame->joins is now 0) + // + // k_u16_max - joined = 0 = steals, hence win the if + if (steals == k_u16_max - joined) { // We set joins after all children had completed therefore we can resume the task. // Need to acquire to ensure we see all writes by other threads to the result. std::atomic_thread_fence(std::memory_order_acquire); + if (self.frame->stop_requested()) [[unlikely]] { + return self.handle_stop(); + } + // We must reset the control block and take the stack. We should never // own the stack at this point because we must have stolen the stack. - return self.take_stack_and_reset(), task; + self.take_stack(); + self.frame->reset_counters(); + return task; } - // Someone else is responsible for running this task. + // We cannot touch *this or dereference self as someone may have resumed already! // We cannot currently own this stack (checking would violate above). @@ -332,12 +411,8 @@ struct join_awaitable { // in a switch awaitable. In this case we can/must do another self-steal. // return try_self_stealing(); - return std::noop_coroutine(); - } - [[noreturn]] - constexpr void rethrow_exception(this join_awaitable self) { - std::rethrow_exception(extract_exception(self.frame)); + return std::noop_coroutine(); } constexpr void await_resume(this join_awaitable self) { @@ -352,6 +427,37 @@ struct join_awaitable { } } } + + constexpr auto take_stack(this join_awaitable self) noexcept -> void { + stack_t &stack = get_tls_stack(); + LF_ASSUME(self.frame->stack_ckpt != stack.checkpoint()); + stack.acquire(std::as_const(self.frame->stack_ckpt)); + } + + [[nodiscard]] + constexpr auto handle_stop(this join_awaitable self) -> coro<> { + // Only need to take the stack if there were steals + if (self.frame->steals > 0) { + self.take_stack(); + } + + // We always need to reset the connters as we modified + self.frame->reset_counters(); + + // Drop any exceptions in the now-stopped task + if constexpr (LF_COMPILER_EXCEPTIONS) { + if (self.frame->exception_bit) [[unlikely]] { + std::ignore = extract_exception(self.frame); + } + } + + return final_suspend_leading(self.frame); + } + + [[noreturn]] + constexpr void rethrow_exception(this join_awaitable self) { + std::rethrow_exception(extract_exception(self.frame)); + } }; // =============== Frame mixin =============== // @@ -385,8 +491,9 @@ struct mixin_frame { // --- Await transformations - template - static constexpr auto await_transform_pkg(pkg &&pkg) noexcept( + template + constexpr auto + await_transform_pkg(this auto const &self, pkg &&pkg) noexcept( async_nothrow_invocable) -> awaitable { // Required for noexcept specifier to be correct @@ -407,21 +514,29 @@ struct mixin_frame { // void can signal drop return. static_assert(std::same_as || std::is_void_v); - // TODO: tests for null path + // TODO: tests for null return path if constexpr (!std::is_void_v) { - child_promise->return_address = pkg.return_address; + child_promise->return_address = not_null(pkg.return_addr); } else if constexpr (!std::is_void_v) { // Set child's return address to null to inhibit the return // TODO: add test for this child_promise->return_address = nullptr; } + if constexpr (StopToken) { + // TODO: need some kind of API to launch an unstoppable task? + LF_ASSUME(pkg.stop_token.stop_possible()); + child_promise->frame.stop_token = pkg.stop_token; + } else { + child_promise->frame.stop_token = self.frame.stop_token; + } + return {.child = &child_promise->frame}; } - template - constexpr auto await_transform(this auto &self, pkg &&pkg) noexcept + template + constexpr auto await_transform(this auto &self, pkg &&pkg) noexcept -> awaitable { LF_TRY { return self.await_transform_pkg(std::move(pkg)); @@ -435,6 +550,13 @@ struct mixin_frame { return {.frame = &self.frame}; } + static constexpr auto await_transform(scope_type) noexcept -> scope_awaitable { return {}; } + + constexpr auto + await_transform(this auto const &self, child_scope_type) noexcept -> child_scope_awaitable { + return {.parent_stop_token = self.frame.stop_token}; + } + constexpr static auto initial_suspend() noexcept -> std::suspend_always { return {}; } constexpr static auto final_suspend() noexcept -> final_awaitable { return {}; } diff --git a/src/core/receiver.cxx b/src/core/receiver.cxx index 2a83961be..73289b7a8 100644 --- a/src/core/receiver.cxx +++ b/src/core/receiver.cxx @@ -8,6 +8,8 @@ import std; import libfork.utils; +import :stop; + namespace lf { export struct broken_receiver_error final : libfork_exception { @@ -17,29 +19,108 @@ export struct broken_receiver_error final : libfork_exception { } }; -template -struct receiver_state { - +/** + * @brief Shared state between a scheduled task and its receiver handle. + * + * @tparam T The return type of the scheduled coroutine. + * @tparam Stoppable If true, the state owns a stop_source that can be used + * to cancel the root task externally. + * + * Constructors forward arguments for in-place construction of the return value. + * Internal access is gated behind a hidden friend: `get(key_t, receiver_state&)`. + */ +export template +class receiver_state { + public: struct empty {}; + /// Default construction — return value is default-initialised (or empty for void). + constexpr receiver_state() = default; + + /// In-place construction of the return value from arbitrary args. + template + requires (!std::is_void_v) && std::constructible_from + constexpr explicit receiver_state(Args &&...args) : m_return_value(std::forward(args)...) {} + + /** + * @brief Request that the associated task stop. + * + * Only available when Stoppable=true. Safe to call before scheduling — + * the root frame checks stop_requested() before executing the task body. + */ + constexpr auto request_stop() noexcept -> void + requires Stoppable + { + m_stop.request_stop(); + } + + private: + template + friend class receiver; + + /** + * @brief Internal accessor returned by `get(key_t, receiver_state&)`. + * + * Not reachable by name from outside this translation unit because view + * is a private nested type. Callers use `auto` with the hidden friend. + */ + struct view { + receiver_state *p; + + constexpr void set_exception(std::exception_ptr e) noexcept { p->m_exception = std::move(e); } + + constexpr void notify_ready() noexcept { + p->m_ready.test_and_set(); + p->m_ready.notify_one(); + } + + [[nodiscard]] + constexpr auto return_value_address() noexcept -> T * + requires (!std::is_void_v) + { + return std::addressof(p->m_return_value); + } + + [[nodiscard]] + constexpr auto get_stop_token() noexcept -> stop_source::stop_token + requires Stoppable + { + return p->m_stop.token(); + } + }; + + /** + * @brief Hidden friend accessor for internal library use. + * + * Only callable via ADL when a `key_t` is available (i.e. by calling `key()`). + * Returns a `view` proxy to manipulate the state's private members. + */ + [[nodiscard]] + friend constexpr auto get(key_t, receiver_state &self) noexcept -> view { + return {&self}; + } + [[no_unique_address]] - std::conditional_t, empty, T> m_return_value; + std::conditional_t, empty, T> m_return_value{}; std::exception_ptr m_exception; std::atomic_flag m_ready; + + [[no_unique_address]] + std::conditional_t m_stop; }; -export template +export template class receiver { - using state_type = receiver_state; + using state_type = receiver_state; public: constexpr receiver(key_t, std::shared_ptr &&state) : m_state(std::move(state)) {} - constexpr receiver(receiver &&) noexcept = default; - constexpr auto operator=(receiver &&) noexcept -> receiver & = default; // Move only + constexpr receiver(receiver &&) noexcept = default; constexpr receiver(const receiver &) = delete; + constexpr auto operator=(receiver &&) noexcept -> receiver & = default; constexpr auto operator=(const receiver &) -> receiver & = delete; [[nodiscard]] @@ -62,6 +143,37 @@ class receiver { m_state->m_ready.wait(false); } + /** + * @brief Returns a stop_token for this task's stop source. + * + * Only available when Stoppable=true. The token can be used to observe + * whether the associated task has been cancelled. + */ + [[nodiscard]] + constexpr auto token() noexcept -> stop_source::stop_token + requires Stoppable + { + if (!valid()) { + LF_THROW(broken_receiver_error{}); + } + return get(key(), *m_state).get_stop_token(); + } + + /** + * @brief Request that the associated task stop. + * + * Only available when Stoppable=true. Thread-safe; may be called + * concurrently with the task executing on worker threads. + */ + constexpr auto request_stop() -> void + requires Stoppable + { + if (!valid()) { + LF_THROW(broken_receiver_error{}); + } + m_state->m_stop.request_stop(); + } + [[nodiscard]] constexpr auto get() && -> T { diff --git a/src/core/root.cxx b/src/core/root.cxx index 0863c41db..00990260d 100644 --- a/src/core/root.cxx +++ b/src/core/root.cxx @@ -17,7 +17,7 @@ import :task; namespace lf { -// TODO: allocator aware! +// TODO: allocator aware! -> IDEA embed in frame/state struct get_frame_t {}; @@ -68,11 +68,12 @@ struct root_task { promise_type *promise; }; -template +template requires async_invocable_to [[nodiscard]] auto // -root_pkg(std::shared_ptr> recv, Fn fn, Args... args) -> root_task> { +root_pkg(std::shared_ptr> recv, Fn fn, Args... args) + -> root_task> { // This should be resumed on a valid context. LF_ASSUME(thread_local_context != nullptr); @@ -89,25 +90,30 @@ root_pkg(std::shared_ptr> recv, Fn fn, Args... args) -> root_t promise_type *child = nullptr; + if (root->stop_requested()) { + // The root task was cancelled before it even started, we can skip + // straight to cleanup. + goto cleanup; + } + LF_TRY { // Potentially throwing child = get(key(), ctx_invoke_t{}(std::move(fn), std::move(args)...)); } LF_CATCH_ALL { - recv->m_exception = std::current_exception(); + get(key(), *recv).set_exception(std::current_exception()); goto cleanup; } LF_ASSUME(child != nullptr); - // TODO: cancellation - + // Propagate parent/stop info to child child->frame.parent = root; - child->frame.cancel = nullptr; + child->frame.stop_token = root->stop_token; LF_ASSUME(child->frame.kind == category::call); if constexpr (!std::is_void_v>) { - child->return_address = std::addressof(recv->m_return_value); + child->return_address = get(key(), *recv).return_value_address(); } // Begin normal execution of the child task, it will clean itself @@ -119,19 +125,25 @@ root_pkg(std::shared_ptr> recv, Fn fn, Args... args) -> root_t // - Normal return // - Exception // - Cancellation + // + // We return any exception stashed unconditionally if constexpr (LF_COMPILER_EXCEPTIONS) { if (root->exception_bit) { // The child threw an exception, propagate it to the receiver. - recv->m_exception = extract_exception(root); + get(key(), *recv).set_exception(extract_exception(root)); } } cleanup: - // Now to that which we would otherwise do at a final suspend. + // Now do that which we would otherwise do at a final suspend. // Notify the receiver that the task is done. - recv->m_ready.test_and_set(); - recv->m_ready.notify_one(); + get(key(), *recv).notify_ready(); + + LF_ASSUME(root->steals == 0); + LF_ASSUME(root->joins == k_u16_max); + LF_ASSUME(root->exception_bit == 0); + co_return; } diff --git a/src/core/schedule.cxx b/src/core/schedule.cxx index 18e4aca6b..5980ea435 100644 --- a/src/core/schedule.cxx +++ b/src/core/schedule.cxx @@ -11,6 +11,7 @@ import libfork.utils; import :concepts_invocable; import :concepts_scheduler; import :frame; +import :stop; import :thread_locals; import :promise; import :root; @@ -42,22 +43,28 @@ concept schedulable = schedulable_decayed, Context, std::decay_ template using invoke_decay_result_t = async_result_t, Context, std::decay_t...>; -template -using schedule_state_t = receiver_state>; +template +using schedule_state_t = receiver_state, Stoppable>; export template requires schedulable using schedule_result_t = receiver>; /** - * @brief Schedule a function to be run on a scheduler. + * @brief Schedule a function with a pre-allocated receiver state. + * + * This is the primary overload: the caller provides a shared_ptr to the + * receiver state, allowing custom allocation. The stop_token bound to the + * root frame depends on whether the state is cancellable. * * This function is strongly exception safe. */ -export template - requires schedulable, Args...> +export template + requires schedulable, Args...> && + std::same_as, Args...>> constexpr auto -schedule(Sch &&sch, Fn &&fn, Args &&...args) -> schedule_result_t, Args...> { +schedule(Sch &&sch, std::shared_ptr> state, Fn &&fn, Args &&...args) + -> receiver { using context_type = context_t; @@ -65,15 +72,22 @@ schedule(Sch &&sch, Fn &&fn, Args &&...args) -> schedule_result_t>(); - - // Package has shared ownership of the state, fine if this throws + // Package takes shared ownership of the state; fine if this throws. root_task task = root_pkg(state, std::forward(fn), std::forward(args)...); LF_ASSUME(task.promise != nullptr); + task.promise->frame.kind = category::root; + task.promise->frame.parent = nullptr; + + if constexpr (Stoppable) { + task.promise->frame.stop_token = get(key(), *state).get_stop_token(); + } else { + task.promise->frame.stop_token = stop_source::stop_token{}; // non-cancellable root + } + LF_TRY { + // TODO: forward sch + modify concept sch.post(sched_handle{key(), &task.promise->frame}); // If ^ didn't throw then the root_task will destroy itself at the final suspend. } LF_CATCH_ALL { @@ -84,4 +98,24 @@ schedule(Sch &&sch, Fn &&fn, Args &&...args) -> schedule_result_t + requires schedulable, Args...> +constexpr auto +schedule(Sch &&sch, Fn &&fn, Args &&...args) -> receiver, Args...>> { + + using context_type = context_t; + using R = invoke_decay_result_t; + + auto state = std::make_shared>(); + + return schedule( + std::forward(sch), std::move(state), std::forward(fn), std::forward(args)...); +} + } // namespace lf diff --git a/src/core/stop.cxx b/src/core/stop.cxx new file mode 100644 index 000000000..4b3b8c52a --- /dev/null +++ b/src/core/stop.cxx @@ -0,0 +1,121 @@ +export module libfork.core:stop; + +import std; + +import libfork.utils; + +namespace lf { + +/** + * @brief Similar to a linked-list of std::stop_sorce but with an embedded stop_state. + */ +export class stop_source { + public: + /** + * @brief Lightweight public handle to a stop_source chain. + * + * A stop_token is a non-owning pointer-sized wrapper around a stop_source. + */ + class stop_token { + public: + /** + * @brief Construct a null (unstoppable) token. + */ + constexpr stop_token() noexcept = default; + + /** + * @brief Returns true if a stop source is associated (stopping is possible). + */ + [[nodiscard]] + constexpr auto stop_possible() const noexcept -> bool { + return m_src != nullptr; + } + + /** + * @brief Returns true if any stop source in the ancestor chain has been stopped. + * + * A null token always returns false. + * + * Complexity: O(chain depth). Every task that creates a child_scope adds one + * node to the chain, so deeply-nested task hierarchies pay proportionally more + * per stop check. + */ + [[nodiscard]] + constexpr auto stop_requested() const noexcept -> bool { + return deep_stop_requested(m_src); + } + + private: + friend class stop_source; + + explicit constexpr stop_token(stop_source const *src) noexcept : m_src(src) {} + + stop_source const *m_src = nullptr; + }; + + /** + * @brief Construct a root stop source with no parent. + */ + constexpr stop_source() noexcept = default; + + /** + * @brief Construct a stop source chained onto the given parent token. + */ + constexpr explicit stop_source(stop_token parent) noexcept : m_parent(parent.m_src) {} + + // Immovable + constexpr stop_source(const stop_source &) noexcept = delete; + constexpr stop_source(stop_source &&) noexcept = delete; + constexpr auto operator=(const stop_source &) noexcept -> stop_source & = delete; + constexpr auto operator=(stop_source &&) noexcept -> stop_source & = delete; + + /** + * @brief Get a handle to this stop source. + */ + constexpr auto token() const noexcept -> stop_token { return stop_token{this}; } + + /** + * @brief Returns true if any stop source in the ancestor chain has been stopped. + + * Complexity: O(chain depth). Every task that creates a child_scope adds one + * node to the chain, so deeply-nested task hierarchies pay proportionally more + * per stop check. + */ + [[nodiscard]] + constexpr auto stop_requested() const noexcept -> bool { + return deep_stop_requested(this); + } + + /** + * @brief Request that this stop source (and all its children) stop. + */ + constexpr auto request_stop() noexcept -> void { m_stop.store(1, std::memory_order_release); } + + /** + * @brief Same as `request_stop`, but returns true if this is the first time stop has been requested. + */ + [[nodiscard("You can use request_stop() if you don't need the return value")]] + constexpr auto race_request_stop() noexcept -> bool { + return m_stop.exchange(1, std::memory_order_release) == 0; + } + + private: + /** + * @brief Test if any stop request has been made in the current chain. + * + * Safe to call with a null pointer, in which case it returns false. + */ + [[nodiscard]] + friend constexpr auto deep_stop_requested(stop_source const *src) noexcept -> bool { + for (stop_source const *ptr = src; ptr != nullptr; ptr = ptr->m_parent) { + if (ptr->m_stop.load(std::memory_order_acquire) == 1) { + return true; + } + } + return false; + } + + stop_source const *m_parent = nullptr; + std::atomic m_stop = 0; +}; +} // namespace lf diff --git a/test/src/cancel.cpp b/test/src/cancel.cpp new file mode 100644 index 000000000..c648d8a72 --- /dev/null +++ b/test/src/cancel.cpp @@ -0,0 +1,502 @@ +#include +#include + +#include "libfork/__impl/exception.hpp" + +import std; + +import libfork; + +// Exhaustive tests for all stop-token paths in promise.cxx. +// +// Stop check-points in promise.cxx: +// +// A. awaitable::await_suspend (StopToken=true): +// child->stop_requested() → child not spawned (fork/call via child_scope_ops) +// +// B. awaitable::await_suspend (StopToken=false): +// parent.promise().frame.stop_requested() → child not spawned (fork/call via scope_ops) +// +// C. final_suspend_full / final_suspend_trailing: +// parent->stop_requested() after winning join race → exception dropped, +// iterative ancestor cleanup (exercises concurrent/stolen path) +// +// D. join_awaitable::await_ready: +// stop_requested() forces suspension even when steals==0 +// +// E. join_awaitable::await_suspend: +// stop_requested() after winning join race → handle_stop() +// +// F. handle_stop (exception_bit set on stopped frame): +// exception dropped, not propagated to caller +// +// G. Nested child_scope chain propagation: +// inner child_scope inherits parent's stop token; stopping the outer +// source propagates through the chain to the inner scope. +// +// H. Stoppable receiver / pre-cancelled root: +// receiver_state::request_stop() before schedule() triggers +// the goto-cleanup fast path in root.cxx — task body never executes. + +namespace { + +// ============================================================ +// Basic helper tasks +// ============================================================ + +struct count_up { + template + static auto operator()(lf::env, std::atomic &count) -> lf::task { + co_return count.fetch_add(1); + } +}; + +struct count_up_void { + template + static auto operator()(lf::env, std::atomic &count) -> lf::task { + count.fetch_add(1); + co_return; + } +}; + +// ============================================================ +// A. Cancel=true: child-specific cancellation via child_scope_ops. +// +// child_scope_ops binds its stop_source as Cancel=true on every fork/call. +// Calling sc.request_stop() before launching exercises +// awaitable::await_suspend's Cancel=true branch. +// ============================================================ + +template +auto test_call_drop_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + auto sc = co_await lf::child_scope(); + sc.request_stop(); + co_await sc.call_drop(count_up_void{}, count); + co_await sc.join(); + co_return count.load() == 0; +} + +template +auto test_call_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + int result = 99; + auto sc = co_await lf::child_scope(); + sc.request_stop(); + co_await sc.call(&result, count_up{}, count); + co_await sc.join(); + co_return result == 99 && count.load() == 0; +} + +template +auto test_fork_drop_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + auto sc = co_await lf::child_scope(); + sc.request_stop(); + co_await sc.fork_drop(count_up_void{}, count); + co_await sc.join(); + co_return count.load() == 0; +} + +template +auto test_fork_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + int result = 99; + auto sc = co_await lf::child_scope(); + sc.request_stop(); + co_await sc.fork(&result, count_up{}, count); + co_await sc.join(); + co_return result == 99 && count.load() == 0; +} + +template +auto test_call_not_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + int result = 0; + auto sc = co_await lf::child_scope(); + co_await sc.call(&result, count_up{}, count); + co_await sc.join(); + co_return result == 0 && count.load() == 1; +} + +template +auto test_fork_not_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + int result = 0; + auto sc = co_await lf::child_scope(); + co_await sc.fork(&result, count_up{}, count); + co_await sc.join(); + co_return result == 0 && count.load() == 1; +} + +template +auto test_multiple_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + auto sc = co_await lf::child_scope(); + sc.request_stop(); + co_await sc.fork_drop(count_up_void{}, count); + co_await sc.fork_drop(count_up_void{}, count); + co_await sc.fork_drop(count_up_void{}, count); + co_await sc.join(); + co_return count.load() == 0; +} + +template +auto test_mixed_cancel(lf::env) -> lf::task { + std::atomic count = 0; + auto sc_run = co_await lf::child_scope(); + auto sc_skip = co_await lf::child_scope(); + sc_skip.request_stop(); + co_await sc_run.fork_drop(count_up_void{}, count); // runs + co_await sc_skip.fork_drop(count_up_void{}, count); // skipped + co_await sc_run.fork_drop(count_up_void{}, count); // runs + co_await sc_run.join(); + co_await sc_skip.join(); + co_return count.load() == 2; +} + +// ============================================================ +// B. StopToken=false: parent frame stop propagation. +// +// An inner task receives a stop_source& that IS its own frame's stop +// source (bound via child_scope_ops::call_drop / StopToken=true). It calls +// request_stop() on it, making its own stop_requested() return true, then +// tries to launch sub-tasks via scope_ops (StopToken=false). Those are +// skipped because parent.frame.stop_requested() is true (path B). +// At join, handle_stop fires (paths D+E). +// ============================================================ + +struct inner_call_after_self_cancel { + template + static auto operator()(lf::env, lf::stop_source &my_cancel, std::atomic &count) + -> lf::task { + my_cancel.request_stop(); // make this frame's stop_requested() == true + auto sc = co_await lf::scope(); + co_await sc.call_drop(count_up_void{}, count); // StopToken=false: stop requested → skip + co_await sc.fork_drop(count_up_void{}, count); // StopToken=false: stop requested → skip + co_await sc.join(); // paths D+E: join fires handle_stop + count.fetch_add(100); // must not be reached + } +}; + +template +auto test_call_parent_stop_source(lf::env) -> lf::task { + std::atomic count = 0; + auto outer_sc = co_await lf::child_scope(); + // Pass the scope's stop_source by reference so the inner task can cancel it. + co_await outer_sc.call_drop(inner_call_after_self_cancel{}, outer_sc, count); + co_await outer_sc.join(); + co_return count.load() == 0; +} + +// ============================================================ +// C/D/E. Concurrent cancellation: final_suspend + join interaction. +// ============================================================ + +// A child task that cancels a stop_source then completes normally. +struct cancel_source { + template + static auto + operator()(lf::env, lf::stop_source &src, std::atomic &count) -> lf::task { + count.fetch_add(1); + src.request_stop(); + co_return; + } +}; + +struct inner_fork_then_cancel_at_join { + template + static auto operator()(lf::env, lf::stop_source &my_cancel, std::atomic &count) + -> lf::task { + auto sc = co_await lf::scope(); + co_await sc.fork_drop(cancel_source{}, my_cancel, count); + co_await sc.join(); // stop_requested() after child requests stop → handle_stop + count.fetch_add(100); // must not be reached + } +}; + +template +auto test_fork_cancel_at_join(lf::env) -> lf::task { + std::atomic count = 0; + auto outer_sc = co_await lf::child_scope(); + co_await outer_sc.call_drop(inner_fork_then_cancel_at_join{}, outer_sc, count); + co_await outer_sc.join(); + co_return count.load() == 1; +} + +// ============================================================ +// F. Exception + cancellation interaction. +// ============================================================ + +#if LF_COMPILER_EXCEPTIONS + +struct just_throw { + template + static auto operator()(lf::env) -> lf::task { + throw std::runtime_error("test exception"); + co_return; + } +}; + +struct inner_forks_throwing { + template + static auto operator()(lf::env) -> lf::task { + auto sc = co_await lf::scope(); + co_await sc.fork_drop(just_throw{}); + co_await sc.join(); // not cancelled → rethrow + co_return; + } +}; + +template +auto test_exception_propagates(lf::env) -> lf::task { + auto outer_sc = co_await lf::child_scope(); + co_await outer_sc.call_drop(inner_forks_throwing{}); + co_await outer_sc.join(); +} + +struct cancel_source_and_throw { + template + static auto + operator()(lf::env, lf::stop_source &src, std::atomic &count) -> lf::task { + count.fetch_add(1); + src.request_stop(); + throw std::runtime_error("should be dropped"); + co_return; + } +}; + +struct inner_cancel_and_throw { + template + static auto operator()(lf::env, lf::stop_source &my_cancel, std::atomic &count) + -> lf::task { + auto sc = co_await lf::scope(); + co_await sc.fork_drop(cancel_source_and_throw{}, my_cancel, count); + co_await sc.join(); // stop requested + exception → handle_stop drops exception + count.fetch_add(100); // must not be reached + } +}; + +template +auto test_exception_dropped_when_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + auto outer_sc = co_await lf::child_scope(); + co_await outer_sc.call_drop(inner_cancel_and_throw{}, outer_sc, count); + co_await outer_sc.join(); + co_return count.load() == 1; +} + +struct just_throw_and_count { + template + static auto operator()(lf::env, std::atomic &count) -> lf::task { + count.fetch_add(1); + throw std::runtime_error("sibling exception"); + co_return; + } +}; + +struct inner_sibling_throws_and_cancel { + template + static auto operator()(lf::env, lf::stop_source &my_cancel, std::atomic &count) + -> lf::task { + auto sc = co_await lf::scope(); + co_await sc.fork_drop(just_throw_and_count{}, count); + co_await sc.fork_drop(cancel_source{}, my_cancel, count); + co_await sc.join(); // stop requested; exceptions dropped + count.fetch_add(100); // must not be reached + } +}; + +template +auto test_sibling_exception_dropped_when_cancelled(lf::env) -> lf::task { + std::atomic count = 0; + auto outer_sc = co_await lf::child_scope(); + co_await outer_sc.call_drop(inner_sibling_throws_and_cancel{}, outer_sc, count); + co_await outer_sc.join(); + auto c = count.load(); + co_return c >= 2 && c < 100; +} + +#endif // LF_COMPILER_EXCEPTIONS + +// ============================================================ +// G. Nested child_scope chain propagation. +// +// A child_scope created inside a task that runs under another child_scope +// has m_parent pointing to the outer scope's stop_source. Stopping the +// outer source propagates through the chain, making the inner scope's +// stop_requested() return true (path A). +// ============================================================ + +struct inner_with_nested_scope { + template + static auto + operator()(lf::env, lf::stop_source &outer, std::atomic &count) -> lf::task { + auto inner_sc = co_await lf::child_scope(); + // Cancel the outer scope; inner_sc.m_parent == &outer, so the chain fires. + outer.request_stop(); + co_await inner_sc.fork_drop(count_up_void{}, count); // skipped: inner_sc is stopped + co_await inner_sc.join(); // handle_stop + count.fetch_add(100); // must not be reached + } +}; + +template +auto test_nested_child_scope_chain(lf::env) -> lf::task { + std::atomic count = 0; + auto outer_sc = co_await lf::child_scope(); + co_await outer_sc.call_drop(inner_with_nested_scope{}, outer_sc, count); + co_await outer_sc.join(); + co_return count.load() == 0; +} + +// ============================================================ +// H. Stoppable receiver / pre-cancelled root. +// +// receiver_state::request_stop() before schedule() makes the root +// frame's stop_token immediately satisfied, triggering the goto-cleanup +// fast path in root.cxx so the task body never runs. +// ============================================================ + +template +auto pre_cancelled_root_fn(lf::env, bool *ran) -> lf::task { + *ran = true; + co_return; +} + +// ============================================================ +// Run all tests against a given scheduler +// ============================================================ + +template +void tests(Sch &scheduler) { + + using Ctx = lf::context_t; + + SECTION("call_drop: pre-cancelled child is not run") { + auto recv = schedule(scheduler, test_call_drop_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("call: pre-cancelled child is not run, return address not written") { + auto recv = schedule(scheduler, test_call_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("fork_drop: pre-cancelled child is not run") { + auto recv = schedule(scheduler, test_fork_drop_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("fork: pre-cancelled child is not run, return address not written") { + auto recv = schedule(scheduler, test_fork_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("call: positive - not cancelled, child runs and writes result") { + auto recv = schedule(scheduler, test_call_not_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("fork: positive - not cancelled, child runs and writes result") { + auto recv = schedule(scheduler, test_fork_not_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("multiple fork_drops: all pre-cancelled, none run") { + auto recv = schedule(scheduler, test_multiple_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("mixed scopes: only non-cancelled children run") { + auto recv = schedule(scheduler, test_mixed_cancel); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("call_drop/fork_drop: skipped when parent frame is cancelled; join fires handle_cancel") { + auto recv = schedule(scheduler, test_call_parent_stop_source); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("fork child cancels parent stop source; join detects cancel via handle_cancel") { + auto recv = schedule(scheduler, test_fork_cancel_at_join); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("nested child_scope: stopping outer scope propagates to inner via chain") { + auto recv = schedule(scheduler, test_nested_child_scope_chain); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("stoppable receiver: pre-cancelled root task body never executes") { + bool ran = false; + auto state = std::make_shared>(); + state->request_stop(); + auto recv = lf::schedule(scheduler, std::move(state), pre_cancelled_root_fn, &ran); + REQUIRE(recv.valid()); + std::move(recv).get(); + REQUIRE(!ran); + } + +#if LF_COMPILER_EXCEPTIONS + + SECTION("exception propagates through join when frame is NOT cancelled") { + auto recv = schedule(scheduler, test_exception_propagates); + REQUIRE(recv.valid()); + REQUIRE_THROWS_AS(std::move(recv).get(), std::runtime_error); + } + + SECTION("exception in cancelled frame is dropped by handle_cancel; recv.get() does not throw") { + auto recv = schedule(scheduler, test_exception_dropped_when_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + + SECTION("sibling exception dropped when another sibling cancels the frame") { + auto recv = schedule(scheduler, test_sibling_exception_dropped_when_cancelled); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get()); + } + +#endif // LF_COMPILER_EXCEPTIONS +} + +using mono_inline_ctx = lf::mono_context, lf::adapt_vector>; +using poly_inline_ctx = lf::derived_poly_context, lf::adapt_vector>; + +} // namespace + +TEMPLATE_TEST_CASE("Inline cancel", "[cancel]", mono_inline_ctx, poly_inline_ctx) { + lf::inline_scheduler sch{}; + tests(sch); +} + +namespace { + +using mono_busy_thread_pool = lf::mono_busy_pool>; +using poly_busy_thread_pool = lf::poly_busy_pool>; + +} // namespace + +TEMPLATE_TEST_CASE("Busy cancel", "[cancel]", mono_busy_thread_pool, poly_busy_thread_pool) { + + STATIC_REQUIRE(lf::scheduler); + + for (std::size_t thr = 1; thr < 4; ++thr) { + DYNAMIC_SECTION("threads=" << thr) { + TestType scheduler{thr}; + tests(scheduler); + } + } +} diff --git a/todo.md b/todo.md deleted file mode 100644 index 1183e6d11..000000000 --- a/todo.md +++ /dev/null @@ -1,19 +0,0 @@ -# TODO - -- [x] Context tag in API -- [x] Context in invocability concepts - -- Integrate geometric allocator (that can throw) - - [x] Initial impl - - [ ] Test correct throwing spec - -- [ ] Optimize release/resume in presence of steals (need benchmark) - -- [ ] `-fassume-nothrow-exception-dtor` - -- [ ] Test nothrow allocator performance (just terminate?) - -- [ ] Cancellation: - - [ ] Maybe in separate `co_await scope` - - [ ] Integrate into join - - [ ] Exception safety