@@ -608,6 +608,11 @@ def _send_offset_commit_request(self, offsets):
608608 if node_id is None :
609609 return Future ().failure (Errors .CoordinatorNotAvailableError )
610610
611+ # Verify node is ready
612+ if not self ._client .ready (node_id , metadata_priority = False ):
613+ log .debug ("Node %s not ready -- failing offset commit request" ,
614+ node_id )
615+ return Future ().failure (Errors .NodeNotReadyError )
611616
612617 # create the offset commit request
613618 offset_data = collections .defaultdict (dict )
@@ -616,7 +621,7 @@ def _send_offset_commit_request(self, offsets):
616621
617622 version = self ._client .api_version (OffsetCommitRequest , max_version = 6 )
618623 if version > 1 and self ._subscription .partitions_auto_assigned ():
619- generation = self .generation ()
624+ generation = self .generation_if_stable ()
620625 else :
621626 generation = Generation .NO_GENERATION
622627
@@ -625,7 +630,18 @@ def _send_offset_commit_request(self, offsets):
625630 # and let the user rejoin the group in poll()
626631 if generation is None :
627632 log .info ("Failing OffsetCommit request since the consumer is not part of an active group" )
628- return Future ().failure (Errors .CommitFailedError ('Group rebalance in progress' ))
633+ if self .rebalance_in_progress ():
634+ # if the client knows it is already rebalancing, we can use RebalanceInProgressError instead of
635+ # CommitFailedError to indicate this is not a fatal error
636+ return Future ().failure (Errors .RebalanceInProgressError (
637+ "Offset commit cannot be completed since the"
638+ " consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance"
639+ " by calling poll() and then retry the operation." ))
640+ else :
641+ return Future ().failure (Errors .CommitFailedError (
642+ "Offset commit cannot be completed since the"
643+ " consumer is not part of an active group for auto partition assignment; it is likely that the consumer"
644+ " was kicked out of the group." ))
629645
630646 if version == 0 :
631647 request = OffsetCommitRequest [version ](
@@ -756,7 +772,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
756772 # However, we do not need to reset generations and just request re-join, such that
757773 # if the caller decides to proceed and poll, it would still try to proceed and re-join normally.
758774 self .request_rejoin ()
759- future .failure (Errors .CommitFailedError ('Group rebalance in progress' ))
775+ future .failure (Errors .CommitFailedError (error_type () ))
760776 return
761777 elif error_type in (Errors .UnknownMemberIdError ,
762778 Errors .IllegalGenerationError ):
@@ -765,7 +781,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
765781 log .warning ("OffsetCommit for group %s failed: %s" ,
766782 self .group_id , error )
767783 self .reset_generation ()
768- future .failure (Errors .CommitFailedError ())
784+ future .failure (Errors .CommitFailedError (error_type () ))
769785 return
770786 else :
771787 log .error ("Group %s failed to commit partition %s at offset"
@@ -804,7 +820,7 @@ def _send_offset_fetch_request(self, partitions):
804820 return Future ().failure (Errors .CoordinatorNotAvailableError )
805821
806822 # Verify node is ready
807- if not self ._client .ready (node_id ):
823+ if not self ._client .ready (node_id , metadata_priority = False ):
808824 log .debug ("Node %s not ready -- failing offset fetch request" ,
809825 node_id )
810826 return Future ().failure (Errors .NodeNotReadyError )
0 commit comments