@@ -558,7 +558,7 @@ async def join_group_async(self, timeout_ms=None):
558558 return True
559559 return True
560560
561- def _failed_request (self , node_id , request , future , error ):
561+ def _failed_request (self , node_id , request , error ):
562562 # Marking coordinator dead
563563 # unless the error is caused by internal client pipelining or throttling
564564 if not isinstance (error , (Errors .NodeNotReadyError ,
@@ -570,8 +570,6 @@ def _failed_request(self, node_id, request, future, error):
570570 else :
571571 log .debug ('Error sending %s to node %s [%s]' ,
572572 request .__class__ .__name__ , node_id , error )
573- if future is not None :
574- future .failure (error )
575573
576574 def _process_join_group_response (self , response , send_time ):
577575 """Classify a JoinGroupResponse: mutate state on success, raise on error.
@@ -841,7 +839,7 @@ async def _send_group_coordinator_request(self):
841839 try :
842840 response = await self ._manager .send (request , node_id = node_id )
843841 except Exception as exc :
844- self ._failed_request (node_id , request , None , exc )
842+ self ._failed_request (node_id , request , exc )
845843 raise
846844 return self ._handle_find_coordinator_response (response )
847845
@@ -1128,7 +1126,7 @@ async def _send_heartbeat_request(self):
11281126 response = await self ._manager .send (request , node_id = self .coordinator_id )
11291127 return self ._handle_heartbeat_response (response , send_time )
11301128 except Errors .KafkaError as exc :
1131- self ._failed_request (self .coordinator_id , request , None , exc )
1129+ self ._failed_request (self .coordinator_id , request , exc )
11321130 raise
11331131
11341132 def _handle_heartbeat_response (self , response , send_time ):
0 commit comments