@@ -751,7 +751,7 @@ def drop(self):
751751
752752RevokeBuffer = Callable [[], None ]
753753OnPartialCopy = Callable [[RevokeBuffer ], None ]
754- OnCopyDone = Callable [[Literal ['completed' ,'cancelled' ]], None ]
754+ OnCopyDone = Callable [[Literal ['completed' ,'cancelled' , 'closed' ]], None ]
755755
756756class ReadableStream :
757757 t : ValType
@@ -793,7 +793,7 @@ def close(self):
793793 if not self .closed_ :
794794 self .closed_ = True
795795 if self .pending_buffer :
796- self .reset_and_notify_pending ('completed ' )
796+ self .reset_and_notify_pending ('closed ' )
797797
798798 def closed (self ):
799799 return self .closed_
@@ -2197,6 +2197,7 @@ async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr
21972197
21982198 def copy_event (why , revoke_buffer ):
21992199 revoke_buffer ()
2200+ assert (e .copying )
22002201 e .copying = False
22012202 return (event_code , i , pack_copy_result (task , e , buffer , why ))
22022203
@@ -2209,13 +2210,13 @@ def on_copy_done(why):
22092210 if e .copy (task .inst , buffer , on_partial_copy , on_copy_done ) == 'done' :
22102211 return [pack_copy_result (task , e , buffer , 'completed' )]
22112212 else :
2213+ e .copying = True
22122214 if opts .sync :
22132215 await task .wait_on (e .wait_for_pending_event (), sync = True )
22142216 code ,index ,payload = e .get_event ()
22152217 assert (code == event_code and index == i )
22162218 return [payload ]
22172219 else :
2218- e .copying = True
22192220 return [BLOCKED ]
22202221
22212222BLOCKED = 0xffff_ffff
0 commit comments