|
12 | 12 | from kafka.metrics import AnonMeasurable |
13 | 13 | from kafka.metrics.stats import Avg, Count, Max, Rate |
14 | 14 | from kafka.protocol.new.metadata import FindCoordinatorRequest |
15 | | -from kafka.protocol.group import ( |
| 15 | +from kafka.protocol.new.consumer import ( |
16 | 16 | HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, |
17 | | - DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, GroupMember, |
| 17 | + DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, |
18 | 18 | ) |
19 | 19 | from kafka.util import Timer |
20 | 20 |
|
@@ -215,7 +215,7 @@ def _perform_assignment(self, leader_id, protocol, members): |
215 | 215 | Arguments: |
216 | 216 | leader_id (str): The id of the leader (which is this member) |
217 | 217 | protocol (str): the chosen group protocol (assignment strategy) |
218 | | - members (list): [GroupMember] from JoinGroupResponse. |
| 218 | + members (list): [JoinGroupResponseMember] from JoinGroupResponse. |
219 | 219 | metadata_bytes are associated with the chosen group protocol, |
220 | 220 | and the Coordinator subclass is responsible for decoding |
221 | 221 | metadata_bytes based on that protocol. |
@@ -697,14 +697,9 @@ def _on_join_leader(self, response): |
697 | 697 | Future: resolves to member assignment encoded-bytes |
698 | 698 | """ |
699 | 699 | try: |
700 | | - members = [GroupMember( |
701 | | - member_id=member[0], |
702 | | - group_instance_id=member[1] if response.API_VERSION >= 5 else None, |
703 | | - metadata=member[2] if response.API_VERSION >= 5 else member[1]) |
704 | | - for member in response.members] |
705 | 700 | group_assignment = self._perform_assignment(response.leader, |
706 | 701 | response.protocol_name, |
707 | | - members) |
| 702 | + response.members) |
708 | 703 | except Exception as e: |
709 | 704 | return Future().failure(e) |
710 | 705 |
|
|
0 commit comments