@@ -751,7 +751,7 @@ def drop(self):
751751
752752RevokeBuffer = Callable [[], None ]
753753OnPartialCopy = Callable [[RevokeBuffer ], None ]
754- OnCopyDone = Callable [[Literal ['completed' ,'cancelled' ]], None ]
754+ OnCopyDone = Callable [[Literal ['completed' ,'cancelled' , 'closed' ]], None ]
755755
756756class ReadableStream :
757757 t : ValType
@@ -793,7 +793,7 @@ def close(self):
793793 if not self .closed_ :
794794 self .closed_ = True
795795 if self .pending_buffer :
796- self .reset_and_notify_pending ('completed ' )
796+ self .reset_and_notify_pending ('closed ' )
797797
798798 def closed (self ):
799799 return self .closed_
@@ -2194,6 +2194,7 @@ async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr
21942194
21952195 def copy_event (why , revoke_buffer ):
21962196 revoke_buffer ()
2197+ assert (e .copying )
21972198 e .copying = False
21982199 return (event_code , i , pack_copy_result (task , e , buffer , why ))
21992200
@@ -2206,13 +2207,13 @@ def on_copy_done(why):
22062207 if e .copy (task .inst , buffer , on_partial_copy , on_copy_done ) == 'done' :
22072208 return [pack_copy_result (task , e , buffer , 'completed' )]
22082209 else :
2210+ e .copying = True
22092211 if opts .sync :
22102212 await task .wait_on (e .wait_for_pending_event (), sync = True )
22112213 code ,index ,payload = e .get_event ()
22122214 assert (code == event_code and index == i )
22132215 return [payload ]
22142216 else :
2215- e .copying = True
22162217 return [BLOCKED ]
22172218
22182219BLOCKED = 0xffff_ffff
0 commit comments