Skip to content

Commit 699b619

Browse files
committed
CABI: simplify future logic by splitting out from streams
1 parent 4841186 commit 699b619

2 files changed

Lines changed: 197 additions & 91 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 173 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ def drop(self):
751751

752752
RevokeBuffer = Callable[[], None]
753753
OnPartialCopy = Callable[[RevokeBuffer], None]
754-
OnCopyDone = Callable[[Literal['completed','cancelled']], None]
754+
OnCopyDone = Callable[[Literal['completed','cancelled','closed']], None]
755755

756756
class ReadableStream:
757757
t: ValType
@@ -793,7 +793,7 @@ def close(self):
793793
if not self.closed_:
794794
self.closed_ = True
795795
if self.pending_buffer:
796-
self.reset_and_notify_pending('completed')
796+
self.reset_and_notify_pending('closed')
797797

798798
def closed(self):
799799
return self.closed_
@@ -853,26 +853,96 @@ def copy(self, inst, src, on_partial_copy, on_copy_done):
853853

854854
#### Future State
855855

856-
class FutureEnd(StreamEnd):
857-
def close_after_copy(self, copy_op, inst, buffer, on_copy_done):
858-
assert(buffer.remain() == 1)
859-
def on_copy_done_wrapper(why):
860-
if buffer.remain() == 0:
861-
self.shared.close()
862-
on_copy_done(why)
863-
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
864-
if ret == 'done' and buffer.remain() == 0:
865-
self.shared.close()
866-
return ret
856+
LiftValue = Callable[[], any]
857+
LowerValue = Callable[[any], None]
858+
859+
class ReadableFuture:
860+
t: ValType
861+
read: Callable[[ComponentInstance, LowerValue, OnCopyDone], Literal['done','blocked']]
862+
cancel: Callable[[], None]
863+
drop: Callable[[]]
864+
865+
class SharedFutureImpl(ReadableFuture):
866+
dropped: bool
867+
pending_inst: Optional[ComponentInstance]
868+
pending_copy_value: Optional[LiftValue|LowerValue]
869+
pending_on_copy_done: Optional[OnCopyDone]
870+
871+
def __init__(self, t):
872+
self.t = t
873+
self.dropped = False
874+
self.reset_pending()
875+
876+
def reset_pending(self):
877+
self.set_pending(None, None, None)
878+
879+
def set_pending(self, inst, copy_value, on_copy_done):
880+
self.pending_inst = inst
881+
self.pending_copy_value = copy_value
882+
self.pending_on_copy_done = on_copy_done
883+
884+
def reset_and_notify_pending(self, why):
885+
pending_on_copy_done = self.pending_on_copy_done
886+
self.reset_pending()
887+
pending_on_copy_done(why)
888+
889+
def cancel(self):
890+
self.reset_and_notify_pending('cancelled')
891+
892+
def drop(self):
893+
assert(not self.dropped)
894+
self.dropped = True
895+
if self.pending_on_copy_done:
896+
self.reset_and_notify_pending('closed')
897+
898+
def read(self, inst, lower_value, on_copy_done):
899+
assert(not self.dropped)
900+
return self.copy(inst, lower_value, on_copy_done, self.pending_copy_value, lower_value)
901+
902+
def write(self, inst, lift_value, on_copy_done):
903+
if self.dropped:
904+
return 'closed'
905+
return self.copy(inst, lift_value, on_copy_done, lift_value, self.pending_copy_value)
906+
907+
def copy(self, inst, copy_value, on_copy_done, lift_value, lower_value):
908+
if not self.pending_copy_value:
909+
self.set_pending(inst, copy_value, on_copy_done)
910+
return 'blocked'
911+
else:
912+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
913+
lower_value(lift_value())
914+
self.reset_and_notify_pending('done')
915+
return 'done'
916+
917+
class FutureEnd(Waitable):
918+
shared: ReadableFuture
919+
copying: bool
920+
done: bool
921+
922+
def __init__(self, shared):
923+
Waitable.__init__(self)
924+
self.shared = shared
925+
self.copying = False
926+
self.done = False
927+
928+
def drop(self):
929+
trap_if(self.copying)
930+
Waitable.drop(self)
867931

868932
class ReadableFutureEnd(FutureEnd):
869-
def copy(self, inst, dst, on_partial_copy, on_copy_done):
870-
return self.close_after_copy(self.shared.read, inst, dst, on_copy_done)
933+
def copy(self, inst, lower_value, on_copy_done):
934+
return self.shared.read(inst, lower_value, on_copy_done)
935+
936+
def drop(self):
937+
self.shared.drop()
938+
FutureEnd.drop(self)
871939

872940
class WritableFutureEnd(FutureEnd):
873-
def copy(self, inst, src, on_partial_copy, on_copy_done):
874-
return self.close_after_copy(self.shared.write, inst, src, on_copy_done)
941+
def copy(self, inst, lift_value, on_copy_done):
942+
return self.shared.write(inst, lift_value, on_copy_done)
943+
875944
def drop(self):
945+
trap_if(not self.done)
876946
FutureEnd.drop(self)
877947

878948
### Despecialization
@@ -1201,19 +1271,20 @@ def lift_borrow(cx, i, t):
12011271
return h.rep
12021272

12031273
def lift_stream(cx, i, t):
1204-
return lift_async_value(ReadableStreamEnd, cx, i, t)
1274+
assert(not contains_borrow(t))
1275+
e = cx.inst.table.remove(i)
1276+
trap_if(not isinstance(e, ReadableStreamEnd))
1277+
trap_if(e.shared.t != t)
1278+
trap_if(e.copying)
1279+
return e.shared
12051280

12061281
def lift_future(cx, i, t):
1207-
v = lift_async_value(ReadableFutureEnd, cx, i, t)
1208-
trap_if(v.closed())
1209-
return v
1210-
1211-
def lift_async_value(ReadableEndT, cx, i, t):
12121282
assert(not contains_borrow(t))
12131283
e = cx.inst.table.remove(i)
1214-
trap_if(not isinstance(e, ReadableEndT))
1284+
trap_if(not isinstance(e, ReadableFutureEnd))
12151285
trap_if(e.shared.t != t)
12161286
trap_if(e.copying)
1287+
trap_if(e.done)
12171288
return e.shared
12181289

12191290
### Storing
@@ -1507,16 +1578,14 @@ def lower_borrow(cx, rep, t):
15071578
return cx.inst.table.add(h)
15081579

15091580
def lower_stream(cx, v, t):
1510-
return lower_async_value(ReadableStreamEnd, cx, v, t)
1581+
assert(isinstance(v, ReadableStream))
1582+
assert(not contains_borrow(t))
1583+
return cx.inst.table.add(ReadableStreamEnd(v))
15111584

15121585
def lower_future(cx, v, t):
1513-
assert(not v.closed())
1514-
return lower_async_value(ReadableFutureEnd, cx, v, t)
1515-
1516-
def lower_async_value(ReadableEndT, cx, v, t):
1517-
assert(isinstance(v, ReadableStream))
1586+
assert(isinstance(v, ReadableFuture))
15181587
assert(not contains_borrow(t))
1519-
return cx.inst.table.add(ReadableEndT(v))
1588+
return cx.inst.table.add(ReadableFutureEnd(v))
15201589

15211590
### Flattening
15221591

@@ -2161,45 +2230,37 @@ async def canon_stream_new(stream_t, task):
21612230

21622231
async def canon_future_new(future_t, task):
21632232
trap_if(not task.inst.may_leave)
2164-
shared = SharedStreamImpl(future_t.t)
2233+
shared = SharedFutureImpl(future_t.t)
21652234
ri = task.inst.table.add(ReadableFutureEnd(shared))
21662235
wi = task.inst.table.add(WritableFutureEnd(shared))
21672236
return [ ri | (wi << 32) ]
21682237

2169-
### 🔀 `canon {stream,future}.{read,write}`
2238+
### 🔀 `canon stream.{read,write}`
21702239

21712240
async def canon_stream_read(stream_t, opts, task, i, ptr, n):
2172-
return await copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
2173-
stream_t, opts, task, i, ptr, n)
2241+
return await stream_copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
2242+
stream_t, opts, task, i, ptr, n)
21742243

21752244
async def canon_stream_write(stream_t, opts, task, i, ptr, n):
2176-
return await copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
2177-
stream_t, opts, task, i, ptr, n)
2178-
2179-
async def canon_future_read(future_t, opts, task, i, ptr):
2180-
return await copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ,
2181-
future_t, opts, task, i, ptr, 1)
2182-
2183-
async def canon_future_write(future_t, opts, task, i, ptr):
2184-
return await copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE,
2185-
future_t, opts, task, i, ptr, 1)
2245+
return await stream_copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
2246+
stream_t, opts, task, i, ptr, n)
21862247

2187-
async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr, n):
2248+
async def stream_copy(EndT, BufferT, event_code, stream_t, opts, task, i, ptr, n):
21882249
trap_if(not task.inst.may_leave)
21892250
e = task.inst.table.get(i)
21902251
trap_if(not isinstance(e, EndT))
2191-
trap_if(e.shared.t != stream_or_future_t.t)
2252+
trap_if(e.shared.t != stream_t.t)
21922253
trap_if(e.copying)
21932254

2194-
assert(not contains_borrow(stream_or_future_t))
2255+
assert(not contains_borrow(stream_t))
21952256
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2196-
buffer = BufferT(stream_or_future_t.t, cx, ptr, n)
2257+
buffer = BufferT(stream_t.t, cx, ptr, n)
21972258

21982259
def copy_event(why, revoke_buffer):
21992260
revoke_buffer()
22002261
assert(e.copying)
22012262
e.copying = False
2202-
return (event_code, i, pack_copy_result(task, e, buffer, why))
2263+
return (event_code, i, pack_stream_result(task, e, buffer, why))
22032264

22042265
def on_partial_copy(revoke_buffer):
22052266
e.set_event(partial(copy_event, 'completed', revoke_buffer))
@@ -2208,7 +2269,7 @@ def on_copy_done(why):
22082269
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
22092270

22102271
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) == 'done':
2211-
return [pack_copy_result(task, e, buffer, 'completed')]
2272+
return [pack_stream_result(task, e, buffer, 'completed')]
22122273
else:
22132274
e.copying = True
22142275
if opts.sync:
@@ -2224,7 +2285,7 @@ def on_copy_done(why):
22242285
CLOSED = 0x1
22252286
CANCELLED = 0x2
22262287

2227-
def pack_copy_result(task, e, buffer, why):
2288+
def pack_stream_result(task, e, buffer, why):
22282289
if e.shared.closed():
22292290
result = CLOSED
22302291
elif why == 'cancelled':
@@ -2238,6 +2299,66 @@ def pack_copy_result(task, e, buffer, why):
22382299
assert(packed != BLOCKED)
22392300
return packed
22402301

2302+
### 🔀 `canon future.{read,write}`
2303+
2304+
async def canon_future_read(future_t, opts, task, i, ptr):
2305+
def lower_value(v):
2306+
if future_t.t:
2307+
assert(not contains_borrow(future_t))
2308+
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2309+
store(cx, v, future_t.t, ptr)
2310+
2311+
return await future_copy(ReadableFutureEnd, EventCode.FUTURE_READ, lower_value,
2312+
future_t, opts, task, i, ptr)
2313+
2314+
async def canon_future_write(future_t, opts, task, i, ptr):
2315+
def lift_value():
2316+
if future_t.t:
2317+
assert(not contains_borrow(future_t))
2318+
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2319+
return load(cx, ptr, future_t.t)
2320+
2321+
return await future_copy(WritableFutureEnd, EventCode.FUTURE_WRITE, lift_value,
2322+
future_t, opts, task, i, ptr)
2323+
2324+
async def future_copy(EndT, event_code, copy_value, future_t, opts, task, i, ptr):
2325+
trap_if(not task.inst.may_leave)
2326+
e = task.inst.table.get(i)
2327+
trap_if(not isinstance(e, EndT))
2328+
trap_if(e.shared.t != future_t.t)
2329+
trap_if(e.copying or e.done)
2330+
2331+
def on_copy_done(why):
2332+
def copy_event():
2333+
assert(e.copying)
2334+
e.copying = False
2335+
if why != 'cancelled':
2336+
e.done = True
2337+
return (event_code, i, pack_future_result(task, e, why))
2338+
assert(not e.has_pending_event())
2339+
e.set_event(copy_event)
2340+
2341+
result = e.copy(task.inst, copy_value, on_copy_done)
2342+
if result != 'blocked':
2343+
e.done = True
2344+
return [pack_future_result(task, e, result)]
2345+
else:
2346+
e.copying = True
2347+
if opts.sync:
2348+
await task.wait_on(e.wait_for_pending_event(), sync = True)
2349+
code,index,payload = e.get_event()
2350+
assert(code == event_code and index == i)
2351+
return [payload]
2352+
else:
2353+
return [BLOCKED]
2354+
2355+
def pack_future_result(task, e, why):
2356+
match why:
2357+
case 'cancelled': return CANCELLED
2358+
case 'closed': return CLOSED
2359+
case 'done': return (CLOSED | (1 << 4))
2360+
assert(False)
2361+
22412362
### 🔀 `canon {stream,future}.cancel-{read,write}`
22422363

22432364
async def canon_stream_cancel_read(stream_t, sync, task, i):

0 commit comments

Comments
 (0)