Skip to content

Commit 484939f

Browse files
dpkpclaude
andauthored
Admin: Cleanup alter_partition_reassignments (#3002)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent d962915 commit 484939f

4 files changed

Lines changed: 144 additions & 59 deletions

File tree

kafka/admin/_partitions.py

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -268,45 +268,63 @@ def _process_alter_partition_reassignments_input(reassignments):
268268
_Partition = _Topic.ReassignablePartition
269269
topic2partitions = defaultdict(list)
270270
for tp, replicas in reassignments.items():
271+
if replicas is not None:
272+
replicas = list(replicas)
273+
if not replicas:
274+
raise ValueError(
275+
"Replica list for %s must be non-empty; "
276+
"use None to cancel a reassignment." % (tp,))
277+
elif not all(isinstance(item, int) for item in replicas):
278+
raise ValueError(
279+
"Replica list for %s must be int broker_ids." % (tp,))
271280
topic2partitions[tp.topic].append(_Partition(
272281
partition_index=tp.partition,
273-
replicas=list(replicas) if replicas is not None else None,
282+
replicas=replicas,
274283
))
275284
return [_Topic(name=topic, partitions=parts) for topic, parts in topic2partitions.items()]
276285

277-
def alter_partition_reassignments(self, reassignments, timeout_ms=None, raise_errors=True):
286+
def alter_partition_reassignments(self, reassignments, timeout_ms=None):
278287
"""Alter the replica sets for the given partitions.
279288
280289
Arguments:
281290
reassignments (dict): A dict mapping
282-
:class:`~kafka.TopicPartition` to a list of broker IDs for
283-
the new replica set, or ``None`` to cancel a pending
284-
reassignment for that partition.
291+
:class:`~kafka.TopicPartition` to a list of broker IDs
292+
for the new replica set, or ``None`` to cancel a
293+
pending reassignment for that partition.
285294
286295
Keyword Arguments:
287-
timeout_ms (numeric, optional): The time in ms to wait for the
288-
request to complete.
289-
raise_errors (bool, optional): Whether to raise errors as
290-
exceptions. Default True.
296+
timeout_ms (numeric, optional): The time in ms to wait for
297+
the request to complete.
298+
299+
Raises: top-level failures that prevents processing request.
300+
Does not raise partition-specific errors.
291301
292302
Returns:
293-
Decoded AlterPartitionReassignmentsResponse (as a dict).
303+
dict: A dict mapping each :class:`~kafka.TopicPartition`
304+
that the broker acknowledged to the error class for that
305+
partition, or ``None`` if the reassignment was accepted.
306+
Partitions the broker did not report on are absent from the
307+
dict.
294308
"""
295309
timeout_ms = self._validate_timeout(timeout_ms)
296310

297-
def response_errors(r):
298-
yield Errors.for_code(r.error_code)
299-
for topic in r.responses:
300-
for partition in topic.partitions:
301-
yield Errors.for_code(partition.error_code)
302-
303311
request = AlterPartitionReassignmentsRequest(
304312
timeout_ms=timeout_ms,
305313
topics=self._process_alter_partition_reassignments_input(reassignments),
306314
)
315+
316+
def top_level_error(r):
317+
yield Errors.for_code(r.error_code)
307318
response = self._manager.run(
308-
self._send_request_to_controller, request, response_errors, raise_errors)
309-
return response.to_dict()
319+
self._send_request_to_controller, request, top_level_error)
320+
321+
results = {}
322+
for topic in response.responses:
323+
for partition in topic.partitions:
324+
tp = TopicPartition(topic.name, partition.partition_index)
325+
err = Errors.for_code(partition.error_code)
326+
results[tp] = err if err is not Errors.NoError else None
327+
return results
310328

311329
async def _async_list_partition_reassignments(self, topic_partitions=None, timeout_ms=None):
312330
timeout_ms = self._validate_timeout(timeout_ms)
@@ -333,12 +351,10 @@ async def _async_list_partition_reassignments(self, topic_partitions=None, timeo
333351
timeout_ms=timeout_ms,
334352
topics=topics_field,
335353
)
336-
response = await self._manager.send(request)
337354

338-
top_level_error = Errors.for_code(response.error_code)
339-
if top_level_error is not Errors.NoError:
340-
raise top_level_error(
341-
"ListPartitionReassignmentsRequest failed: %s" % response.error_message)
355+
def top_level_error(r):
356+
yield Errors.for_code(r.error_code)
357+
response = await self._send_request_to_controller(request, top_level_error)
342358

343359
ret = {}
344360
for topic in response.topics:

kafka/cli/admin/partitions/alter_reassignments.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ def add_arguments(cls, parser):
1616
parser.add_argument(
1717
'--timeout-ms', type=int, default=None,
1818
help='Request timeout in milliseconds')
19-
parser.add_argument(
20-
'--no-raise-errors', dest='raise_errors', action='store_false',
21-
help='Do not raise on partition-level errors; return the response instead')
2219

2320
@classmethod
2421
def command(cls, client, args):
@@ -31,7 +28,10 @@ def command(cls, client, args):
3128
reassignments[tp] = None
3229
else:
3330
reassignments[tp] = [int(b) for b in replicas_str.split(',') if b]
34-
return client.alter_partition_reassignments(
31+
results = client.alter_partition_reassignments(
3532
reassignments,
36-
timeout_ms=args.timeout_ms,
37-
raise_errors=args.raise_errors)
33+
timeout_ms=args.timeout_ms)
34+
return {
35+
'%s:%d' % (tp.topic, tp.partition): (err.__name__ if err else None)
36+
for tp, err in results.items()
37+
}

test/admin/test_admin_partitions.py

Lines changed: 98 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from kafka.admin import OffsetSpec, KafkaAdminClient
66
from kafka.errors import (
7+
InvalidPartitionsError,
78
NotLeaderForPartitionError,
89
UnknownTopicOrPartitionError,
910
IncompatibleBrokerVersion,
@@ -25,7 +26,7 @@
2526

2627

2728
class TestAlterPartitionReassignmentsMockBroker:
28-
def test_success_returns_dict(self, broker, admin):
29+
def test_success_returns_per_partition_none(self, broker, admin):
2930
Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse
3031
Partition = Topic.ReassignablePartitionResponse
3132
broker.respond(
@@ -39,6 +40,7 @@ def test_success_returns_dict(self, broker, admin):
3940
name='topic-a',
4041
partitions=[
4142
Partition(partition_index=0, error_code=0, error_message=None),
43+
Partition(partition_index=1, error_code=0, error_message=None),
4244
],
4345
),
4446
],
@@ -47,11 +49,13 @@ def test_success_returns_dict(self, broker, admin):
4749

4850
result = admin.alter_partition_reassignments({
4951
TopicPartition('topic-a', 0): [1, 2, 3],
52+
TopicPartition('topic-a', 1): [4, 5, 6],
5053
})
5154

52-
assert result['error_code'] == 0
53-
assert result['responses'][0]['name'] == 'topic-a'
54-
assert result['responses'][0]['partitions'][0]['error_code'] == 0
55+
assert result == {
56+
TopicPartition('topic-a', 0): None,
57+
TopicPartition('topic-a', 1): None,
58+
}
5559

5660
def test_cancel_reassignment_sends_null_replicas(self, broker, admin):
5761
captured = {}
@@ -66,7 +70,7 @@ def handler(api_key, api_version, correlation_id, request_bytes):
6670
broker.respond_fn(AlterPartitionReassignmentsRequest, handler)
6771

6872
admin.alter_partition_reassignments({
69-
TopicPartition('topic-a', 0): None, # cancel
73+
TopicPartition('topic-a', 0): None,
7074
TopicPartition('topic-a', 1): [4, 5],
7175
})
7276

@@ -77,7 +81,7 @@ def handler(api_key, api_version, correlation_id, request_bytes):
7781
assert by_index[0].replicas is None
7882
assert list(by_index[1].replicas) == [4, 5]
7983

80-
def test_partition_level_error_raises(self, broker, admin):
84+
def test_partition_level_error_returned_in_dict(self, broker, admin):
8185
Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse
8286
Partition = Topic.ReassignablePartitionResponse
8387
broker.respond(
@@ -90,45 +94,69 @@ def test_partition_level_error_raises(self, broker, admin):
9094
Topic(
9195
name='topic-a',
9296
partitions=[
93-
Partition(partition_index=0, error_code=37, # InvalidPartitionsError
97+
Partition(partition_index=0, error_code=0, error_message=None),
98+
Partition(partition_index=1, error_code=37,
9499
error_message='bad partition'),
95100
],
96101
),
97102
],
98103
),
99104
)
100105

106+
result = admin.alter_partition_reassignments({
107+
TopicPartition('topic-a', 0): [1, 2, 3],
108+
TopicPartition('topic-a', 1): [4, 5, 6],
109+
})
110+
111+
assert result == {
112+
TopicPartition('topic-a', 0): None,
113+
TopicPartition('topic-a', 1): InvalidPartitionsError,
114+
}
115+
116+
def test_top_level_error_raises(self, broker, admin):
117+
broker.respond(
118+
AlterPartitionReassignmentsRequest,
119+
AlterPartitionReassignmentsResponse(
120+
throttle_time_ms=0,
121+
error_code=38, # ClusterAuthorizationFailedError
122+
error_message='not authorized',
123+
responses=[],
124+
),
125+
)
126+
101127
with pytest.raises(Exception):
102128
admin.alter_partition_reassignments({
103129
TopicPartition('topic-a', 0): [1, 2, 3],
104130
})
105131

106-
def test_partition_error_suppressed_with_raise_errors_false(self, broker, admin):
107-
Topic = AlterPartitionReassignmentsResponse.ReassignableTopicResponse
108-
Partition = Topic.ReassignablePartitionResponse
132+
def test_empty_replica_list_rejected(self, broker, admin):
133+
with pytest.raises(ValueError, match='non-empty'):
134+
admin.alter_partition_reassignments({
135+
TopicPartition('topic-a', 0): [],
136+
})
137+
138+
def test_non_int_replica_rejected(self, broker, admin):
139+
with pytest.raises(ValueError, match='int broker_ids'):
140+
admin.alter_partition_reassignments({
141+
TopicPartition('topic-a', 0): ['1', '2', '3'],
142+
})
143+
144+
def test_missing_partition_response_is_absent(self, broker, admin):
109145
broker.respond(
110146
AlterPartitionReassignmentsRequest,
111147
AlterPartitionReassignmentsResponse(
112148
throttle_time_ms=0,
113149
error_code=0,
114150
error_message=None,
115-
responses=[
116-
Topic(
117-
name='topic-a',
118-
partitions=[
119-
Partition(partition_index=0, error_code=37, error_message='bad'),
120-
],
121-
),
122-
],
151+
responses=[],
123152
),
124153
)
125154

126-
result = admin.alter_partition_reassignments(
127-
{TopicPartition('topic-a', 0): [1, 2, 3]},
128-
raise_errors=False,
129-
)
155+
result = admin.alter_partition_reassignments({
156+
TopicPartition('topic-a', 0): [1, 2, 3],
157+
})
130158

131-
assert result['responses'][0]['partitions'][0]['error_code'] == 37
159+
assert result == {}
132160

133161

134162
# ---------------------------------------------------------------------------
@@ -242,15 +270,59 @@ def test_top_level_error_raises(self, broker, admin):
242270
ListPartitionReassignmentsRequest,
243271
ListPartitionReassignmentsResponse(
244272
throttle_time_ms=0,
245-
error_code=41, # NotControllerError
246-
error_message='not controller',
273+
error_code=29, # TopicAuthorizationFailedError
274+
error_message='not authorized',
247275
topics=[],
248276
),
249277
)
250278

251279
with pytest.raises(Exception) as exc_info:
252280
admin.list_partition_reassignments()
253-
assert 'not controller' in str(exc_info.value)
281+
assert 'not authorized' in str(exc_info.value)
282+
283+
def test_not_controller_retries(self, broker, admin):
284+
Topic = ListPartitionReassignmentsResponse.OngoingTopicReassignment
285+
Partition = Topic.OngoingPartitionReassignment
286+
# First response: NotControllerError. After controller refresh, success.
287+
broker.respond(
288+
ListPartitionReassignmentsRequest,
289+
ListPartitionReassignmentsResponse(
290+
throttle_time_ms=0,
291+
error_code=41, # NotControllerError
292+
error_message='not controller',
293+
topics=[],
294+
),
295+
)
296+
broker.respond(
297+
ListPartitionReassignmentsRequest,
298+
ListPartitionReassignmentsResponse(
299+
throttle_time_ms=0,
300+
error_code=0,
301+
error_message=None,
302+
topics=[
303+
Topic(
304+
name='topic-a',
305+
partitions=[
306+
Partition(
307+
partition_index=0,
308+
replicas=[1, 2, 3],
309+
adding_replicas=[4],
310+
removing_replicas=[1],
311+
),
312+
],
313+
),
314+
],
315+
),
316+
)
317+
318+
result = admin.list_partition_reassignments()
319+
assert result == {
320+
TopicPartition('topic-a', 0): {
321+
'replicas': [1, 2, 3],
322+
'adding_replicas': [4],
323+
'removing_replicas': [1],
324+
},
325+
}
254326

255327

256328
# ---------------------------------------------------------------------------

test/integration/test_admin_integration.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,15 +525,12 @@ def test_describe_metadata_quorum(kafka_admin_client):
525525

526526
@pytest.mark.skipif(env_kafka_version() < (2, 4), reason="AlterPartitionReassignments requires broker >=2.4")
527527
def test_alter_partition_reassignments(kafka_admin_client, topic):
528-
topic_metadata = kafka_admin_client.describe_topics([topic])[0]
529528
brokers = [b.node_id for b in kafka_admin_client._manager.cluster.brokers()]
530529
# Single-broker cluster: only valid reassignment target is [broker]
531530
tp = TopicPartition(topic, 0)
532531

533532
result = kafka_admin_client.alter_partition_reassignments({tp: brokers})
534-
assert result['error_code'] == 0
535-
assert len(result['responses']) == 1
536-
assert result['responses'][0]['name'] == topic
533+
assert result == {tp: None}
537534

538535

539536
@pytest.mark.skipif(env_kafka_version() < (2, 4), reason="ListPartitionReassignments requires broker >=2.4")

0 commit comments

Comments
 (0)