Skip to content

Commit 9a16638

Browse files
refactor(moonbit): simplify async bindings
1 parent ce6fd89 commit 9a16638

35 files changed

Lines changed: 304 additions & 365 deletions

File tree

crates/moonbit/README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ Stream + Sink (batched reads/writes):
3030
let (s, sink) = @async.Stream::new[Byte]()
3131
@async.spawn(async fn() {
3232
let chunk : Array[Byte] = [1, 2, 3, 4]
33-
let _ = sink.write(chunk[:])
33+
assert_true(sink.write_all(chunk[:]))
3434
sink.close()
3535
})
3636
let chunk = s.read(4096)
@@ -43,8 +43,9 @@ match chunk {
4343
```
4444

4545
`Stream::read(count)` returns up to `count` elements; `Sink::write` accepts
46-
`ArrayView[T]` so byte streams can batch data efficiently. `Stream::new`
47-
accepts an optional `capacity` (<= 0 means unbounded).
46+
`ArrayView[T]` so byte streams can batch data efficiently, and
47+
`Sink::write_all` loops until the whole slice is accepted or the sink closes.
48+
`Stream::new` accepts an optional `capacity` (<= 0 means unbounded).
4849

4950
## Testing
5051

crates/moonbit/src/async/coroutine.mbt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ fn Coroutine::wake(self : Coroutine) -> Unit {
5050
///|
5151
pub fn is_being_cancelled() -> Bool {
5252
let coro = current_coroutine()
53-
coro.cancelled && not(coro.shielded)
53+
coro.cancelled && !coro.shielded
5454
}
5555

5656
///|
@@ -59,15 +59,15 @@ pub(all) suberror Cancelled derive(Show)
5959
///|
6060
fn Coroutine::cancel(self : Coroutine) -> Unit {
6161
self.cancelled = true
62-
if not(self.shielded || self.ready) {
62+
if !(self.shielded || self.ready) {
6363
self.wake()
6464
}
6565
}
6666

6767
///|
6868
pub async fn pause() -> Unit raise Cancelled {
6969
guard scheduler.curr_coro is Some(coro)
70-
if coro.cancelled && not(coro.shielded) {
70+
if coro.cancelled && !coro.shielded {
7171
raise Cancelled::Cancelled
7272
}
7373
async_suspend(fn(ok_cont, err_cont) {
@@ -81,7 +81,7 @@ pub async fn pause() -> Unit raise Cancelled {
8181
///|
8282
pub async fn suspend() -> Unit raise Cancelled {
8383
guard scheduler.curr_coro is Some(coro)
84-
if coro.cancelled && not(coro.shielded) {
84+
if coro.cancelled && !coro.shielded {
8585
raise Cancelled::Cancelled
8686
}
8787
scheduler.blocking += 1
@@ -147,7 +147,7 @@ fn Coroutine::unwrap(self : Coroutine) -> Unit raise {
147147
///|
148148
async fn Coroutine::wait(target : Coroutine) -> Unit {
149149
guard scheduler.curr_coro is Some(coro)
150-
guard not(physical_equal(coro, target))
150+
guard !physical_equal(coro, target)
151151
match target.state {
152152
Done => return
153153
Fail(err) => raise err

crates/moonbit/src/async/ev.mbt

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
priv struct EventLoop {
1717
subscribes : Map[Int, Subscriber]
1818
tasks : Map[WaitableSet, Coroutine]
19+
owned_coroutines : Map[WaitableSet, @set.Set[Coroutine]]
1920
finished : Map[WaitableSet, Bool]
2021
}
2122

@@ -39,6 +40,7 @@ fn should_complete(waitable_set : WaitableSet) -> Bool {
3940
fn next_callback(waitable_set : WaitableSet) -> Int {
4041
if should_complete(waitable_set) {
4142
ev.tasks.remove(waitable_set)
43+
ev.owned_coroutines.remove(waitable_set)
4244
ev.finished.remove(waitable_set)
4345
waitable_set.drop()
4446
return CallbackCode::Completed.encode()
@@ -52,12 +54,17 @@ fn next_callback(waitable_set : WaitableSet) -> Int {
5254
///|
5355
pub fn with_waitableset(f : async () -> Unit) -> Int {
5456
let waitable_set = WaitableSet::new()
57+
let owned = @set.Set::new()
5558
tls_set(waitable_set.0)
5659
let coro = spawn(async fn() -> Unit {
60+
let coro = current_coroutine()
61+
defer owned.remove(coro)
5762
defer ev.finished.set(waitable_set, true)
5863
f()
5964
})
6065
ev.tasks.set(waitable_set, coro)
66+
owned.add(coro)
67+
ev.owned_coroutines.set(waitable_set, owned)
6168
ev.finished.set(waitable_set, false)
6269
reschedule()
6370
next_callback(waitable_set)
@@ -75,13 +82,17 @@ pub fn cb(event : Int, waitable_id : Int, code : Int) -> Int {
7582
TaskCancelled => {
7683
guard ev.tasks.get(waitable_set) is Some(coro)
7784
coro.cancel()
78-
for {
85+
if ev.owned_coroutines.get(waitable_set) is Some(owned) {
86+
owned.each(Coroutine::cancel)
87+
}
88+
for ;; {
7989
reschedule()
8090
if !has_immediately_ready_task() {
8191
break
8292
}
8393
}
8494
ev.tasks.remove(waitable_set)
95+
ev.owned_coroutines.remove(waitable_set)
8596
ev.finished.remove(waitable_set)
8697
if ev.subscribes.is_empty() {
8798
waitable_set.drop()
@@ -104,7 +115,32 @@ pub fn cb(event : Int, waitable_id : Int, code : Int) -> Int {
104115
}
105116

106117
///|
107-
let ev : EventLoop = { subscribes: {}, tasks: {}, finished: {} }
118+
let ev : EventLoop = {
119+
subscribes: {},
120+
tasks: {},
121+
owned_coroutines: {},
122+
finished: {},
123+
}
124+
125+
///|
126+
/// Spawn a coroutine owned by the current component-model async task.
127+
///
128+
/// This is intended for runtime bridge work such as lowering local futures and
129+
/// streams to component-model handles. The coroutine does not participate in
130+
/// `TaskGroup` structured concurrency, but it is cancelled when the current
131+
/// component-model task is cancelled and keeps the waitable-set alive until it
132+
/// terminates.
133+
pub fn spawn_component_task_current(f : async () -> Unit) -> Unit {
134+
let waitable_set = current_waitableset()
135+
guard ev.tasks.get(waitable_set) is Some(_)
136+
guard ev.owned_coroutines.get(waitable_set) is Some(owned)
137+
let coro = spawn(async fn() -> Unit {
138+
let coro = current_coroutine()
139+
defer owned.remove(coro)
140+
f()
141+
})
142+
owned.add(coro)
143+
}
108144

109145
///|
110146
pub fn detach_waitable(waitable_id : Int) -> Unit {
@@ -125,7 +161,7 @@ pub async fn suspend_for_subtask(
125161

126162
// Helper: ensure cleanup is called once we've moved past Starting state
127163
fn ensure_cleanup(state : SubTaskState) -> Unit {
128-
if not(cleaned) && !(state is Starting) {
164+
if !cleaned && !(state is Starting) {
129165
cleanup_after_started()
130166
cleaned = true
131167
}
@@ -159,7 +195,7 @@ pub async fn suspend_for_subtask(
159195
let set = @set.Set::new()
160196
let subscriber = { event: None, coro: set }
161197
ev.subscribes.set(task.handle, subscriber)
162-
for {
198+
for ;; {
163199
set.add(current_coroutine())
164200
waitable_join(task.handle, current_waitableset().0)
165201
defer subscriber.coro.remove(current_coroutine())

crates/moonbit/src/async/moon.pkg.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55
{ "path": "moonbitlang/core/ref", "alias": "ref" },
66
{ "path": "moonbitlang/core/set", "alias": "set" }
77
],
8-
"supported-targets": ["wasm"]
8+
"supported-targets": "+wasm"
99
}

crates/moonbit/src/async/scheduler.mbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub fn reschedule() -> Unit {
5454
coro.state = Running
5555
let last_coro = scheduler.curr_coro
5656
scheduler.curr_coro = Some(coro)
57-
if coro.cancelled && not(coro.shielded) {
57+
if coro.cancelled && !coro.shielded {
5858
err_cont(Cancelled::Cancelled)
5959
} else {
6060
ok_cont(())

crates/moonbit/src/async/task_group.mbt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,14 @@ fn[X] TaskGroup::spawn_coroutine(
6464
guard self.state is Running else {
6565
abort("trying to spawn from a terminated task group")
6666
}
67-
if not(no_wait) {
67+
if !no_wait {
6868
self.waiting += 1
6969
}
7070
async fn worker() {
7171
let coro = current_coroutine()
7272
defer {
7373
self.children.remove(coro)
74-
if not(no_wait) {
74+
if !no_wait {
7575
self.waiting -= 1
7676
if self.waiting == 0 && self.state is Running {
7777
for child in self.children {
@@ -93,7 +93,7 @@ fn[X] TaskGroup::spawn_coroutine(
9393
child.cancel()
9494
}
9595
self.state = Fail(err)
96-
} else if not(err is Cancelled::Cancelled) {
96+
} else if !(err is Cancelled::Cancelled) {
9797
self.state = Fail(err)
9898
}
9999
raise err
@@ -212,7 +212,7 @@ pub async fn[X] with_task_group(f : async (TaskGroup[X]) -> X) -> X {
212212
tg.result = Some(value)
213213
}
214214
})
215-
if not(tg.children.is_empty()) {
215+
if !tg.children.is_empty() {
216216
suspend() catch {
217217
err =>
218218
if tg.state is Running {
@@ -223,7 +223,7 @@ pub async fn[X] with_task_group(f : async (TaskGroup[X]) -> X) -> X {
223223
}
224224
}
225225
}
226-
if not(tg.children.is_empty()) {
226+
if !tg.children.is_empty() {
227227
protect_from_cancel(() => suspend()) catch {
228228
_ => ()
229229
}

0 commit comments

Comments
 (0)