@@ -307,6 +307,7 @@ class Buffer:
307307 MAX_LENGTH = 2 ** 28 - 1
308308 t : ValType
309309 remain : Callable [[], int ]
310+ is_zero_length : Callable [[], bool ]
310311
311312class ReadableBuffer (Buffer ):
312313 read : Callable [[int ], list [any ]]
@@ -335,6 +336,9 @@ def __init__(self, t, cx, ptr, length):
335336 def remain (self ):
336337 return self .length - self .progress
337338
339+ def is_zero_length (self ):
340+ return self .length == 0
341+
338342class ReadableBufferGuestImpl (BufferGuestImpl ):
339343 def read (self , n ):
340344 assert (n <= self .remain ())
@@ -749,13 +753,18 @@ def drop(self):
749753
750754#### Stream State
751755
756+ class StreamResult (IntEnum ):
757+ COMPLETED = 0
758+ CLOSED = 1
759+ CANCELLED = 2
760+
752761RevokeBuffer = Callable [[], None ]
753- OnPartialCopy = Callable [[RevokeBuffer ], None ]
754- OnCopyDone = Callable [[Literal [ 'completed' , 'cancelled' ] ], None ]
762+ OnCopy = Callable [[RevokeBuffer ], None ]
763+ OnStreamResult = Callable [[StreamResult ], None ]
755764
756765class ReadableStream :
757766 t : ValType
758- read : Callable [[ComponentInstance , WritableBuffer , OnPartialCopy , OnCopyDone ], Literal [ 'done' , 'blocked' ]]
767+ read : Callable [[ComponentInstance , WritableBuffer , OnCopy , OnStreamResult ], Optional [ StreamResult ]]
759768 cancel : Callable [[], None ]
760769 close : Callable [[]]
761770 closed : Callable [[], bool ]
@@ -764,8 +773,8 @@ class SharedStreamImpl(ReadableStream):
764773 closed_ : bool
765774 pending_inst : Optional [ComponentInstance ]
766775 pending_buffer : Optional [Buffer ]
767- pending_on_partial_copy : Optional [OnPartialCopy ]
768- pending_on_copy_done : Optional [OnCopyDone ]
776+ pending_on_copy : Optional [OnCopy ]
777+ pending_on_result : Optional [OnStreamResult ]
769778
770779 def __init__ (self , t ):
771780 self .t = t
@@ -775,59 +784,55 @@ def __init__(self, t):
775784 def reset_pending (self ):
776785 self .set_pending (None , None , None , None )
777786
778- def set_pending (self , inst , buffer , on_partial_copy , on_copy_done ):
787+ def set_pending (self , inst , buffer , on_copy , on_result ):
779788 self .pending_inst = inst
780789 self .pending_buffer = buffer
781- self .pending_on_partial_copy = on_partial_copy
782- self .pending_on_copy_done = on_copy_done
790+ self .pending_on_copy = on_copy
791+ self .pending_on_result = on_result
783792
784- def reset_and_notify_pending (self , why ):
785- pending_on_copy_done = self .pending_on_copy_done
793+ def reset_and_notify_pending (self , result ):
794+ pending_on_result = self .pending_on_result
786795 self .reset_pending ()
787- pending_on_copy_done ( why )
796+ pending_on_result ( result )
788797
789798 def cancel (self ):
790- self .reset_and_notify_pending ('cancelled' )
799+ self .reset_and_notify_pending (StreamResult . CANCELLED )
791800
792801 def close (self ):
793802 if not self .closed_ :
794803 self .closed_ = True
795804 if self .pending_buffer :
796- self .reset_and_notify_pending ('completed' )
805+ self .reset_and_notify_pending (StreamResult . CLOSED )
797806
798807 def closed (self ):
799808 return self .closed_
800809
801- def read (self , inst , dst , on_partial_copy , on_copy_done ):
802- return self .copy (inst , dst , on_partial_copy , on_copy_done , self .pending_buffer , dst )
810+ def read (self , inst , dst , on_copy , on_result ):
811+ return self .copy (inst , dst , on_copy , on_result , self .pending_buffer , dst )
803812
804- def write (self , inst , src , on_partial_copy , on_copy_done ):
805- return self .copy (inst , src , on_partial_copy , on_copy_done , src , self .pending_buffer )
813+ def write (self , inst , src , on_copy , on_result ):
814+ return self .copy (inst , src , on_copy , on_result , src , self .pending_buffer )
806815
807- def copy (self , inst , buffer , on_partial_copy , on_copy_done , src , dst ):
816+ def copy (self , inst , buffer , on_copy , on_result , src , dst ):
808817 if self .closed_ :
809- return 'done'
818+ return StreamResult . CLOSED
810819 elif not self .pending_buffer :
811- self .set_pending (inst , buffer , on_partial_copy , on_copy_done )
812- return 'blocked'
820+ self .set_pending (inst , buffer , on_copy , on_result )
821+ return None
813822 else :
814823 assert (self .t == src .t == dst .t )
815824 trap_if (inst is self .pending_inst and self .t is not None ) # temporary
816825 if self .pending_buffer .remain () > 0 :
817826 if buffer .remain () > 0 :
818827 dst .write (src .read (min (src .remain (), dst .remain ())))
819- if self .pending_buffer .remain () > 0 :
820- self .pending_on_partial_copy (self .reset_pending )
821- else :
822- self .reset_and_notify_pending ('completed' )
823- return 'done'
828+ self .pending_on_copy (self .reset_pending )
829+ return StreamResult .COMPLETED
830+ elif buffer is src and buffer .remain () == 0 and self .pending_buffer .is_zero_length ():
831+ return StreamResult .COMPLETED
824832 else :
825- if buffer .remain () > 0 or buffer is dst :
826- self .reset_and_notify_pending ('completed' )
827- self .set_pending (inst , buffer , on_partial_copy , on_copy_done )
828- return 'blocked'
829- else :
830- return 'done'
833+ self .reset_and_notify_pending (StreamResult .COMPLETED )
834+ self .set_pending (inst , buffer , on_copy , on_result )
835+ return None
831836
832837class StreamEnd (Waitable ):
833838 shared : ReadableStream
@@ -844,34 +849,35 @@ def drop(self):
844849 Waitable .drop (self )
845850
846851class ReadableStreamEnd (StreamEnd ):
847- def copy (self , inst , dst , on_partial_copy , on_copy_done ):
848- return self .shared .read (inst , dst , on_partial_copy , on_copy_done )
852+ def copy (self , inst , dst , on_copy , on_result ):
853+ return self .shared .read (inst , dst , on_copy , on_result )
849854
850855class WritableStreamEnd (StreamEnd ):
851- def copy (self , inst , src , on_partial_copy , on_copy_done ):
852- return self .shared .write (inst , src , on_partial_copy , on_copy_done )
856+ def copy (self , inst , src , on_copy , on_result ):
857+ return self .shared .write (inst , src , on_copy , on_result )
853858
854859#### Future State
855860
856861class FutureEnd (StreamEnd ):
857- def close_after_copy (self , copy_op , inst , buffer , on_copy_done ):
862+ def close_after_copy (self , copy_op , inst , buffer , on_result ):
858863 assert (buffer .remain () == 1 )
859- def on_copy_done_wrapper ( why ):
864+ def on_result_wrapper ( result ):
860865 if buffer .remain () == 0 :
861866 self .shared .close ()
862- on_copy_done ( why )
863- ret = copy_op (inst , buffer , on_partial_copy = None , on_copy_done = on_copy_done_wrapper )
864- if ret == 'done' and buffer .remain () == 0 :
867+ on_result ( result )
868+ ret = copy_op (inst , buffer , on_copy = lambda _ :(), on_result = on_result_wrapper )
869+ if ret is not None and buffer .remain () == 0 :
865870 self .shared .close ()
871+ return StreamResult .CLOSED
866872 return ret
867873
868874class ReadableFutureEnd (FutureEnd ):
869- def copy (self , inst , dst , on_partial_copy , on_copy_done ):
870- return self .close_after_copy (self .shared .read , inst , dst , on_copy_done )
875+ def copy (self , inst , dst , on_copy , on_result ):
876+ return self .close_after_copy (self .shared .read , inst , dst , on_result )
871877
872878class WritableFutureEnd (FutureEnd ):
873- def copy (self , inst , src , on_partial_copy , on_copy_done ):
874- return self .close_after_copy (self .shared .write , inst , src , on_copy_done )
879+ def copy (self , inst , src , on_copy , on_result ):
880+ return self .close_after_copy (self .shared .write , inst , src , on_result )
875881 def drop (self ):
876882 FutureEnd .drop (self )
877883
@@ -1801,43 +1807,36 @@ def lower_flat_flags(v, labels):
18011807def lift_flat_values (cx , max_flat , vi , ts ):
18021808 flat_types = flatten_types (ts )
18031809 if len (flat_types ) > max_flat :
1804- return lift_heap_values (cx , vi , ts )
1810+ ptr = vi .next ('i32' )
1811+ tuple_type = TupleType (ts )
1812+ trap_if (ptr != align_to (ptr , alignment (tuple_type )))
1813+ trap_if (ptr + elem_size (tuple_type ) > len (cx .opts .memory ))
1814+ return list (load (cx , ptr , tuple_type ).values ())
18051815 else :
18061816 return [ lift_flat (cx , vi , t ) for t in ts ]
18071817
1808- def lift_heap_values (cx , vi , ts ):
1809- ptr = vi .next ('i32' )
1810- tuple_type = TupleType (ts )
1811- trap_if (ptr != align_to (ptr , alignment (tuple_type )))
1812- trap_if (ptr + elem_size (tuple_type ) > len (cx .opts .memory ))
1813- return list (load (cx , ptr , tuple_type ).values ())
1814-
18151818def lower_flat_values (cx , max_flat , vs , ts , out_param = None ):
18161819 cx .inst .may_leave = False
18171820 flat_types = flatten_types (ts )
18181821 if len (flat_types ) > max_flat :
1819- flat_vals = lower_heap_values (cx , vs , ts , out_param )
1822+ tuple_type = TupleType (ts )
1823+ tuple_value = {str (i ): v for i ,v in enumerate (vs )}
1824+ if out_param is None :
1825+ ptr = cx .opts .realloc (0 , 0 , alignment (tuple_type ), elem_size (tuple_type ))
1826+ flat_vals = [ptr ]
1827+ else :
1828+ ptr = out_param .next ('i32' )
1829+ flat_vals = []
1830+ trap_if (ptr != align_to (ptr , alignment (tuple_type )))
1831+ trap_if (ptr + elem_size (tuple_type ) > len (cx .opts .memory ))
1832+ store (cx , tuple_value , tuple_type , ptr )
18201833 else :
18211834 flat_vals = []
18221835 for i in range (len (vs )):
18231836 flat_vals += lower_flat (cx , vs [i ], ts [i ])
18241837 cx .inst .may_leave = True
18251838 return flat_vals
18261839
1827- def lower_heap_values (cx , vs , ts , out_param ):
1828- tuple_type = TupleType (ts )
1829- tuple_value = {str (i ): v for i ,v in enumerate (vs )}
1830- if out_param is None :
1831- ptr = cx .opts .realloc (0 , 0 , alignment (tuple_type ), elem_size (tuple_type ))
1832- flat_vals = [ptr ]
1833- else :
1834- ptr = out_param .next ('i32' )
1835- flat_vals = []
1836- trap_if (ptr != align_to (ptr , alignment (tuple_type )))
1837- trap_if (ptr + elem_size (tuple_type ) > len (cx .opts .memory ))
1838- store (cx , tuple_value , tuple_type , ptr )
1839- return flat_vals
1840-
18411840### `canon lift`
18421841
18431842async def canon_lift (opts , inst , ft , callee , caller , on_start , on_resolve , on_block ):
@@ -2168,6 +2167,8 @@ async def canon_future_new(future_t, task):
21682167
21692168### 🔀 `canon {stream,future}.{read,write}`
21702169
2170+ BLOCKED = 0xffff_ffff
2171+
21712172async def canon_stream_read (stream_t , opts , task , i , ptr , n ):
21722173 return await copy (ReadableStreamEnd , WritableBufferGuestImpl , EventCode .STREAM_READ ,
21732174 stream_t , opts , task , i , ptr , n )
@@ -2195,20 +2196,21 @@ async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr
21952196 cx = LiftLowerContext (opts , task .inst , borrow_scope = None )
21962197 buffer = BufferT (stream_or_future_t .t , cx , ptr , n )
21972198
2198- def copy_event (why , revoke_buffer ):
2199+ def copy_event (result , revoke_buffer ):
21992200 revoke_buffer ()
22002201 assert (e .copying )
22012202 e .copying = False
2202- return (event_code , i , pack_copy_result (task , e , buffer , why ))
2203+ return (event_code , i , pack_copy_result (result , buffer ))
22032204
2204- def on_partial_copy (revoke_buffer ):
2205- e .set_event (partial (copy_event , 'completed' , revoke_buffer ))
2205+ def on_copy (revoke_buffer ):
2206+ e .set_event (partial (copy_event , StreamResult . COMPLETED , revoke_buffer ))
22062207
2207- def on_copy_done ( why ):
2208- e .set_event (partial (copy_event , why , revoke_buffer = lambda :()))
2208+ def on_result ( result ):
2209+ e .set_event (partial (copy_event , result , revoke_buffer = lambda :()))
22092210
2210- if e .copy (task .inst , buffer , on_partial_copy , on_copy_done ) == 'done' :
2211- return [pack_copy_result (task , e , buffer , 'completed' )]
2211+ result = e .copy (task .inst , buffer , on_copy , on_result )
2212+ if result is not None :
2213+ return [pack_copy_result (result , buffer )]
22122214 else :
22132215 e .copying = True
22142216 if opts .sync :
@@ -2219,20 +2221,8 @@ def on_copy_done(why):
22192221 else :
22202222 return [BLOCKED ]
22212223
2222- BLOCKED = 0xffff_ffff
2223- COMPLETED = 0x0
2224- CLOSED = 0x1
2225- CANCELLED = 0x2
2226-
2227- def pack_copy_result (task , e , buffer , why ):
2228- if e .shared .closed ():
2229- result = CLOSED
2230- elif why == 'cancelled' :
2231- result = CANCELLED
2232- else :
2233- assert (why == 'completed' )
2234- assert (not isinstance (e , FutureEnd ))
2235- result = COMPLETED
2224+ def pack_copy_result (result , buffer ):
2225+ assert (0 <= result < 2 ** 4 )
22362226 assert (buffer .progress <= Buffer .MAX_LENGTH < 2 ** 28 )
22372227 packed = result | (buffer .progress << 4 )
22382228 assert (packed != BLOCKED )
0 commit comments