Skip to content

Commit 1d30cd0

Browse files
committed
Allow same-instance stream/future writes when element is empty
1 parent 31256b8 commit 1d30cd0

3 files changed

Lines changed: 73 additions & 8 deletions

File tree

design/mvp/CanonicalABI.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,7 +1213,8 @@ but in the opposite direction. Both are implemented by a single underlying
12131213
self.pending_on_copy_done = on_copy_done
12141214
return 'blocked'
12151215
else:
1216-
trap_if(inst is self.pending_inst) # temporary
1216+
assert(self.t == src.t == dst.t)
1217+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
12171218
ncopy = min(src.remain(), dst.remain())
12181219
assert(ncopy > 0)
12191220
dst.write(src.read(ncopy))
@@ -1224,10 +1225,11 @@ but in the opposite direction. Both are implemented by a single underlying
12241225
return 'done'
12251226
```
12261227
Currently, there is a trap when both the `read` and `write` come from the same
1227-
component instance, but this trapping condition will be removed in a subsequent
1228-
release. The reason for this trap is that when lifting and lowering can alias
1229-
the same memory, interleaving must be handled carefully. Future improvements to
1230-
the Canonical ABI ([lazy lowering]) can greatly simplify this interleaving.
1228+
component instance and there is a non-empty element type. This trap will be
1229+
removed in a subsequent release; the reason for the trap is that when lifting
1230+
and lowering can alias the same memory, interleavings can be complex and must
1231+
be handled carefully. Future improvements to the Canonical ABI ([lazy lowering])
1232+
can greatly simplify this interleaving and be more practical to implement.
12311233

12321234
Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
12331235
are actually stored in the `waitables` table. The classes are almost entirely

design/mvp/canonical-abi/definitions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,8 @@ def copy(self, inst, buffer, on_partial_copy, on_copy_done, src, dst):
716716
self.pending_on_copy_done = on_copy_done
717717
return 'blocked'
718718
else:
719-
trap_if(inst is self.pending_inst) # temporary
719+
assert(self.t == src.t == dst.t)
720+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
720721
ncopy = min(src.remain(), dst.remain())
721722
assert(ncopy > 0)
722723
dst.write(src.read(ncopy))

design/mvp/canonical-abi/run_tests.py

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,6 +1058,7 @@ def write(self, vs):
10581058

10591059
class HostSink:
10601060
stream: ReadableStream
1061+
t: ValType
10611062
received: list[int]
10621063
chunk: int
10631064
write_remain: int
@@ -1066,6 +1067,7 @@ class HostSink:
10661067

10671068
def __init__(self, stream, chunk, remain = 2**64):
10681069
self.stream = stream
1070+
self.t = stream.t
10691071
self.received = []
10701072
self.chunk = chunk
10711073
self.write_remain = remain
@@ -1756,10 +1758,12 @@ async def core_func(task, args):
17561758

17571759

17581760
class HostFutureSink:
1761+
t: ValType
17591762
v: Optional[any]
17601763
has_v: asyncio.Event
17611764

1762-
def __init__(self):
1765+
def __init__(self, t):
1766+
self.t = t
17631767
self.v = None
17641768
self.has_v = asyncio.Event()
17651769

@@ -1824,7 +1828,7 @@ async def host_func(task, on_start, on_return, on_block):
18241828
[future] = on_start()
18251829
outgoing = HostFutureSource(U8Type())
18261830
on_return([outgoing])
1827-
incoming = HostFutureSink()
1831+
incoming = HostFutureSink(U8Type())
18281832
future.read(None, incoming, lambda:(), lambda why:())
18291833
await on_block(incoming.has_v.wait())
18301834
assert(incoming.v == 42)
@@ -1894,6 +1898,63 @@ async def core_func(task, args):
18941898

18951899
await canon_lift(lift_opts, inst, FuncType([],[]), core_func, None, lambda:[], lambda _:(), host_on_block)
18961900

1901+
async def test_self_empty():
1902+
inst = ComponentInstance()
1903+
mem = bytearray(24)
1904+
sync_opts = mk_opts(memory=mem, sync=True)
1905+
async_opts = mk_opts(memory=mem, sync=False)
1906+
1907+
ft = FuncType([],[])
1908+
async def core_func(task, args):
1909+
[seti] = await canon_waitable_set_new(task)
1910+
1911+
[packed] = await canon_future_new(None, task)
1912+
rfi,wfi = unpack_new_ends(packed)
1913+
1914+
[ret] = await canon_future_write(None, async_opts, task, wfi, 10000)
1915+
assert(ret == definitions.BLOCKED)
1916+
1917+
[ret] = await canon_future_read(None, async_opts, task, rfi, 20000)
1918+
result,n = unpack_result(ret)
1919+
assert(n == 1 and result == definitions.CLOSED)
1920+
[] = await canon_future_close_readable(None, task, rfi)
1921+
1922+
[] = await canon_waitable_join(task, wfi, seti)
1923+
[event] = await canon_waitable_set_wait(True, mem, task, seti, 0)
1924+
assert(event == EventCode.FUTURE_WRITE)
1925+
assert(mem[0] == wfi)
1926+
result,n = unpack_result(mem[4])
1927+
assert(result == definitions.CLOSED)
1928+
assert(n == 1)
1929+
[] = await canon_future_close_writable(None, task, wfi)
1930+
1931+
[packed] = await canon_stream_new(None, task)
1932+
rsi,wsi = unpack_new_ends(packed)
1933+
[ret] = await canon_stream_write(None, async_opts, task, wsi, 10000, 3)
1934+
assert(ret == definitions.BLOCKED)
1935+
1936+
[ret] = await canon_stream_read(None, async_opts, task, rsi, 2000, 1)
1937+
result,n = unpack_result(ret)
1938+
assert(n == 1 and result == definitions.COMPLETED)
1939+
[ret] = await canon_stream_read(None, async_opts, task, rsi, 2000, 4)
1940+
result,n = unpack_result(ret)
1941+
assert(n == 2 and result == definitions.COMPLETED)
1942+
[] = await canon_stream_close_readable(None, task, rsi)
1943+
1944+
[] = await canon_waitable_join(task, wsi, seti)
1945+
[event] = await canon_waitable_set_wait(True, mem, task, seti, 0)
1946+
assert(event == EventCode.STREAM_WRITE)
1947+
assert(mem[0] == wsi)
1948+
result,n = unpack_result(mem[4])
1949+
assert(result == definitions.CLOSED)
1950+
assert(n == 3)
1951+
[] = await canon_stream_close_writable(None, task, wsi)
1952+
1953+
[] = await canon_waitable_set_drop(task, seti)
1954+
return []
1955+
1956+
await canon_lift(sync_opts, inst, ft, core_func, None, lambda:[], lambda _:(), host_on_block)
1957+
18971958

18981959
async def run_async_tests():
18991960
await test_roundtrips()
@@ -1912,6 +1973,7 @@ async def run_async_tests():
19121973
await test_wasm_to_wasm_stream_empty()
19131974
await test_cancel_copy()
19141975
await test_futures()
1976+
await test_self_empty()
19151977

19161978
asyncio.run(run_async_tests())
19171979

0 commit comments

Comments
 (0)