Skip to content

Commit b2e27af

Browse files
committed
Use Subscription wrapper to hold group_instance_id + member metadata
1 parent 5e5ea3f commit b2e27af

7 files changed

Lines changed: 73 additions & 32 deletions

File tree

kafka/coordinator/assignors/abstract.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ def assign(self, cluster, members):
2323
2424
Arguments:
2525
cluster (ClusterMetadata): metadata for use in assignment
26-
members (dict of {member_id: MemberMetadata}): decoded metadata for
27-
each member in the group.
26+
members (dict of {member_id: Subscription}): decoded metadata
27+
for each member in the group, including group_instance_id
28+
when available.
2829
2930
Returns:
3031
dict: {member_id: MemberAssignment}

kafka/coordinator/assignors/range.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
3232
version = 0
3333

3434
@classmethod
35-
def assign(cls, cluster, member_metadata):
35+
def assign(cls, cluster, group_subscriptions):
3636
consumers_per_topic = collections.defaultdict(list)
37-
for member, metadata in six.iteritems(member_metadata):
38-
for topic in metadata.subscription:
37+
for member, subscription in six.iteritems(group_subscriptions):
38+
for topic in subscription.subscription:
3939
consumers_per_topic[topic].append(member)
4040

4141
# construct {member_id: {topic: [partition, ...]}}
@@ -61,7 +61,7 @@ def assign(cls, cluster, member_metadata):
6161
assignment[member][topic] = partitions[start:start+length]
6262

6363
protocol_assignment = {}
64-
for member_id in member_metadata:
64+
for member_id in group_subscriptions:
6565
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
6666
cls.version,
6767
sorted(assignment[member_id].items()),

kafka/coordinator/assignors/roundrobin.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
4949
version = 0
5050

5151
@classmethod
52-
def assign(cls, cluster, member_metadata):
52+
def assign(cls, cluster, group_subscriptions):
5353
all_topics = set()
54-
for metadata in six.itervalues(member_metadata):
55-
all_topics.update(metadata.subscription)
54+
for subscription in six.itervalues(group_subscriptions):
55+
all_topics.update(subscription.subscription)
5656

5757
all_topic_partitions = []
5858
for topic in all_topics:
@@ -67,20 +67,20 @@ def assign(cls, cluster, member_metadata):
6767
# construct {member_id: {topic: [partition, ...]}}
6868
assignment = collections.defaultdict(lambda: collections.defaultdict(list))
6969

70-
member_iter = itertools.cycle(sorted(member_metadata.keys()))
70+
member_iter = itertools.cycle(sorted(group_subscriptions.keys()))
7171
for partition in all_topic_partitions:
7272
member_id = next(member_iter)
7373

7474
# Because we constructed all_topic_partitions from the set of
7575
# member subscribed topics, we should be safe assuming that
7676
# each topic in all_topic_partitions is in at least one member
7777
# subscription; otherwise this could yield an infinite loop
78-
while partition.topic not in member_metadata[member_id].subscription:
78+
while partition.topic not in group_subscriptions[member_id].subscription:
7979
member_id = next(member_iter)
8080
assignment[member_id][partition.topic].append(partition.partition)
8181

8282
protocol_assignment = {}
83-
for member_id in member_metadata:
83+
for member_id in group_subscriptions:
8484
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
8585
cls.version,
8686
sorted(assignment[member_id].items()),

kafka/coordinator/consumer.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
1414
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
1515
from kafka.coordinator.protocol import ConsumerProtocol
16+
from kafka.coordinator.subscription import Subscription
1617
import kafka.errors as Errors
1718
from kafka.future import Future
1819
from kafka.metrics import AnonMeasurable
@@ -333,17 +334,20 @@ def time_to_next_poll(self):
333334
def _perform_assignment(self, leader_id, assignment_strategy, members):
334335
assignor = self._lookup_assignor(assignment_strategy)
335336
assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,)
336-
member_metadata = {}
337+
member_subscriptions = {}
337338
all_subscribed_topics = set()
338339
for member in members:
339340
if len(member) == 3:
340341
member_id, group_instance_id, metadata_bytes = member
341342
else:
342343
member_id, metadata_bytes = member
343344
group_instance_id = None
344-
metadata = ConsumerProtocol[0].METADATA.decode(metadata_bytes),
345-
member_metadata[member_id] = metadata
346-
all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member
345+
subscription = Subscription(
346+
ConsumerProtocol[0].METADATA.decode(metadata_bytes),
347+
group_instance_id
348+
)
349+
member_subscriptions[member_id] = subscription
350+
all_subscribed_topics.update(subscription.subscription)
347351

348352
# the leader will begin watching for changes to any of the topics
349353
# the group is interested in, which ensures that all metadata changes
@@ -361,9 +365,9 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
361365

362366
log.debug("Performing assignment for group %s using strategy %s"
363367
" with subscriptions %s", self.group_id, assignor.name,
364-
member_metadata)
368+
member_subscriptions)
365369

366-
assignments = assignor.assign(self._cluster, member_metadata)
370+
assignments = assignor.assign(self._cluster, member_subscriptions)
367371

368372
log.debug("Finished assignment for group %s: %s", self.group_id, assignments)
369373

kafka/coordinator/subscription.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from __future__ import absolute_import
2+
3+
4+
class Subscription(object):
5+
__slots__ = ('_metadata', '_group_instance_id')
6+
def __init__(self, metadata, group_instance_id):
7+
self._metadata = metadata
8+
self._group_instance_id = group_instance_id
9+
10+
@property
11+
def version(self):
12+
return self._metadata.version
13+
14+
@property
15+
def user_data(self):
16+
return self._metadata.user_data
17+
18+
@property
19+
def subscription(self):
20+
return self._metadata.subscription
21+
22+
@property
23+
def group_instance_id(self):
24+
return self._group_instance_id
25+
26+
def encode(self):
27+
return self._metadata.encode()
28+
29+
def __eq__(self, other):
30+
return (
31+
isinstance(other, Subscription) and
32+
self._metadata == other._metadata and
33+
self._group_instance_id == other._group_instance_id
34+
)

test/test_assignors.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
1212
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
1313
from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment_v0
14+
from kafka.coordinator.subscription import Subscription
1415
from kafka.vendor import six
1516

1617

@@ -34,13 +35,13 @@ def create_cluster(mocker, topics, topics_partitions=None, topic_partitions_lamb
3435
def test_assignor_roundrobin(mocker):
3536
assignor = RoundRobinPartitionAssignor
3637

37-
member_metadata = {
38-
'C0': assignor.metadata({'t0', 't1'}),
39-
'C1': assignor.metadata({'t0', 't1'}),
38+
group_subscriptions = {
39+
'C0': Subscription(assignor.metadata({'t0', 't1'}), None),
40+
'C1': Subscription(assignor.metadata({'t0', 't1'}), None),
4041
}
4142

4243
cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
43-
ret = assignor.assign(cluster, member_metadata)
44+
ret = assignor.assign(cluster, group_subscriptions)
4445
expected = {
4546
'C0': ConsumerProtocolMemberAssignment_v0(
4647
assignor.version, [('t0', [0, 2]), ('t1', [1])], b''),
@@ -56,13 +57,13 @@ def test_assignor_roundrobin(mocker):
5657
def test_assignor_range(mocker):
5758
assignor = RangePartitionAssignor
5859

59-
member_metadata = {
60-
'C0': assignor.metadata({'t0', 't1'}),
61-
'C1': assignor.metadata({'t0', 't1'}),
60+
group_subscriptions = {
61+
'C0': Subscription(assignor.metadata({'t0', 't1'}), None),
62+
'C1': Subscription(assignor.metadata({'t0', 't1'}), None),
6263
}
6364

6465
cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
65-
ret = assignor.assign(cluster, member_metadata)
66+
ret = assignor.assign(cluster, group_subscriptions)
6667
expected = {
6768
'C0': ConsumerProtocolMemberAssignment_v0(
6869
assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''),

test/test_coordinator.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from kafka.coordinator.consumer import ConsumerCoordinator
1515
from kafka.coordinator.protocol import (
1616
ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0)
17+
from kafka.coordinator.subscription import Subscription
1718
import kafka.errors as Errors
1819
from kafka.future import Future
1920
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
@@ -192,9 +193,9 @@ def test_subscription_listener_failure(mocker, coordinator):
192193

193194
def test_perform_assignment(mocker, coordinator):
194195
coordinator._subscription.subscribe(topics=['foo1'])
195-
member_metadata = {
196-
'member-foo': ConsumerProtocolMemberMetadata_v0(0, ['foo1'], b''),
197-
'member-bar': ConsumerProtocolMemberMetadata_v0(0, ['foo1'], b'')
196+
group_subscriptions = {
197+
'member-foo': Subscription(ConsumerProtocolMemberMetadata_v0(0, ['foo1'], b''), None),
198+
'member-bar': Subscription(ConsumerProtocolMemberMetadata_v0(0, ['foo1'], b''), None),
198199
}
199200
assignments = {
200201
'member-foo': ConsumerProtocolMemberAssignment_v0(
@@ -208,12 +209,12 @@ def test_perform_assignment(mocker, coordinator):
208209

209210
ret = coordinator._perform_assignment(
210211
'member-foo', 'roundrobin',
211-
[(member, metadata.encode())
212-
for member, metadata in member_metadata.items()])
212+
[(member, subscription.encode())
213+
for member, subscription in group_subscriptions.items()])
213214

214215
assert RoundRobinPartitionAssignor.assign.call_count == 1
215216
RoundRobinPartitionAssignor.assign.assert_called_with(
216-
coordinator._client.cluster, member_metadata)
217+
coordinator._client.cluster, group_subscriptions)
217218
assert ret == assignments
218219

219220

0 commit comments

Comments
 (0)