Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 74 additions & 62 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -1262,15 +1262,13 @@ returned to the wasm code. Lastly, `COMPLETED` indicates that at least one
value has been copied and neither `DROPPED` nor `CANCELLED` apply.

As with functions and buffers, native host code can be on either side of a
stream. Thus, streams are defined in terms of an abstract interface that can be
stream. Thus, streams are defined in terms of abstract interfaces that can be
implemented and consumed by wasm or host code (with all {wasm,host} pairings
being possible and well-defined). Since a `stream` in a function parameter or
result type always represents the transfer of the *readable* end of a stream,
the abstract stream interface is `ReadableStream` and allows a (wasm or host)
client to asynchronously read multiple values from a (wasm or host) producer.
(The absence of a dual `WritableStream` abstract interface reflects the fact
that there is no Component Model type for passing the writable end of a
stream.)
only the `ReadableStream` interface can be implemented by either wasm or the
host; the `WritableStream` interface is always written to by wasm via a
writable stream end created by `stream.new`.
```python
ReclaimBuffer = Callable[[], None]
OnCopy = Callable[[ReclaimBuffer], None]
Expand All @@ -1281,40 +1279,53 @@ class ReadableStream:
read: Callable[[ComponentInstance, WritableBuffer, OnCopy, OnCopyDone], None]
cancel: Callable[[], None]
drop: Callable[[], None]

class WritableStream:
Copy link
Copy Markdown
Collaborator

@vados-cosmonic vados-cosmonic Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How strict do we want to be about this class being abstract? Not sure if we want to pull in the usual Python ABC machinery here or just add a note.

Also fine leaving this as is since this is just python flavored pseudo code :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, yeah, I looked into ABC a bit, and maybe I didn't find the Zen of that feature, but I thought it was less clear -- mostly I just want a thing that lets me write names and types without function bodies. Since this is just Python-flavored pseudo-code, I stopped there, but if you have any good ideas that are clear and succinct, feel free to suggest (in a separate PR).

t: ValType
write: Callable[[ComponentInstance, ReadableBuffer, OnCopy, OnCopyDone], None]
cancel: Callable[[], None]
drop: Callable[[], None]
```
The key operation is `read` which works as follows:
The key operations in these interfaces are `read` and `write` which work as
follows:
* `read` never blocks and returns its values by either synchronously or
asynchronously writing to the given `WritableBuffer` and then calling the
given `OnCopy*` callbacks to notify the caller of progress.
* `OnCopyDone` is called to indicate that the `read` is finished copying and
that the caller has regained ownership of the buffer.
* `OnCopy` is called to indicate a write has been made into the buffer.
However, there may be further writes made in the future, so the caller has
* Symmetrically, `write` never blocks and takes the value to be written
from the given `ReadableBuffer`, calling the given `OnCopy*` callbacks to
notify the caller of progress.
* `OnCopyDone` is called to indicate that the `read` or `write` is finished
copying and that the caller has regained ownership of the buffer.
* `OnCopy` is called to indicate a copy has been made to or from the buffer.
However, there may be further copies made in the future, so the caller has
*not* regained ownership of the buffer.
* The `ReclaimBuffer` callback passed to `OnCopy` allows the caller of `read` to
immediately regain ownership of the buffer once the first copy has completed.
* The `ReclaimBuffer` callback passed to `OnCopy` allows the caller of `read` or
`write` to immediately regain ownership of the buffer once the first copy has
completed.
* `cancel` is non-blocking, but does **not** guarantee that ownership of
the buffer has been returned; `cancel` only lets the caller *request* that
`read` call one of the `OnCopy*` callbacks ASAP (which may or may not happen
one of the `OnCopy*` callbacks be called ASAP (which may or may not happen
during `cancel`).
* The client may not call `read` or `drop` while there is still an unfinished
`read` of the same `ReadableStream`.
* The client may not call `read`, `write` or `drop` while there is a previous
`read` or `write` in progress.

The `OnCopy*` callbacks are a spec-internal detail used to specify the allowed
concurrent behaviors of `stream.{read,write}` and not exposed directly to core
wasm code. Specifically, the point of the `OnCopy*` callbacks is to specify that
*multiple* writes are allowed into the same `WritableBuffer` up until the point
where either the buffer is full or the calling core wasm code receives the
`STREAM_READ` progress event (in which case `ReclaimBuffer` is called). This
reduces the number of task-switches required by the spec, particularly when
streaming between two components.

The `SharedStreamImpl` class implements `ReadableStream` for streams created by
wasm (via `stream.new`) and tracks the common state shared by both the readable
and writable ends of streams (defined below). Introducing the class in chunks,
starting with the fields and initialization:
```python
class SharedStreamImpl(ReadableStream):
*multiple* reads or writes are allowed into the same `Buffer` up until the point
where either the buffer is full or the calling core wasm code receives a
`STREAM_READ` or `STREAM_WRITE` progress event (in which case `ReclaimBuffer` is
called). This reduces the number of task-switches required by the spec,
particularly when streaming between two components.

The `SharedStreamImpl` class implements both `ReadableStream` and
`WritableStream` for streams created by wasm (via `stream.new`) and tracks the
common state shared by both the readable and writable ends of streams (defined
below).

Introducing `SharedStreamImpl` in chunks, starting with the fields and initialization:
```python
class SharedStreamImpl(ReadableStream, WritableStream):
dropped: bool
pending_inst: Optional[ComponentInstance]
pending_buffer: Optional[Buffer]
Expand Down Expand Up @@ -1355,9 +1366,9 @@ callback:
if self.pending_buffer:
self.reset_and_notify_pending(CopyResult.DROPPED)
```
While the abstract `ReadableStream` interface *allows* `cancel` to return
without having returned ownership of the buffer (which, in general, is
necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is
While the abstract `ReadableStream` and `WritableStream` interfaces *allow*
`cancel` to return without having returned ownership of the buffer (which, in
general, is necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is
implementing the stream, `cancel` always returns ownership of the buffer
immediately.

Expand All @@ -1367,16 +1378,16 @@ Note that `cancel` and `drop` notify in opposite directions:
* `drop` *must not* be called on a readable or writable end with an operation
pending, and thus `drop` notifies the opposite end.

The `read` method implements the `ReadableStream.read` interface described
above and is called by either `stream.read` or the host, depending on who is
passed the readable end of the stream. If the reader is first to rendezvous,
then all the parameters are stored in the `pending_*` fields, requiring the
reader to wait for the writer to rendezvous. If the writer was first to
rendezvous, then there is already a pending `ReadableBuffer` to read from, and
so the reader copies as much as it can (which may be less than a full buffer's
worth) and eagerly completes the copy without blocking. In the final special
case where both the reader and pending writer have zero-length buffers, the
writer is notified, but the reader remains blocked:
The `read` method implements `ReadableStream.read` and is called by either
`stream.read` or the host, depending on who is passed the readable end of the
stream. If the reader is first to rendezvous, then all the parameters are
stored in the `pending_*` fields, requiring the reader to wait for the writer
to rendezvous. If the writer was first to rendezvous, then there is already a
pending `ReadableBuffer` to read from, and so the reader copies as much as it
can (which may be less than a full buffer's worth) and eagerly completes the
copy without blocking. In the final special case where both the reader and
pending writer have zero-length buffers, the writer is notified, but the reader
remains blocked:
```python
def read(self, inst, dst_buffer, on_copy, on_copy_done):
if self.dropped:
Expand All @@ -1403,13 +1414,13 @@ and lowering can alias the same memory, interleavings can be complex and must
be handled carefully. Future improvements to the Canonical ABI ([lazy lowering])
can greatly simplify this interleaving and be more practical to implement.

The `write` method is symmetric to `read` (being given a `ReadableBuffer`
instead of a `WritableBuffer`) and is called by the `stream.write` built-in.
(noting that the host cannot be passed the writable end of a stream but may
instead *implement* the `ReadableStream` interface and pass the readable end
into a component). The steps for `write` are the same as `read` except for
when a zero-length `write` rendezvous with a zero-length `read`, in which case
the `write` eagerly completes, leaving the `read` pending:
The `write` method implements `WritableStream.write` and is called by the
`stream.write` built-in (noting that the host cannot be passed the writable end
of a stream but may instead *implement* the `ReadableStream` interface and pass
the readable end into a component). The steps for `write` are the same as
`read` except for when a zero-length `write` rendezvous with a zero-length
`read`, in which case the `write` eagerly completes, leaving the `read`
pending:
```python
def write(self, inst, src_buffer, on_copy, on_copy_done):
if self.dropped:
Expand Down Expand Up @@ -1456,7 +1467,7 @@ entirely symmetric, with the only difference being whether the polymorphic
`copy` method (used below) calls `read` or `write`:
```python
class StreamEnd(Waitable):
shared: ReadableStream
shared: ReadableStream|WritableStream
copying: bool
done: bool

Expand Down Expand Up @@ -1493,36 +1504,37 @@ since the async read or write cannot be cancelled without blocking and `drop`
This means that client code must take care to wait for these operations to
finish before dropping.

The `{Readable,Writable}StreamEnd.copy` method is called polymorphically by the
shared definition of `stream.{read,write}` below. While the static type of
`StreamEnd.shared` is `ReadableStream`, a `WritableStreamEnd` always points to
a `SharedStreamImpl` object which is why `WritableStreamEnd.copy` can
unconditionally call `stream.write`.


#### Future State

Futures are similar to streams, except that instead of passing 0..N values,
exactly one value is passed from the writer end to the reader end unless the
reader end is explicitly dropped first.

Like streams, futures are defined in terms of an abstract `ReadableFuture`
interface that can be implemented by the host or wasm:
Futures are defined in terms of abstract `ReadableFuture` and `WritableFuture`
interfaces:
```python
class ReadableFuture:
t: ValType
read: Callable[[ComponentInstance, WritableBuffer, OnCopyDone], None]
cancel: Callable[[], None]
drop: Callable[[], None]

class WritableFuture:
t: ValType
write: Callable[[ComponentInstance, ReadableBuffer, OnCopyDone], None]
cancel: Callable[[], None]
drop: Callable[[], None]
```
The `ReadableFuture` interface works like `ReadableStream` except that there is
no `OnCopy` callback passed to `read` to report partial progress (since at most
1 value is copied) and the given `WritableBuffer` must have `remain() == 1`.
These interfaces work like `ReadableStream` and `WritableStream` except that
there is no `OnCopy` callback passed to `read` or `write` to report partial
progress (since at most 1 value is copied) and the given `Buffer` must have
`remain() == 1`.

Introducing `SharedFutureImpl` in chunks, the first part is exactly
symmetric to `SharedStreamImpl` in how initialization and cancellation work:
```python
class SharedFutureImpl(ReadableFuture):
class SharedFutureImpl(ReadableFuture, WritableFuture):
dropped: bool
pending_inst: Optional[ComponentInstance]
pending_buffer: Optional[Buffer]
Expand Down Expand Up @@ -1597,7 +1609,7 @@ Lastly, the `{Readable,Writable}FutureEnd` classes are mostly symmetric with
value or been notified of the reader dropping their end:
```python
class FutureEnd(Waitable):
shared: ReadableFuture
shared: ReadableFuture|WritableFuture
copying: bool
done: bool

Expand Down
20 changes: 16 additions & 4 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,13 @@ class ReadableStream:
cancel: Callable[[], None]
drop: Callable[[], None]

class SharedStreamImpl(ReadableStream):
class WritableStream:
t: ValType
write: Callable[[ComponentInstance, ReadableBuffer, OnCopy, OnCopyDone], None]
cancel: Callable[[], None]
drop: Callable[[], None]

class SharedStreamImpl(ReadableStream, WritableStream):
dropped: bool
pending_inst: Optional[ComponentInstance]
pending_buffer: Optional[Buffer]
Expand Down Expand Up @@ -841,7 +847,7 @@ def write(self, inst, src_buffer, on_copy, on_copy_done):
self.set_pending(inst, src_buffer, on_copy, on_copy_done)

class StreamEnd(Waitable):
shared: ReadableStream
shared: ReadableStream|WritableStream
copying: bool
done: bool

Expand Down Expand Up @@ -872,7 +878,13 @@ class ReadableFuture:
cancel: Callable[[], None]
drop: Callable[[], None]

class SharedFutureImpl(ReadableFuture):
class WritableFuture:
t: ValType
write: Callable[[ComponentInstance, ReadableBuffer, OnCopyDone], None]
cancel: Callable[[], None]
drop: Callable[[], None]

class SharedFutureImpl(ReadableFuture, WritableFuture):
dropped: bool
pending_inst: Optional[ComponentInstance]
pending_buffer: Optional[Buffer]
Expand Down Expand Up @@ -929,7 +941,7 @@ def write(self, inst, src_buffer, on_copy_done):
on_copy_done(CopyResult.COMPLETED)

class FutureEnd(Waitable):
shared: ReadableFuture
shared: ReadableFuture|WritableFuture
copying: bool
done: bool

Expand Down