diff --git a/README.md b/README.md index dc5f66c..a496499 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,8 @@ This repository contains a C++ ABI implementation of the WebAssembly Component M - [x] Wamr - [ ] WasmEdge +When expanding the canonical ABI surface, cross-check the Python reference tests in `ref/component-model/design/mvp/canonical-abi/run_tests.py`; new host features should mirror the behaviors exercised there. + ## Build Instructions ### Prerequisites @@ -171,6 +173,46 @@ This library is a header only library. To use it in your project, you can: - [x] Copy the contents of the `include` directory to your project. - [ ] Use `vcpkg` to install the library and its dependencies. +### Async runtime helpers + +The canonical Component Model runtime is cooperative: hosts must drive pending work by scheduling tasks explicitly. `cmcpp` now provides a minimal async harness in `cmcpp/runtime.hpp`: + +- `Store` owns the pending task queue and exposes `invoke` plus `tick()`. +- `FuncInst` is the callable signature hosts use to wrap guest functions. +- `Thread::create` builds a pending task with user-supplied readiness/resume callbacks. +- `Call::from_thread` returns a cancellation-capable handle to the caller. + +Typical usage: + +```cpp +cmcpp::Store store; +cmcpp::FuncInst func = [](cmcpp::Store &store, + cmcpp::SupertaskPtr, + cmcpp::OnStart on_start, + cmcpp::OnResolve on_resolve) { + auto args = std::make_shared>(on_start()); + auto gate = std::make_shared>(false); + + auto thread = cmcpp::Thread::create( + store, + [gate]() { return gate->load(); }, + [args, on_resolve](bool cancelled) { + on_resolve(cancelled ? std::nullopt : std::optional{*args}); + return false; // finished + }, + true, + [gate]() { gate->store(true); }); + + return cmcpp::Call::from_thread(thread); +}; + +auto call = store.invoke(func, nullptr, [] { return std::vector{}; }, [](auto) {}); +// Drive progress +store.tick(); +``` + +Call `tick()` in your host loop until all pending work completes. Cancellation is cooperative: calling `Call::request_cancellation()` marks the associated thread as cancelled before the next `tick()`. + ## Related projects diff --git a/include/cmcpp.hpp b/include/cmcpp.hpp index faa7af3..dfd5c21 100644 --- a/include/cmcpp.hpp +++ b/include/cmcpp.hpp @@ -11,5 +11,6 @@ #include #include #include +#include #endif // CMCPP_HPP \ No newline at end of file diff --git a/include/cmcpp/runtime.hpp b/include/cmcpp/runtime.hpp new file mode 100644 index 0000000..7ad5545 --- /dev/null +++ b/include/cmcpp/runtime.hpp @@ -0,0 +1,261 @@ +#ifndef CMCPP_RUNTIME_HPP +#define CMCPP_RUNTIME_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cmcpp +{ + class Store; + + struct Supertask; + using SupertaskPtr = std::shared_ptr; + + using OnStart = std::function()>; + using OnResolve = std::function>)>; + + class Thread : public std::enable_shared_from_this + { + public: + using ReadyFn = std::function; + using ResumeFn = std::function; + using CancelFn = std::function; + + static std::shared_ptr create(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable = false, CancelFn on_cancel = {}); + + Thread(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable, CancelFn on_cancel); + bool ready() const; + void resume(); + void request_cancellation(); + bool cancellable() const; + bool cancelled() const; + bool completed() const; + + private: + enum class State + { + Pending, + Running, + Completed + }; + + void set_pending(bool pending_again, const std::shared_ptr &self); + + Store *store_; + ReadyFn ready_; + ResumeFn resume_; + CancelFn on_cancel_; + bool cancellable_; + bool cancelled_; + mutable std::mutex mutex_; + State state_; + }; + + class Call + { + public: + using CancelRequest = std::function; + + Call() = default; + explicit Call(CancelRequest cancel) : request_cancellation_(std::move(cancel)) {} + + void request_cancellation() const + { + if (request_cancellation_) + { + request_cancellation_(); + } + } + + static Call from_thread(const std::shared_ptr &thread) + { + if (!thread) + { + return Call(); + } + std::weak_ptr weak = thread; + return Call([weak]() + { + if (auto locked = weak.lock()) + { + locked->request_cancellation(); + } }); + } + + private: + CancelRequest request_cancellation_; + }; + + struct Supertask + { + SupertaskPtr parent; + }; + + using FuncInst = std::function; + + class Store + { + public: + Call invoke(const FuncInst &func, SupertaskPtr caller, OnStart on_start, OnResolve on_resolve); + void tick(); + void schedule(const std::shared_ptr &thread); + std::size_t pending_size() const; + + private: + friend class Thread; + mutable std::mutex mutex_; + std::vector> pending_; + }; + + inline std::shared_ptr Thread::create(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable, CancelFn on_cancel) + { + auto thread = std::shared_ptr(new Thread(store, std::move(ready), std::move(resume), cancellable, std::move(on_cancel))); + store.schedule(thread); + return thread; + } + + inline Thread::Thread(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable, CancelFn on_cancel) + : store_(&store), ready_(std::move(ready)), resume_(std::move(resume)), on_cancel_(std::move(on_cancel)), cancellable_(cancellable), cancelled_(false), state_(State::Pending) + { + } + + inline bool Thread::ready() const + { + std::lock_guard lock(mutex_); + if (state_ != State::Pending) + { + return false; + } + if (!ready_) + { + return true; + } + return ready_(); + } + + inline void Thread::resume() + { + auto self = shared_from_this(); + ResumeFn resume; + bool was_cancelled = false; + { + std::lock_guard lock(mutex_); + if (state_ != State::Pending) + { + return; + } + state_ = State::Running; + resume = resume_; + was_cancelled = cancelled_; + } + + bool keep_pending = false; + if (resume) + { + keep_pending = resume(was_cancelled); + } + + set_pending(keep_pending, self); + } + + inline void Thread::request_cancellation() + { + CancelFn cancel; + { + std::lock_guard lock(mutex_); + if (cancelled_ || !cancellable_) + { + return; + } + cancelled_ = true; + cancel = on_cancel_; + } + if (cancel) + { + cancel(); + } + } + + inline bool Thread::cancellable() const + { + std::lock_guard lock(mutex_); + return cancellable_; + } + + inline bool Thread::cancelled() const + { + std::lock_guard lock(mutex_); + return cancelled_; + } + + inline bool Thread::completed() const + { + std::lock_guard lock(mutex_); + return state_ == State::Completed; + } + + inline void Thread::set_pending(bool pending_again, const std::shared_ptr &self) + { + { + std::lock_guard lock(mutex_); + state_ = pending_again ? State::Pending : State::Completed; + } + if (pending_again) + { + store_->schedule(self); + } + } + + inline Call Store::invoke(const FuncInst &func, SupertaskPtr caller, OnStart on_start, OnResolve on_resolve) + { + if (!func) + { + return Call(); + } + return func(*this, std::move(caller), std::move(on_start), std::move(on_resolve)); + } + + inline void Store::tick() + { + std::shared_ptr selected; + { + std::lock_guard lock(mutex_); + auto it = std::find_if(pending_.begin(), pending_.end(), [](const std::shared_ptr &thread) + { return thread && thread->ready(); }); + if (it == pending_.end()) + { + return; + } + selected = *it; + pending_.erase(it); + } + if (selected) + { + selected->resume(); + } + } + + inline void Store::schedule(const std::shared_ptr &thread) + { + if (!thread) + { + return; + } + std::lock_guard lock(mutex_); + pending_.push_back(thread); + } + + inline std::size_t Store::pending_size() const + { + std::lock_guard lock(mutex_); + return pending_.size(); + } +} + +#endif diff --git a/test/main.cpp b/test/main.cpp index ae6e75d..24fc42a 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -9,6 +9,8 @@ using namespace cmcpp; #include #include #include +#include +#include #include #include #include @@ -17,6 +19,191 @@ using namespace cmcpp; #define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN #include +TEST_CASE("Async runtime schedules threads") +{ + Store store; + + std::optional> resolved_payload; + bool resolved = false; + bool cancelled = false; + std::shared_ptr> gate; + + FuncInst async_func = [&](Store &store, SupertaskPtr, OnStart on_start, OnResolve on_resolve) + { + auto args = std::make_shared>(on_start()); + gate = std::make_shared>(false); + + auto thread = Thread::create( + store, + [gate]() + { + return gate->load(); + }, + [args, on_resolve](bool was_cancelled) + { + if (was_cancelled) + { + on_resolve(std::nullopt); + } + else + { + on_resolve(*args); + } + return false; + }, + true, + [gate]() + { + gate->store(true); + }); + + return Call::from_thread(thread); + }; + + auto call = store.invoke( + async_func, + nullptr, + []() + { + return std::vector{int32_t(1), std::string("world")}; + }, + [&](std::optional> values) + { + resolved_payload = std::move(values); + resolved = true; + cancelled = !resolved_payload.has_value(); + }); + + CHECK(gate); + CHECK(store.pending_size() == 1); + CHECK_FALSE(resolved); + + store.tick(); + CHECK_FALSE(resolved); + + gate->store(true); + store.tick(); + + CHECK(resolved); + REQUIRE(resolved_payload.has_value()); + REQUIRE(resolved_payload->size() == 2); + CHECK(std::any_cast((*resolved_payload)[0]) == 1); + CHECK(std::any_cast((*resolved_payload)[1]) == "world"); + CHECK_FALSE(cancelled); + + // Cancellation path + resolved = false; + resolved_payload.reset(); + cancelled = false; + + auto call2 = store.invoke( + async_func, + nullptr, + []() + { + return std::vector{int32_t(2)}; + }, + [&](std::optional> values) + { + resolved_payload = std::move(values); + resolved = true; + cancelled = !resolved_payload.has_value(); + }); + + CHECK(store.pending_size() == 1); + store.tick(); + CHECK(store.pending_size() == 1); + + call2.request_cancellation(); + store.tick(); + + CHECK(resolved); + CHECK(cancelled); + CHECK_FALSE(resolved_payload.has_value()); +} + +TEST_CASE("Async runtime requeues until completion") +{ + Store store; + + auto counter = std::make_shared(0); + + auto thread = Thread::create( + store, + []() + { + return true; + }, + [counter](bool cancelled) + { + REQUIRE_FALSE(cancelled); + ++(*counter); + return *counter < 3; + }); + + CHECK(store.pending_size() == 1); + + store.tick(); + CHECK(*counter == 1); + CHECK(store.pending_size() == 1); + + store.tick(); + CHECK(*counter == 2); + CHECK(store.pending_size() == 1); + + store.tick(); + CHECK(*counter == 3); + CHECK(store.pending_size() == 0); + CHECK(thread->completed()); +} + +TEST_CASE("Async runtime propagates cancellation") +{ + Store store; + + auto ready_gate = std::make_shared>(false); + bool on_cancel_called = false; + bool resume_called = false; + bool was_cancelled = false; + + auto thread = Thread::create( + store, + [ready_gate]() + { + return ready_gate->load(); + }, + [&on_cancel_called, &resume_called, &was_cancelled](bool cancelled) + { + resume_called = true; + was_cancelled = cancelled; + CHECK(on_cancel_called); + return false; + }, + true, + [ready_gate, &on_cancel_called]() + { + on_cancel_called = true; + ready_gate->store(true); + }); + + auto call = Call::from_thread(thread); + + CHECK(store.pending_size() == 1); + CHECK_FALSE(on_cancel_called); + CHECK_FALSE(resume_called); + + call.request_cancellation(); + CHECK(on_cancel_called); + CHECK(store.pending_size() == 1); + + store.tick(); + + CHECK(resume_called); + CHECK(was_cancelled); + CHECK(store.pending_size() == 0); + CHECK(thread->completed()); +} + TEST_CASE("Boolean") { Heap heap(1024 * 1024);