diff --git a/acceptance/steps/multicluster.go b/acceptance/steps/multicluster.go index 689abd998..6f33eb3c6 100644 --- a/acceptance/steps/multicluster.go +++ b/acceptance/steps/multicluster.go @@ -465,7 +465,6 @@ func bootstrapTLS(ctx context.Context, t framework.TestingT, vclusters []*vclust BootstrapTLS: true, EnsureNamespace: true, OperatorNamespace: namespace, - ServiceName: "redpanda-operator-multicluster", } peers := []any{} for _, cluster := range vclusters { @@ -473,6 +472,7 @@ func bootstrapTLS(ctx context.Context, t framework.TestingT, vclusters []*vclust KubeConfig: cluster.RESTConfig(), APIServer: cluster.APIServer(), ServiceAddress: cluster.ExternalIP(), + Name: "redpanda-operator", // Fullname for helm release "redpanda" with the operator chart }) peers = append(peers, map[string]any{ "name": cluster.Name(), diff --git a/go.work.sum b/go.work.sum index b3f82cbde..ce1afa94f 100644 --- a/go.work.sum +++ b/go.work.sum @@ -2733,6 +2733,7 @@ github.com/redpanda-data/common-go/api v0.0.0-20251118002524-720a3c2f5569/go.mod github.com/redpanda-data/common-go/api v0.0.0-20260130192523-413455981e59 h1:eAOFa81IQOPhJ/7gEjcNwmPtHgdAma12UxHzajvgTAQ= github.com/redpanda-data/common-go/api v0.0.0-20260130192523-413455981e59/go.mod h1:klAmWfc8Q3hEZk8geFTMu6f2sk3VUKRS7cv/LvB05ig= github.com/redpanda-data/common-go/kube v0.0.0-20260116214328-0862a76e8f5e/go.mod h1:Ye/yB6LyJWUe0FkA6HCUOEflk4N+a4ycrw0J0Mrt0Es= +github.com/redpanda-data/common-go/kube v0.0.0-20260225221458-fec06b917c9a/go.mod h1:FMuuUG3IVwwSad+3da0WIPYHsUuvi3LHnjgSv8ovXug= github.com/redpanda-data/common-go/net v0.1.0/go.mod h1:iOdNkjxM7a1T8F3cYHTaKIPFCHzzp/ia6TN+Z+7Tt5w= github.com/redpanda-data/common-go/proto v0.0.0-20250422172326-6a3bcb14b829 h1:fx1Z+t/fa0vd7kAblgCrdYRW3QHc3svYiVnO1DadS94= github.com/redpanda-data/common-go/proto v0.0.0-20250422172326-6a3bcb14b829/go.mod h1:6WXvgZCZIkbQCNsvU5zTx/+ub5eXTuCcl90i5xkhMw0= @@ -4035,9 +4036,9 @@ google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250826171959-ef028d996bc1/go. google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20251103181224-f26f9409b101/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= -google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= diff --git a/operator/cmd/bootstrap-standalone/main.go b/operator/cmd/bootstrap-standalone/main.go deleted file mode 100644 index c6ce5a217..000000000 --- a/operator/cmd/bootstrap-standalone/main.go +++ /dev/null @@ -1,187 +0,0 @@ -// Copyright 2026 Redpanda Data, Inc. 
-// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.md -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0 - -package main - -import ( - "context" - "fmt" - "log" - "os" - "strings" - - "github.com/spf13/cobra" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/clientcmd/api" - - "github.com/redpanda-data/redpanda-operator/pkg/multicluster/bootstrap" -) - -func main() { - var ( - organization string - operatorNamespace string - serviceName string - kubeconfigPath string - dnsOverrides []string - ) - rootCmd := &cobra.Command{ - Use: "multicluster-bootstrap", - Short: "Standalone multicluster bootstrap command", - Run: func(cmd *cobra.Command, args []string) { - ctx := cmd.Context() - - run( - ctx, - organization, - operatorNamespace, - serviceName, - kubeconfigPath, - dnsOverrides, - ) - }, - } - - rootCmd.Flags().StringVar(&organization, "organization", "", "") - rootCmd.Flags().StringVar(&operatorNamespace, "operatorNamespace", "", "") - rootCmd.Flags().StringVar(&serviceName, "serviceName", "redpanda-operator-multicluster", "") - rootCmd.Flags().StringVar(&kubeconfigPath, "kubeconfigPath", "", "") - rootCmd.Flags().StringArrayVar(&dnsOverrides, "dns-override", []string{}, "DNS overrides in the format =") - - if err := rootCmd.Execute(); err != nil { - fmt.Printf("%+v\n", err) - os.Exit(1) - } -} - -// ClusterInfo represents a parsed cluster configuration from kubeconfig -type ClusterInfo struct { - ContextName string - ClusterName string - UserName string - Config *rest.Config -} - -// ParseKubeConfig parses a kubeconfig file and returns an array of cluster configurations -func ParseKubeConfig(kubeconfigPath string) ([]ClusterInfo, error) { - // Load the kubeconfig file - config, err := clientcmd.LoadFromFile(kubeconfigPath) - if err != 
nil { - return nil, fmt.Errorf("failed to load kubeconfig from %s: %w", kubeconfigPath, err) - } - - var clusterInfos []ClusterInfo - - // Iterate through all contexts in the kubeconfig - for contextName, context := range config.Contexts { - // Create a rest.Config for this specific context - restConfig, err := buildConfigFromContext(config, contextName) - if err != nil { - log.Printf("Warning: failed to build config for context %s: %v", contextName, err) - continue - } - - clusterInfos = append(clusterInfos, ClusterInfo{ - ContextName: contextName, - ClusterName: context.Cluster, - UserName: context.AuthInfo, - Config: restConfig, - }) - } - - if len(clusterInfos) == 0 { - return nil, fmt.Errorf("no valid contexts found in kubeconfig") - } - - return clusterInfos, nil -} - -// buildConfigFromContext creates a rest.Config from a specific context in the kubeconfig -func buildConfigFromContext(config *api.Config, contextName string) (*rest.Config, error) { - // Create a clientcmd.ClientConfig using the specific context - configOverrides := &clientcmd.ConfigOverrides{ - CurrentContext: contextName, - } - - clientConfig := clientcmd.NewNonInteractiveClientConfig( - *config, - contextName, - configOverrides, - nil, - ) - - // Build the rest.Config - restConfig, err := clientConfig.ClientConfig() - if err != nil { - return nil, fmt.Errorf("failed to create rest config: %w", err) - } - - return restConfig, nil -} - -func run( - ctx context.Context, - organization string, - operatorNamespace string, - serviceName string, - kubeconfigPath string, - dnsOverrides []string, -) { - log.Println("Creating certificates") - - remoteClusters := []bootstrap.RemoteConfiguration{} - - // Parse kubeconfig file if provided - if kubeconfigPath == "" { - log.Fatalf("kubeconfigPath is empty") - } - - clusterInfos, err := ParseKubeConfig(kubeconfigPath) - if err != nil { - log.Fatalf("Failed to parse kubeconfig: %v", err) - } - - overrides := make(map[string]string) - for _, override := 
range dnsOverrides { - parts := strings.SplitN(override, "=", 2) - if len(parts) != 2 { - log.Fatalf("Invalid DNS override format: %s", override) - } - contextName := parts[0] - dnsName := parts[1] - overrides[contextName] = dnsName - } - - // Convert ClusterInfo to RemoteConfiguration - for _, clusterInfo := range clusterInfos { - log.Printf("Adding cluster from context: %s (cluster: %s, user: %s)", - clusterInfo.ContextName, clusterInfo.ClusterName, clusterInfo.UserName) - - remoteClusters = append(remoteClusters, bootstrap.RemoteConfiguration{ - KubeConfig: clusterInfo.Config, - ContextName: clusterInfo.ContextName, - ServiceAddress: overrides[clusterInfo.ContextName], - }) - } - - err = bootstrap.BootstrapKubernetesClusters(ctx, organization, bootstrap.BootstrapClusterConfiguration{ - BootstrapTLS: true, - BootstrapKubeconfigs: true, - EnsureNamespace: true, - OperatorNamespace: operatorNamespace, - ServiceName: serviceName, - RemoteClusters: remoteClusters, - }) - if err != nil { - log.Fatalf("%s", fmt.Errorf("unable to bootstrap multi cluster: %w", err)) - } - - log.Println("Certificates created") -} diff --git a/operator/cmd/rpk-k8s/k8s/k8s.go b/operator/cmd/rpk-k8s/k8s/k8s.go new file mode 100644 index 000000000..ce9fd7692 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/k8s.go @@ -0,0 +1,30 @@ +// Copyright 2026 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package k8s + +import ( + "github.com/spf13/cobra" + + "github.com/redpanda-data/redpanda-operator/operator/cmd/rpk-k8s/k8s/multicluster" +) + +func Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "k8s", + Short: "Interact with Redpanda clusters running on Kubernetes", + Long: "Commands for managing and interacting with Redpanda clusters deployed on Kubernetes.", + } + + cmd.AddCommand( + multicluster.Command(), + ) + + return cmd +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/bootstrap.go b/operator/cmd/rpk-k8s/k8s/multicluster/bootstrap.go new file mode 100644 index 000000000..afffff668 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/bootstrap.go @@ -0,0 +1,154 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/spf13/cobra" + + "github.com/redpanda-data/redpanda-operator/pkg/multicluster/bootstrap" +) + +// BootstrapConfig holds the configuration for the bootstrap command. +// It can be populated from CLI flags via the cobra command or set +// programmatically for testing. +type BootstrapConfig struct { + Connection ConnectionConfig + Organization string + DNSOverrides []string + TLS bool + Kubeconfigs bool + CreateNS bool +} + +// Run executes the bootstrap operation. 
+func (c *BootstrapConfig) Run(ctx context.Context, out io.Writer) error { + conns, err := c.Connection.Resolve() + if err != nil { + return err + } + + dnsOverrides, err := parseDNSOverrides(c.DNSOverrides) + if err != nil { + return err + } + + remoteClusters := make([]bootstrap.RemoteConfiguration, len(conns)) + for i, conn := range conns { + remoteClusters[i] = bootstrap.RemoteConfiguration{ + KubeConfig: conn.Ctl.RestConfig(), + ContextName: conn.Name, + ServiceAddress: dnsOverrides[conn.Name], + Name: conn.SecretPrefix, + } + } + + config := bootstrap.BootstrapClusterConfiguration{ + BootstrapTLS: c.TLS, + BootstrapKubeconfigs: c.Kubeconfigs, + EnsureNamespace: c.CreateNS, + OperatorNamespace: c.Connection.Namespace, + ServiceName: c.Connection.ServiceName, + RemoteClusters: remoteClusters, + } + + fmt.Fprintf(out, "Bootstrapping %d clusters...\n", len(remoteClusters)) + + if err := bootstrap.BootstrapKubernetesClusters(ctx, c.Organization, config); err != nil { + return fmt.Errorf("bootstrapping clusters: %w", err) + } + + fmt.Fprintln(out, "Bootstrap complete.") + return nil +} + +func bootstrapCommand() *cobra.Command { + cfg := BootstrapConfig{ + Organization: "Redpanda", + TLS: true, + Kubeconfigs: true, + CreateNS: true, + } + + cmd := &cobra.Command{ + Use: "bootstrap", + Short: "Bootstrap a multicluster Redpanda deployment", + Long: `Bootstrap TLS certificates and kubeconfig secrets across multiple +Kubernetes clusters for a Redpanda multicluster deployment. + +This command connects to each specified Kubernetes context, generates a shared +CA certificate, and distributes per-cluster TLS certificates and kubeconfig +secrets so that the multicluster operator can communicate across clusters. + +If --kubeconfig is provided, all contexts in the file are used automatically +and --context flags are not required. 
If both are provided, only the specified +contexts from the kubeconfig file are used.`, + Example: ` # Bootstrap all clusters from a kubeconfig file + rpk k8s multicluster bootstrap \ + --kubeconfig /path/to/kubeconfig \ + --namespace redpanda + + # Bootstrap specific contexts (uses default kubeconfig loading rules) + rpk k8s multicluster bootstrap \ + --context cluster-a --context cluster-b --context cluster-c \ + --namespace redpanda + + # Override the TLS secret prefix when helm release names differ from context names + rpk k8s multicluster bootstrap \ + --context cluster-a --context cluster-b --context cluster-c \ + --name-override cluster-a=redpanda-operator \ + --name-override cluster-b=redpanda-operator \ + --name-override cluster-c=redpanda-operator \ + --namespace redpanda + + # Override DNS names for TLS SANs on specific clusters + rpk k8s multicluster bootstrap \ + --context cluster-a --context cluster-b \ + --dns-override cluster-a=cluster-a.example.com \ + --dns-override cluster-b=cluster-b.example.com \ + --namespace redpanda + + # Bootstrap only TLS certificates + rpk k8s multicluster bootstrap \ + --kubeconfig /path/to/kubeconfig \ + --namespace redpanda \ + --tls --kubeconfigs=false`, + RunE: func(cmd *cobra.Command, args []string) error { + return cfg.Run(cmd.Context(), cmd.OutOrStdout()) + }, + } + + cfg.Connection.BindFlags(cmd) + cmd.Flags().StringVar(&cfg.Organization, "organization", cfg.Organization, "Organization name for generated TLS certificates") + cmd.Flags().StringArrayVar(&cfg.DNSOverrides, "dns-override", nil, "DNS override for TLS SANs in context=address format (repeatable)") + cmd.Flags().BoolVar(&cfg.TLS, "tls", cfg.TLS, "Bootstrap TLS certificates") + cmd.Flags().BoolVar(&cfg.Kubeconfigs, "kubeconfigs", cfg.Kubeconfigs, "Bootstrap kubeconfig secrets") + cmd.Flags().BoolVar(&cfg.CreateNS, "create-namespace", cfg.CreateNS, "Create the namespace if it does not exist") + + return cmd +} + +// parseDNSOverrides parses --dns-override 
flags in "context=address" format +// into a map keyed by context name. +func parseDNSOverrides(overrides []string) (map[string]string, error) { + m := make(map[string]string, len(overrides)) + for _, o := range overrides { + parts := strings.SplitN(o, "=", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return nil, fmt.Errorf("invalid --dns-override format %q, expected context=address", o) + } + m[parts[0]] = parts[1] + } + return m, nil +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/checks.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/checks.go new file mode 100644 index 000000000..730f8eb27 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/checks.go @@ -0,0 +1,103 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +// Package checks implements a pluggable check framework for validating +// multicluster operator deployments. Each check is a self-contained unit +// that inspects one aspect of the deployment and returns results. +// +// Checks come in two flavors: +// - ClusterCheck: runs once per cluster, given a shared CheckContext that +// accumulates state as checks execute. Checks run in registration order +// so later checks can depend on state populated by earlier ones. +// - CrossClusterCheck: runs once after all per-cluster checks complete, +// given all CheckContexts. Used for validations that compare state +// across clusters (e.g., CA consistency, leader agreement). 
+package checks + +import ( + "context" + "crypto/x509" + + "github.com/redpanda-data/common-go/kube" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + + transportv1 "github.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection/proto/gen/transport/v1" +) + +// CheckContext holds shared state for a single cluster, accumulated as +// checks execute. Earlier checks populate fields that later checks read. +type CheckContext struct { + // Inputs — set before checks run. + Context string + Namespace string + ServiceName string + SecretPrefix string // helm fullname prefix for TLS secret discovery (-multicluster-certificates) + Ctl *kube.Ctl + + // Accumulated state — populated by checks for downstream consumers. + Pod *corev1.Pod + Deployment *appsv1.Deployment + DeployArgs []string + TLSSecret *corev1.Secret + CACert *x509.Certificate + TLSCert *x509.Certificate + TLSKeyMatch bool + RaftStatus *transportv1.StatusResponse +} + +// Result is the outcome of a single validation. +type Result struct { + // Name identifies the check that produced this result. + Name string + // OK is true when the check passed. + OK bool + // Message describes what was checked or what went wrong. + Message string +} + +// Pass returns a passing Result. +func Pass(name, message string) Result { + return Result{Name: name, OK: true, Message: message} +} + +// Fail returns a failing Result. +func Fail(name, message string) Result { + return Result{Name: name, OK: false, Message: message} +} + +// ClusterCheck validates one aspect of a single cluster's deployment. +type ClusterCheck interface { + Name() string + Run(ctx context.Context, cc *CheckContext) []Result +} + +// CrossClusterCheck validates consistency across all clusters. +type CrossClusterCheck interface { + Name() string + Run(contexts []*CheckContext) []Result +} + +// RunClusterChecks executes all per-cluster checks in order. 
+func RunClusterChecks(ctx context.Context, cc *CheckContext, checks []ClusterCheck) []Result { + var results []Result + for _, check := range checks { + results = append(results, check.Run(ctx, cc)...) + } + return results +} + +// RunCrossClusterChecks executes all cross-cluster checks. +func RunCrossClusterChecks(contexts []*CheckContext, checks []CrossClusterCheck) []Result { + var results []Result + for _, check := range checks { + results = append(results, check.Run(contexts)...) + } + return results +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_deployment.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_deployment.go new file mode 100644 index 000000000..4fc7d8ae0 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_deployment.go @@ -0,0 +1,91 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "context" + "fmt" + "strings" + + appsv1 "k8s.io/api/apps/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// DeploymentCheck finds the operator Deployment and validates its TLS flag +// configuration. Populates cc.Deployment and cc.DeployArgs for downstream checks. 
+type DeploymentCheck struct{} + +func (c *DeploymentCheck) Name() string { return "deployment" } + +func (c *DeploymentCheck) Run(ctx context.Context, cc *CheckContext) []Result { + var deploys appsv1.DeploymentList + if err := cc.Ctl.List(ctx, cc.Namespace, &deploys, client.MatchingLabels{ + "app.kubernetes.io/name": cc.ServiceName, + }); err != nil { + return []Result{Fail(c.Name(), fmt.Sprintf("listing deployments: %v", err))} + } + if len(deploys.Items) == 0 { + return []Result{Fail(c.Name(), fmt.Sprintf("no deployments found with label app.kubernetes.io/name=%s in namespace %s", cc.ServiceName, cc.Namespace))} + } + + deploy := &deploys.Items[0] + cc.Deployment = deploy + + // Extract container args. + for _, cont := range deploy.Spec.Template.Spec.Containers { + if len(cont.Args) > 0 { + cc.DeployArgs = cont.Args + break + } + } + + var results []Result + + // Validate TLS flag paths. + expected := map[string]string{ + "--ca-file": "/tls/ca.crt", + "--certificate-file": "/tls/tls.crt", + "--private-key-file": "/tls/tls.key", + } + for flag, want := range expected { + got := ExtractFlag(cc.DeployArgs, flag) + if got != want { + results = append(results, Fail(c.Name(), fmt.Sprintf("Deployment %s=%s, expected %s", flag, got, want))) + } + } + + if len(results) == 0 { + results = append(results, Pass(c.Name(), fmt.Sprintf("Deployment %s configured correctly", deploy.Name))) + } + return results +} + +// ExtractFlag extracts the value of a --flag=value from args. +func ExtractFlag(args []string, flag string) string { + prefix := flag + "=" + for _, arg := range args { + if strings.HasPrefix(arg, prefix) { + return strings.TrimPrefix(arg, prefix) + } + } + return "" +} + +// ExtractFlagAll extracts all values of a repeated --flag=value from args. 
+func ExtractFlagAll(args []string, flag string) []string { + prefix := flag + "=" + var values []string + for _, arg := range args { + if strings.HasPrefix(arg, prefix) { + values = append(values, strings.TrimPrefix(arg, prefix)) + } + } + return values +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_deployment_raft.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_deployment_raft.go new file mode 100644 index 000000000..a8e59e1d5 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_deployment_raft.go @@ -0,0 +1,72 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "context" + "fmt" + "strings" +) + +// DeploymentRaftCheck validates that the Deployment's --name and --peer flags +// are consistent with the raft status reported by the running operator. +// Requires cc.DeployArgs and cc.RaftStatus to be set. +type DeploymentRaftCheck struct{} + +func (c *DeploymentRaftCheck) Name() string { return "deployment-raft" } + +func (c *DeploymentRaftCheck) Run(_ context.Context, cc *CheckContext) []Result { + if cc.DeployArgs == nil || cc.RaftStatus == nil { + return nil + } + + var results []Result + + // --name vs raft name. 
+ deployName := ExtractFlag(cc.DeployArgs, "--name") + if deployName != "" && cc.RaftStatus.Name != "" && deployName != cc.RaftStatus.Name { + results = append(results, Fail(c.Name(), + fmt.Sprintf("Deployment --name=%s does not match raft status name %q", deployName, cc.RaftStatus.Name))) + } else if deployName != "" && cc.RaftStatus.Name != "" { + results = append(results, Pass(c.Name(), + fmt.Sprintf("Deployment --name=%s matches raft status", deployName))) + } + + // --peer list vs raft cluster names. + deployPeers := ExtractFlagAll(cc.DeployArgs, "--peer") + if len(cc.RaftStatus.ClusterNames) > 0 && len(deployPeers) > 0 { + raftNames := make(map[string]bool, len(cc.RaftStatus.ClusterNames)) + for _, n := range cc.RaftStatus.ClusterNames { + raftNames[n] = true + } + for _, p := range deployPeers { + // Peer format: name://address:port + peerName := strings.SplitN(p, "://", 2)[0] + if !raftNames[peerName] { + results = append(results, Fail(c.Name(), + fmt.Sprintf("Deployment --peer=%s references unknown cluster %q", p, peerName))) + } + } + if len(results) == 0 || allPassing(results) { + results = append(results, Pass(c.Name(), "Deployment --peer list matches raft cluster names")) + } + } + + return results +} + +func allPassing(results []Result) bool { + for _, r := range results { + if !r.OK { + return false + } + } + return true +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_pod.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_pod.go new file mode 100644 index 000000000..96cd81e61 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_pod.go @@ -0,0 +1,71 @@ +// Copyright 2026 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// PodCheck finds the operator pod and validates it is running and ready. +// Populates cc.Pod for downstream checks. +type PodCheck struct{} + +func (c *PodCheck) Name() string { return "pod" } + +func (c *PodCheck) Run(ctx context.Context, cc *CheckContext) []Result { + var pods corev1.PodList + if err := cc.Ctl.List(ctx, cc.Namespace, &pods, client.MatchingLabels{ + "app.kubernetes.io/name": cc.ServiceName, + }); err != nil { + return []Result{Fail(c.Name(), fmt.Sprintf("listing pods: %v", err))} + } + if len(pods.Items) == 0 { + return []Result{Fail(c.Name(), fmt.Sprintf("no pods found with label app.kubernetes.io/name=%s in namespace %s", cc.ServiceName, cc.Namespace))} + } + + // Prefer running pods. 
+ pod := &pods.Items[0] + for i := range pods.Items { + if pods.Items[i].Status.Phase == corev1.PodRunning { + pod = &pods.Items[i] + break + } + } + cc.Pod = pod + + var restarts int32 + for _, cs := range pod.Status.ContainerStatuses { + restarts += cs.RestartCount + } + + ready := podReady(pod) + if !ready { + return []Result{Fail(c.Name(), fmt.Sprintf("pod %s is not ready (phase: %s, restarts: %d)", pod.Name, pod.Status.Phase, restarts))} + } + + msg := fmt.Sprintf("pod %s is running and ready", pod.Name) + if restarts > 0 { + msg += fmt.Sprintf(" (%d restarts)", restarts) + } + return []Result{Pass(c.Name(), msg)} +} + +func podReady(pod *corev1.Pod) bool { + for _, cond := range pod.Status.Conditions { + if cond.Type == corev1.PodReady { + return cond.Status == corev1.ConditionTrue + } + } + return false +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_raft.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_raft.go new file mode 100644 index 000000000..0068261bc --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_raft.go @@ -0,0 +1,179 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "strings" + "time" + + "github.com/redpanda-data/common-go/kube" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + corev1 "k8s.io/api/core/v1" + + transportv1 "github.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection/proto/gen/transport/v1" +) + +const raftPort = 9443 + +// RaftCheck port-forwards to the operator pod's gRPC transport and queries the +// Status RPC to obtain raft state, leader, term, peers, and health. 
+// Requires cc.Pod to be set (by PodCheck). Populates cc.RaftStatus. +type RaftCheck struct{} + +func (c *RaftCheck) Name() string { return "raft" } + +func (c *RaftCheck) Run(ctx context.Context, cc *CheckContext) []Result { + if cc.Pod == nil { + return []Result{Fail(c.Name(), "skipped: no operator pod found")} + } + if cc.Pod.Status.Phase != corev1.PodRunning { + return []Result{Fail(c.Name(), fmt.Sprintf("skipped: pod %s is not running (phase: %s)", cc.Pod.Name, cc.Pod.Status.Phase))} + } + + status, err := queryRaftStatus(ctx, cc.Ctl, cc.Namespace, cc.SecretPrefix, cc.Pod) + if err != nil { + return []Result{Fail(c.Name(), fmt.Sprintf("cannot query raft status: %v", err))} + } + cc.RaftStatus = status + + var results []Result + if !status.IsHealthy { + results = append(results, Fail(c.Name(), "raft cluster reports unhealthy (no leader)")) + } else { + results = append(results, Pass(c.Name(), fmt.Sprintf("raft healthy: state=%s leader=%s term=%d peers=%d", + status.RaftState, status.Leader, status.Term, len(status.ClusterNames)))) + } + if len(status.UnhealthyPeers) > 0 { + results = append(results, Fail(c.Name(), fmt.Sprintf("unhealthy peers: %s", strings.Join(status.UnhealthyPeers, ", ")))) + } + return results +} + +func queryRaftStatus(ctx context.Context, ctl *kube.Ctl, namespace, secretPrefix string, pod *corev1.Pod) (*transportv1.StatusResponse, error) { + forwardedPorts, stop, err := ctl.PortForward(ctx, pod, io.Discard, io.Discard) + if err != nil { + return nil, fmt.Errorf("port-forward to %s: %w", pod.Name, err) + } + defer stop() + + // Find the forwarded local port that maps to the raft gRPC port. 
+ var localPort uint16 + for _, fp := range forwardedPorts { + if fp.Remote == raftPort { + localPort = fp.Local + break + } + } + if localPort == 0 { + return nil, fmt.Errorf("port %d not found in forwarded ports for pod %s", raftPort, pod.Name) + } + + tlsCreds, serverName, err := grpcCredsFromPod(ctx, ctl, namespace, secretPrefix, pod) + if err != nil { + return nil, fmt.Errorf("building gRPC credentials: %w", err) + } + + addr := fmt.Sprintf("127.0.0.1:%d", localPort) + + tlsConfig := &tls.Config{} //nolint:gosec + tlsCreds(tlsConfig) + tlsConfig.ServerName = serverName + + conn, err := grpc.NewClient(addr, + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), + ) + if err != nil { + return nil, fmt.Errorf("gRPC dial: %w", err) + } + defer conn.Close() + + client := transportv1.NewTransportServiceClient(conn) + + callCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + return client.Status(callCtx, &transportv1.StatusRequest{}) +} + +func grpcCredsFromPod(ctx context.Context, ctl *kube.Ctl, namespace, secretPrefix string, pod *corev1.Pod) (func(*tls.Config), string, error) { + var secretName string + // Prefer exact match: look for the volume whose name is + // -multicluster-certificates and read SecretName from it. + if secretPrefix != "" { + target := secretPrefix + "-multicluster-certificates" + for _, vol := range pod.Spec.Volumes { + if vol.Name == target && vol.Secret != nil { + secretName = vol.Secret.SecretName + break + } + } + } + // Fallback: any volume whose name ends with the multicluster suffix. 
+ if secretName == "" { + for _, vol := range pod.Spec.Volumes { + if strings.HasSuffix(vol.Name, "-multicluster-certificates") && vol.Secret != nil { + secretName = vol.Secret.SecretName + break + } + } + } + if secretName == "" { + return nil, "", fmt.Errorf("pod %s has no multicluster-certificates volume", pod.Name) + } + + var secret corev1.Secret + if err := ctl.Get(ctx, kube.ObjectKey{Name: secretName, Namespace: namespace}, &secret); err != nil { + return nil, "", fmt.Errorf("getting secret %s: %w", secretName, err) + } + + certificate, err := tls.X509KeyPair(secret.Data["tls.crt"], secret.Data["tls.key"]) + if err != nil { + return nil, "", fmt.Errorf("loading key pair: %w", err) + } + + caPool := x509.NewCertPool() + if !caPool.AppendCertsFromPEM(secret.Data["ca.crt"]) { + return nil, "", fmt.Errorf("invalid CA certificate") + } + + leaf, err := x509.ParseCertificate(certificate.Certificate[0]) + if err != nil { + return nil, "", fmt.Errorf("parsing leaf certificate: %w", err) + } + + // Determine the ServerName for TLS verification. The cert may have DNS + // SANs (service FQDN) or only IP SANs (ClusterIP). When port-forwarding + // we connect to 127.0.0.1, so we need to set ServerName to a SAN that + // the cert actually contains. If the cert only has IP SANs (no DNS + // names), we skip hostname verification since we're already + // authenticating via mTLS and the connection goes through a + // port-forward tunnel. 
+ serverName := "" + skipVerify := false + if len(leaf.DNSNames) > 0 { + serverName = leaf.DNSNames[0] + } else { + skipVerify = true + } + + return func(cfg *tls.Config) { + cfg.Certificates = []tls.Certificate{certificate} + cfg.RootCAs = caPool + if skipVerify { + cfg.InsecureSkipVerify = true //nolint:gosec // port-forward tunnel; mTLS authenticates the peer + } + }, serverName, nil +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_tls.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_tls.go new file mode 100644 index 000000000..196473e72 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_tls.go @@ -0,0 +1,167 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "context" + "crypto/ecdsa" + "crypto/x509" + "encoding/pem" + "fmt" + "strings" + "time" + + "github.com/redpanda-data/common-go/kube" + corev1 "k8s.io/api/core/v1" +) + +// TLSCheck reads the multicluster TLS certificate secret and validates the CA, +// leaf certificate, private key, chain of trust, expiry, and SANs. +// Populates cc.TLSSecret, cc.CACert, cc.TLSCert, cc.TLSKeyMatch. +type TLSCheck struct{} + +func (c *TLSCheck) Name() string { return "tls" } + +func (c *TLSCheck) Run(ctx context.Context, cc *CheckContext) []Result { + secretName := c.findSecretName(cc) + if secretName == "" { + // Fall back: if we know the expected secret name, try it directly. + if cc.SecretPrefix != "" { + secretName = cc.SecretPrefix + "-multicluster-certificates" + } else { + // Last resort: scan all secrets by suffix. 
+ var secrets corev1.SecretList + if err := cc.Ctl.List(ctx, cc.Namespace, &secrets); err != nil { + return []Result{Fail(c.Name(), fmt.Sprintf("listing secrets: %v", err))} + } + for _, sec := range secrets.Items { + if strings.HasSuffix(sec.Name, "-multicluster-certificates") { + secretName = sec.Name + break + } + } + } + } + + if secretName == "" { + return []Result{Fail(c.Name(), fmt.Sprintf("no multicluster-certificates secret found in namespace %s", cc.Namespace))} + } + + var secret corev1.Secret + secret.Name = secretName + secret.Namespace = cc.Namespace + err := cc.Ctl.WaitFor(ctx, &secret, func(_ kube.Object, err error) (bool, error) { + return err == nil, nil + }) + if err != nil { + return []Result{Fail(c.Name(), fmt.Sprintf("cannot read secret %s: %v", secretName, err))} + } + cc.TLSSecret = &secret + + caPEM := secret.Data["ca.crt"] + certPEM := secret.Data["tls.crt"] + keyPEM := secret.Data["tls.key"] + + if len(caPEM) == 0 || len(certPEM) == 0 || len(keyPEM) == 0 { + return []Result{Fail(c.Name(), fmt.Sprintf("secret %s missing required keys (ca.crt, tls.crt, tls.key)", secretName))} + } + + var results []Result + + // Parse CA. + caBlock, _ := pem.Decode(caPEM) + if caBlock == nil { + return []Result{Fail(c.Name(), fmt.Sprintf("secret %s: ca.crt is not valid PEM", secretName))} + } + caCert, err := x509.ParseCertificate(caBlock.Bytes) + if err != nil { + return []Result{Fail(c.Name(), fmt.Sprintf("secret %s: cannot parse ca.crt: %v", secretName, err))} + } + cc.CACert = caCert + + if !caCert.IsCA { + results = append(results, Fail(c.Name(), fmt.Sprintf("secret %s: ca.crt is not a CA certificate", secretName))) + } + if caCert.KeyUsage&x509.KeyUsageCertSign == 0 { + results = append(results, Fail(c.Name(), fmt.Sprintf("secret %s: ca.crt missing CertSign key usage", secretName))) + } + + // Parse leaf cert. 
+ certBlock, _ := pem.Decode(certPEM) + if certBlock == nil { + return append(results, Fail(c.Name(), fmt.Sprintf("secret %s: tls.crt is not valid PEM", secretName))) + } + cert, err := x509.ParseCertificate(certBlock.Bytes) + if err != nil { + return append(results, Fail(c.Name(), fmt.Sprintf("secret %s: cannot parse tls.crt: %v", secretName, err))) + } + cc.TLSCert = cert + + // Expiry. + now := time.Now() + if now.After(cert.NotAfter) { + results = append(results, Fail(c.Name(), fmt.Sprintf("secret %s: tls.crt expired on %s", secretName, cert.NotAfter.Format(time.DateOnly)))) + } + if now.After(caCert.NotAfter) { + results = append(results, Fail(c.Name(), fmt.Sprintf("secret %s: ca.crt expired on %s", secretName, caCert.NotAfter.Format(time.DateOnly)))) + } + + // Chain verification. + pool := x509.NewCertPool() + pool.AddCert(caCert) + if _, err := cert.Verify(x509.VerifyOptions{Roots: pool, KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageAny}}); err != nil { + results = append(results, Fail(c.Name(), fmt.Sprintf("secret %s: tls.crt does not chain to ca.crt: %v", secretName, err))) + } + + // Key match. + keyBlock, _ := pem.Decode(keyPEM) + if keyBlock != nil { + privKey, err := x509.ParseECPrivateKey(keyBlock.Bytes) + if err == nil { + pubKey, ok := cert.PublicKey.(*ecdsa.PublicKey) + if ok && privKey.PublicKey.Equal(pubKey) { + cc.TLSKeyMatch = true + } else { + results = append(results, Fail(c.Name(), fmt.Sprintf("secret %s: tls.key does not match tls.crt public key", secretName))) + } + } + } + + if len(results) == 0 { + results = append(results, Pass(c.Name(), fmt.Sprintf("secret %s: certificates valid and chain verified", secretName))) + } + return results +} + +func (c *TLSCheck) findSecretName(cc *CheckContext) string { + if cc.Deployment == nil { + return "" + } + // Prefer exact match: look for the volume whose name is + // -multicluster-certificates (as set by the helm chart). 
+ // Read the SecretName from that volume rather than matching on the + // secret name directly, so user-defined volumes with similar secret + // names don't produce false matches. + if cc.SecretPrefix != "" { + target := cc.SecretPrefix + "-multicluster-certificates" + for _, vol := range cc.Deployment.Spec.Template.Spec.Volumes { + if vol.Name == target && vol.Secret != nil { + return vol.Secret.SecretName + } + } + } + // Fallback: any volume whose name ends with the multicluster suffix. + for _, vol := range cc.Deployment.Spec.Template.Spec.Volumes { + if strings.HasSuffix(vol.Name, "-multicluster-certificates") && vol.Secret != nil { + return vol.Secret.SecretName + } + } + return "" +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_tls_san.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_tls_san.go new file mode 100644 index 000000000..799ceb021 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cluster_tls_san.go @@ -0,0 +1,39 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "context" + "fmt" + "strings" +) + +// TLSSANCheck re-validates TLS certificate SANs after raft status is available. +// This runs after both TLSCheck and RaftCheck have populated CheckContext. 
+type TLSSANCheck struct{} + +func (c *TLSSANCheck) Name() string { return "tls-san" } + +func (c *TLSSANCheck) Run(_ context.Context, cc *CheckContext) []Result { + if cc.TLSCert == nil || cc.RaftStatus == nil || cc.RaftStatus.Name == "" { + return nil + } + expectedName := cc.RaftStatus.Name + for _, dns := range cc.TLSCert.DNSNames { + if strings.Contains(dns, expectedName) { + return []Result{Pass(c.Name(), fmt.Sprintf("tls.crt SAN matches expected name %q", expectedName))} + } + } + secretName := "" + if cc.TLSSecret != nil { + secretName = cc.TLSSecret.Name + } + return []Result{Fail(c.Name(), fmt.Sprintf("secret %s: tls.crt SANs %v do not contain expected name %q", secretName, cc.TLSCert.DNSNames, expectedName))} +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_ca_consistency.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_ca_consistency.go new file mode 100644 index 000000000..3818c532a --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_ca_consistency.go @@ -0,0 +1,45 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +// CAConsistencyCheck verifies that all clusters share the same CA certificate. 
+type CAConsistencyCheck struct{} + +func (c *CAConsistencyCheck) Name() string { return "ca-consistency" } + +func (c *CAConsistencyCheck) Run(contexts []*CheckContext) []Result { + type caInfo struct { + context string + raw []byte + } + var cas []caInfo + for _, cc := range contexts { + if cc.CACert != nil { + cas = append(cas, caInfo{context: cc.Context, raw: cc.CACert.Raw}) + } + } + + if len(cas) < 2 { + return nil // not enough data to compare + } + + allSame := true + for i := 1; i < len(cas); i++ { + if string(cas[i].raw) != string(cas[0].raw) { + allSame = false + break + } + } + + if allSame { + return []Result{Pass(c.Name(), "all clusters share the same CA")} + } + return []Result{Fail(c.Name(), "CA certificate mismatch across clusters — raft mTLS will fail")} +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_leader_agreement.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_leader_agreement.go new file mode 100644 index 000000000..9b503fdf3 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_leader_agreement.go @@ -0,0 +1,52 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "fmt" + "strings" +) + +// LeaderAgreementCheck verifies that all operators agree on the current leader. 
+type LeaderAgreementCheck struct{} + +func (c *LeaderAgreementCheck) Name() string { return "leader-agreement" } + +func (c *LeaderAgreementCheck) Run(contexts []*CheckContext) []Result { + leaderToContexts := map[string][]string{} + var maxTerm uint64 + for _, cc := range contexts { + if cc.RaftStatus == nil { + continue + } + if cc.RaftStatus.Leader != "" { + leaderToContexts[cc.RaftStatus.Leader] = append(leaderToContexts[cc.RaftStatus.Leader], cc.Context) + } + if cc.RaftStatus.Term > maxTerm { + maxTerm = cc.RaftStatus.Term + } + } + + if len(leaderToContexts) == 0 { + return []Result{Fail(c.Name(), "no cluster reports a leader")} + } + if len(leaderToContexts) == 1 { + for leader := range leaderToContexts { + return []Result{Pass(c.Name(), fmt.Sprintf("leader agreement: %s (term %d)", leader, maxTerm))} + } + } + + var results []Result + for leader, ctxs := range leaderToContexts { + results = append(results, Fail(c.Name(), + fmt.Sprintf("leader disagreement: %s says leader is %q", strings.Join(ctxs, ", "), leader))) + } + return results +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_peer_agreement.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_peer_agreement.go new file mode 100644 index 000000000..974183825 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_peer_agreement.go @@ -0,0 +1,64 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "fmt" + "sort" + "strings" +) + +// PeerAgreementCheck verifies that all operators see the same set of cluster names. 
+type PeerAgreementCheck struct{} + +func (c *PeerAgreementCheck) Name() string { return "peer-agreement" } + +func (c *PeerAgreementCheck) Run(contexts []*CheckContext) []Result { + type peerSet struct { + key string + context string + } + var sets []peerSet + for _, cc := range contexts { + if cc.RaftStatus == nil { + continue + } + names := make([]string, len(cc.RaftStatus.ClusterNames)) + copy(names, cc.RaftStatus.ClusterNames) + sort.Strings(names) + sets = append(sets, peerSet{ + key: strings.Join(names, ","), + context: cc.Context, + }) + } + + if len(sets) == 0 { + return []Result{Fail(c.Name(), "no clusters reachable for peer list comparison")} + } + + allSame := true + for i := 1; i < len(sets); i++ { + if sets[i].key != sets[0].key { + allSame = false + break + } + } + + if allSame { + return []Result{Pass(c.Name(), "peer lists agree across all clusters")} + } + + var results []Result + for _, ps := range sets { + results = append(results, Fail(c.Name(), + fmt.Sprintf("peer list mismatch: %s sees [%s]", ps.context, ps.key))) + } + return results +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_unique_names.go b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_unique_names.go new file mode 100644 index 000000000..b7a04c640 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/checks/cross_cluster_unique_names.go @@ -0,0 +1,47 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package checks + +import ( + "fmt" + "strings" +) + +// UniqueNamesCheck verifies that every operator reports a distinct node name. 
+type UniqueNamesCheck struct{} + +func (c *UniqueNamesCheck) Name() string { return "unique-names" } + +func (c *UniqueNamesCheck) Run(contexts []*CheckContext) []Result { + nameToContexts := map[string][]string{} + for _, cc := range contexts { + if cc.RaftStatus != nil && cc.RaftStatus.Name != "" { + nameToContexts[cc.RaftStatus.Name] = append(nameToContexts[cc.RaftStatus.Name], cc.Context) + } + } + + if len(nameToContexts) == 0 { + return []Result{Fail(c.Name(), "no clusters reported a node name")} + } + + var results []Result + hasDuplicates := false + for name, ctxs := range nameToContexts { + if len(ctxs) > 1 { + results = append(results, Fail(c.Name(), + fmt.Sprintf("duplicate node name %q reported by contexts: %s", name, strings.Join(ctxs, ", ")))) + hasDuplicates = true + } + } + if !hasDuplicates { + results = append(results, Pass(c.Name(), "all node names are unique")) + } + return results +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/connection.go b/operator/cmd/rpk-k8s/k8s/multicluster/connection.go new file mode 100644 index 000000000..a6c42dd6b --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/connection.go @@ -0,0 +1,194 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "fmt" + "strings" + + "github.com/redpanda-data/common-go/kube" + "github.com/spf13/cobra" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" +) + +// ClusterConnection holds the resolved client for a single cluster. 
+type ClusterConnection struct { + Name string + Ctl *kube.Ctl + // SecretPrefix is the helm fullname of the operator on this cluster, + // used to derive the TLS secret name (<fullname>-multicluster-certificates). + // Defaults to Name (the kubernetes context name) when not set. + SecretPrefix string +} + +// ConnectionConfig holds the configuration for connecting to multiple k8s +// clusters. It can be populated from CLI flags via BindFlags or set +// programmatically for testing. +type ConnectionConfig struct { + // Kubeconfig is the path to a kubeconfig file. When set, all contexts in + // the file are used unless Contexts is also specified. + Kubeconfig string + // Contexts lists specific kubeconfig contexts to use. When Kubeconfig is + // empty, these are resolved via the default kubeconfig loading rules. + Contexts []string + // Namespace is the namespace for operator resources. + Namespace string + // ServiceName is the operator service name used for label selection + // (app.kubernetes.io/name). Typically "operator" for the redpanda operator chart. + ServiceName string + // NameOverrides maps context names to their helm fullname override, in + // "context=prefix" format. The prefix is used to derive the TLS secret + // name (<prefix>-multicluster-certificates). Defaults to the context name. + NameOverrides []string + + // Connections is a pre-resolved list of cluster connections. When set, + // Kubeconfig and Contexts are ignored. This allows tests to pass + // kube.Ctl instances directly without writing kubeconfig files to disk. + Connections []ClusterConnection +} + +// BindFlags registers the connection flags on the given cobra command. 
+func (c *ConnectionConfig) BindFlags(cmd *cobra.Command) { + cmd.Flags().StringVar(&c.Kubeconfig, "kubeconfig", "", "Path to a kubeconfig file (all contexts in the file are used unless --context is also specified)") + cmd.Flags().StringSliceVar(&c.Contexts, "context", nil, "Kubernetes contexts (repeatable; if omitted with --kubeconfig, all contexts in the file are used)") + cmd.Flags().StringVar(&c.Namespace, "namespace", "redpanda", "Namespace for operator resources") + cmd.Flags().StringVar(&c.ServiceName, "service-name", "operator", "Operator deployment label selector value (app.kubernetes.io/name)") + cmd.Flags().StringArrayVar(&c.NameOverrides, "name-override", nil, "Override the TLS secret prefix for a context in context=prefix format (repeatable; defaults to the context name)") +} + +// Resolve returns ClusterConnections from the config. If Connections is +// already set (programmatic use), it is returned directly (with SecretPrefix +// defaulted to Name for any connection that has no prefix set). Otherwise, +// connections are built from Kubeconfig/Contexts flags and NameOverrides +// are applied. 
+func (c *ConnectionConfig) Resolve() ([]ClusterConnection, error) { + nameOverrides, err := parseNameOverrides(c.NameOverrides) + if err != nil { + return nil, err + } + + if len(c.Connections) > 0 { + conns := make([]ClusterConnection, len(c.Connections)) + copy(conns, c.Connections) + for i, conn := range conns { + if conn.SecretPrefix == "" { + if override, ok := nameOverrides[conn.Name]; ok { + conns[i].SecretPrefix = override + } else { + conns[i].SecretPrefix = conn.Name + } + } + } + return conns, nil + } + + if c.Kubeconfig == "" && len(c.Contexts) == 0 { + return nil, fmt.Errorf("either --kubeconfig or --context must be specified") + } + + var conns []ClusterConnection + if c.Kubeconfig != "" { + conns, err = connectionsFromKubeconfig(c.Kubeconfig, c.Contexts) + } else { + conns, err = connectionsFromContexts(c.Contexts) + } + if err != nil { + return nil, err + } + + for i, conn := range conns { + if override, ok := nameOverrides[conn.Name]; ok { + conns[i].SecretPrefix = override + } else { + conns[i].SecretPrefix = conn.Name + } + } + return conns, nil +} + +func connectionsFromKubeconfig(path string, contexts []string) ([]ClusterConnection, error) { + config, err := clientcmd.LoadFromFile(path) + if err != nil { + return nil, fmt.Errorf("loading kubeconfig from %s: %w", path, err) + } + + if len(contexts) > 0 { + for _, name := range contexts { + if _, ok := config.Contexts[name]; !ok { + return nil, fmt.Errorf("context %q not found in kubeconfig %s", name, path) + } + } + } else { + for name := range config.Contexts { + contexts = append(contexts, name) + } + } + + if len(contexts) == 0 { + return nil, fmt.Errorf("no contexts found in kubeconfig %s", path) + } + + conns := make([]ClusterConnection, 0, len(contexts)) + for _, name := range contexts { + rc, err := configFromAPIConfig(config, name) + if err != nil { + return nil, fmt.Errorf("building REST config for context %s: %w", name, err) + } + ctl, err := kube.FromRESTConfig(rc) + if err != nil { 
+ return nil, fmt.Errorf("building kube.Ctl for context %s: %w", name, err) + } + conns = append(conns, ClusterConnection{Name: name, Ctl: ctl}) + } + return conns, nil +} + +func connectionsFromContexts(contexts []string) ([]ClusterConnection, error) { + conns := make([]ClusterConnection, 0, len(contexts)) + for _, name := range contexts { + rc, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + clientcmd.NewDefaultClientConfigLoadingRules(), + &clientcmd.ConfigOverrides{CurrentContext: name}, + ).ClientConfig() + if err != nil { + return nil, fmt.Errorf("building REST config for context %s: %w", name, err) + } + ctl, err := kube.FromRESTConfig(rc) + if err != nil { + return nil, fmt.Errorf("building kube.Ctl for context %s: %w", name, err) + } + conns = append(conns, ClusterConnection{Name: name, Ctl: ctl}) + } + return conns, nil +} + +func configFromAPIConfig(config *clientcmdapi.Config, contextName string) (*kube.RESTConfig, error) { + return clientcmd.NewNonInteractiveClientConfig( + *config, + contextName, + &clientcmd.ConfigOverrides{CurrentContext: contextName}, + nil, + ).ClientConfig() +} + +// parseNameOverrides parses --name-override flags in "context=prefix" format +// into a map keyed by context name. +func parseNameOverrides(overrides []string) (map[string]string, error) { + m := make(map[string]string, len(overrides)) + for _, o := range overrides { + parts := strings.SplitN(o, "=", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return nil, fmt.Errorf("invalid --name-override format %q, expected context=prefix", o) + } + m[parts[0]] = parts[1] + } + return m, nil +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/multicluster.go b/operator/cmd/rpk-k8s/k8s/multicluster/multicluster.go new file mode 100644 index 000000000..328ebcb91 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/multicluster.go @@ -0,0 +1,29 @@ +// Copyright 2026 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "github.com/spf13/cobra" +) + +func Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "multicluster", + Short: "Manage Redpanda multicluster deployments on Kubernetes", + Long: "Commands for bootstrapping and managing Redpanda multicluster deployments across multiple Kubernetes clusters.", + } + + cmd.AddCommand( + bootstrapCommand(), + statusCommand(), + ) + + return cmd +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/multicluster_test.go b/operator/cmd/rpk-k8s/k8s/multicluster/multicluster_test.go new file mode 100644 index 000000000..d49281464 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/multicluster_test.go @@ -0,0 +1,276 @@ +// Copyright 2026 Redpanda Data, Inc. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster_test + +import ( + "bytes" + "context" + "crypto/x509" + "encoding/pem" + "fmt" + "os" + "testing" + "time" + + "github.com/redpanda-data/common-go/kube" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + operatorchart "github.com/redpanda-data/redpanda-operator/operator/chart" + "github.com/redpanda-data/redpanda-operator/operator/cmd/rpk-k8s/k8s/multicluster" + "github.com/redpanda-data/redpanda-operator/pkg/testutil" + "github.com/redpanda-data/redpanda-operator/pkg/vcluster" +) + +const ( + operatorChartPath = "../../../../../operator/chart" + licenseEnvVar = "REDPANDA_SAMPLE_LICENSE" +) + +func TestMulticlusterBootstrapAndStatus(t *testing.T) { + testutil.SkipIfNotMulticluster(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer cancel() + + opts := vcluster.MulticlusterOptions{ + Size: 3, + Namespace: "default", + OperatorChartPath: operatorChartPath, + } + + // Create 3 vclusters. + mc := vcluster.NewMulticluster(t, ctx, opts) + require.Len(t, mc.Nodes, 3) + + // Build connections for the Config structs. + conns := connectionsFromNodes(t, mc.Nodes) + + // Build DNS overrides from node external IPs. + var dnsOverrides []string + for _, node := range mc.Nodes { + dnsOverrides = append(dnsOverrides, fmt.Sprintf("%s=%s", node.Name(), node.ExternalIP())) + } + + // Run the bootstrap command and verify its results. 
+ t.Run("bootstrap", func(t *testing.T) { + var out bytes.Buffer + cfg := multicluster.BootstrapConfig{ + Connection: multicluster.ConnectionConfig{ + Namespace: opts.Namespace, + Connections: conns, + }, + Organization: "Redpanda", + DNSOverrides: dnsOverrides, + TLS: true, + Kubeconfigs: true, + CreateNS: true, + } + require.NoError(t, cfg.Run(ctx, &out), "bootstrap failed: %s", out.String()) + t.Logf("bootstrap output: %s", out.String()) + + t.Run("creates_tls_secrets", func(t *testing.T) { + t.Parallel() + for _, node := range mc.Nodes { + var secrets corev1.SecretList + require.NoError(t, node.Ctl().List(ctx, opts.Namespace, &secrets)) + + var found bool + for _, sec := range secrets.Items { + if sec.Name == "redpanda-operator-multicluster-certificates" { + found = true + assert.NotEmpty(t, sec.Data["ca.crt"]) + assert.NotEmpty(t, sec.Data["tls.crt"]) + assert.NotEmpty(t, sec.Data["tls.key"]) + break + } + } + assert.True(t, found, "TLS secret not found in vcluster %s", node.Name()) + } + }) + + t.Run("ca_is_consistent", func(t *testing.T) { + t.Parallel() + var cas [][]byte + for _, node := range mc.Nodes { + var sec corev1.Secret + require.NoError(t, node.Ctl().Get(ctx, kube.ObjectKey{ + Name: "redpanda-operator-multicluster-certificates", Namespace: opts.Namespace, + }, &sec)) + cas = append(cas, sec.Data["ca.crt"]) + } + for i := 1; i < len(cas); i++ { + assert.Equal(t, string(cas[0]), string(cas[i]), + "CA mismatch between cluster 0 and cluster %d", i) + } + }) + + t.Run("certs_are_valid", func(t *testing.T) { + t.Parallel() + for _, node := range mc.Nodes { + var sec corev1.Secret + require.NoError(t, node.Ctl().Get(ctx, kube.ObjectKey{ + Name: "redpanda-operator-multicluster-certificates", Namespace: opts.Namespace, + }, &sec)) + + caBlock, _ := pem.Decode(sec.Data["ca.crt"]) + require.NotNil(t, caBlock) + caCert, err := x509.ParseCertificate(caBlock.Bytes) + require.NoError(t, err) + assert.True(t, caCert.IsCA) + + certBlock, _ := 
pem.Decode(sec.Data["tls.crt"]) + require.NotNil(t, certBlock) + cert, err := x509.ParseCertificate(certBlock.Bytes) + require.NoError(t, err) + + pool := x509.NewCertPool() + pool.AddCert(caCert) + _, err = cert.Verify(x509.VerifyOptions{ + Roots: pool, + KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageAny}, + }) + assert.NoError(t, err, "cert chain verification failed on %s", node.Name()) + assert.True(t, time.Now().Before(cert.NotAfter), + "cert expired on %s in %s", cert.NotAfter, node.Name()) + } + }) + }) + + // Deploy operators (requires license). + license := os.Getenv(licenseEnvVar) + if license == "" { + t.Log("REDPANDA_SAMPLE_LICENSE not set, skipping operator deployment tests") + return + } + + // Create license secret on each node before deploying. + for _, node := range mc.Nodes { + require.NoError(t, node.Ctl().Create(ctx, &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "redpanda-license", + Namespace: opts.Namespace, + }, + Data: map[string][]byte{ + "redpanda.license": []byte(license), + }, + })) + } + + peers := buildPeers(mc.Nodes) + mc.DeployOperators(t, ctx, opts, func(node *vcluster.MulticlusterNode) any { + return operatorchart.PartialValues{ + CRDs: &operatorchart.PartialCRDs{ + Enabled: ptr.To(true), + Experimental: ptr.To(true), + }, + LogLevel: ptr.To("debug"), + Multicluster: &operatorchart.PartialMulticluster{ + Enabled: ptr.To(true), + Name: ptr.To(node.Name()), + KubernetesAPIExternalAddress: ptr.To(node.APIServer()), + Peers: peers, + }, + Image: &operatorchart.PartialImage{ + Repository: ptr.To("localhost/redpanda-operator"), + Tag: ptr.To("dev"), + }, + Enterprise: &operatorchart.PartialEnterprise{ + LicenseSecretRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "redpanda-license"}, + Key: "redpanda.license", + }, + }, + } + }) + + // Wait for operators to become ready before running status checks. 
+ for _, node := range mc.Nodes { + require.Eventually(t, func() bool { + var pods corev1.PodList + if err := node.Ctl().List(ctx, opts.Namespace, &pods, client.MatchingLabels{ + "app.kubernetes.io/name": "operator", + }); err != nil || len(pods.Items) == 0 { + return false + } + for _, cond := range pods.Items[0].Status.Conditions { + if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { + return true + } + } + return false + }, 5*time.Minute, 5*time.Second, + "operator pod never became ready in %s", node.Name()) + } + + t.Run("status", func(t *testing.T) { + var out bytes.Buffer + cfg := multicluster.StatusConfig{ + Connection: multicluster.ConnectionConfig{ + Namespace: opts.Namespace, + ServiceName: "operator", + Connections: conns, + }, + } + result, err := cfg.Run(ctx, &out) + require.NoError(t, err, "status failed: %s", out.String()) + t.Logf("status output:\n%s", out.String()) + + // Verify all clusters appear in the output. + for _, node := range mc.Nodes { + assert.Contains(t, out.String(), node.Name()) + } + + // Pod and deployment checks should pass. + for i, rs := range result.ClusterResults { + for _, r := range rs { + if r.Name == "pod" || r.Name == "deployment" { + assert.True(t, r.OK, "[%s] check %s failed: %s", + result.Contexts[i].Context, r.Name, r.Message) + } + } + } + + // CA consistency should pass after bootstrap. 
+ for _, r := range result.CrossResults { + if r.Name == "ca-consistency" || r.Name == "unique-names" { + assert.True(t, r.OK, "cross-cluster check %s failed: %s", r.Name, r.Message) + } + } + }) +} + +func connectionsFromNodes(t *testing.T, nodes []*vcluster.MulticlusterNode) []multicluster.ClusterConnection { + t.Helper() + conns := make([]multicluster.ClusterConnection, len(nodes)) + for i, node := range nodes { + conns[i] = multicluster.ClusterConnection{ + Name: node.Name(), + Ctl: node.Ctl(), + SecretPrefix: "redpanda-operator", // matches Fullname for helm release "redpanda" with the operator chart + } + } + return conns +} + +func buildPeers(nodes []*vcluster.MulticlusterNode) []operatorchart.PartialPeer { + peers := make([]operatorchart.PartialPeer, len(nodes)) + for i, node := range nodes { + peers[i] = operatorchart.PartialPeer{ + Name: ptr.To(node.Name()), + Address: ptr.To(node.ExternalIP()), + } + } + return peers +} diff --git a/operator/cmd/rpk-k8s/k8s/multicluster/status.go b/operator/cmd/rpk-k8s/k8s/multicluster/status.go new file mode 100644 index 000000000..7f47a5d56 --- /dev/null +++ b/operator/cmd/rpk-k8s/k8s/multicluster/status.go @@ -0,0 +1,214 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package multicluster + +import ( + "context" + "fmt" + "io" + "text/tabwriter" + + "github.com/spf13/cobra" + + "github.com/redpanda-data/redpanda-operator/operator/cmd/rpk-k8s/k8s/multicluster/checks" +) + +// StatusConfig holds the configuration for the status command. +// It can be populated from CLI flags via the cobra command or set +// programmatically for testing. 
+type StatusConfig struct { + Connection ConnectionConfig +} + +// Per-cluster checks run in order. Later checks may depend on state +// populated by earlier ones via CheckContext. +var defaultClusterChecks = []checks.ClusterCheck{ + &checks.PodCheck{}, + &checks.DeploymentCheck{}, + &checks.TLSCheck{}, + &checks.RaftCheck{}, + &checks.TLSSANCheck{}, + &checks.DeploymentRaftCheck{}, +} + +// Cross-cluster checks run once after all per-cluster checks complete. +var defaultCrossClusterChecks = []checks.CrossClusterCheck{ + &checks.UniqueNamesCheck{}, + &checks.PeerAgreementCheck{}, + &checks.LeaderAgreementCheck{}, + &checks.CAConsistencyCheck{}, +} + +// StatusResult holds the full output of a status run, for programmatic +// inspection by tests. +type StatusResult struct { + Contexts []*checks.CheckContext + ClusterResults [][]checks.Result + CrossResults []checks.Result +} + +// Run executes the status checks and writes formatted output to out. +// Returns the full result for programmatic inspection. 
+func (c *StatusConfig) Run(ctx context.Context, out io.Writer) (*StatusResult, error) { + conns, err := c.Connection.Resolve() + if err != nil { + return nil, err + } + + contexts := make([]*checks.CheckContext, len(conns)) + clusterResults := make([][]checks.Result, len(conns)) + + for i, conn := range conns { + cc := &checks.CheckContext{ + Context: conn.Name, + Namespace: c.Connection.Namespace, + ServiceName: c.Connection.ServiceName, + SecretPrefix: conn.SecretPrefix, + Ctl: conn.Ctl, + } + contexts[i] = cc + clusterResults[i] = checks.RunClusterChecks(ctx, cc, defaultClusterChecks) + } + + crossResults := checks.RunCrossClusterChecks(contexts, defaultCrossClusterChecks) + + printStatusTable(out, contexts) + printClusterResults(out, contexts, clusterResults) + printCrossClusterResults(out, crossResults) + + return &StatusResult{ + Contexts: contexts, + ClusterResults: clusterResults, + CrossResults: crossResults, + }, nil +} + +func statusCommand() *cobra.Command { + var cfg StatusConfig + + cmd := &cobra.Command{ + Use: "status", + Short: "Check the health of a multicluster operator deployment", + Long: `Checks each cluster's operator pod health, raft consensus state, +TLS certificate validity, and cross-cluster consistency. + +Connects to each specified Kubernetes context, finds the multicluster operator +pod, port-forwards to its gRPC transport, and queries raft status. 
Also +inspects Deployment configuration and TLS secrets for correctness.`, + Example: ` # Check status across all clusters in a kubeconfig + rpk k8s multicluster status --kubeconfig /path/to/kubeconfig + + # Check specific clusters + rpk k8s multicluster status \ + --context cluster-a --context cluster-b --context cluster-c + + # Override the TLS secret prefix per context (when helm release names differ from context names) + rpk k8s multicluster status \ + --context cluster-a --context cluster-b \ + --name-override cluster-a=redpanda-operator \ + --name-override cluster-b=rp-operator`, + RunE: func(cmd *cobra.Command, args []string) error { + _, err := cfg.Run(cmd.Context(), cmd.OutOrStdout()) + return err + }, + } + + cfg.Connection.BindFlags(cmd) + + return cmd +} + +func printStatusTable(w io.Writer, contexts []*checks.CheckContext) { + tw := tabwriter.NewWriter(w, 0, 4, 2, ' ', 0) + fmt.Fprintln(tw, "CLUSTER\tOPERATOR\tRAFT-STATE\tLEADER\tPEERS\tUNHEALTHY\tTLS\tSECRETS") + + for _, cc := range contexts { + operator := "-" + if cc.Pod != nil { + operator = string(cc.Pod.Status.Phase) + var restarts int32 + for _, cs := range cc.Pod.Status.ContainerStatuses { + restarts += cs.RestartCount + } + if restarts > 0 { + operator = fmt.Sprintf("%s(%d)", operator, restarts) + } + } + + raftState, leader, peers, unhealthy := "-", "-", "-", "-" + if cc.RaftStatus != nil { + raftState = cc.RaftStatus.RaftState + leader = cc.RaftStatus.Leader + if leader == "" { + leader = "(none)" + } + peers = fmt.Sprintf("%d", len(cc.RaftStatus.ClusterNames)) + unhealthy = fmt.Sprintf("%d", len(cc.RaftStatus.UnhealthyPeers)) + } + + tlsStatus := "-" + if cc.CACert != nil { + tlsStatus = "ok" + } + + secrets := "-" + if cc.TLSSecret != nil { + secrets = "ok" + } + + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", + cc.Context, operator, raftState, leader, peers, unhealthy, tlsStatus, secrets) + } + _ = tw.Flush() +} + +func printClusterResults(w io.Writer, contexts 
[]*checks.CheckContext, results [][]checks.Result) { + hasFailures := false + for _, rs := range results { + for _, r := range rs { + if !r.OK { + hasFailures = true + break + } + } + if hasFailures { + break + } + } + if !hasFailures { + return + } + + fmt.Fprintln(w) + fmt.Fprintln(w, "ISSUES:") + for i, rs := range results { + for _, r := range rs { + if !r.OK { + fmt.Fprintf(w, " ✗ %s: [%s] %s\n", contexts[i].Context, r.Name, r.Message) + } + } + } +} + +func printCrossClusterResults(w io.Writer, results []checks.Result) { + if len(results) == 0 { + return + } + + fmt.Fprintln(w) + fmt.Fprintln(w, "CROSS-CLUSTER:") + for _, r := range results { + if r.OK { + fmt.Fprintf(w, " ✓ [%s] %s\n", r.Name, r.Message) + } else { + fmt.Fprintf(w, " ✗ [%s] %s\n", r.Name, r.Message) + } + } +} diff --git a/operator/cmd/rpk-k8s/main.go b/operator/cmd/rpk-k8s/main.go new file mode 100644 index 000000000..6bc24276c --- /dev/null +++ b/operator/cmd/rpk-k8s/main.go @@ -0,0 +1,84 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package main + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/spf13/cobra" + + rpkk8s "github.com/redpanda-data/redpanda-operator/operator/cmd/rpk-k8s/k8s" +) + +func main() { + k8s := rpkk8s.Command() + + // rpk plugin autocomplete support: when invoked with + // --help-autocomplete, emit JSON describing all subcommands + // so rpk can register them for shell completion. 
+ if len(os.Args) > 1 && os.Args[1] == "--help-autocomplete" { + helps := collectHelps("k8s", k8s) + out, err := json.Marshal(helps) + if err != nil { + fmt.Fprintf(os.Stderr, "error marshalling help: %v\n", err) + os.Exit(1) + } + fmt.Println(string(out)) + return + } + + // Wrap k8s under a phantom "rpk" root so cobra renders usage paths as + // "rpk k8s ..." instead of "k8s ...". Cobra derives CommandPath from + // parent names, so the phantom parent is the only way to inject "rpk" + // into the displayed path. We prepend "k8s" to the real args so routing + // goes through the full phantom→k8s→... tree. + root := &cobra.Command{ + Use: "rpk", + SilenceUsage: true, + } + root.AddCommand(k8s) + root.SetArgs(append([]string{"k8s"}, os.Args[1:]...)) + + if err := root.Execute(); err != nil { + os.Exit(1) + } +} + +type pluginHelp struct { + Path string `json:"path"` + Short string `json:"short"` + Long string `json:"long"` + Example string `json:"example"` + Args []string `json:"args"` +} + +func collectHelps(prefix string, cmd *cobra.Command) []pluginHelp { + var helps []pluginHelp + + helps = append(helps, pluginHelp{ + Path: prefix, + Short: cmd.Short, + Long: cmd.Long, + Example: cmd.Example, + Args: cmd.ValidArgs, + }) + + for _, sub := range cmd.Commands() { + if sub.Hidden { + continue + } + childPrefix := prefix + "_" + sub.Name() + helps = append(helps, collectHelps(childPrefix, sub)...) 
+ } + + return helps +} diff --git a/operator/go.mod b/operator/go.mod index d18f1910c..018fb56df 100644 --- a/operator/go.mod +++ b/operator/go.mod @@ -53,6 +53,7 @@ require ( golang.org/x/sync v0.20.0 golang.org/x/time v0.14.0 golang.org/x/tools v0.42.0 + google.golang.org/grpc v1.79.3 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 helm.sh/helm/v3 v3.18.5 @@ -344,7 +345,6 @@ require ( google.golang.org/genproto v0.0.0-20260217215200-42d3e9bedb6d // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260319171110-e3a33c96fb44 // indirect - google.golang.org/grpc v1.79.3 // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect diff --git a/pkg/multicluster/bootstrap/bootstrapper.go b/pkg/multicluster/bootstrap/bootstrapper.go index 7166772e0..b6c00521e 100644 --- a/pkg/multicluster/bootstrap/bootstrapper.go +++ b/pkg/multicluster/bootstrap/bootstrapper.go @@ -22,10 +22,14 @@ type RemoteConfiguration struct { ContextName string APIServer string ServiceAddress string + // Name is the helm fullname of the operator on this cluster, used as the + // TLS secret name prefix (-multicluster-certificates). When empty, + // BootstrapClusterConfiguration.ServiceName is used as the fallback. 
+ Name string } func (r RemoteConfiguration) Client() (client.Client, error) { - config, err := configFromContext(r.ContextName) + config, err := r.Config() if err != nil { return nil, err } @@ -34,6 +38,9 @@ func (r RemoteConfiguration) Client() (client.Client, error) { } func (r RemoteConfiguration) Config() (*rest.Config, error) { + if r.KubeConfig != nil { + return r.KubeConfig, nil + } return configFromContext(r.ContextName) } @@ -53,8 +60,14 @@ func (r RemoteConfiguration) FQDN(c BootstrapClusterConfiguration) (string, erro if r.ServiceAddress != "" { return strings.Split(r.ServiceAddress, ":")[0], nil } - - return c.ServiceName + "-" + r.ContextName, nil + // Use the per-cluster helm fullname as the FQDN base when available, + // falling back to the global ServiceName. The ContextName disambiguates + // between clusters that share the same fullname. + base := r.Name + if base == "" { + base = c.ServiceName + } + return base + "-" + r.ContextName, nil } type BootstrapClusterConfiguration struct { @@ -108,13 +121,20 @@ func BootstrapKubernetesClusters(ctx context.Context, organization string, confi for i, cluster := range configuration.RemoteClusters { if configuration.BootstrapKubeconfigs { - for i := range kubeconfigs { - kubeconfig := kubeconfigs[i] + // Use the per-cluster helm fullname as the kubeconfig secret prefix so + // that it matches --kubeconfig-name=Fullname(dot) in the operator chart. + // Fall back to ServiceName when Name is not set. 
+ kubeconfigPrefix := cluster.Name + if kubeconfigPrefix == "" { + kubeconfigPrefix = configuration.ServiceName + } + for j := range kubeconfigs { + kubeconfig := kubeconfigs[j] if err := CreateKubeconfigSecret(ctx, kubeconfig, &RemoteKubernetesConfiguration{ ContextName: cluster.ContextName, Namespace: configuration.OperatorNamespace, - Name: configuration.ServiceName + "-" + configuration.RemoteClusters[i].ContextName, + Name: kubeconfigPrefix + "-" + configuration.RemoteClusters[j].ContextName, EnsureNamespace: configuration.EnsureNamespace, RESTConfig: cluster.KubeConfig, }); err != nil { @@ -123,11 +143,15 @@ func BootstrapKubernetesClusters(ctx context.Context, organization string, confi } } if configuration.BootstrapTLS { + tlsName := cluster.Name + if tlsName == "" { + tlsName = configuration.ServiceName + } certificate := certificates[i] if err := CreateTLSSecret(ctx, caCertificate, certificate, &RemoteKubernetesConfiguration{ ContextName: cluster.ContextName, Namespace: configuration.OperatorNamespace, - Name: configuration.ServiceName, + Name: tlsName, EnsureNamespace: configuration.EnsureNamespace, RESTConfig: cluster.KubeConfig, }); err != nil { diff --git a/pkg/multicluster/bootstrap/certificates.go b/pkg/multicluster/bootstrap/certificates.go index 3e4583b6a..0da18a4ac 100644 --- a/pkg/multicluster/bootstrap/certificates.go +++ b/pkg/multicluster/bootstrap/certificates.go @@ -376,7 +376,7 @@ func CreateTLSSecret(ctx context.Context, ca *CACertificate, certificate *Certif secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ - Name: configuration.Name + "-certificates", + Name: configuration.Name + "-multicluster-certificates", Namespace: configuration.Namespace, }, Data: map[string][]byte{ diff --git a/pkg/multicluster/leaderelection/lock.go b/pkg/multicluster/leaderelection/lock.go index 780c63af9..3725f02e2 100644 --- a/pkg/multicluster/leaderelection/lock.go +++ b/pkg/multicluster/leaderelection/lock.go @@ -88,6 +88,9 @@ type LockConfiguration 
struct { GRPCMaxBackoff time.Duration // Logger is used for raft-internal logging. Logger raft.Logger + // IDsToNames maps raft node IDs to human-readable cluster names. + // Used by the Status RPC to return cluster names instead of IDs. + IDsToNames map[uint64]string } func (c *LockConfiguration) validate() error { @@ -262,6 +265,8 @@ func run(ctx context.Context, config LockConfiguration, transportCallback func(t transportCallback(cl) } transport.logger = config.Logger + transport.idsToNames = config.IDsToNames + transport.localID = config.ID for node, address := range nodes { if config.Logger != nil { @@ -368,17 +373,24 @@ func runRaft(ctx context.Context, transport *grpcTransport, config LockConfigura // Observe soft state changes for leadership var nowLeader bool var leader uint64 + var raftState raft.StateType if rd.SoftState != nil { leader = rd.SoftState.Lead - nowLeader = leader == config.ID || rd.SoftState.RaftState == raft.StateLeader + raftState = rd.SoftState.RaftState + nowLeader = leader == config.ID || raftState == raft.StateLeader } else { status := node.Status() leader = status.Lead - nowLeader = leader == config.ID || status.RaftState == raft.StateLeader + raftState = status.RaftState + nowLeader = leader == config.ID || raftState == raft.StateLeader } transport.leader.Store(leader) transport.isLeader.Store(nowLeader) + transport.raftState.Store(raftState.String()) + if rd.HardState.Term != 0 { + transport.term.Store(rd.HardState.Term) + } if callbacks != nil && callbacks.SetLeader != nil { callbacks.SetLeader(leader) diff --git a/pkg/multicluster/leaderelection/proto/gen/transport/v1/message.pb.go b/pkg/multicluster/leaderelection/proto/gen/transport/v1/message.pb.go index 9e07eb6de..78cfcbe54 100644 --- a/pkg/multicluster/leaderelection/proto/gen/transport/v1/message.pb.go +++ b/pkg/multicluster/leaderelection/proto/gen/transport/v1/message.pb.go @@ -293,6 +293,134 @@ func (x *KubeconfigResponse) GetPayload() []byte { return nil } +type 
StatusRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StatusRequest) Reset() { + *x = StatusRequest{} + mi := &file_transport_v1_message_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StatusRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StatusRequest) ProtoMessage() {} + +func (x *StatusRequest) ProtoReflect() protoreflect.Message { + mi := &file_transport_v1_message_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StatusRequest.ProtoReflect.Descriptor instead. +func (*StatusRequest) Descriptor() ([]byte, []int) { + return file_transport_v1_message_proto_rawDescGZIP(), []int{6} +} + +type StatusResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + RaftState string `protobuf:"bytes,2,opt,name=raft_state,json=raftState,proto3" json:"raft_state,omitempty"` + Leader string `protobuf:"bytes,3,opt,name=leader,proto3" json:"leader,omitempty"` + Term uint64 `protobuf:"varint,4,opt,name=term,proto3" json:"term,omitempty"` + ClusterNames []string `protobuf:"bytes,5,rep,name=cluster_names,json=clusterNames,proto3" json:"cluster_names,omitempty"` + UnhealthyPeers []string `protobuf:"bytes,6,rep,name=unhealthy_peers,json=unhealthyPeers,proto3" json:"unhealthy_peers,omitempty"` + IsHealthy bool `protobuf:"varint,7,opt,name=is_healthy,json=isHealthy,proto3" json:"is_healthy,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StatusResponse) Reset() { + *x = StatusResponse{} + mi := &file_transport_v1_message_proto_msgTypes[7] + ms := 
protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StatusResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StatusResponse) ProtoMessage() {} + +func (x *StatusResponse) ProtoReflect() protoreflect.Message { + mi := &file_transport_v1_message_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StatusResponse.ProtoReflect.Descriptor instead. +func (*StatusResponse) Descriptor() ([]byte, []int) { + return file_transport_v1_message_proto_rawDescGZIP(), []int{7} +} + +func (x *StatusResponse) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *StatusResponse) GetRaftState() string { + if x != nil { + return x.RaftState + } + return "" +} + +func (x *StatusResponse) GetLeader() string { + if x != nil { + return x.Leader + } + return "" +} + +func (x *StatusResponse) GetTerm() uint64 { + if x != nil { + return x.Term + } + return 0 +} + +func (x *StatusResponse) GetClusterNames() []string { + if x != nil { + return x.ClusterNames + } + return nil +} + +func (x *StatusResponse) GetUnhealthyPeers() []string { + if x != nil { + return x.UnhealthyPeers + } + return nil +} + +func (x *StatusResponse) GetIsHealthy() bool { + if x != nil { + return x.IsHealthy + } + return false +} + var File_transport_v1_message_proto protoreflect.FileDescriptor const file_transport_v1_message_proto_rawDesc = "" + @@ -312,12 +440,24 @@ const file_transport_v1_message_proto_rawDesc = "" + "\aapplied\x18\x01 \x01(\bR\aapplied\"\x13\n" + "\x11KubeconfigRequest\".\n" + "\x12KubeconfigResponse\x12\x18\n" + - "\apayload\x18\x01 \x01(\fR\apayload2\xe4\x01\n" + + "\apayload\x18\x01 \x01(\fR\apayload\"\x0f\n" + + "\rStatusRequest\"\xdc\x01\n" + + "\x0eStatusResponse\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1d\n" + + "\n" + + 
"raft_state\x18\x02 \x01(\tR\traftState\x12\x16\n" + + "\x06leader\x18\x03 \x01(\tR\x06leader\x12\x12\n" + + "\x04term\x18\x04 \x01(\x04R\x04term\x12#\n" + + "\rcluster_names\x18\x05 \x03(\tR\fclusterNames\x12'\n" + + "\x0funhealthy_peers\x18\x06 \x03(\tR\x0eunhealthyPeers\x12\x1d\n" + + "\n" + + "is_healthy\x18\a \x01(\bR\tisHealthy2\xa9\x02\n" + "\x10TransportService\x12@\n" + "\x05Check\x12\x1a.transport.v1.CheckRequest\x1a\x1b.transport.v1.CheckResponse\x12=\n" + "\x04Send\x12\x19.transport.v1.SendRequest\x1a\x1a.transport.v1.SendResponse\x12O\n" + "\n" + - "Kubeconfig\x12\x1f.transport.v1.KubeconfigRequest\x1a .transport.v1.KubeconfigResponseB\xe0\x01\n" + + "Kubeconfig\x12\x1f.transport.v1.KubeconfigRequest\x1a .transport.v1.KubeconfigResponse\x12C\n" + + "\x06Status\x12\x1b.transport.v1.StatusRequest\x1a\x1c.transport.v1.StatusResponseB\xe0\x01\n" + "\x10com.transport.v1B\fMessageProtoP\x01Zmgithub.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection/proto/gen/transport/v1;transportv1\xa2\x02\x03TXX\xaa\x02\fTransport.V1\xca\x02\fTransport\\V1\xe2\x02\x18Transport\\V1\\GPBMetadata\xea\x02\rTransport::V1b\x06proto3" var ( @@ -332,7 +472,7 @@ func file_transport_v1_message_proto_rawDescGZIP() []byte { return file_transport_v1_message_proto_rawDescData } -var file_transport_v1_message_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_transport_v1_message_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_transport_v1_message_proto_goTypes = []any{ (*CheckRequest)(nil), // 0: transport.v1.CheckRequest (*CheckResponse)(nil), // 1: transport.v1.CheckResponse @@ -340,16 +480,20 @@ var file_transport_v1_message_proto_goTypes = []any{ (*SendResponse)(nil), // 3: transport.v1.SendResponse (*KubeconfigRequest)(nil), // 4: transport.v1.KubeconfigRequest (*KubeconfigResponse)(nil), // 5: transport.v1.KubeconfigResponse + (*StatusRequest)(nil), // 6: transport.v1.StatusRequest + (*StatusResponse)(nil), // 7: transport.v1.StatusResponse } 
var file_transport_v1_message_proto_depIdxs = []int32{ 0, // 0: transport.v1.TransportService.Check:input_type -> transport.v1.CheckRequest 2, // 1: transport.v1.TransportService.Send:input_type -> transport.v1.SendRequest 4, // 2: transport.v1.TransportService.Kubeconfig:input_type -> transport.v1.KubeconfigRequest - 1, // 3: transport.v1.TransportService.Check:output_type -> transport.v1.CheckResponse - 3, // 4: transport.v1.TransportService.Send:output_type -> transport.v1.SendResponse - 5, // 5: transport.v1.TransportService.Kubeconfig:output_type -> transport.v1.KubeconfigResponse - 3, // [3:6] is the sub-list for method output_type - 0, // [0:3] is the sub-list for method input_type + 6, // 3: transport.v1.TransportService.Status:input_type -> transport.v1.StatusRequest + 1, // 4: transport.v1.TransportService.Check:output_type -> transport.v1.CheckResponse + 3, // 5: transport.v1.TransportService.Send:output_type -> transport.v1.SendResponse + 5, // 6: transport.v1.TransportService.Kubeconfig:output_type -> transport.v1.KubeconfigResponse + 7, // 7: transport.v1.TransportService.Status:output_type -> transport.v1.StatusResponse + 4, // [4:8] is the sub-list for method output_type + 0, // [0:4] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -366,7 +510,7 @@ func file_transport_v1_message_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_transport_v1_message_proto_rawDesc), len(file_transport_v1_message_proto_rawDesc)), NumEnums: 0, - NumMessages: 6, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/multicluster/leaderelection/proto/gen/transport/v1/message_grpc.pb.go b/pkg/multicluster/leaderelection/proto/gen/transport/v1/message_grpc.pb.go index 0984bfe2d..33fbe01d2 100644 --- 
a/pkg/multicluster/leaderelection/proto/gen/transport/v1/message_grpc.pb.go +++ b/pkg/multicluster/leaderelection/proto/gen/transport/v1/message_grpc.pb.go @@ -22,6 +22,7 @@ const ( TransportService_Check_FullMethodName = "/transport.v1.TransportService/Check" TransportService_Send_FullMethodName = "/transport.v1.TransportService/Send" TransportService_Kubeconfig_FullMethodName = "/transport.v1.TransportService/Kubeconfig" + TransportService_Status_FullMethodName = "/transport.v1.TransportService/Status" ) // TransportServiceClient is the client API for TransportService service. @@ -31,6 +32,7 @@ type TransportServiceClient interface { Check(ctx context.Context, in *CheckRequest, opts ...grpc.CallOption) (*CheckResponse, error) Send(ctx context.Context, in *SendRequest, opts ...grpc.CallOption) (*SendResponse, error) Kubeconfig(ctx context.Context, in *KubeconfigRequest, opts ...grpc.CallOption) (*KubeconfigResponse, error) + Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) } type transportServiceClient struct { @@ -71,6 +73,16 @@ func (c *transportServiceClient) Kubeconfig(ctx context.Context, in *KubeconfigR return out, nil } +func (c *transportServiceClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(StatusResponse) + err := c.cc.Invoke(ctx, TransportService_Status_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // TransportServiceServer is the server API for TransportService service. // All implementations must embed UnimplementedTransportServiceServer // for forward compatibility. 
@@ -78,6 +90,7 @@ type TransportServiceServer interface { Check(context.Context, *CheckRequest) (*CheckResponse, error) Send(context.Context, *SendRequest) (*SendResponse, error) Kubeconfig(context.Context, *KubeconfigRequest) (*KubeconfigResponse, error) + Status(context.Context, *StatusRequest) (*StatusResponse, error) mustEmbedUnimplementedTransportServiceServer() } @@ -97,6 +110,9 @@ func (UnimplementedTransportServiceServer) Send(context.Context, *SendRequest) ( func (UnimplementedTransportServiceServer) Kubeconfig(context.Context, *KubeconfigRequest) (*KubeconfigResponse, error) { return nil, status.Error(codes.Unimplemented, "method Kubeconfig not implemented") } +func (UnimplementedTransportServiceServer) Status(context.Context, *StatusRequest) (*StatusResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Status not implemented") +} func (UnimplementedTransportServiceServer) mustEmbedUnimplementedTransportServiceServer() {} func (UnimplementedTransportServiceServer) testEmbeddedByValue() {} @@ -172,6 +188,24 @@ func _TransportService_Kubeconfig_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _TransportService_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StatusRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TransportServiceServer).Status(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TransportService_Status_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TransportServiceServer).Status(ctx, req.(*StatusRequest)) + } + return interceptor(ctx, in, info, handler) +} + // TransportService_ServiceDesc is the grpc.ServiceDesc for TransportService service. 
// It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -191,6 +225,10 @@ var TransportService_ServiceDesc = grpc.ServiceDesc{ MethodName: "Kubeconfig", Handler: _TransportService_Kubeconfig_Handler, }, + { + MethodName: "Status", + Handler: _TransportService_Status_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "transport/v1/message.proto", diff --git a/pkg/multicluster/leaderelection/proto/transport/v1/message.proto b/pkg/multicluster/leaderelection/proto/transport/v1/message.proto index ec63f62cf..94810bfad 100644 --- a/pkg/multicluster/leaderelection/proto/transport/v1/message.proto +++ b/pkg/multicluster/leaderelection/proto/transport/v1/message.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package transport.v1; +option go_package = "github.com/redpanda-data/redpanda-operator/pkg/multicluster/leaderelection/proto/gen/transport/v1;transportv1"; + message CheckRequest { bool fromLeader = 1; } @@ -25,8 +27,21 @@ message KubeconfigResponse { bytes payload = 1; } +message StatusRequest {} + +message StatusResponse { + string name = 1; + string raft_state = 2; + string leader = 3; + uint64 term = 4; + repeated string cluster_names = 5; + repeated string unhealthy_peers = 6; + bool is_healthy = 7; +} + service TransportService { rpc Check(CheckRequest) returns (CheckResponse); rpc Send(SendRequest) returns (SendResponse); rpc Kubeconfig(KubeconfigRequest) returns (KubeconfigResponse); + rpc Status(StatusRequest) returns (StatusResponse); } \ No newline at end of file diff --git a/pkg/multicluster/leaderelection/raft.go b/pkg/multicluster/leaderelection/raft.go index 4de6af715..204dcd590 100644 --- a/pkg/multicluster/leaderelection/raft.go +++ b/pkg/multicluster/leaderelection/raft.go @@ -125,6 +125,11 @@ type grpcTransport struct { leader atomic.Uint64 isLeader atomic.Bool + raftState atomic.Value // stores string + term atomic.Uint64 + idsToNames map[uint64]string + localID uint64 + node 
raft.Node nodeLock sync.RWMutex @@ -371,6 +376,43 @@ func (t *grpcTransport) Check(ctx context.Context, req *transportv1.CheckRequest return &transportv1.CheckResponse{HasLeader: false, Meta: t.meta}, nil } +func (t *grpcTransport) Status(ctx context.Context, req *transportv1.StatusRequest) (*transportv1.StatusResponse, error) { + leader := t.leader.Load() + leaderName := t.idsToNames[leader] + raftState, _ := t.raftState.Load().(string) + term := t.term.Load() + + clusterNames := make([]string, 0, len(t.idsToNames)) + for _, name := range t.idsToNames { + clusterNames = append(clusterNames, name) + } + + // Determine health and unhealthy peers via the Check RPC. + var unhealthyPeers []string + isHealthy := leader != 0 + checkResp, err := t.Check(ctx, &transportv1.CheckRequest{}) + if err == nil && checkResp != nil { + isHealthy = checkResp.HasLeader + for _, id := range checkResp.UnhealthyNodes { + if name, ok := t.idsToNames[id]; ok { + unhealthyPeers = append(unhealthyPeers, name) + } + } + } + + name := t.idsToNames[t.localID] + + return &transportv1.StatusResponse{ + Name: name, + RaftState: raftState, + Leader: leaderName, + Term: term, + ClusterNames: clusterNames, + UnhealthyPeers: unhealthyPeers, + IsHealthy: isHealthy, + }, nil +} + func (t *grpcTransport) Kubeconfig(ctx context.Context, req *transportv1.KubeconfigRequest) (*transportv1.KubeconfigResponse, error) { if t.kubeconfigFetcher == nil { return nil, status.Errorf(codes.FailedPrecondition, "no kubeconfig fetcher specified") diff --git a/pkg/multicluster/raft.go b/pkg/multicluster/raft.go index f079ecf33..abbd14041 100644 --- a/pkg/multicluster/raft.go +++ b/pkg/multicluster/raft.go @@ -289,6 +289,7 @@ func NewRaftRuntimeManager(config *RaftConfiguration) (Manager, error) { HeartbeatInterval: config.HeartbeatInterval, GRPCMaxBackoff: config.GRPCMaxBackoff, Logger: &raftLogr{logger: config.Logger.WithName("raft")}, + IDsToNames: idsToNames, } if config.Bootstrap { diff --git 
a/pkg/vcluster/multicluster.go b/pkg/vcluster/multicluster.go new file mode 100644 index 000000000..b28fd12b2 --- /dev/null +++ b/pkg/vcluster/multicluster.go @@ -0,0 +1,332 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +package vcluster + +import ( + "context" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/redpanda-data/common-go/kube" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/rest" + + "github.com/redpanda-data/redpanda-operator/pkg/helm" + "github.com/redpanda-data/redpanda-operator/pkg/k3d" + "github.com/redpanda-data/redpanda-operator/pkg/multicluster/bootstrap" +) + +// MulticlusterNode wraps a vcluster.Cluster with additional metadata needed +// for multicluster operator deployments. +type MulticlusterNode struct { + *Cluster + ctl *kube.Ctl + apiServer string + externalIP string +} + +// APIServer returns the in-cluster API server address for this node. +func (n *MulticlusterNode) APIServer() string { return n.apiServer } + +// ExternalIP returns the ClusterIP assigned to the operator service on this node. +func (n *MulticlusterNode) ExternalIP() string { return n.externalIP } + +// Ctl returns a kube.Ctl client for this node. +func (n *MulticlusterNode) Ctl() *kube.Ctl { return n.ctl } + +// Multicluster manages a set of vclusters configured for multicluster +// operator testing. It handles vcluster creation, cross-cluster service +// networking, TLS bootstrapping, and operator deployment. 
+type Multicluster struct { + Nodes []*MulticlusterNode + host *k3d.Cluster +} + +// MulticlusterOptions configures a multicluster vcluster environment. +type MulticlusterOptions struct { + // Size is the number of vclusters to create. Defaults to 3. + Size int + // Namespace is the namespace within each vcluster for operator resources. + // Defaults to "default". + Namespace string + // OperatorServiceName is the name of the Service created in front of the + // operator's raft gRPC port. Defaults to "multicluster-operator". + OperatorServiceName string + // OperatorFullname is the helm fullname of the operator deployment on each + // cluster (i.e. the Fullname produced by the operator chart for the chosen + // release name). Used by BootstrapTLS to derive the TLS secret name + // (-multicluster-certificates) so it matches what the + // helm chart creates. Defaults to "redpanda-operator", which corresponds + // to a helm release named "redpanda" using the operator chart. + OperatorFullname string + // OperatorChartPath is the path to the operator helm chart. Required for + // DeployOperators. Typically "../operator/chart" or similar. + OperatorChartPath string + // OperatorImage is the operator container image. Defaults to + // "localhost/redpanda-operator:dev". + OperatorImage string + // OperatorTag is the operator image tag. Defaults to "dev". 
+ OperatorTag string +} + +func (o *MulticlusterOptions) defaults() { + if o.Size == 0 { + o.Size = 3 + } + if o.Namespace == "" { + o.Namespace = metav1.NamespaceDefault + } + if o.OperatorServiceName == "" { + o.OperatorServiceName = "multicluster-operator" + } + if o.OperatorFullname == "" { + o.OperatorFullname = "redpanda-operator" + } + if o.OperatorImage == "" { + o.OperatorImage = "localhost/redpanda-operator" + } + if o.OperatorTag == "" { + o.OperatorTag = "dev" + } +} + +// NewMulticluster creates Size vclusters on the shared k3d cluster, +// configured with cross-cluster service replication so that operator pods +// can reach each other via ClusterIP services. +// +// Call BootstrapTLS to create TLS secrets, and DeployOperators to install +// the operator helm chart on each vcluster. +func NewMulticluster(t *testing.T, ctx context.Context, opts MulticlusterOptions) *Multicluster { + t.Helper() + opts.defaults() + + host, err := k3d.GetShared() + require.NoError(t, err) + + nodes := createMulticlusterNodes(t, ctx, host, opts) + assignOperatorServiceIPs(t, ctx, nodes, opts) + + env := &Multicluster{Nodes: nodes, host: host} + + t.Cleanup(func() { + for _, node := range nodes { + if err := node.Delete(); err != nil { + t.Logf("error deleting vcluster %s: %v", node.Name(), err) + } + } + }) + + return env +} + +// BootstrapTLS generates a shared CA and per-node TLS certificates, then +// distributes them as secrets across all vclusters. Returns the peer list +// suitable for passing to DeployOperators. 
+func (e *Multicluster) BootstrapTLS(t *testing.T, ctx context.Context, opts MulticlusterOptions) []map[string]any { + t.Helper() + opts.defaults() + + config := bootstrap.BootstrapClusterConfiguration{ + BootstrapTLS: true, + EnsureNamespace: true, + OperatorNamespace: opts.Namespace, + } + + var peers []map[string]any + for _, node := range e.Nodes { + config.RemoteClusters = append(config.RemoteClusters, bootstrap.RemoteConfiguration{ + KubeConfig: node.RESTConfig(), + APIServer: node.APIServer(), + ServiceAddress: node.ExternalIP(), + Name: opts.OperatorFullname, + }) + peers = append(peers, map[string]any{ + "name": node.Name(), + "address": node.ExternalIP(), + }) + } + + t.Log("bootstrapping multicluster TLS") + require.NoError(t, bootstrap.BootstrapKubernetesClusters(ctx, "redpanda-multicluster-operator", config)) + + return peers +} + +// ValuesFunc builds helm values for a single node. It receives the node so the +// caller can incorporate per-node identity (name, API server, etc.) into the +// typed chart values. +type ValuesFunc func(node *MulticlusterNode) any + +// DeployOperators installs the operator helm chart on each vcluster. The +// valuesFunc is called per node to produce the helm values; use it to +// construct typed chart values (e.g. PartialValues) that pass schema +// validation. 
+func (e *Multicluster) DeployOperators(t *testing.T, ctx context.Context, opts MulticlusterOptions, valuesFunc ValuesFunc) { + t.Helper() + opts.defaults() + require.NotEmpty(t, opts.OperatorChartPath, "OperatorChartPath is required") + + for _, node := range e.Nodes { + t.Logf("deploying operator in %q", node.Name()) + rel, err := node.HelmInstall(ctx, opts.OperatorChartPath, helm.InstallOptions{ + Name: "redpanda", + Values: valuesFunc(node), + Namespace: opts.Namespace, + }) + require.NoError(t, err) + + t.Cleanup(func() { + if err := node.HelmUninstall(ctx, rel); err != nil { + t.Logf("error uninstalling operator from %s: %v", node.Name(), err) + } + }) + } +} + +// RESTConfigs returns the REST configs for all nodes. +func (e *Multicluster) RESTConfigs() []*rest.Config { + configs := make([]*rest.Config, len(e.Nodes)) + for i, n := range e.Nodes { + configs[i] = n.RESTConfig() + } + return configs +} + +func createMulticlusterNodes(t *testing.T, ctx context.Context, host *k3d.Cluster, opts MulticlusterOptions) []*MulticlusterNode { + t.Helper() + + nodes := make([]*MulticlusterNode, opts.Size) + var wg sync.WaitGroup + errs := make([]error, opts.Size) + + for i := range opts.Size { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + vcValues := DefaultValues + networkingValuesForMulticluster(int32(idx), int32(opts.Size)) + cluster, err := New(ctx, host.RESTConfig(), + WithName(fmt.Sprintf("vc-%d", idx)), + WithValues(helm.RawYAML(vcValues)), + ) + if err != nil { + errs[idx] = fmt.Errorf("creating vcluster %d: %w", idx, err) + return + } + + // Use PortForwardedRESTConfig so that the kube.Ctl has a + // standard REST config (without custom Dial). This allows + // SPDY-based operations like kubectl port-forward to work + // against pods inside the vcluster. 
+ pfCfg, err := cluster.PortForwardedRESTConfig(ctx) + if err != nil { + errs[idx] = fmt.Errorf("creating port-forwarded config for vcluster %d: %w", idx, err) + return + } + ctl, err := kube.FromRESTConfig(pfCfg) + if err != nil { + errs[idx] = fmt.Errorf("creating kube.Ctl for vcluster %d: %w", idx, err) + return + } + + var apiServer corev1.Service + if err := ctl.Get(ctx, kube.ObjectKey{Name: "kubernetes", Namespace: metav1.NamespaceDefault}, &apiServer); err != nil { + errs[idx] = fmt.Errorf("getting API server service for vcluster %d: %w", idx, err) + return + } + + nodes[idx] = &MulticlusterNode{ + Cluster: cluster, + ctl: ctl, + apiServer: fmt.Sprintf("https://%s", apiServer.Spec.ClusterIPs[0]), + } + + t.Logf("created vcluster %d (name: %q)", idx, cluster.Name()) + }(i) + } + + wg.Wait() + for i, err := range errs { + require.NoError(t, err, "creating vcluster %d", i) + } + return nodes +} + +func assignOperatorServiceIPs(t *testing.T, ctx context.Context, nodes []*MulticlusterNode, opts MulticlusterOptions) { + t.Helper() + + for _, node := range nodes { + require.NoError(t, node.ctl.Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: opts.OperatorServiceName, + Namespace: opts.Namespace, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + Ports: []corev1.ServicePort{{ + Protocol: corev1.ProtocolTCP, + Port: 9443, + TargetPort: intstr.FromInt(9443), + }}, + Selector: map[string]string{ + "app.kubernetes.io/instance": "redpanda", + "app.kubernetes.io/name": "operator", + }, + PublishNotReadyAddresses: true, + }, + })) + + require.Eventually(t, func() bool { + var svc corev1.Service + if err := node.ctl.Get(ctx, kube.ObjectKey{ + Name: opts.OperatorServiceName, Namespace: opts.Namespace, + }, &svc); err != nil { + return false + } + if len(svc.Spec.ClusterIPs) == 0 { + return false + } + node.externalIP = svc.Spec.ClusterIPs[0] + return true + }, 3*time.Minute, 1*time.Second, + "cluster %s never got operator service 
IP", node.Name()) + } +} + +// networkingValuesForMulticluster generates vCluster networking YAML for +// cross-cluster service replication. +func networkingValuesForMulticluster(index, total int32) string { + var entries []string + for j := range total { + if j == index { + continue + } + vcName := fmt.Sprintf("vc-%d", j) + entries = append(entries, fmt.Sprintf( + " - from: %s/multicluster-operator-x-default-x-%s\n to: default/multicluster-operator-%s", + vcName, vcName, vcName, + )) + } + if len(entries) == 0 { + return "" + } + return fmt.Sprintf(` +networking: + replicateServices: + fromHost: +%s +`, strings.Join(entries, "\n")) +} diff --git a/pkg/vcluster/vcluster.go b/pkg/vcluster/vcluster.go index ea39f470c..7d96d83ac 100644 --- a/pkg/vcluster/vcluster.go +++ b/pkg/vcluster/vcluster.go @@ -12,6 +12,7 @@ package vcluster import ( "bytes" "context" + "encoding/json" "fmt" "io" "net" @@ -518,7 +519,30 @@ func (c *Cluster) HelmInstall(ctx context.Context, chartName string, options hel return nil, err } - return install.Run(chart, options.Values.(map[string]any)) + vals, err := toStringMap(options.Values) + if err != nil { + return nil, fmt.Errorf("converting values: %w", err) + } + + return install.Run(chart, vals) +} + +// toStringMap converts a values object to map[string]any. If it's already a +// map[string]any, it's returned directly. Otherwise it's marshaled to JSON +// and back to handle typed structs (e.g. PartialValues). 
// toStringMap normalizes a helm values object into map[string]any. A value
// that is already a map[string]any is returned as-is; anything else (e.g. a
// typed PartialValues struct) is round-tripped through JSON to produce an
// equivalent generic map.
func toStringMap(v any) (map[string]any, error) {
	if m, ok := v.(map[string]any); ok {
		return m, nil
	}

	encoded, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}

	var out map[string]any
	if err := json.Unmarshal(encoded, &out); err != nil {
		return nil, err
	}
	return out, nil
}