Skip to content

Commit 18a4bb4

Browse files
committed
feat: add async runtime
helpers fixes #4 Signed-off-by: Gordon Smith <GordonJSmith@gmail.com>
1 parent d1df1af commit 18a4bb4

4 files changed

Lines changed: 408 additions & 0 deletions

File tree

README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,46 @@ This library is a header only library. To use it in your project, you can:
171171
- [x] Copy the contents of the `include` directory to your project.
172172
- [ ] Use `vcpkg` to install the library and its dependencies.
173173

174+
### Async runtime helpers
175+
176+
The canonical Component Model runtime is cooperative: hosts must drive pending work by scheduling tasks explicitly. `cmcpp` now provides a minimal async harness in `cmcpp/runtime.hpp`:
177+
178+
- `Store` owns the pending task queue and exposes `invoke` plus `tick()`.
179+
- `FuncInst` is the callable signature hosts use to wrap guest functions.
180+
- `Thread::create` builds a pending task with user-supplied readiness/resume callbacks.
181+
- `Call::from_thread` returns a cancellation-capable handle to the caller.
182+
183+
Typical usage:
184+
185+
```cpp
186+
cmcpp::Store store;
187+
cmcpp::FuncInst func = [](cmcpp::Store &store,
188+
cmcpp::SupertaskPtr,
189+
cmcpp::OnStart on_start,
190+
cmcpp::OnResolve on_resolve) {
191+
auto args = std::make_shared<std::vector<std::any>>(on_start());
192+
auto gate = std::make_shared<std::atomic<bool>>(false);
193+
194+
auto thread = cmcpp::Thread::create(
195+
store,
196+
[gate]() { return gate->load(); },
197+
[args, on_resolve](bool cancelled) {
198+
on_resolve(cancelled ? std::nullopt : std::optional{*args});
199+
return false; // finished
200+
},
201+
true,
202+
[gate]() { gate->store(true); });
203+
204+
return cmcpp::Call::from_thread(thread);
205+
};
206+
207+
auto call = store.invoke(func, nullptr, [] { return std::vector<std::any>{}; }, [](auto) {});
208+
// Drive progress
209+
store.tick();
210+
```
211+
212+
Call `tick()` in your host loop until all pending work completes. Cancellation is cooperative: calling `Call::request_cancellation()` marks the associated thread as cancelled before the next `tick()`.
213+
174214
175215
## Related projects
176216

include/cmcpp.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@
1111
#include <cmcpp/func.hpp>
1212
#include <cmcpp/lower.hpp>
1313
#include <cmcpp/lift.hpp>
14+
#include <cmcpp/runtime.hpp>
1415

1516
#endif // CMCPP_HPP

include/cmcpp/runtime.hpp

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
#ifndef CMCPP_RUNTIME_HPP
2+
#define CMCPP_RUNTIME_HPP
3+
4+
#include <algorithm>
5+
#include <any>
6+
#include <functional>
7+
#include <memory>
8+
#include <mutex>
9+
#include <optional>
10+
#include <utility>
11+
#include <vector>
12+
13+
namespace cmcpp
14+
{
15+
class Store;
16+
17+
struct Supertask;
18+
using SupertaskPtr = std::shared_ptr<Supertask>;
19+
20+
using OnStart = std::function<std::vector<std::any>()>;
21+
using OnResolve = std::function<void(std::optional<std::vector<std::any>>)>;
22+
23+
class Thread : public std::enable_shared_from_this<Thread>
24+
{
25+
public:
26+
using ReadyFn = std::function<bool()>;
27+
using ResumeFn = std::function<bool(bool)>;
28+
using CancelFn = std::function<void()>;
29+
30+
static std::shared_ptr<Thread> create(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable = false, CancelFn on_cancel = {});
31+
32+
bool ready() const;
33+
void resume();
34+
void request_cancellation();
35+
bool cancellable() const;
36+
bool cancelled() const;
37+
bool completed() const;
38+
39+
private:
40+
enum class State
41+
{
42+
Pending,
43+
Running,
44+
Completed
45+
};
46+
47+
Thread(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable, CancelFn on_cancel);
48+
49+
void set_pending(bool pending_again, const std::shared_ptr<Thread> &self);
50+
51+
Store *store_;
52+
ReadyFn ready_;
53+
ResumeFn resume_;
54+
CancelFn on_cancel_;
55+
bool cancellable_;
56+
bool cancelled_;
57+
mutable std::mutex mutex_;
58+
State state_;
59+
};
60+
61+
class Call
62+
{
63+
public:
64+
using CancelRequest = std::function<void()>;
65+
66+
Call() = default;
67+
explicit Call(CancelRequest cancel) : request_cancellation_(std::move(cancel)) {}
68+
69+
void request_cancellation() const
70+
{
71+
if (request_cancellation_)
72+
{
73+
request_cancellation_();
74+
}
75+
}
76+
77+
static Call from_thread(const std::shared_ptr<Thread> &thread)
78+
{
79+
if (!thread)
80+
{
81+
return Call();
82+
}
83+
std::weak_ptr<Thread> weak = thread;
84+
return Call([weak]()
85+
{
86+
if (auto locked = weak.lock())
87+
{
88+
locked->request_cancellation();
89+
} });
90+
}
91+
92+
private:
93+
CancelRequest request_cancellation_;
94+
};
95+
96+
struct Supertask
97+
{
98+
SupertaskPtr parent;
99+
};
100+
101+
using FuncInst = std::function<Call(Store &, SupertaskPtr, OnStart, OnResolve)>;
102+
103+
class Store
104+
{
105+
public:
106+
Call invoke(const FuncInst &func, SupertaskPtr caller, OnStart on_start, OnResolve on_resolve);
107+
void tick();
108+
void schedule(const std::shared_ptr<Thread> &thread);
109+
std::size_t pending_size() const;
110+
111+
private:
112+
friend class Thread;
113+
mutable std::mutex mutex_;
114+
std::vector<std::shared_ptr<Thread>> pending_;
115+
};
116+
117+
inline std::shared_ptr<Thread> Thread::create(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable, CancelFn on_cancel)
118+
{
119+
auto thread = std::shared_ptr<Thread>(new Thread(store, std::move(ready), std::move(resume), cancellable, std::move(on_cancel)));
120+
store.schedule(thread);
121+
return thread;
122+
}
123+
124+
inline Thread::Thread(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable, CancelFn on_cancel)
125+
: store_(&store), ready_(std::move(ready)), resume_(std::move(resume)), on_cancel_(std::move(on_cancel)), cancellable_(cancellable), cancelled_(false), state_(State::Pending)
126+
{
127+
}
128+
129+
inline bool Thread::ready() const
130+
{
131+
std::lock_guard lock(mutex_);
132+
if (state_ != State::Pending)
133+
{
134+
return false;
135+
}
136+
if (!ready_)
137+
{
138+
return true;
139+
}
140+
return ready_();
141+
}
142+
143+
inline void Thread::resume()
144+
{
145+
auto self = shared_from_this();
146+
ResumeFn resume;
147+
bool was_cancelled = false;
148+
{
149+
std::lock_guard lock(mutex_);
150+
if (state_ != State::Pending)
151+
{
152+
return;
153+
}
154+
state_ = State::Running;
155+
resume = resume_;
156+
was_cancelled = cancelled_;
157+
}
158+
159+
bool keep_pending = false;
160+
if (resume)
161+
{
162+
keep_pending = resume(was_cancelled);
163+
}
164+
165+
set_pending(keep_pending, self);
166+
}
167+
168+
inline void Thread::request_cancellation()
169+
{
170+
CancelFn cancel;
171+
{
172+
std::lock_guard lock(mutex_);
173+
if (cancelled_ || !cancellable_)
174+
{
175+
return;
176+
}
177+
cancelled_ = true;
178+
cancel = on_cancel_;
179+
}
180+
if (cancel)
181+
{
182+
cancel();
183+
}
184+
}
185+
186+
inline bool Thread::cancellable() const
187+
{
188+
std::lock_guard lock(mutex_);
189+
return cancellable_;
190+
}
191+
192+
inline bool Thread::cancelled() const
193+
{
194+
std::lock_guard lock(mutex_);
195+
return cancelled_;
196+
}
197+
198+
inline bool Thread::completed() const
199+
{
200+
std::lock_guard lock(mutex_);
201+
return state_ == State::Completed;
202+
}
203+
204+
inline void Thread::set_pending(bool pending_again, const std::shared_ptr<Thread> &self)
205+
{
206+
{
207+
std::lock_guard lock(mutex_);
208+
state_ = pending_again ? State::Pending : State::Completed;
209+
}
210+
if (pending_again)
211+
{
212+
store_->schedule(self);
213+
}
214+
}
215+
216+
inline Call Store::invoke(const FuncInst &func, SupertaskPtr caller, OnStart on_start, OnResolve on_resolve)
217+
{
218+
if (!func)
219+
{
220+
return Call();
221+
}
222+
return func(*this, std::move(caller), std::move(on_start), std::move(on_resolve));
223+
}
224+
225+
inline void Store::tick()
226+
{
227+
std::shared_ptr<Thread> selected;
228+
{
229+
std::lock_guard lock(mutex_);
230+
auto it = std::find_if(pending_.begin(), pending_.end(), [](const std::shared_ptr<Thread> &thread)
231+
{ return thread && thread->ready(); });
232+
if (it == pending_.end())
233+
{
234+
return;
235+
}
236+
selected = *it;
237+
pending_.erase(it);
238+
}
239+
if (selected)
240+
{
241+
selected->resume();
242+
}
243+
}
244+
245+
inline void Store::schedule(const std::shared_ptr<Thread> &thread)
246+
{
247+
if (!thread)
248+
{
249+
return;
250+
}
251+
std::lock_guard lock(mutex_);
252+
pending_.push_back(thread);
253+
}
254+
255+
inline std::size_t Store::pending_size() const
256+
{
257+
std::lock_guard lock(mutex_);
258+
return pending_.size();
259+
}
260+
}
261+
262+
#endif

0 commit comments

Comments
 (0)