Skip to content

Commit 8af7cc4

Browse files
authored
Switch to new protocol attrs (no aliases) (#2750)
1 parent ff72238 commit 8af7cc4

16 files changed

Lines changed: 99 additions & 98 deletions

File tree

kafka/admin/client.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ def _find_coordinator_id_process_response(self, response):
308308
raise error_type(
309309
"FindCoordinatorRequest failed with response '{}'."
310310
.format(response))
311-
return response.coordinator_id
311+
return response.node_id
312312

313313
def _find_coordinator_ids(self, group_ids):
314314
"""Find the broker node_ids of the coordinators of the given groups.
@@ -471,13 +471,13 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
471471
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
472472
.format(self.config['api_version']))
473473
request = CreateTopicsRequest[version](
474-
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
475-
timeout=timeout_ms
474+
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
475+
timeout_ms=timeout_ms
476476
)
477477
elif version <= 3:
478478
request = CreateTopicsRequest[version](
479-
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
480-
timeout=timeout_ms,
479+
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
480+
timeout_ms=timeout_ms,
481481
validate_only=validate_only
482482
)
483483
else:
@@ -504,7 +504,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
504504
"""
505505
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
506506
timeout_ms = self._validate_timeout(timeout_ms)
507-
request = DeleteTopicsRequest[version](topics=topics, timeout=timeout_ms)
507+
request = DeleteTopicsRequest[version](topics=topics, timeout_ms=timeout_ms)
508508
def get_response_errors(r):
509509
for response in r.responses:
510510
yield Errors.for_code(response[1])
@@ -640,19 +640,19 @@ def describe_acls(self, acl_filter):
640640
version = self._client.api_version(DescribeAclsRequest, max_version=1)
641641
if version == 0:
642642
request = DescribeAclsRequest[version](
643-
resource_type=acl_filter.resource_pattern.resource_type,
644-
resource_name=acl_filter.resource_pattern.resource_name,
645-
principal=acl_filter.principal,
646-
host=acl_filter.host,
643+
resource_type_filter=acl_filter.resource_pattern.resource_type,
644+
resource_name_filter=acl_filter.resource_pattern.resource_name,
645+
principal_filter=acl_filter.principal,
646+
host_filter=acl_filter.host,
647647
operation=acl_filter.operation,
648648
permission_type=acl_filter.permission_type
649649
)
650650
elif version <= 1:
651651
request = DescribeAclsRequest[version](
652-
resource_type=acl_filter.resource_pattern.resource_type,
653-
resource_name=acl_filter.resource_pattern.resource_name,
652+
resource_type_filter=acl_filter.resource_pattern.resource_type,
653+
resource_name_filter=acl_filter.resource_pattern.resource_name,
654654
resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type,
655-
principal=acl_filter.principal,
655+
principal_filter=acl_filter.principal,
656656
host=acl_filter.host,
657657
operation=acl_filter.operation,
658658
permission_type=acl_filter.permission_type
@@ -727,7 +727,7 @@ def _convert_create_acls_response_to_acls(acls, create_response):
727727

728728
creations_error = []
729729
creations_success = []
730-
for i, creations in enumerate(create_response.creation_responses):
730+
for i, creations in enumerate(create_response.results):
731731
if version <= 1:
732732
error_code, error_message = creations
733733
acl = acls[i]
@@ -827,7 +827,7 @@ def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response)
827827
"""
828828
version = delete_response.API_VERSION
829829
filter_result_list = []
830-
for i, filter_responses in enumerate(delete_response.filter_responses):
830+
for i, filter_responses in enumerate(delete_response.filter_results):
831831
filter_error_code, filter_error_message, matching_acls = filter_responses
832832
filter_error = Errors.for_code(filter_error_code)
833833
acl_result_list = []
@@ -1054,8 +1054,8 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
10541054
version = self._client.api_version(CreatePartitionsRequest, max_version=1)
10551055
timeout_ms = self._validate_timeout(timeout_ms)
10561056
request = CreatePartitionsRequest[version](
1057-
topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
1058-
timeout=timeout_ms,
1057+
topics=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
1058+
timeout_ms=timeout_ms,
10591059
validate_only=validate_only
10601060
)
10611061
def get_response_errors(r):
@@ -1594,7 +1594,7 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_
15941594
request = ElectLeadersRequest[version](
15951595
election_type=ElectionType(election_type),
15961596
topic_partitions=self._get_topic_partitions(topic_partitions),
1597-
timeout=timeout_ms,
1597+
timeout_ms=timeout_ms,
15981598
)
15991599
# TODO convert structs to a more pythonic interface
16001600
def get_response_errors(r):

kafka/cluster.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __init__(self, **configs):
4545
self._brokers = {} # node_id -> BrokerMetadata
4646
self._partitions = {} # topic -> partition -> PartitionMetadata
4747
self._broker_partitions = collections.defaultdict(set) # node_id -> {TopicPartition...}
48-
self._coordinators = {} # (coord_type, coord_key) -> node_id
48+
self._coordinators = {} # (key_type, key) -> node_id
4949
self._last_refresh_ms = 0
5050
self._last_successful_refresh_ms = 0
5151
self._need_update = True
@@ -369,36 +369,36 @@ def remove_listener(self, listener):
369369
"""Remove a previously added listener callback"""
370370
self._listeners.remove(listener)
371371

372-
def add_coordinator(self, response, coord_type, coord_key):
372+
def add_coordinator(self, response, key_type, key):
373373
"""Update with metadata for a group or txn coordinator
374374
375375
Arguments:
376376
response (FindCoordinatorResponse): broker response
377-
coord_type (str): 'group' or 'transaction'
378-
coord_key (str): consumer_group or transactional_id
377+
key_type (str): 'group' or 'transaction'
378+
key (str): consumer_group or transactional_id
379379
380380
Returns:
381381
string: coordinator node_id if metadata is updated, None on error
382382
"""
383-
log.debug("Updating coordinator for %s/%s: %s", coord_type, coord_key, response)
383+
log.debug("Updating coordinator for %s/%s: %s", key_type, key, response)
384384
error_type = Errors.for_code(response.error_code)
385385
if error_type is not Errors.NoError:
386386
log.error("FindCoordinatorResponse error: %s", error_type)
387-
self._coordinators[(coord_type, coord_key)] = -1
387+
self._coordinators[(key_type, key)] = -1
388388
return
389389

390390
# Use a coordinator-specific node id so that requests
391391
# get a dedicated connection
392-
node_id = 'coordinator-{}'.format(response.coordinator_id)
392+
node_id = 'coordinator-{}'.format(response.node_id)
393393
coordinator = BrokerMetadata(
394394
node_id,
395395
response.host,
396396
response.port,
397397
None)
398398

399-
log.info("Coordinator for %s/%s is %s", coord_type, coord_key, coordinator)
399+
log.info("Coordinator for %s/%s is %s", key_type, key, coordinator)
400400
self._coordinator_brokers[node_id] = coordinator
401-
self._coordinators[(coord_type, coord_key)] = node_id
401+
self._coordinators[(key_type, key)] = node_id
402402
return node_id
403403

404404
def with_partitions(self, partitions_to_add):

kafka/conn.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ def _handle_api_versions_response(self, future, response):
578578
future.failure(error_type())
579579
if error_type is Errors.UnsupportedVersionError:
580580
self._api_versions_idx -= 1
581-
for api_version_data in response.api_versions:
581+
for api_version_data in response.api_keys:
582582
api_key, min_version, max_version = api_version_data[:3]
583583
# If broker provides a lower max_version, skip to that
584584
if api_key == response.API_KEY:
@@ -593,7 +593,7 @@ def _handle_api_versions_response(self, future, response):
593593
return
594594
self._api_versions = dict([
595595
(api_version_data[0], (api_version_data[1], api_version_data[2]))
596-
for api_version_data in response.api_versions
596+
for api_version_data in response.api_keys
597597
])
598598
self._api_version = infer_broker_version_from_api_versions(self._api_versions)
599599
log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version)))
@@ -666,11 +666,11 @@ def _handle_sasl_handshake_response(self, future, response):
666666
self.close(error=error)
667667
return future.failure(error_type(self))
668668

669-
if self.config['sasl_mechanism'] not in response.enabled_mechanisms:
669+
if self.config['sasl_mechanism'] not in response.mechanisms:
670670
future.failure(
671671
Errors.UnsupportedSaslMechanismError(
672672
'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
673-
% (self.config['sasl_mechanism'], response.enabled_mechanisms)))
673+
% (self.config['sasl_mechanism'], response.mechanisms)))
674674
else:
675675
self._sasl_authenticate(future)
676676

kafka/consumer/fetcher.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -757,14 +757,14 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
757757
return
758758

759759
partitions = set([TopicPartition(topic, partition_data[0])
760-
for topic, partitions in response.topics
760+
for topic, partitions in response.responses
761761
for partition_data in partitions])
762762
if self._sensors:
763763
metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
764764
else:
765765
metric_aggregator = None
766766

767-
for topic, partitions in response.topics:
767+
for topic, partitions in response.responses:
768768
for partition_data in partitions:
769769
tp = TopicPartition(topic, partition_data[0])
770770
fetch_offset = fetch_offsets[tp]
@@ -922,7 +922,7 @@ def __init__(self, fetch_offset, tp, records,
922922
self.aborted_producer_ids = set()
923923
self.aborted_transactions = collections.deque(
924924
sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
925-
key=lambda txn: txn.first_offset)
925+
key=lambda txn: txn.first_offset)
926926
)
927927
self.metric_aggregator = metric_aggregator
928928
self.check_crcs = check_crcs
@@ -1206,7 +1206,7 @@ def handle_error(self, _exception):
12061206

12071207
def _response_partitions(self, response):
12081208
return {TopicPartition(topic, partition_data[0])
1209-
for topic, partitions in response.topics
1209+
for topic, partitions in response.responses
12101210
for partition_data in partitions}
12111211

12121212

kafka/coordinator/base.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -604,10 +604,10 @@ def _handle_join_group_response(self, future, send_time, response):
604604
else:
605605
self._generation = Generation(response.generation_id,
606606
response.member_id,
607-
response.group_protocol)
607+
response.protocol_name)
608608

609609
log.info("Successfully joined group %s %s", self.group_id, self._generation)
610-
if response.leader_id == response.member_id:
610+
if response.leader == response.member_id:
611611
log.info("Elected group leader -- performing partition"
612612
" assignments using %s", self._generation.protocol)
613613
self._on_join_leader(response).chain(future)
@@ -697,10 +697,13 @@ def _on_join_leader(self, response):
697697
Future: resolves to member assignment encoded-bytes
698698
"""
699699
try:
700-
members = [GroupMember(*member) if response.API_VERSION >= 5 else GroupMember(member[0], None, member[1])
700+
members = [GroupMember(
701+
member_id=member[0],
702+
group_instance_id=member[1] if response.API_VERSION >= 5 else None,
703+
metadata=member[2] if response.API_VERSION >= 5 else member[1])
701704
for member in response.members]
702-
group_assignment = self._perform_assignment(response.leader_id,
703-
response.group_protocol,
705+
group_assignment = self._perform_assignment(response.leader,
706+
response.protocol_name,
704707
members)
705708
except Exception as e:
706709
return Future().failure(e)
@@ -747,7 +750,7 @@ def _handle_sync_group_response(self, future, send_time, response):
747750
if error_type is Errors.NoError:
748751
if self._sensors:
749752
self._sensors.sync_latency.record((time.monotonic() - send_time) * 1000)
750-
future.success(response.member_assignment)
753+
future.success(response.assignment)
751754
return
752755

753756
# Always rejoin on error
@@ -802,12 +805,12 @@ def _send_group_coordinator_request(self):
802805
self.group_id, node_id, request)
803806
future = Future()
804807
_f = self._client.send(node_id, request)
805-
_f.add_callback(self._handle_group_coordinator_response, future)
808+
_f.add_callback(self._handle_find_coordinator_response, future)
806809
_f.add_errback(self._failed_request, node_id, request, future)
807810
return future
808811

809-
def _handle_group_coordinator_response(self, future, response):
810-
log.debug("Received group coordinator response %s", response)
812+
def _handle_find_coordinator_response(self, future, response):
813+
log.debug("Received find coordinator response %s", response)
811814

812815
error_type = Errors.for_code(response.error_code)
813816
if error_type is Errors.NoError:

kafka/coordinator/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
334334
all_subscribed_topics = set()
335335
for member in members:
336336
subscription = Subscription(
337-
ConsumerProtocol[0].METADATA.decode(member.metadata_bytes),
337+
ConsumerProtocol[0].METADATA.decode(member.metadata),
338338
member.group_instance_id
339339
)
340340
member_subscriptions[member.member_id] = subscription

kafka/producer/sender.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
405405
batches_by_partition = dict([(batch.topic_partition, batch)
406406
for batch in batches])
407407

408-
for topic, partitions in response.topics:
408+
for topic, partitions in response.responses:
409409
for partition_info in partitions:
410410
log_append_time = -1
411411
log_start_offset = -1
@@ -609,17 +609,17 @@ def _produce_request(self, node_id, acks, timeout, batches):
609609
if version >= 3:
610610
return ProduceRequest[version](
611611
transactional_id=transactional_id,
612-
required_acks=acks,
613-
timeout=timeout,
614-
topics=topic_partition_data,
612+
acks=acks,
613+
timeout_ms=timeout,
614+
topic_data=topic_partition_data,
615615
)
616616
else:
617617
if transactional_id is not None:
618618
log.warning('%s: Broker does not support ProduceRequest v3+, required for transactional_id', str(self))
619619
return ProduceRequest[version](
620-
required_acks=acks,
621-
timeout=timeout,
622-
topics=topic_partition_data,
620+
acks=acks,
621+
timeout_ms=timeout,
622+
topic_data=topic_partition_data,
623623
)
624624

625625
def wakeup(self):

kafka/producer/transaction_manager.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -658,10 +658,10 @@ def __init__(self, transaction_manager, topic_partitions):
658658
for tp in topic_partitions:
659659
topic_data[tp.topic].append(tp.partition)
660660
self.request = AddPartitionsToTxnRequest[version](
661-
transactional_id=self.transactional_id,
662-
producer_id=self.producer_id,
663-
producer_epoch=self.producer_epoch,
664-
topics=list(topic_data.items()))
661+
v3_and_below_transactional_id=self.transactional_id,
662+
v3_and_below_producer_id=self.producer_id,
663+
v3_and_below_producer_epoch=self.producer_epoch,
664+
v3_and_below_topics=list(topic_data.items()))
665665

666666
@property
667667
def priority(self):
@@ -673,7 +673,7 @@ def handle_response(self, response):
673673
self.retry_backoff_ms = self.transaction_manager.retry_backoff_ms
674674

675675
results = {TopicPartition(topic, partition): Errors.for_code(error_code)
676-
for topic, partition_data in response.results
676+
for topic, partition_data in response.results_by_topic_v3_and_below
677677
for partition, error_code in partition_data}
678678

679679
for tp, error in results.items():

kafka/protocol/group.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
DEFAULT_GENERATION_ID = -1
99
UNKNOWN_MEMBER_ID = ''
1010

11-
GroupMember = collections.namedtuple("GroupMember", ["member_id", "group_instance_id", "metadata_bytes"])
11+
GroupMember = collections.namedtuple("GroupMember", ["member_id", "group_instance_id", "metadata"])
1212
GroupMember.__new__.__defaults__ = (None,) * len(GroupMember._fields)
1313

1414

test/integration/fixtures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_fa
614614
request = CreateTopicsRequest[0]([(topic_name, num_partitions,
615615
replication_factor, [], [])], timeout_ms)
616616
response = self._send_request(request, timeout=timeout_ms)
617-
for topic_result in response.topic_errors:
617+
for topic_result in response.topics:
618618
error_code = topic_result[1]
619619
if error_code != 0:
620620
raise errors.for_code(error_code)

0 commit comments

Comments
 (0)