Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions pkg/app/pipedv1/plugin/kubernetes_multicluster/config/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2025 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"encoding/json"

"github.com/creasty/defaults"
)

type KubernetesPluginConfig struct {
// List of helm chart repositories that should be added while starting up.
ChartRepositories []HelmChartRepository `json:"chartRepositories,omitempty"`
// List of helm chart registries that should be logged in while starting up.
ChartRegistries []HelmChartRegistry `json:"chartRegistries,omitempty"`
}

func (c *KubernetesPluginConfig) UnmarshalJSON(data []byte) error {
type alias KubernetesPluginConfig

var a alias
if err := json.Unmarshal(data, &a); err != nil {
return err
}

*c = KubernetesPluginConfig(a)
if err := defaults.Set(c); err != nil {
return err
}

return nil
}

func (c *KubernetesPluginConfig) HTTPHelmChartRepositories() []HelmChartRepository {
repos := make([]HelmChartRepository, 0, len(c.ChartRepositories))
for _, r := range c.ChartRepositories {
if r.IsHTTPRepository() {
repos = append(repos, r)
}
}
return repos
}

type HelmChartRepositoryType string

const (
HTTPHelmChartRepository HelmChartRepositoryType = "HTTP"
)

type HelmChartRepository struct {
// The repository type. Only HTTP is supported.
Type HelmChartRepositoryType `json:"type" default:"HTTP"`

// Configuration for HTTP type.
// The name of the Helm chart repository.
Name string `json:"name,omitempty"`
// The address to the Helm chart repository.
Address string `json:"address,omitempty"`
// Username used for the repository backed by HTTP basic authentication.
Username string `json:"username,omitempty"`
// Password used for the repository backed by HTTP basic authentication.
Password string `json:"password,omitempty"`
// Whether to skip TLS certificate checks for the repository or not.
Insecure bool `json:"insecure"`
}

func (r *HelmChartRepository) IsHTTPRepository() bool {
return r.Type == HTTPHelmChartRepository
}

// HelmChartRegistryType represents the type of Helm chart registry.
type HelmChartRegistryType string

// The registry types that hosts Helm charts.
const (
OCIHelmChartRegistry HelmChartRegistryType = "OCI"
)

// HelmChartRegistry represents the configuration for a Helm chart registry.
type HelmChartRegistry struct {
// The registry type. Currently, only OCI is supported.
Type HelmChartRegistryType `json:"type" default:"OCI"`

// The address to the Helm chart registry.
Address string `json:"address"`
// Username used for the registry authentication.
Username string `json:"username,omitempty"`
// Password used for the registry authentication.
Password string `json:"password,omitempty"`
}

// IsOCI checks if the registry is an OCI registry.
func (r *HelmChartRegistry) IsOCI() bool {
return r.Type == OCIHelmChartRegistry
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ type toolRegistry interface {
Helm(ctx context.Context, version string) (string, error)
}

var _ sdk.DeploymentPlugin[sdk.ConfigNone, kubeconfig.KubernetesDeployTargetConfig, kubeconfig.KubernetesApplicationSpec] = (*Plugin)(nil)
var _ sdk.DeploymentPlugin[kubeconfig.KubernetesPluginConfig, kubeconfig.KubernetesDeployTargetConfig, kubeconfig.KubernetesApplicationSpec] = (*Plugin)(nil)

// FetchDefinedStages returns the defined stages for this plugin.
func (p *Plugin) FetchDefinedStages() []string {
return allStages
}

// BuildPipelineSyncStages returns the stages for the pipeline sync strategy.
func (p *Plugin) BuildPipelineSyncStages(ctx context.Context, _ *sdk.ConfigNone, input *sdk.BuildPipelineSyncStagesInput) (*sdk.BuildPipelineSyncStagesResponse, error) {
func (p *Plugin) BuildPipelineSyncStages(ctx context.Context, _ *kubeconfig.KubernetesPluginConfig, input *sdk.BuildPipelineSyncStagesInput) (*sdk.BuildPipelineSyncStagesResponse, error) {
return &sdk.BuildPipelineSyncStagesResponse{
Stages: buildPipelineStages(input.Request.Stages, input.Request.Rollback),
}, nil
}

// ExecuteStage executes the stage.
func (p *Plugin) ExecuteStage(ctx context.Context, _ *sdk.ConfigNone, dts []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]) (*sdk.ExecuteStageResponse, error) {
func (p *Plugin) ExecuteStage(ctx context.Context, _ *kubeconfig.KubernetesPluginConfig, dts []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], input *sdk.ExecuteStageInput[kubeconfig.KubernetesApplicationSpec]) (*sdk.ExecuteStageResponse, error) {
switch input.Request.StageName {
case StageK8sMultiSync:
return &sdk.ExecuteStageResponse{
Expand Down Expand Up @@ -141,7 +141,7 @@ func (p *Plugin) loadManifests(ctx context.Context, deploy *sdk.Deployment, spec
}

// DetermineVersions determines the versions of the application.
func (p *Plugin) DetermineVersions(ctx context.Context, _ *sdk.ConfigNone, input *sdk.DetermineVersionsInput[kubeconfig.KubernetesApplicationSpec]) (*sdk.DetermineVersionsResponse, error) {
func (p *Plugin) DetermineVersions(ctx context.Context, _ *kubeconfig.KubernetesPluginConfig, input *sdk.DetermineVersionsInput[kubeconfig.KubernetesApplicationSpec]) (*sdk.DetermineVersionsResponse, error) {
logger := input.Logger

cfg, err := input.Request.DeploymentSource.AppConfig()
Expand Down Expand Up @@ -189,7 +189,7 @@ func (p *Plugin) DetermineVersions(ctx context.Context, _ *sdk.ConfigNone, input
}

// DetermineStrategy determines the strategy for the deployment.
func (p *Plugin) DetermineStrategy(ctx context.Context, _ *sdk.ConfigNone, input *sdk.DetermineStrategyInput[kubeconfig.KubernetesApplicationSpec]) (*sdk.DetermineStrategyResponse, error) {
func (p *Plugin) DetermineStrategy(ctx context.Context, _ *kubeconfig.KubernetesPluginConfig, input *sdk.DetermineStrategyInput[kubeconfig.KubernetesApplicationSpec]) (*sdk.DetermineStrategyResponse, error) {
logger := input.Logger
loader := provider.NewLoader(toolregistry.NewRegistry(input.Client.ToolRegistry()))

Expand Down Expand Up @@ -243,7 +243,7 @@ func (p *Plugin) DetermineStrategy(ctx context.Context, _ *sdk.ConfigNone, input
}

// BuildQuickSyncStages returns the stages for the quick sync strategy.
func (p *Plugin) BuildQuickSyncStages(ctx context.Context, _ *sdk.ConfigNone, input *sdk.BuildQuickSyncStagesInput) (*sdk.BuildQuickSyncStagesResponse, error) {
func (p *Plugin) BuildQuickSyncStages(ctx context.Context, _ *kubeconfig.KubernetesPluginConfig, input *sdk.BuildQuickSyncStagesInput) (*sdk.BuildQuickSyncStagesResponse, error) {
return &sdk.BuildQuickSyncStagesResponse{
Stages: buildQuickSyncPipeline(input.Request.Rollback),
}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

type Plugin struct{}

func (p Plugin) GetLivestate(ctx context.Context, _ *sdk.ConfigNone, deployTargets []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], input *sdk.GetLivestateInput[kubeconfig.KubernetesApplicationSpec]) (*sdk.GetLivestateResponse, error) {
func (p Plugin) GetLivestate(ctx context.Context, _ *kubeconfig.KubernetesPluginConfig, deployTargets []*sdk.DeployTarget[kubeconfig.KubernetesDeployTargetConfig], input *sdk.GetLivestateInput[kubeconfig.KubernetesApplicationSpec]) (*sdk.GetLivestateResponse, error) {
cfg, err := input.Request.DeploymentSource.AppConfig()
if err != nil {
input.Logger.Error("Failed to load application config", zap.Error(err))
Expand Down
44 changes: 44 additions & 0 deletions pkg/app/pipedv1/plugin/kubernetes_multicluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,61 @@
package main

import (
"context"
"log"

"go.uber.org/zap"

sdk "github.com/pipe-cd/piped-plugin-sdk-go"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/deployment"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/livestate"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/provider"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/toolregistry"
)

type initializer struct{}

func (i *initializer) Initialize(ctx context.Context, input *sdk.InitializeInput[config.KubernetesPluginConfig, config.KubernetesDeployTargetConfig]) error {
registry := toolregistry.NewRegistry(input.Client.ToolRegistry())
helmPath, err := registry.Helm(ctx, "")
if err != nil {
return err
}

helm := provider.NewHelm("", helmPath, input.Logger)

if repos := input.Config.HTTPHelmChartRepositories(); len(repos) > 0 {
for _, repo := range repos {
if err := helm.AddRepository(ctx, repo); err != nil {
input.Logger.Error("failed to add helm chart repository", zap.String("address", repo.Address), zap.Error(err))
return err
}
}
if err := helm.UpdateRepositories(ctx); err != nil {
input.Logger.Error("failed to update helm chart repositories", zap.Error(err))
return err
}
}

for _, registry := range input.Config.ChartRegistries {
if !registry.IsOCI() {
continue
}
if err := helm.LoginToOCIRegistry(ctx, registry.Address, registry.Username, registry.Password); err != nil {
input.Logger.Error("failed to login to helm oci registry", zap.String("address", registry.Address), zap.Error(err))
return err
}
}

return nil
}

func main() {
plugin, err := sdk.NewPlugin(
"0.0.1",
sdk.WithInitializer[config.KubernetesApplicationSpec](&initializer{}),
sdk.WithDeploymentPlugin(&deployment.Plugin{}),
sdk.WithLivestatePlugin(&livestate.Plugin{}),
)
Expand Down
59 changes: 59 additions & 0 deletions pkg/app/pipedv1/plugin/kubernetes_multicluster/provider/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import (
"strings"

"go.uber.org/zap"
"golang.org/x/sync/singleflight"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/kubernetes_multicluster/config"
)

var (
allowedURLSchemes = []string{"http", "https"}

updateGroup = &singleflight.Group{}
)

type Helm struct {
Expand All @@ -47,6 +50,62 @@ func NewHelm(version, path string, logger *zap.Logger) *Helm {
}
}

func (h *Helm) LoginToOCIRegistry(ctx context.Context, address, username, password string) error {
args := []string{
"registry",
"login",
"-u",
username,
"-p",
password,
address,
}

var stderr bytes.Buffer
cmd := exec.CommandContext(ctx, h.execPath, args...)
cmd.Stderr = &stderr

h.logger.Info("login to oci registry", zap.String("address", address))
if err := cmd.Run(); err != nil {
h.logger.Error("failed to login to oci registry", zap.String("address", address), zap.Error(err))
return fmt.Errorf("%w: %s", err, stderr.String())
}
return nil
}

func (h *Helm) AddRepository(ctx context.Context, repo config.HelmChartRepository) error {
args := []string{"repo", "add", repo.Name, repo.Address}
if repo.Insecure {
args = append(args, "--insecure-skip-tls-verify")
}
if repo.Username != "" || repo.Password != "" {
args = append(args, "--username", repo.Username, "--password", repo.Password)
}
cmd := exec.CommandContext(ctx, h.execPath, args...)
out, err := cmd.CombinedOutput()
if err != nil {
h.logger.Error("failed to add chart repository", zap.String("name", repo.Name), zap.Error(err))
return fmt.Errorf("failed to add chart repository %s: %s (%w)", repo.Name, string(out), err)
}
h.logger.Info("successfully added chart repository", zap.String("name", repo.Name))
return nil
}

func (h *Helm) UpdateRepositories(ctx context.Context) error {
_, err, _ := updateGroup.Do("update", func() (any, error) {
args := []string{"repo", "update"}
cmd := exec.CommandContext(ctx, h.execPath, args...)
out, err := cmd.CombinedOutput()
if err != nil {
h.logger.Error("failed to update chart repositories", zap.Error(err))
return nil, fmt.Errorf("failed to update chart repositories: %s (%w)", string(out), err)
}
h.logger.Info("successfully updated chart repositories")
return nil, nil
})
return err
}

func (h *Helm) TemplateLocalChart(ctx context.Context, appName, appDir, namespace, chartPath string, opts *config.InputHelmOptions) (string, error) {
releaseName := appName
if opts != nil && opts.ReleaseName != "" {
Expand Down
Loading