Skip to content

Commit 6e723bc

Browse files
committed
Clarify how 'write' operation works in the Python code (no change)
1 parent e362068 commit 6e723bc

2 files changed

Lines changed: 89 additions & 66 deletions

File tree

design/mvp/CanonicalABI.md

Lines changed: 73 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,15 +1262,13 @@ returned to the wasm code. Lastly, `COMPLETED` indicates that at least one
12621262
value has been copied and neither `DROPPED` nor `CANCELLED` apply.
12631263

12641264
As with functions and buffers, native host code can be on either side of a
1265-
stream. Thus, streams are defined in terms of an abstract interface that can be
1265+
stream. Thus, streams are defined in terms of abstract interfaces that can be
12661266
implemented and consumed by wasm or host code (with all {wasm,host} pairings
12671267
being possible and well-defined). Since a `stream` in a function parameter or
12681268
result type always represents the transfer of the *readable* end of a stream,
1269-
the abstract stream interface is `ReadableStream` and allows a (wasm or host)
1270-
client to asynchronously read multiple values from a (wasm or host) producer.
1271-
(The absence of a dual `WritableStream` abstract interface reflects the fact
1272-
that there is no Component Model type for passing the writable end of a
1273-
stream.)
1269+
only the `ReadableStream` interface can be implemented by either wasm or the
1270+
host; the `WritableStream` interface is always written to by wasm via a
1271+
writable stream end created by `stream.new`.
12741272
```python
12751273
RevokeBuffer = Callable[[], None]
12761274
OnCopy = Callable[[RevokeBuffer], None]
@@ -1281,40 +1279,52 @@ class ReadableStream:
12811279
read: Callable[[ComponentInstance, WritableBuffer, OnCopy, OnCopyDone], None]
12821280
cancel: Callable[[], None]
12831281
drop: Callable[[], None]
1282+
1283+
class WritableStream:
1284+
t: ValType
1285+
write: Callable[[ComponentInstance, ReadableBuffer, OnCopy, OnCopyDone], None]
1286+
cancel: Callable[[], None]
1287+
drop: Callable[[], None]
12841288
```
1285-
The key operation is `read` which works as follows:
1289+
The key operations in these interfaces are `read` and `write` which work as
1290+
follows:
12861291
* `read` never blocks and returns its values by either synchronously or
12871292
asynchronously writing to the given `WritableBuffer` and then calling the
12881293
given `OnCopy*` callbacks to notify the caller of progress.
1289-
* `OnCopyDone` is called to indicate that the `read` is finished copying and
1290-
that the caller has regained ownership of the buffer.
1291-
* `OnCopy` is called to indicate a write has been made into the buffer.
1292-
However, there may be further writes made in the future, so the caller has
1294+
* Symmetrically, `write` never blocks and takes the value to be written
1295+
from the given `ReadableBuffer`, calling the given `OnCopy*` callbacks to
1296+
notify the caller of progress.
1297+
* `OnCopyDone` is called to indicate that the `read` or `write` is finished
1298+
copying and that the caller has regained ownership of the buffer.
1299+
* `OnCopy` is called to indicate a copy has been made to or from the buffer.
1300+
However, there may be further copies made in the future, so the caller has
12931301
*not* regained ownership of the buffer.
1294-
* The `RevokeBuffer` callback passed to `OnCopy` allows the caller of `read` to
1295-
immediately regain ownership of the buffer once the first copy has completed.
1302+
* The `RevokeBuffer` callback passed to `OnCopy` allows the caller of `read` or
1303+
`write` to immediately regain ownership of the buffer once the first copy has
1304+
completed.
12961305
* `cancel` is non-blocking, but does **not** guarantee that ownership of
12971306
the buffer has been returned; `cancel` only lets the caller *request* that
1298-
`read` call one of the `OnCopy*` callbacks ASAP (which may or may not happen
1307+
one of the `OnCopy*` callbacks be called ASAP (which may or may not happen
12991308
during `cancel`).
1300-
* The client may not call `read` or `drop` while there is still an unfinished
1301-
`read` of the same `ReadableStream`.
1309+
* The client may not call `read`, `write` or `drop` while there is a previous
1310+
`read` or `write` in progress.
13021311

13031312
The `OnCopy*` callbacks are a spec-internal detail used to specify the allowed
13041313
concurrent behaviors of `stream.{read,write}` and not exposed directly to core
13051314
wasm code. Specifically, the point of the `OnCopy*` callbacks is to specify that
1306-
*multiple* writes are allowed into the same `WritableBuffer` up until the point
1307-
where either the buffer is full or the calling core wasm code receives the
1308-
`STREAM_READ` progress event (in which case `RevokeBuffer` is called). This
1309-
reduces the number of task-switches required by the spec, particularly when
1310-
streaming between two components.
1311-
1312-
The `SharedStreamImpl` class implements `ReadableStream` for streams created by
1313-
wasm (via `stream.new`) and tracks the common state shared by both the readable
1314-
and writable ends of streams (defined below). Introducing the class in chunks,
1315-
starting with the fields and initialization:
1316-
```python
1317-
class SharedStreamImpl(ReadableStream):
1315+
*multiple* reads or writes are allowed into the same `Buffer` up until the point
1316+
where either the buffer is full or the calling core wasm code receives a
1317+
`STREAM_READ` or `STREAM_WRITE` progress event (in which case `RevokeBuffer` is
1318+
called). This reduces the number of task-switches required by the spec,
1319+
particularly when streaming between two components.
1320+
1321+
The `SharedStreamImpl` class implements both `ReadableStream` and
1322+
`WritableStream` for streams created by wasm (via `stream.new`) and tracks the
1323+
common state shared by both the readable and writable ends of streams (defined
1324+
below). Introducing the class in chunks, starting with the fields and
1325+
initialization:
1326+
```python
1327+
class SharedStreamImpl(ReadableStream, WritableStream):
13181328
dropped: bool
13191329
pending_inst: Optional[ComponentInstance]
13201330
pending_buffer: Optional[Buffer]
@@ -1355,9 +1365,9 @@ callback:
13551365
if self.pending_buffer:
13561366
self.reset_and_notify_pending(CopyResult.DROPPED)
13571367
```
1358-
While the abstract `ReadableStream` interface *allows* `cancel` to return
1359-
without having returned ownership of the buffer (which, in general, is
1360-
necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is
1368+
While the abstract `ReadableStream` and `WritableStream` interfaces *allow*
1369+
`cancel` to return without having returned ownership of the buffer (which, in
1370+
general, is necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is
13611371
implementing the stream, `cancel` always returns ownership of the buffer
13621372
immediately.
13631373

@@ -1367,16 +1377,16 @@ Note that `cancel` and `drop` notify in opposite directions:
13671377
* `drop` *must not* be called on a readable or writable end with an operation
13681378
pending, and thus `drop` notifies the opposite end.
13691379

1370-
The `read` method implements the `ReadableStream.read` interface described
1371-
above and is called by either `stream.read` or the host, depending on who is
1372-
passed the readable end of the stream. If the reader is first to rendezvous,
1373-
then all the parameters are stored in the `pending_*` fields, requiring the
1374-
reader to wait for the writer to rendezvous. If the writer was first to
1375-
rendezvous, then there is already a pending `ReadableBuffer` to read from, and
1376-
so the reader copies as much as it can (which may be less than a full buffer's
1377-
worth) and eagerly completes the copy without blocking. In the final special
1378-
case where both the reader and pending writer have zero-length buffers, the
1379-
writer is notified, but the reader remains blocked:
1380+
The `read` method implements `ReadableStream.read` and is called by either
1381+
`stream.read` or the host, depending on who is passed the readable end of the
1382+
stream. If the reader is first to rendezvous, then all the parameters are
1383+
stored in the `pending_*` fields, requiring the reader to wait for the writer
1384+
to rendezvous. If the writer was first to rendezvous, then there is already a
1385+
pending `ReadableBuffer` to read from, and so the reader copies as much as it
1386+
can (which may be less than a full buffer's worth) and eagerly completes the
1387+
copy without blocking. In the final special case where both the reader and
1388+
pending writer have zero-length buffers, the writer is notified, but the reader
1389+
remains blocked:
13801390
```python
13811391
def read(self, inst, dst_buffer, on_copy, on_copy_done):
13821392
if self.dropped:
@@ -1403,13 +1413,13 @@ and lowering can alias the same memory, interleavings can be complex and must
14031413
be handled carefully. Future improvements to the Canonical ABI ([lazy lowering])
14041414
can greatly simplify this interleaving and be more practical to implement.
14051415

1406-
The `write` method is symmetric to `read` (being given a `ReadableBuffer`
1407-
instead of a `WritableBuffer`) and is called by the `stream.write` built-in.
1408-
(noting that the host cannot be passed the writable end of a stream but may
1409-
instead *implement* the `ReadableStream` interface and pass the readable end
1410-
into a component). The steps for `write` are the same as `read` except for
1411-
when a zero-length `write` rendezvous with a zero-length `read`, in which case
1412-
the `write` eagerly completes, leaving the `read` pending:
1416+
The `write` method implements `WritableStream.write` and is called by the
1417+
`stream.write` built-in (noting that the host cannot be passed the writable end
1418+
of a stream but may instead *implement* the `ReadableStream` interface and pass
1419+
the readable end into a component). The steps for `write` are the same as
1420+
`read` except for when a zero-length `write` rendezvous with a zero-length
1421+
`read`, in which case the `write` eagerly completes, leaving the `read`
1422+
pending:
14131423
```python
14141424
def write(self, inst, src_buffer, on_copy, on_copy_done):
14151425
if self.dropped:
@@ -1456,7 +1466,7 @@ entirely symmetric, with the only difference being whether the polymorphic
14561466
`copy` method (used below) calls `read` or `write`:
14571467
```python
14581468
class StreamEnd(Waitable):
1459-
shared: ReadableStream
1469+
shared: ReadableStream|WritableStream
14601470
copying: bool
14611471
done: bool
14621472

@@ -1493,36 +1503,37 @@ since the async read or write cannot be cancelled without blocking and `drop`
14931503
This means that client code must take care to wait for these operations to
14941504
finish before dropping.
14951505

1496-
The `{Readable,Writable}StreamEnd.copy` method is called polymorphically by the
1497-
shared definition of `stream.{read,write}` below. While the static type of
1498-
`StreamEnd.shared` is `ReadableStream`, a `WritableStreamEnd` always points to
1499-
a `SharedStreamImpl` object which is why `WritableStreamEnd.copy` can
1500-
unconditionally call `stream.write`.
1501-
15021506

15031507
#### Future State
15041508

15051509
Futures are similar to streams, except that instead of passing 0..N values,
15061510
exactly one value is passed from the writer end to the reader end unless the
15071511
reader end is explicitly dropped first.
15081512

1509-
Like streams, futures are defined in terms of an abstract `ReadableFuture`
1510-
interface that can be implemented by the host or wasm:
1513+
Futures are defined in terms of abstract `ReadableFuture` and `WritableFuture`
1514+
interfaces:
15111515
```python
15121516
class ReadableFuture:
15131517
t: ValType
15141518
read: Callable[[ComponentInstance, WritableBuffer, OnCopyDone], None]
15151519
cancel: Callable[[], None]
15161520
drop: Callable[[], None]
1521+
1522+
class WritableFuture:
1523+
t: ValType
1524+
write: Callable[[ComponentInstance, ReadableBuffer, OnCopyDone], None]
1525+
cancel: Callable[[], None]
1526+
drop: Callable[[], None]
15171527
```
1518-
The `ReadableFuture` interface works like `ReadableStream` except that there is
1519-
no `OnCopy` callback passed to `read` to report partial progress (since at most
1520-
1 value is copied) and the given `WritableBuffer` must have `remain() == 1`.
1528+
These interfaces work like `ReadableStream` and `WritableStream` except that
1529+
there is no `OnCopy` callback passed to `read` or `write` to report partial
1530+
progress (since at most 1 value is copied) and the given `Buffer` must have
1531+
`remain() == 1`.
15211532

15221533
Introducing `SharedFutureImpl` in chunks, the first part is exactly
15231534
symmetric to `SharedStreamImpl` in how initialization and cancellation work:
15241535
```python
1525-
class SharedFutureImpl(ReadableFuture):
1536+
class SharedFutureImpl(ReadableFuture, WritableFuture):
15261537
dropped: bool
15271538
pending_inst: Optional[ComponentInstance]
15281539
pending_buffer: Optional[Buffer]
@@ -1597,7 +1608,7 @@ Lastly, the `{Readable,Writable}FutureEnd` classes are mostly symmetric with
15971608
value or been notified of the reader dropping their end:
15981609
```python
15991610
class FutureEnd(Waitable):
1600-
shared: ReadableFuture
1611+
shared: ReadableFuture|WritableFuture
16011612
copying: bool
16021613
done: bool
16031614

design/mvp/canonical-abi/definitions.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -767,7 +767,13 @@ class ReadableStream:
767767
cancel: Callable[[], None]
768768
drop: Callable[[], None]
769769

770-
class SharedStreamImpl(ReadableStream):
770+
class WritableStream:
771+
t: ValType
772+
write: Callable[[ComponentInstance, ReadableBuffer, OnCopy, OnCopyDone], None]
773+
cancel: Callable[[], None]
774+
drop: Callable[[], None]
775+
776+
class SharedStreamImpl(ReadableStream, WritableStream):
771777
dropped: bool
772778
pending_inst: Optional[ComponentInstance]
773779
pending_buffer: Optional[Buffer]
@@ -841,7 +847,7 @@ def write(self, inst, src_buffer, on_copy, on_copy_done):
841847
self.set_pending(inst, src_buffer, on_copy, on_copy_done)
842848

843849
class StreamEnd(Waitable):
844-
shared: ReadableStream
850+
shared: ReadableStream|WritableStream
845851
copying: bool
846852
done: bool
847853

@@ -872,7 +878,13 @@ class ReadableFuture:
872878
cancel: Callable[[], None]
873879
drop: Callable[[], None]
874880

875-
class SharedFutureImpl(ReadableFuture):
881+
class WritableFuture:
882+
t: ValType
883+
write: Callable[[ComponentInstance, ReadableBuffer, OnCopyDone], None]
884+
cancel: Callable[[], None]
885+
drop: Callable[[], None]
886+
887+
class SharedFutureImpl(ReadableFuture, WritableFuture):
876888
dropped: bool
877889
pending_inst: Optional[ComponentInstance]
878890
pending_buffer: Optional[Buffer]
@@ -929,7 +941,7 @@ def write(self, inst, src_buffer, on_copy_done):
929941
on_copy_done(CopyResult.COMPLETED)
930942

931943
class FutureEnd(Waitable):
932-
shared: ReadableFuture
944+
shared: ReadableFuture|WritableFuture
933945
copying: bool
934946
done: bool
935947

0 commit comments

Comments
 (0)