Skip to content

Commit cff17ba

Browse files
feat: async support in MoonBit (#1393)
* feat: moonbit support async component model & use include_str for embedded moonbit code * test: add moonbit tests for async component model * style: cargo fmt * chore: add features section in Cargo.toml * refactor: rename WaitableTrait to Waitable and update related implementations * style: clippy fix * test: fix stub.mbt function name * refactor: use FutureReader instead of Future * refactor: remove defer from Waitable trait & fix callback Event handling * feat: update future/stream import function signatures to include kind * fix: enhance task cancel logic & refactor Waitable cancel method * refactor: async-wasm module for improved task management and error handling feat: support async component model `Stream` methods * refactor: remove unused WASIP1 * test: add moonbit async tests * refactor: use `_async_debug` instead of `print` function * refactor: remove unused `SubtaskStatus::encode` function and clean up test case * feat: implement indirect parameter handling and clean up unused type methods * feat: enhance waitable task management with resource tracking and improved drop semantics * test: refactor tests using updated syntax * fix: improve resource management * refactor: improve async handling * chore: fix type declarations * chore: remove extra debug function * Revert "refactor: improve async handling" This reverts commit 0f2a664. * chore: add back waitable_task change * fix: make future/stream type vtable generate correctly * chore: clean up unused function generation --------- Co-authored-by: BigOrangeQWQ <2284086963@qq.com>
1 parent 37f1ece commit cff17ba

15 files changed

Lines changed: 2537 additions & 289 deletions

File tree

crates/moonbit/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,6 @@ anyhow = { workspace = true }
1717
wit-bindgen-core = { workspace = true }
1818
heck = { workspace = true }
1919
clap = { workspace = true, optional = true }
20+
21+
[features]
22+
clap = ['dep:clap', 'wit-bindgen-core/clap']
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
///|
2+
async fn[T, E : Error] async_suspend(
3+
cb : ((T) -> Unit, (E) -> Unit) -> Unit,
4+
) -> T raise E = "%async.suspend"
5+
6+
///|
7+
fn run_async(f : async () -> Unit noraise) = "%async.run"
8+
9+
10+
///|
11+
priv enum State {
12+
Done
13+
Fail(Error)
14+
Running
15+
Suspend(ok_cont~ : (Unit) -> Unit, err_cont~ : (Error) -> Unit)
16+
}
17+
18+
///|
19+
struct Coroutine {
20+
coro_id : Int
21+
mut state : State
22+
mut shielded : Bool
23+
mut cancelled : Bool
24+
mut ready : Bool
25+
downstream : Map[Int, Coroutine]
26+
}
27+
28+
///|
29+
pub impl Eq for Coroutine with equal(c1, c2) {
30+
c1.coro_id == c2.coro_id
31+
}
32+
33+
///|
34+
pub impl Hash for Coroutine with hash_combine(self, hasher) {
35+
self.coro_id.hash_combine(hasher)
36+
}
37+
38+
///|
39+
pub fn Coroutine::wake(self : Coroutine) -> Unit {
40+
self.ready = true
41+
scheduler.run_later.push_back(self)
42+
}
43+
44+
///|
45+
pub fn Coroutine::run(self : Coroutine) -> Unit {
46+
self.ready = true
47+
scheduler.run_later.push_front(self)
48+
}
49+
50+
///|
51+
pub fn Coroutine::is_done(self : Coroutine) -> Bool {
52+
match self.state {
53+
Done => true
54+
Fail(_) => true
55+
Running | Suspend(_) => false
56+
}
57+
}
58+
59+
///|
60+
pub fn is_being_cancelled() -> Bool {
61+
current_coroutine().cancelled
62+
}
63+
64+
///|
65+
pub fn current_coroutine_done() -> Bool {
66+
guard scheduler.curr_coro is Some(coro) else { return true }
67+
coro.is_done()
68+
}
69+
70+
///|
71+
pub(all) suberror Cancelled derive(Show)
72+
73+
///|
74+
pub fn Coroutine::cancel(self : Coroutine) -> Unit {
75+
self.cancelled = true
76+
if not(self.shielded || self.ready) {
77+
self.wake()
78+
}
79+
}
80+
81+
///|
82+
pub async fn pause() -> Unit {
83+
guard scheduler.curr_coro is Some(coro)
84+
if coro.cancelled && not(coro.shielded) {
85+
raise Cancelled::Cancelled
86+
}
87+
async_suspend(fn(ok_cont, err_cont) {
88+
guard coro.state is Running
89+
coro.state = Suspend(ok_cont~, err_cont~)
90+
coro.ready = true
91+
scheduler.run_later.push_back(coro)
92+
})
93+
}
94+
95+
///|
96+
pub async fn suspend() -> Unit raise {
97+
guard scheduler.curr_coro is Some(coro)
98+
if coro.cancelled && not(coro.shielded) {
99+
raise Cancelled::Cancelled
100+
}
101+
scheduler.blocking += 1
102+
defer {
103+
scheduler.blocking -= 1
104+
}
105+
async_suspend(fn(ok_cont, err_cont) {
106+
guard coro.state is Running
107+
coro.state = Suspend(ok_cont~, err_cont~)
108+
})
109+
}
110+
111+
///|
112+
pub fn spawn(f : async () -> Unit raise) -> Coroutine {
113+
scheduler.coro_id += 1
114+
let coro = {
115+
state: Running,
116+
ready: true,
117+
shielded: false,
118+
downstream: {},
119+
coro_id: scheduler.coro_id,
120+
cancelled: false,
121+
}
122+
fn run(_) {
123+
run_async(fn() {
124+
coro.shielded = false
125+
try f() catch {
126+
err => coro.state = Fail(err)
127+
} noraise {
128+
_ => coro.state = Done
129+
}
130+
for _, coro in coro.downstream {
131+
coro.wake()
132+
}
133+
coro.downstream.clear()
134+
})
135+
}
136+
137+
coro.state = Suspend(ok_cont=run, err_cont=_ => ())
138+
scheduler.run_later.push_back(coro)
139+
coro
140+
}
141+
142+
///|
143+
pub fn Coroutine::unwrap(self : Coroutine) -> Unit raise {
144+
match self.state {
145+
Done => ()
146+
Fail(err) => raise err
147+
Running | Suspend(_) => panic()
148+
}
149+
}
150+
151+
///|
152+
pub async fn Coroutine::wait(target : Coroutine) -> Unit raise {
153+
guard scheduler.curr_coro is Some(coro)
154+
guard not(physical_equal(coro, target))
155+
match target.state {
156+
Done => return
157+
Fail(err) => raise err
158+
Running | Suspend(_) => ()
159+
}
160+
target.downstream[coro.coro_id] = coro
161+
try suspend() catch {
162+
err => {
163+
target.downstream.remove(coro.coro_id)
164+
raise err
165+
}
166+
} noraise {
167+
_ => target.unwrap()
168+
}
169+
}
170+
171+
///|
172+
pub async fn protect_from_cancel(f : async () -> Unit raise) -> Unit raise {
173+
guard scheduler.curr_coro is Some(coro)
174+
if coro.shielded {
175+
// already in a shield, do nothing
176+
f()
177+
} else {
178+
coro.shielded = true
179+
defer {
180+
coro.shielded = false
181+
}
182+
f()
183+
if coro.cancelled {
184+
raise Cancelled::Cancelled
185+
}
186+
}
187+
}
188+
189+
190+
///|
191+
priv struct Scheduler {
192+
mut coro_id : Int
193+
mut curr_coro : Coroutine?
194+
mut blocking : Int
195+
run_later : @deque.Deque[Coroutine]
196+
}
197+
198+
///|
199+
let scheduler : Scheduler = {
200+
coro_id: 0,
201+
curr_coro: None,
202+
blocking: 0,
203+
run_later: @deque.new(),
204+
}
205+
206+
///|
207+
pub fn current_coroutine() -> Coroutine {
208+
scheduler.curr_coro.unwrap()
209+
}
210+
211+
///|
212+
pub fn no_more_work() -> Bool {
213+
scheduler.blocking == 0 && scheduler.run_later.is_empty()
214+
}
215+
216+
///|
217+
pub fn rschedule() -> Unit {
218+
while scheduler.run_later.pop_front() is Some(coro) {
219+
coro.ready = false
220+
guard coro.state is Suspend(ok_cont~, err_cont~) else { }
221+
coro.state = Running
222+
let last_coro = scheduler.curr_coro
223+
scheduler.curr_coro = Some(coro)
224+
if coro.cancelled && !coro.shielded {
225+
err_cont(Cancelled::Cancelled)
226+
} else {
227+
ok_cont(())
228+
}
229+
scheduler.curr_coro = last_coro
230+
}
231+
}

0 commit comments

Comments
 (0)