@@ -837,11 +837,13 @@ def copy(self, inst, buffer, on_copy, on_result, src, dst):
837837class StreamEnd (Waitable ):
838838 shared : ReadableStream
839839 copying : bool
840+ done : bool
840841
841842 def __init__ (self , shared ):
842843 Waitable .__init__ (self )
843844 self .shared = shared
844845 self .copying = False
846+ self .done = False
845847
846848 def drop (self ):
847849 trap_if (self .copying )
@@ -858,27 +860,110 @@ def copy(self, inst, src, on_copy, on_result):
858860
859861#### Future State
860862
861- class FutureEnd (StreamEnd ):
862- def close_after_copy (self , copy_op , inst , buffer , on_result ):
863- assert (buffer .remain () == 1 )
864- def on_result_wrapper (result ):
865- if buffer .remain () == 0 :
866- self .shared .close ()
867- on_result (result )
868- ret = copy_op (inst , buffer , on_copy = lambda _ :(), on_result = on_result_wrapper )
869- if ret is not None and buffer .remain () == 0 :
870- self .shared .close ()
871- return StreamResult .CLOSED
872- return ret
863+ class FutureResult (IntEnum ):
864+ RESOLVED = 0
865+ DROPPED = 1 # only returned for future.write
866+ CANCELLED = 2
867+
868+ Lift = Callable [[], any ]
869+ Lower = Callable [[any ], None ]
870+ OnFutureResult = Callable [[FutureResult ], None ]
871+
872+ class ReadableFuture :
873+ t : ValType
874+ read : Callable [[ComponentInstance , Lower , OnFutureResult ], Optional [FutureResult ]]
875+ cancel : Callable [[], None ]
876+ drop : Callable [[]]
877+
878+ class SharedFutureImpl (ReadableFuture ):
879+ dropped : bool
880+ pending_inst : Optional [ComponentInstance ]
881+ pending_lift : Optional [Lift ]
882+ pending_lower : Optional [Lower ]
883+ pending_on_result : Optional [OnFutureResult ]
884+
885+ def __init__ (self , t ):
886+ self .t = t
887+ self .dropped = False
888+ self .reset_pending ()
889+
890+ def reset_pending (self ):
891+ self .set_pending (None , None , None , None )
892+
893+ def set_pending (self , inst , lift , lower , on_result ):
894+ assert (not (lift and lower ))
895+ self .pending_inst = inst
896+ self .pending_lift = lift
897+ self .pending_lower = lower
898+ self .pending_on_result = on_result
899+
900+ def reset_and_notify_pending (self , result ):
901+ pending_on_result = self .pending_on_result
902+ self .reset_pending ()
903+ pending_on_result (result )
904+
905+ def cancel (self ):
906+ self .reset_pending_and_notify_pending (FutureResult .CANCELLED )
907+
908+ def drop (self ):
909+ assert (not self .dropped and not self .pending_lower )
910+ self .dropped = True
911+ if self .pending_lift :
912+ self .reset_and_notify_pending (FutureResult .DROPPED )
913+
914+ def read (self , inst , lower , on_result ):
915+ assert (not self .dropped and not self .pending_lower )
916+ if not self .pending_lift :
917+ self .set_pending (inst , None , lower , on_result )
918+ return None
919+ else :
920+ trap_if (inst is self .pending_inst and self .t is not None ) # temporary
921+ lower (self .pending_lift ())
922+ self .reset_and_notify_pending (FutureResult .RESOLVED )
923+ return FutureResult .RESOLVED
924+
925+ def write (self , inst , lift , on_result ):
926+ assert (not self .pending_lift )
927+ if self .dropped :
928+ return FutureResult .DROPPED
929+ elif not self .pending_lower :
930+ self .set_pending (inst , lift , None , on_result )
931+ return None
932+ else :
933+ trap_if (inst is self .pending_inst and self .t is not None ) # temporary
934+ self .pending_lower (lift ())
935+ self .reset_and_notify_pending (FutureResult .RESOLVED )
936+ return FutureResult .RESOLVED
937+
938+ class FutureEnd (Waitable ):
939+ shared : ReadableFuture
940+ copying : bool
941+ done : bool
942+
943+ def __init__ (self , shared ):
944+ Waitable .__init__ (self )
945+ self .shared = shared
946+ self .copying = False
947+ self .done = False
948+
949+ def drop (self ):
950+ trap_if (self .copying )
951+ Waitable .drop (self )
873952
874953class ReadableFutureEnd (FutureEnd ):
875- def copy (self , inst , dst , on_copy , on_result ):
876- return self .close_after_copy (self .shared .read , inst , dst , on_result )
954+ def copy (self , inst , lower , on_result ):
955+ return self .shared .read (inst , lower , on_result )
956+
957+ def drop (self ):
958+ self .shared .drop ()
959+ FutureEnd .drop (self )
877960
878961class WritableFutureEnd (FutureEnd ):
879- def copy (self , inst , src , on_copy , on_result ):
880- return self .close_after_copy (self .shared .write , inst , src , on_result )
962+ def copy (self , inst , lift , on_result ):
963+ return self .shared .write (inst , lift , on_result )
964+
881965 def drop (self ):
966+ trap_if (not self .done )
882967 FutureEnd .drop (self )
883968
884969### Despecialization
@@ -1210,16 +1295,15 @@ def lift_stream(cx, i, t):
12101295 return lift_async_value (ReadableStreamEnd , cx , i , t )
12111296
12121297def lift_future (cx , i , t ):
1213- v = lift_async_value (ReadableFutureEnd , cx , i , t )
1214- trap_if (v .closed ())
1215- return v
1298+ return lift_async_value (ReadableFutureEnd , cx , i , t )
12161299
12171300def lift_async_value (ReadableEndT , cx , i , t ):
12181301 assert (not contains_borrow (t ))
12191302 e = cx .inst .table .remove (i )
12201303 trap_if (not isinstance (e , ReadableEndT ))
12211304 trap_if (e .shared .t != t )
12221305 trap_if (e .copying )
1306+ trap_if (e .done )
12231307 return e .shared
12241308
12251309### Storing
@@ -1513,14 +1597,14 @@ def lower_borrow(cx, rep, t):
15131597 return cx .inst .table .add (h )
15141598
15151599def lower_stream (cx , v , t ):
1600+ assert (isinstance (v , ReadableStream ))
15161601 return lower_async_value (ReadableStreamEnd , cx , v , t )
15171602
15181603def lower_future (cx , v , t ):
1519- assert (not v . closed ( ))
1604+ assert (isinstance ( v , ReadableFuture ))
15201605 return lower_async_value (ReadableFutureEnd , cx , v , t )
15211606
15221607def lower_async_value (ReadableEndT , cx , v , t ):
1523- assert (isinstance (v , ReadableStream ))
15241608 assert (not contains_borrow (t ))
15251609 return cx .inst .table .add (ReadableEndT (v ))
15261610
@@ -2160,47 +2244,41 @@ async def canon_stream_new(stream_t, task):
21602244
21612245async def canon_future_new (future_t , task ):
21622246 trap_if (not task .inst .may_leave )
2163- shared = SharedStreamImpl (future_t .t )
2247+ shared = SharedFutureImpl (future_t .t )
21642248 ri = task .inst .table .add (ReadableFutureEnd (shared ))
21652249 wi = task .inst .table .add (WritableFutureEnd (shared ))
21662250 return [ ri | (wi << 32 ) ]
21672251
2168- ### 🔀 `canon { stream,future} .{read,write}`
2252+ ### 🔀 `canon stream.{read,write}`
21692253
21702254BLOCKED = 0xffff_ffff
21712255
21722256async def canon_stream_read (stream_t , opts , task , i , ptr , n ):
2173- return await copy (ReadableStreamEnd , WritableBufferGuestImpl , EventCode .STREAM_READ ,
2174- stream_t , opts , task , i , ptr , n )
2257+ return await stream_copy (ReadableStreamEnd , WritableBufferGuestImpl , EventCode .STREAM_READ ,
2258+ stream_t , opts , task , i , ptr , n )
21752259
21762260async def canon_stream_write (stream_t , opts , task , i , ptr , n ):
2177- return await copy (WritableStreamEnd , ReadableBufferGuestImpl , EventCode .STREAM_WRITE ,
2178- stream_t , opts , task , i , ptr , n )
2179-
2180- async def canon_future_read (future_t , opts , task , i , ptr ):
2181- return await copy (ReadableFutureEnd , WritableBufferGuestImpl , EventCode .FUTURE_READ ,
2182- future_t , opts , task , i , ptr , 1 )
2261+ return await stream_copy (WritableStreamEnd , ReadableBufferGuestImpl , EventCode .STREAM_WRITE ,
2262+ stream_t , opts , task , i , ptr , n )
21832263
2184- async def canon_future_write (future_t , opts , task , i , ptr ):
2185- return await copy (WritableFutureEnd , ReadableBufferGuestImpl , EventCode .FUTURE_WRITE ,
2186- future_t , opts , task , i , ptr , 1 )
2187-
2188- async def copy (EndT , BufferT , event_code , stream_or_future_t , opts , task , i , ptr , n ):
2264+ async def stream_copy (EndT , BufferT , event_code , stream_t , opts , task , i , ptr , n ):
21892265 trap_if (not task .inst .may_leave )
21902266 e = task .inst .table .get (i )
21912267 trap_if (not isinstance (e , EndT ))
2192- trap_if (e .shared .t != stream_or_future_t .t )
2268+ trap_if (e .shared .t != stream_t .t )
21932269 trap_if (e .copying )
21942270
2195- assert (not contains_borrow (stream_or_future_t ))
2271+ assert (not contains_borrow (stream_t ))
21962272 cx = LiftLowerContext (opts , task .inst , borrow_scope = None )
2197- buffer = BufferT (stream_or_future_t .t , cx , ptr , n )
2273+ buffer = BufferT (stream_t .t , cx , ptr , n )
21982274
21992275 def copy_event (result , revoke_buffer ):
22002276 revoke_buffer ()
22012277 assert (e .copying )
22022278 e .copying = False
2203- return (event_code , i , pack_copy_result (result , buffer ))
2279+ if result != StreamResult .CANCELLED :
2280+ e .done = True
2281+ return (event_code , i , pack_stream_result (result , buffer ))
22042282
22052283 def on_copy (revoke_buffer ):
22062284 e .set_event (partial (copy_event , StreamResult .COMPLETED , revoke_buffer ))
@@ -2210,7 +2288,9 @@ def on_result(result):
22102288
22112289 result = e .copy (task .inst , buffer , on_copy , on_result )
22122290 if result is not None :
2213- return [pack_copy_result (result , buffer )]
2291+ if result != StreamResult .CANCELLED :
2292+ e .done = True
2293+ return [pack_stream_result (result , buffer )]
22142294 else :
22152295 e .copying = True
22162296 if opts .sync :
@@ -2221,13 +2301,68 @@ def on_result(result):
22212301 else :
22222302 return [BLOCKED ]
22232303
2224- def pack_copy_result (result , buffer ):
2304+ def pack_stream_result (result , buffer ):
22252305 assert (0 <= result < 2 ** 4 )
22262306 assert (buffer .progress <= Buffer .MAX_LENGTH < 2 ** 28 )
22272307 packed = result | (buffer .progress << 4 )
22282308 assert (packed != BLOCKED )
22292309 return packed
22302310
2311+ ### 🔀 `canon future.{read,write}`
2312+
2313+ async def canon_future_read (future_t , opts , task , i , flat_args ):
2314+ assert (len (flat_args ) == int (bool (future_t .t )))
2315+ def lower (v ):
2316+ if future_t .t :
2317+ assert (not contains_borrow (future_t ))
2318+ cx = LiftLowerContext (opts , task .inst , borrow_scope = None )
2319+ store (cx , v , future_t .t , flat_args [0 ])
2320+
2321+ return await future_copy (ReadableFutureEnd , EventCode .FUTURE_READ , lower ,
2322+ future_t , opts , task , i )
2323+
2324+ async def canon_future_write (future_t , opts , task , i , flat_args ):
2325+ def lift ():
2326+ if future_t .t :
2327+ assert (not contains_borrow (future_t ))
2328+ cx = LiftLowerContext (opts , task .inst , borrow_scope = None )
2329+ [v ] = lift_flat_values (cx , MAX_FLAT_ASYNC_PARAMS , CoreValueIter (flat_args ), [future_t .t ])
2330+ return v
2331+
2332+ return await future_copy (WritableFutureEnd , EventCode .FUTURE_WRITE , lift ,
2333+ future_t , opts , task , i )
2334+
2335+ async def future_copy (EndT , event_code , copy , future_t , opts , task , i ):
2336+ trap_if (not task .inst .may_leave )
2337+ e = task .inst .table .get (i )
2338+ trap_if (not isinstance (e , EndT ))
2339+ trap_if (e .shared .t != future_t .t )
2340+ trap_if (e .copying or e .done )
2341+
2342+ def on_result (result ):
2343+ def copy_event ():
2344+ assert (e .copying )
2345+ e .copying = False
2346+ if result != FutureResult .CANCELLED :
2347+ e .done = True
2348+ return (event_code , i , int (result ))
2349+ e .set_event (copy_event )
2350+
2351+ result = e .copy (task .inst , copy , on_result )
2352+ if result is not None :
2353+ if result != FutureResult .CANCELLED :
2354+ e .done = True
2355+ return [int (result )]
2356+ else :
2357+ e .copying = True
2358+ if opts .sync :
2359+ await task .wait_on (e .wait_for_pending_event (), sync = True )
2360+ code ,index ,payload = e .get_event ()
2361+ assert (code == event_code and index == i )
2362+ return [payload ]
2363+ else :
2364+ return [BLOCKED ]
2365+
22312366### 🔀 `canon {stream,future}.cancel-{read,write}`
22322367
22332368async def canon_stream_cancel_read (stream_t , sync , task , i ):
@@ -2259,21 +2394,21 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i):
22592394 assert (not e .copying and code == event_code and index == i )
22602395 return [payload ]
22612396
2262- ### 🔀 `canon {stream,future}.close -{readable,writable}`
2397+ ### 🔀 `canon {stream,future}.drop -{readable,writable}`
22632398
2264- async def canon_stream_close_readable (stream_t , task , i ):
2265- return await close (ReadableStreamEnd , stream_t , task , i )
2399+ async def canon_stream_drop_readable (stream_t , task , i ):
2400+ return await drop (ReadableStreamEnd , stream_t , task , i )
22662401
2267- async def canon_stream_close_writable (stream_t , task , hi ):
2268- return await close (WritableStreamEnd , stream_t , task , hi )
2402+ async def canon_stream_drop_writable (stream_t , task , hi ):
2403+ return await drop (WritableStreamEnd , stream_t , task , hi )
22692404
2270- async def canon_future_close_readable (future_t , task , i ):
2271- return await close (ReadableFutureEnd , future_t , task , i )
2405+ async def canon_future_drop_readable (future_t , task , i ):
2406+ return await drop (ReadableFutureEnd , future_t , task , i )
22722407
2273- async def canon_future_close_writable (future_t , task , hi ):
2274- return await close (WritableFutureEnd , future_t , task , hi )
2408+ async def canon_future_drop_writable (future_t , task , hi ):
2409+ return await drop (WritableFutureEnd , future_t , task , hi )
22752410
2276- async def close (EndT , stream_or_future_t , task , hi ):
2411+ async def drop (EndT , stream_or_future_t , task , hi ):
22772412 trap_if (not task .inst .may_leave )
22782413 e = task .inst .table .remove (hi )
22792414 trap_if (not isinstance (e , EndT ))
0 commit comments