Skip to content

Commit 6af7ff5

Browse files
dpkpclaude
andauthored
KIP-516: Initial topic id (uuid) support (#3031)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 2f4b81e commit 6af7ff5

6 files changed

Lines changed: 308 additions & 11 deletions

File tree

kafka/admin/_cluster.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from enum import IntEnum
77
import logging
88
from typing import TYPE_CHECKING
9+
import uuid
910

1011
import kafka.errors as Errors
1112
from kafka.protocol.api_key import ApiKey
@@ -30,11 +31,20 @@ class ClusterAdminMixin:
3031
_manager: KafkaConnectionManager
3132

3233
async def _get_cluster_metadata(self, topics):
33-
"""topics = [] for no topics, None for all."""
34+
"""topics = [] for no topics, None for all. Items may be topic-name
35+
strings or :class:`uuid.UUID` topic ids (KIP-516, requires broker
36+
>= 2.8 / MetadataRequest v12+)."""
37+
_Topic = MetadataRequest.MetadataRequestTopic
38+
if topics is None:
39+
request_topics = None
40+
else:
41+
request_topics = [
42+
_Topic(name=None, topic_id=t) if isinstance(t, uuid.UUID)
43+
else _Topic(name=t)
44+
for t in topics
45+
]
3446
request = MetadataRequest(
35-
topics=[
36-
MetadataRequest.MetadataRequestTopic(name=topic)
37-
for topic in topics] if topics is not None else None,
47+
topics=request_topics,
3848
allow_auto_topic_creation=False,
3949
include_cluster_authorized_operations=True,
4050
include_topic_authorized_operations=True,

kafka/admin/_topics.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ def describe_topics(self, topics=None):
3838
"""Fetch metadata for the specified topics or all topics if None.
3939
4040
Keyword Arguments:
41-
topics ([str], optional) A list of topic names. If None, metadata for all
42-
topics is retrieved.
41+
topics (list, optional): A list of topic names or
42+
:class:`uuid.UUID` topic ids (KIP-516). Strings and UUIDs may
43+
be mixed. Describing by id requires broker >= 2.8
44+
(MetadataRequest v12+); name-based describe works on any
45+
broker. If None, metadata for all topics is retrieved.
4346
4447
Returns:
4548
A list of dicts describing each topic (including partition info).
@@ -110,7 +113,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
110113
in broker metadata with a leader assigned for every partition. Default: False
111114
112115
Returns:
113-
dict of CreateTopicResponse key/vals
116+
dict of CreateTopicsResponse key/vals.
114117
"""
115118
if validate_only and wait_for_metadata:
116119
raise ValueError('validate_only and wait_for_metadata are mutually exclusive')
@@ -131,7 +134,6 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
131134
topics=topics,
132135
timeout_ms=timeout_ms,
133136
validate_only=validate_only,
134-
max_version=3,
135137
)
136138
def response_errors(r):
137139
for topic in r.topics:

kafka/cli/admin/topics/describe.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
1+
import uuid
2+
3+
14
class DescribeTopics:
25
COMMAND = 'describe'
36
HELP = 'Describe Kafka Topics'
47

58
@classmethod
69
def add_arguments(cls, parser):
7-
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics')
10+
parser.add_argument('-t', '--topic', type=str, action='append', dest='topics', default=[], help='topic name')
11+
parser.add_argument('--id', type=str, action='append', dest='topic_ids', default=[],
12+
help='topic UUID (requires broker >= 2.8, KIP-516)')
813

914
@classmethod
1015
def command(cls, client, args):
11-
return client.describe_topics(args.topics or None)
16+
topic_ids = [uuid.UUID(topic_id) for topic_id in args.topic_ids]
17+
topics = args.topics + topic_ids
18+
return client.describe_topics(topics or None)

kafka/cluster.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import socket
77
import threading
88
import time
9+
import uuid
910
import weakref
1011

1112
from kafka import errors as Errors
@@ -53,6 +54,8 @@ def __init__(self, **configs):
5354
self._brokers = {} # node_id -> MetadataResponseBroker
5455
self._partitions = {} # topic -> partition -> PartitionMetadata
5556
self._broker_partitions = collections.defaultdict(set) # node_id -> {TopicPartition...}
57+
self._topic_ids = {} # topic name -> uuid.UUID
58+
self._topic_names_by_id = {} # uuid.UUID -> topic name
5659
self._coordinators = {} # (key_type, key) -> node_id
5760
self._last_refresh_ms = 0
5861
self._last_successful_refresh_ms = 0
@@ -472,6 +475,22 @@ def metadata_request(self):
472475
include_topic_authorized_operations=False,
473476
)
474477

478+
def topic_id(self, topic_name):
479+
"""Return the topic UUID for ``topic_name``, or None if unknown.
480+
481+
Populated from MetadataResponse v10+ (Kafka 2.8+, KIP-516). Older
482+
responses leave this empty.
483+
"""
484+
return self._topic_ids.get(topic_name)
485+
486+
def topic_name_for_id(self, topic_id):
487+
"""Return the topic name for ``topic_id`` (uuid.UUID), or None.
488+
489+
Reverse lookup of :meth:`topic_id`. Populated from MetadataResponse
490+
v10+ (KIP-516).
491+
"""
492+
return self._topic_names_by_id.get(topic_id)
493+
475494
def failed_update(self, exception):
476495
"""Update cluster state given a failed MetadataRequest."""
477496
f = None
@@ -520,6 +539,8 @@ def update_metadata(self, metadata):
520539
_new_broker_partitions = collections.defaultdict(set)
521540
_new_unauthorized_topics = set()
522541
_new_internal_topics = set()
542+
_new_topic_ids = {}
543+
_new_topic_names_by_id = {}
523544
_retry_topics = set()
524545

525546
# KAFKA-9212: pre-2.4 brokers may emit stale leader_epoch values
@@ -537,11 +558,23 @@ def update_metadata(self, metadata):
537558
if t.is_internal:
538559
_new_internal_topics.add(topic)
539560
error_type = Errors.for_code(t.error_code)
561+
new_topic_id = t.topic_id
562+
recreated = False
563+
if new_topic_id is not None and topic is not None:
564+
prior = self._topic_ids.get(topic)
565+
if prior is not None and prior != new_topic_id:
566+
log.warning(
567+
"Topic %s topic_id changed from %s to %s -- likely"
568+
" recreated; resetting cached leader epochs.",
569+
topic, prior, new_topic_id)
570+
recreated = True
571+
_new_topic_ids[topic] = new_topic_id
572+
_new_topic_names_by_id[new_topic_id] = topic
540573
if error_type is Errors.NoError:
541574
_new_partitions[topic] = {}
542575
for p_data in t.partitions:
543576
partition = p_data.partition_index
544-
if not epoch_reliable:
577+
if not epoch_reliable or recreated:
545578
p_data.leader_epoch = -1
546579
_new_partitions[topic][partition] = p_data
547580
if p_data.leader_id != -1:
@@ -576,6 +609,13 @@ def update_metadata(self, metadata):
576609
self._broker_partitions = _new_broker_partitions
577610
self.unauthorized_topics = _new_unauthorized_topics
578611
self.internal_topics = _new_internal_topics
612+
# Pre-v10 responses don't carry topic_id, so the wholesale swap
613+
# would clobber known ids during a rolling downgrade (or any
614+
# cross-broker version skew). Only replace the index when the
615+
# response actually had a chance to populate it.
616+
if metadata.API_VERSION >= 10:
617+
self._topic_ids = _new_topic_ids
618+
self._topic_names_by_id = _new_topic_names_by_id
579619
self._need_update = len(_retry_topics) > 0
580620
f = None
581621
if self._future:

test/admin/test_admin_topics.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,16 @@
1+
import uuid
2+
13
import pytest
24

35
from kafka.admin import KafkaAdminClient, NewTopic, NewPartitions
46
from kafka.errors import KafkaTimeoutError, UnknownTopicOrPartitionError
7+
from kafka.protocol.admin import (
8+
CreateTopicsRequest,
9+
CreateTopicsResponse,
10+
DeleteTopicsRequest,
11+
DeleteTopicsResponse,
12+
)
13+
from kafka.protocol.metadata import MetadataRequest, MetadataResponse
514

615

716
def test_new_partitions():
@@ -190,3 +199,112 @@ def fake_describe_topics(topics):
190199
monkeypatch.setattr(admin, 'describe_topics', fake_describe_topics)
191200
admin.wait_for_topics(['foo'], timeout_ms=5000)
192201
assert state['calls'] == 2
202+
203+
204+
# ---------------------------------------------------------------------------
205+
# KIP-516: topic UUIDs in admin requests/responses
206+
# ---------------------------------------------------------------------------
207+
208+
209+
class TestTopicIdAdmin:
210+
"""KIP-516 topic ids in the admin surface: delete-by-id, describe-by-id,
211+
and CreateTopics surfacing topic_id from v7+ responses."""
212+
213+
def test_delete_topics_by_id_encodes_topic_id(self, broker, admin):
214+
"""delete_topics already accepts uuid.UUID items; pin the wire shape."""
215+
captured = {}
216+
217+
def handler(api_key, api_version, correlation_id, request_bytes):
218+
decoded = DeleteTopicsRequest.decode(
219+
request_bytes, version=api_version, header=True)
220+
captured['request'] = decoded
221+
captured['version'] = api_version
222+
return DeleteTopicsResponse(throttle_time_ms=0, responses=[])
223+
224+
broker.respond_fn(DeleteTopicsRequest, handler)
225+
226+
u = uuid.uuid4()
227+
admin.delete_topics([u])
228+
229+
req = captured['request']
230+
# v6+ uses the structured DeleteTopicState carrying topic_id.
231+
assert captured['version'] >= 6
232+
assert len(req.topics) == 1
233+
assert req.topics[0].topic_id == u
234+
assert req.topics[0].name is None
235+
236+
def test_describe_topics_by_id_sends_topic_id(self, broker, admin):
237+
captured = {}
238+
239+
def handler(api_key, api_version, correlation_id, request_bytes):
240+
decoded = MetadataRequest.decode(
241+
request_bytes, version=api_version, header=True)
242+
captured['request'] = decoded
243+
captured['version'] = api_version
244+
return MetadataResponse(throttle_time_ms=0, brokers=[], cluster_id='c',
245+
controller_id=0, topics=[])
246+
247+
broker.respond_fn(MetadataRequest, handler)
248+
249+
u = uuid.uuid4()
250+
admin.describe_topics([u])
251+
252+
req = captured['request']
253+
# MetadataRequest must be v12+ for name=null + topic_id to work.
254+
assert captured['version'] >= 12
255+
assert len(req.topics) == 1
256+
assert req.topics[0].topic_id == u
257+
assert req.topics[0].name is None
258+
259+
def test_describe_topics_mixed_name_and_id(self, broker, admin):
260+
captured = {}
261+
262+
def handler(api_key, api_version, correlation_id, request_bytes):
263+
decoded = MetadataRequest.decode(
264+
request_bytes, version=api_version, header=True)
265+
captured['request'] = decoded
266+
return MetadataResponse(throttle_time_ms=0, brokers=[], cluster_id='c',
267+
controller_id=0, topics=[])
268+
269+
broker.respond_fn(MetadataRequest, handler)
270+
271+
u = uuid.uuid4()
272+
admin.describe_topics(['by-name', u])
273+
274+
req = captured['request']
275+
assert len(req.topics) == 2
276+
assert req.topics[0].name == 'by-name'
277+
assert req.topics[0].topic_id is None
278+
assert req.topics[1].name is None
279+
assert req.topics[1].topic_id == u
280+
281+
def test_create_topics_negotiates_v7_and_surfaces_topic_id(self, broker, admin):
282+
"""With max_version=3 dropped, negotiation should reach v7+ on a
283+
modern MockBroker and the response's topic_id flows through to_dict()."""
284+
captured = {}
285+
u = uuid.uuid4()
286+
287+
def handler(api_key, api_version, correlation_id, request_bytes):
288+
captured['version'] = api_version
289+
_Result = CreateTopicsResponse.CreatableTopicResult
290+
return CreateTopicsResponse(
291+
throttle_time_ms=0,
292+
topics=[_Result(
293+
name='foo',
294+
topic_id=u,
295+
error_code=0,
296+
error_message=None,
297+
num_partitions=1,
298+
replication_factor=1,
299+
configs=[],
300+
)],
301+
)
302+
303+
broker.respond_fn(CreateTopicsRequest, handler)
304+
305+
result = admin.create_topics({'foo': {'num_partitions': 1, 'replication_factor': 1}})
306+
307+
# Default MockBroker is (4, 2); negotiation should land on v7+.
308+
assert captured['version'] >= 7
309+
# to_dict() stringifies the UUID; compare via uuid.UUID round-trip.
310+
assert uuid.UUID(result['topics'][0]['topic_id']) == u

0 commit comments

Comments
 (0)