Skip to content

Commit 401686d

Browse files
committed
CABI: simplify future logic by splitting out from streams
1 parent 67d9add commit 401686d

2 files changed

Lines changed: 139 additions & 24 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 137 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -853,26 +853,106 @@ def copy(self, inst, src, on_partial_copy, on_copy_done):
853853

854854
#### Future State
855855

856-
class FutureEnd(StreamEnd):
856+
class ReadableFuture:
857+
t: ValType
858+
read: Callable[[ComponentInstance, WritableBuffer, OnCopyDone], Literal['done','blocked']]
859+
cancel: Callable[[], None]
860+
close: Callable[[]]
861+
closed: Callable[[], bool]
862+
863+
class SharedFutureImpl(ReadableFuture):
864+
closed_: bool
865+
pending_inst: Optional[ComponentInstance]
866+
pending_buffer: Optional[Buffer]
867+
pending_on_copy_done: Optional[OnCopyDone]
868+
869+
def __init__(self, t):
870+
self.t = t
871+
self.closed_ = False
872+
self.reset_pending()
873+
874+
def reset_pending(self):
875+
self.set_pending(None, None, None)
876+
877+
def set_pending(self, inst, buffer, on_copy_done):
878+
self.pending_inst = inst
879+
self.pending_buffer = buffer
880+
self.pending_on_copy_done = on_copy_done
881+
882+
def reset_and_notify_pending(self, why):
883+
pending_on_copy_done = self.pending_on_copy_done
884+
self.reset_pending()
885+
pending_on_copy_done(why)
886+
887+
def cancel(self):
888+
self.reset_and_notify_pending('cancelled')
889+
890+
def close(self):
891+
if not self.closed_:
892+
self.closed_ = True
893+
if self.pending_buffer:
894+
self.reset_and_notify_pending('completed')
895+
896+
def closed(self):
897+
return self.closed_
898+
899+
def read(self, inst, dst, on_copy_done):
900+
return self.copy(inst, dst, on_copy_done, self.pending_buffer, dst)
901+
902+
def write(self, inst, src, on_copy_done):
903+
return self.copy(inst, src, on_copy_done, src, self.pending_buffer)
904+
905+
def copy(self, inst, buffer, on_copy_done, src, dst):
906+
if self.closed_:
907+
return 'done'
908+
elif not self.pending_buffer:
909+
self.set_pending(inst, buffer, on_copy_done)
910+
return 'blocked'
911+
else:
912+
assert(self.t == src.t == dst.t)
913+
assert(1 == src.remain() == dst.remain())
914+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
915+
dst.write(src.read(1))
916+
self.reset_and_notify_pending('completed')
917+
return 'done'
918+
919+
class FutureEnd(Waitable):
920+
shared: ReadableFuture
921+
copying: bool
922+
done: bool
923+
924+
def __init__(self, shared):
925+
Waitable.__init__(self)
926+
self.shared = shared
927+
self.copying = False
928+
self.done = False
929+
857930
def close_after_copy(self, copy_op, inst, buffer, on_copy_done):
858931
assert(buffer.remain() == 1)
859932
def on_copy_done_wrapper(why):
860933
if buffer.remain() == 0:
861934
self.shared.close()
862935
on_copy_done(why)
863-
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
936+
ret = copy_op(inst, buffer, on_copy_done = on_copy_done_wrapper)
864937
if ret == 'done' and buffer.remain() == 0:
865938
self.shared.close()
866939
return ret
867940

941+
def drop(self):
942+
trap_if(self.copying)
943+
self.shared.close()
944+
Waitable.drop(self)
945+
868946
class ReadableFutureEnd(FutureEnd):
869-
def copy(self, inst, dst, on_partial_copy, on_copy_done):
947+
def copy(self, inst, dst, on_copy_done):
870948
return self.close_after_copy(self.shared.read, inst, dst, on_copy_done)
871949

872950
class WritableFutureEnd(FutureEnd):
873-
def copy(self, inst, src, on_partial_copy, on_copy_done):
951+
def copy(self, inst, src, on_copy_done):
874952
return self.close_after_copy(self.shared.write, inst, src, on_copy_done)
953+
875954
def drop(self):
955+
trap_if(not self.done)
876956
FutureEnd.drop(self)
877957

878958
### Despecialization
@@ -2158,39 +2238,31 @@ async def canon_stream_new(stream_t, task):
21582238

21592239
async def canon_future_new(future_t, task):
21602240
trap_if(not task.inst.may_leave)
2161-
shared = SharedStreamImpl(future_t.t)
2241+
shared = SharedFutureImpl(future_t.t)
21622242
ri = task.inst.table.add(ReadableFutureEnd(shared))
21632243
wi = task.inst.table.add(WritableFutureEnd(shared))
21642244
return [ ri | (wi << 32) ]
21652245

2166-
### 🔀 `canon {stream,future}.{read,write}`
2246+
### 🔀 `canon stream.{read,write}`
21672247

21682248
async def canon_stream_read(stream_t, opts, task, i, ptr, n):
2169-
return await copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
2170-
stream_t, opts, task, i, ptr, n)
2249+
return await stream_copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
2250+
stream_t, opts, task, i, ptr, n)
21712251

21722252
async def canon_stream_write(stream_t, opts, task, i, ptr, n):
2173-
return await copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
2174-
stream_t, opts, task, i, ptr, n)
2175-
2176-
async def canon_future_read(future_t, opts, task, i, ptr):
2177-
return await copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ,
2178-
future_t, opts, task, i, ptr, 1)
2179-
2180-
async def canon_future_write(future_t, opts, task, i, ptr):
2181-
return await copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE,
2182-
future_t, opts, task, i, ptr, 1)
2253+
return await stream_copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
2254+
stream_t, opts, task, i, ptr, n)
21832255

2184-
async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr, n):
2256+
async def stream_copy(EndT, BufferT, event_code, stream_t, opts, task, i, ptr, n):
21852257
trap_if(not task.inst.may_leave)
21862258
e = task.inst.table.get(i)
21872259
trap_if(not isinstance(e, EndT))
2188-
trap_if(e.shared.t != stream_or_future_t.t)
2260+
trap_if(e.shared.t != stream_t.t)
21892261
trap_if(e.copying)
21902262

2191-
assert(not contains_borrow(stream_or_future_t))
2263+
assert(not contains_borrow(stream_t))
21922264
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2193-
buffer = BufferT(stream_or_future_t.t, cx, ptr, n)
2265+
buffer = BufferT(stream_t.t, cx, ptr, n)
21942266

21952267
def copy_event(why, revoke_buffer):
21962268
revoke_buffer()
@@ -2234,6 +2306,49 @@ def pack_copy_result(task, e, buffer, why):
22342306
assert(packed != BLOCKED)
22352307
return packed
22362308

2309+
### 🔀 `canon future.{read,write}`
2310+
2311+
async def canon_future_read(future_t, opts, task, i, ptr):
2312+
return await future_copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ,
2313+
future_t, opts, task, i, ptr)
2314+
2315+
async def canon_future_write(future_t, opts, task, i, ptr):
2316+
return await future_copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE,
2317+
future_t, opts, task, i, ptr)
2318+
2319+
async def future_copy(EndT, BufferT, event_code, future_t, opts, task, i, ptr):
2320+
trap_if(not task.inst.may_leave)
2321+
e = task.inst.table.get(i)
2322+
trap_if(not isinstance(e, EndT))
2323+
trap_if(e.shared.t != future_t.t)
2324+
trap_if(e.copying or e.done)
2325+
assert(not contains_borrow(future_t))
2326+
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2327+
buffer = BufferT(future_t.t, cx, ptr, 1)
2328+
if opts.sync:
2329+
def on_copy_done(why):
2330+
assert(why == 'completed')
2331+
e.done = True
2332+
if not async_copy.done():
2333+
async_copy.set_result(None)
2334+
if e.copy(task.inst, buffer, on_copy_done) != 'done':
2335+
async_copy = asyncio.Future()
2336+
await task.wait_on(async_copy, sync = True)
2337+
else:
2338+
def on_copy_done(why):
2339+
def copy_event():
2340+
if why == 'completed':
2341+
e.done = True
2342+
assert(e.copying)
2343+
e.copying = False
2344+
return (event_code, i, pack_copy_result(task, e, buffer, why))
2345+
e.set_event(copy_event)
2346+
if e.copy(task.inst, buffer, on_copy_done) != 'done':
2347+
e.copying = True
2348+
return [BLOCKED]
2349+
e.done = True
2350+
return [pack_copy_result(task, e, buffer, 'completed')]
2351+
22372352
### 🔀 `canon {stream,future}.cancel-{read,write}`
22382353

22392354
async def canon_stream_cancel_read(stream_t, sync, task, i):

design/mvp/canonical-abi/run_tests.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,7 +1815,7 @@ def __init__(self, t):
18151815
self.pending_on_copy_done = None
18161816
def closed(self):
18171817
return self.is_closed
1818-
def read(self, inst, dst, on_partial_copy, on_copy_done):
1818+
def read(self, inst, dst, on_copy_done):
18191819
if self.is_closed:
18201820
return 'done'
18211821
elif self.v:
@@ -1855,7 +1855,7 @@ async def host_func(task, on_start, on_resolve, on_block):
18551855
outgoing = HostFutureSource(U8Type())
18561856
on_resolve([outgoing])
18571857
incoming = HostFutureSink(U8Type())
1858-
future.read(None, incoming, lambda:(), lambda why:())
1858+
future.read(None, incoming, lambda why:())
18591859
wait = asyncio.create_task(incoming.has_v.wait())
18601860
await on_block(wait)
18611861
assert(incoming.v == 42)

0 commit comments

Comments
 (0)