@@ -837,11 +837,13 @@ def copy(self, inst, buffer, on_copy, on_result, src, dst):
837837class StreamEnd (Waitable ):
838838 shared : ReadableStream
839839 copying : bool
840+ done : bool
840841
841842 def __init__ (self , shared ):
842843 Waitable .__init__ (self )
843844 self .shared = shared
844845 self .copying = False
846+ self .done = False
845847
846848 def drop (self ):
847849 trap_if (self .copying )
@@ -858,28 +860,110 @@ def copy(self, inst, src, on_copy, on_result):
858860
859861#### Future State
860862
861- class FutureEnd (StreamEnd ):
862- def close_after_copy (self , copy_op , inst , buffer , on_result ):
863- assert (buffer .remain () == 1 )
864- def on_result_wrapper (result ):
865- if buffer .remain () == 0 :
866- self .shared .close ()
867- on_result (result )
868- ret = copy_op (inst , buffer , on_copy = lambda _ :(), on_result = on_result_wrapper )
869- if ret is not None and buffer .remain () == 0 :
870- self .shared .close ()
871- return StreamResult .CLOSED
872- return ret
863+ class FutureResult (IntEnum ):
864+ RESOLVED = 0
865+ DROPPED = 1 # only returned for future.write
866+ CANCELLED = 2
867+
868+ Lift = Callable [[], any ]
869+ Lower = Callable [[any ], None ]
870+ OnFutureResult = Callable [[FutureResult ], None ]
871+
872+ class ReadableFuture :
873+ t : ValType
874+ read : Callable [[ComponentInstance , Lower , OnFutureResult ], Optional [FutureResult ]]
875+ cancel : Callable [[], None ]
876+ drop : Callable [[]]
877+
878+ class SharedFutureImpl (ReadableFuture ):
879+ dropped : bool
880+ pending_inst : Optional [ComponentInstance ]
881+ pending_lift : Optional [Lift ]
882+ pending_lower : Optional [Lower ]
883+ pending_on_result : Optional [OnFutureResult ]
884+
885+ def __init__ (self , t ):
886+ self .t = t
887+ self .dropped = False
888+ self .reset_pending ()
889+
890+ def reset_pending (self ):
891+ self .set_pending (None , None , None , None )
892+
893+ def set_pending (self , inst , lift , lower , on_result ):
894+ assert (not (lift and lower ))
895+ self .pending_inst = inst
896+ self .pending_lift = lift
897+ self .pending_lower = lower
898+ self .pending_on_result = on_result
899+
900+ def reset_and_notify_pending (self , result ):
901+ pending_on_result = self .pending_on_result
902+ self .reset_pending ()
903+ pending_on_result (result )
904+
905+ def cancel (self ):
906+ self .reset_pending_and_notify_pending (FutureResult .CANCELLED )
907+
908+ def drop (self ):
909+ assert (not self .dropped and not self .pending_lower )
910+ self .dropped = True
911+ if self .pending_lift :
912+ self .reset_and_notify_pending (FutureResult .DROPPED )
913+
914+ def read (self , inst , lower , on_result ):
915+ assert (not self .dropped and not self .pending_lower )
916+ if not self .pending_lift :
917+ self .set_pending (inst , None , lower , on_result )
918+ return None
919+ else :
920+ trap_if (inst is self .pending_inst and self .t is not None ) # temporary
921+ lower (self .pending_lift ())
922+ self .reset_and_notify_pending (FutureResult .RESOLVED )
923+ return FutureResult .RESOLVED
924+
925+ def write (self , inst , lift , on_result ):
926+ assert (not self .pending_lift )
927+ if self .dropped :
928+ return FutureResult .DROPPED
929+ elif not self .pending_lower :
930+ self .set_pending (inst , lift , None , on_result )
931+ return None
932+ else :
933+ trap_if (inst is self .pending_inst and self .t is not None ) # temporary
934+ self .pending_lower (lift ())
935+ self .reset_and_notify_pending (FutureResult .RESOLVED )
936+ return FutureResult .RESOLVED
937+
938+ class FutureEnd (Waitable ):
939+ shared : ReadableFuture
940+ copying : bool
941+ done : bool
942+
943+ def __init__ (self , shared ):
944+ Waitable .__init__ (self )
945+ self .shared = shared
946+ self .copying = False
947+ self .done = False
948+
949+ def drop (self ):
950+ trap_if (self .copying )
951+ Waitable .drop (self )
873952
874953class ReadableFutureEnd (FutureEnd ):
875- def copy (self , inst , dst , on_copy , on_result ):
876- return self .close_after_copy (self .shared .read , inst , dst , on_result )
954+ def copy (self , inst , lower , on_result ):
955+ return self .shared .read (inst , lower , on_result )
956+
957+ def drop (self ):
958+ self .shared .drop ()
959+ FutureEnd .drop (self )
877960
878961class WritableFutureEnd (FutureEnd ):
879- def copy (self , inst , src , on_copy , on_result ):
880- return self .close_after_copy (self .shared .write , inst , src , on_result )
962+ def copy (self , inst , lift , on_result ):
963+ return self .shared .write (inst , lift , on_result )
964+
881965 def drop (self ):
882- trap_if (not self .shared . closed () )
966+ trap_if (not self .done )
883967 FutureEnd .drop (self )
884968
885969### Despecialization
@@ -1211,16 +1295,15 @@ def lift_stream(cx, i, t):
12111295 return lift_async_value (ReadableStreamEnd , cx , i , t )
12121296
12131297def lift_future (cx , i , t ):
1214- v = lift_async_value (ReadableFutureEnd , cx , i , t )
1215- trap_if (v .closed ())
1216- return v
1298+ return lift_async_value (ReadableFutureEnd , cx , i , t )
12171299
12181300def lift_async_value (ReadableEndT , cx , i , t ):
12191301 assert (not contains_borrow (t ))
12201302 e = cx .inst .table .remove (i )
12211303 trap_if (not isinstance (e , ReadableEndT ))
12221304 trap_if (e .shared .t != t )
12231305 trap_if (e .copying )
1306+ trap_if (e .done )
12241307 return e .shared
12251308
12261309### Storing
@@ -1514,14 +1597,14 @@ def lower_borrow(cx, rep, t):
15141597 return cx .inst .table .add (h )
15151598
15161599def lower_stream (cx , v , t ):
1600+ assert (isinstance (v , ReadableStream ))
15171601 return lower_async_value (ReadableStreamEnd , cx , v , t )
15181602
15191603def lower_future (cx , v , t ):
1520- assert (not v . closed ( ))
1604+ assert (isinstance ( v , ReadableFuture ))
15211605 return lower_async_value (ReadableFutureEnd , cx , v , t )
15221606
15231607def lower_async_value (ReadableEndT , cx , v , t ):
1524- assert (isinstance (v , ReadableStream ))
15251608 assert (not contains_borrow (t ))
15261609 return cx .inst .table .add (ReadableEndT (v ))
15271610
@@ -2161,47 +2244,41 @@ async def canon_stream_new(stream_t, task):
21612244
21622245async def canon_future_new (future_t , task ):
21632246 trap_if (not task .inst .may_leave )
2164- shared = SharedStreamImpl (future_t .t )
2247+ shared = SharedFutureImpl (future_t .t )
21652248 ri = task .inst .table .add (ReadableFutureEnd (shared ))
21662249 wi = task .inst .table .add (WritableFutureEnd (shared ))
21672250 return [ ri | (wi << 32 ) ]
21682251
2169- ### 🔀 `canon { stream,future} .{read,write}`
2252+ ### 🔀 `canon stream.{read,write}`
21702253
21712254BLOCKED = 0xffff_ffff
21722255
21732256async def canon_stream_read (stream_t , opts , task , i , ptr , n ):
2174- return await copy (ReadableStreamEnd , WritableBufferGuestImpl , EventCode .STREAM_READ ,
2175- stream_t , opts , task , i , ptr , n )
2257+ return await stream_copy (ReadableStreamEnd , WritableBufferGuestImpl , EventCode .STREAM_READ ,
2258+ stream_t , opts , task , i , ptr , n )
21762259
21772260async def canon_stream_write (stream_t , opts , task , i , ptr , n ):
2178- return await copy (WritableStreamEnd , ReadableBufferGuestImpl , EventCode .STREAM_WRITE ,
2179- stream_t , opts , task , i , ptr , n )
2180-
2181- async def canon_future_read (future_t , opts , task , i , ptr ):
2182- return await copy (ReadableFutureEnd , WritableBufferGuestImpl , EventCode .FUTURE_READ ,
2183- future_t , opts , task , i , ptr , 1 )
2261+ return await stream_copy (WritableStreamEnd , ReadableBufferGuestImpl , EventCode .STREAM_WRITE ,
2262+ stream_t , opts , task , i , ptr , n )
21842263
2185- async def canon_future_write (future_t , opts , task , i , ptr ):
2186- return await copy (WritableFutureEnd , ReadableBufferGuestImpl , EventCode .FUTURE_WRITE ,
2187- future_t , opts , task , i , ptr , 1 )
2188-
2189- async def copy (EndT , BufferT , event_code , stream_or_future_t , opts , task , i , ptr , n ):
2264+ async def stream_copy (EndT , BufferT , event_code , stream_t , opts , task , i , ptr , n ):
21902265 trap_if (not task .inst .may_leave )
21912266 e = task .inst .table .get (i )
21922267 trap_if (not isinstance (e , EndT ))
2193- trap_if (e .shared .t != stream_or_future_t .t )
2268+ trap_if (e .shared .t != stream_t .t )
21942269 trap_if (e .copying )
21952270
2196- assert (not contains_borrow (stream_or_future_t ))
2271+ assert (not contains_borrow (stream_t ))
21972272 cx = LiftLowerContext (opts , task .inst , borrow_scope = None )
2198- buffer = BufferT (stream_or_future_t .t , cx , ptr , n )
2273+ buffer = BufferT (stream_t .t , cx , ptr , n )
21992274
22002275 def copy_event (result , revoke_buffer ):
22012276 revoke_buffer ()
22022277 assert (e .copying )
22032278 e .copying = False
2204- return (event_code , i , pack_copy_result (result , buffer ))
2279+ if result != StreamResult .CANCELLED :
2280+ e .done = True
2281+ return (event_code , i , pack_stream_result (result , buffer ))
22052282
22062283 def on_copy (revoke_buffer ):
22072284 e .set_event (partial (copy_event , StreamResult .COMPLETED , revoke_buffer ))
@@ -2211,7 +2288,9 @@ def on_result(result):
22112288
22122289 result = e .copy (task .inst , buffer , on_copy , on_result )
22132290 if result is not None :
2214- return [pack_copy_result (result , buffer )]
2291+ if result != StreamResult .CANCELLED :
2292+ e .done = True
2293+ return [pack_stream_result (result , buffer )]
22152294 else :
22162295 e .copying = True
22172296 if opts .sync :
@@ -2222,13 +2301,68 @@ def on_result(result):
22222301 else :
22232302 return [BLOCKED ]
22242303
2225- def pack_copy_result (result , buffer ):
2304+ def pack_stream_result (result , buffer ):
22262305 assert (0 <= result < 2 ** 4 )
22272306 assert (buffer .progress <= Buffer .MAX_LENGTH < 2 ** 28 )
22282307 packed = result | (buffer .progress << 4 )
22292308 assert (packed != BLOCKED )
22302309 return packed
22312310
2311+ ### 🔀 `canon future.{read,write}`
2312+
2313+ async def canon_future_read (future_t , opts , task , i , flat_args ):
2314+ assert (len (flat_args ) == int (bool (future_t .t )))
2315+ def lower (v ):
2316+ if future_t .t :
2317+ assert (not contains_borrow (future_t ))
2318+ cx = LiftLowerContext (opts , task .inst , borrow_scope = None )
2319+ store (cx , v , future_t .t , flat_args [0 ])
2320+
2321+ return await future_copy (ReadableFutureEnd , EventCode .FUTURE_READ , lower ,
2322+ future_t , opts , task , i )
2323+
2324+ async def canon_future_write (future_t , opts , task , i , flat_args ):
2325+ def lift ():
2326+ if future_t .t :
2327+ assert (not contains_borrow (future_t ))
2328+ cx = LiftLowerContext (opts , task .inst , borrow_scope = None )
2329+ [v ] = lift_flat_values (cx , MAX_FLAT_ASYNC_PARAMS , CoreValueIter (flat_args ), [future_t .t ])
2330+ return v
2331+
2332+ return await future_copy (WritableFutureEnd , EventCode .FUTURE_WRITE , lift ,
2333+ future_t , opts , task , i )
2334+
2335+ async def future_copy (EndT , event_code , copy , future_t , opts , task , i ):
2336+ trap_if (not task .inst .may_leave )
2337+ e = task .inst .table .get (i )
2338+ trap_if (not isinstance (e , EndT ))
2339+ trap_if (e .shared .t != future_t .t )
2340+ trap_if (e .copying or e .done )
2341+
2342+ def on_result (result ):
2343+ def copy_event ():
2344+ assert (e .copying )
2345+ e .copying = False
2346+ if result != FutureResult .CANCELLED :
2347+ e .done = True
2348+ return (event_code , i , int (result ))
2349+ e .set_event (copy_event )
2350+
2351+ result = e .copy (task .inst , copy , on_result )
2352+ if result is not None :
2353+ if result != FutureResult .CANCELLED :
2354+ e .done = True
2355+ return [int (result )]
2356+ else :
2357+ e .copying = True
2358+ if opts .sync :
2359+ await task .wait_on (e .wait_for_pending_event (), sync = True )
2360+ code ,index ,payload = e .get_event ()
2361+ assert (code == event_code and index == i )
2362+ return [payload ]
2363+ else :
2364+ return [BLOCKED ]
2365+
22322366### 🔀 `canon {stream,future}.cancel-{read,write}`
22332367
22342368async def canon_stream_cancel_read (stream_t , sync , task , i ):
@@ -2260,21 +2394,21 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
22602394 assert (not e .copying and code == event_code and index == i )
22612395 return [payload ]
22622396
2263- ### 🔀 `canon {stream,future}.close -{readable,writable}`
2397+ ### 🔀 `canon {stream,future}.drop -{readable,writable}`
22642398
2265- async def canon_stream_close_readable (stream_t , task , i ):
2266- return await close (ReadableStreamEnd , stream_t , task , i )
2399+ async def canon_stream_drop_readable (stream_t , task , i ):
2400+ return await drop (ReadableStreamEnd , stream_t , task , i )
22672401
2268- async def canon_stream_close_writable (stream_t , task , hi ):
2269- return await close (WritableStreamEnd , stream_t , task , hi )
2402+ async def canon_stream_drop_writable (stream_t , task , hi ):
2403+ return await drop (WritableStreamEnd , stream_t , task , hi )
22702404
2271- async def canon_future_close_readable (future_t , task , i ):
2272- return await close (ReadableFutureEnd , future_t , task , i )
2405+ async def canon_future_drop_readable (future_t , task , i ):
2406+ return await drop (ReadableFutureEnd , future_t , task , i )
22732407
2274- async def canon_future_close_writable (future_t , task , hi ):
2275- return await close (WritableFutureEnd , future_t , task , hi )
2408+ async def canon_future_drop_writable (future_t , task , hi ):
2409+ return await drop (WritableFutureEnd , future_t , task , hi )
22762410
2277- async def close (EndT , stream_or_future_t , task , hi ):
2411+ async def drop (EndT , stream_or_future_t , task , hi ):
22782412 trap_if (not task .inst .may_leave )
22792413 e = task .inst .table .remove (hi )
22802414 trap_if (not isinstance (e , EndT ))
0 commit comments