Skip to content

Commit 688992d

Browse files
committed
CABI: simplify future logic by splitting out from streams
1 parent fa1b34a commit 688992d

2 files changed

Lines changed: 202 additions & 89 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 178 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -853,26 +853,103 @@ def copy(self, inst, src, on_partial_copy, on_copy_done):
853853

854854
#### Future State
855855

856-
class FutureEnd(StreamEnd):
857-
def close_after_copy(self, copy_op, inst, buffer, on_copy_done):
858-
assert(buffer.remain() == 1)
859-
def on_copy_done_wrapper(why):
860-
if buffer.remain() == 0:
861-
self.shared.close()
862-
on_copy_done(why)
863-
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
864-
if ret == 'done' and buffer.remain() == 0:
865-
self.shared.close()
866-
return ret
856+
LiftValue = Callable[[], any]
857+
LowerValue = Callable[[any], None]
858+
859+
class ReadableFuture:
860+
t: ValType
861+
read: Callable[[ComponentInstance, LowerValue, OnCopyDone], Literal['done','blocked']]
862+
cancel: Callable[[], None]
863+
drop: Callable[[]]
864+
865+
class SharedFutureImpl(ReadableFuture):
866+
reader_dropped: bool
867+
pending_inst: Optional[ComponentInstance]
868+
pending_lift_value: Optional[LiftValue]
869+
pending_lower_value: Optional[LowerValue]
870+
pending_on_copy_done: Optional[OnCopyDone]
871+
872+
def __init__(self, t):
873+
self.t = t
874+
self.reader_dropped = False
875+
self.reset_pending()
876+
877+
def reset_pending(self):
878+
self.set_pending(None, None, None, None)
879+
880+
def set_pending(self, inst, lift_value, lower_value, on_copy_done):
881+
self.pending_inst = inst
882+
self.pending_lift_value = lift_value
883+
self.pending_lower_value = lower_value
884+
self.pending_on_copy_done = on_copy_done
885+
886+
def reset_and_notify_pending(self, why):
887+
pending_on_copy_done = self.pending_on_copy_done
888+
self.reset_pending()
889+
pending_on_copy_done(why)
890+
891+
def cancel(self):
892+
self.reset_and_notify_pending('cancelled')
893+
894+
def drop(self):
895+
assert(not self.reader_dropped and not self.pending_lower_value)
896+
self.reader_dropped = True
897+
if self.pending_lift_value:
898+
self.reset_and_notify_pending('closed')
899+
900+
def read(self, inst, lower_value, on_copy_done):
901+
assert(not self.reader_dropped and not self.pending_lower_value)
902+
if not self.pending_lift_value:
903+
self.set_pending(inst, None, lower_value, on_copy_done)
904+
return 'blocked'
905+
else:
906+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
907+
lower_value(self.pending_lift_value())
908+
self.reset_and_notify_pending('done')
909+
return 'done'
910+
911+
def write(self, inst, lift_value, on_copy_done):
912+
assert(not self.pending_lift_value)
913+
if self.reader_dropped:
914+
return 'closed'
915+
elif not self.pending_lower_value:
916+
self.set_pending(inst, lift_value, None, on_copy_done)
917+
return 'blocked'
918+
else:
919+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
920+
self.pending_lower_value(lift_value())
921+
self.reset_and_notify_pending('done')
922+
return 'done'
923+
924+
class FutureEnd(Waitable):
925+
shared: ReadableFuture
926+
copying: bool
927+
done: bool
928+
929+
def __init__(self, shared):
930+
Waitable.__init__(self)
931+
self.shared = shared
932+
self.copying = False
933+
self.done = False
934+
935+
def drop(self):
936+
trap_if(self.copying)
937+
Waitable.drop(self)
867938

868939
class ReadableFutureEnd(FutureEnd):
869-
def copy(self, inst, dst, on_partial_copy, on_copy_done):
870-
return self.close_after_copy(self.shared.read, inst, dst, on_copy_done)
940+
def copy(self, inst, lower_value, on_copy_done):
941+
return self.shared.read(inst, lower_value, on_copy_done)
942+
943+
def drop(self):
944+
self.shared.drop()
945+
FutureEnd.drop(self)
871946

872947
class WritableFutureEnd(FutureEnd):
873-
def copy(self, inst, src, on_partial_copy, on_copy_done):
874-
return self.close_after_copy(self.shared.write, inst, src, on_copy_done)
948+
def copy(self, inst, lift_value, on_copy_done):
949+
return self.shared.write(inst, lift_value, on_copy_done)
950+
875951
def drop(self):
952+
trap_if(not self.done)
876953
FutureEnd.drop(self)
877954

878955
### Despecialization
@@ -1201,19 +1278,20 @@ def lift_borrow(cx, i, t):
12011278
return h.rep
12021279

12031280
def lift_stream(cx, i, t):
1204-
return lift_async_value(ReadableStreamEnd, cx, i, t)
1281+
assert(not contains_borrow(t))
1282+
e = cx.inst.table.remove(i)
1283+
trap_if(not isinstance(e, ReadableStreamEnd))
1284+
trap_if(e.shared.t != t)
1285+
trap_if(e.copying)
1286+
return e.shared
12051287

12061288
def lift_future(cx, i, t):
1207-
v = lift_async_value(ReadableFutureEnd, cx, i, t)
1208-
trap_if(v.closed())
1209-
return v
1210-
1211-
def lift_async_value(ReadableEndT, cx, i, t):
12121289
assert(not contains_borrow(t))
12131290
e = cx.inst.table.remove(i)
1214-
trap_if(not isinstance(e, ReadableEndT))
1291+
trap_if(not isinstance(e, ReadableFutureEnd))
12151292
trap_if(e.shared.t != t)
12161293
trap_if(e.copying)
1294+
trap_if(e.done)
12171295
return e.shared
12181296

12191297
### Storing
@@ -1507,16 +1585,14 @@ def lower_borrow(cx, rep, t):
15071585
return cx.inst.table.add(h)
15081586

15091587
def lower_stream(cx, v, t):
1510-
return lower_async_value(ReadableStreamEnd, cx, v, t)
1588+
assert(isinstance(v, ReadableStream))
1589+
assert(not contains_borrow(t))
1590+
return cx.inst.table.add(ReadableStreamEnd(v))
15111591

15121592
def lower_future(cx, v, t):
1513-
assert(not v.closed())
1514-
return lower_async_value(ReadableFutureEnd, cx, v, t)
1515-
1516-
def lower_async_value(ReadableEndT, cx, v, t):
1517-
assert(isinstance(v, ReadableStream))
1593+
assert(isinstance(v, ReadableFuture))
15181594
assert(not contains_borrow(t))
1519-
return cx.inst.table.add(ReadableEndT(v))
1595+
return cx.inst.table.add(ReadableFutureEnd(v))
15201596

15211597
### Flattening
15221598

@@ -2158,45 +2234,37 @@ async def canon_stream_new(stream_t, task):
21582234

21592235
async def canon_future_new(future_t, task):
21602236
trap_if(not task.inst.may_leave)
2161-
shared = SharedStreamImpl(future_t.t)
2237+
shared = SharedFutureImpl(future_t.t)
21622238
ri = task.inst.table.add(ReadableFutureEnd(shared))
21632239
wi = task.inst.table.add(WritableFutureEnd(shared))
21642240
return [ ri | (wi << 32) ]
21652241

2166-
### 🔀 `canon {stream,future}.{read,write}`
2242+
### 🔀 `canon stream.{read,write}`
21672243

21682244
async def canon_stream_read(stream_t, opts, task, i, ptr, n):
2169-
return await copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
2170-
stream_t, opts, task, i, ptr, n)
2245+
return await stream_copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
2246+
stream_t, opts, task, i, ptr, n)
21712247

21722248
async def canon_stream_write(stream_t, opts, task, i, ptr, n):
2173-
return await copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
2174-
stream_t, opts, task, i, ptr, n)
2175-
2176-
async def canon_future_read(future_t, opts, task, i, ptr):
2177-
return await copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ,
2178-
future_t, opts, task, i, ptr, 1)
2249+
return await stream_copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
2250+
stream_t, opts, task, i, ptr, n)
21792251

2180-
async def canon_future_write(future_t, opts, task, i, ptr):
2181-
return await copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE,
2182-
future_t, opts, task, i, ptr, 1)
2183-
2184-
async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr, n):
2252+
async def stream_copy(EndT, BufferT, event_code, stream_t, opts, task, i, ptr, n):
21852253
trap_if(not task.inst.may_leave)
21862254
e = task.inst.table.get(i)
21872255
trap_if(not isinstance(e, EndT))
2188-
trap_if(e.shared.t != stream_or_future_t.t)
2256+
trap_if(e.shared.t != stream_t.t)
21892257
trap_if(e.copying)
21902258

2191-
assert(not contains_borrow(stream_or_future_t))
2259+
assert(not contains_borrow(stream_t))
21922260
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2193-
buffer = BufferT(stream_or_future_t.t, cx, ptr, n)
2261+
buffer = BufferT(stream_t.t, cx, ptr, n)
21942262

21952263
def copy_event(why, revoke_buffer):
21962264
revoke_buffer()
21972265
assert(e.copying)
21982266
e.copying = False
2199-
return (event_code, i, pack_copy_result(task, e, buffer, why))
2267+
return (event_code, i, pack_stream_result(task, e, buffer, why))
22002268

22012269
def on_partial_copy(revoke_buffer):
22022270
e.set_event(partial(copy_event, 'completed', revoke_buffer))
@@ -2205,7 +2273,7 @@ def on_copy_done(why):
22052273
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
22062274

22072275
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) == 'done':
2208-
return [pack_copy_result(task, e, buffer, 'completed')]
2276+
return [pack_stream_result(task, e, buffer, 'completed')]
22092277
else:
22102278
e.copying = True
22112279
if opts.sync:
@@ -2221,7 +2289,7 @@ def on_copy_done(why):
22212289
CLOSED = 0x1
22222290
CANCELLED = 0x2
22232291

2224-
def pack_copy_result(task, e, buffer, why):
2292+
def pack_stream_result(task, e, buffer, why):
22252293
if e.shared.closed():
22262294
result = CLOSED
22272295
elif why == 'cancelled':
@@ -2235,6 +2303,66 @@ def pack_copy_result(task, e, buffer, why):
22352303
assert(packed != BLOCKED)
22362304
return packed
22372305

2306+
### 🔀 `canon future.{read,write}`
2307+
2308+
async def canon_future_read(future_t, opts, task, i, ptr):
2309+
return await future_copy(ReadableFutureEnd, EventCode.FUTURE_READ,
2310+
future_t, opts, task, i, ptr)
2311+
2312+
async def canon_future_write(future_t, opts, task, i, ptr):
2313+
return await future_copy(WritableFutureEnd, EventCode.FUTURE_WRITE,
2314+
future_t, opts, task, i, ptr)
2315+
2316+
async def future_copy(EndT, event_code, future_t, opts, task, i, ptr):
2317+
trap_if(not task.inst.may_leave)
2318+
e = task.inst.table.get(i)
2319+
trap_if(not isinstance(e, EndT))
2320+
trap_if(e.shared.t != future_t.t)
2321+
trap_if(e.copying or e.done)
2322+
2323+
assert(not contains_borrow(future_t))
2324+
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2325+
if event_code == EventCode.FUTURE_READ:
2326+
def lift_or_lower_value(v):
2327+
if future_t.t:
2328+
store(cx, v, future_t.t, ptr)
2329+
else:
2330+
def lift_or_lower_value():
2331+
if future_t.t:
2332+
return load(cx, ptr, future_t.t)
2333+
2334+
def on_copy_done(why):
2335+
def copy_event():
2336+
assert(e.copying)
2337+
e.copying = False
2338+
if why != 'cancelled':
2339+
e.done = True
2340+
return (event_code, i, pack_future_result(task, e, why))
2341+
assert(not e.has_pending_event())
2342+
e.set_event(copy_event)
2343+
2344+
result = e.copy(task.inst, lift_or_lower_value, on_copy_done)
2345+
if result != 'blocked':
2346+
e.done = True
2347+
return [pack_future_result(task, e, result)]
2348+
else:
2349+
e.copying = True
2350+
if opts.sync:
2351+
await task.wait_on(e.wait_for_pending_event(), sync = True)
2352+
code,index,payload = e.get_event()
2353+
assert(code == event_code and index == i)
2354+
return [payload]
2355+
else:
2356+
return [BLOCKED]
2357+
2358+
def pack_future_result(task, e, why):
2359+
match why:
2360+
case 'cancelled': return CANCELLED
2361+
case 'closed': return CLOSED
2362+
case 'done': return (CLOSED | (1 << 4))
2363+
print(f" GOT {why}")
2364+
assert(False)
2365+
22382366
### 🔀 `canon {stream,future}.cancel-{read,write}`
22392367

22402368
async def canon_stream_cancel_read(stream_t, sync, task, i):

0 commit comments

Comments
 (0)