Skip to content

Commit 67bf656

Browse files
dpkp and claude authored
KIP-699: FindCoordinatorRequest v4 -- multi-group support (#3025)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent c25ba7b commit 67bf656

6 files changed

Lines changed: 340 additions & 42 deletions

File tree

kafka/admin/_groups.py

Lines changed: 17 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -37,16 +37,15 @@ class GroupAdminMixin:
3737

3838
# -- Describe groups ----------------------------------------------
3939

40-
def _describe_groups_request(self, group_id):
40+
def _describe_groups_request(self, group_ids):
4141
request = DescribeGroupsRequest(
42-
groups=[group_id],
42+
groups=list(group_ids),
4343
include_authorized_operations=True
4444
)
4545
return request
4646

4747
def _describe_groups_process_response(self, response):
4848
"""Process a DescribeGroupsResponse into a group description."""
49-
assert len(response.groups) == 1
5049
for group in response.groups:
5150
for member in group.members:
5251
if member.member_metadata:
@@ -73,13 +72,20 @@ def _describe_groups_process_response(self, response):
7372
return results
7473

7574
async def _async_describe_groups(self, group_ids, group_coordinator_id=None):
75+
# Bucket groups by coordinator. One DescribeGroups per coordinator.
76+
coordinators_groups = defaultdict(list)
77+
if group_coordinator_id is not None:
78+
coordinators_groups[group_coordinator_id] = list(group_ids)
79+
else:
80+
coordinator_ids = await self._find_coordinator_ids(group_ids)
81+
for group_id, coordinator_id in coordinator_ids.items():
82+
coordinators_groups[coordinator_id].append(group_id)
83+
7684
results = {}
77-
for group_id in group_ids:
78-
coordinator_id = group_coordinator_id or await self._find_coordinator_id(group_id)
79-
request = self._describe_groups_request(group_id)
85+
for coordinator_id, coordinator_group_ids in coordinators_groups.items():
86+
request = self._describe_groups_request(coordinator_group_ids)
8087
response = await self._manager.send(request, node_id=coordinator_id)
8188
results.update(self._describe_groups_process_response(response))
82-
# Combine key/vals from multiple requests into single dict
8389
return results
8490

8591
def describe_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
@@ -252,8 +258,8 @@ def _list_group_offsets_process_response(self, response, group_id=None):
252258
async def _async_list_group_offsets(self, group_specs):
253259
# Bucket groups by coordinator. One OffsetFetch per coordinator.
254260
coordinators_groups = defaultdict(list)
255-
for group_id in group_specs:
256-
coordinator_id = await self._find_coordinator_id(group_id)
261+
coordinator_ids = await self._find_coordinator_ids(list(group_specs))
262+
for group_id, coordinator_id in coordinator_ids.items():
257263
coordinators_groups[coordinator_id].append(group_id)
258264

259265
results = {}
@@ -318,8 +324,8 @@ async def _async_delete_groups(self, group_ids, group_coordinator_id=None):
318324
if group_coordinator_id is not None:
319325
coordinators_groups[group_coordinator_id] = group_ids
320326
else:
321-
for group_id in group_ids:
322-
coordinator_id = await self._find_coordinator_id(group_id)
327+
coordinator_ids = await self._find_coordinator_ids(group_ids)
328+
for group_id, coordinator_id in coordinator_ids.items():
323329
coordinators_groups[coordinator_id].append(group_id)
324330

325331
results = []
@@ -458,7 +464,6 @@ async def _async_reset_group_offsets(self, group_id, offset_specs, group_coordin
458464
if group_coordinator_id is None:
459465
group_coordinator_id = await self._find_coordinator_id(group_id)
460466

461-
#import pdb; pdb.set_trace()
462467
current = (await self._async_list_group_offsets({group_id: list(all_tps)}))[group_id]
463468
earliest = await self._async_list_partition_offsets({tp: OffsetSpec.EARLIEST for tp in all_tps})
464469
latest = await self._async_list_partition_offsets({tp: OffsetSpec.LATEST for tp in all_tps})

kafka/admin/client.py

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -280,24 +280,59 @@ async def _refresh_controller_id(self, timeout_ms=30000):
280280
else:
281281
raise Errors.NodeNotReadyError('controller')
282282

283-
async def _find_coordinator_id(self, group_id):
284-
"""Find the broker node_id of the coordinator for a consumer group.
285-
286-
Results are cached; subsequent calls for the same group_id return
287-
the cached value without a network round-trip.
283+
async def _find_coordinator_ids(self, group_ids):
284+
"""Find broker node_ids of the coordinators for a set of consumer groups.
285+
286+
Returns a dict mapping group_id -> node_id. Results are cached;
287+
only groups not already in the cache hit the network. On brokers
288+
supporting FindCoordinator v4+ (KIP-699, Apache Kafka 3.0+), all
289+
unknown groups are resolved in a single batched request; older brokers
290+
fall back to one request per group.
288291
"""
289-
cached = self._coordinator_cache.get(group_id)
290-
if cached is not None:
291-
return cached
292-
request = FindCoordinatorRequest(group_id, 0, max_version=2) # key_type=0 for group
293-
response = await self._manager.send(request)
294-
error_type = Errors.for_code(response.error_code)
295-
if error_type is not Errors.NoError:
296-
raise error_type(
297-
"FindCoordinatorRequest failed with response '{}'."
298-
.format(response))
299-
self._coordinator_cache[group_id] = response.node_id
300-
return response.node_id
292+
result = {}
293+
unknown = []
294+
for group_id in group_ids:
295+
cached = self._coordinator_cache.get(group_id)
296+
if cached is not None:
297+
result[group_id] = cached
298+
else:
299+
unknown.append(group_id)
300+
if not unknown:
301+
return result
302+
303+
if self._manager.broker_version_data.api_version(FindCoordinatorRequest) >= 4:
304+
request = FindCoordinatorRequest(key_type=0, # group
305+
coordinator_keys=unknown,
306+
min_version=4)
307+
response = await self._manager.send(request)
308+
for coordinator in response.coordinators:
309+
error_type = Errors.for_code(coordinator.error_code)
310+
if error_type is not Errors.NoError:
311+
raise error_type(
312+
"FindCoordinatorRequest failed for group '{}': {}"
313+
.format(coordinator.key, coordinator.error_message))
314+
self._coordinator_cache[coordinator.key] = coordinator.node_id
315+
result[coordinator.key] = coordinator.node_id
316+
else:
317+
# Broker does not support batch api; fan-out request per-group
318+
for group_id in unknown:
319+
request = FindCoordinatorRequest(key=group_id,
320+
key_type=0, # group
321+
max_version=3)
322+
response = await self._manager.send(request)
323+
error_type = Errors.for_code(response.error_code)
324+
if error_type is not Errors.NoError:
325+
raise error_type(
326+
"FindCoordinatorRequest failed with response '{}'."
327+
.format(response))
328+
self._coordinator_cache[group_id] = response.node_id
329+
result[group_id] = response.node_id
330+
return result
331+
332+
async def _find_coordinator_id(self, group_id):
333+
"""Single-group wrapper for _find_coordinator_ids()"""
334+
ids = await self._find_coordinator_ids([group_id])
335+
return ids[group_id]
301336

302337
async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), raise_errors=True, ignore_errors=()):
303338
"""Send a Kafka protocol message to the cluster controller.

kafka/coordinator/base.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -789,10 +789,13 @@ async def _send_group_coordinator_request(self):
789789
if node_id is None:
790790
raise Errors.NodeNotReadyError('coordinator')
791791

792-
max_version = 3
792+
# Setting key, key_type, and coordinator_keys all at once lets the
793+
# connection layer negotiate any version: v0-v3 emit `key`/`key_type`,
794+
# v4+ (KIP-699) emit `key_type`/`coordinator_keys`.
793795
request = FindCoordinatorRequest(
794796
key=self.group_id,
795-
max_version=max_version)
797+
key_type=0,
798+
coordinator_keys=[self.group_id])
796799
log.debug("Sending group coordinator request for group %s to broker %s: %s",
797800
self.group_id, node_id, request)
798801

@@ -806,10 +809,13 @@ async def _send_group_coordinator_request(self):
806809
def _handle_find_coordinator_response(self, response):
807810
log.debug("Received find coordinator response %s", response)
808811

809-
error_type = Errors.for_code(response.error_code)
812+
# v4+ returns results in a Coordinators array; we always send a single
813+
# key, so the first entry is ours. v0-v3 returns top-level fields.
814+
result = response.coordinators[0] if response.coordinators else response
815+
error_type = Errors.for_code(result.error_code)
810816
if error_type is Errors.NoError:
811817
with self._lock:
812-
coordinator_id = self._cluster.add_coordinator(response, 'group', self.group_id)
818+
coordinator_id = self._cluster.add_coordinator(result, 'group', self.group_id)
813819
if not coordinator_id:
814820
# This could happen if coordinator metadata is different
815821
# than broker metadata

kafka/producer/transaction_manager.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,10 +1075,13 @@ def __init__(self, transaction_manager, coord_type, coord_key):
10751075
coord_type_int8 = 1
10761076
else:
10771077
raise ValueError("Unrecognized coordinator type: %s" % (coord_type,))
1078+
# Setting key, key_type, and coordinator_keys all at once lets the
1079+
# connection layer negotiate any version: v0-v3 emit `key`/`key_type`,
1080+
# v4+ (KIP-699) emit `key_type`/`coordinator_keys`.
10781081
self.request = FindCoordinatorRequest(
10791082
key=coord_key,
10801083
key_type=coord_type_int8,
1081-
max_version=3,
1084+
coordinator_keys=[coord_key],
10821085
)
10831086

10841087
@property
@@ -1094,11 +1097,14 @@ def coordinator_key(self):
10941097
return None
10951098

10961099
def handle_response(self, response):
1097-
error_type = Errors.for_code(response.error_code)
1100+
# v4+ returns results in a Coordinators array; we always send a single
1101+
# key, so the first entry is ours. v0-v3 returns top-level fields.
1102+
result = response.coordinators[0] if response.coordinators else response
1103+
error_type = Errors.for_code(result.error_code)
10981104

10991105
if error_type is Errors.NoError:
11001106
coordinator_id = self.transaction_manager._metadata.add_coordinator(
1101-
response, self._coord_type, self._coord_key)
1107+
result, self._coord_type, self._coord_key)
11021108
if self._coord_type == 'group':
11031109
self.transaction_manager._consumer_group_coordinator = coordinator_id
11041110
elif self._coord_type == 'transaction':

0 commit comments

Comments
 (0)