Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 43 additions & 175 deletions kubernetes/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -34,7 +32,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand All @@ -53,58 +50,6 @@ const (
defaultPoolConcurrency = 16
)

// ConcurrencyConfig maps a controller name to its concurrency setting.
// Together with its String and Set methods it satisfies flag.Value, so it can
// be filled from a flag written as "controller1=N;controller2=M".
type ConcurrencyConfig map[string]int

// String renders the configuration as "name=value" pairs joined by ";".
//
// It returns "" for a nil receiver or a nil map. Pairs are emitted in map
// iteration order, so the order is not stable from one call to the next.
func (c *ConcurrencyConfig) String() string {
	if c == nil || *c == nil {
		return ""
	}
	parts := make([]string, 0, len(*c))
	for name, val := range *c {
		parts = append(parts, name+"="+strconv.Itoa(val))
	}
	return strings.Join(parts, ";")
}

// Set parses value, a ";"-separated list of "controller=N" pairs, and merges
// the result into the receiver. Blank segments are skipped, whitespace around
// names and numbers is trimmed, and a later entry for a name overrides an
// earlier one. An empty value only makes sure the map is allocated.
//
// An error is returned when the receiver is nil, a segment has no "=", the
// controller name is empty, or the number is not a positive integer. On error
// the receiver is left exactly as it was before the call.
func (c *ConcurrencyConfig) Set(value string) error {
	if c == nil {
		return fmt.Errorf("concurrency config: Set called on nil receiver")
	}

	// Parse into a scratch map first so that a malformed later pair cannot
	// leave the receiver half-updated.
	parsed := make(map[string]int)
	for _, pair := range strings.Split(value, ";") {
		pair = strings.TrimSpace(pair)
		if pair == "" {
			continue
		}
		rawName, rawVal, found := strings.Cut(pair, "=")
		if !found {
			return fmt.Errorf("invalid concurrency config format: %s, expected format: controller=value", pair)
		}
		name := strings.TrimSpace(rawName)
		if name == "" {
			// An empty key could never be looked up by a real controller name.
			return fmt.Errorf("invalid concurrency config format: %q has an empty controller name", pair)
		}
		val, err := strconv.Atoi(strings.TrimSpace(rawVal))
		if err != nil {
			return fmt.Errorf("invalid concurrency value for %s: %w", name, err)
		}
		if val <= 0 {
			return fmt.Errorf("concurrency value must be positive for %s: %d", name, val)
		}
		parsed[name] = val
	}

	if *c == nil {
		*c = make(ConcurrencyConfig, len(parsed))
	}
	for name, val := range parsed {
		(*c)[name] = val
	}
	return nil
}

// Get returns the value stored for name, or defaultVal when the receiver is
// nil, the map is nil, or name has no entry.
func (c *ConcurrencyConfig) Get(name string, defaultVal int) int {
	if c == nil {
		return defaultVal
	}
	// Indexing a nil map is safe in Go and simply reports "not found".
	if val, ok := (*c)[name]; ok {
		return val
	}
	return defaultVal
}

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
Expand Down Expand Up @@ -132,107 +77,30 @@ func init() {

// nolint:gocyclo
func main() {
var metricsAddr string
var metricsCertPath, metricsCertName, metricsCertKey string
var webhookCertPath, webhookCertName, webhookCertKey string
var enableLeaderElection bool
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var allowWeakTLSKeyLengths bool
var tlsOpts []func(*tls.Config)

// Log file options
var enableFileLog bool
var logFilePath string
var logMaxSize int
var logMaxBackups int
var logMaxAge int
var logCompress bool

// Kubernetes client rate limiter options
var kubeClientQPS float64
var kubeClientBurst int

// Controller concurrency options
var concurrencyConfig ConcurrencyConfig

flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&secureMetrics, "metrics-secure", true,
"If set, the metrics endpoint is served securely via HTTPS. Use --metrics-secure=false to use HTTP instead.")
flag.StringVar(&webhookCertPath, "webhook-cert-path", "", "The directory that contains the webhook certificate.")
flag.StringVar(&webhookCertName, "webhook-cert-name", "tls.crt", "The name of the webhook certificate file.")
flag.StringVar(&webhookCertKey, "webhook-cert-key", "tls.key", "The name of the webhook key file.")
flag.StringVar(&metricsCertPath, "metrics-cert-path", "",
"The directory that contains the metrics server certificate.")
flag.StringVar(&metricsCertName, "metrics-cert-name", "tls.crt", "The name of the metrics server certificate file.")
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.BoolVar(
&allowWeakTLSKeyLengths,
"allow-weak-tls-keylengths",
false,
"If set, allows TLS certificates below NIST 2030 minimum key/hash lengths (not recommended).",
)

// Log file flags
flag.BoolVar(&enableFileLog, "enable-file-log", false, "Enable log output to file")
flag.StringVar(&logFilePath, "log-file-path", "/var/log/sandbox-controller/controller.log", "Path to the log file")
flag.IntVar(&logMaxSize, "log-max-size", 100, "Maximum size in megabytes of the log file before it gets rotated")
flag.IntVar(&logMaxBackups, "log-max-backups", 10, "Maximum number of old log files to retain")
flag.IntVar(&logMaxAge, "log-max-age", 30, "Maximum number of days to retain old log files")
flag.BoolVar(&logCompress, "log-compress", true, "Compress determines if the rotated log files should be compressed using gzip")
flag.Float64Var(&kubeClientQPS, "kube-client-qps", 100, "QPS for Kubernetes client rate limiter.")
flag.IntVar(&kubeClientBurst, "kube-client-burst", 200, "Burst for Kubernetes client rate limiter.")
flag.Var(&concurrencyConfig, "concurrency", "Controller concurrency settings in format: controller1=N;controller2=M. "+
"Available controllers: batchsandbox, pool. "+
"Example: --concurrency='batchsandbox=32;pool=128'")

// Image committer
var imageCommitterImage string
flag.StringVar(&imageCommitterImage, "image-committer-image", "image-committer:dev", "The image used for commit operations (contains nerdctl tool).")

// Commit job timeout
var commitJobTimeout time.Duration
flag.DurationVar(&commitJobTimeout, "commit-job-timeout", 10*time.Minute, "The timeout duration for commit jobs.")

var snapshotRegistry string
flag.StringVar(&snapshotRegistry, "snapshot-registry", "", "OCI registry for snapshot images (e.g., registry.example.com/snapshots).")

var snapshotRegistryInsecure bool
flag.BoolVar(&snapshotRegistryInsecure, "snapshot-registry-insecure", false, "Use insecure registry mode when pushing snapshot images.")

var snapshotPushSecret string
flag.StringVar(&snapshotPushSecret, "snapshot-push-secret", "", "K8s Secret name for pushing snapshots to registry.")

var resumePullSecret string
flag.StringVar(&resumePullSecret, "resume-pull-secret", "", "K8s Secret name for pulling snapshot images during resume.")

opts := zap.Options{}
opts.BindFlags(flag.CommandLine)
options := &controllerOptions{}
options.bindFlags(flag.CommandLine)

flag.Parse()

// Setup logger with file rotation support
logOpts := logging.Options{
Development: opts.Development,
EnableFileOutput: enableFileLog,
LogFilePath: logFilePath,
MaxSize: logMaxSize,
MaxBackups: logMaxBackups,
MaxAge: logMaxAge,
Compress: logCompress,
ZapOptions: opts,
Development: options.zapOptions.Development,
EnableFileOutput: options.enableFileLog,
LogFilePath: options.logFilePath,
MaxSize: options.logMaxSize,
MaxBackups: options.logMaxBackups,
MaxAge: options.logMaxAge,
Compress: options.logCompress,
ZapOptions: options.zapOptions,
}

logger := logging.NewLoggerWithZapOptions(logOpts)
ctrl.SetLogger(logger)
if options.legacyKlogVerbosity != "" {
setupLog.Info("deprecated --v flag ignored; use --zap-log-level instead", "v", options.legacyKlogVerbosity)
}

// if the enable-http2 flag is false (the default), http/2 should be disabled
// due to its vulnerabilities. More specifically, disabling http/2 will
Expand All @@ -249,7 +117,7 @@ func main() {
c.MinVersion = tls.VersionTLS12
})

if !enableHTTP2 {
if !options.enableHTTP2 {
tlsOpts = append(tlsOpts, disableHTTP2)
}

Expand All @@ -259,10 +127,10 @@ func main() {
// Initial webhook TLS options
webhookTLSOpts := tlsOpts

if len(webhookCertPath) > 0 {
webhookCertFile := filepath.Join(webhookCertPath, webhookCertName)
webhookKeyFile := filepath.Join(webhookCertPath, webhookCertKey)
if !allowWeakTLSKeyLengths {
if len(options.webhookCertPath) > 0 {
webhookCertFile := filepath.Join(options.webhookCertPath, options.webhookCertName)
webhookKeyFile := filepath.Join(options.webhookCertPath, options.webhookCertKey)
if !options.allowWeakTLSKeyLengths {
if err := cryptoutil.ValidateCertificateKeyPair(webhookCertFile, webhookKeyFile); err != nil {
setupLog.Error(err, "Webhook certificate does not meet NIST minimum key/hash requirements",
"webhook-cert-file", webhookCertFile, "webhook-key-file", webhookKeyFile)
Expand All @@ -271,7 +139,7 @@ func main() {
}

setupLog.Info("Initializing webhook certificate watcher using provided certificates",
"webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey)
"webhook-cert-path", options.webhookCertPath, "webhook-cert-name", options.webhookCertName, "webhook-cert-key", options.webhookCertKey)

var err error
webhookCertWatcher, err = certwatcher.New(
Expand All @@ -289,7 +157,7 @@ func main() {
if err != nil {
return nil, err
}
if allowWeakTLSKeyLengths {
if options.allowWeakTLSKeyLengths {
return cert, nil
}
if err := cryptoutil.ValidateTLSCertificate(webhookCertFile, cert); err != nil {
Expand All @@ -309,12 +177,12 @@ func main() {
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/metrics/server
// - https://book.kubebuilder.io/reference/metrics.html
metricsServerOptions := metricsserver.Options{
BindAddress: metricsAddr,
SecureServing: secureMetrics,
BindAddress: options.metricsAddr,
SecureServing: options.secureMetrics,
TLSOpts: tlsOpts,
}

if secureMetrics {
if options.secureMetrics {
// FilterProvider is used to protect the metrics endpoint with authn/authz.
// These configurations ensure that only authorized users and service accounts
// can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info:
Expand All @@ -330,10 +198,10 @@ func main() {
// - [METRICS-WITH-CERTS] at config/default/kustomization.yaml to generate and use certificates
// managed by cert-manager for the metrics server.
// - [PROMETHEUS-WITH-CERTS] at config/prometheus/kustomization.yaml for TLS certification.
if len(metricsCertPath) > 0 {
metricsCertFile := filepath.Join(metricsCertPath, metricsCertName)
metricsKeyFile := filepath.Join(metricsCertPath, metricsCertKey)
if !allowWeakTLSKeyLengths && metricsAddr != "0" && secureMetrics {
if len(options.metricsCertPath) > 0 {
metricsCertFile := filepath.Join(options.metricsCertPath, options.metricsCertName)
metricsKeyFile := filepath.Join(options.metricsCertPath, options.metricsCertKey)
if !options.allowWeakTLSKeyLengths && options.metricsAddr != "0" && options.secureMetrics {
if err := cryptoutil.ValidateCertificateKeyPair(metricsCertFile, metricsKeyFile); err != nil {
setupLog.Error(err, "Metrics certificate does not meet NIST minimum key/hash requirements",
"metrics-cert-file", metricsCertFile, "metrics-key-file", metricsKeyFile)
Expand All @@ -342,7 +210,7 @@ func main() {
}

setupLog.Info("Initializing metrics certificate watcher using provided certificates",
"metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey)
"metrics-cert-path", options.metricsCertPath, "metrics-cert-name", options.metricsCertName, "metrics-cert-key", options.metricsCertKey)

var err error
metricsCertWatcher, err = certwatcher.New(
Expand All @@ -360,7 +228,7 @@ func main() {
if err != nil {
return nil, err
}
if allowWeakTLSKeyLengths {
if options.allowWeakTLSKeyLengths {
return cert, nil
}
if err := cryptoutil.ValidateTLSCertificate(metricsCertFile, cert); err != nil {
Expand All @@ -373,19 +241,19 @@ func main() {

config := ctrl.GetConfigOrDie()
// Set client rate limiter if specified
if kubeClientQPS > 0 {
config.QPS = float32(kubeClientQPS)
if options.kubeClientQPS > 0 {
config.QPS = float32(options.kubeClientQPS)
}
if kubeClientBurst > 0 {
config.Burst = kubeClientBurst
if options.kubeClientBurst > 0 {
config.Burst = options.kubeClientBurst
}

mgr, err := ctrl.NewManager(config, ctrl.Options{
Scheme: scheme,
Metrics: metricsServerOptions,
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
HealthProbeBindAddress: options.probeAddr,
LeaderElection: options.enableLeaderElection,
LeaderElectionID: "2fa1c467.opensandbox.io",
// LeaderElectionReleaseOnCancel causes the leader to voluntarily release the lease
// when the Manager is stopped, allowing a new leader to acquire it without waiting
Expand All @@ -407,8 +275,8 @@ func main() {
batchSandboxKindName = strings.ToLower(getKindFromType(&sandboxv1alpha1.BatchSandbox{}))
poolKindName = strings.ToLower(getKindFromType(&sandboxv1alpha1.Pool{}))
)
batchSandboxConcurrency := concurrencyConfig.Get(batchSandboxKindName, defaultBatchSandboxConcurrency)
poolConcurrency := concurrencyConfig.Get(poolKindName, defaultPoolConcurrency)
batchSandboxConcurrency := options.concurrencyConfig.Get(batchSandboxKindName, defaultBatchSandboxConcurrency)
poolConcurrency := options.concurrencyConfig.Get(poolKindName, defaultPoolConcurrency)
setupLog.Info("controller concurrency configured", batchSandboxKindName, batchSandboxConcurrency, poolKindName, poolConcurrency)

profileStore := poolassign.NewProfileStore()
Expand All @@ -422,7 +290,7 @@ func main() {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("batchsandbox-controller"),
ResumePullSecret: resumePullSecret,
ResumePullSecret: options.resumePullSecret,
ProfileStore: profileStore,
}).SetupWithManager(mgr, batchSandboxConcurrency); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BatchSandbox")
Expand All @@ -442,11 +310,11 @@ func main() {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("sandboxsnapshot-controller"),
ImageCommitterImage: imageCommitterImage,
CommitJobTimeout: commitJobTimeout,
SnapshotRegistry: snapshotRegistry,
SnapshotRegistryInsecure: snapshotRegistryInsecure,
SnapshotPushSecret: snapshotPushSecret,
ImageCommitterImage: options.imageCommitterImage,
CommitJobTimeout: options.commitJobTimeout,
SnapshotRegistry: options.snapshotRegistry,
SnapshotRegistryInsecure: options.snapshotRegistryInsecure,
SnapshotPushSecret: options.snapshotPushSecret,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "SandboxSnapshot")
os.Exit(1)
Expand Down
Loading
Loading