Skip to content

Commit 8196628

Browse files
committed
refactor: remove dead REST-oriented service methods after RPC migration (UX-1199)
Remove CreateACL, DeleteACLs, DescribeQuotas methods and their associated types (DeleteACLsResponse, QuotaResponse*) from the console Servicer interface and Service implementation. These only served the deleted REST handlers and are fully replaced by the Kafka proxy methods (CreateACLs, DeleteACLsKafka, DescribeClientQuotas) used by Connect RPC.
1 parent 45f0a9f commit 8196628

4 files changed

Lines changed: 0 additions & 196 deletions

File tree

backend/pkg/console/create_acl.go

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -11,61 +11,10 @@ package console
1111

1212
import (
1313
"context"
14-
"errors"
15-
"fmt"
16-
"log/slog"
17-
"net/http"
1814

19-
"github.com/cloudhut/common/rest"
20-
"github.com/twmb/franz-go/pkg/kerr"
2115
"github.com/twmb/franz-go/pkg/kmsg"
2216
)
2317

24-
// CreateACL creates an ACL resource in your target Kafka cluster.
25-
func (s *Service) CreateACL(ctx context.Context, createReq kmsg.CreateACLsRequestCreation) *rest.Error {
26-
cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx)
27-
if err != nil {
28-
return errorToRestError(err)
29-
}
30-
31-
req := kmsg.NewCreateACLsRequest()
32-
req.Creations = []kmsg.CreateACLsRequestCreation{createReq}
33-
res, err := req.RequestWith(ctx, cl)
34-
if err != nil {
35-
return &rest.Error{
36-
Err: err,
37-
Status: http.StatusServiceUnavailable,
38-
Message: fmt.Sprintf("Failed to execute create ACL command: %v", err.Error()),
39-
InternalLogs: []slog.Attr{slog.Any("create_acl_req", createReq)},
40-
IsSilent: false,
41-
}
42-
}
43-
44-
if len(res.Results) != 1 {
45-
return &rest.Error{
46-
Err: errors.New("unexpected number of results in create ACL response"),
47-
Status: http.StatusInternalServerError,
48-
Message: fmt.Sprintf("Failed to execute delete topic command: %v", err.Error()),
49-
InternalLogs: []slog.Attr{slog.Int("results_length", len(res.Results))},
50-
IsSilent: false,
51-
}
52-
}
53-
54-
aclRes := res.Results[0]
55-
err = kerr.ErrorForCode(aclRes.ErrorCode)
56-
if err != nil {
57-
return &rest.Error{
58-
Err: err,
59-
Status: http.StatusServiceUnavailable,
60-
Message: fmt.Sprintf("Failed to execute create ACL command: %v", err.Error()),
61-
InternalLogs: []slog.Attr{slog.Any("create_acl_req", createReq)},
62-
IsSilent: false,
63-
}
64-
}
65-
66-
return nil
67-
}
68-
6918
// CreateACLs proxies the request/response to CreateACLs via the Kafka API.
7019
func (s *Service) CreateACLs(ctx context.Context, req *kmsg.CreateACLsRequest) (*kmsg.CreateACLsResponse, error) {
7120
cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx)

backend/pkg/console/delete_acls.go

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -11,73 +11,10 @@ package console
1111

1212
import (
1313
"context"
14-
"fmt"
15-
"log/slog"
16-
"net/http"
1714

18-
"github.com/cloudhut/common/rest"
1915
"github.com/twmb/franz-go/pkg/kmsg"
2016
)
2117

22-
// DeleteACLsResponse is the response to deleting ACL resources.
23-
type DeleteACLsResponse struct {
24-
ErrorMessages []string `json:"errorMessage"`
25-
MatchedACLs int `json:"matchedACLs"`
26-
DeletedACLs int `json:"deletedACLs"`
27-
}
28-
29-
// DeleteACLs deletes Kafka ACLs based on a given filter.
30-
func (s *Service) DeleteACLs(ctx context.Context, filter kmsg.DeleteACLsRequestFilter) (DeleteACLsResponse, *rest.Error) {
31-
cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx)
32-
if err != nil {
33-
return DeleteACLsResponse{}, errorToRestError(err)
34-
}
35-
36-
req := kmsg.NewDeleteACLsRequest()
37-
req.Filters = []kmsg.DeleteACLsRequestFilter{filter}
38-
39-
res, err := req.RequestWith(ctx, cl)
40-
if err != nil {
41-
return DeleteACLsResponse{}, &rest.Error{
42-
Err: err,
43-
Status: http.StatusServiceUnavailable,
44-
Message: fmt.Sprintf("Failed to execute delete topic command: %v", err.Error()),
45-
InternalLogs: []slog.Attr{slog.Any("delete_acl_req", filter)},
46-
IsSilent: false,
47-
}
48-
}
49-
50-
deleteAclsRes := DeleteACLsResponse{
51-
ErrorMessages: make([]string, 0),
52-
MatchedACLs: 0,
53-
DeletedACLs: 0,
54-
}
55-
for _, aclRes := range res.Results {
56-
err := newKafkaErrorWithDynamicMessage(aclRes.ErrorCode, aclRes.ErrorMessage)
57-
if err != nil {
58-
return DeleteACLsResponse{}, &rest.Error{
59-
Err: err,
60-
Status: http.StatusServiceUnavailable,
61-
Message: fmt.Sprintf("Failed to delete Kafka ACL: %v", err.Error()),
62-
InternalLogs: []slog.Attr{slog.Any("delete_acl_req", filter)},
63-
IsSilent: false,
64-
}
65-
}
66-
67-
for _, item := range aclRes.MatchingACLs {
68-
deleteAclsRes.MatchedACLs++
69-
err := newKafkaErrorWithDynamicMessage(item.ErrorCode, item.ErrorMessage)
70-
if err != nil {
71-
deleteAclsRes.ErrorMessages = append(deleteAclsRes.ErrorMessages, err.Error())
72-
continue
73-
}
74-
deleteAclsRes.DeletedACLs++
75-
}
76-
}
77-
78-
return deleteAclsRes, nil
79-
}
80-
8118
// DeleteACLsKafka proxies the request/response via the Kafka API.
8219
func (s *Service) DeleteACLsKafka(ctx context.Context, req *kmsg.DeleteACLsRequest) (*kmsg.DeleteACLsResponse, error) {
8320
cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx)

backend/pkg/console/describe_quotas.go

Lines changed: 0 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -11,89 +11,10 @@ package console
1111

1212
import (
1313
"context"
14-
"fmt"
1514

16-
"github.com/twmb/franz-go/pkg/kerr"
1715
"github.com/twmb/franz-go/pkg/kmsg"
1816
)
1917

20-
// QuotaResponse is a helper type that carries the sum of all quota responses
21-
// sent by each broker in the cluster.
22-
type QuotaResponse struct {
23-
Error string `json:"error,omitempty"`
24-
Items []QuotaResponseItem `json:"items"`
25-
}
26-
27-
// QuotaResponseItem is a broker's response to a quota entity in Kafka.
28-
type QuotaResponseItem struct {
29-
EntityType string `json:"entityType"`
30-
EntityName string `json:"entityName"`
31-
Settings []QuotaResponseSetting `json:"settings"`
32-
}
33-
34-
// QuotaResponseSetting is a quota configuration.
35-
type QuotaResponseSetting struct {
36-
Key string `json:"key"`
37-
Value float64 `json:"value"`
38-
}
39-
40-
// DescribeQuotas fetches the configured quota settings in the target cluster.
41-
func (s *Service) DescribeQuotas(ctx context.Context) QuotaResponse {
42-
cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx)
43-
if err != nil {
44-
return QuotaResponse{
45-
Error: err.Error(),
46-
}
47-
}
48-
49-
items := make([]QuotaResponseItem, 0)
50-
req := kmsg.NewDescribeClientQuotasRequest()
51-
quotas, err := req.RequestWith(ctx, cl)
52-
if err != nil {
53-
return QuotaResponse{
54-
Error: fmt.Errorf("kafka request has failed: %w", err).Error(),
55-
Items: nil,
56-
}
57-
}
58-
59-
err = kerr.ErrorForCode(quotas.ErrorCode)
60-
if err != nil {
61-
return QuotaResponse{
62-
Error: fmt.Errorf("inner kafka error: %w", err).Error(),
63-
Items: nil,
64-
}
65-
}
66-
67-
// Flat map all quota settings from response into our items array
68-
for _, entry := range quotas.Entries {
69-
settings := make([]QuotaResponseSetting, len(entry.Values))
70-
for i, setting := range entry.Values {
71-
settings[i] = QuotaResponseSetting{
72-
Key: setting.Key,
73-
Value: setting.Value,
74-
}
75-
}
76-
77-
for _, entity := range entry.Entity {
78-
// A nil value for entity.Name means that this quota is the default for the respective entity.Type
79-
entityName := "<default>"
80-
if entity.Name != nil {
81-
entityName = *entity.Name
82-
}
83-
items = append(items, QuotaResponseItem{
84-
EntityType: entity.Type,
85-
EntityName: entityName,
86-
Settings: settings,
87-
})
88-
}
89-
}
90-
91-
return QuotaResponse{
92-
Error: "",
93-
Items: items,
94-
}
95-
}
96-
9718
// DescribeClientQuotas proxies the request/response for describing client quotas via the Kafka API.
9819
func (s *Service) DescribeClientQuotas(ctx context.Context, req *kmsg.DescribeClientQuotasRequest) (*kmsg.DescribeClientQuotasResponse, error) {
9920
cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx)

backend/pkg/console/servicer.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,10 @@ type Servicer interface {
2525
GetClusterInfo(ctx context.Context) (*ClusterInfo, error)
2626
DeleteConsumerGroup(ctx context.Context, groupID string) error
2727
GetConsumerGroupsOverview(ctx context.Context, groupIDs []string) ([]ConsumerGroupOverview, *rest.Error)
28-
CreateACL(ctx context.Context, createReq kmsg.CreateACLsRequestCreation) *rest.Error
2928
CreateTopic(ctx context.Context, createTopicReq kmsg.CreateTopicsRequestTopic) (CreateTopicResponse, *rest.Error)
30-
DeleteACLs(ctx context.Context, filter kmsg.DeleteACLsRequestFilter) (DeleteACLsResponse, *rest.Error)
3129
DeleteConsumerGroupOffsets(ctx context.Context, groupID string, topics []kmsg.OffsetDeleteRequestTopic) ([]DeleteConsumerGroupOffsetsResponseTopic, error)
3230
DeleteTopic(ctx context.Context, topicName string) *rest.Error
3331
DeleteTopicRecords(ctx context.Context, deleteReq kmsg.DeleteRecordsRequestTopic) (DeleteTopicRecordsResponse, *rest.Error)
34-
DescribeQuotas(ctx context.Context) QuotaResponse
3532
EditConsumerGroupOffsets(ctx context.Context, groupID string, topics []kmsg.OffsetCommitRequestTopic) (*EditConsumerGroupOffsetsResponse, *rest.Error)
3633
EditTopicConfig(ctx context.Context, topicName string, configs []kmsg.IncrementalAlterConfigsRequestResourceConfig) error
3734
GetEndpointCompatibility(ctx context.Context) (EndpointCompatibility, error)

0 commit comments

Comments
 (0)