diff --git a/Makefile b/Makefile index 0126d00a..9d6eaae1 100644 --- a/Makefile +++ b/Makefile @@ -175,6 +175,7 @@ build-windows: save-version gen-version $(BUILD_UI) ## Build for Windows # Target: build-ui # Description: Builds the UI for the dashboard. # Usage: make build-ui +# TODO: use env var to define the build mode .PHONY: build-ui build-ui: gen-version ## Build UI for the dashboard @echo "🧀 Building UI for the dashboard ..." @@ -292,3 +293,33 @@ check: ## Check the lint, test, and build @$(MAKE) build || (echo "❌ Build check failed!" && exit 1) @echo "✅ Build check passed!" @echo "🎉 All checks passed successfully!" + +# controller-gen path +CONTROLLER_GEN = ${GOPATH}/bin/controller-gen +# controller-gen version +CONTROLLER_GEN_VERSION = v0.17.1 + +# Target: install-controller-gen +# Description: Install controller_gen. +# Usage: +# make install-controller-gen +.PHONY: install-controller-gen +install-controller-gen: + $(GO) install sigs.k8s.io/controller-tools/cmd/controller-gen@$(CONTROLLER_GEN_VERSION) + +# Target: generate-crds +# Description: Generate CRDs into a special dir. +# Usage: +# make generate-crds +.PHONY: generate-crds +generate-crds: + @# generate rbac, webhook and crds + $(CONTROLLER_GEN) crd paths="./pkg/kubernetes/apis/cluster/v1beta1/..." output:crd:artifacts:config=config/crds/ + $(CONTROLLER_GEN) crd paths="./pkg/kubernetes/apis/search/v1beta1/..." output:crd:artifacts:config=config/crds/ + +# Target: manifests +# Description: Install controller_gen and generate CRDs into a special dir. +# Usage: +# make manifests +.PHONY: manifests +manifests: install-controller-gen generate-crds diff --git a/api/openapispec/docs.go b/api/openapispec/docs.go index d588053b..5c41aee7 100644 --- a/api/openapispec/docs.go +++ b/api/openapispec/docs.go @@ -545,6 +545,20 @@ var doc = `{ "name": "description", "in": "formData", "required": true + }, + { + "type": "string", + "description": "cluster mode", + "name": "clusterMode", + "in": "formData", + "required": true + }, + { + "type": "integer", + "description": "cluster scale level", + "name": "clusterLevel", + "in": "formData", + "required": true } ], "responses": { @@ -920,6 +934,69 @@ var doc = `{ } } }, + "/rest-api/v1/cluster/{clusterName}/agentYaml": { + "get": { + "description": "Obtain the agent yaml in secret for cluster.", + "consumes": [ + "text/plain", + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "cluster" + ], + "summary": "Get agent yaml", + "parameters": [ + { + "type": "string", + "description": "The name of the cluster", + "name": "clusterName", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Verification passed server version", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "type": "string" + } + }, + "401": { + "description": "Unauthorized", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Not Found", + "schema": { + "type": "string" + } + }, + "429": { + "description": "Too Many Requests", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "string" + } + } + } + } + }, "/rest-api/v1/clusters": { "get": { "description": "This endpoint lists all cluster resources.", @@ -2265,6 +2342,14 @@ var doc = `{ "cluster.ClusterPayload": { "type": "object", "properties": { + "clusterLevel": { + "description": "clusterLevel is the scale level of cluster to be created", + "type": "integer" + }, + "clusterMode": { + "description": "ClusterMode is the mode of cluster to be created", + "type": "string" + }, "description": { "description": "ClusterDescription is the description of cluster to be created", "type": "string" diff --git a/api/openapispec/swagger.json b/api/openapispec/swagger.json index d5092eea..78e30607 100644 --- a/api/openapispec/swagger.json +++ b/api/openapispec/swagger.json @@ -529,6 +529,20 @@ "name": "description", "in": "formData", "required": true + }, + { + "type": "string", + "description": "cluster mode", + "name": "clusterMode", + "in": "formData", + "required": true + }, + { + "type": "integer", + "description": "cluster scale level", + "name": "clusterLevel", + "in": "formData", + "required": true } ], "responses": { @@ -904,6 +918,69 @@ } } }, + "/rest-api/v1/cluster/{clusterName}/agentYml": { + "get": { + "description": "Obtain the agent yaml in secret for cluster.", + "consumes": [ + "text/plain", + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "cluster" + ], + "summary": "Get agent yaml", + "parameters": [ + { + "type": "string", + "description": "The name of the cluster", + "name": "clusterName", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Verification passed server version", + "schema": { + "type": "string" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "type": "string" + } + }, + "401": { + "description": "Unauthorized", + "schema": { + "type": "string" + } + }, + "404": { + "description": "Not Found", + "schema": { + "type": "string" + } + }, + "429": { + "description": "Too Many Requests", + "schema": { + "type": "string" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "string" + } + } + } + } + }, "/rest-api/v1/clusters": { "get": { "description": "This endpoint lists all cluster resources.", @@ -2249,6 +2326,14 @@ "cluster.ClusterPayload": { "type": "object", "properties": { + "clusterLevel": { + "description": "clusterLevel is the scale level of cluster to be created", + "type": "integer" + }, + "clusterMode": { + "description": "ClusterMode is the mode of cluster to be created", + "type": "string" + }, "description": { "description": "ClusterDescription is the description of cluster to be created", "type": "string" diff --git a/api/openapispec/swagger.yaml b/api/openapispec/swagger.yaml index 46f4b1eb..9ce91cc6 100644 --- a/api/openapispec/swagger.yaml +++ b/api/openapispec/swagger.yaml @@ -86,6 +86,12 @@ definitions: type: object cluster.ClusterPayload: properties: + clusterLevel: + description: clusterLevel is the scale level of cluster to be created + type: integer + clusterMode: + description: ClusterMode is the mode of cluster to be created + type: string description: description: ClusterDescription is the description of cluster to be created type: string @@ -774,6 +780,48 @@ paths: summary: Update updates the cluster metadata by name. tags: - cluster + /rest-api/v1/cluster/{clusterName}/agentYml: + get: + consumes: + - text/plain + - application/json + description: Obtain the agent yaml in secret for cluster. + parameters: + - description: The name of the cluster + in: path + name: clusterName + required: true + type: string + produces: + - application/json + responses: + "200": + description: Verification passed server version + schema: + type: string + "400": + description: Bad Request + schema: + type: string + "401": + description: Unauthorized + schema: + type: string + "404": + description: Not Found + schema: + type: string + "429": + description: Too Many Requests + schema: + type: string + "500": + description: Internal Server Error + schema: + type: string + summary: Get agent yaml + tags: + - cluster /rest-api/v1/cluster/config/file: post: consumes: @@ -800,6 +848,16 @@ paths: name: description required: true type: string + - description: cluster mode + in: formData + name: clusterMode + required: true + type: string + - description: cluster scale level + in: formData + name: clusterLevel + required: true + type: integer produces: - text/plain responses: diff --git a/cmd/karpor/app/agent.go b/cmd/karpor/app/agent.go new file mode 100644 index 00000000..b1a1cd03 --- /dev/null +++ b/cmd/karpor/app/agent.go @@ -0,0 +1,127 @@ +// Copyright The Karpor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "context" + + esclient "github.com/elastic/go-elasticsearch/v8" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "k8s.io/client-go/dynamic" + "k8s.io/klog/v2/klogr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/healthz" + + "github.com/KusionStack/karpor/pkg/infra/search/storage/elasticsearch" + clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" + "github.com/KusionStack/karpor/pkg/kubernetes/scheme" + "github.com/KusionStack/karpor/pkg/syncer" + "github.com/KusionStack/karpor/pkg/syncer/utils" +) + +type agentOptions struct { + syncerOptions + ClusterName string + ClusterMode string +} + +func NewAgentOptions() *agentOptions { + return &agentOptions{ + syncerOptions: *NewSyncerOptions(), + } +} + +func (o *agentOptions) AddFlags(fs *pflag.FlagSet) { + o.syncerOptions.AddFlags(fs) + fs.StringVar(&o.ClusterName, "cluster-name", "", "The cluster name in hub cluster.") + fs.StringVar(&o.ClusterMode, "cluster-mode", "pull", "The cluster mode.") +} + +func NewAgentCommand(ctx context.Context) *cobra.Command { + options := NewAgentOptions() + cmd := &cobra.Command{ + Use: "agent", + Short: "start a resource syncer agent which deployed in user cluster", + RunE: func(cmd *cobra.Command, args []string) error { + // use the same logical as the Non-HA syncer in controller cluster + return runAgent(ctx, options) + }, + } + options.AddFlags(cmd.Flags()) + return cmd +} + +func runAgent(ctx context.Context, options *agentOptions) error { + ctrl.SetLogger(klogr.New()) + log := ctrl.Log.WithName("setup") + + if options.ClusterMode == clusterv1beta1.PushClusterMode { + // apply crds + dynamicClient, err := dynamic.NewForConfig(ctrl.GetConfigOrDie()) + if err != nil { + return errors.Wrapf(err, "failed to build dynamic client for ageng") + } + err = utils.ApplyCrds(ctx, dynamicClient) + if err != nil { + return err + } + } + + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ + Scheme: scheme.Scheme, + MetricsBindAddress: options.MetricsAddr, + HealthProbeBindAddress: options.ProbeAddr, + }) + if err != nil { + log.Error(err, "unable to start manager") + return err + } + + // TODO: add startup parameters to change the type of storage + //nolint:contextcheck + es, err := elasticsearch.NewStorage(esclient.Config{ + Addresses: options.ElasticSearchAddresses, + }) + if err != nil { + log.Error(err, "unable to init elasticsearch client") + return err + } + + //nolint:contextcheck + if err = syncer.NewAgentReconciler(es, options.ClusterName).SetupWithManager(mgr); err != nil { + log.Error(err, "unable to create resource syncer") + return err + } + + if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + log.Error(err, "unable to set up health check") + return err + } + + if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + log.Error(err, "unable to set up ready check") + return err + } + + log.Info("starting manager") + if err := mgr.Start(ctx); err != nil { + log.Error(err, "problem running manager") + return err + } + + return nil +} diff --git a/cmd/karpor/app/options/core_options.go b/cmd/karpor/app/options/core_options.go index 228fc4f6..c4b294f7 100644 --- a/cmd/karpor/app/options/core_options.go +++ b/cmd/karpor/app/options/core_options.go @@ -20,10 +20,11 @@ import ( ) type CoreOptions struct { - EnableRBAC bool - ReadOnlyMode bool - GithubBadge bool - Version bool + EnableRBAC bool + ReadOnlyMode bool + GithubBadge bool + Version bool + HighAvailability bool } func NewCoreOptions() *CoreOptions { @@ -50,4 +51,5 @@ func (o *CoreOptions) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&o.ReadOnlyMode, "read-only-mode", false, "turn on the read only mode") fs.BoolVar(&o.GithubBadge, "github-badge", false, "whether to display the github badge") fs.BoolVarP(&o.Version, "version", "V", o.Version, "Print version and exit") + fs.BoolVar(&o.HighAvailability, "high-availability", false, "whether to use high-availability feature.") } diff --git a/cmd/karpor/app/syncer.go b/cmd/karpor/app/syncer.go index 90b07dd4..c292e9fc 100644 --- a/cmd/karpor/app/syncer.go +++ b/cmd/karpor/app/syncer.go @@ -17,21 +17,27 @@ package app import ( "context" - "github.com/KusionStack/karpor/pkg/infra/search/storage/elasticsearch" - "github.com/KusionStack/karpor/pkg/kubernetes/scheme" - "github.com/KusionStack/karpor/pkg/syncer" esclient "github.com/elastic/go-elasticsearch/v8" "github.com/spf13/cobra" "github.com/spf13/pflag" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" + + "github.com/KusionStack/karpor/pkg/infra/search/storage/elasticsearch" + "github.com/KusionStack/karpor/pkg/kubernetes/scheme" + "github.com/KusionStack/karpor/pkg/syncer" ) type syncerOptions struct { + HighAvailability bool + MetricsAddr string ProbeAddr string ElasticSearchAddresses []string + + ExternalEndpoint string + AgentImageTag string } func NewSyncerOptions() *syncerOptions { @@ -39,9 +45,13 @@ func NewSyncerOptions() *syncerOptions { } func (o *syncerOptions) AddFlags(fs *pflag.FlagSet) { + fs.BoolVar(&o.HighAvailability, "high-availability", false, "Whether to use high-availability feature.") + fs.StringVar(&o.MetricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") fs.StringVar(&o.ProbeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") fs.StringSliceVar(&o.ElasticSearchAddresses, "elastic-search-addresses", nil, "The elastic search address.") + fs.StringVar(&o.ExternalEndpoint, "external-addresses", "", "The external address that expose to user cluster in pull mode.") + fs.StringVar(&o.AgentImageTag, "agent-image-tag", "v0.0.0", "The agent image tag.") } func NewSyncerCommand(ctx context.Context) *cobra.Command { @@ -82,7 +92,7 @@ func run(ctx context.Context, options *syncerOptions) error { } //nolint:contextcheck - if err = syncer.NewSyncReconciler(es).SetupWithManager(mgr); err != nil { + if err = syncer.NewSyncReconciler(es, options.HighAvailability, options.ElasticSearchAddresses, options.ExternalEndpoint, options.AgentImageTag).SetupWithManager(mgr); err != nil { log.Error(err, "unable to create resource syncer") return err } diff --git a/cmd/karpor/main.go b/cmd/karpor/main.go index 748bcd08..c6890468 100644 --- a/cmd/karpor/main.go +++ b/cmd/karpor/main.go @@ -19,9 +19,10 @@ package main import ( "os" - "github.com/KusionStack/karpor/cmd/karpor/app" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/component-base/cli" + + "github.com/KusionStack/karpor/cmd/karpor/app" ) // @title Karpor @@ -33,7 +34,10 @@ func main() { cmd := app.NewServerCommand(ctx) syncCmd := app.NewSyncerCommand(ctx) + agentCmd := app.NewAgentCommand(ctx) + cmd.AddCommand(syncCmd) + cmd.AddCommand(agentCmd) code := cli.Run(cmd) os.Exit(code) diff --git a/config/agent.tpl b/config/agent.tpl new file mode 100644 index 00000000..c20f68d6 --- /dev/null +++ b/config/agent.tpl @@ -0,0 +1,137 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: karpor +spec: + finalizers: + - kubernetes +{{- if eq .ClusterMode "pull" }} +--- +apiVersion: v1 +data: + config: |- + apiVersion: v1 + clusters: + - cluster: + insecure-skip-tls-verify: true + server: {{ .ExternalEndpoint }} + name: karpor + contexts: + - context: + cluster: karpor + user: {{ .ClusterName }} + name: default + current-context: default + kind: Config + users: + - name: {{ .ClusterName }} + user: + client-certificate-data: {{ .CaCert }} + client-key-data: {{ .CaKey }} +kind: ConfigMap +metadata: + name: karpor-kubeconfig + namespace: karpor +{{- end }} +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: karpor-agent + namespace: karpor +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + app.kubernetes.io/component: karpor-agent + app.kubernetes.io/instance: karpor + app.kubernetes.io/name: karpor + strategy: + rollingUpdate: + maxSurge: 25% + maxUnavailable: 25% + type: RollingUpdate + template: + metadata: + labels: + app.kubernetes.io/component: karpor-agent + app.kubernetes.io/instance: karpor + app.kubernetes.io/name: karpor + spec: + containers: + - args: + - agent + - --elastic-search-addresses={{ range .StorageAddresses }}{{.}} {{ end }} + - --cluster-name={{ .ClusterName }} + - --cluster-mode={{ .ClusterMode }} + command: + - /karpor +{{- if eq .ClusterMode "pull" }} + env: + - name: KUBECONFIG + value: /etc/karpor/config +{{- end }} + image: kusionstack/karpor:{{ .AgentImageTag }} + imagePullPolicy: IfNotPresent + name: karpor-agent + ports: + - containerPort: 7443 + protocol: TCP + resources: +{{- if eq .Level 3 }} + limits: + cpu: 1 + ephemeral-storage: 20Gi + memory: 2Gi + requests: + cpu: 500m + ephemeral-storage: 4Gi + memory: 512Mi +{{- else if eq .Level 2 }} + limits: + cpu: 500m + ephemeral-storage: 10Gi + memory: 1Gi + requests: + cpu: 250m + ephemeral-storage: 2Gi + memory: 256Mi +{{- else }} + limits: + cpu: 250m + ephemeral-storage: 5Gi + memory: 500Mi + requests: + cpu: 125m + ephemeral-storage: 1Gi + memory: 128Mi +{{- end }} +{{- if eq .ClusterMode "pull" }} + volumeMounts: + - mountPath: /etc/karpor/ + name: karpor-kubeconfig +{{- end }} + dnsPolicy: ClusterFirst + restartPolicy: Always + terminationGracePeriodSeconds: 30 +{{- if eq .ClusterMode "pull" }} + volumes: + - configMap: + defaultMode: 420 + name: karpor-kubeconfig + name: karpor-kubeconfig +{{- end }} +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: karpor +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: cluster-admin +subjects: +- kind: ServiceAccount + name: default + namespace: karpor diff --git a/config/crds/cluster.karpor.io_clusters.yaml b/config/crds/cluster.karpor.io_clusters.yaml new file mode 100644 index 00000000..435f8ea1 --- /dev/null +++ b/config/crds/cluster.karpor.io_clusters.yaml @@ -0,0 +1,139 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.17.1 + name: clusters.cluster.karpor.io +spec: + group: cluster.karpor.io + names: + kind: Cluster + listKind: ClusterList + plural: clusters + singular: cluster + scope: Cluster + versions: + - name: v1beta1 + schema: + openAPIV3Schema: + description: Cluster is an extension type to access a cluster + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + access: + properties: + caBundle: + format: byte + type: string + credential: + properties: + execConfig: + properties: + apiVersion: + type: string + args: + items: + type: string + type: array + command: + type: string + env: + items: + properties: + name: + type: string + value: + type: string + required: + - name + - value + type: object + type: array + installHint: + type: string + interactiveMode: + type: string + provideClusterInfo: + type: boolean + required: + - args + - command + - env + - provideClusterInfo + type: object + serviceAccountToken: + type: string + type: + type: string + x509: + properties: + certificate: + format: byte + type: string + privateKey: + format: byte + type: string + required: + - certificate + - privateKey + type: object + required: + - type + type: object + endpoint: + type: string + insecure: + type: boolean + required: + - endpoint + type: object + description: + type: string + displayName: + type: string + finalized: + type: boolean + level: + description: cluster scale level, optional value 1, 2, 3, default + 1 + type: integer + mode: + type: string + provider: + type: string + required: + - access + - displayName + - level + - provider + type: object + status: + properties: + healthy: + type: boolean + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crds/search.karpor.io_syncregistries.yaml b/config/crds/search.karpor.io_syncregistries.yaml new file mode 100644 index 00000000..25421f72 --- /dev/null +++ b/config/crds/search.karpor.io_syncregistries.yaml @@ -0,0 +1,286 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.17.1 + name: syncregistries.search.karpor.io +spec: + group: search.karpor.io + names: + kind: SyncRegistry + listKind: SyncRegistryList + plural: syncregistries + singular: syncregistry + scope: Cluster + versions: + - name: v1beta1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + clusterLabelSelector: + description: ClusterLabelSelector is used to filter the target clusters + that need to be synced from. + properties: + matchExpressions: + description: matchExpressions is a list of label selector requirements. + The requirements are ANDed. + items: + description: |- + A label selector requirement is a selector that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: key is the label key that the selector applies + to. + type: string + operator: + description: |- + operator represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: |- + values is an array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. This array is replaced during a strategic + merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: |- + matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels + map is equivalent to an element of matchExpressions, whose key field is "key", the + operator is "In", and the values array contains only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + clusters: + description: Clusters is the list of the target clusters to be be + synced from. + items: + type: string + type: array + syncResources: + items: + description: ResourceSyncRule is used to specify the way to sync + the specified resource + properties: + apiVersion: + description: APIVersion represents the group version of the + target resource. + type: string + maxConcurrent: + description: 'MaxConcurrent is the maximum number of workers + (default: 10)' + type: integer + namespace: + description: |- + Namespace specifies the namespace in which the ListWatch of the target resources is limited + to. + type: string + remainAfterDeleted: + description: RemainAfterDeleted indicates whether the resource + should remain in ES after being deleted in k8s. + type: boolean + resource: + description: Resource is the the target resource. + type: string + resyncPeriod: + description: ResynPeriod is the period to resync + type: string + selectors: + description: Selectors are used to filter the target resources + to sync. Multiple selectors are ORed. + items: + description: Selector represents a resource filter + properties: + fieldSelector: + description: |- + FieldSelector is a filter to select resources by fields. + If non-nil and non-empty, only the resource match this filter will be selected. + properties: + matchFields: + additionalProperties: + type: string + description: |- + MatchFields is a map of {field,value} pairs. A single {field,value} in the matchFields + map means that the specified field should have an exact match with the specified value. + Multiple entries are ANDed. + type: object + serverSupported: + description: |- + ServerSupported indicates whether the matchFields is supported by the API server. + If not supported, the client-side filtering will be utilized instead." + type: boolean + type: object + labelSelector: + description: |- + LabelSelector is a filter to select resources by labels. + If non-nil and non-empty, only the resource match this filter will be selected. + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: |- + A label selector requirement is a selector that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: |- + operator represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: |- + values is an array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. This array is replaced during a strategic + merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: |- + matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels + map is equivalent to an element of matchExpressions, whose key field is "key", the + operator is "In", and the values array contains only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + type: object + type: array + transform: + description: |- + Transform is the rule applied to the original resource to transform it to the desired target + resource. + properties: + type: + description: Type is the type of transformer. + type: string + valueTemplate: + description: ValueTemplate is the template of the input + data to be paased to the transformer + type: string + required: + - type + - valueTemplate + type: object + transformRefName: + description: TransformRefName is the name of the TransformRule + type: string + trim: + description: Trim defines the trimming strategy for the resources + of the current type. + properties: + retain: + description: Retain specifies which fields should be retained + after trimming. + properties: + jsonPaths: + description: |- + JSONPaths specifies the path of the field to be retained. + For usage, please refer to https://kubernetes.io/docs/reference/kubectl/jsonpath/ + items: + type: string + type: array + type: object + type: object + trimRefName: + description: TrimRefName is the name of the TrimRule. + type: string + required: + - apiVersion + - resource + type: object + type: array + syncResourcesRefName: + type: string + type: object + status: + properties: + clusters: + items: + properties: + cluster: + type: string + resources: + items: + properties: + apiVersion: + type: string + kind: + type: string + lastTransitionTime: + format: date-time + type: string + message: + type: string + reason: + type: string + status: + type: string + required: + - apiVersion + - kind + - lastTransitionTime + - status + type: object + type: array + status: + type: string + required: + - cluster + - status + type: object + type: array + lastTransitionTime: + format: date-time + type: string + required: + - lastTransitionTime + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crds/search.karpor.io_syncresources.yaml b/config/crds/search.karpor.io_syncresources.yaml new file mode 100644 index 00000000..bcfa063b --- /dev/null +++ b/config/crds/search.karpor.io_syncresources.yaml @@ -0,0 +1,188 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.17.1 + name: syncresources.search.karpor.io +spec: + group: search.karpor.io + names: + kind: SyncResources + listKind: SyncResourcesList + plural: syncresources + singular: syncresources + scope: Cluster + versions: + - name: v1beta1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + syncResources: + items: + description: ResourceSyncRule is used to specify the way to sync + the specified resource + properties: + apiVersion: + description: APIVersion represents the group version of the + target resource. + type: string + maxConcurrent: + description: 'MaxConcurrent is the maximum number of workers + (default: 10)' + type: integer + namespace: + description: |- + Namespace specifies the namespace in which the ListWatch of the target resources is limited + to. + type: string + remainAfterDeleted: + description: RemainAfterDeleted indicates whether the resource + should remain in ES after being deleted in k8s. + type: boolean + resource: + description: Resource is the the target resource. + type: string + resyncPeriod: + description: ResynPeriod is the period to resync + type: string + selectors: + description: Selectors are used to filter the target resources + to sync. Multiple selectors are ORed. + items: + description: Selector represents a resource filter + properties: + fieldSelector: + description: |- + FieldSelector is a filter to select resources by fields. + If non-nil and non-empty, only the resource match this filter will be selected. + properties: + matchFields: + additionalProperties: + type: string + description: |- + MatchFields is a map of {field,value} pairs. A single {field,value} in the matchFields + map means that the specified field should have an exact match with the specified value. + Multiple entries are ANDed. + type: object + serverSupported: + description: |- + ServerSupported indicates whether the matchFields is supported by the API server. + If not supported, the client-side filtering will be utilized instead." + type: boolean + type: object + labelSelector: + description: |- + LabelSelector is a filter to select resources by labels. + If non-nil and non-empty, only the resource match this filter will be selected. + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: |- + A label selector requirement is a selector that contains values, a key, and an operator that + relates the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: |- + operator represents a key's relationship to a set of values. + Valid operators are In, NotIn, Exists and DoesNotExist. + type: string + values: + description: |- + values is an array of string values. If the operator is In or NotIn, + the values array must be non-empty. If the operator is Exists or DoesNotExist, + the values array must be empty. This array is replaced during a strategic + merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: |- + matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels + map is equivalent to an element of matchExpressions, whose key field is "key", the + operator is "In", and the values array contains only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + type: object + type: array + transform: + description: |- + Transform is the rule applied to the original resource to transform it to the desired target + resource. + properties: + type: + description: Type is the type of transformer. + type: string + valueTemplate: + description: ValueTemplate is the template of the input + data to be paased to the transformer + type: string + required: + - type + - valueTemplate + type: object + transformRefName: + description: TransformRefName is the name of the TransformRule + type: string + trim: + description: Trim defines the trimming strategy for the resources + of the current type. + properties: + retain: + description: Retain specifies which fields should be retained + after trimming. + properties: + jsonPaths: + description: |- + JSONPaths specifies the path of the field to be retained. + For usage, please refer to https://kubernetes.io/docs/reference/kubectl/jsonpath/ + items: + type: string + type: array + type: object + type: object + trimRefName: + description: TrimRefName is the name of the TrimRule. + type: string + required: + - apiVersion + - resource + type: object + type: array + type: object + type: object + served: true + storage: true diff --git a/config/crds/search.karpor.io_transformrules.yaml b/config/crds/search.karpor.io_transformrules.yaml new file mode 100644 index 00000000..811e3388 --- /dev/null +++ b/config/crds/search.karpor.io_transformrules.yaml @@ -0,0 +1,56 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.17.1 + name: transformrules.search.karpor.io +spec: + group: search.karpor.io + names: + kind: TransformRule + listKind: TransformRuleList + plural: transformrules + singular: transformrule + scope: Cluster + versions: + - name: v1beta1 + schema: + openAPIV3Schema: + description: |- + TransformRule is used to define the rule to transform the original resource into the desired + target resource. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + type: + description: Type is the type of transformer. + type: string + valueTemplate: + description: ValueTemplate is the template of the input data to be + paased to the transformer + type: string + required: + - type + - valueTemplate + type: object + type: object + served: true + storage: true diff --git a/config/crds/search.karpor.io_trimrules.yaml b/config/crds/search.karpor.io_trimrules.yaml new file mode 100644 index 00000000..72dbf84f --- /dev/null +++ b/config/crds/search.karpor.io_trimrules.yaml @@ -0,0 +1,58 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.17.1 + name: trimrules.search.karpor.io +spec: + group: search.karpor.io + names: + kind: TrimRule + listKind: TrimRuleList + plural: trimrules + singular: trimrule + scope: Cluster + versions: + - name: v1beta1 + schema: + openAPIV3Schema: + description: |- + TrimRule defines the strategy of trimming k8s objects, which can save + informer memory by discarding redundant fields. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + retain: + description: Retain specifies which fields should be retained after + trimming. + properties: + jsonPaths: + description: |- + JSONPaths specifies the path of the field to be retained. + For usage, please refer to https://kubernetes.io/docs/reference/kubectl/jsonpath/ + items: + type: string + type: array + type: object + type: object + type: object + served: true + storage: true diff --git a/config/embed.go b/config/embed.go index 5c3ac5bb..98d453f6 100644 --- a/config/embed.go +++ b/config/embed.go @@ -32,3 +32,23 @@ var DefaultRelationship []byte //go:embed default-sync-strategy.yaml var DefaultSyncStrategy []byte + +var CrdList = [][]byte{ClustersCrd, SyncRegistriesCrd, SyncResourcesCrd, TransformRulesCrd, TrimRulesCrd} + +//go:embed crds/cluster.karpor.io_clusters.yaml +var ClustersCrd []byte + +//go:embed crds/search.karpor.io_syncregistries.yaml +var SyncRegistriesCrd []byte + +//go:embed crds/search.karpor.io_syncresources.yaml +var SyncResourcesCrd []byte + +//go:embed crds/search.karpor.io_transformrules.yaml +var TransformRulesCrd []byte + +//go:embed crds/search.karpor.io_trimrules.yaml +var TrimRulesCrd []byte + +//go:embed agent.tpl +var AgentTpl []byte diff --git a/docs/api.md b/docs/api.md index a1b0cb59..debd6b3d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -46,6 +46,7 @@ Karpor is a brand new Kubernetes visualization tool that focuses on search, insi |---------|---------|--------|---------| | DELETE | /rest-api/v1/cluster/{clusterName} | [delete rest API v1 cluster cluster name](#delete-rest-api-v1-cluster-cluster-name) | Delete removes a cluster resource by name. | | GET | /rest-api/v1/cluster/{clusterName} | [get rest API v1 cluster cluster name](#get-rest-api-v1-cluster-cluster-name) | Get returns a cluster resource by name. | +| GET | /rest-api/v1/cluster/{clusterName}/agentYml | [get rest API v1 cluster cluster name agent yml](#get-rest-api-v1-cluster-cluster-name-agent-yml) | Get agent yaml | | GET | /rest-api/v1/clusters | [get rest API v1 clusters](#get-rest-api-v1-clusters) | List lists all cluster resources. | | POST | /rest-api/v1/cluster/{clusterName} | [post rest API v1 cluster cluster name](#post-rest-api-v1-cluster-cluster-name) | Create creates a cluster resource. | | POST | /rest-api/v1/cluster/config/file | [post rest API v1 cluster config file](#post-rest-api-v1-cluster-config-file) | Upload kubeConfig file for cluster | @@ -657,6 +658,94 @@ Status: Internal Server Error +### Get agent yaml (*GetRestAPIV1ClusterClusterNameAgentYml*) + +``` +GET /rest-api/v1/cluster/{clusterName}/agentYml +``` + +Obtain the agent yaml in secret for cluster. + +#### Consumes + * application/json + * text/plain + +#### Produces + * application/json + +#### Parameters + +| Name | Source | Type | Go type | Separator | Required | Default | Description | +|------|--------|------|---------|-----------| :------: |---------|-------------| +| clusterName | `path` | string | `string` | | ✓ | | The name of the cluster | + +#### All responses +| Code | Status | Description | Has headers | Schema | +|------|--------|-------------|:-----------:|--------| +| [200](#get-rest-api-v1-cluster-cluster-name-agent-yml-200) | OK | Verification passed server version | | [schema](#get-rest-api-v1-cluster-cluster-name-agent-yml-200-schema) | +| [400](#get-rest-api-v1-cluster-cluster-name-agent-yml-400) | Bad Request | Bad Request | | [schema](#get-rest-api-v1-cluster-cluster-name-agent-yml-400-schema) | +| [401](#get-rest-api-v1-cluster-cluster-name-agent-yml-401) | Unauthorized | Unauthorized | | [schema](#get-rest-api-v1-cluster-cluster-name-agent-yml-401-schema) | +| [404](#get-rest-api-v1-cluster-cluster-name-agent-yml-404) | Not Found | Not Found | | [schema](#get-rest-api-v1-cluster-cluster-name-agent-yml-404-schema) | +| [429](#get-rest-api-v1-cluster-cluster-name-agent-yml-429) | Too Many Requests | Too Many Requests | | [schema](#get-rest-api-v1-cluster-cluster-name-agent-yml-429-schema) | +| [500](#get-rest-api-v1-cluster-cluster-name-agent-yml-500) | Internal Server Error | Internal Server Error | | [schema](#get-rest-api-v1-cluster-cluster-name-agent-yml-500-schema) | + +#### Responses + + +##### 200 - Verification passed server version +Status: OK + +###### Schema + + + + + +##### 400 - Bad Request +Status: Bad Request + +###### Schema + + + + + +##### 401 - Unauthorized +Status: Unauthorized + +###### Schema + + + + + +##### 404 - Not Found +Status: Not Found + +###### Schema + + + + + +##### 429 - Too Many Requests +Status: Too Many Requests + +###### Schema + + + + + +##### 500 - Internal Server Error +Status: Internal Server Error + +###### Schema + + + + + ### List lists all cluster resources. (*GetRestAPIV1Clusters*) ``` @@ -2228,6 +2317,8 @@ Uploads a KubeConfig file for cluster, with a maximum size of 2MB. | Name | Source | Type | Go type | Separator | Required | Default | Description | |------|--------|------|---------|-----------| :------: |---------|-------------| +| clusterLevel | `formData` | integer | `int64` | | ✓ | | cluster scale level | +| clusterMode | `formData` | string | `string` | | ✓ | | cluster mode | | description | `formData` | string | `string` | | ✓ | | cluster description | | displayName | `formData` | string | `string` | | ✓ | | cluster display name | | file | `formData` | file | `io.ReadCloser` | | ✓ | | Upload file with field name 'file' | @@ -2801,6 +2892,8 @@ Status: Internal Server Error | Name | Type | Go type | Required | Default | Description | Example | |------|------|---------|:--------:| ------- |-------------|---------| +| clusterLevel | integer| `int64` | | | clusterLevel is the scale level of cluster to be created | | +| clusterMode | string| `string` | | | ClusterMode is the mode of cluster to be created | | | description | string| `string` | | | ClusterDescription is the description of cluster to be created | | | displayName | string| `string` | | | ClusterDisplayName is the display name of cluster to be created | | | kubeConfig | string| `string` | | | ClusterKubeConfig is the kubeconfig of cluster to be created | | @@ -2924,10 +3017,16 @@ Status: Internal Server Error | Name | Type | Go type | Required | Default | Description | Example | |------|------|---------|:--------:| ------- |-------------|---------| -| issuesTotal | integer| `int64` | | | IssuesTotal is the total count of all issues found during the audit.
This count can be used to understand the overall number of problems
that need to be addressed. | | +| issuesTotal | integer| `int64` | | | IssuesTotal is the total count of all issues found during the audit. +This count can be used to understand the overall number of problems +that need to be addressed. | | | resourceTotal | integer| `int64` | | | ResourceTotal is the count of unique resources audited during the scan. | | -| score | number| `float64` | | | Score represents the calculated score of the audited manifest based on
the number and severity of issues. It provides a quantitative measure
of the security posture of the resources in the manifest. | | -| severityStatistic | map of integer| `map[string]int64` | | | SeverityStatistic is a mapping of severity levels to their respective
number of occurrences. It allows for a quick overview of the distribution
of issues across different severity categories. | | +| score | number| `float64` | | | Score represents the calculated score of the audited manifest based on +the number and severity of issues. It provides a quantitative measure +of the security posture of the resources in the manifest. | | +| severityStatistic | map of integer| `map[string]int64` | | | SeverityStatistic is a mapping of severity levels to their respective +number of occurrences. It allows for a quick overview of the distribution +of issues across different severity categories. | | @@ -3010,5 +3109,8 @@ Status: Internal Server Error | Name | Type | Go type | Required | Default | Description | Example | |------|------|---------|:--------:| ------- |-------------|---------| -| object | [interface{}](#interface)| `interface{}` | | | Object is a JSON compatible map with string, float, int, bool, []interface{}, or
map[string]interface{}
children. | | +| object | [interface{}](#interface)| `interface{}` | | | Object is a JSON compatible map with string, float, int, bool, []interface{}, or +map[string]interface{} +children. | | + diff --git a/docs/cli/karpor.md b/docs/cli/karpor.md index 41c5e576..e1a2ea83 100644 --- a/docs/cli/karpor.md +++ b/docs/cli/karpor.md @@ -15,10 +15,14 @@ karpor [flags] ``` --admission-control-config-file string File with admission control configuration. --advertise-address ip The IP address on which to advertise the apiserver to members of the cluster. This address must be reachable by the rest of the cluster. If blank, the --bind-address will be used. If --bind-address is unspecified, the host's default interface will be used. - --ai-auth-token string The ai auth token (same as api key) + --ai-auth-token string The ai auth token --ai-backend string The ai backend (default "openai") --ai-base-url string The ai base url + --ai-http-proxy string The ai http proxy + --ai-https-proxy string The ai https proxy --ai-model string The ai model (default "gpt-3.5-turbo") + --ai-no-proxy string The ai no-proxy + --ai-proxy-enabled The ai proxy enable --ai-temperature float32 The ai temperature (default 1) --ai-top-p float32 The ai top-p (default 1) --anonymous-auth Enables anonymous requests to the secure port of the API server. Requests that are not rejected by another authentication method are treated as anonymous requests. Anonymous requests have a username of system:anonymous, and a group name of system:unauthenticated. (default true) @@ -197,6 +201,7 @@ karpor [flags] --github-badge whether to display the github badge --goaway-chance float To prevent HTTP/2 clients from getting stuck on a single apiserver, randomly close a connection (GOAWAY). The client's other in-flight requests won't be affected, and the client will reconnect, likely landing on a different apiserver after going through the load balancer again. This argument sets the fraction of requests that will be sent a GOAWAY. Clusters with single apiservers, or which don't use a load balancer, should NOT enable this. Min is 0 (off), Max is .02 (1/50 requests); .001 (1/1000) is a recommended starting point. -h, --help help for karpor + --high-availability whether to use high-availability feature. --http2-max-streams-per-connection int The limit that the server gives to clients for the maximum number of streams in an HTTP/2 connection. Zero means to use golang's default. (default 1000) --lease-reuse-duration-seconds int The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer. (default 60) --livez-grace-period duration This option represents the maximum amount of time it should take for apiserver to complete its startup sequence and become live. From apiserver's start time to when this amount of time has elapsed, /livez will assume that unfinished post-start hooks will complete successfully and therefore return true. @@ -244,4 +249,4 @@ karpor [flags] * [karpor syncer](karpor_syncer.md) - start a resource syncer to sync resource from clusters -###### Auto generated by spf13/cobra on 27-Nov-2024 +###### Auto generated by spf13/cobra on 2-May-2025 diff --git a/docs/cli/karpor_syncer.md b/docs/cli/karpor_syncer.md index ce2063d2..af2d081c 100644 --- a/docs/cli/karpor_syncer.md +++ b/docs/cli/karpor_syncer.md @@ -9,14 +9,18 @@ karpor syncer [flags] ### Options ``` + --agent-image-tag string The agent image tag. (default "v0.0.0") --elastic-search-addresses strings The elastic search address. + --external-addresses string The external address that expose to user cluster in pull mode. --health-probe-bind-address string The address the probe endpoint binds to. (default ":8081") -h, --help help for syncer + --high-availability Whether to use high-availability feature. --metrics-bind-address string The address the metric endpoint binds to. (default ":8080") + --only-push-mode Only push mode in high availability feature. ``` ### SEE ALSO * [karpor](karpor.md) - Launch an API server -###### Auto generated by spf13/cobra on 27-Nov-2024 +###### Auto generated by spf13/cobra on 2-May-2025 diff --git a/pkg/core/handler/cluster/cluster.go b/pkg/core/handler/cluster/cluster.go index ff4f376e..b70ab76f 100644 --- a/pkg/core/handler/cluster/cluster.go +++ b/pkg/core/handler/cluster/cluster.go @@ -20,11 +20,6 @@ import ( "strconv" "strings" - "github.com/KusionStack/karpor/pkg/core/handler" - "github.com/KusionStack/karpor/pkg/core/manager/cluster" - "github.com/KusionStack/karpor/pkg/infra/multicluster" - "github.com/KusionStack/karpor/pkg/util/clusterinstall" - "github.com/KusionStack/karpor/pkg/util/ctxutil" "github.com/go-chi/chi/v5" "github.com/pkg/errors" _ "k8s.io/api/core/v1" @@ -33,6 +28,12 @@ import ( "k8s.io/apiserver/pkg/server" "k8s.io/client-go/tools/clientcmd" k8syaml "sigs.k8s.io/yaml" + + "github.com/KusionStack/karpor/pkg/core/handler" + "github.com/KusionStack/karpor/pkg/core/manager/cluster" + "github.com/KusionStack/karpor/pkg/infra/multicluster" + "github.com/KusionStack/karpor/pkg/util/clusterinstall" + "github.com/KusionStack/karpor/pkg/util/ctxutil" ) // Get returns an HTTP handler function that reads a cluster @@ -112,7 +113,8 @@ func Create(clusterMgr *cluster.ClusterManager, c *server.CompletedConfig) http. } client, _ := multicluster.BuildMultiClusterClient(r.Context(), c.LoopbackClientConfig, "") - clusterCreated, err := clusterMgr.CreateCluster(r.Context(), client, cluster, payload.ClusterDisplayName, payload.ClusterDescription, payload.ClusterKubeConfig) + clusterCreated, err := clusterMgr.CreateCluster(r.Context(), client, cluster, payload.ClusterDisplayName, payload.ClusterDescription, + payload.ClusterMode, payload.ClusterKubeConfig, payload.ClusterLevel) handler.HandleResult(w, r, ctx, err, clusterCreated) } } @@ -242,13 +244,15 @@ func Delete(clusterMgr *cluster.ClusterManager, c *server.CompletedConfig) http. // @Tags cluster // @Accept multipart/form-data // @Produce plain -// @Param file formData file true "Upload file with field name 'file'" -// @Param name formData string true "cluster name" -// @Param displayName formData string true "cluster display name" -// @Param description formData string true "cluster description" -// @Success 200 {object} UploadData "Returns the content of the uploaded KubeConfig file." -// @Failure 400 {string} string "The uploaded file is too large or the request is invalid." -// @Failure 500 {string} string "Internal server error." +// @Param file formData file true "Upload file with field name 'file'" +// @Param name formData string true "cluster name" +// @Param displayName formData string true "cluster display name" +// @Param description formData string true "cluster description" +// @Param clusterMode formData string true "cluster mode" +// @Param clusterLevel formData int true "cluster scale level" +// @Success 200 {object} UploadData "Returns the content of the uploaded KubeConfig file." +// @Failure 400 {string} string "The uploaded file is too large or the request is invalid." +// @Failure 500 {string} string "Internal server error." // @Router /rest-api/v1/cluster/config/file [post] func UploadKubeConfig(clusterMgr *cluster.ClusterManager) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { @@ -271,6 +275,13 @@ func UploadKubeConfig(clusterMgr *cluster.ClusterManager) http.HandlerFunc { name := r.FormValue("name") displayName := r.FormValue("displayName") description := r.FormValue("description") + clusterMode := r.FormValue("clusterMode") + clusterLevel := r.FormValue("clusterLevel") + level, err := strconv.Atoi(clusterLevel) + if err != nil { + log.Info("failed to parse cluster level") + level = 1 + } file, fileHeader, err := r.FormFile("file") if err != nil { handler.FailureRender(ctx, w, r, errors.Wrapf(err, "failed to get uploaded file")) @@ -297,7 +308,7 @@ func UploadKubeConfig(clusterMgr *cluster.ClusterManager) http.HandlerFunc { } // Convert the rest.Config to Cluster object. - clusterObj, err := clusterinstall.ConvertKubeconfigToCluster(name, displayName, description, restConfig) + clusterObj, err := clusterinstall.ConvertKubeconfigToCluster(name, displayName, description, clusterMode, level, restConfig) if err != nil { handler.FailureRender(ctx, w, r, errors.Wrapf(err, "error convert kubeconfig to cluster")) return @@ -343,12 +354,12 @@ func UploadKubeConfig(clusterMgr *cluster.ClusterManager) http.HandlerFunc { // @Accept json // @Produce json // @Param request body ValidatePayload true "KubeConfig payload to validate" -// @Success 200 {string} string "Verification passed server version" -// @Failure 400 {object} string "Bad Request" -// @Failure 401 {object} string "Unauthorized" -// @Failure 429 {object} string "Too Many Requests" -// @Failure 404 {object} string "Not Found" -// @Failure 500 {object} string "Internal Server Error" +// @Success 200 {string} string "Verification passed server version" +// @Failure 400 {object} string "Bad Request" +// @Failure 401 {object} string "Unauthorized" +// @Failure 429 {object} string "Too Many Requests" +// @Failure 404 {object} string "Not Found" +// @Failure 500 {object} string "Internal Server Error" // @Router /rest-api/v1/cluster/config/validate [post] func ValidateKubeConfig(clusterMgr *cluster.ClusterManager) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { @@ -381,3 +392,40 @@ func ValidateKubeConfig(clusterMgr *cluster.ClusterManager) http.HandlerFunc { } } } + +// GetAgentYml returns an HTTP handler function to obtain the agent yaml of the special cluster. +// +// @Summary Get agent yaml +// @Description Obtain the agent yaml in secret for cluster. +// @Tags cluster +// @Accept plain +// @Accept json +// @Produce json +// @Param clusterName path string true "The name of the cluster" +// @Success 200 {string} string "Verification passed server version" +// @Failure 400 {object} string "Bad Request" +// @Failure 401 {object} string "Unauthorized" +// @Failure 429 {object} string "Too Many Requests" +// @Failure 404 {object} string "Not Found" +// @Failure 500 {object} string "Internal Server Error" +// @Router /rest-api/v1/cluster/{clusterName}/agentYml [get] +func GetAgentYml(clusterMgr *cluster.ClusterManager, c *server.CompletedConfig) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Extract the context and logger from the request. + ctx := r.Context() + logger := ctxutil.GetLogger(ctx) + cluster := chi.URLParam(r, "clusterName") + logger.Info("Getting cluster...", "cluster", cluster) + + client, err := multicluster.BuildHubClients(r.Context(), c.LoopbackClientConfig) + if err != nil { + handler.FailureRender(ctx, w, r, err) + return + } + + agentYaml, err := clusterMgr.GetAgentYamlForCluster(r.Context(), client, cluster) + handler.HandleResult(w, r, ctx, err, map[string]string{ + "agentYml": string(agentYaml), + }) + } +} diff --git a/pkg/core/handler/cluster/types.go b/pkg/core/handler/cluster/types.go index e61bd5a6..85c6542a 100644 --- a/pkg/core/handler/cluster/types.go +++ b/pkg/core/handler/cluster/types.go @@ -33,9 +33,11 @@ var sortCriteriaMap = map[string]cluster.SortCriteria{ // //nolint:tagliatelle type ClusterPayload struct { - ClusterDisplayName string `json:"displayName"` // ClusterDisplayName is the display name of cluster to be created - ClusterDescription string `json:"description"` // ClusterDescription is the description of cluster to be created - ClusterKubeConfig string `json:"kubeConfig"` // ClusterKubeConfig is the kubeconfig of cluster to be created + ClusterDisplayName string `json:"displayName"` // ClusterDisplayName is the display name of cluster to be created + ClusterDescription string `json:"description"` // ClusterDescription is the description of cluster to be created + ClusterKubeConfig string `json:"kubeConfig"` // ClusterKubeConfig is the kubeconfig of cluster to be created + ClusterMode string `json:"clusterMode"` // ClusterMode is the mode of cluster to be created + ClusterLevel int `json:"clusterLevel"` // clusterLevel is the scale level of cluster to be created } type UploadData struct { diff --git a/pkg/core/manager/cluster/manager.go b/pkg/core/manager/cluster/manager.go index 84faea32..b810b5cb 100644 --- a/pkg/core/manager/cluster/manager.go +++ b/pkg/core/manager/cluster/manager.go @@ -20,11 +20,6 @@ import ( "fmt" "time" - "github.com/KusionStack/karpor/pkg/core/handler" - "github.com/KusionStack/karpor/pkg/infra/multicluster" - clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" - "github.com/KusionStack/karpor/pkg/util/clusterinstall" - "github.com/KusionStack/karpor/pkg/util/ctxutil" errors2 "github.com/pkg/errors" yaml "gopkg.in/yaml.v3" v1 "k8s.io/api/core/v1" @@ -35,7 +30,14 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + corev1 "k8s.io/kubernetes/pkg/apis/core/v1" k8syaml "sigs.k8s.io/yaml" + + "github.com/KusionStack/karpor/pkg/core/handler" + "github.com/KusionStack/karpor/pkg/infra/multicluster" + clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" + "github.com/KusionStack/karpor/pkg/util/clusterinstall" + "github.com/KusionStack/karpor/pkg/util/ctxutil" ) type ClusterManager struct{} @@ -68,7 +70,7 @@ func (c *ClusterManager) GetCluster( func (c *ClusterManager) CreateCluster( ctx context.Context, client *multicluster.MultiClusterClient, - name, displayName, description, kubeconfig string, + name, displayName, description, clusterMode, kubeconfig string, clusterLevel int, ) (*unstructured.Unstructured, error) { clusterGVR := clusterv1beta1.SchemeGroupVersion.WithResource("clusters") // Make sure the cluster does not exist first @@ -91,6 +93,8 @@ func (c *ClusterManager) CreateCluster( name, displayName, description, + clusterMode, + clusterLevel, restConfig, ) if err != nil { @@ -142,6 +146,8 @@ func (c *ClusterManager) UpdateCredential( } displayName := currentObj.Object["spec"].(map[string]interface{})["displayName"].(string) description := currentObj.Object["spec"].(map[string]interface{})["description"].(string) + clusterMode := currentObj.Object["spec"].(map[string]interface{})["mode"].(string) + clusterLevel := currentObj.Object["spec"].(map[string]interface{})["level"].(int) // Create new restConfig from updated kubeconfig restConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(kubeconfig)) @@ -153,7 +159,9 @@ func (c *ClusterManager) UpdateCredential( clusterObj, err := clusterinstall.ConvertKubeconfigToCluster( name, displayName, + clusterMode, description, + clusterLevel, restConfig, ) if err != nil { @@ -466,3 +474,22 @@ func (c *ClusterManager) ValidateKubeConfigFor( return info.String(), nil } } + +// GetAgentYamlForCluster returns the agent yaml byte for a given cluster +func (c *ClusterManager) GetAgentYamlForCluster( + ctx context.Context, + client *multicluster.MultiClusterClient, + name string, +) ([]byte, error) { + secret, err := client.ClientSet.CoreV1().Secrets("karpor").Get(ctx, fmt.Sprintf("%s-agent", name), metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + return nil, err + } + if errors.IsNotFound(err) { + return nil, nil + } + if secret == nil || secret.Data == nil { + return nil, errors.NewNotFound(corev1.Resource("secrets"), name) + } + return secret.Data["config"], nil +} diff --git a/pkg/core/manager/cluster/manager_test.go b/pkg/core/manager/cluster/manager_test.go index 3e8f5682..b97d60c3 100644 --- a/pkg/core/manager/cluster/manager_test.go +++ b/pkg/core/manager/cluster/manager_test.go @@ -20,8 +20,6 @@ import ( "fmt" "testing" - "github.com/KusionStack/karpor/pkg/infra/multicluster" - clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" "github.com/bytedance/mockey" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/api/errors" @@ -29,6 +27,9 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/dynamic" k8syaml "sigs.k8s.io/yaml" + + "github.com/KusionStack/karpor/pkg/infra/multicluster" + clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" ) // TestGetCluster tests the GetCluster method of the ClusterManager for various @@ -132,6 +133,8 @@ func TestCreateCluster(t *testing.T) { clusterName string displayName string description string + clusterMode string + clusterLevel int kubeConfig string expectError bool expectedErrorMessage string @@ -172,7 +175,9 @@ func TestCreateCluster(t *testing.T) { tc.clusterName, tc.displayName, tc.description, + tc.clusterMode, tc.kubeConfig, + tc.clusterLevel, ) if tc.expectError { @@ -204,6 +209,8 @@ func TestUpdateMetadata(t *testing.T) { clusterName string displayName string description string + clusterMode string + clusterLevel int expectError bool expectedError string }{ @@ -484,6 +491,8 @@ spec: privateKey: M2I5NioqKioqKioqKioqKioqKioqKioqKioqKjY1MzY= description: mock-description displayName: Existing Cluster + level: 2 + mode: pull `, }, { @@ -731,6 +740,8 @@ func newMockCluster(name string) *unstructured.Unstructured { "caBundle": "sensitive-ca-bundle", }, }, + "mode": "pull", + "level": 2, } // Set annotations on the object diff --git a/pkg/core/manager/cluster/types.go b/pkg/core/manager/cluster/types.go index 55d2b973..e633d629 100644 --- a/pkg/core/manager/cluster/types.go +++ b/pkg/core/manager/cluster/types.go @@ -104,3 +104,7 @@ type User struct { Username string `yaml:"username,omitempty"` Password string `yaml:"password,omitempty"` } + +type AgentYml struct { + AgentYml string `json:"agentYml"` +} diff --git a/pkg/core/route/route.go b/pkg/core/route/route.go index c3cded43..691cc7c0 100644 --- a/pkg/core/route/route.go +++ b/pkg/core/route/route.go @@ -20,6 +20,12 @@ import ( "expvar" "net/http" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + httpswagger "github.com/swaggo/http-swagger/v2" + genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/klog/v2" + docs "github.com/KusionStack/karpor/api/openapispec" aggregatorhandler "github.com/KusionStack/karpor/pkg/core/handler/aggregator" authnhandler "github.com/KusionStack/karpor/pkg/core/handler/authn" @@ -44,11 +50,6 @@ import ( "github.com/KusionStack/karpor/pkg/infra/search/storage" "github.com/KusionStack/karpor/pkg/kubernetes/registry" "github.com/KusionStack/karpor/pkg/kubernetes/registry/search" - "github.com/go-chi/chi/v5" - "github.com/go-chi/chi/v5/middleware" - httpswagger "github.com/swaggo/http-swagger/v2" - genericapiserver "k8s.io/apiserver/pkg/server" - "k8s.io/klog/v2" ) // NewCoreRoute creates and configures an instance of chi.Mux with the given @@ -156,6 +157,7 @@ func setupRestAPIV1( r.Post("/", clusterhandler.Create(clusterMgr, genericConfig)) r.Put("/", clusterhandler.Update(clusterMgr, genericConfig)) r.Delete("/", clusterhandler.Delete(clusterMgr, genericConfig)) + r.Get("/agentYml", clusterhandler.GetAgentYml(clusterMgr, genericConfig)) }) r.Post("/config/file", clusterhandler.UploadKubeConfig(clusterMgr)) r.Post("/config/validate", clusterhandler.ValidateKubeConfig(clusterMgr)) diff --git a/pkg/infra/search/storage/elasticsearch/resource.go b/pkg/infra/search/storage/elasticsearch/resource.go index bcc205ad..28187103 100644 --- a/pkg/infra/search/storage/elasticsearch/resource.go +++ b/pkg/infra/search/storage/elasticsearch/resource.go @@ -124,7 +124,7 @@ func (s *Storage) GetResource(ctx context.Context, cluster string, obj runtime.O } if len(resp.Hits.Hits) == 0 { - return fmt.Errorf("no resource found for cluster: %s, namespace: %s, name: %s", cluster, unObj.GetNamespace(), unObj.GetName()) + return ErrNotFound } res, err := storage.Map2Resource(resp.Hits.Hits[0].Source) diff --git a/pkg/infra/search/storage/types.go b/pkg/infra/search/storage/types.go index 4b444d19..5408871b 100644 --- a/pkg/infra/search/storage/types.go +++ b/pkg/infra/search/storage/types.go @@ -76,6 +76,7 @@ type SearchStorage interface { type SearchStorageGetter interface { GetSearchStorage() (SearchStorage, error) } + type ResourceStorageGetter interface { GetResourceStorage() (ResourceStorage, error) } diff --git a/pkg/kubernetes/apis/cluster/types.go b/pkg/kubernetes/apis/cluster/types.go index 3eb3b21e..05ec2461 100644 --- a/pkg/kubernetes/apis/cluster/types.go +++ b/pkg/kubernetes/apis/cluster/types.go @@ -28,6 +28,14 @@ const ( CredentialTypeOIDC CredentialType = "OIDC" ) +const ( + PullClusterMode = "pull" + PushClusterMode = "push" +) + +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Cluster +// +kubebuilder:subresource:status // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -58,6 +66,9 @@ type ClusterSpec struct { Description string `json:"description,omitempty"` DisplayName string `json:"displayName,omitempty"` Finalized *bool `json:"finalized,omitempty"` + Mode string `json:"mode,omitempty"` + // cluster scale level, optional value 1, 2, 3, default 1 + Level int `json:"level"` } type ClusterStatus struct { diff --git a/pkg/kubernetes/apis/cluster/v1beta1/types.go b/pkg/kubernetes/apis/cluster/v1beta1/types.go index 4a80096a..b5f5a3b9 100644 --- a/pkg/kubernetes/apis/cluster/v1beta1/types.go +++ b/pkg/kubernetes/apis/cluster/v1beta1/types.go @@ -28,6 +28,18 @@ const ( CredentialTypeOIDC CredentialType = "OIDC" ) +const ( + PullClusterMode = "pull" + PushClusterMode = "push" +) + +const ( + ClusterFinalizer = "finalizers.cluster.karpor.io" +) + +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Cluster +// +kubebuilder:subresource:status // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -59,6 +71,9 @@ type ClusterSpec struct { Description string `json:"description"` DisplayName string `json:"displayName"` Finalized *bool `json:"finalized,omitempty"` + Mode string `json:"mode,omitempty"` + // cluster scale level, optional value 1, 2, 3, default 1 + Level int `json:"level"` } type ClusterStatus struct { diff --git a/pkg/kubernetes/apis/cluster/v1beta1/zz_generated.conversion.go b/pkg/kubernetes/apis/cluster/v1beta1/zz_generated.conversion.go index 194e8c33..339cfc80 100644 --- a/pkg/kubernetes/apis/cluster/v1beta1/zz_generated.conversion.go +++ b/pkg/kubernetes/apis/cluster/v1beta1/zz_generated.conversion.go @@ -297,6 +297,8 @@ func autoConvert_v1beta1_ClusterSpec_To_cluster_ClusterSpec(in *ClusterSpec, out out.Description = in.Description out.DisplayName = in.DisplayName out.Finalized = (*bool)(unsafe.Pointer(in.Finalized)) + out.Mode = in.Mode + out.Level = in.Level return nil } @@ -313,6 +315,8 @@ func autoConvert_cluster_ClusterSpec_To_v1beta1_ClusterSpec(in *cluster.ClusterS out.Description = in.Description out.DisplayName = in.DisplayName out.Finalized = (*bool)(unsafe.Pointer(in.Finalized)) + out.Mode = in.Mode + out.Level = in.Level return nil } diff --git a/pkg/kubernetes/apis/search/v1beta1/types.go b/pkg/kubernetes/apis/search/v1beta1/types.go index 6c722392..229391d3 100644 --- a/pkg/kubernetes/apis/search/v1beta1/types.go +++ b/pkg/kubernetes/apis/search/v1beta1/types.go @@ -20,6 +20,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Cluster +// +kubebuilder:subresource:status // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -62,6 +65,8 @@ type SyncRegistryList struct { Items []SyncRegistry `json:"items"` } +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Cluster // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -137,6 +142,8 @@ type ResourceSyncRule struct { RemainAfterDeleted bool `json:"remainAfterDeleted,omitempty"` } +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Cluster // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -175,6 +182,8 @@ type TrimRuleList struct { Items []TrimRule `json:"items"` } +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Cluster // +genclient // +genclient:nonNamespaced // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/kubernetes/generated/informers/externalversions/cluster/v1beta1/cluster.go b/pkg/kubernetes/generated/informers/externalversions/cluster/v1beta1/cluster.go index 456fb061..21689c93 100644 --- a/pkg/kubernetes/generated/informers/externalversions/cluster/v1beta1/cluster.go +++ b/pkg/kubernetes/generated/informers/externalversions/cluster/v1beta1/cluster.go @@ -76,17 +76,14 @@ func NewFilteredClusterInformer(client versioned.Interface, resyncPeriod time.Du ) } -// defaultInformer provides a default implementation for the cluster informer using the given client and resync period. func (f *clusterInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredClusterInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } -// Informer returns the SharedIndexInformer for the cluster informer. func (f *clusterInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&clusterv1beta1.Cluster{}, f.defaultInformer) } -// Lister returns the ClusterLister for the cluster informer. func (f *clusterInformer) Lister() v1beta1.ClusterLister { return v1beta1.NewClusterLister(f.Informer().GetIndexer()) } diff --git a/pkg/kubernetes/generated/informers/externalversions/factory.go b/pkg/kubernetes/generated/informers/externalversions/factory.go index 90cb7bd5..116160dd 100644 --- a/pkg/kubernetes/generated/informers/externalversions/factory.go +++ b/pkg/kubernetes/generated/informers/externalversions/factory.go @@ -113,7 +113,6 @@ func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResy return factory } -// Start starts all informers in the factory and waits for the stop channel to close before exiting. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() @@ -138,7 +137,6 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { } } -// Shutdown stops all running informers in the factory. func (f *sharedInformerFactory) Shutdown() { f.lock.Lock() f.shuttingDown = true @@ -148,7 +146,6 @@ func (f *sharedInformerFactory) Shutdown() { f.wg.Wait() } -// WaitForCacheSync waits for all started informers to sync with the cluster state. func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() @@ -251,12 +248,10 @@ type SharedInformerFactory interface { Search() search.Interface } -// Cluster returns the cluster informer. func (f *sharedInformerFactory) Cluster() cluster.Interface { return cluster.New(f, f.namespace, f.tweakListOptions) } -// Search returns the search informer. func (f *sharedInformerFactory) Search() search.Interface { return search.New(f, f.namespace, f.tweakListOptions) } diff --git a/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/syncregistry.go b/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/syncregistry.go index a01e732c..2f1edc17 100644 --- a/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/syncregistry.go +++ b/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/syncregistry.go @@ -76,17 +76,14 @@ func NewFilteredSyncRegistryInformer(client versioned.Interface, resyncPeriod ti ) } -// defaultInformer provides the default implementation for the shared informer used by the SyncRegistryInformer. func (f *syncRegistryInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredSyncRegistryInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } -// Informer returns the shared informer for the SyncRegistryInformer. func (f *syncRegistryInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&searchv1beta1.SyncRegistry{}, f.defaultInformer) } -// Lister returns the lister for the SyncRegistryInformer. func (f *syncRegistryInformer) Lister() v1beta1.SyncRegistryLister { return v1beta1.NewSyncRegistryLister(f.Informer().GetIndexer()) } diff --git a/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/syncresources.go b/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/syncresources.go index 335d512f..5c386813 100644 --- a/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/syncresources.go +++ b/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/syncresources.go @@ -76,17 +76,14 @@ func NewFilteredSyncResourcesInformer(client versioned.Interface, resyncPeriod t ) } -// defaultInformer provides a default SharedIndexInformer implementation for SyncResources resources. func (f *syncResourcesInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredSyncResourcesInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } -// Informer returns the SharedIndexInformer for SyncResources resources. func (f *syncResourcesInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&searchv1beta1.SyncResources{}, f.defaultInformer) } -// Lister returns the SyncResourcesLister for SyncResources resources. func (f *syncResourcesInformer) Lister() v1beta1.SyncResourcesLister { return v1beta1.NewSyncResourcesLister(f.Informer().GetIndexer()) } diff --git a/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/transformrule.go b/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/transformrule.go index 509aebd2..443efd2c 100644 --- a/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/transformrule.go +++ b/pkg/kubernetes/generated/informers/externalversions/search/v1beta1/transformrule.go @@ -76,17 +76,14 @@ func NewFilteredTransformRuleInformer(client versioned.Interface, resyncPeriod t ) } -// defaultInformer provides a default implementation for the SharedIndexInformer interface. func (f *transformRuleInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredTransformRuleInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } -// Informer returns the SharedIndexInformer for TransformRule resources. func (f *transformRuleInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&searchv1beta1.TransformRule{}, f.defaultInformer) } -// Lister returns the TransformRuleLister for TransformRule resources. func (f *transformRuleInformer) Lister() v1beta1.TransformRuleLister { return v1beta1.NewTransformRuleLister(f.Informer().GetIndexer()) } diff --git a/pkg/kubernetes/generated/openapi/zz_generated.openapi.go b/pkg/kubernetes/generated/openapi/zz_generated.openapi.go index 116b9869..524bd5f9 100644 --- a/pkg/kubernetes/generated/openapi/zz_generated.openapi.go +++ b/pkg/kubernetes/generated/openapi/zz_generated.openapi.go @@ -361,8 +361,22 @@ func schema_kubernetes_apis_cluster_v1beta1_ClusterSpec(ref common.ReferenceCall Format: "", }, }, + "mode": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "level": { + SchemaProps: spec.SchemaProps{ + Description: "cluster scale level, optional value 1, 2, 3, default 1", + Default: 0, + Type: []string{"integer"}, + Format: "int32", + }, + }, }, - Required: []string{"provider", "access", "displayName"}, + Required: []string{"provider", "access", "displayName", "level"}, }, }, Dependencies: []string{ diff --git a/pkg/syncer/agent.go b/pkg/syncer/agent.go new file mode 100644 index 00000000..bf5b8e29 --- /dev/null +++ b/pkg/syncer/agent.go @@ -0,0 +1,170 @@ +// Copyright The Karpor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncer + +import ( + "context" + "fmt" + "os" + "os/user" + "path" + "path/filepath" + "sync" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/KusionStack/karpor/pkg/infra/search/storage" + clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" + searchv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/search/v1beta1" +) + +type AgentReconciler struct { + SyncReconciler + gvrToGVKCache sync.Map + discoveryClient discovery.DiscoveryInterface + clusterName string +} + +// NewAgentReconciler creates a new instance of the AgentReconciler structure with the given storage. +func NewAgentReconciler(storage storage.ResourceStorage, clusterName string) *AgentReconciler { + return &AgentReconciler{ + SyncReconciler: SyncReconciler{ + storage: storage, + }, + + clusterName: clusterName, + } +} + +// SetupWithManager sets up the AgentReconciler with the given manager and registers it as a controller. Different from the SyncReconcile, it only focus on the special cluster. +func (r *AgentReconciler) SetupWithManager(mgr ctrl.Manager) error { + // only focus on the special cluster + clusterFilter := predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return e.Object.GetName() == r.clusterName + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return e.ObjectNew.GetName() == r.clusterName + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return e.Object.GetName() == r.clusterName + }, + GenericFunc: func(e event.GenericEvent) bool { + return e.Object.GetName() == r.clusterName + }, + } + + controller, err := ctrl.NewControllerManagedBy(mgr). + For(&clusterv1beta1.Cluster{}, builder.WithPredicates(clusterFilter)). + Watches(&source.Kind{Type: &searchv1beta1.SyncRegistry{}}, &handler.Funcs{ + CreateFunc: r.CreateEvent, + UpdateFunc: r.UpdateEvent, + DeleteFunc: r.DeleteEvent, + }). + Build(r) + if err != nil { + return err + } + r.client = mgr.GetClient() + r.controller = controller + r.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie(mgr.GetConfig()) + // TODO: + r.mgr = NewMultiClusterSyncManager(context.Background(), r.controller, r.storage) + return nil +} + +// Reconcile is the main entry point for the syncer reconciler, which is called whenever there is a change in the watched resources. +func (r *AgentReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + logger := ctrl.LoggerFrom(ctx) + + var cluster clusterv1beta1.Cluster + if err := r.client.Get(ctx, req.NamespacedName, &cluster); err != nil { + if apierrors.IsNotFound(err) { + logger.Info("cluster doesn't exist", "cluster", req.Name) + return reconcile.Result{}, r.stopCluster(ctx, req.Name) + } + return reconcile.Result{}, err + } + + if !cluster.DeletionTimestamp.IsZero() { + logger.Info("cluster is being deleted", "cluster", cluster.Name) + + err := r.stopCluster(ctx, cluster.Name) + if err != nil { + return reconcile.Result{}, err + } + + if len(cluster.Finalizers) > 0 { + cluster.Finalizers = nil + err := r.client.Update(ctx, &cluster) + if err != nil && !apierrors.IsNotFound(err) { + return reconcile.Result{}, err + } + } + + return reconcile.Result{}, nil + } + + return reconcile.Result{}, r.handleClusterAddOrUpdate(ctx, cluster.DeepCopy(), buildClusterConfigInAgent) +} + +// agent and reconciler should reuse the 'handleClusterAddOrUpdate' func, however there are different logic in building cluster client. +func buildClusterConfigInAgent(cluster *clusterv1beta1.Cluster) (*rest.Config, error) { + loadingRules := &clientcmd.ClientConfigLoadingRules{ + WarnIfAllMissing: false, + Precedence: []string{clientcmd.RecommendedHomeFile}, + MigrationRules: map[string]string{ + clientcmd.RecommendedHomeFile: filepath.Join(os.Getenv("HOME"), clientcmd.RecommendedHomeDir, ".kubeconfig"), + }, + } + if _, ok := os.LookupEnv("HOME"); !ok { + u, err := user.Current() + if err != nil { + return nil, fmt.Errorf("could not get current user: %v", err) + } + loadingRules.Precedence = append(loadingRules.Precedence, path.Join(u.HomeDir, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName)) + } + cfg, err := loadConfigWithContext("", loadingRules, "") + if err != nil { + return nil, err + } + if cfg.QPS == 0.0 { + cfg.QPS = 20.0 + cfg.Burst = 30.0 + } + return cfg, nil +} + +func loadConfigWithContext(apiServerURL string, loader clientcmd.ClientConfigLoader, context string) (*rest.Config, error) { + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + loader, + &clientcmd.ConfigOverrides{ + ClusterInfo: clientcmdapi.Cluster{ + Server: apiServerURL, + }, + CurrentContext: context, + }).ClientConfig() +} diff --git a/pkg/syncer/agent_test.go b/pkg/syncer/agent_test.go new file mode 100644 index 00000000..bb8f0a6d --- /dev/null +++ b/pkg/syncer/agent_test.go @@ -0,0 +1,168 @@ +// Copyright The Karpor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//nolint:dupl +package syncer + +import ( + "context" + "testing" + + "github.com/bytedance/mockey" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/KusionStack/karpor/pkg/infra/search/storage" + "github.com/KusionStack/karpor/pkg/infra/search/storage/elasticsearch" + clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" + searchv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/search/v1beta1" + "github.com/KusionStack/karpor/pkg/kubernetes/scheme" +) + +func TestAgentReconciler_Reconcile(t *testing.T) { + tests := []struct { + name string + cluster *clusterv1beta1.Cluster + req reconcile.Request + wantErr bool + }{ + { + "test no error", + &clusterv1beta1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "cluster1"}}, + reconcile.Request{NamespacedName: types.NamespacedName{Name: "cluster1"}}, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &AgentReconciler{ + SyncReconciler: SyncReconciler{ + client: fake.NewClientBuilder().WithRuntimeObjects(tt.cluster).WithScheme(scheme.Scheme).Build(), + }, + } + m := mockey.Mock((*SyncReconciler).handleClusterAddOrUpdate).Return(nil).Build() + defer m.UnPatch() + _, err := r.Reconcile(context.TODO(), tt.req) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestNewAgentReconciler(t *testing.T) { + tests := []struct { + name string + storage storage.ResourceStorage + clusterName string + }{ + { + "test nil", + nil, + "example-cluster", + }, + { + "test not nil", + &elasticsearch.Storage{}, + "example-cluster2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NewAgentReconciler(tt.storage, tt.clusterName) + require.Equal(t, got.storage, tt.storage) + require.Equal(t, got.clusterName, tt.clusterName) + }) + } +} + +func TestAgentReconciler_getResources(t *testing.T) { + tests := []struct { + name string + cluster *clusterv1beta1.Cluster + registries []searchv1beta1.SyncRegistry + resources map[schema.GroupVersionResource]*searchv1beta1.ResourceSyncRule + wildResources map[string]*searchv1beta1.ResourceSyncRule + allResources []*searchv1beta1.ResourceSyncRule + pendingWildcards []*searchv1beta1.ResourceSyncRule + wantErr bool + }{ + { + "test no error", + &clusterv1beta1.Cluster{ObjectMeta: metav1.ObjectMeta{Name: "cluster1"}}, + []searchv1beta1.SyncRegistry{ + { + ObjectMeta: metav1.ObjectMeta{Name: "example-registry1"}, + Spec: searchv1beta1.SyncRegistrySpec{ + Clusters: []string{"cluster1"}, + }, + }, + }, + map[schema.GroupVersionResource]*searchv1beta1.ResourceSyncRule{ + v1.SchemeGroupVersion.WithResource("pods"): { + APIVersion: "v1", + Resource: "pods", + }, + }, + map[string]*searchv1beta1.ResourceSyncRule{ + "v1": { + APIVersion: "v1", + Resource: "*", + }, + }, + []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "v1", + Resource: "pods", + }, + }, + []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "v1", + Resource: "*", + }, + }, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &AgentReconciler{ + SyncReconciler: SyncReconciler{ + client: fake.NewClientBuilder().WithRuntimeObjects(tt.cluster).WithScheme(scheme.Scheme).Build(), + }, + } + r.gvrToGVKCache.Store(v1.SchemeGroupVersion.WithResource("pods"), schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}) + + mockey.Mock((*SyncReconciler).getRegistries).Return(tt.registries, nil).Build() + mockey.Mock((*SyncReconciler).getNormalizedResources).Return(tt.resources, tt.wildResources, nil).Build() + defer mockey.UnPatchAll() + allResources, pendingWildcards, err := r.getResources(context.TODO(), tt.cluster) + require.Equal(t, allResources, tt.allResources) + require.Equal(t, pendingWildcards, tt.pendingWildcards) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/syncer/reconciler.go b/pkg/syncer/reconciler.go index 19b4bbab..1d17b0cd 100644 --- a/pkg/syncer/reconciler.go +++ b/pkg/syncer/reconciler.go @@ -15,25 +15,35 @@ package syncer import ( + "bytes" "context" "fmt" "reflect" "strings" + templateUtil "text/template" - "github.com/KusionStack/karpor/pkg/infra/search/storage" - clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" - searchv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/search/v1beta1" "github.com/pkg/errors" + "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + utilErr "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "github.com/KusionStack/karpor/config" + "github.com/KusionStack/karpor/pkg/infra/search/storage" + clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" + searchv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/search/v1beta1" + "github.com/KusionStack/karpor/pkg/syncer/template" + "github.com/KusionStack/karpor/pkg/syncer/utils" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -48,18 +58,35 @@ const ( anyResource = "*" ) +type ( + buildClusterConfigFunc func(*clusterv1beta1.Cluster) (*rest.Config, error) +) + // SyncReconciler is the main structure that holds the state and dependencies for the multi-cluster syncer reconciler. type SyncReconciler struct { storage storage.ResourceStorage + highAvailability bool + client client.Client controller controller.Controller mgr MultiClusterSyncManager + + storageAddresses []string + + externalEndpoint string + agentImageTag string } // NewSyncReconciler creates a new instance of the SyncReconciler structure with the given storage. -func NewSyncReconciler(storage storage.ResourceStorage) *SyncReconciler { - return &SyncReconciler{storage: storage} +func NewSyncReconciler(storage storage.ResourceStorage, highAvailability bool, storageAddresses []string, externalEndpoint, agentImageTag string) *SyncReconciler { + return &SyncReconciler{ + storage: storage, + highAvailability: highAvailability, + storageAddresses: storageAddresses, + externalEndpoint: externalEndpoint, + agentImageTag: agentImageTag, + } } // SetupWithManager sets up the SyncReconciler with the given manager and registers it as a controller. @@ -123,7 +150,7 @@ func (r *SyncReconciler) DeleteEvent(de event.DeleteEvent, queue workqueue.RateL } // Reconcile is the main entry point for the syncer reconciler, which is called whenever there is a change in the watched resources. -func (r *SyncReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { +func (r *SyncReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, retErr error) { logger := ctrl.LoggerFrom(ctx) var cluster clusterv1beta1.Cluster @@ -135,9 +162,22 @@ func (r *SyncReconciler) Reconcile(ctx context.Context, req reconcile.Request) ( return reconcile.Result{}, err } + defer func() { + // patch of delete finalizer for high availability cluster. + err := r.client.Update(ctx, &cluster) + if err != nil && !apierrors.IsNotFound(err) { + retErr = err + } + }() + if !cluster.DeletionTimestamp.IsZero() { logger.Info("cluster is being deleted", "cluster", cluster.Name) - return reconcile.Result{}, r.stopCluster(ctx, cluster.Name) + + err := r.reconcileDelete(ctx, &cluster) + if err != nil { + return reconcile.Result{}, err + } + return reconcile.Result{}, nil } // TODO: it's danger @@ -146,7 +186,45 @@ func (r *SyncReconciler) Reconcile(ctx context.Context, req reconcile.Request) ( // return reconcile.Result{}, r.stopCluster(ctx, cluster.Name) // } - return reconcile.Result{}, r.handleClusterAddOrUpdate(ctx, cluster.DeepCopy()) + if r.highAvailability { + // set finalizer for cluster. + cluster.SetFinalizers([]string{clusterv1beta1.ClusterFinalizer}) + if cluster.Spec.Mode == clusterv1beta1.PushClusterMode { + return reconcile.Result{}, r.handleClusterAddOrUpdateForPush(ctx, cluster.DeepCopy()) + } + // TODO implement pull mode for high-availability + // In the pull mode, controller only generate the agent yaml and not dispatch to managed cluster + return reconcile.Result{}, nil + } + + return reconcile.Result{}, r.handleClusterAddOrUpdate(ctx, cluster.DeepCopy(), buildClusterConfigInSyncer) +} + +// reconcileDelete delete relevant resources for cluster in ha mode. +func (r *SyncReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1beta1.Cluster) error { + err := r.stopCluster(ctx, cluster.Name) + if err != nil { + return err + } + + if r.highAvailability && cluster.Spec.Mode == clusterv1beta1.PushClusterMode { + clusterConfig, err := buildClusterConfigInSyncer(cluster) + if err != nil { + return errors.Wrapf(err, "failed to build config for cluster %s", cluster.Name) + } + dynamicClient, err := dynamic.NewForConfig(clusterConfig) + if err != nil { + return errors.Wrapf(err, "failed to build dynamic client for cluster %s", cluster.Name) + } + + err = dynamicClient.Resource(clusterv1beta1.SchemeGroupVersion.WithResource("clusters")).Namespace("").Delete(ctx, cluster.Name, metav1.DeleteOptions{}) + if err != nil { + return errors.Wrapf(err, "failed to delete cluster cr %s in user cluster", cluster.Name) + } + } + + cluster.SetFinalizers(nil) + return nil } // stopCluster stops the reconciliation process for the given cluster. @@ -157,6 +235,21 @@ func (r *SyncReconciler) stopCluster(ctx context.Context, clusterName string) er return err } r.mgr.Stop(ctx, clusterName) + + if r.highAvailability { + // delete secret + secretName := fmt.Sprintf("%s-agent", clusterName) + err := r.client.Delete(ctx, &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: "karpor", + }, + }) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } + logger.Info("syncing cluster has been stopped", "cluster", clusterName) return nil } @@ -170,7 +263,8 @@ func (r *SyncReconciler) startCluster(ctx context.Context, clusterName string) e } // handleClusterAddOrUpdate is responsible for handling the addition or update of a cluster resource. -func (r *SyncReconciler) handleClusterAddOrUpdate(ctx context.Context, cluster *clusterv1beta1.Cluster) error { +// This function will be used for agent in ha mode, and reconciler with no-ha mode. +func (r *SyncReconciler) handleClusterAddOrUpdate(ctx context.Context, cluster *clusterv1beta1.Cluster, buildClusterConfigFunc buildClusterConfigFunc) error { logger := ctrl.LoggerFrom(ctx) resources, pendingWildcards, err := r.getResources(ctx, cluster) @@ -186,7 +280,7 @@ func (r *SyncReconciler) handleClusterAddOrUpdate(ctx context.Context, cluster * return r.stopCluster(ctx, cluster.Name) } - clusterConfig, err := buildClusterConfig(cluster) + clusterConfig, err := buildClusterConfigFunc(cluster) if err != nil { return errors.Wrapf(err, "failed to build config for cluster %s", cluster.Name) } @@ -235,6 +329,40 @@ func (r *SyncReconciler) handleClusterAddOrUpdate(ctx context.Context, cluster * return nil } +// handleClusterAddOrUpdateForPush dispatches the relevant crds resources to managed cluster in high-availability scene. +func (r *SyncReconciler) handleClusterAddOrUpdateForPush(ctx context.Context, cluster *clusterv1beta1.Cluster) error { + logger := ctrl.LoggerFrom(ctx) + logger.V(5).Info("handle cluster has been added/updated, push mode", "cluster", cluster.Name) + + // build user client + clusterConfig, err := buildClusterConfigInSyncer(cluster) + if err != nil { + return errors.Wrapf(err, "failed to build config for cluster %s", cluster.Name) + } + dynamicClient, err := dynamic.NewForConfig(clusterConfig) + if err != nil { + return errors.Wrapf(err, "failed to build dynamic client for cluster %s", cluster.Name) + } + + // must apply crds before other resources. + err = utils.ApplyCrds(ctx, dynamicClient) + if err != nil { + return errors.Wrapf(err, "failed to apply crds for cluster %s", cluster.Name) + } + + err = r.dispatchResources(ctx, dynamicClient, cluster) + if err != nil { + return errors.Wrapf(err, "failed to dispatch resources for cluster %s", cluster.Name) + } + + err = r.generateAgentYaml(ctx, cluster) + if err != nil { + return errors.Wrapf(err, "failed to generate agent yaml for push mode cluster %s", cluster.Name) + } + + return err +} + // getResources retrieves the list of resource sync rules for the given cluster. func (r *SyncReconciler) getResources(ctx context.Context, cluster *clusterv1beta1.Cluster) ([]*searchv1beta1.ResourceSyncRule, []*searchv1beta1.ResourceSyncRule, error) { registries, err := r.getRegistries(ctx, cluster) @@ -316,6 +444,132 @@ func (r *SyncReconciler) getNormalizedResources(ctx context.Context, registry *s return ret, pendingWildcards, nil } +// dispatchResources dispatch cluster, syncregistry, syncresource, transformrule and trimrule resource to user cluster. +func (r *SyncReconciler) dispatchResources(ctx context.Context, dynamicClient dynamic.Interface, cluster *clusterv1beta1.Cluster) error { + // collect the resources needed to be dispatched + unstructuredObjectMap := map[schema.GroupVersionResource][]unstructured.Unstructured{} + err := r.getUnstructuredCluster(cluster, unstructuredObjectMap) + if err != nil { + return errors.Wrapf(err, "error get unstructured cluster cr for cluster %s", cluster.Name) + } + + err = r.getUnstructuredRegistries(ctx, cluster, unstructuredObjectMap) + if err != nil { + return errors.Wrapf(err, "error get unstructured objects of the syncregistries for cluster %s", cluster.Name) + } + + // dispatch resources + var errs []error + for gvr := range unstructuredObjectMap { + for idx := range unstructuredObjectMap[gvr] { + unstructuredObj := unstructuredObjectMap[gvr][idx] + err := utils.CreateOrUpdateUnstructured(ctx, dynamicClient, gvr, "", &unstructuredObj) + if err != nil { + errs = append(errs, err) + } + } + } + + return utilErr.NewAggregate(errs) +} + +// generateAgentYaml generate the agent yaml to be deployed in user cluster, and save yaml into secret. +func (r *SyncReconciler) generateAgentYaml(ctx context.Context, cluster *clusterv1beta1.Cluster) error { + agentYml, err := r.renderYamlFile(cluster) + if err != nil { + return errors.Wrap(err, "failed to render agent yaml") + } + + err = applyAgentYmlSecret(ctx, r.client, cluster, []byte(agentYml)) + if err != nil { + return err + } + + return nil +} + +// getUnstructuredCluster retrieves the cluster cr for the given cluster. +func (r *SyncReconciler) getUnstructuredCluster(cluster *clusterv1beta1.Cluster, unstructuredObjectMap map[schema.GroupVersionResource][]unstructured.Unstructured) error { + // only dispatch cluster cr once + unstructuredObj, err := utils.ConvertToUnstructured(cluster) + if err != nil { + return errors.Wrapf(err, "failed to convert to unstructured object") + } + unstructuredObjectMap[clusterv1beta1.SchemeGroupVersion.WithResource("clusters")] = []unstructured.Unstructured{*unstructuredObj} + + return nil +} + +// getRegistries retrieves the list of sync registries for the given cluster. +func (r *SyncReconciler) getUnstructuredRegistries(ctx context.Context, cluster *clusterv1beta1.Cluster, unstructuredObjectMap map[schema.GroupVersionResource][]unstructured.Unstructured) error { + registries, err := r.getRegistries(ctx, cluster) + if err != nil { + return errors.Wrapf(err, "failed to get registries") + } + + // init map value + unstructuredRegistries := make([]unstructured.Unstructured, 0, len(registries)) + var unstructuredTransformRules []unstructured.Unstructured + var unstructuredTrimRules []unstructured.Unstructured + + // avoid to collect duplicate cr + transformRuleMap := make(map[string]struct{}) + trimRuleMap := make(map[string]struct{}) + + // collect cr list + for idx := range registries { + registry := registries[idx] + // set special cluster name when dispatching + registry.Spec.Clusters = []string{cluster.Name} + + unstructuredObj, err := utils.ConvertToUnstructured(®istry) + if err != nil { + return errors.Wrapf(err, "failed to convert to unstructured object for registry %s", registry.Name) + } + // do not set status when update + unstructuredObj.Object["status"] = nil + unstructuredRegistries = append(unstructuredRegistries, *unstructuredObj) + + // obtain relevant cr + for _, sr := range registry.Spec.SyncResources { + if _, ok := transformRuleMap[sr.TransformRefName]; !ok && sr.TransformRefName != "" { + rule, err := r.extractTransformRule(ctx, &sr) + if err != nil { + return err + } + unstructuredObj, err = utils.ConvertToUnstructured(rule) + if err != nil { + return errors.Wrapf(err, "failed to convert to unstructured object for transformrule %s", rule.Name) + } + + unstructuredTransformRules = append(unstructuredTransformRules, *unstructuredObj) + transformRuleMap[sr.TransformRefName] = struct{}{} + } + + if _, ok := trimRuleMap[sr.TrimRefName]; !ok && sr.TrimRefName != "" { + rule, err := r.extractTrimRule(ctx, &sr) + if err != nil { + return err + } + unstructuredObj, err = utils.ConvertToUnstructured(rule) + if err != nil { + return errors.Wrapf(err, "failed to convert to unstructured object for trimrule %s", rule.Name) + } + + unstructuredTrimRules = append(unstructuredTrimRules, *unstructuredObj) + transformRuleMap[sr.TrimRefName] = struct{}{} + } + } + } + + // set map + unstructuredObjectMap[searchv1beta1.SchemeGroupVersion.WithResource("syncregistries")] = unstructuredRegistries + unstructuredObjectMap[searchv1beta1.SchemeGroupVersion.WithResource("transformrules")] = unstructuredTransformRules + unstructuredObjectMap[searchv1beta1.SchemeGroupVersion.WithResource("trimrules")] = unstructuredTrimRules + + return nil +} + // getRegistries retrieves the list of sync registries for the given cluster. func (r *SyncReconciler) getRegistries(ctx context.Context, cluster *clusterv1beta1.Cluster) ([]searchv1beta1.SyncRegistry, error) { var syncRegistriesList searchv1beta1.SyncRegistryList @@ -379,33 +633,17 @@ func (r *SyncReconciler) getNormalizedResource(ctx context.Context, rsr *searchv normalized := rsr.DeepCopy() if rsr.TransformRefName != "" { - if rsr.Transform != nil { - return nil, fmt.Errorf("specify both Transform and TransformRefName in ResourceSyncRule is not allowed") - } - - var rule searchv1beta1.TransformRule - err := r.client.Get(ctx, types.NamespacedName{Name: rsr.TransformRefName}, &rule) + rule, err := r.extractTransformRule(ctx, rsr) if err != nil { - if apierrors.IsNotFound(err) { - return nil, fmt.Errorf("TransformRule referenced by name %q doesn't exist", rsr.TransformRefName) - } - return nil, errors.Wrapf(err, "failed to list transformRule %s from lister", rsr.TransformRefName) + return nil, err } normalized.Transform = &rule.Spec } if rsr.TrimRefName != "" { - if rsr.Trim != nil { - return nil, fmt.Errorf("specify both Trim and TrimRefName in ResourceSyncRule is not allowed") - } - - var rule searchv1beta1.TrimRule - err := r.client.Get(ctx, types.NamespacedName{Name: rsr.TrimRefName}, &rule) + rule, err := r.extractTrimRule(ctx, rsr) if err != nil { - if apierrors.IsNotFound(err) { - return nil, fmt.Errorf("TrimRule referenced by name %q doesn't exist", rsr.TrimRefName) - } - return nil, errors.Wrapf(err, "failed to list trimRule %s from lister", rsr.TrimRefName) + return nil, err } normalized.Trim = &rule.Spec } @@ -413,8 +651,66 @@ func (r *SyncReconciler) getNormalizedResource(ctx context.Context, rsr *searchv return normalized, nil } -// buildClusterConfig creates a rest.Config object for the given cluster. -func buildClusterConfig(cluster *clusterv1beta1.Cluster) (*rest.Config, error) { +// extractTransformRule extras transform rules from syncrule +func (r *SyncReconciler) extractTransformRule(ctx context.Context, rsr *searchv1beta1.ResourceSyncRule) (*searchv1beta1.TransformRule, error) { + var rule searchv1beta1.TransformRule + if rsr.Transform != nil { + return nil, fmt.Errorf("specify both Transform and TransformRefName in ResourceSyncRule is not allowed") + } + + err := r.client.Get(ctx, types.NamespacedName{Name: rsr.TransformRefName}, &rule) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, fmt.Errorf("TransformRule referenced by name %q doesn't exist", rsr.TransformRefName) + } + return nil, errors.Wrapf(err, "failed to list transformRule %s from lister", rsr.TransformRefName) + } + return &rule, nil +} + +// extractTransformRule extras trim rules from syncrule +func (r *SyncReconciler) extractTrimRule(ctx context.Context, rsr *searchv1beta1.ResourceSyncRule) (*searchv1beta1.TrimRule, error) { + var rule searchv1beta1.TrimRule + if rsr.Trim != nil { + return nil, fmt.Errorf("specify both Trim and TrimRefName in ResourceSyncRule is not allowed") + } + + err := r.client.Get(ctx, types.NamespacedName{Name: rsr.TrimRefName}, &rule) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, fmt.Errorf("TrimRule referenced by name %q doesn't exist", rsr.TrimRefName) + } + return nil, errors.Wrapf(err, "failed to list trimRule %s from lister", rsr.TrimRefName) + } + return &rule, nil +} + +// renderYamlFile render agent yaml use known parameters. +func (r *SyncReconciler) renderYamlFile(cluster *clusterv1beta1.Cluster) (string, error) { + c := template.Config{ + ClusterName: cluster.Name, + Level: cluster.Spec.Level, + StorageAddresses: r.storageAddresses, + ClusterMode: cluster.Spec.Mode, + AgentImageTag: r.agentImageTag, + } + + agentYml, err := templateUtil.New("").Parse(string(config.AgentTpl)) + if err != nil { + return "", errors.Wrap(err, "failed to parse agent yaml") + } + + var renderedTemplate bytes.Buffer + err = agentYml.Execute(&renderedTemplate, c) + if err != nil { + return "", errors.Wrap(err, "failed to render agent yaml") + } + + return renderedTemplate.String(), nil +} + +// buildClusterConfigInSyncer creates a rest.Config object for the given cluster in syncer. +func buildClusterConfigInSyncer(cluster *clusterv1beta1.Cluster) (*rest.Config, error) { access := cluster.Spec.Access if len(access.Endpoint) == 0 { return nil, fmt.Errorf("cluster %s's endpoint is empty", cluster.Name) @@ -466,6 +762,41 @@ func buildClusterConfig(cluster *clusterv1beta1.Cluster) (*rest.Config, error) { return &config, nil } +// applyAgentYmlSecret apply agent yml +func applyAgentYmlSecret(ctx context.Context, cli client.Client, cluster *clusterv1beta1.Cluster, content []byte) error { + secretName := fmt.Sprintf("%s-agent", cluster.Name) + newAgentSecret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: "karpor", + }, + Data: map[string][]byte{ + "config": content, + }, + } + + oldAgentSecret := &v1.Secret{} + err := cli.Get(ctx, client.ObjectKey{ + Name: secretName, + Namespace: "karpor", + }, oldAgentSecret) + if err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrap(err, "failed to create agent secret") + } + err = cli.Create(ctx, newAgentSecret) + if err != nil { + return errors.Wrap(err, "failed to update agent secret") + } + } else if !reflect.DeepEqual(oldAgentSecret.Data, newAgentSecret.Data) { + err = cli.Update(ctx, newAgentSecret) + if err != nil { + return errors.Wrap(err, "failed to update agent secret") + } + } + return nil +} + // processWildcardResources processes wildcard resources using the singleClusterSyncManager's discoveryClient func (r *SyncReconciler) processWildcardResources( _ context.Context, diff --git a/pkg/syncer/reconciler_test.go b/pkg/syncer/reconciler_test.go index 8eff23bb..dff00f87 100644 --- a/pkg/syncer/reconciler_test.go +++ b/pkg/syncer/reconciler_test.go @@ -17,14 +17,11 @@ package syncer import ( "context" + "crypto" + "crypto/x509" "fmt" "testing" - "github.com/KusionStack/karpor/pkg/infra/search/storage" - "github.com/KusionStack/karpor/pkg/infra/search/storage/elasticsearch" - clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" - searchv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/search/v1beta1" - "github.com/KusionStack/karpor/pkg/kubernetes/scheme" "github.com/bytedance/mockey" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -36,9 +33,20 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/KusionStack/karpor/pkg/infra/search/storage" + "github.com/KusionStack/karpor/pkg/infra/search/storage/elasticsearch" + clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" + searchv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/search/v1beta1" + "github.com/KusionStack/karpor/pkg/kubernetes/scheme" + "github.com/KusionStack/karpor/pkg/syncer/utils" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" + dynamicfake "k8s.io/client-go/dynamic/fake" ) -func Test_buildClusterConfig(t *testing.T) { +func Test_buildClusterConfigInSyncer(t *testing.T) { tests := []struct { name string cluster *clusterv1beta1.Cluster @@ -81,7 +89,7 @@ func Test_buildClusterConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := buildClusterConfig(tt.cluster) + got, err := buildClusterConfigInSyncer(tt.cluster) if tt.wantErr { require.Error(t, err) } else { @@ -463,22 +471,43 @@ func TestSyncReconciler_getRegistries(t *testing.T) { func TestNewSyncReconciler(t *testing.T) { tests := []struct { - name string - storage storage.ResourceStorage + name string + storage storage.ResourceStorage + highAvailability bool + storageAddresses []string + externalEndpoint string + agentImageTag string + caCert *x509.Certificate + caKey crypto.Signer }{ { "test nil", nil, + false, + []string{"127.0.0.1"}, + "127.0.0.1", + "v1.0.0", + nil, + nil, }, { "test not nil", &elasticsearch.Storage{}, + false, + []string{"127.0.0.1"}, + "127.0.0.1", + "v1.0.0", + nil, + nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := NewSyncReconciler(tt.storage) + got := NewSyncReconciler(tt.storage, tt.highAvailability, tt.storageAddresses, + tt.externalEndpoint, tt.agentImageTag) require.Equal(t, got.storage, tt.storage) + require.Equal(t, got.highAvailability, tt.highAvailability) + require.Equal(t, got.storageAddresses, tt.storageAddresses) }) } } @@ -568,7 +597,146 @@ func TestSyncReconciler_getResources(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := &SyncReconciler{client: fake.NewClientBuilder().WithRuntimeObjects(tt.srs...).WithScheme(scheme.Scheme).Build()} - got, _, err := r.getResources(context.TODO(), tt.cluster) + allResources, _, err := r.getResources(context.TODO(), tt.cluster) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.want, allResources) + } + }) + } +} + +func TestSyncReconciler_processWildcardResources(t *testing.T) { + tests := []struct { + name string + wildcards []*searchv1beta1.ResourceSyncRule + apiResources []metav1.APIResource + clusterName string + want []*searchv1beta1.ResourceSyncRule + wantErr bool + }{ + { + name: "successfully process wildcard resources", + wildcards: []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "samplecontroller.k8s.io/v1alpha1", + Resource: "*", + }, + }, + apiResources: []metav1.APIResource{ + { + Name: "foos", + Namespaced: true, + }, + { + Name: "bars", + Namespaced: false, + }, + }, + clusterName: "cluster1", + want: []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "samplecontroller.k8s.io/v1alpha1", + Resource: "foos", + }, + { + APIVersion: "samplecontroller.k8s.io/v1alpha1", + Resource: "bars", + }, + }, + wantErr: false, + }, + { + name: "skip subresources", + wildcards: []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "samplecontroller.k8s.io/v1alpha1", + Resource: "*", + }, + }, + apiResources: []metav1.APIResource{ + { + Name: "foos/status", + Namespaced: true, + }, + { + Name: "foos", + Namespaced: true, + }, + }, + clusterName: "cluster1", + want: []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "samplecontroller.k8s.io/v1alpha1", + Resource: "foos", + }, + }, + wantErr: false, + }, + { + name: "skip cluster-scoped resources when namespace specified", + wildcards: []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "samplecontroller.k8s.io/v1alpha1", + Resource: "*", + Namespace: "default", + }, + }, + apiResources: []metav1.APIResource{ + { + Name: "bars", + Namespaced: false, + }, + { + Name: "foos", + Namespaced: true, + }, + }, + clusterName: "cluster1", + want: []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "samplecontroller.k8s.io/v1alpha1", + Resource: "foos", + Namespace: "default", + }, + }, + wantErr: false, + }, + { + name: "invalid group version", + wildcards: []*searchv1beta1.ResourceSyncRule{ + { + APIVersion: "invalid", + Resource: "*", + }, + }, + apiResources: []metav1.APIResource{}, + clusterName: "cluster1", + want: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockManager := &mock.Mock{} + if tt.wantErr { + mockManager.On("GetAPIResources", mock.Anything).Return(nil, fmt.Errorf("failed to get API resources")) + } else { + mockManager.On("GetAPIResources", mock.Anything).Return( + &metav1.APIResourceList{ + GroupVersion: tt.wildcards[0].APIVersion, + APIResources: tt.apiResources, + }, + nil, + ) + } + + r := &SyncReconciler{} + got, err := r.processWildcardResources(context.TODO(), tt.wildcards, &fakeSingleClusterSyncManager{mockManager}, tt.clusterName) + if tt.wantErr { require.Error(t, err) } else { @@ -670,58 +838,9 @@ func TestSyncReconciler_handleClusterAddOrUpdate(t *testing.T) { }, exist: false, }, - { - name: "test wildcard", - cluster: &clusterv1beta1.Cluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster1", - }, - Spec: clusterv1beta1.ClusterSpec{ - Access: clusterv1beta1.ClusterAccess{ - Endpoint: "https://localhost:6443", - CABundle: []byte("ca"), - Credential: &clusterv1beta1.ClusterAccessCredential{ - Type: clusterv1beta1.CredentialTypeX509Certificate, - X509: &clusterv1beta1.X509{ - Certificate: []byte("cert"), - PrivateKey: []byte("key"), - }, - }, - }, - }, - }, - srs: []runtime.Object{ - &searchv1beta1.SyncRegistry{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "1", - }, - Spec: searchv1beta1.SyncRegistrySpec{ - Clusters: []string{"cluster1"}, - SyncResources: []searchv1beta1.ResourceSyncRule{{APIVersion: "samplecontroller.k8s.io/v1alpha1", Resource: "*"}}, - }, - }, - }, - config: &rest.Config{ - Host: "https://localhost:6443", - TLSClientConfig: rest.TLSClientConfig{ - CAData: []byte("ca"), - CertData: []byte("cert"), - KeyData: []byte("key"), - }, - }, - exist: true, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if tt.name == "test wildcard" { - m := mockey.Mock((*SyncReconciler).processWildcardResources).To( - func(_ context.Context, _ []*searchv1beta1.ResourceSyncRule, _ SingleClusterSyncManager, _ string) ([]*searchv1beta1.ResourceSyncRule, error) { - return []*searchv1beta1.ResourceSyncRule{{APIVersion: "samplecontroller.k8s.io/v1alpha1", Resource: "foos"}}, nil - }).Build() - defer m.UnPatch() - } - m1 := &mock.Mock{} m1.On("UpdateSyncResources", mock.Anything, mock.Anything).Return(nil) m1.On("ClusterConfig").Return(tt.config) @@ -733,7 +852,7 @@ func TestSyncReconciler_handleClusterAddOrUpdate(t *testing.T) { mgr: &fakeMultiClusterSyncManager{m2}, client: fake.NewClientBuilder().WithRuntimeObjects(tt.srs...).WithScheme(scheme.Scheme).Build(), } - err := r.handleClusterAddOrUpdate(context.TODO(), tt.cluster) + err := r.handleClusterAddOrUpdate(context.TODO(), tt.cluster, buildClusterConfigInSyncer) if tt.wantErr { require.Error(t, err) } else { @@ -743,135 +862,145 @@ func TestSyncReconciler_handleClusterAddOrUpdate(t *testing.T) { } } -func TestSyncReconciler_processWildcardResources(t *testing.T) { +func TestSyncReconciler_dispatchResources(t *testing.T) { tests := []struct { - name string - wildcards []*searchv1beta1.ResourceSyncRule - apiResources []metav1.APIResource - clusterName string - want []*searchv1beta1.ResourceSyncRule - wantErr bool + name string + ctx context.Context + cluster *clusterv1beta1.Cluster + dynamicClient dynamic.Interface + objects []runtime.Object + wantErr bool }{ { - name: "successfully process wildcard resources", - wildcards: []*searchv1beta1.ResourceSyncRule{ - { - APIVersion: "samplecontroller.k8s.io/v1alpha1", - Resource: "*", - }, - }, - apiResources: []metav1.APIResource{ - { - Name: "foos", - Namespaced: true, + name: "test no error", + ctx: context.Background(), + cluster: &clusterv1beta1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster1", }, - { - Name: "bars", - Namespaced: false, + Spec: clusterv1beta1.ClusterSpec{ + Mode: clusterv1beta1.PullClusterMode, + Level: 2, }, }, - clusterName: "cluster1", - want: []*searchv1beta1.ResourceSyncRule{ - { - APIVersion: "samplecontroller.k8s.io/v1alpha1", - Resource: "foos", - }, - { - APIVersion: "samplecontroller.k8s.io/v1alpha1", - Resource: "bars", + dynamicClient: &dynamicfake.FakeDynamicClient{}, + objects: []runtime.Object{ + &searchv1beta1.SyncRegistry{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + Spec: searchv1beta1.SyncRegistrySpec{ + Clusters: []string{"cluster1"}, + SyncResources: []searchv1beta1.ResourceSyncRule{{APIVersion: "v1", Resource: "pods", TransformRefName: "tfr1", TrimRefName: "tr1"}}, + }, }, + &searchv1beta1.TransformRule{ObjectMeta: metav1.ObjectMeta{Name: "tfr1"}, Spec: searchv1beta1.TransformRuleSpec{}}, + &searchv1beta1.TrimRule{ObjectMeta: metav1.ObjectMeta{Name: "tr1"}, Spec: searchv1beta1.TrimRuleSpec{}}, }, wantErr: false, }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockey.Mock(utils.CreateOrUpdateUnstructured).Return(nil).Build() + mockey.Mock((*SyncReconciler).getUnstructuredRegistries).Return(nil).Build() + defer mockey.UnPatchAll() + + r := &SyncReconciler{client: fake.NewClientBuilder().WithRuntimeObjects(tt.objects...).WithScheme(scheme.Scheme).Build()} + err := r.dispatchResources(tt.ctx, tt.dynamicClient, tt.cluster) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestSyncReconciler_getUnstructuredRegistries(t *testing.T) { + tests := []struct { + name string + ctx context.Context + cluster *clusterv1beta1.Cluster + unstructuredObjectMap map[schema.GroupVersionResource][]unstructured.Unstructured + objects []runtime.Object + wantErr bool + }{ { - name: "skip subresources", - wildcards: []*searchv1beta1.ResourceSyncRule{ - { - APIVersion: "samplecontroller.k8s.io/v1alpha1", - Resource: "*", - }, - }, - apiResources: []metav1.APIResource{ - { - Name: "foos/status", - Namespaced: true, + name: "test transform and trim rule", + cluster: &clusterv1beta1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster1", }, - { - Name: "foos", - Namespaced: true, + Spec: clusterv1beta1.ClusterSpec{ + Mode: clusterv1beta1.PullClusterMode, + Level: 2, }, }, - clusterName: "cluster1", - want: []*searchv1beta1.ResourceSyncRule{ - { - APIVersion: "samplecontroller.k8s.io/v1alpha1", - Resource: "foos", + unstructuredObjectMap: map[schema.GroupVersionResource][]unstructured.Unstructured{}, + objects: []runtime.Object{ + &searchv1beta1.SyncRegistry{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + Spec: searchv1beta1.SyncRegistrySpec{ + Clusters: []string{"cluster1"}, + SyncResources: []searchv1beta1.ResourceSyncRule{{APIVersion: "v1", Resource: "pods", TransformRefName: "tfr1", TrimRefName: "tr1"}}, + }, }, + &searchv1beta1.TransformRule{ObjectMeta: metav1.ObjectMeta{Name: "tfr1"}, Spec: searchv1beta1.TransformRuleSpec{}}, + &searchv1beta1.TrimRule{ObjectMeta: metav1.ObjectMeta{Name: "tr1"}, Spec: searchv1beta1.TrimRuleSpec{}}, }, wantErr: false, }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &SyncReconciler{client: fake.NewClientBuilder().WithRuntimeObjects(tt.objects...).WithScheme(scheme.Scheme).Build()} + err := r.getUnstructuredRegistries(tt.ctx, tt.cluster, tt.unstructuredObjectMap) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestSyncReconciler_renderYamlFile(t *testing.T) { + tests := []struct { + name string + cluster *clusterv1beta1.Cluster + certData string + keyData string + want string + wantErr bool + }{ { - name: "skip cluster-scoped resources when namespace specified", - wildcards: []*searchv1beta1.ResourceSyncRule{ - { - APIVersion: "samplecontroller.k8s.io/v1alpha1", - Resource: "*", - Namespace: "default", - }, - }, - apiResources: []metav1.APIResource{ - { - Name: "bars", - Namespaced: false, - }, - { - Name: "foos", - Namespaced: true, + name: "test pull mode", + cluster: &clusterv1beta1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster1", }, - }, - clusterName: "cluster1", - want: []*searchv1beta1.ResourceSyncRule{ - { - APIVersion: "samplecontroller.k8s.io/v1alpha1", - Resource: "foos", - Namespace: "default", + Spec: clusterv1beta1.ClusterSpec{ + Mode: clusterv1beta1.PushClusterMode, + Level: 2, }, }, - wantErr: false, - }, - { - name: "invalid group version", - wildcards: []*searchv1beta1.ResourceSyncRule{ - { - APIVersion: "invalid", - Resource: "*", - }, - }, - apiResources: []metav1.APIResource{}, - clusterName: "cluster1", - want: nil, - wantErr: true, + certData: "cert", + keyData: "key", + want: renderResForPush, + wantErr: false, }, } - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mockManager := &mock.Mock{} - if tt.wantErr { - mockManager.On("GetAPIResources", mock.Anything).Return(nil, fmt.Errorf("failed to get API resources")) - } else { - mockManager.On("GetAPIResources", mock.Anything).Return( - &metav1.APIResourceList{ - GroupVersion: tt.wildcards[0].APIVersion, - APIResources: tt.apiResources, - }, - nil, - ) + r := &SyncReconciler{ + storageAddresses: []string{"https://localhost:6443"}, + agentImageTag: "latest", + externalEndpoint: "https://localhost:6443", } - - r := &SyncReconciler{} - got, err := r.processWildcardResources(context.TODO(), tt.wildcards, &fakeSingleClusterSyncManager{mockManager}, tt.clusterName) - + got, err := r.renderYamlFile(tt.cluster) if tt.wantErr { require.Error(t, err) } else { @@ -881,3 +1010,7 @@ func TestSyncReconciler_processWildcardResources(t *testing.T) { }) } } + +const ( + renderResForPush = "apiVersion: v1\nkind: Namespace\nmetadata:\n name: karpor\nspec:\n finalizers:\n - kubernetes\n---\napiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: karpor-agent\n namespace: karpor\nspec:\n replicas: 1\n revisionHistoryLimit: 10\n selector:\n matchLabels:\n app.kubernetes.io/component: karpor-agent\n app.kubernetes.io/instance: karpor\n app.kubernetes.io/name: karpor\n strategy:\n rollingUpdate:\n maxSurge: 25%\n maxUnavailable: 25%\n type: RollingUpdate\n template:\n metadata:\n labels:\n app.kubernetes.io/component: karpor-agent\n app.kubernetes.io/instance: karpor\n app.kubernetes.io/name: karpor\n spec:\n containers:\n - args:\n - agent\n - --elastic-search-addresses=https://localhost:6443 \n - --cluster-name=cluster1\n - --cluster-mode=push\n command:\n - /karpor\n image: kusionstack/karpor:latest\n imagePullPolicy: IfNotPresent\n name: karpor-agent\n ports:\n - containerPort: 7443\n protocol: TCP\n resources:\n limits:\n cpu: 500m\n ephemeral-storage: 10Gi\n memory: 1Gi\n requests:\n cpu: 250m\n ephemeral-storage: 2Gi\n memory: 256Mi\n dnsPolicy: ClusterFirst\n restartPolicy: Always\n terminationGracePeriodSeconds: 30\n---\napiVersion: rbac.authorization.k8s.io/v1\nkind: ClusterRoleBinding\nmetadata:\n name: karpor\nroleRef:\n apiGroup: rbac.authorization.k8s.io\n kind: ClusterRole\n name: cluster-admin\nsubjects:\n- kind: ServiceAccount\n name: default\n namespace: karpor\n" +) diff --git a/pkg/syncer/template/metadata.go b/pkg/syncer/template/metadata.go new file mode 100644 index 00000000..e145f0a8 --- /dev/null +++ b/pkg/syncer/template/metadata.go @@ -0,0 +1,27 @@ +// Copyright The Karpor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package template + +// Config is the struct that defines the metadata of the agent template. +type Config struct { + ExternalEndpoint string + CaCert string + CaKey string + StorageAddresses []string + ClusterName string + ClusterMode string + Level int + AgentImageTag string +} diff --git a/pkg/syncer/utils/kubernetes.go b/pkg/syncer/utils/kubernetes.go new file mode 100644 index 00000000..3fd7863a --- /dev/null +++ b/pkg/syncer/utils/kubernetes.go @@ -0,0 +1,95 @@ +// Copyright The Karpor Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "sigs.k8s.io/yaml" + + "github.com/KusionStack/karpor/config" +) + +// ConvertToUnstructured converts the structured object to unstructured +func ConvertToUnstructured(obj runtime.Object) (*unstructured.Unstructured, error) { + unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return nil, fmt.Errorf("failed to convert to Unstructured: %v", err) + } + + return &unstructured.Unstructured{ + Object: unstructuredObj, + }, nil +} + +// ApplyCrds applies crds to user cluster before other resources. +func ApplyCrds(ctx context.Context, dynamicClient dynamic.Interface) error { + for _, crd := range config.CrdList { + var objMap map[string]interface{} + err := yaml.Unmarshal(crd, &objMap) + if err != nil { + return err + } + + unstructuredObj := &unstructured.Unstructured{ + Object: objMap, + } + err = CreateOrUpdateUnstructured(ctx, dynamicClient, apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), "", unstructuredObj) + if err != nil { + return err + } + } + + return nil +} + +// CreateOrUpdateUnstructured creates or updates object using dynamic client. +func CreateOrUpdateUnstructured(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource, namespace string, newObject *unstructured.Unstructured) error { + resourceClient := dynamicClient.Resource(gvr).Namespace(namespace) + + existingObj, getErr := resourceClient.Get(ctx, newObject.GetName(), metav1.GetOptions{}) + if getErr != nil { + if apierrors.IsNotFound(getErr) { + // set initial resource version + newObject.SetResourceVersion("0") + _, createErr := resourceClient.Create(ctx, newObject, metav1.CreateOptions{}) + if createErr != nil { + return errors.Wrapf(createErr, "failed to create resource") + } + } else { + return errors.Wrapf(getErr, "failed to get resource: %v", getErr) + } + } else { + // set uid and resource version for existed object + newObject.SetResourceVersion(existingObj.GetResourceVersion()) + newObject.SetUID(existingObj.GetUID()) + + _, updateErr := resourceClient.Update(ctx, newObject, metav1.UpdateOptions{}) + if updateErr != nil { + return errors.Wrapf(updateErr, "failed to update resource: %v", newObject.GetName()) + } + } + + return nil +} diff --git a/pkg/util/clusterinstall/cluster_install.go b/pkg/util/clusterinstall/cluster_install.go index 16971e75..52a48133 100644 --- a/pkg/util/clusterinstall/cluster_install.go +++ b/pkg/util/clusterinstall/cluster_install.go @@ -22,7 +22,7 @@ import ( "k8s.io/client-go/rest" ) -func ConvertKubeconfigToCluster(name, displayName, description string, cfg *rest.Config) (*clusterv1beta1.Cluster, error) { +func ConvertKubeconfigToCluster(name, displayName, description, clusterMode string, clusterLevel int, cfg *rest.Config) (*clusterv1beta1.Cluster, error) { cluster := clusterv1beta1.Cluster{ TypeMeta: metav1.TypeMeta{ APIVersion: clusterv1beta1.SchemeGroupVersion.String(), @@ -36,6 +36,12 @@ func ConvertKubeconfigToCluster(name, displayName, description string, cfg *rest } else { cluster.Spec.DisplayName = name } + if clusterMode != "" { + cluster.Spec.Mode = clusterMode + } + if clusterLevel > 0 && clusterLevel <= 3 { + cluster.Spec.Level = clusterLevel + } access := clusterv1beta1.ClusterAccess{} if !cfg.Insecure { access.CABundle = cfg.CAData diff --git a/pkg/util/clusterinstall/cluster_install_test.go b/pkg/util/clusterinstall/cluster_install_test.go index 21f0dd8a..85894a01 100644 --- a/pkg/util/clusterinstall/cluster_install_test.go +++ b/pkg/util/clusterinstall/cluster_install_test.go @@ -19,19 +19,22 @@ import ( clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" "github.com/stretchr/testify/require" "k8s.io/client-go/rest" + + clusterv1beta1 "github.com/KusionStack/karpor/pkg/kubernetes/apis/cluster/v1beta1" ) func TestConvertKubeconfigToCluster(t *testing.T) { tests := []struct { - name string - displayName string - description string - cfg *rest.Config - wantCluster *clusterv1beta1.Cluster - wantErr bool + name string + displayName string + description string + clusterMode string + clusterLevel int + cfg *rest.Config + wantCluster *clusterv1beta1.Cluster + wantErr bool }{ // Test case with secure setup (non-insecure, with certificate data) { @@ -122,7 +125,7 @@ func TestConvertKubeconfigToCluster(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { // Call the function under test - cluster, err := ConvertKubeconfigToCluster(tc.name, tc.displayName, tc.description, tc.cfg) + cluster, err := ConvertKubeconfigToCluster(tc.name, tc.displayName, tc.description, tc.clusterMode, tc.clusterLevel, tc.cfg) // Assert that an error occurred when expected if tc.wantErr { require.Error(t, err)