@@ -82,6 +82,14 @@ def next_batch(self):
8282 # See: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
8383 self ._task = asyncio .create_task (self ._handle_next_batch ())
8484
85+ def cancel (self ):
86+ """Sets the cancelled flag, cancels the running task, and removes
87+ the runner registration.
88+ """
89+ self ._cancelled = True
90+ self ._runners .pop (self ._registration_id , None )
91+ self ._listener .on_cancel ()
92+
8593 async def _handle_next_batch (self ):
8694 """Reads the next batch from the ringbuffer and processes the items."""
8795 if self ._cancelled :
@@ -102,7 +110,6 @@ async def _handle_next_batch(self):
102110 try :
103111 message = result [i ]
104112 self ._listener .store_sequence (result .get_sequence (i ))
105-
106113 member = None
107114 if message .publisher_address :
108115 member = MemberInfo (
@@ -138,16 +145,6 @@ async def _handle_next_batch(self):
138145 if not await self ._handle_internal_error (e ):
139146 self .cancel ()
140147
141- def cancel (self ):
142- """Sets the cancelled flag, cancels the running task, and removes
143- the runner registration.
144- """
145- self ._cancelled = True
146- self ._runners .pop (self ._registration_id , None )
147- # if self._task is not None and not self._task.done():
148- # self._task.cancel()
149- self ._listener .on_cancel ()
150-
151148 def _is_loss_tolerable (self , loss_count : int ) -> bool :
152149 """Called when message loss is detected.
153150
0 commit comments