Skip to content

Commit c0df353

Browse files
fix(moonbit): clean up async resource transfer failures
1 parent 2d5bd00 commit c0df353

14 files changed

Lines changed: 550 additions & 104 deletions

File tree

crates/moonbit/src/async/trait.mbt

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,12 @@ pub async fn[X] Sink::write(self : Sink[X], data : ArrayView[X]) -> Int {
6161
/// Returns `true` if all items were written and `false` if the sink stopped
6262
/// accepting data before completion.
6363
pub async fn[X] Sink::write_all(self : Sink[X], data : ArrayView[X]) -> Bool {
64-
if data.length() == 0 {
65-
return true
66-
}
67-
let pending : Array[X] = []
68-
for i = 0; i < data.length(); i = i + 1 {
69-
pending.push(data[i])
70-
}
7164
let mut offset = 0
7265
for ;; {
73-
if offset >= pending.length() {
66+
if offset >= data.length() {
7467
return true
7568
}
76-
let written = self.write(pending[offset:])
69+
let written = self.write(data[offset:])
7770
if written <= 0 {
7871
return false
7972
}

crates/moonbit/src/async_support.rs

Lines changed: 163 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ use std::{
66
use heck::ToUpperCamelCase;
77
use wit_bindgen_core::{
88
Direction, Files, Source,
9-
abi::{self, WasmSignature, deallocate_lists_in_types, lift_from_memory},
9+
abi::{
10+
self, WasmSignature, deallocate_lists_and_own_in_types, deallocate_lists_in_types,
11+
lift_from_memory,
12+
},
1013
dealias, uwriteln,
1114
wit_parser::{
1215
Function, LiftLowerAbi, ManglingAndAbi, Param, Type, TypeDefKind, TypeId, WasmImport,
@@ -501,12 +504,26 @@ fn wasmLower{camel_name}{index}(future : {lifted}) -> Int {{
501504
&inner_ty,
502505
);
503506
uwriteln!(lower, "{}", bindgen.src);
507+
let mut builtins = bindgen.take_local_ffi_imports();
504508
uwriteln!(
505509
lower,
506510
r#"
507-
let _ = {async_qualifier}suspend_for_future_write(writable, wasmLower{camel_name}{index}Write(writable, ret_area)) catch {{ _ => false }}"#
511+
let transferred = {async_qualifier}suspend_for_future_write(writable, wasmLower{camel_name}{index}Write(writable, ret_area)) catch {{ _ => false }}
512+
if !transferred {{"#
508513
);
509-
bindgen.take_local_ffi_imports()
514+
let mut cleanup_bindgen =
515+
FunctionBindgen::new(self, Box::new([]), Direction::Export, true, false);
516+
deallocate_lists_and_own_in_types(
517+
&resolve,
518+
&[inner_ty],
519+
&["ret_area".to_string()],
520+
true,
521+
&mut cleanup_bindgen,
522+
);
523+
uwriteln!(lower, "{}", cleanup_bindgen.src);
524+
builtins.extend(cleanup_bindgen.take_local_ffi_imports());
525+
uwriteln!(lower, " }}");
526+
builtins
510527
} else {
511528
// Unit type - no value to write, just complete the future
512529
uwriteln!(
@@ -744,37 +761,39 @@ fn wasmLower{camel_name}{index}(stream : {lifted}) -> Int {{
744761
}}"#
745762
);
746763

747-
let lower_builtins = if let Some(inner_ty) = inner_type {
748-
let resolve = self.resolve.clone();
749-
let elem_type = self.world_gen.pkg_resolver.type_name(self.name, &inner_ty);
750-
let mut lower_bindgen =
751-
FunctionBindgen::new(self, Box::new([]), Direction::Export, true, false);
752-
lower_bindgen.use_ffi(ffi::MALLOC);
753-
lower_bindgen.use_ffi(ffi::FREE);
764+
let (lower_builtins, bridge_cleanup, bridge_cleanup_uses_memory, bridge_cleanup_lower) =
765+
if let Some(inner_ty) = inner_type {
766+
let resolve = self.resolve.clone();
767+
let elem_type = self.world_gen.pkg_resolver.type_name(self.name, &inner_ty);
768+
let mut lower_bindgen =
769+
FunctionBindgen::new(self, Box::new([]), Direction::Export, true, false);
770+
lower_bindgen.use_ffi(ffi::MALLOC);
771+
lower_bindgen.use_ffi(ffi::FREE);
754772

755-
uwriteln!(
756-
lower,
757-
r#"
773+
uwriteln!(
774+
lower,
775+
r#"
758776
let ptr = mbt_ffi_malloc(data.length() * {elem_size})
759777
defer mbt_ffi_free(ptr)
760778
for i = 0; i < data.length(); i = i + 1 {{
761779
let elem_ptr = ptr + i * {elem_size}
762780
let elem : {elem_type} = data[i]"#
763-
);
781+
);
764782

765-
abi::lower_to_memory(
766-
&resolve,
767-
&mut lower_bindgen,
768-
"elem_ptr".to_string(),
769-
"elem".to_string(),
770-
&inner_ty,
771-
);
772-
uwriteln!(lower, "{}", lower_bindgen.src);
773-
uwriteln!(lower, " }}");
783+
abi::lower_to_memory(
784+
&resolve,
785+
&mut lower_bindgen,
786+
"elem_ptr".to_string(),
787+
"elem".to_string(),
788+
&inner_ty,
789+
);
790+
uwriteln!(lower, "{}", lower_bindgen.src);
791+
uwriteln!(lower, " }}");
792+
let mut builtins = lower_bindgen.take_local_ffi_imports();
774793

775-
uwriteln!(
776-
lower,
777-
r#"
794+
uwriteln!(
795+
lower,
796+
r#"
778797
let (progress, dropped) = {async_qualifier}suspend_for_stream_write(
779798
writable,
780799
wasmLower{camel_name}{index}Write(writable, ptr, data.length()),
@@ -784,13 +803,41 @@ fn wasmLower{camel_name}{index}(stream : {lifted}) -> Int {{
784803
wasmLower{camel_name}{index}DropWritable(writable)
785804
}}
786805
progress"#
787-
);
788-
lower_bindgen.take_local_ffi_imports()
789-
} else {
790-
// Unit type stream
791-
uwriteln!(
792-
lower,
793-
r#"
806+
);
807+
let mut cleanup_lower_bindgen =
808+
FunctionBindgen::new(self, Box::new([]), Direction::Export, true, false);
809+
abi::lower_to_memory(
810+
&resolve,
811+
&mut cleanup_lower_bindgen,
812+
"elem_ptr".to_string(),
813+
"elem".to_string(),
814+
&inner_ty,
815+
);
816+
let cleanup_lower_src = cleanup_lower_bindgen.src.clone();
817+
builtins.extend(cleanup_lower_bindgen.take_local_ffi_imports());
818+
let mut cleanup_bindgen =
819+
FunctionBindgen::new(self, Box::new([]), Direction::Export, true, false);
820+
cleanup_bindgen.use_ffi(ffi::MALLOC);
821+
cleanup_bindgen.use_ffi(ffi::FREE);
822+
deallocate_lists_and_own_in_types(
823+
&resolve,
824+
&[inner_ty],
825+
&["elem_ptr".to_string()],
826+
true,
827+
&mut cleanup_bindgen,
828+
);
829+
builtins.extend(cleanup_bindgen.take_local_ffi_imports());
830+
(
831+
builtins,
832+
Some(cleanup_bindgen.src),
833+
true,
834+
Some(cleanup_lower_src),
835+
)
836+
} else {
837+
// Unit type stream
838+
uwriteln!(
839+
lower,
840+
r#"
794841
let (progress, dropped) = {async_qualifier}suspend_for_stream_write(
795842
writable,
796843
wasmLower{camel_name}{index}Write(writable, 0, data.length()),
@@ -800,9 +847,9 @@ fn wasmLower{camel_name}{index}(stream : {lifted}) -> Int {{
800847
wasmLower{camel_name}{index}DropWritable(writable)
801848
}}
802849
progress"#
803-
);
804-
HashSet::new()
805-
};
850+
);
851+
(HashSet::new(), None, false, None)
852+
};
806853

807854
uwriteln!(
808855
lower,
@@ -823,15 +870,88 @@ fn wasmLower{camel_name}{index}(stream : {lifted}) -> Int {{
823870
None => {{
824871
for ;; {{
825872
match stream.read(64) {{
826-
Some(data) => {{
827-
let pending = []
828-
for i = 0; i < data.length(); i = i + 1 {{
829-
pending.push(data[i])
830-
}}
831-
if !sink.write_all(pending[:]) {{
873+
Some(data) => {{"#
874+
);
875+
if let Some(cleanup) = bridge_cleanup {
876+
uwriteln!(
877+
lower,
878+
r#"
879+
let mut offset = 0
880+
for ;; {{
881+
if offset >= data.length() {{
882+
break
883+
}}
884+
let written = sink.write(data[offset:])
885+
if written <= 0 {{
886+
for i = offset; i < data.length(); i = i + 1 {{
887+
let elem = data[i]"#
888+
);
889+
if bridge_cleanup_uses_memory {
890+
uwriteln!(
891+
lower,
892+
r#"
893+
let elem_ptr = mbt_ffi_malloc({elem_size})
894+
defer mbt_ffi_free(elem_ptr)"#
895+
);
896+
if let Some(cleanup_lower) = &bridge_cleanup_lower {
897+
uwriteln!(lower, "{}", cleanup_lower);
898+
}
899+
}
900+
uwriteln!(lower, "{}", cleanup);
901+
uwriteln!(
902+
lower,
903+
r#"
904+
}}
905+
stream.close()
906+
match stream {{
907+
{async_qualifier}Stream::Local(_, _) => {{
908+
for ;; {{
909+
match stream.read(64) {{
910+
Some(buffered) => {{
911+
for j = 0; j < buffered.length(); j = j + 1 {{
912+
let elem = buffered[j]"#
913+
);
914+
if bridge_cleanup_uses_memory {
915+
uwriteln!(
916+
lower,
917+
r#"
918+
let elem_ptr = mbt_ffi_malloc({elem_size})
919+
defer mbt_ffi_free(elem_ptr)"#
920+
);
921+
if let Some(cleanup_lower) = &bridge_cleanup_lower {
922+
uwriteln!(lower, "{}", cleanup_lower);
923+
}
924+
}
925+
uwriteln!(lower, "{}", cleanup);
926+
uwriteln!(
927+
lower,
928+
r#"
929+
}}
930+
}}
931+
None => break
932+
}}
933+
}}
934+
}}
935+
_ => ()
936+
}}
937+
return
938+
}}
939+
offset += written
940+
}}"#
941+
);
942+
} else {
943+
uwriteln!(
944+
lower,
945+
r#"
946+
if !sink.write_all(data) {{
832947
stream.close()
833948
return
834-
}}
949+
}}"#
950+
);
951+
}
952+
uwriteln!(
953+
lower,
954+
r#"
835955
}}
836956
None => {{
837957
sink.close()

0 commit comments

Comments
 (0)