Skip to content

Commit 94a89e7

Browse files
committed
Rename ConsumerProtocolMemberMetadata field subscription -> topics
1 parent 106057b commit 94a89e7

7 files changed

Lines changed: 14 additions & 11 deletions

File tree

kafka/coordinator/assignors/range.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
3535
def assign(cls, cluster, group_subscriptions):
3636
consumers_per_topic = collections.defaultdict(list)
3737
for member, subscription in six.iteritems(group_subscriptions):
38-
for topic in subscription.subscription:
38+
for topic in subscription.topics:
3939
consumers_per_topic[topic].append(member)
4040

4141
# construct {member_id: {topic: [partition, ...]}}

kafka/coordinator/assignors/roundrobin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
5252
def assign(cls, cluster, group_subscriptions):
5353
all_topics = set()
5454
for subscription in six.itervalues(group_subscriptions):
55-
all_topics.update(subscription.subscription)
55+
all_topics.update(subscription.topics)
5656

5757
all_topic_partitions = []
5858
for topic in all_topics:
@@ -75,7 +75,7 @@ def assign(cls, cluster, group_subscriptions):
7575
# member subscribed topics, we should be safe assuming that
7676
# each topic in all_topic_partitions is in at least one member
7777
# subscription; otherwise this could yield an infinite loop
78-
while partition.topic not in group_subscriptions[member_id].subscription:
78+
while partition.topic not in group_subscriptions[member_id].topics:
7979
member_id = next(member_iter)
8080
assignment[member_id][partition.topic].append(partition.partition)
8181

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,7 @@ def parse_member_metadata(cls, metadata):
625625
user_data = metadata.user_data
626626
if not user_data:
627627
return StickyAssignorMemberMetadataV1(
628-
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
628+
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics
629629
)
630630

631631
try:
@@ -634,15 +634,15 @@ def parse_member_metadata(cls, metadata):
634634
# ignore the consumer's previous assignment if it cannot be parsed
635635
log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args
636636
return StickyAssignorMemberMetadataV1(
637-
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
637+
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics
638638
)
639639

640640
member_partitions = []
641641
for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member
642642
member_partitions.extend([TopicPartition(topic, partition) for partition in partitions])
643643
return StickyAssignorMemberMetadataV1(
644644
# pylint: disable=no-member
645-
partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription
645+
partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.topics
646646
)
647647

648648
@classmethod

kafka/coordinator/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
342342
member.group_instance_id
343343
)
344344
member_subscriptions[member.member_id] = subscription
345-
all_subscribed_topics.update(subscription.subscription)
345+
all_subscribed_topics.update(subscription.topics)
346346

347347
# the leader will begin watching for changes to any of the topics
348348
# the group is interested in, which ensures that all metadata changes

kafka/coordinator/protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
class ConsumerProtocolMemberMetadata_v0(Struct):
99
SCHEMA = Schema(
1010
('version', Int16),
11-
('subscription', Array(String('utf-8'))),
11+
('topics', Array(String('utf-8'))),
1212
('user_data', Bytes))
1313

1414

kafka/coordinator/subscription.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ def user_data(self):
1616
return self._metadata.user_data
1717

1818
@property
19-
def subscription(self):
20-
return self._metadata.subscription
19+
def topics(self):
20+
return self._metadata.topics
21+
22+
# Alias for old interface / name
23+
subscription = topics
2124

2225
@property
2326
def group_instance_id(self):

test/integration/test_admin_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ def consumer_thread(i, group_id):
231231
else:
232232
assert(len(consumer_group.members) == 1)
233233
for member in consumer_group.members:
234-
assert(member.member_metadata.subscription[0] == topic)
234+
assert(member.member_metadata.topics[0] == topic)
235235
assert(member.member_assignment.assignment[0][0] == topic)
236236
consumer_groups.add(consumer_group.group)
237237
assert(sorted(list(consumer_groups)) == group_id_list)

0 commit comments

Comments
 (0)