Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions kafka/admin/_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,15 @@ class GroupAdminMixin:

# -- Describe groups ----------------------------------------------

def _describe_groups_request(self, group_id):
def _describe_groups_request(self, group_ids):
request = DescribeGroupsRequest(
groups=[group_id],
groups=list(group_ids),
include_authorized_operations=True
)
return request

def _describe_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
assert len(response.groups) == 1
for group in response.groups:
for member in group.members:
if member.member_metadata:
Expand All @@ -73,13 +72,20 @@ def _describe_groups_process_response(self, response):
return results

async def _async_describe_groups(self, group_ids, group_coordinator_id=None):
# Bucket groups by coordinator. One DescribeGroups per coordinator.
coordinators_groups = defaultdict(list)
if group_coordinator_id is not None:
coordinators_groups[group_coordinator_id] = list(group_ids)
else:
coordinator_ids = await self._find_coordinator_ids(group_ids)
for group_id, coordinator_id in coordinator_ids.items():
coordinators_groups[coordinator_id].append(group_id)

results = {}
for group_id in group_ids:
coordinator_id = group_coordinator_id or await self._find_coordinator_id(group_id)
request = self._describe_groups_request(group_id)
for coordinator_id, coordinator_group_ids in coordinators_groups.items():
request = self._describe_groups_request(coordinator_group_ids)
response = await self._manager.send(request, node_id=coordinator_id)
results.update(self._describe_groups_process_response(response))
# Combine key/vals from multiple requests into single dict
return results

def describe_groups(self, group_ids, group_coordinator_id=None, include_authorized_operations=False):
Expand Down Expand Up @@ -252,8 +258,8 @@ def _list_group_offsets_process_response(self, response, group_id=None):
async def _async_list_group_offsets(self, group_specs):
# Bucket groups by coordinator. One OffsetFetch per coordinator.
coordinators_groups = defaultdict(list)
for group_id in group_specs:
coordinator_id = await self._find_coordinator_id(group_id)
coordinator_ids = await self._find_coordinator_ids(list(group_specs))
for group_id, coordinator_id in coordinator_ids.items():
coordinators_groups[coordinator_id].append(group_id)

results = {}
Expand Down Expand Up @@ -318,8 +324,8 @@ async def _async_delete_groups(self, group_ids, group_coordinator_id=None):
if group_coordinator_id is not None:
coordinators_groups[group_coordinator_id] = group_ids
else:
for group_id in group_ids:
coordinator_id = await self._find_coordinator_id(group_id)
coordinator_ids = await self._find_coordinator_ids(group_ids)
for group_id, coordinator_id in coordinator_ids.items():
coordinators_groups[coordinator_id].append(group_id)

results = []
Expand Down Expand Up @@ -458,7 +464,6 @@ async def _async_reset_group_offsets(self, group_id, offset_specs, group_coordin
if group_coordinator_id is None:
group_coordinator_id = await self._find_coordinator_id(group_id)

#import pdb; pdb.set_trace()
current = (await self._async_list_group_offsets({group_id: list(all_tps)}))[group_id]
earliest = await self._async_list_partition_offsets({tp: OffsetSpec.EARLIEST for tp in all_tps})
latest = await self._async_list_partition_offsets({tp: OffsetSpec.LATEST for tp in all_tps})
Expand Down
69 changes: 52 additions & 17 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,24 +280,59 @@ async def _refresh_controller_id(self, timeout_ms=30000):
else:
raise Errors.NodeNotReadyError('controller')

async def _find_coordinator_id(self, group_id):
"""Find the broker node_id of the coordinator for a consumer group.

Results are cached; subsequent calls for the same group_id return
the cached value without a network round-trip.
async def _find_coordinator_ids(self, group_ids):
"""Find broker node_ids of the coordinators for a set of consumer groups.

Returns a dict mapping group_id -> node_id. Results are cached;
only groups not already in the cache hit the network. On brokers
supporting FindCoordinator v4+ (KIP-699, Apache Kafka 3.0+), all
unknown groups are resolved in a single batched request; older brokers
fall back to one request per group.
"""
cached = self._coordinator_cache.get(group_id)
if cached is not None:
return cached
request = FindCoordinatorRequest(group_id, 0, max_version=2) # key_type=0 for group
response = await self._manager.send(request)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"FindCoordinatorRequest failed with response '{}'."
.format(response))
self._coordinator_cache[group_id] = response.node_id
return response.node_id
result = {}
unknown = []
for group_id in group_ids:
cached = self._coordinator_cache.get(group_id)
if cached is not None:
result[group_id] = cached
else:
unknown.append(group_id)
if not unknown:
return result

if self._manager.broker_version_data.api_version(FindCoordinatorRequest) >= 4:
request = FindCoordinatorRequest(key_type=0, # group
coordinator_keys=unknown,
min_version=4)
response = await self._manager.send(request)
for coordinator in response.coordinators:
error_type = Errors.for_code(coordinator.error_code)
if error_type is not Errors.NoError:
raise error_type(
"FindCoordinatorRequest failed for group '{}': {}"
.format(coordinator.key, coordinator.error_message))
self._coordinator_cache[coordinator.key] = coordinator.node_id
result[coordinator.key] = coordinator.node_id
else:
# Broker does not support batch api; fan-out request per-group
for group_id in unknown:
request = FindCoordinatorRequest(key=group_id,
key_type=0, # group
max_version=3)
response = await self._manager.send(request)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"FindCoordinatorRequest failed with response '{}'."
.format(response))
self._coordinator_cache[group_id] = response.node_id
result[group_id] = response.node_id
return result

async def _find_coordinator_id(self, group_id):
"""Single-group wrapper for _find_coordinator_ids()"""
ids = await self._find_coordinator_ids([group_id])
return ids[group_id]

async def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), raise_errors=True, ignore_errors=()):
"""Send a Kafka protocol message to the cluster controller.
Expand Down
14 changes: 10 additions & 4 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,10 +789,13 @@ async def _send_group_coordinator_request(self):
if node_id is None:
raise Errors.NodeNotReadyError('coordinator')

max_version = 3
# Setting key, key_type, and coordinator_keys all at once lets the
# connection layer negotiate any version: v0-v3 emit `key`/`key_type`,
# v4+ (KIP-699) emit `key_type`/`coordinator_keys`.
request = FindCoordinatorRequest(
key=self.group_id,
max_version=max_version)
key_type=0,
coordinator_keys=[self.group_id])
log.debug("Sending group coordinator request for group %s to broker %s: %s",
self.group_id, node_id, request)

Expand All @@ -806,10 +809,13 @@ async def _send_group_coordinator_request(self):
def _handle_find_coordinator_response(self, response):
log.debug("Received find coordinator response %s", response)

error_type = Errors.for_code(response.error_code)
# v4+ returns results in a Coordinators array; we always send a single
# key, so the first entry is ours. v0-v3 returns top-level fields.
result = response.coordinators[0] if response.coordinators else response
error_type = Errors.for_code(result.error_code)
if error_type is Errors.NoError:
with self._lock:
coordinator_id = self._cluster.add_coordinator(response, 'group', self.group_id)
coordinator_id = self._cluster.add_coordinator(result, 'group', self.group_id)
if not coordinator_id:
# This could happen if coordinator metadata is different
# than broker metadata
Expand Down
12 changes: 9 additions & 3 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,10 +1075,13 @@ def __init__(self, transaction_manager, coord_type, coord_key):
coord_type_int8 = 1
else:
raise ValueError("Unrecognized coordinator type: %s" % (coord_type,))
# Setting key, key_type, and coordinator_keys all at once lets the
# connection layer negotiate any version: v0-v3 emit `key`/`key_type`,
# v4+ (KIP-699) emit `key_type`/`coordinator_keys`.
self.request = FindCoordinatorRequest(
key=coord_key,
key_type=coord_type_int8,
max_version=3,
coordinator_keys=[coord_key],
)

@property
Expand All @@ -1094,11 +1097,14 @@ def coordinator_key(self):
return None

def handle_response(self, response):
error_type = Errors.for_code(response.error_code)
# v4+ returns results in a Coordinators array; we always send a single
# key, so the first entry is ours. v0-v3 returns top-level fields.
result = response.coordinators[0] if response.coordinators else response
error_type = Errors.for_code(result.error_code)

if error_type is Errors.NoError:
coordinator_id = self.transaction_manager._metadata.add_coordinator(
response, self._coord_type, self._coord_key)
result, self._coord_type, self._coord_key)
if self._coord_type == 'group':
self.transaction_manager._consumer_group_coordinator = coordinator_id
elif self._coord_type == 'transaction':
Expand Down
Loading
Loading