Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/buf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ jobs:
- name: Buf – lint, format & breaking
uses: bufbuild/buf-action@v1
with:
# Pin to the repo's canonical buf version (see taskfiles/proto.yaml install-buf)
# so CI formatting matches `task proto:generate`; buf-action defaults to latest,
# whose formatter reformats single-field messages and produces spurious diffs.
version: 1.65.0
lint: true
format: true
breaking: ${{ github.event_name == 'pull_request' && !contains(github.event.pull_request.labels.*.name, 'Buf Skip Breaking') }}
Expand Down Expand Up @@ -79,6 +83,7 @@ jobs:
- name: Buf – push to registry
uses: bufbuild/buf-action@v1
with:
version: 1.65.0
# No validation - already done in validate job
lint: false
format: false
Expand Down Expand Up @@ -112,6 +117,7 @@ jobs:
- name: Buf – archive label (ignore if not found)
uses: bufbuild/buf-action@v1
with:
version: 1.65.0
# Only archive - no other operations
push: true
token: ${{ env.BUF_TOKEN }}
Expand Down
42 changes: 26 additions & 16 deletions backend/pkg/api/connect/service/topic/v1/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/twmb/franz-go/pkg/kmsg"

common "github.com/redpanda-data/console/backend/pkg/api/connect/service/common/v1"
"github.com/redpanda-data/console/backend/pkg/console"
v1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1"
)

Expand Down Expand Up @@ -170,30 +171,39 @@ func (k *mapper) updateTopicConfigsToKafka(req *v1.UpdateTopicConfigurationsRequ
return &kafkaReq, nil
}

func (k *mapper) kafkaMetadataToProto(metadata *kmsg.MetadataResponse) []*v1.ListTopicsResponse_Topic {
topics := make([]*v1.ListTopicsResponse_Topic, len(metadata.Topics))
for i, topicMetadata := range metadata.Topics {
topics[i] = k.kafkaTopicMetadataToProto(topicMetadata)
func (k *mapper) topicSummariesToProto(summaries []*console.TopicSummary) []*v1.ListTopicsResponse_Topic {
topics := make([]*v1.ListTopicsResponse_Topic, len(summaries))
for i, summary := range summaries {
topics[i] = k.topicSummaryToProto(summary)
}

return topics
}

func (*mapper) kafkaTopicMetadataToProto(topicMetadata kmsg.MetadataResponseTopic) *v1.ListTopicsResponse_Topic {
// We iterate through all partitions to figure out the replication factor,
// in case we get an error for the first partitions
replicationFactor := -1
for _, partition := range topicMetadata.Partitions {
if len(partition.Replicas) > replicationFactor {
replicationFactor = len(partition.Replicas)
func (k *mapper) topicSummaryToProto(summary *console.TopicSummary) *v1.ListTopicsResponse_Topic {
return &v1.ListTopicsResponse_Topic{
Name: summary.TopicName,
Internal: summary.IsInternal,
PartitionCount: int32(summary.PartitionCount),
ReplicationFactor: int32(summary.ReplicationFactor),
CleanupPolicy: summary.CleanupPolicy,
LogDirSummary: k.topicLogDirSummaryToProto(summary.LogDirSummary),
}
}

func (*mapper) topicLogDirSummaryToProto(summary console.TopicLogDirSummary) *v1.ListTopicsResponse_LogDirSummary {
replicaErrors := make([]*v1.ListTopicsResponse_ReplicaError, len(summary.ReplicaErrors))
for i, replicaError := range summary.ReplicaErrors {
replicaErrors[i] = &v1.ListTopicsResponse_ReplicaError{
BrokerId: replicaError.BrokerID,
Error: replicaError.Error,
}
}

return &v1.ListTopicsResponse_Topic{
Name: *topicMetadata.Topic,
Internal: topicMetadata.IsInternal,
PartitionCount: int32(len(topicMetadata.Partitions)),
ReplicationFactor: int32(replicationFactor),
return &v1.ListTopicsResponse_LogDirSummary{
TotalSizeBytes: summary.TotalSizeBytes,
ReplicaErrors: replicaErrors,
Hint: summary.Hint,
}
}

Expand Down
81 changes: 81 additions & 0 deletions backend/pkg/api/connect/service/topic/v1/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kmsg"

"github.com/redpanda-data/console/backend/pkg/console"
v1 "github.com/redpanda-data/console/backend/pkg/protogen/redpanda/api/dataplane/v1"
)

Expand Down Expand Up @@ -78,3 +79,83 @@ func TestDescribeTopicConfigsToKafka(t *testing.T) {
})
}
}

// TestTopicSummariesToProto guards the ListTopics gRPC response shape: the
// cleanup policy and the aggregated log dir summary (size, hint, per-broker
// replica errors) must survive the mapping from console.TopicSummary. A
// regression here surfaces as empty cleanup-policy / size columns in the UI.
func TestTopicSummariesToProto(t *testing.T) {
testCases := []struct {
name string
input []*console.TopicSummary
validateFn func(t *testing.T, result []*v1.ListTopicsResponse_Topic)
}{
{
name: "maps cleanup policy and log dir summary",
input: []*console.TopicSummary{
{
TopicName: "orders",
IsInternal: false,
PartitionCount: 3,
ReplicationFactor: 2,
CleanupPolicy: "compact",
LogDirSummary: console.TopicLogDirSummary{
TotalSizeBytes: 2048,
Hint: "partial result",
ReplicaErrors: []console.TopicLogDirSummaryReplicaError{
{BrokerID: 1, Error: "broker down"},
},
},
},
},
validateFn: func(t *testing.T, result []*v1.ListTopicsResponse_Topic) {
require.Len(t, result, 1)
topic := result[0]
assert.Equal(t, "orders", topic.GetName())
assert.False(t, topic.GetInternal())
assert.Equal(t, int32(3), topic.GetPartitionCount())
assert.Equal(t, int32(2), topic.GetReplicationFactor())
assert.Equal(t, "compact", topic.GetCleanupPolicy())

summary := topic.GetLogDirSummary()
require.NotNil(t, summary)
assert.Equal(t, int64(2048), summary.GetTotalSizeBytes())
assert.Equal(t, "partial result", summary.GetHint())
require.Len(t, summary.GetReplicaErrors(), 1)
assert.Equal(t, int32(1), summary.GetReplicaErrors()[0].GetBrokerId())
assert.Equal(t, "broker down", summary.GetReplicaErrors()[0].GetError())
},
},
{
name: "preserves order and the N/A cleanup policy fallback",
input: []*console.TopicSummary{
{TopicName: "a", CleanupPolicy: "delete", LogDirSummary: console.TopicLogDirSummary{TotalSizeBytes: 1}},
{TopicName: "b", CleanupPolicy: "N/A", LogDirSummary: console.TopicLogDirSummary{TotalSizeBytes: 2}},
},
validateFn: func(t *testing.T, result []*v1.ListTopicsResponse_Topic) {
require.Len(t, result, 2)
assert.Equal(t, "a", result[0].GetName())
assert.Equal(t, "delete", result[0].GetCleanupPolicy())
assert.Equal(t, "b", result[1].GetName())
assert.Equal(t, "N/A", result[1].GetCleanupPolicy())
assert.Empty(t, result[1].GetLogDirSummary().GetReplicaErrors())
},
},
{
name: "empty input yields empty slice",
input: []*console.TopicSummary{},
validateFn: func(t *testing.T, result []*v1.ListTopicsResponse_Topic) {
assert.Empty(t, result)
},
},
}

kafkaMapper := mapper{}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
out := kafkaMapper.topicSummariesToProto(tc.input)
tc.validateFn(t, out)
})
}
}
17 changes: 9 additions & 8 deletions backend/pkg/api/connect/service/topic/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"connectrpc.com/connect"
"github.com/redpanda-data/common-go/api/pagination"
"github.com/twmb/franz-go/pkg/kmsg"

apierrors "github.com/redpanda-data/console/backend/pkg/api/connect/errors"
"github.com/redpanda-data/console/backend/pkg/config"
Expand All @@ -46,8 +45,10 @@ type Service struct {
// ListTopics lists all Kafka topics with their most important metadata.
func (s *Service) ListTopics(ctx context.Context, req *connect.Request[v1.ListTopicsRequest]) (*connect.Response[v1.ListTopicsResponse], error) {
s.defaulter.applyListTopicsRequest(req.Msg)
kafkaReq := kmsg.NewMetadataRequest()
kafkaRes, err := s.consoleSvc.GetMetadata(ctx, &kafkaReq)

// We reuse the console GetTopicsOverview so the gRPC response reaches parity with the
// legacy REST /topics endpoint (cleanup policy and log dir size on top of the metadata).
topicSummaries, err := s.consoleSvc.GetTopicsOverview(ctx)
if err != nil {
return nil, apierrors.NewConnectError(
connect.CodeInternal,
Expand All @@ -58,16 +59,16 @@ func (s *Service) ListTopics(ctx context.Context, req *connect.Request[v1.ListTo

// Filter topics if a filter is set
if req.Msg.Filter != nil && req.Msg.Filter.NameContains != "" {
filteredTopics := make([]kmsg.MetadataResponseTopic, 0, len(kafkaRes.Topics))
for _, topic := range kafkaRes.Topics {
if strings.Contains(*topic.Topic, req.Msg.Filter.NameContains) {
filteredTopics := make([]*console.TopicSummary, 0, len(topicSummaries))
for _, topic := range topicSummaries {
if strings.Contains(topic.TopicName, req.Msg.Filter.NameContains) {
filteredTopics = append(filteredTopics, topic)
}
}
kafkaRes.Topics = filteredTopics
topicSummaries = filteredTopics
}

topics := s.mapper.kafkaMetadataToProto(kafkaRes)
topics := s.mapper.topicSummariesToProto(topicSummaries)

// Add pagination
var nextPageToken string
Expand Down
Loading
Loading