@@ -171,12 +171,10 @@ async def unary_request(
171171 raise ReadTimeoutError ("Request timed out" )
172172 finally :
173173 if stream_id is not None :
174- nbytes = _take_all_unacked_flow_bytes (state )
175- if nbytes > 0 :
176- try :
177- await conn .ack_data (stream_id , nbytes )
178- except Exception :
179- pass
174+ try :
175+ await conn .ack_all_data (stream_id , state )
176+ except Exception :
177+ pass
180178 if not state .ended .is_set ():
181179 await conn .reset_stream (stream_id )
182180 conn .release_stream (stream_id , state )
@@ -263,12 +261,10 @@ async def _ack_stream_data(nbytes: int) -> None:
263261 pass
264262 # Ack remaining flow bytes to keep connection window healthy
265263 if stream_id is not None :
266- nbytes = _take_all_unacked_flow_bytes (state )
267- if nbytes > 0 :
268- try :
269- await conn .ack_data (stream_id , nbytes )
270- except Exception :
271- pass
264+ try :
265+ await conn .ack_all_data (stream_id , state )
266+ except Exception :
267+ pass
272268 if not state .ended .is_set ():
273269 await conn .reset_stream (stream_id )
274270 conn .release_stream (stream_id , state )
@@ -768,6 +764,16 @@ async def ack_data(self, stream_id: int, nbytes: int) -> None:
768764 self ._h2 .acknowledge_received_data (nbytes , stream_id )
769765 await self ._flush_h2_data ()
770766
767+ async def ack_all_data (self , stream_id : int , state : _StreamState ) -> None :
768+ """Acknowledge all received data for stream cleanup."""
769+ assert self ._h2 is not None
770+ async with self ._write_lock :
771+ nbytes = state .unacked_flow_bytes
772+ state .unacked_flow_bytes = 0
773+ if nbytes > 0 :
774+ self ._h2 .acknowledge_received_data (nbytes , stream_id )
775+ await self ._flush_h2_data ()
776+
771777 async def reset_stream (self , stream_id : int ) -> None :
772778 """Send RST_STREAM to tell the peer to stop sending."""
773779 assert self ._h2 is not None
@@ -989,12 +995,6 @@ def _queue_item_parts(item: tuple[bytes, int] | bytes) -> tuple[bytes, int]:
989995 return item , len (item )
990996
991997
992- def _take_all_unacked_flow_bytes (state : _StreamState ) -> int :
993- nbytes = state .unacked_flow_bytes
994- state .unacked_flow_bytes = 0
995- return nbytes
996-
997-
998998def _parse_retry_after_ms (raw : str | None ) -> float | None :
999999 if raw is None :
10001000 return None
0 commit comments