Skip to content

Commit 15d4c03

Browse files
authored
Use new protocol attrs in Admin client (#2761)
1 parent 4fb0a10 commit 15d4c03

5 files changed

Lines changed: 79 additions & 83 deletions

File tree

kafka/admin/client.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
504504
"""
505505
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
506506
timeout_ms = self._validate_timeout(timeout_ms)
507-
request = DeleteTopicsRequest[version](topics=topics, timeout_ms=timeout_ms)
507+
request = DeleteTopicsRequest[version](topic_names=topics, timeout_ms=timeout_ms)
508508
def get_response_errors(r):
509509
for response in r.responses:
510510
yield Errors.for_code(response[1])
@@ -554,7 +554,7 @@ def list_topics(self):
554554
A list of topic name strings.
555555
"""
556556
metadata = self._get_cluster_metadata(topics=None)
557-
return [t['topic'] for t in metadata['topics']]
557+
return [t['name'] for t in metadata['topics']]
558558

559559
def describe_topics(self, topics=None):
560560
"""Fetch metadata for the specified topics or all topics if None.
@@ -651,12 +651,11 @@ def describe_acls(self, acl_filter):
651651
request = DescribeAclsRequest[version](
652652
resource_type_filter=acl_filter.resource_pattern.resource_type,
653653
resource_name_filter=acl_filter.resource_pattern.resource_name,
654-
resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type,
654+
pattern_type_filter=acl_filter.resource_pattern.pattern_type,
655655
principal_filter=acl_filter.principal,
656-
host=acl_filter.host,
656+
host_filter=acl_filter.host,
657657
operation=acl_filter.operation,
658658
permission_type=acl_filter.permission_type
659-
660659
)
661660
response = self.send_request(request) # pylint: disable=E0606
662661
error_type = Errors.for_code(response.error_code)
@@ -1089,9 +1088,9 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
10891088
valid_partitions = set()
10901089
for topic in metadata.get("topics", ()):
10911090
for partition in topic.get("partitions", ()):
1092-
t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
1091+
t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"])
10931092
if t2p in partitions:
1094-
leader2partitions[partition["leader"]].append(t2p)
1093+
leader2partitions[partition["leader_id"]].append(t2p)
10951094
valid_partitions.add(t2p)
10961095

10971096
if len(partitions) != len(valid_partitions):

kafka/protocol/metadata.py

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ class MetadataResponse_v0(Response):
1212
('port', Int32))),
1313
('topics', Array(
1414
('error_code', Int16),
15-
('topic', String('utf-8')),
15+
('name', String('utf-8')),
1616
('partitions', Array(
1717
('error_code', Int16),
18-
('partition', Int32),
19-
('leader', Int32),
20-
('replicas', Array(Int32)),
21-
('isr', Array(Int32))))))
18+
('partition_index', Int32),
19+
('leader_id', Int32),
20+
('replica_nodes', Array(Int32)),
21+
('isr_nodes', Array(Int32))))))
2222
)
2323

2424

@@ -34,14 +34,14 @@ class MetadataResponse_v1(Response):
3434
('controller_id', Int32),
3535
('topics', Array(
3636
('error_code', Int16),
37-
('topic', String('utf-8')),
37+
('name', String('utf-8')),
3838
('is_internal', Boolean),
3939
('partitions', Array(
4040
('error_code', Int16),
41-
('partition', Int32),
42-
('leader', Int32),
43-
('replicas', Array(Int32)),
44-
('isr', Array(Int32))))))
41+
('partition_index', Int32),
42+
('leader_id', Int32),
43+
('replica_nodes', Array(Int32)),
44+
('isr_nodes', Array(Int32))))))
4545
)
4646

4747

@@ -58,14 +58,14 @@ class MetadataResponse_v2(Response):
5858
('controller_id', Int32),
5959
('topics', Array(
6060
('error_code', Int16),
61-
('topic', String('utf-8')),
61+
('name', String('utf-8')),
6262
('is_internal', Boolean),
6363
('partitions', Array(
6464
('error_code', Int16),
65-
('partition', Int32),
66-
('leader', Int32),
67-
('replicas', Array(Int32)),
68-
('isr', Array(Int32))))))
65+
('partition_index', Int32),
66+
('leader_id', Int32),
67+
('replica_nodes', Array(Int32)),
68+
('isr_nodes', Array(Int32))))))
6969
)
7070

7171

@@ -83,14 +83,14 @@ class MetadataResponse_v3(Response):
8383
('controller_id', Int32),
8484
('topics', Array(
8585
('error_code', Int16),
86-
('topic', String('utf-8')),
86+
('name', String('utf-8')),
8787
('is_internal', Boolean),
8888
('partitions', Array(
8989
('error_code', Int16),
90-
('partition', Int32),
91-
('leader', Int32),
92-
('replicas', Array(Int32)),
93-
('isr', Array(Int32))))))
90+
('partition_index', Int32),
91+
('leader_id', Int32),
92+
('replica_nodes', Array(Int32)),
93+
('isr_nodes', Array(Int32))))))
9494
)
9595

9696

@@ -114,14 +114,14 @@ class MetadataResponse_v5(Response):
114114
('controller_id', Int32),
115115
('topics', Array(
116116
('error_code', Int16),
117-
('topic', String('utf-8')),
117+
('name', String('utf-8')),
118118
('is_internal', Boolean),
119119
('partitions', Array(
120120
('error_code', Int16),
121-
('partition', Int32),
122-
('leader', Int32),
123-
('replicas', Array(Int32)),
124-
('isr', Array(Int32)),
121+
('partition_index', Int32),
122+
('leader_id', Int32),
123+
('replica_nodes', Array(Int32)),
124+
('isr_nodes', Array(Int32)),
125125
('offline_replicas', Array(Int32))))))
126126
)
127127

@@ -149,15 +149,15 @@ class MetadataResponse_v7(Response):
149149
('controller_id', Int32),
150150
('topics', Array(
151151
('error_code', Int16),
152-
('topic', String('utf-8')),
152+
('name', String('utf-8')),
153153
('is_internal', Boolean),
154154
('partitions', Array(
155155
('error_code', Int16),
156-
('partition', Int32),
157-
('leader', Int32),
156+
('partition_index', Int32),
157+
('leader_id', Int32),
158158
('leader_epoch', Int32),
159-
('replicas', Array(Int32)),
160-
('isr', Array(Int32)),
159+
('replica_nodes', Array(Int32)),
160+
('isr_nodes', Array(Int32)),
161161
('offline_replicas', Array(Int32))))))
162162
)
163163

@@ -177,15 +177,15 @@ class MetadataResponse_v8(Response):
177177
('controller_id', Int32),
178178
('topics', Array(
179179
('error_code', Int16),
180-
('topic', String('utf-8')),
180+
('name', String('utf-8')),
181181
('is_internal', Boolean),
182182
('partitions', Array(
183183
('error_code', Int16),
184-
('partition', Int32),
185-
('leader', Int32),
184+
('partition_index', Int32),
185+
('leader_id', Int32),
186186
('leader_epoch', Int32),
187-
('replicas', Array(Int32)),
188-
('isr', Array(Int32)),
187+
('replica_nodes', Array(Int32)),
188+
('isr_nodes', Array(Int32)),
189189
('offline_replicas', Array(Int32)))),
190190
('authorized_operations', BitField))),
191191
('authorized_operations', BitField)

test/integration/test_admin_integration.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ def consumer_thread(i, group_id):
238238
assert(len(consumer_group.members) == 1)
239239
for member in consumer_group.members:
240240
assert(member.member_metadata.topics[0] == topic)
241-
assert(member.member_assignment.assignment[0][0] == topic)
241+
assert(member.member_assignment.assigned_partitions[0][0] == topic)
242242
consumer_groups.add(consumer_group.group)
243243
assert(sorted(list(consumer_groups)) == group_id_list)
244244
finally:
@@ -406,8 +406,8 @@ def test_create_delete_topics(kafka_admin_client):
406406
@pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2")
407407
def test_perform_leader_election(kafka_admin_client, topic):
408408
topic_metadata = kafka_admin_client.describe_topics([topic])[0]
409-
assert topic_metadata['topic'] == topic
410-
partitions = list(map(lambda p: p['partition'], topic_metadata['partitions']))
409+
assert topic_metadata['name'] == topic
410+
partitions = list(map(lambda p: p['partition_index'], topic_metadata['partitions']))
411411
election_type = 0 # Preferred
412412
topic_partitions = {topic: partitions}
413413
# When Leader Election is not needed (cluster is stable), error 84 is returned
Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
from kafka.protocol.admin import Request
2-
from kafka.protocol.admin import Response
3-
from kafka.protocol.types import Schema
4-
from kafka.protocol.types import Array
5-
from kafka.protocol.types import Int16
6-
from kafka.protocol.types import String
1+
from kafka.protocol.api import Request
2+
from kafka.protocol.api import Response
3+
from kafka.protocol.types import Schema, Array, Int16, String
74

85
import pytest
96

@@ -197,31 +194,31 @@ def test_with_metadata_response():
197194

198195
assert len(obj['topics']) == 2
199196
assert obj['topics'][0]['error_code'] == 0
200-
assert obj['topics'][0]['topic'] == 'testtopic1'
197+
assert obj['topics'][0]['name'] == 'testtopic1'
201198
assert obj['topics'][0]['is_internal'] is False
202199
assert len(obj['topics'][0]['partitions']) == 2
203200
assert obj['topics'][0]['partitions'][0]['error_code'] == 0
204-
assert obj['topics'][0]['partitions'][0]['partition'] == 0
205-
assert obj['topics'][0]['partitions'][0]['leader'] == 0
206-
assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1]
207-
assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1]
201+
assert obj['topics'][0]['partitions'][0]['partition_index'] == 0
202+
assert obj['topics'][0]['partitions'][0]['leader_id'] == 0
203+
assert obj['topics'][0]['partitions'][0]['replica_nodes'] == [0, 1]
204+
assert obj['topics'][0]['partitions'][0]['isr_nodes'] == [0, 1]
208205
assert obj['topics'][0]['partitions'][0]['offline_replicas'] == []
209206
assert obj['topics'][0]['partitions'][1]['error_code'] == 0
210-
assert obj['topics'][0]['partitions'][1]['partition'] == 1
211-
assert obj['topics'][0]['partitions'][1]['leader'] == 1
212-
assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0]
213-
assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0]
207+
assert obj['topics'][0]['partitions'][1]['partition_index'] == 1
208+
assert obj['topics'][0]['partitions'][1]['leader_id'] == 1
209+
assert obj['topics'][0]['partitions'][1]['replica_nodes'] == [1, 0]
210+
assert obj['topics'][0]['partitions'][1]['isr_nodes'] == [1, 0]
214211
assert obj['topics'][0]['partitions'][1]['offline_replicas'] == []
215212

216213
assert obj['topics'][1]['error_code'] == 0
217-
assert obj['topics'][1]['topic'] == 'other-test-topic'
214+
assert obj['topics'][1]['name'] == 'other-test-topic'
218215
assert obj['topics'][1]['is_internal'] is True
219216
assert len(obj['topics'][1]['partitions']) == 1
220217
assert obj['topics'][1]['partitions'][0]['error_code'] == 0
221-
assert obj['topics'][1]['partitions'][0]['partition'] == 0
222-
assert obj['topics'][1]['partitions'][0]['leader'] == 0
223-
assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1]
224-
assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1]
218+
assert obj['topics'][1]['partitions'][0]['partition_index'] == 0
219+
assert obj['topics'][1]['partitions'][0]['leader_id'] == 0
220+
assert obj['topics'][1]['partitions'][0]['replica_nodes'] == [0, 1]
221+
assert obj['topics'][1]['partitions'][0]['isr_nodes'] == [0, 1]
225222
assert obj['topics'][1]['partitions'][0]['offline_replicas'] == []
226223

227224
tc.encode()

test/test_assignors.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):
341341

342342
assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
343343
verify_validity_and_balance(subscriptions, assignment)
344-
assert len(assignment['C2'].assignment[0][1]) == 3
344+
assert len(assignment['C2'].assigned_partitions[0][1]) == 3
345345

346346

347347
def test_sticky_add_remove_topic_two_consumers(mocker):
@@ -693,9 +693,9 @@ def test_assignment_with_multiple_generations1(mocker):
693693

694694
assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
695695
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
696-
assert len(assignment1['C1'].assignment[0][1]) == 2
697-
assert len(assignment1['C2'].assignment[0][1]) == 2
698-
assert len(assignment1['C3'].assignment[0][1]) == 2
696+
assert len(assignment1['C1'].assigned_partitions[0][1]) == 2
697+
assert len(assignment1['C2'].assigned_partitions[0][1]) == 2
698+
assert len(assignment1['C3'].assigned_partitions[0][1]) == 2
699699

700700
member_metadata = {
701701
'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()),
@@ -704,10 +704,10 @@ def test_assignment_with_multiple_generations1(mocker):
704704

705705
assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
706706
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2)
707-
assert len(assignment2['C1'].assignment[0][1]) == 3
708-
assert len(assignment2['C2'].assignment[0][1]) == 3
709-
assert all([partition in assignment2['C1'].assignment[0][1] for partition in assignment1['C1'].assignment[0][1]])
710-
assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
707+
assert len(assignment2['C1'].assigned_partitions[0][1]) == 3
708+
assert len(assignment2['C2'].assigned_partitions[0][1]) == 3
709+
assert all([partition in assignment2['C1'].assigned_partitions[0][1] for partition in assignment1['C1'].assigned_partitions[0][1]])
710+
assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]])
711711
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
712712

713713
member_metadata = {
@@ -717,8 +717,8 @@ def test_assignment_with_multiple_generations1(mocker):
717717

718718
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
719719
verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3)
720-
assert len(assignment3['C2'].assignment[0][1]) == 3
721-
assert len(assignment3['C3'].assignment[0][1]) == 3
720+
assert len(assignment3['C2'].assigned_partitions[0][1]) == 3
721+
assert len(assignment3['C3'].assigned_partitions[0][1]) == 3
722722
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
723723

724724

@@ -733,18 +733,18 @@ def test_assignment_with_multiple_generations2(mocker):
733733

734734
assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
735735
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
736-
assert len(assignment1['C1'].assignment[0][1]) == 2
737-
assert len(assignment1['C2'].assignment[0][1]) == 2
738-
assert len(assignment1['C3'].assignment[0][1]) == 2
736+
assert len(assignment1['C1'].assigned_partitions[0][1]) == 2
737+
assert len(assignment1['C2'].assigned_partitions[0][1]) == 2
738+
assert len(assignment1['C3'].assigned_partitions[0][1]) == 2
739739

740740
member_metadata = {
741741
'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1),
742742
}
743743

744744
assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
745745
verify_validity_and_balance({'C2': {'t'}}, assignment2)
746-
assert len(assignment2['C2'].assignment[0][1]) == 6
747-
assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
746+
assert len(assignment2['C2'].assigned_partitions[0][1]) == 6
747+
assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]])
748748
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
749749

750750
member_metadata = {
@@ -756,9 +756,9 @@ def test_assignment_with_multiple_generations2(mocker):
756756
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
757757
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3)
758758
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
759-
assert set(assignment3['C1'].assignment[0][1]) == set(assignment1['C1'].assignment[0][1])
760-
assert set(assignment3['C2'].assignment[0][1]) == set(assignment1['C2'].assignment[0][1])
761-
assert set(assignment3['C3'].assignment[0][1]) == set(assignment1['C3'].assignment[0][1])
759+
assert set(assignment3['C1'].assigned_partitions[0][1]) == set(assignment1['C1'].assigned_partitions[0][1])
760+
assert set(assignment3['C2'].assigned_partitions[0][1]) == set(assignment1['C2'].assigned_partitions[0][1])
761+
assert set(assignment3['C3'].assigned_partitions[0][1]) == set(assignment1['C3'].assigned_partitions[0][1])
762762

763763

764764
@pytest.mark.parametrize('execution_number', range(50))

0 commit comments

Comments
 (0)