Skip to content

Commit 73941d8

Browse files
authored
Move broker version check data to kafka.protocol.broker_api_versions (#2736)
1 parent 07b63bf commit 73941d8

3 files changed

Lines changed: 74 additions & 65 deletions

File tree

kafka/conn.py

Lines changed: 7 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,12 @@
1111
import kafka.errors as Errors
1212
from kafka.future import Future
1313
from kafka.metrics.stats import Avg, Count, Max, Rate
14-
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
1514
from kafka.protocol.api_versions import ApiVersionsRequest
16-
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
17-
from kafka.protocol.commit import OffsetFetchRequest
18-
from kafka.protocol.fetch import FetchRequest
19-
from kafka.protocol.find_coordinator import FindCoordinatorRequest
20-
from kafka.protocol.list_offsets import ListOffsetsRequest
21-
from kafka.protocol.metadata import MetadataRequest
15+
from kafka.protocol.broker_api_versions import (
16+
BROKER_API_VERSIONS, VERSION_CHECKS,
17+
infer_broker_version_from_api_versions,
18+
)
2219
from kafka.protocol.parser import KafkaProtocol
23-
from kafka.protocol.produce import ProduceRequest
2420
from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest
2521
from kafka.protocol.sasl_handshake import SaslHandshakeRequest
2622
from kafka.protocol.types import Int32
@@ -216,12 +212,6 @@ class BrokerConnection(object):
216212
'socks5_proxy': None,
217213
}
218214
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
219-
VERSION_CHECKS = (
220-
((0, 9), ListGroupsRequest[0]()),
221-
((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')),
222-
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
223-
((0, 8, 0), MetadataRequest[0]([])),
224-
)
225215

226216
def __init__(self, host, port, afi, **configs):
227217
self.host = host
@@ -556,8 +546,8 @@ def _try_api_versions_check(self):
556546
self._api_versions_future = future
557547
self.state = ConnectionStates.API_VERSIONS_RECV
558548
self.config['state_change_callback'](self.node_id, self._sock, self)
559-
elif self._check_version_idx < len(self.VERSION_CHECKS):
560-
version, request = self.VERSION_CHECKS[self._check_version_idx]
549+
elif self._check_version_idx < len(VERSION_CHECKS):
550+
version, request = VERSION_CHECKS[self._check_version_idx]
561551
future = Future()
562552
self._api_versions_check_timeout /= 2
563553
response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout)
@@ -605,7 +595,7 @@ def _handle_api_versions_response(self, future, response):
605595
(api_version_data[0], (api_version_data[1], api_version_data[2]))
606596
for api_version_data in response.api_versions
607597
])
608-
self._api_version = self._infer_broker_version_from_api_versions(self._api_versions)
598+
self._api_version = infer_broker_version_from_api_versions(self._api_versions)
609599
log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version)))
610600
future.success(self._api_version)
611601
self.connect()
@@ -1215,52 +1205,6 @@ def get_api_versions(self):
12151205
self.check_version()
12161206
return self._api_versions
12171207

1218-
def _infer_broker_version_from_api_versions(self, api_versions):
1219-
# The logic here is to check the list of supported request versions
1220-
# in reverse order. As soon as we find one that works, return it
1221-
test_cases = [
1222-
# format (<broker version>, <needed struct>)
1223-
# Make sure to update consumer_integration test check when adding newer versions.
1224-
# ((3, 9), FetchRequest[17]),
1225-
# ((3, 8), ProduceRequest[11]),
1226-
# ((3, 7), FetchRequest[16]),
1227-
# ((3, 6), AddPartitionsToTxnRequest[4]),
1228-
# ((3, 5), FetchRequest[15]),
1229-
# ((3, 4), StopReplicaRequest[3]), # broker-internal api...
1230-
# ((3, 3), DescribeAclsRequest[3]),
1231-
# ((3, 2), JoinGroupRequest[9]),
1232-
# ((3, 1), FetchRequest[13]),
1233-
# ((3, 0), ListOffsetsRequest[7]),
1234-
# ((2, 8), ProduceRequest[9]),
1235-
# ((2, 7), FetchRequest[12]),
1236-
# ((2, 6), ListGroupsRequest[4]),
1237-
# ((2, 5), JoinGroupRequest[7]),
1238-
((2, 6), DescribeClientQuotasRequest[0]),
1239-
((2, 5), DescribeAclsRequest[2]),
1240-
((2, 4), ProduceRequest[8]),
1241-
((2, 3), FetchRequest[11]),
1242-
((2, 2), ListOffsetsRequest[5]),
1243-
((2, 1), FetchRequest[10]),
1244-
((2, 0), FetchRequest[8]),
1245-
((1, 1), FetchRequest[7]),
1246-
((1, 0), MetadataRequest[5]),
1247-
((0, 11), MetadataRequest[4]),
1248-
((0, 10, 2), OffsetFetchRequest[2]),
1249-
((0, 10, 1), MetadataRequest[2]),
1250-
]
1251-
1252-
# Get the best match of test cases
1253-
for broker_version, proto_struct in sorted(test_cases, reverse=True):
1254-
if proto_struct.API_KEY not in api_versions:
1255-
continue
1256-
min_version, max_version = api_versions[proto_struct.API_KEY]
1257-
if min_version <= proto_struct.API_VERSION <= max_version:
1258-
return broker_version
1259-
1260-
# We know that ApiVersionsResponse is only supported in 0.10+
1261-
# so if all else fails, choose that
1262-
return (0, 10, 0)
1263-
12641208
def check_version(self, timeout=2, **kwargs):
12651209
"""Attempt to guess the broker version.
12661210

kafka/protocol/broker_api_versions.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,68 @@
1+
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
2+
from kafka.protocol.commit import OffsetFetchRequest
3+
from kafka.protocol.fetch import FetchRequest
4+
from kafka.protocol.find_coordinator import FindCoordinatorRequest
5+
from kafka.protocol.list_offsets import ListOffsetsRequest
6+
from kafka.protocol.metadata import MetadataRequest
7+
from kafka.protocol.produce import ProduceRequest
8+
9+
10+
def infer_broker_version_from_api_versions(api_versions):
11+
# The logic here is to check the list of supported request versions
12+
# in reverse order. As soon as we find one that works, return it
13+
test_cases = [
14+
# format (<broker version>, <needed struct>)
15+
# Make sure to update consumer_integration test check when adding newer versions.
16+
# ((3, 9), FetchRequest[17]),
17+
# ((3, 8), ProduceRequest[11]),
18+
# ((3, 7), FetchRequest[16]),
19+
# ((3, 6), AddPartitionsToTxnRequest[4]),
20+
# ((3, 5), FetchRequest[15]),
21+
# ((3, 4), StopReplicaRequest[3]), # broker-internal api...
22+
# ((3, 3), DescribeAclsRequest[3]),
23+
# ((3, 2), JoinGroupRequest[9]),
24+
# ((3, 1), FetchRequest[13]),
25+
# ((3, 0), ListOffsetsRequest[7]),
26+
# ((2, 8), ProduceRequest[9]),
27+
# ((2, 7), FetchRequest[12]),
28+
# ((2, 6), ListGroupsRequest[4]),
29+
# ((2, 5), JoinGroupRequest[7]),
30+
((2, 6), DescribeClientQuotasRequest[0]),
31+
((2, 5), DescribeAclsRequest[2]),
32+
((2, 4), ProduceRequest[8]),
33+
((2, 3), FetchRequest[11]),
34+
((2, 2), ListOffsetsRequest[5]),
35+
((2, 1), FetchRequest[10]),
36+
((2, 0), FetchRequest[8]),
37+
((1, 1), FetchRequest[7]),
38+
((1, 0), MetadataRequest[5]),
39+
((0, 11), MetadataRequest[4]),
40+
((0, 10, 2), OffsetFetchRequest[2]),
41+
((0, 10, 1), MetadataRequest[2]),
42+
]
43+
44+
# Get the best match of test cases
45+
for broker_version, proto_struct in sorted(test_cases, reverse=True):
46+
if proto_struct.API_KEY not in api_versions:
47+
continue
48+
min_version, max_version = api_versions[proto_struct.API_KEY]
49+
if min_version <= proto_struct.API_VERSION <= max_version:
50+
return broker_version
51+
52+
# We know that ApiVersionsResponse is only supported in 0.10+
53+
# so if all else fails, choose that
54+
return (0, 10, 0)
55+
56+
57+
# Fallback version checks for brokers that do not support ApiVersionsCheck
58+
VERSION_CHECKS = (
59+
((0, 9), ListGroupsRequest[0]()),
60+
((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')),
61+
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
62+
((0, 8, 0), MetadataRequest[0]([])),
63+
)
64+
65+
166
BROKER_API_VERSIONS = {
267
# api_versions responses prior to (0, 10) are synthesized for compatibility
368
(0, 8, 0): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0)},

test/test_conn.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from kafka.conn import BrokerConnection, ConnectionStates
1010
from kafka.future import Future
11-
from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError
11+
from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, VERSION_CHECKS
1212
from kafka.metrics.metrics import Metrics
1313
from kafka.metrics.stats.sensor import Sensor
1414
from kafka.protocol.api import RequestHeader
@@ -91,7 +91,7 @@ def test_api_versions_check(_socket, mocker):
9191
assert conn._try_api_versions_check() is False
9292
assert conn.connecting() is True
9393

94-
conn._check_version_idx = len(conn.VERSION_CHECKS)
94+
conn._check_version_idx = len(VERSION_CHECKS)
9595
conn._api_versions_future = None
9696
assert conn._try_api_versions_check() is False
9797
assert conn.connecting() is False

0 commit comments

Comments
 (0)