Skip to content

Commit 106057b

Browse files
committed
Use GroupMember namedtuple for join group response members
1 parent b2e27af commit 106057b

4 files changed

Lines changed: 21 additions & 15 deletions

File tree

kafka/coordinator/base.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
from kafka.metrics import AnonMeasurable
1717
from kafka.metrics.stats import Avg, Count, Max, Rate
1818
from kafka.protocol.find_coordinator import FindCoordinatorRequest
19-
from kafka.protocol.group import HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID
19+
from kafka.protocol.group import (
20+
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest,
21+
DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, GroupMember,
22+
)
2023
from kafka.util import Timer
2124

2225
log = logging.getLogger('kafka.coordinator')
@@ -216,10 +219,10 @@ def _perform_assignment(self, leader_id, protocol, members):
216219
Arguments:
217220
leader_id (str): The id of the leader (which is this member)
218221
protocol (str): the chosen group protocol (assignment strategy)
219-
members (list): [(member_id, group_instance_id, metadata_bytes)] from
220-
JoinGroupResponse. metadata_bytes are associated with the chosen
221-
group protocol, and the Coordinator subclass is responsible for
222-
decoding metadata_bytes based on that protocol.
222+
members (list): [GroupMember] from JoinGroupResponse.
223+
metadata_bytes are associated with the chosen group protocol,
224+
and the Coordinator subclass is responsible for decoding
225+
metadata_bytes based on that protocol.
223226
224227
Returns:
225228
dict: {member_id: assignment}; assignment must either be bytes
@@ -702,9 +705,11 @@ def _on_join_leader(self, response):
702705
Future: resolves to member assignment encoded-bytes
703706
"""
704707
try:
708+
members = [GroupMember(*member) if response.API_VERSION >= 5 else GroupMember(member[0], None, member[1])
709+
for member in response.members]
705710
group_assignment = self._perform_assignment(response.leader_id,
706711
response.group_protocol,
707-
response.members)
712+
members)
708713
for member_id, assignment in six.iteritems(group_assignment):
709714
if not isinstance(assignment, bytes):
710715
group_assignment[member_id] = assignment.encode()

kafka/coordinator/consumer.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -337,16 +337,11 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
337337
member_subscriptions = {}
338338
all_subscribed_topics = set()
339339
for member in members:
340-
if len(member) == 3:
341-
member_id, group_instance_id, metadata_bytes = member
342-
else:
343-
member_id, metadata_bytes = member
344-
group_instance_id = None
345340
subscription = Subscription(
346-
ConsumerProtocol[0].METADATA.decode(metadata_bytes),
347-
group_instance_id
341+
ConsumerProtocol[0].METADATA.decode(member.metadata_bytes),
342+
member.group_instance_id
348343
)
349-
member_subscriptions[member_id] = subscription
344+
member_subscriptions[member.member_id] = subscription
350345
all_subscribed_topics.update(subscription.subscription)
351346

352347
# the leader will begin watching for changes to any of the topics

kafka/protocol/group.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import absolute_import
22

3+
import collections
4+
35
from kafka.protocol.api import Request, Response
46
from kafka.protocol.struct import Struct
57
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
@@ -8,6 +10,9 @@
810
DEFAULT_GENERATION_ID = -1
911
UNKNOWN_MEMBER_ID = ''
1012

13+
GroupMember = collections.namedtuple("GroupMember", ["member_id", "group_instance_id", "metadata_bytes"])
14+
GroupMember.__new__.__defaults__ = (None,) * len(GroupMember._fields)
15+
1116

1217
class JoinGroupResponse_v0(Response):
1318
API_KEY = 11

test/test_coordinator.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from kafka.protocol.commit import (
2222
OffsetCommitRequest, OffsetCommitResponse,
2323
OffsetFetchRequest, OffsetFetchResponse)
24+
from kafka.protocol.group import GroupMember
2425
from kafka.protocol.metadata import MetadataResponse
2526
from kafka.structs import OffsetAndMetadata, TopicPartition
2627
from kafka.util import WeakMethod
@@ -209,7 +210,7 @@ def test_perform_assignment(mocker, coordinator):
209210

210211
ret = coordinator._perform_assignment(
211212
'member-foo', 'roundrobin',
212-
[(member, subscription.encode())
213+
[GroupMember(member, None, subscription.encode())
213214
for member, subscription in group_subscriptions.items()])
214215

215216
assert RoundRobinPartitionAssignor.assign.call_count == 1

0 commit comments

Comments
 (0)