Skip to content

Commit ab1f9ca

Browse files
dpkp and claude authored
KIP-602: Support client_dns_lookup in Consumer/Producer/Admin config (#3004)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 364fcad commit ab1f9ca

7 files changed

Lines changed: 125 additions & 3 deletions

File tree

kafka/admin/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ class KafkaAdminClient(
169169
'sock_chunk_buffer_count': 1000, # undocumented experimental option
170170
'retry_backoff_ms': 100,
171171
'metadata_max_age_ms': 300000,
172+
'client_dns_lookup': 'use_all_dns_ips',
172173
'security_protocol': 'PLAINTEXT',
173174
'ssl_context': None,
174175
'ssl_check_hostname': True,

kafka/cluster.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class ClusterMetadata:
4444
'metadata_max_age_ms': 300000,
4545
'bootstrap_servers': [],
4646
'allow_auto_create_topics': True,
47+
'client_dns_lookup': 'use_all_dns_ips',
4748
}
4849

4950
def __init__(self, **configs):
@@ -180,6 +181,9 @@ def _generate_bootstrap_brokers(self):
180181
# collect_hosts does not perform DNS, so we should be fine to re-use
181182
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
182183

184+
if self.config['client_dns_lookup'] == 'resolve_canonical_bootstrap_servers_only':
185+
bootstrap_hosts = expand_to_canonical_bootstrap_hosts(bootstrap_hosts)
186+
183187
brokers = {}
184188
for i, (host, port, _) in enumerate(bootstrap_hosts):
185189
node_id = 'bootstrap-%s' % i
@@ -615,8 +619,9 @@ def __str__(self):
615619

616620
def collect_hosts(hosts, randomize=True):
617621
"""
618-
Collects a comma-separated set of hosts (host:port) and optionally
619-
randomize the returned list.
622+
Processes a list (or comma-separated string) of hosts strings (host:port)
623+
and returns a list of (host, port, family) tuples.
624+
Optionally randomizes the returned list.
620625
"""
621626

622627
if isinstance(hosts, str):
@@ -634,6 +639,38 @@ def collect_hosts(hosts, randomize=True):
634639
return result
635640

636641

642+
def expand_to_canonical_bootstrap_hosts(hosts):
    """Expand each bootstrap entry to one entry per canonical broker FQDN.

    Mirrors Java's ``client.dns.lookup=resolve_canonical_bootstrap_servers_only``:
    forward-resolve each host to its addresses, then reverse-resolve every
    address to its own fully-qualified name, and emit one bootstrap entry per
    unique name. Useful for Kerberos round-robin DNS deployments where the
    principal must match each individual broker FQDN.

    ``getaddrinfo(..., AI_CANONNAME)`` alone is not enough for this: the
    resolver reports a single canonical name for the *queried* host, and only
    on the first returned entry, so a round-robin record would never be
    expanded into per-broker names. The per-address reverse lookup is what
    produces one name per broker.

    Best-effort fallbacks, so bootstrap doesn't fail outright:

    * if a host fails to forward-resolve, the original entry is preserved
      verbatim (and a warning is logged);
    * if an address has no reverse mapping, the canonical name most recently
      reported by the resolver is used, or the original host when the
      resolver reported none.

    Note: both lookups are blocking resolver calls.

    Arguments:
        hosts: iterable of ``(host, port, family)`` tuples.

    Returns:
        list of ``(host, port, family)`` tuples; names are de-duplicated per
        input entry, keeping the first address family seen for each name.
    """
    expanded = []
    for host, port, afi in hosts:
        try:
            addrinfos = socket.getaddrinfo(
                host, port, afi, socket.SOCK_STREAM, 0, socket.AI_CANONNAME)
        except socket.gaierror as exc:
            log.warning('Canonical bootstrap resolution failed for %s:%s: %s; '
                        'keeping original entry', host, port, exc)
            expanded.append((host, port, afi))
            continue
        seen = set()
        # Name to use when an address cannot be reverse-resolved. Starts as
        # the input host and is upgraded to the resolver's canonical name,
        # which is typically only present on the first addrinfo entry.
        fallback = host
        for family, _socktype, _proto, canonname, sockaddr in addrinfos:
            fallback = canonname or fallback
            try:
                # NI_NAMEREQD: fail instead of returning the numeric address
                # when no reverse mapping exists.
                name = socket.getnameinfo(sockaddr, socket.NI_NAMEREQD)[0]
            except OSError:
                name = fallback
            if name in seen:
                continue
            seen.add(name)
            expanded.append((name, port, family))
    return expanded
672+
673+
637674
def _address_family(address):
638675
"""
639676
Attempt to determine the family of an address (or hostname)

kafka/consumer/group.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ class KafkaConsumer:
311311
'isolation_level': 'read_uncommitted',
312312
'allow_auto_create_topics': True,
313313
'metadata_max_age_ms': 5 * 60 * 1000,
314+
'client_dns_lookup': 'use_all_dns_ips',
314315
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
315316
'max_poll_records': 500,
316317
'max_poll_interval_ms': 300000,

kafka/net/manager.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,21 @@ class KafkaConnectionManager:
5757
'metrics': None,
5858
'metric_group_prefix': '',
5959
'metadata_max_age_ms': 300000,
60+
'client_dns_lookup': 'use_all_dns_ips',
6061
}
62+
_VALID_DNS_LOOKUP_MODES = ('use_all_dns_ips', 'resolve_canonical_bootstrap_servers_only')
63+
6164
def __init__(self, net, **configs):
6265
self.config = copy.copy(self.DEFAULT_CONFIG)
6366
for key in self.config:
6467
if key in configs:
6568
self.config[key] = configs[key]
6669

70+
if self.config['client_dns_lookup'] not in self._VALID_DNS_LOOKUP_MODES:
71+
raise ValueError(
72+
"client_dns_lookup must be one of %s; got %r"
73+
% (self._VALID_DNS_LOOKUP_MODES, self.config['client_dns_lookup']))
74+
6775
if 'socks5_proxy' in configs:
6876
if self.config['proxy_url'] is None:
6977
log.warning('socks5_proxy is deprecated, use proxy_url instead')
@@ -73,6 +81,7 @@ def __init__(self, net, **configs):
7381
self.cluster = ClusterMetadata(
7482
bootstrap_servers=self.config['bootstrap_servers'],
7583
metadata_max_age_ms=self.config['metadata_max_age_ms'],
84+
client_dns_lookup=self.config['client_dns_lookup'],
7685
)
7786
self.cluster.attach(self)
7887
self._conns = {}

kafka/producer/kafka.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ class KafkaProducer:
398398
'max_request_size': 1048576,
399399
'allow_auto_create_topics': True,
400400
'metadata_max_age_ms': 300000,
401+
'client_dns_lookup': 'use_all_dns_ips',
401402
'retry_backoff_ms': 100,
402403
'request_timeout_ms': 30000,
403404
'receive_buffer_bytes': None,

test/net/test_manager.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,21 @@ def test_api_versions(self, net):
4242
m = KafkaConnectionManager(net, api_version=(1, 0))
4343
assert m.broker_version_data == BrokerVersionData((1, 0))
4444

45+
def test_client_dns_lookup_default(self, net):
    """Without an explicit setting, manager and cluster both use 'use_all_dns_ips'."""
    manager = KafkaConnectionManager(net)
    expected_mode = 'use_all_dns_ips'
    # The manager's own config carries the default ...
    assert manager.config['client_dns_lookup'] == expected_mode
    # ... and the same value is handed down to the ClusterMetadata it builds.
    assert manager.cluster.config['client_dns_lookup'] == expected_mode
49+
50+
def test_client_dns_lookup_canonical_passthrough(self, net):
    """An explicit canonical-resolution mode is forwarded to the cluster config."""
    requested_mode = 'resolve_canonical_bootstrap_servers_only'
    manager = KafkaConnectionManager(net, client_dns_lookup=requested_mode)
    assert manager.cluster.config['client_dns_lookup'] == requested_mode
53+
54+
def test_client_dns_lookup_invalid_rejected(self, net):
    """Unsupported client_dns_lookup values raise ValueError at construction."""
    # 'default' is rejected in the same way as an arbitrary unknown value.
    for rejected_mode in ('default', 'garbage'):
        with pytest.raises(ValueError, match='client_dns_lookup'):
            KafkaConnectionManager(net, client_dns_lookup=rejected_mode)
59+
4560

4661
class TestKafkaConnectionManagerProxyConfig:
4762
def test_proxy_url_default_none(self, net):

test/test_cluster.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import pytest
77

8-
from kafka.cluster import collect_hosts
8+
from kafka.cluster import ClusterMetadata, collect_hosts, expand_to_canonical_bootstrap_hosts
99
from kafka.future import Future
1010
from kafka.protocol.metadata import MetadataResponse
1111
from kafka.structs import TopicPartition
@@ -241,6 +241,64 @@ def test_collect_hosts__protocol(self):
241241
])
242242

243243

244+
class TestExpandToCanonicalBootstrapHosts:
    """Tests for expand_to_canonical_bootstrap_hosts with socket.getaddrinfo patched."""

    def test_expands_multi_ip_host_to_canonical_names(self):
        # Two addresses, each reported with its own canonical name.
        resolved = [
            (socket.AF_INET, socket.SOCK_STREAM, 0, canonical, (ip, 9092))
            for canonical, ip in (
                ('broker-1.kafka.example.com', '10.0.0.1'),
                ('broker-2.kafka.example.com', '10.0.0.2'),
            )
        ]
        bootstrap = [('kafka.example.com', 9092, socket.AF_UNSPEC)]
        with patch('socket.getaddrinfo', return_value=resolved):
            result = expand_to_canonical_bootstrap_hosts(bootstrap)
        assert result == [
            ('broker-1.kafka.example.com', 9092, socket.AF_INET),
            ('broker-2.kafka.example.com', 9092, socket.AF_INET),
        ]

    def test_deduplicates_canonical_names(self):
        # Two addresses sharing one canonical name collapse to a single entry.
        resolved = [
            (socket.AF_INET, socket.SOCK_STREAM, 0, 'broker-1.kafka.example.com', (ip, 9092))
            for ip in ('10.0.0.1', '10.0.0.5')
        ]
        bootstrap = [('kafka.example.com', 9092, socket.AF_UNSPEC)]
        with patch('socket.getaddrinfo', return_value=resolved):
            result = expand_to_canonical_bootstrap_hosts(bootstrap)
        assert result == [('broker-1.kafka.example.com', 9092, socket.AF_INET)]

    def test_falls_back_to_original_on_resolution_failure(self):
        # A resolver error leaves the bootstrap entry exactly as it was given.
        bootstrap = [('kafka.example.com', 9092, socket.AF_UNSPEC)]
        failure = socket.gaierror('no such host')
        with patch('socket.getaddrinfo', side_effect=failure):
            result = expand_to_canonical_bootstrap_hosts(bootstrap)
        assert result == [('kafka.example.com', 9092, socket.AF_UNSPEC)]

    def test_missing_canonname_falls_back_to_input_host(self):
        # An empty canonname means the input host name is kept, paired with
        # the address family of the resolved entry.
        resolved = [
            (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('10.0.0.1', 9092)),
        ]
        bootstrap = [('kafka.example.com', 9092, socket.AF_UNSPEC)]
        with patch('socket.getaddrinfo', return_value=resolved):
            result = expand_to_canonical_bootstrap_hosts(bootstrap)
        assert result == [('kafka.example.com', 9092, socket.AF_INET)]
278+
279+
280+
class TestClusterMetadataClientDnsLookup:
    """Tests how ClusterMetadata applies client_dns_lookup to its bootstrap brokers."""

    def test_default_does_not_expand_bootstrap(self):
        # In the default mode no resolver call is made and the bootstrap
        # host is exposed unchanged.
        with patch('socket.getaddrinfo') as resolver:
            metadata = ClusterMetadata(bootstrap_servers='kafka.example.com:9092')
            assert resolver.call_count == 0
            bootstrap_nodes = metadata.bootstrap_brokers()
            hostnames = [node.host for node in bootstrap_nodes]
            assert hostnames == ['kafka.example.com']

    def test_canonical_mode_expands_bootstrap(self):
        # In canonical mode each resolved canonical name becomes a bootstrap broker.
        resolved = [
            (socket.AF_INET, socket.SOCK_STREAM, 0, canonical, (ip, 9092))
            for canonical, ip in (
                ('broker-1.kafka.example.com', '10.0.0.1'),
                ('broker-2.kafka.example.com', '10.0.0.2'),
            )
        ]
        with patch('socket.getaddrinfo', return_value=resolved):
            metadata = ClusterMetadata(
                bootstrap_servers='kafka.example.com:9092',
                client_dns_lookup='resolve_canonical_bootstrap_servers_only')
            bootstrap_nodes = metadata.bootstrap_brokers()
            # Bootstrap order is not asserted; compare sorted host names.
            hostnames = sorted(node.host for node in bootstrap_nodes)
            assert hostnames == [
                'broker-1.kafka.example.com', 'broker-2.kafka.example.com']
300+
301+
244302
class TestClusterMetadataRefresh:
245303
def test_request_update_returns_future(self, cluster):
246304
f = cluster.request_update()

0 commit comments

Comments
 (0)