Skip to content

Commit c25ba7b

Browse files
dpkp and claude authored
OffsetFetch v8 (KIP-709): Use batch interface when available (#3024)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 82c1d5d commit c25ba7b

6 files changed

Lines changed: 369 additions & 95 deletions

File tree

kafka/admin/_groups.py

Lines changed: 101 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -179,78 +179,126 @@ def list_groups(self, broker_ids=None, states_filter=None, types_filter=None):
179179

180180
# -- List group offsets -------------------------------------------
181181

182-
def _list_group_offsets_request(self, group_id, partitions=None):
182+
def _list_group_offsets_requests(self, group_specs):
183183
_Topic = OffsetFetchRequest.OffsetFetchRequestTopic
184-
if partitions is None:
185-
min_version = 1
186-
topics = None
184+
_Group = OffsetFetchRequest.OffsetFetchRequestGroup
185+
_GroupTopic = _Group.OffsetFetchRequestTopics
186+
max_version = 8
187+
188+
groups = []
189+
for group_id, partitions in group_specs.items():
190+
if partitions is None:
191+
group_topics = None
192+
else:
193+
topics_partitions = defaultdict(set)
194+
for topic, partition in partitions:
195+
topics_partitions[topic].add(partition)
196+
group_topics = [
197+
_GroupTopic(name=name, partition_indexes=list(parts))
198+
for name, parts in topics_partitions.items()
199+
]
200+
groups.append(_Group(group_id=group_id, topics=group_topics))
201+
202+
if len(groups) == 0:
203+
raise ValueError('Empty group_specs!')
204+
# Return multiple requests when broker does not support v8+
205+
if self._manager.broker_version_data.api_version(OffsetFetchRequest) < 8:
206+
for group in groups:
207+
min_version = 2 if group.topics is None else 0
208+
yield (group.group_id, OffsetFetchRequest(group_id=group.group_id,
209+
topics=group.topics,
210+
min_version=min_version,
211+
max_version=max_version))
187212
else:
188-
min_version = 0
189-
topics_partitions_dict = defaultdict(set)
190-
for topic, partition in partitions:
191-
topics_partitions_dict[topic].add(partition)
192-
topics = [
193-
_Topic(name=name, partition_indexes=list(partitions))
194-
for name, partitions in topics_partitions_dict.items()
195-
]
196-
return OffsetFetchRequest(group_id=group_id, topics=topics,
197-
min_version=min_version, max_version=6)
198-
199-
def _list_group_offsets_process_response(self, response):
200-
"""Process an OffsetFetchResponse."""
201-
if response.API_VERSION > 1:
202-
error_type = Errors.for_code(response.error_code)
203-
if error_type is not Errors.NoError:
204-
raise error_type(
205-
"OffsetFetchResponse failed with response '{}'."
206-
.format(response))
213+
yield (None, OffsetFetchRequest(groups=groups,
214+
min_version=8,
215+
max_version=max_version))
216+
217+
@staticmethod
218+
def _parse_group_offsets(group):
219+
"""Build {TopicPartition: OffsetAndMetadata} from an OffsetFetchResponse or OffsetFetchResponseGroup."""
220+
error_type = Errors.for_code(group.error_code)
221+
if error_type is not Errors.NoError:
222+
raise error_type(
223+
"OffsetFetchResponse failed for group '{}'.".format(group.group_id))
207224
results = {}
208-
for topic in response.topics:
225+
for topic in group.topics:
209226
for partition in topic.partitions:
210227
tp = TopicPartition(topic.name, partition.partition_index)
211-
error_type = Errors.for_code(partition.error_code)
212-
if error_type is not Errors.NoError:
213-
raise error_type(
228+
partition_error = Errors.for_code(partition.error_code)
229+
if partition_error is not Errors.NoError:
230+
raise partition_error(
214231
f"OffsetFetchResponse failed for partition {tp.partition}")
215232
results[tp] = OffsetAndMetadata(
216233
offset=partition.committed_offset,
217234
metadata=partition.metadata,
218-
leader_epoch=partition.committed_leader_epoch
235+
leader_epoch=partition.committed_leader_epoch,
219236
)
220237
return results
221238

222-
async def _async_list_group_offsets(self, group_id, group_coordinator_id=None, partitions=None):
223-
if group_coordinator_id is None:
224-
group_coordinator_id = await self._find_coordinator_id(group_id)
225-
request = self._list_group_offsets_request(group_id, partitions)
226-
response = await self._manager.send(request, node_id=group_coordinator_id)
227-
return self._list_group_offsets_process_response(response)
239+
def _list_group_offsets_process_response(self, response, group_id=None):
240+
"""Process an OffsetFetchResponse."""
241+
error_type = Errors.for_code(response.error_code)
242+
if error_type is not Errors.NoError:
243+
raise error_type(
244+
"OffsetFetchResponse failed with response '{}'."
245+
.format(response))
246+
if response.API_VERSION >= 8:
247+
return {group.group_id: self._parse_group_offsets(group)
248+
for group in response.groups}
249+
else:
250+
return {group_id: self._parse_group_offsets(response)}
228251

229-
def list_group_offsets(self, group_id, group_coordinator_id=None, partitions=None):
230-
"""Fetch committed offsets for a single consumer group.
252+
async def _async_list_group_offsets(self, group_specs):
253+
# Bucket groups by coordinator. One OffsetFetch per coordinator.
254+
coordinators_groups = defaultdict(list)
255+
for group_id in group_specs:
256+
coordinator_id = await self._find_coordinator_id(group_id)
257+
coordinators_groups[coordinator_id].append(group_id)
231258

232-
Note:
233-
This does not verify that the group_id or partitions actually exist
234-
in the cluster.
259+
results = {}
260+
_Group = OffsetFetchRequest.OffsetFetchRequestGroup
261+
_GroupTopic = _Group.OffsetFetchRequestTopics
262+
for coordinator_id, group_ids in coordinators_groups.items():
263+
for group_id, request in self._list_group_offsets_requests({group_id: group_specs[group_id]
264+
for group_id in group_ids}):
265+
response = await self._manager.send(request, node_id=coordinator_id)
266+
results.update(self._list_group_offsets_process_response(response, group_id=group_id))
267+
return results
235268

236-
As soon as any error is encountered, it is immediately raised.
269+
def list_group_offsets(self, group_specs):
270+
"""Fetch committed offsets for one or more consumer groups.
237271
238-
Arguments:
239-
group_id (str): The consumer group id name for which to fetch offsets.
272+
On brokers supporting OffsetFetch v8+ (Apache Kafka 3.0+, KIP-709), this
273+
issues a single OffsetFetch per coordinator covering all groups
274+
hosted by that coordinator. On older brokers it falls back to sending
275+
one OffsetFetch request per consumer group.
240276
241-
Keyword Arguments:
242-
group_coordinator_id (int, optional): The node_id of the group's coordinator
243-
broker. If set to None, will query the cluster to find the group
244-
coordinator. Default: None.
245-
partitions: A list of TopicPartitions for which to fetch
246-
offsets. On brokers >= 0.10.2, this can be set to None to fetch all
247-
known offsets for the consumer group. Default: None.
277+
Arguments:
278+
group_specs (dict): Mapping of group_id (str) to either a list of
279+
:class:`~kafka.TopicPartition` to fetch, or None to fetch all
280+
committed offsets for that group.
281+
Or, one or more group_id (str or list[str]) to fetch all offsets
282+
for each group.
248283
249284
Returns:
250-
A dict mapping :class:`~kafka.TopicPartition` to
285+
A dict mapping group_id (str) to a dict mapping
286+
:class:`~kafka.TopicPartition` to
251287
:class:`~kafka.structs.OffsetAndMetadata`.
288+
289+
Raises:
290+
UnsupportedVersionError: if a group_spec with value None (fetch all
291+
offsets) is requested against a broker that does not support
292+
OffsetFetch v2+. Multiple groups on brokers without OffsetFetch v8+
293+
are handled by sending one request per group.
294+
BrokerResponseError: as soon as any group- or partition-level error
295+
is encountered.
252296
"""
253-
return self._manager.run(self._async_list_group_offsets, group_id, group_coordinator_id, partitions)
297+
if isinstance(group_specs, list):
298+
group_specs = {group_id: None for group_id in group_specs}
299+
elif isinstance(group_specs, str):
300+
group_specs = {group_specs: None}
301+
return self._manager.run(self._async_list_group_offsets, group_specs)
254302

255303
# -- Delete groups ------------------------------------------------
256304

@@ -410,7 +458,8 @@ async def _async_reset_group_offsets(self, group_id, offset_specs, group_coordin
410458
if group_coordinator_id is None:
411459
group_coordinator_id = await self._find_coordinator_id(group_id)
412460

413-
current = await self._async_list_group_offsets(group_id, group_coordinator_id, all_tps)
461+
#import pdb; pdb.set_trace()
462+
current = (await self._async_list_group_offsets({group_id: list(all_tps)}))[group_id]
414463
earliest = await self._async_list_partition_offsets({tp: OffsetSpec.EARLIEST for tp in all_tps})
415464
latest = await self._async_list_partition_offsets({tp: OffsetSpec.LATEST for tp in all_tps})
416465

kafka/cli/admin/groups/list_offsets.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ def add_arguments(cls, parser):
1414

1515
@classmethod
1616
def command(cls, client, args):
17-
offsets = client.list_group_offsets(args.group_id)
17+
offsets = client.list_group_offsets(args.group_id)[args.group_id]
1818
latest = client.list_partition_offsets({tp: OffsetSpec.LATEST for tp in offsets})
1919
results = defaultdict(dict)
2020
for tp in latest:

kafka/coordinator/consumer.py

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,24 +1097,32 @@ async def _send_offset_fetch_request(self, partitions):
10971097
log.debug("Group %s fetching committed offsets for partitions: %s",
10981098
self.group_id, '(all)' if partitions is None else partitions)
10991099
# construct the request
1100-
max_version = 7
1100+
_Topic = OffsetFetchRequest.OffsetFetchRequestTopic
1101+
_Group = OffsetFetchRequest.OffsetFetchRequestGroup
1102+
_GroupTopic = _Group.OffsetFetchRequestTopics
11011103
if partitions is not None:
11021104
topic_partitions = collections.defaultdict(set)
11031105
for tp in partitions:
11041106
topic_partitions[tp.topic].add(tp.partition)
1105-
topic_partitions = list(topic_partitions.items())
1107+
topics = [_Topic(name=t, partition_indexes=list(p))
1108+
for t, p in topic_partitions.items()]
1109+
group_topics = [_GroupTopic(name=t, partition_indexes=list(p))
1110+
for t, p in topic_partitions.items()]
11061111
min_version = 0
11071112
else:
1108-
topic_partitions = None
1113+
topics = None
1114+
group_topics = None
11091115
min_version = 2
11101116

1117+
groups = [_Group(group_id=self.group_id, topics=group_topics)]
11111118
require_stable = self._isolation_level == IsolationLevel.READ_COMMITTED
11121119
request = OffsetFetchRequest(
11131120
group_id=self.group_id,
1114-
topics=topic_partitions,
1121+
topics=topics,
1122+
groups=groups,
11151123
require_stable=require_stable,
11161124
min_version=min_version,
1117-
max_version=max_version,
1125+
max_version=8,
11181126
)
11191127

11201128
try:
@@ -1126,8 +1134,16 @@ async def _send_offset_fetch_request(self, partitions):
11261134

11271135
def _handle_offset_fetch_response(self, response):
11281136
log.debug("Received OffsetFetchResponse: %s", response)
1129-
if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno:
1130-
error_type = Errors.for_code(response.error_code)
1137+
if response.API_VERSION >= 8:
1138+
group = response.groups[0]
1139+
top_level_error_code = group.error_code
1140+
topics = group.topics
1141+
else:
1142+
top_level_error_code = response.error_code if response.API_VERSION >= 2 else Errors.NoError.errno
1143+
topics = response.topics
1144+
1145+
if top_level_error_code != Errors.NoError.errno:
1146+
error_type = Errors.for_code(top_level_error_code)
11311147
log.debug("Offset fetch failed: %s", error_type.__name__)
11321148
error = error_type()
11331149
if error_type is Errors.NotCoordinatorError:
@@ -1142,14 +1158,13 @@ def _handle_offset_fetch_response(self, response):
11421158
raise error
11431159

11441160
offsets = {}
1145-
for topic, partitions in response.topics:
1161+
for topic, partitions in ((t.name, t.partitions) for t in topics):
11461162
for partition_data in partitions:
1147-
partition, offset = partition_data[:2]
1148-
if response.API_VERSION >= 5:
1149-
leader_epoch, metadata, error_code = partition_data[2:]
1150-
else:
1151-
metadata, error_code = partition_data[2:]
1152-
leader_epoch = -1
1163+
partition = partition_data.partition_index
1164+
offset = partition_data.committed_offset
1165+
leader_epoch = partition_data.committed_leader_epoch
1166+
metadata = partition_data.metadata
1167+
error_code = partition_data.error_code
11531168
tp = TopicPartition(topic, partition)
11541169
error_type = Errors.for_code(error_code)
11551170
if error_type is not Errors.NoError:

0 commit comments

Comments
 (0)