Skip to content

Commit 508d930

Browse files
committed
Add KIP-848 consumer groups support
- Split group listing by type (classic vs consumer) - Use DescribeConsumerGroups API for KIP-848 groups - Convert ConsumerGroupMember assignments to display format - Add groupType field to distinguish classic vs KIP-848 groups - Add Assigning and Reconciling states for KIP-848 - Display 'consumer (KIP-848)' vs 'classic' in UI statistics
1 parent 1ee3908 commit 508d930

5 files changed

Lines changed: 20257 additions & 31 deletions

File tree

backend/pkg/console/consumer_group_overview.go

Lines changed: 183 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
// ConsumerGroupOverview for a Kafka Consumer Group
2626
type ConsumerGroupOverview struct {
2727
GroupID string `json:"groupId"`
28+
GroupType string `json:"groupType"` // "classic" or "consumer" (KIP-848)
2829
State string `json:"state"`
2930
ProtocolType string `json:"protocolType"`
3031
Protocol string `json:"protocol"`
@@ -47,8 +48,9 @@ type GroupMemberAssignment struct {
4748
PartitionIDs []int32 `json:"partitionIds"`
4849
}
4950

50-
// GetConsumerGroupsOverview returns a ConsumerGroupOverview for all available consumer groups
51+
// GetConsumerGroupsOverview returns a ConsumerGroupOverview for all available consumer groups.
5152
// Pass nil for groupIDs if you want to fetch all available groups.
53+
// Supports both classic groups (DescribeGroups API) and KIP-848 consumer groups (DescribeConsumerGroups API).
5254
func (s *Service) GetConsumerGroupsOverview(ctx context.Context, groupIDs []string) ([]ConsumerGroupOverview, *rest.Error) {
5355
_, adminCl, err := s.kafkaClientFactory.GetKafkaClient(ctx)
5456
if err != nil {
@@ -64,14 +66,13 @@ func (s *Service) GetConsumerGroupsOverview(ctx context.Context, groupIDs []stri
6466
}
6567
}
6668

67-
if groupIDs == nil {
68-
groupIDs = groups.Groups()
69-
} else {
70-
// Not existent consumer groups will be reported as "dead" by Kafka. We would like to report them as 404 instead.
71-
// Hence we'll check if the passed group IDs exist in the response
69+
allGroupIDs := groups.Groups()
70+
if groupIDs != nil {
71+
allGroupIDs = groupIDs
72+
// Validate requested group IDs exist in the cluster
73+
clusterGroups := groups.Groups()
7274
for _, id := range groupIDs {
73-
exists := slices.Contains(groupIDs, id)
74-
if !exists {
75+
if !slices.Contains(clusterGroups, id) {
7576
return nil, &rest.Error{
7677
Err: fmt.Errorf("requested group id '%v' does not exist in Kafka cluster", id),
7778
Status: http.StatusNotFound,
@@ -82,25 +83,86 @@ func (s *Service) GetConsumerGroupsOverview(ctx context.Context, groupIDs []stri
8283
}
8384
}
8485

85-
describedGroups, err := adminCl.DescribeGroups(ctx, groupIDs...)
86-
if err != nil {
87-
var se *kadm.ShardErrors
88-
if !errors.As(err, &se) {
89-
return nil, errorToRestError(err)
86+
// Split groups by type: classic vs consumer (KIP-848)
87+
consumerGroupIDs := s.listConsumerTypeGroups(ctx, adminCl, allGroupIDs)
88+
classicGroupIDs := make([]string, 0)
89+
for _, id := range allGroupIDs {
90+
if !slices.Contains(consumerGroupIDs, id) {
91+
classicGroupIDs = append(classicGroupIDs, id)
92+
}
93+
}
94+
95+
// Describe classic groups
96+
var classicOverviews []ConsumerGroupOverview
97+
if len(classicGroupIDs) > 0 {
98+
describedClassic, err := adminCl.DescribeGroups(ctx, classicGroupIDs...)
99+
if err != nil {
100+
var se *kadm.ShardErrors
101+
if !errors.As(err, &se) {
102+
return nil, errorToRestError(err)
103+
}
104+
if se.AllFailed {
105+
return nil, errorToRestError(err)
106+
}
107+
s.logger.WarnContext(ctx, "failed to describe classic consumer groups from some shards", slog.Int("failed_shards", len(se.Errs)))
108+
for _, shardErr := range se.Errs {
109+
s.logger.WarnContext(ctx, "shard error for describing classic consumer groups",
110+
slog.Int("broker_id", int(shardErr.Broker.NodeID)),
111+
slog.Any("error", shardErr.Err))
112+
}
113+
}
114+
classicOverviews = s.convertKgoGroupDescriptions(describedClassic, make(map[string][]GroupTopicOffsets))
115+
}
116+
117+
// Describe consumer (KIP-848) groups
118+
var consumerOverviews []ConsumerGroupOverview
119+
explicitConsumerRequest := groupIDs != nil && len(consumerGroupIDs) > 0
120+
if len(consumerGroupIDs) > 0 {
121+
describedConsumer, err := adminCl.DescribeConsumerGroups(ctx, consumerGroupIDs...)
122+
if err != nil {
123+
if explicitConsumerRequest {
124+
return nil, &rest.Error{
125+
Err: fmt.Errorf("failed to describe KIP-848 consumer groups: %w", err),
126+
Status: http.StatusInternalServerError,
127+
Message: "Failed to describe consumer groups",
128+
IsSilent: false,
129+
}
130+
}
131+
s.logger.WarnContext(ctx, "failed to describe KIP-848 consumer groups, skipping", slog.Any("error", err))
132+
} else {
133+
if describedConsumer.Error() != nil {
134+
s.logger.WarnContext(ctx, "partial failure describing KIP-848 consumer groups", slog.Any("error", describedConsumer.Error()))
135+
}
136+
// Always convert successful groups; convertDescribedConsumerGroups skips per-group errors
137+
consumerOverviews = s.convertDescribedConsumerGroups(describedConsumer, make(map[string][]GroupTopicOffsets))
90138
}
139+
}
91140

92-
if se.AllFailed {
93-
return nil, errorToRestError(err)
141+
// For explicit requests, ensure all requested consumer groups were successfully described
142+
if explicitConsumerRequest {
143+
describedSet := make(map[string]struct{})
144+
for _, o := range consumerOverviews {
145+
describedSet[o.GroupID] = struct{}{}
94146
}
95-
s.logger.WarnContext(ctx, "failed to describe consumer groups from some shards", slog.Int("failed_shards", len(se.Errs)))
96-
for _, shardErr := range se.Errs {
97-
s.logger.WarnContext(ctx, "shard error for describing consumer groups",
98-
slog.Int("broker_id", int(shardErr.Broker.NodeID)),
99-
slog.Any("error", shardErr.Err))
147+
for _, id := range consumerGroupIDs {
148+
if _, ok := describedSet[id]; !ok {
149+
return nil, &rest.Error{
150+
Err: fmt.Errorf("requested consumer group '%v' could not be described", id),
151+
Status: http.StatusNotFound,
152+
Message: fmt.Sprintf("Requested consumer group '%v' could not be described", id),
153+
IsSilent: false,
154+
}
155+
}
100156
}
101157
}
102158

103-
groupLags, err := s.getConsumerGroupOffsets(ctx, adminCl, describedGroups.Names())
159+
// Merge results and fetch offsets for all groups
160+
allOverviews := append(classicOverviews, consumerOverviews...)
161+
groupIDsForOffsets := make([]string, len(allOverviews))
162+
for i, o := range allOverviews {
163+
groupIDsForOffsets[i] = o.GroupID
164+
}
165+
groupLags, err := s.getConsumerGroupOffsets(ctx, adminCl, groupIDsForOffsets)
104166
if err != nil {
105167
return nil, &rest.Error{
106168
Err: fmt.Errorf("failed to get consumer group lags: %w", err),
@@ -110,10 +172,106 @@ func (s *Service) GetConsumerGroupsOverview(ctx context.Context, groupIDs []stri
110172
}
111173
}
112174

113-
res := s.convertKgoGroupDescriptions(describedGroups, groupLags)
114-
sort.Slice(res, func(i, j int) bool { return res[i].GroupID < res[j].GroupID })
175+
for i := range allOverviews {
176+
allOverviews[i].TopicOffsets = groupLags[allOverviews[i].GroupID]
177+
}
178+
179+
sort.Slice(allOverviews, func(i, j int) bool { return allOverviews[i].GroupID < allOverviews[j].GroupID })
180+
return allOverviews, nil
181+
}
182+
183+
// listConsumerTypeGroups returns group IDs that are KIP-848 "consumer" type.
184+
// On failure (e.g. older Kafka), returns empty slice and all groups are treated as classic.
185+
func (s *Service) listConsumerTypeGroups(ctx context.Context, adminCl *kadm.Client, filterGroupIDs []string) []string {
186+
listed, err := adminCl.ListGroupsByType(ctx, []string{"consumer"})
187+
if err != nil {
188+
s.logger.WarnContext(ctx, "ListGroupsByType(consumer) not supported or failed, treating all groups as classic", slog.Any("error", err))
189+
return nil
190+
}
191+
consumerIDs := listed.Groups()
192+
if len(filterGroupIDs) == 0 {
193+
return consumerIDs
194+
}
195+
filterSet := make(map[string]struct{})
196+
for _, id := range filterGroupIDs {
197+
filterSet[id] = struct{}{}
198+
}
199+
filtered := make([]string, 0)
200+
for _, id := range consumerIDs {
201+
if _, ok := filterSet[id]; ok {
202+
filtered = append(filtered, id)
203+
}
204+
}
205+
return filtered
206+
}
207+
208+
// convertDescribedConsumerGroups converts kadm.DescribedConsumerGroups to []ConsumerGroupOverview.
209+
func (s *Service) convertDescribedConsumerGroups(described kadm.DescribedConsumerGroups, offsets map[string][]GroupTopicOffsets) []ConsumerGroupOverview {
210+
if offsets == nil {
211+
offsets = make(map[string][]GroupTopicOffsets)
212+
}
213+
result := make([]ConsumerGroupOverview, 0)
214+
for _, group := range described.Sorted() {
215+
if group.Err != nil {
216+
s.logger.Warn("failed to describe KIP-848 consumer group",
217+
slog.String("group_id", group.Group),
218+
slog.Int("coordinator_id", int(group.Coordinator.NodeID)))
219+
continue
220+
}
221+
222+
result = append(result, ConsumerGroupOverview{
223+
GroupID: group.Group,
224+
GroupType: "consumer",
225+
State: group.State,
226+
ProtocolType: "consumer",
227+
Protocol: "",
228+
Members: s.convertConsumerGroupMembers(group.Members),
229+
CoordinatorID: group.Coordinator.NodeID,
230+
TopicOffsets: offsets[group.Group],
231+
})
232+
}
233+
return result
234+
}
115235

116-
return res, nil
236+
// convertConsumerGroupMembers converts KIP-848 ConsumerGroupMember to GroupMemberDescription.
237+
// ConsumerGroupMember has Assignment.TopicPartitions (topic -> partitions) directly.
238+
func (s *Service) convertConsumerGroupMembers(members []kadm.ConsumerGroupMember) []GroupMemberDescription {
239+
response := make([]GroupMemberDescription, 0)
240+
for _, m := range members {
241+
convertedAssignments := make([]GroupMemberAssignment, 0)
242+
m.Assignment.Each(func(t string, p int32) {
243+
// Each is called per partition; we need to group by topic
244+
found := false
245+
for i := range convertedAssignments {
246+
if convertedAssignments[i].TopicName == t {
247+
convertedAssignments[i].PartitionIDs = append(convertedAssignments[i].PartitionIDs, p)
248+
found = true
249+
break
250+
}
251+
}
252+
if !found {
253+
convertedAssignments = append(convertedAssignments, GroupMemberAssignment{
254+
TopicName: t,
255+
PartitionIDs: []int32{p},
256+
})
257+
}
258+
})
259+
sort.Slice(convertedAssignments, func(i, j int) bool {
260+
return convertedAssignments[i].TopicName < convertedAssignments[j].TopicName
261+
})
262+
for i := range convertedAssignments {
263+
sort.Slice(convertedAssignments[i].PartitionIDs, func(a, b int) bool {
264+
return convertedAssignments[i].PartitionIDs[a] < convertedAssignments[i].PartitionIDs[b]
265+
})
266+
}
267+
response = append(response, GroupMemberDescription{
268+
ID: m.MemberID,
269+
ClientID: m.ClientID,
270+
ClientHost: m.ClientHost,
271+
Assignments: convertedAssignments,
272+
})
273+
}
274+
return response
117275
}
118276

119277
func (s *Service) convertKgoGroupDescriptions(describedGroups kadm.DescribedGroups, offsets map[string][]GroupTopicOffsets) []ConsumerGroupOverview {
@@ -128,6 +286,7 @@ func (s *Service) convertKgoGroupDescriptions(describedGroups kadm.DescribedGrou
128286

129287
result = append(result, ConsumerGroupOverview{
130288
GroupID: group.Group,
289+
GroupType: "classic",
131290
State: group.State,
132291
ProtocolType: group.ProtocolType,
133292
Protocol: group.Protocol,

0 commit comments

Comments
 (0)