Commit c9c1efc

CABI: refactor stream code to use explicit enums
1 parent 989cdd6 commit c9c1efc

3 files changed

Lines changed: 268 additions & 250 deletions

design/mvp/CanonicalABI.md

Lines changed: 115 additions & 108 deletions
@@ -364,17 +364,21 @@ built-ins such as `stream.read`. However, in the
364364
explicit component-level buffer types and canonical built-ins may be added to
365365
allow explicitly creating buffers and passing them between components.)
366366

367-
All buffers have an associated component-level value type `t` and a `remain`
368-
method that returns how many `t` values may still be read or written. Thus
369-
buffers hide their original/complete size. A "readable buffer" allows reading
370-
`t` values *from* the buffer's memory. A "writable buffer" allows writing `t`
371-
values *into* the buffer's memory. Buffers are represented by the following 3
367+
A "readable buffer" allows reading `t` values *from* the buffer's memory. A
368+
"writable buffer" allows writing `t` values *into* the buffer's memory. All
369+
buffers have an associated component-level value type `t` and a `remain` method
370+
that returns how many `t` values may still be read or written. Buffers mostly
371+
hide their original/complete size. However, zero-length buffers need to be
372+
treated specially (particularly when a zero-length read rendezvouses with a
373+
zero-length write), so there is a special query for detecting whether a buffer
374+
is zero-length. Based on this, buffers are represented by the following 3
372375
abstract Python classes:
373376
```python
374377
class Buffer:
375378
MAX_LENGTH = 2**28 - 1
376379
t: ValType
377380
remain: Callable[[], int]
381+
is_zero_length: Callable[[], bool]
378382

379383
class ReadableBuffer(Buffer):
380384
read: Callable[[int], list[any]]
@@ -421,6 +425,9 @@ class BufferGuestImpl(Buffer):
421425
def remain(self):
422426
return self.length - self.progress
423427

428+
def is_zero_length(self):
429+
return self.length == 0
430+
424431
class ReadableBufferGuestImpl(BufferGuestImpl):
425432
def read(self, n):
426433
assert(n <= self.remain())
@@ -1238,6 +1245,22 @@ design avoids the need for an intermediate buffer and copy (unlike, e.g., a
12381245
Unix pipe; a Unix pipe would instead be implemented as a resource type owning
12391246
the buffer memory and *two* streams; one going in and one coming out).
12401247

1248+
The result of a `{stream,future}.{read,write}` is communicated to the wasm
1249+
guest via a `CopyResult` code:
1250+
```python
1251+
class CopyResult(IntEnum):
1252+
COMPLETED = 0
1253+
CLOSED = 1
1254+
CANCELLED = 2
1255+
```
1256+
The `CLOSED` code indicates that the *other* end has since closed their end and
1257+
thus no more reads/writes are possible. The `CANCELLED` code is only possible
1258+
after *this* end has performed a `{stream,future}.{read,write}` followed by a
1259+
`{stream,future}.cancel-{read,write}`; `CANCELLED` notifies the wasm code
1260+
that the cancellation finished and so ownership of the memory buffer has been
1261+
returned to the wasm code. Lastly, `COMPLETED` indicates that at least one
1262+
value has been copied and neither `CLOSED` nor `CANCELLED` apply.
1263+
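
As a rough illustration of how guest-side code might branch on these codes, the
sketch below redefines `CopyResult` locally so that it runs standalone. The
helper `may_copy_again` is hypothetical and not part of the spec; it only
restates the rule above that `CLOSED` rules out further reads/writes, and it
assumes that a `CANCELLED` end stays usable for later operations.

```python
from enum import IntEnum

class CopyResult(IntEnum):
    COMPLETED = 0
    CLOSED = 1
    CANCELLED = 2

def may_copy_again(result: CopyResult) -> bool:
    """Hypothetical helper: can this end still start another read/write?

    Assumption: only CLOSED (the other end is gone) rules out further copies;
    after COMPLETED or CANCELLED the buffer is back with the caller and a new
    operation may be started.
    """
    return result != CopyResult.CLOSED
```

Because `CopyResult` is an `IntEnum`, its members compare equal to the plain
integers that eventually reach core wasm (for example,
`CopyResult.CANCELLED == 2`).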
12411264
As with functions and buffers, native host code can be on either side of a
12421265
stream. Thus, streams are defined in terms of an abstract interface that can be
12431266
implemented and consumed by wasm or host code (with all {wasm,host} pairings
@@ -1250,28 +1273,27 @@ that there is no Component Model type for passing the writable end of a
12501273
stream.)
12511274
```python
12521275
RevokeBuffer = Callable[[], None]
1253-
OnPartialCopy = Callable[[RevokeBuffer], None]
1254-
OnCopyDone = Callable[[Literal['completed','cancelled']], None]
1276+
OnCopy = Callable[[RevokeBuffer], None]
1277+
OnCopyDone = Callable[[CopyResult], None]
12551278

12561279
class ReadableStream:
12571280
t: ValType
1258-
read: Callable[[ComponentInstance, WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']]
1281+
read: Callable[[ComponentInstance, WritableBuffer, OnCopy, OnCopyDone], Optional[CopyResult]]
12591282
cancel: Callable[[], None]
12601283
close: Callable[[], None]
12611284
closed: Callable[[], bool]
12621285
```
12631286
The key operation is `read` which works as follows:
1264-
* `read` is non-blocking, returning `'blocked'` if it would have blocked.
1265-
* The `On*` callbacks are only called *after* `read` returns `'blocked'`.
1266-
* `OnCopyDone` is called to indicate that the caller has regained ownership of
1267-
the buffer and whether this was due to the read/write completing or
1268-
being cancelled.
1269-
* `OnPartialCopy` is called to indicate a partial write has been made to the
1270-
buffer, but there may be further writes made in the future, so the caller
1271-
has *not* regained ownership of the buffer.
1272-
* The `RevokeBuffer` callback passed to `OnPartialCopy` allows the caller
1273-
of `read` to *synchronously* regain ownership of the buffer.
1274-
* `cancel` is also non-blocking, but does **not** guarantee that ownership of
1287+
* `read` is non-blocking, returning `None` if it would have blocked.
1288+
* The `On*` callbacks are only called if `read` returns `None`.
1289+
* `OnCopyDone` is called to indicate that the `read` is finished copying and
1290+
that the caller has regained ownership of the buffer.
1291+
* `OnCopy` is called to indicate a potentially-partial write has been made into
1292+
the buffer. However, there may be further writes made in the future, so the
1293+
caller has *not* regained ownership of the buffer.
1294+
* The `RevokeBuffer` callback passed to `OnCopy` allows the caller of `read` to
1295+
immediately regain ownership of the buffer once the first copy has completed.
1296+
* `cancel` is non-blocking, but does **not** guarantee that ownership of
12751297
the buffer has been returned; `cancel` only lets the caller *request* that
12761298
`read` call one of the `On*` callbacks ASAP (which may or may not happen
12771299
during `cancel`).
@@ -1282,10 +1304,10 @@ The `On*` callbacks are a spec-internal detail used to specify the allowed
12821304
concurrent behaviors of `stream.{read,write}` and not exposed directly to core
12831305
wasm code. Specifically, the point of the `On*` callbacks is to specify that
12841306
*multiple* writes are allowed into the same `WritableBuffer` up until the point
1285-
where either the buffer is full (at which point `OnCopyDone` is called) or the
1286-
calling core wasm code receives the `STREAM_READ` progress event (in which case
1287-
`RevokeBuffer` is called). This reduces the number of task-switches required
1288-
by the spec, particularly when streaming between two components.
1307+
where either the buffer is full (at which point `OnCopyDone` is called) or
1308+
the calling core wasm code receives the `STREAM_READ` progress event (in which
1309+
case `RevokeBuffer` is called). This reduces the number of task-switches
1310+
required by the spec, particularly when streaming between two components.
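
To make the "multiple writes into the same buffer" behavior concrete, here is a
minimal sketch of the rendezvous, not the spec's implementation. `ListBuffer`,
`ToyStream` and `demo` are hypothetical names introduced for illustration, and
the model deliberately omits closing, cancellation, the same-instance trap and
the zero-length special case.

```python
from enum import IntEnum

class CopyResult(IntEnum):
    COMPLETED = 0
    CLOSED = 1
    CANCELLED = 2

class ListBuffer:
    """Toy buffer (hypothetical): `length` slots, `progress` counts values copied."""
    def __init__(self, length, values=()):
        self.length = length
        self.values = list(values)
        self.progress = 0

    def remain(self):
        return self.length - self.progress

    def read(self, n):
        out = self.values[self.progress:self.progress + n]
        self.progress += n
        return out

    def write(self, vs):
        self.values.extend(vs)
        self.progress += len(vs)

class ToyStream:
    """Simplified rendezvous: no closing, no traps, no zero-length special case."""
    def __init__(self):
        self.reset_pending()

    def reset_pending(self):
        # (buffer, on_copy, on_copy_done) of the side that is currently waiting
        self.pending = None

    def read(self, dst, on_copy, on_copy_done):
        src = self.pending[0] if self.pending else None
        return self.copy(dst, on_copy, on_copy_done, src, dst)

    def write(self, src, on_copy, on_copy_done):
        dst = self.pending[0] if self.pending else None
        return self.copy(src, on_copy, on_copy_done, src, dst)

    def copy(self, buffer, on_copy, on_copy_done, src, dst):
        if self.pending is None:
            # Nobody to rendezvous with yet: park this buffer and "block".
            self.pending = (buffer, on_copy, on_copy_done)
            return None
        pending_buffer, pending_on_copy, pending_on_copy_done = self.pending
        if pending_buffer.remain() > 0:
            if buffer.remain() > 0:
                dst.write(src.read(min(src.remain(), dst.remain())))
                # The waiting side is told about the copy but keeps waiting
                # until it calls the revoke callback it was handed.
                pending_on_copy(self.reset_pending)
            return CopyResult.COMPLETED
        # The pending buffer is exhausted: return it and park the new one.
        self.reset_pending()
        pending_on_copy_done(CopyResult.COMPLETED)
        self.pending = (buffer, on_copy, on_copy_done)
        return None

def demo():
    stream = ToyStream()
    dst = ListBuffer(4)
    revokes = []
    ignore = lambda _: None
    assert stream.read(dst, revokes.append, ignore) is None  # reader blocks
    stream.write(ListBuffer(2, [1, 2]), ignore, ignore)      # first write
    stream.write(ListBuffer(1, [3]), ignore, ignore)         # second write, same dst
    revokes[-1]()  # reader "receives the event" and takes its buffer back
    return dst.values, len(revokes), stream.pending
```

In this sketch the reader's single 4-element buffer absorbs two separate writes
before the reader revokes it, which is the task-switch saving the paragraph
above describes.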
12891311

12901312
The `SharedStreamImpl` class implements `ReadableStream` for streams created by
12911313
wasm (via `stream.new`) and tracks the common state shared by both the readable
@@ -1296,7 +1318,7 @@ class SharedStreamImpl(ReadableStream):
12961318
closed_: bool
12971319
pending_inst: Optional[ComponentInstance]
12981320
pending_buffer: Optional[Buffer]
1299-
pending_on_partial_copy: Optional[OnPartialCopy]
1321+
pending_on_copy: Optional[OnCopy]
13001322
pending_on_copy_done: Optional[OnCopyDone]
13011323

13021324
def __init__(self, t):
@@ -1307,32 +1329,31 @@ class SharedStreamImpl(ReadableStream):
13071329
def reset_pending(self):
13081330
self.set_pending(None, None, None, None)
13091331

1310-
def set_pending(self, inst, buffer, on_partial_copy, on_copy_done):
1332+
def set_pending(self, inst, buffer, on_copy, on_copy_done):
13111333
self.pending_inst = inst
13121334
self.pending_buffer = buffer
1313-
self.pending_on_partial_copy = on_partial_copy
1335+
self.pending_on_copy = on_copy
13141336
self.pending_on_copy_done = on_copy_done
13151337
```
13161338
If set, the `pending_*` fields record the `Buffer` and `On*` callbacks of a
13171339
`read` or `write` that is waiting to rendezvous with a complementary `write` or
13181340
`read`. Closing the readable or writable end of a stream or cancelling a `read`
13191341
or `write` notifies any pending `read` or `write` via its `OnCopyDone`
1320-
callback, which lets the other side know that ownership of the `Buffer` has
1321-
been returned and why:
1342+
callback:
13221343
```python
1323-
def reset_and_notify_pending(self, why):
1344+
def reset_and_notify_pending(self, result):
13241345
pending_on_copy_done = self.pending_on_copy_done
13251346
self.reset_pending()
1326-
pending_on_copy_done(why)
1347+
pending_on_copy_done(result)
13271348

13281349
def cancel(self):
1329-
self.reset_and_notify_pending('cancelled')
1350+
self.reset_and_notify_pending(CopyResult.CANCELLED)
13301351

13311352
def close(self):
13321353
if not self.closed_:
13331354
self.closed_ = True
13341355
if self.pending_buffer:
1335-
self.reset_and_notify_pending('completed')
1356+
self.reset_and_notify_pending(CopyResult.CLOSED)
13361357

13371358
def closed(self):
13381359
return self.closed_
@@ -1355,36 +1376,32 @@ is also a symmetric `write` method that follows the same rules as `read`,
13551376
but in the opposite direction. Both are implemented by a single underlying
13561377
`copy` method parameterized by the direction of the copy:
13571378
```python
1358-
def read(self, inst, dst, on_partial_copy, on_copy_done):
1359-
return self.copy(inst, dst, on_partial_copy, on_copy_done, self.pending_buffer, dst)
1379+
def read(self, inst, dst, on_copy, on_copy_done):
1380+
return self.copy(inst, dst, on_copy, on_copy_done, self.pending_buffer, dst)
13601381

1361-
def write(self, inst, src, on_partial_copy, on_copy_done):
1362-
return self.copy(inst, src, on_partial_copy, on_copy_done, src, self.pending_buffer)
1382+
def write(self, inst, src, on_copy, on_copy_done):
1383+
return self.copy(inst, src, on_copy, on_copy_done, src, self.pending_buffer)
13631384

1364-
def copy(self, inst, buffer, on_partial_copy, on_copy_done, src, dst):
1385+
def copy(self, inst, buffer, on_copy, on_copy_done, src, dst):
13651386
if self.closed_:
1366-
return 'done'
1387+
return CopyResult.CLOSED
13671388
elif not self.pending_buffer:
1368-
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
1369-
return 'blocked'
1389+
self.set_pending(inst, buffer, on_copy, on_copy_done)
1390+
return None
13701391
else:
13711392
assert(self.t == src.t == dst.t)
13721393
trap_if(inst is self.pending_inst and self.t is not None) # temporary
13731394
if self.pending_buffer.remain() > 0:
13741395
if buffer.remain() > 0:
13751396
dst.write(src.read(min(src.remain(), dst.remain())))
1376-
if self.pending_buffer.remain() > 0:
1377-
self.pending_on_partial_copy(self.reset_pending)
1378-
else:
1379-
self.reset_and_notify_pending('completed')
1380-
return 'done'
1397+
self.pending_on_copy(self.reset_pending)
1398+
return CopyResult.COMPLETED
1399+
elif buffer is src and buffer.remain() == 0 and self.pending_buffer.is_zero_length():
1400+
return CopyResult.COMPLETED
13811401
else:
1382-
if buffer.remain() > 0 or buffer is dst:
1383-
self.reset_and_notify_pending('completed')
1384-
self.set_pending(inst, buffer, on_partial_copy, on_copy_done)
1385-
return 'blocked'
1386-
else:
1387-
return 'done'
1402+
self.reset_and_notify_pending(CopyResult.COMPLETED)
1403+
self.set_pending(inst, buffer, on_copy, on_copy_done)
1404+
return None
13881405
```
13891406
Currently, there is a trap when both the `read` and `write` come from the same
13901407
component instance and there is a non-empty element type. This trap will be
@@ -1438,12 +1455,12 @@ class StreamEnd(Waitable):
14381455
Waitable.drop(self)
14391456

14401457
class ReadableStreamEnd(StreamEnd):
1441-
def copy(self, inst, dst, on_partial_copy, on_copy_done):
1442-
return self.shared.read(inst, dst, on_partial_copy, on_copy_done)
1458+
def copy(self, inst, dst, on_copy, on_copy_done):
1459+
return self.shared.read(inst, dst, on_copy, on_copy_done)
14431460

14441461
class WritableStreamEnd(StreamEnd):
1445-
def copy(self, inst, src, on_partial_copy, on_copy_done):
1446-
return self.shared.write(inst, src, on_partial_copy, on_copy_done)
1462+
def copy(self, inst, src, on_copy, on_copy_done):
1463+
return self.shared.write(inst, src, on_copy, on_copy_done)
14471464
```
14481465
Dropping a stream end while an asynchronous read or write is in progress traps
14491466
since the async read or write cannot be cancelled without blocking and `drop`
@@ -1462,35 +1479,41 @@ unconditionally call `stream.write`.
14621479

14631480
Given the above definitions for `stream`, `future` can be simply defined as a
14641481
`stream` that transmits only 1 value before automatically closing itself. This
1465-
can be achieved by simply wrapping the `on_copy_done` callback (defined above)
1466-
and closing once a value has been read-from or written-to the given buffer:
1482+
can be achieved by wrapping the `On*` callbacks and closing once a value has
1483+
been read-from or written-to the given buffer:
14671484
```python
14681485
class FutureEnd(StreamEnd):
1469-
def close_after_copy(self, copy_op, inst, buffer, on_copy_done):
1486+
def close_after_copy(self, copy_op, inst, buffer, on_copy, on_copy_done):
14701487
assert(buffer.remain() == 1)
1471-
def on_copy_done_wrapper(why):
1488+
1489+
def on_copy_wrapper(revoke_buffer):
1490+
assert(buffer.remain() == 0)
1491+
self.shared.close()
1492+
1493+
def on_copy_done_wrapper(result):
14721494
if buffer.remain() == 0:
14731495
self.shared.close()
1474-
on_copy_done(why)
1475-
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
1476-
if ret == 'done' and buffer.remain() == 0:
1496+
on_copy_done(result)
1497+
1498+
ret = copy_op(inst, buffer, on_copy_wrapper, on_copy_done_wrapper)
1499+
if ret is not None and buffer.remain() == 0:
14771500
self.shared.close()
1501+
return CopyResult.CLOSED
14781502
return ret
14791503

14801504
class ReadableFutureEnd(FutureEnd):
1481-
def copy(self, inst, dst, on_partial_copy, on_copy_done):
1482-
return self.close_after_copy(self.shared.read, inst, dst, on_copy_done)
1505+
def copy(self, inst, dst, on_copy, on_copy_done):
1506+
return self.close_after_copy(self.shared.read, inst, dst, on_copy, on_copy_done)
14831507

14841508
class WritableFutureEnd(FutureEnd):
1485-
def copy(self, inst, src, on_partial_copy, on_copy_done):
1486-
return self.close_after_copy(self.shared.write, inst, src, on_copy_done)
1509+
def copy(self, inst, src, on_copy, on_copy_done):
1510+
return self.close_after_copy(self.shared.write, inst, src, on_copy, on_copy_done)
14871511
def drop(self):
14881512
trap_if(not self.shared.closed())
14891513
FutureEnd.drop(self)
14901514
```
14911515
The `future.{read,write}` built-ins fix the buffer length to `1`, ensuring the
1492-
`assert(buffer.remain() == 1)` holds. Because of this, there are no partial
1493-
copies and `on_partial_copy` is never called.
1516+
`assert(buffer.remain() == 1)` holds.
14941517

14951518
The additional `trap_if` in `WritableFutureEnd.drop` ensures that a future
14961519
must have written a value before closing.
@@ -3776,33 +3799,32 @@ their originating call.)
37763799
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
37773800
buffer = BufferT(stream_or_future_t.t, cx, ptr, n)
37783801
```
3779-
Next, the `copy` method of `{Readable,Writable}{Stream,Future}End` is called
3780-
to attempt to perform the actual `read` or `write`. The `on_partial_copy`
3781-
callback passed to `copy` is called zero or more times each time values are
3782-
copied to/from `buffer` without filling it up. Aftewards, the `on_copy_done`
3783-
callback passed to `copy` is called at most once when: the `buffer` if full,
3784-
the other end closed, or this end cancelled the copy via
3785-
`{stream,future}.cancel-{read,write}`.
3802+
Next, the `copy` method of `{Readable,Writable}{Stream,Future}End` is called to
3803+
attempt to perform the actual `read` or `write`. The `on_copy` callback passed
3804+
to `copy` is called zero or more times each time values are copied to/from
3805+
`buffer`. The `on_copy_done` callback passed to `copy` is called at most once
3806+
when the read/write is definitely finished.
37863807
```python
3787-
def copy_event(why, revoke_buffer):
3808+
def copy_event(result, revoke_buffer):
37883809
revoke_buffer()
37893810
assert(e.copying)
37903811
e.copying = False
3791-
return (event_code, i, pack_copy_result(task, e, buffer, why))
3812+
return (event_code, i, pack_copy_result(result, buffer))
37923813

3793-
def on_partial_copy(revoke_buffer):
3794-
e.set_event(partial(copy_event, 'completed', revoke_buffer))
3814+
def on_copy(revoke_buffer):
3815+
e.set_event(partial(copy_event, CopyResult.COMPLETED, revoke_buffer))
37953816

3796-
def on_copy_done(why):
3797-
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
3817+
def on_copy_done(result):
3818+
e.set_event(partial(copy_event, result, revoke_buffer = lambda:()))
37983819

3799-
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) == 'done':
3800-
return [pack_copy_result(task, e, buffer, 'completed')]
3820+
result = e.copy(task.inst, buffer, on_copy, on_copy_done)
3821+
if result is not None:
3822+
return [pack_copy_result(result, buffer)]
38013823
```
38023824
If the stream/future is already closed or at least 1 element could be
3803-
immediately copied, `copy` returns `'done'` and `{stream,future}.{read,write}`
3804-
synchronously returns how much was copied and how the operation ended to the
3805-
caller. Otherwise, the built-in blocks:
3825+
immediately copied, `copy` returns a `CopyResult` in which case the whole
3826+
`{stream,future}.{read,write}` built-in call returns the `CopyResult` packed
3827+
with the number of elements copied. Otherwise, the built-in blocks:
38063828
```python
38073829
else:
38083830
e.copying = True
@@ -3817,9 +3839,9 @@ caller. Otherwise, the built-in blocks:
38173839
In the synchronous case, the caller synchronously waits for progress
38183840
(blocking all execution in the calling component instance, but allowing other
38193841
tasks in other component instances to make progress). Note that `get_event()`
3820-
necessarily calls a `copy_event` closure created by either `on_partial_copy`
3821-
or `on_copy_done`. In the asynchronous case, the built-in immeditely returns
3822-
the `BLOCKED` code and the caller must asynchronously wait for progress using
3842+
necessarily calls a `copy_event` closure created by either `on_copy` or
3843+
`on_copy_done`. In the asynchronous case, the built-in immediately returns the
3844+
`BLOCKED` code and the caller must asynchronously wait for progress using
38233845
`waitable-set.{wait,poll}` or, if using a `callback`, by returning to the event
38243846
loop. Setting `copying` prevents any more reads/writes from starting and also
38253847
prevents the stream/future from being closed.
@@ -3828,30 +3850,15 @@ Regardless of whether the `{stream,future}.{read,write}` completes
38283850
synchronously or asynchronously, the results passed to core wasm are
38293851
bit-packed into a single `i32` according to the following scheme:
38303852
```python
3831-
BLOCKED = 0xffff_ffff
3832-
COMPLETED = 0x0
3833-
CLOSED = 0x1
3834-
CANCELLED = 0x2
3835-
3836-
def pack_copy_result(task, e, buffer, why):
3837-
if e.shared.closed():
3838-
result = CLOSED
3839-
elif why == 'cancelled':
3840-
result = CANCELLED
3841-
else:
3842-
assert(why == 'completed')
3843-
assert(not isinstance(e, FutureEnd))
3844-
result = COMPLETED
3853+
def pack_copy_result(result, buffer):
3854+
assert(0 <= result < 2**4)
38453855
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
38463856
packed = result | (buffer.progress << 4)
38473857
assert(packed != BLOCKED)
38483858
return packed
38493859
```
3850-
The `result` indicates whether the stream was closed by the other end, the
3851-
copy was cancelled by this end (via `{stream,future}.cancel-{read,write}`) or,
3852-
otherwise, completed successfully. In all cases, any number of elements (from
3853-
`0` to `n`) may have *first* been copied into or out of the buffer passed to
3854-
the `read` or `write` and so this number is packed into the `i32` result.
3860+
Note that, regardless of the final `CopyResult`, any number of elements may
3861+
have been copied and so the high 28 bits always contain this number.
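
For illustration only, the following standalone sketch shows how a caller could
decode the packed `i32`. The names `pack` and `unpack_copy_result` are
hypothetical; `pack` mirrors the bit layout of `pack_copy_result` above but
takes the progress count directly instead of a buffer, and `CopyResult` and
`BLOCKED` are redefined locally so that the block runs on its own.

```python
from enum import IntEnum

class CopyResult(IntEnum):
    COMPLETED = 0
    CLOSED = 1
    CANCELLED = 2

BLOCKED = 0xffff_ffff
MAX_LENGTH = 2**28 - 1

def pack(result, progress):
    """Hypothetical stand-in for pack_copy_result: low 4 bits hold the
    result code, high 28 bits hold the number of elements copied."""
    assert 0 <= result < 2**4
    assert 0 <= progress <= MAX_LENGTH
    packed = result | (progress << 4)
    assert packed != BLOCKED
    return packed

def unpack_copy_result(packed):
    """Hypothetical inverse: returns None for BLOCKED, otherwise
    a (CopyResult, elements_copied) pair."""
    if packed == BLOCKED:
        return None
    return (CopyResult(packed & 0xf), packed >> 4)
```

Under this layout, a read that copied 3 elements before the other end closed
would unpack to `(CopyResult.CLOSED, 3)`. Even the largest allowed progress
value cannot collide with `BLOCKED`, because the low 4 bits of `BLOCKED` are
`0xf`, which is not a valid `CopyResult`.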
38553862

38563863

38573864
### 🔀 `canon {stream,future}.cancel-{read,write}`
