Skip to content

Commit 27efedd

Browse files
committed
CABI: make futures make sense independently of streams
1 parent ce47535 commit 27efedd

2 files changed

Lines changed: 251 additions & 144 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 187 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -837,11 +837,13 @@ def copy(self, inst, buffer, on_copy, on_result, src, dst):
837837
class StreamEnd(Waitable):
838838
shared: ReadableStream
839839
copying: bool
840+
done: bool
840841

841842
def __init__(self, shared):
842843
Waitable.__init__(self)
843844
self.shared = shared
844845
self.copying = False
846+
self.done = False
845847

846848
def drop(self):
847849
trap_if(self.copying)
@@ -858,27 +860,110 @@ def copy(self, inst, src, on_copy, on_result):
858860

859861
#### Future State
860862

861-
class FutureEnd(StreamEnd):
862-
def close_after_copy(self, copy_op, inst, buffer, on_result):
863-
assert(buffer.remain() == 1)
864-
def on_result_wrapper(result):
865-
if buffer.remain() == 0:
866-
self.shared.close()
867-
on_result(result)
868-
ret = copy_op(inst, buffer, on_copy = lambda _:(), on_result = on_result_wrapper)
869-
if ret is not None and buffer.remain() == 0:
870-
self.shared.close()
871-
return StreamResult.CLOSED
872-
return ret
863+
class FutureResult(IntEnum):
864+
RESOLVED = 0
865+
DROPPED = 1 # only returned for future.write
866+
CANCELLED = 2
867+
868+
Lift = Callable[[], any]
869+
Lower = Callable[[any], None]
870+
OnFutureResult = Callable[[FutureResult], None]
871+
872+
class ReadableFuture:
873+
t: ValType
874+
read: Callable[[ComponentInstance, Lower, OnFutureResult], Optional[FutureResult]]
875+
cancel: Callable[[], None]
876+
drop: Callable[[]]
877+
878+
class SharedFutureImpl(ReadableFuture):
879+
dropped: bool
880+
pending_inst: Optional[ComponentInstance]
881+
pending_lift: Optional[Lift]
882+
pending_lower: Optional[Lower]
883+
pending_on_result: Optional[OnFutureResult]
884+
885+
def __init__(self, t):
886+
self.t = t
887+
self.dropped = False
888+
self.reset_pending()
889+
890+
def reset_pending(self):
891+
self.set_pending(None, None, None, None)
892+
893+
def set_pending(self, inst, lift, lower, on_result):
894+
assert(not (lift and lower))
895+
self.pending_inst = inst
896+
self.pending_lift = lift
897+
self.pending_lower = lower
898+
self.pending_on_result = on_result
899+
900+
def reset_and_notify_pending(self, result):
901+
pending_on_result = self.pending_on_result
902+
self.reset_pending()
903+
pending_on_result(result)
904+
905+
def cancel(self):
906+
self.reset_pending_and_notify_pending(FutureResult.CANCELLED)
907+
908+
def drop(self):
909+
assert(not self.dropped and not self.pending_lower)
910+
self.dropped = True
911+
if self.pending_lift:
912+
self.reset_and_notify_pending(FutureResult.DROPPED)
913+
914+
def read(self, inst, lower, on_result):
915+
assert(not self.dropped and not self.pending_lower)
916+
if not self.pending_lift:
917+
self.set_pending(inst, None, lower, on_result)
918+
return None
919+
else:
920+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
921+
lower(self.pending_lift())
922+
self.reset_and_notify_pending(FutureResult.RESOLVED)
923+
return FutureResult.RESOLVED
924+
925+
def write(self, inst, lift, on_result):
926+
assert(not self.pending_lift)
927+
if self.dropped:
928+
return FutureResult.DROPPED
929+
elif not self.pending_lower:
930+
self.set_pending(inst, lift, None, on_result)
931+
return None
932+
else:
933+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
934+
self.pending_lower(lift())
935+
self.reset_and_notify_pending(FutureResult.RESOLVED)
936+
return FutureResult.RESOLVED
937+
938+
class FutureEnd(Waitable):
939+
shared: ReadableFuture
940+
copying: bool
941+
done: bool
942+
943+
def __init__(self, shared):
944+
Waitable.__init__(self)
945+
self.shared = shared
946+
self.copying = False
947+
self.done = False
948+
949+
def drop(self):
950+
trap_if(self.copying)
951+
Waitable.drop(self)
873952

874953
class ReadableFutureEnd(FutureEnd):
875-
def copy(self, inst, dst, on_copy, on_result):
876-
return self.close_after_copy(self.shared.read, inst, dst, on_result)
954+
def copy(self, inst, lower, on_result):
955+
return self.shared.read(inst, lower, on_result)
956+
957+
def drop(self):
958+
self.shared.drop()
959+
FutureEnd.drop(self)
877960

878961
class WritableFutureEnd(FutureEnd):
879-
def copy(self, inst, src, on_copy, on_result):
880-
return self.close_after_copy(self.shared.write, inst, src, on_result)
962+
def copy(self, inst, lift, on_result):
963+
return self.shared.write(inst, lift, on_result)
964+
881965
def drop(self):
966+
trap_if(not self.done)
882967
FutureEnd.drop(self)
883968

884969
### Despecialization
@@ -1210,16 +1295,15 @@ def lift_stream(cx, i, t):
12101295
return lift_async_value(ReadableStreamEnd, cx, i, t)
12111296

12121297
def lift_future(cx, i, t):
1213-
v = lift_async_value(ReadableFutureEnd, cx, i, t)
1214-
trap_if(v.closed())
1215-
return v
1298+
return lift_async_value(ReadableFutureEnd, cx, i, t)
12161299

12171300
def lift_async_value(ReadableEndT, cx, i, t):
12181301
assert(not contains_borrow(t))
12191302
e = cx.inst.table.remove(i)
12201303
trap_if(not isinstance(e, ReadableEndT))
12211304
trap_if(e.shared.t != t)
12221305
trap_if(e.copying)
1306+
trap_if(e.done)
12231307
return e.shared
12241308

12251309
### Storing
@@ -1513,14 +1597,14 @@ def lower_borrow(cx, rep, t):
15131597
return cx.inst.table.add(h)
15141598

15151599
def lower_stream(cx, v, t):
1600+
assert(isinstance(v, ReadableStream))
15161601
return lower_async_value(ReadableStreamEnd, cx, v, t)
15171602

15181603
def lower_future(cx, v, t):
1519-
assert(not v.closed())
1604+
assert(isinstance(v, ReadableFuture))
15201605
return lower_async_value(ReadableFutureEnd, cx, v, t)
15211606

15221607
def lower_async_value(ReadableEndT, cx, v, t):
1523-
assert(isinstance(v, ReadableStream))
15241608
assert(not contains_borrow(t))
15251609
return cx.inst.table.add(ReadableEndT(v))
15261610

@@ -2160,47 +2244,41 @@ async def canon_stream_new(stream_t, task):
21602244

21612245
async def canon_future_new(future_t, task):
21622246
trap_if(not task.inst.may_leave)
2163-
shared = SharedStreamImpl(future_t.t)
2247+
shared = SharedFutureImpl(future_t.t)
21642248
ri = task.inst.table.add(ReadableFutureEnd(shared))
21652249
wi = task.inst.table.add(WritableFutureEnd(shared))
21662250
return [ ri | (wi << 32) ]
21672251

2168-
### 🔀 `canon {stream,future}.{read,write}`
2252+
### 🔀 `canon stream.{read,write}`
21692253

21702254
BLOCKED = 0xffff_ffff
21712255

21722256
async def canon_stream_read(stream_t, opts, task, i, ptr, n):
2173-
return await copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
2174-
stream_t, opts, task, i, ptr, n)
2257+
return await stream_copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ,
2258+
stream_t, opts, task, i, ptr, n)
21752259

21762260
async def canon_stream_write(stream_t, opts, task, i, ptr, n):
2177-
return await copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
2178-
stream_t, opts, task, i, ptr, n)
2179-
2180-
async def canon_future_read(future_t, opts, task, i, ptr):
2181-
return await copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ,
2182-
future_t, opts, task, i, ptr, 1)
2261+
return await stream_copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE,
2262+
stream_t, opts, task, i, ptr, n)
21832263

2184-
async def canon_future_write(future_t, opts, task, i, ptr):
2185-
return await copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE,
2186-
future_t, opts, task, i, ptr, 1)
2187-
2188-
async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr, n):
2264+
async def stream_copy(EndT, BufferT, event_code, stream_t, opts, task, i, ptr, n):
21892265
trap_if(not task.inst.may_leave)
21902266
e = task.inst.table.get(i)
21912267
trap_if(not isinstance(e, EndT))
2192-
trap_if(e.shared.t != stream_or_future_t.t)
2268+
trap_if(e.shared.t != stream_t.t)
21932269
trap_if(e.copying)
21942270

2195-
assert(not contains_borrow(stream_or_future_t))
2271+
assert(not contains_borrow(stream_t))
21962272
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2197-
buffer = BufferT(stream_or_future_t.t, cx, ptr, n)
2273+
buffer = BufferT(stream_t.t, cx, ptr, n)
21982274

21992275
def copy_event(result, revoke_buffer):
22002276
revoke_buffer()
22012277
assert(e.copying)
22022278
e.copying = False
2203-
return (event_code, i, pack_copy_result(result, buffer))
2279+
if result != StreamResult.CANCELLED:
2280+
e.done = True
2281+
return (event_code, i, pack_stream_result(result, buffer))
22042282

22052283
def on_copy(revoke_buffer):
22062284
e.set_event(partial(copy_event, StreamResult.COMPLETED, revoke_buffer))
@@ -2210,7 +2288,9 @@ def on_result(result):
22102288

22112289
result = e.copy(task.inst, buffer, on_copy, on_result)
22122290
if result is not None:
2213-
return [pack_copy_result(result, buffer)]
2291+
if result != StreamResult.CANCELLED:
2292+
e.done = True
2293+
return [pack_stream_result(result, buffer)]
22142294
else:
22152295
e.copying = True
22162296
if opts.sync:
@@ -2221,13 +2301,68 @@ def on_result(result):
22212301
else:
22222302
return [BLOCKED]
22232303

2224-
def pack_copy_result(result, buffer):
2304+
def pack_stream_result(result, buffer):
22252305
assert(0 <= result < 2**4)
22262306
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
22272307
packed = result | (buffer.progress << 4)
22282308
assert(packed != BLOCKED)
22292309
return packed
22302310

2311+
### 🔀 `canon future.{read,write}`
2312+
2313+
async def canon_future_read(future_t, opts, task, i, flat_args):
2314+
assert(len(flat_args) == int(bool(future_t.t)))
2315+
def lower(v):
2316+
if future_t.t:
2317+
assert(not contains_borrow(future_t))
2318+
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2319+
store(cx, v, future_t.t, flat_args[0])
2320+
2321+
return await future_copy(ReadableFutureEnd, EventCode.FUTURE_READ, lower,
2322+
future_t, opts, task, i)
2323+
2324+
async def canon_future_write(future_t, opts, task, i, flat_args):
2325+
def lift():
2326+
if future_t.t:
2327+
assert(not contains_borrow(future_t))
2328+
cx = LiftLowerContext(opts, task.inst, borrow_scope = None)
2329+
[v] = lift_flat_values(cx, MAX_FLAT_ASYNC_PARAMS, CoreValueIter(flat_args), [future_t.t])
2330+
return v
2331+
2332+
return await future_copy(WritableFutureEnd, EventCode.FUTURE_WRITE, lift,
2333+
future_t, opts, task, i)
2334+
2335+
async def future_copy(EndT, event_code, copy, future_t, opts, task, i):
2336+
trap_if(not task.inst.may_leave)
2337+
e = task.inst.table.get(i)
2338+
trap_if(not isinstance(e, EndT))
2339+
trap_if(e.shared.t != future_t.t)
2340+
trap_if(e.copying or e.done)
2341+
2342+
def on_result(result):
2343+
def copy_event():
2344+
assert(e.copying)
2345+
e.copying = False
2346+
if result != FutureResult.CANCELLED:
2347+
e.done = True
2348+
return (event_code, i, int(result))
2349+
e.set_event(copy_event)
2350+
2351+
result = e.copy(task.inst, copy, on_result)
2352+
if result is not None:
2353+
if result != FutureResult.CANCELLED:
2354+
e.done = True
2355+
return [int(result)]
2356+
else:
2357+
e.copying = True
2358+
if opts.sync:
2359+
await task.wait_on(e.wait_for_pending_event(), sync = True)
2360+
code,index,payload = e.get_event()
2361+
assert(code == event_code and index == i)
2362+
return [payload]
2363+
else:
2364+
return [BLOCKED]
2365+
22312366
### 🔀 `canon {stream,future}.cancel-{read,write}`
22322367

22332368
async def canon_stream_cancel_read(stream_t, sync, task, i):
@@ -2259,21 +2394,21 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
22592394
assert(not e.copying and code == event_code and index == i)
22602395
return [payload]
22612396

2262-
### 🔀 `canon {stream,future}.close-{readable,writable}`
2397+
### 🔀 `canon {stream,future}.drop-{readable,writable}`
22632398

2264-
async def canon_stream_close_readable(stream_t, task, i):
2265-
return await close(ReadableStreamEnd, stream_t, task, i)
2399+
async def canon_stream_drop_readable(stream_t, task, i):
2400+
return await drop(ReadableStreamEnd, stream_t, task, i)
22662401

2267-
async def canon_stream_close_writable(stream_t, task, hi):
2268-
return await close(WritableStreamEnd, stream_t, task, hi)
2402+
async def canon_stream_drop_writable(stream_t, task, hi):
2403+
return await drop(WritableStreamEnd, stream_t, task, hi)
22692404

2270-
async def canon_future_close_readable(future_t, task, i):
2271-
return await close(ReadableFutureEnd, future_t, task, i)
2405+
async def canon_future_drop_readable(future_t, task, i):
2406+
return await drop(ReadableFutureEnd, future_t, task, i)
22722407

2273-
async def canon_future_close_writable(future_t, task, hi):
2274-
return await close(WritableFutureEnd, future_t, task, hi)
2408+
async def canon_future_drop_writable(future_t, task, hi):
2409+
return await drop(WritableFutureEnd, future_t, task, hi)
22752410

2276-
async def close(EndT, stream_or_future_t, task, hi):
2411+
async def drop(EndT, stream_or_future_t, task, hi):
22772412
trap_if(not task.inst.may_leave)
22782413
e = task.inst.table.remove(hi)
22792414
trap_if(not isinstance(e, EndT))

0 commit comments

Comments
 (0)