diff --git a/.github/workflows/trivy_fs.yaml b/.github/workflows/trivy_fs.yaml index 6e43472a835..b1edec30f2f 100644 --- a/.github/workflows/trivy_fs.yaml +++ b/.github/workflows/trivy_fs.yaml @@ -23,7 +23,9 @@ jobs: security-events: write steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - - uses: aquasecurity/trivy-action@18f2510ee396bbf400402947b394f2dd8c87dbb0 # v0.29.0 + # v0.36.0 released 2026-04-22 (post-incident). Internally SHA-pins + # setup-trivy@3fb12ec = Aqua's safe v0.2.6 per GHSA-69fq-xp46-6x23. + - uses: aquasecurity/trivy-action@ed142fd0673e97e23eac54620cfb913e5ce36c25 # v0.36.0 with: scan-type: 'fs' ignore-unfixed: true diff --git a/.gitignore b/.gitignore index 0b5d53bd0b9..0dc03d8fb24 100644 --- a/.gitignore +++ b/.gitignore @@ -68,6 +68,7 @@ skaffold_build/ # Hide generated .deb files *.deb +*.local compile_commands.json clang_tidy.log diff --git a/k8s/vizier/bootstrap/adaptive_export_deployment.yaml b/k8s/vizier/bootstrap/adaptive_export_deployment.yaml index dcb9305bbb4..5d091f2c989 100644 --- a/k8s/vizier/bootstrap/adaptive_export_deployment.yaml +++ b/k8s/vizier/bootstrap/adaptive_export_deployment.yaml @@ -18,17 +18,12 @@ spec: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: + # The beta.kubernetes.io/os label has been deprecated since + # k8s v1.14; every modern kubelet sets kubernetes.io/os. The + # single term below is enough; both ORed terms were previously + # kept only for pre-1.14 compatibility. 
- matchExpressions: - key: kubernetes.io/os - operator: Exists - - key: kubernetes.io/os - operator: In - values: - - linux - - matchExpressions: - - key: beta.kubernetes.io/os - operator: Exists - - key: beta.kubernetes.io/os operator: In values: - linux @@ -57,6 +52,31 @@ spec: value: "10" - name: DETECTION_LOOKBACK_SEC value: "30" + # EXPORT_MODE controls the reconcile behaviour: + # auto - detection drives on/off (default) + # always - plugin always enabled (bypass detection) + # never - plugin always disabled and ch-* scripts purged + - name: EXPORT_MODE + value: "auto" + # Number of consecutive empty detection ticks before auto-disable fires. + - name: EXPORT_QUIET_TICKS + value: "6" + # Optional overrides for the ClickHouse PxL scripts. When unset they are + # parsed from CLICKHOUSE_DSN. Individual fields win over the parsed DSN. + # Defaults below match soc/tree/clickhouse-lab (forensic-soc-db CHI, + # ingest_writer user, forensic_db database). + - name: KUBESCAPE_TABLE + value: "kubescape_logs" + # - name: CLICKHOUSE_HOST + # value: "clickhouse-forensic-soc-db.clickhouse.svc.cluster.local" + # - name: CLICKHOUSE_PORT + # value: "9000" + # - name: CLICKHOUSE_USER + # value: "ingest_writer" + # - name: CLICKHOUSE_PASSWORD + # value: "changeme-ingest" + # - name: CLICKHOUSE_DATABASE + # value: "forensic_db" securityContext: allowPrivilegeEscalation: false capabilities: diff --git a/k8s/vizier/bootstrap/adaptive_export_secrets.yaml b/k8s/vizier/bootstrap/adaptive_export_secrets.yaml index 19be138743b..beced120f63 100644 --- a/k8s/vizier/bootstrap/adaptive_export_secrets.yaml +++ b/k8s/vizier/bootstrap/adaptive_export_secrets.yaml @@ -7,5 +7,8 @@ type: Opaque stringData: # Replace with your actual Pixie API key from https://work.withpixie.ai pixie-api-key: "PIXIE_API_KEY_PLACEHOLDER" - # Replace with your ClickHouse DSN: clickhouse://user:password@host:port/database - clickhouse-dsn: 
"otelcollector:otelcollectorpass@hyperdx-hdx-oss-v2-clickhouse.click.svc.cluster.local:9000/default" + # ClickHouse DSN matches soc/tree/clickhouse-lab (CHI "forensic-soc-db", + # ingest_writer user with INSERT rights into the forensic_db database). + # Format: user:password@host:port/database + clickhouse-dsn: >- + ingest_writer:changeme-ingest@clickhouse-forensic-soc-db.clickhouse.svc.cluster.local:9000/forensic_db diff --git a/src/stirling/source_connectors/socket_tracer/testing/container_images/BUILD.bazel b/src/stirling/source_connectors/socket_tracer/testing/container_images/BUILD.bazel index bcb150a2802..38fa4950c16 100644 --- a/src/stirling/source_connectors/socket_tracer/testing/container_images/BUILD.bazel +++ b/src/stirling/source_connectors/socket_tracer/testing/container_images/BUILD.bazel @@ -24,29 +24,29 @@ package(default_visibility = [ # Generate all Go container library permutations for supported Go versions. go_container_libraries( - container_type = "grpc_server", bazel_sdk_versions = pl_all_supported_go_sdk_versions, + container_type = "grpc_server", prebuilt_container_versions = pl_go_test_versions, ) # Stirling test cases usually test server side tracing. Therefore # we only need to provide the bazel SDK versions for the client containers. go_container_libraries( - container_type = "grpc_client", bazel_sdk_versions = pl_all_supported_go_sdk_versions, + container_type = "grpc_client", ) go_container_libraries( - container_type = "tls_server", bazel_sdk_versions = pl_all_supported_go_sdk_versions, + container_type = "tls_server", prebuilt_container_versions = pl_go_test_versions, ) # Stirling test cases usually test server side tracing. Therefore # we only need to provide the bazel SDK versions for the client containers. 
go_container_libraries( - container_type = "tls_client", bazel_sdk_versions = pl_all_supported_go_sdk_versions, + container_type = "tls_client", ) pl_cc_test_library( diff --git a/src/vizier/services/adaptive_export/cmd/main.go b/src/vizier/services/adaptive_export/cmd/main.go index b283fe8083b..10d178f6b3f 100644 --- a/src/vizier/services/adaptive_export/cmd/main.go +++ b/src/vizier/services/adaptive_export/cmd/main.go @@ -21,12 +21,13 @@ import ( "fmt" "os" "os/signal" + "strings" "syscall" "time" log "github.com/sirupsen/logrus" - "px.dev/pixie/src/api/go/pxapi" + "px.dev/pixie/src/api/go/pxapi" "px.dev/pixie/src/vizier/services/adaptive_export/internal/config" "px.dev/pixie/src/vizier/services/adaptive_export/internal/pixie" "px.dev/pixie/src/vizier/services/adaptive_export/internal/pxl" @@ -42,21 +43,20 @@ const ( ) const ( - // TODO(ddelnano): Clickhouse configuration should come from plugin config. - schemaCreationScript = ` + schemaCreationScriptTmpl = ` import px px.display(px.CreateClickHouseSchemas( - host="hyperdx-hdx-oss-v2-clickhouse.click.svc.cluster.local", - port=9000, - username="otelcollector", - password="otelcollectorpass", - database="default" + host="%s", + port=%s, + username="%s", + password="%s", + database="%s" )) ` - detectionScript = ` + detectionScriptTmpl = ` import px -df = px.DataFrame('kubescape_logs', clickhouse_dsn='otelcollector:otelcollectorpass@hyperdx-hdx-oss-v2-clickhouse.click.svc.cluster.local:9000/default', start_time='-%ds') +df = px.DataFrame('%s', clickhouse_dsn='%s', start_time='-%ds') df.alert = df.message df.namespace = px.pluck(df.RuntimeK8sDetails, "podNamespace") df.podName = px.pluck(df.RuntimeK8sDetails, "podName") @@ -66,6 +66,15 @@ px.display(df) ` ) +func renderSchemaScript(cfg config.ClickHouse) string { + return fmt.Sprintf(schemaCreationScriptTmpl, + cfg.Host(), cfg.Port(), cfg.User(), cfg.Password(), cfg.Database()) +} + +func renderDetectionScript(cfg config.ClickHouse, lookback int64) string { + return 
fmt.Sprintf(detectionScriptTmpl, cfg.Table(), cfg.DSN(), lookback) +} + func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -76,11 +85,11 @@ func main() { log.WithError(err).Fatal("failed to load configuration") } - clusterId := cfg.Pixie().ClusterID() + clusterID := cfg.Pixie().ClusterID() clusterName := cfg.Worker().ClusterName() // Setup Pixie Plugin API client - log.Infof("Setting up Pixie plugin API client for cluster-id %s", clusterId) + log.Infof("Setting up Pixie plugin API client for cluster-id %s", clusterID) pluginClient, err := setupPixie(ctx, cfg.Pixie(), defaultRetries, defaultSleepTime) if err != nil { log.WithError(err).Fatal("setting up Pixie plugin client failed") } @@ -94,11 +103,23 @@ func main() { log.WithError(err).Fatal("failed to create pxapi client") } - // Start schema creation background task - go runSchemaCreationTask(ctx, pxClient, clusterId) + // Start schema creation background task. This drives + // px.CreateClickHouseSchemas, which issues CREATE TABLE IF NOT EXISTS + // for every Pixie Stirling table the metadata service knows about. In + // labs where ClickHouse users don't have DDL rights (e.g. soc's + // ingest_writer with allow_ddl=0), the CREATE silently fails and only + // tables pre-created by external schema.sql work. Off by default to + // avoid noisy server logs; opt in via env when you want Pixie's + // automatic schema bootstrap. 
+ if strings.EqualFold(os.Getenv("ENABLE_SCHEMA_CREATION"), "true") { + log.Info("ENABLE_SCHEMA_CREATION=true — starting schema creation task") + go runSchemaCreationTask(ctx, pxClient, clusterID, cfg.ClickHouse()) + } else { + log.Info("Schema creation task disabled (set ENABLE_SCHEMA_CREATION=true to opt in)") + } - // Start detection script that monitors for when to enable persistence - go runDetectionTask(ctx, pxClient, pluginClient, cfg, clusterId, clusterName) + // Start detection + reconcile loop that turns the retention plugin on/off + go runDetectionTask(ctx, pxClient, pluginClient, cfg, clusterID, clusterName) // Wait for signal to shutdown sigCh := make(chan os.Signal, 1) @@ -110,34 +131,29 @@ func main() { time.Sleep(1 * time.Second) } -func runSchemaCreationTask(ctx context.Context, client *pxapi.Client, clusterID string) { +func runSchemaCreationTask(ctx context.Context, client *pxapi.Client, clusterID string, chCfg config.ClickHouse) { ticker := time.NewTicker(schemaCreationInterval) defer ticker.Stop() - // Run immediately on startup - log.Info("Running schema creation script") - execCtx, cancel := context.WithTimeout(ctx, scriptExecutionTimeout) - if _, err := pxl.ExecuteScript(execCtx, client, clusterID, schemaCreationScript); err != nil { - log.WithError(err).Error("failed to execute schema creation script") - } else { + runOnce := func() { + log.Info("Running schema creation script") + execCtx, cancel := context.WithTimeout(ctx, scriptExecutionTimeout) + defer cancel() + if _, err := pxl.ExecuteScript(execCtx, client, clusterID, renderSchemaScript(chCfg)); err != nil { + log.WithError(err).Error("failed to execute schema creation script") + return + } log.Info("Schema creation script completed successfully") } - cancel() + runOnce() for { select { case <-ctx.Done(): log.Info("Schema creation task shutting down") return case <-ticker.C: - log.Info("Running schema creation script") - execCtx, cancel := context.WithTimeout(ctx, 
scriptExecutionTimeout) - if _, err := pxl.ExecuteScript(execCtx, client, clusterID, schemaCreationScript); err != nil { - log.WithError(err).Error("failed to execute schema creation script") - } else { - log.Info("Schema creation script completed successfully") - } - cancel() + runOnce() } } } @@ -145,11 +161,47 @@ func runSchemaCreationTask(ctx context.Context, client *pxapi.Client, clusterID func runDetectionTask(ctx context.Context, pxClient *pxapi.Client, pluginClient *pixie.Client, cfg config.Config, clusterID string, clusterName string) { detectionInterval := time.Duration(cfg.Worker().DetectionInterval()) * time.Second detectionLookback := cfg.Worker().DetectionLookback() + quietTicks := cfg.Worker().ExportQuietTicks() + mode := cfg.Worker().ExportMode() ticker := time.NewTicker(detectionInterval) defer ticker.Stop() - pluginEnabled := false + // pluginEnabled tracks our last-known retention-plugin state. A nil value means + // we haven't reconciled yet; we always query on the first tick. 
+ var pluginEnabled *bool + quietStreak := int64(0) + + reconcile := func(want bool) { + if pluginEnabled != nil && *pluginEnabled == want { + log.Debugf("export already in desired state (enabled=%v), no action taken", want) + return + } + pluginCtx, pluginCancel := context.WithTimeout(ctx, 2*time.Minute) + defer pluginCancel() + if want { + log.Info("Enabling forensic export") + if err := enableClickHousePlugin(pluginCtx, pluginClient, cfg, clusterID, clusterName); err != nil { + log.WithError(err).Error("failed to enable forensic export") + return + } + v := true + pluginEnabled = &v + log.Info("Forensic export enabled successfully") + } else { + log.Info("Disabling forensic export") + if err := disableClickHousePlugin(pluginCtx, pluginClient, cfg, clusterID, clusterName); err != nil { + log.WithError(err).Error("failed to disable forensic export") + return + } + v := false + pluginEnabled = &v + quietStreak = 0 + log.Info("Forensic export disabled successfully") + } + } + + log.Infof("Detection task starting (mode=%s, quietTicks=%d)", mode, quietTicks) for { select { @@ -157,38 +209,70 @@ func runDetectionTask(ctx context.Context, pxClient *pxapi.Client, pluginClient log.Info("Detection task shutting down") return case <-ticker.C: - log.Info("Running detection script") - // Run detection script with lookback period - detectionPxl := fmt.Sprintf(detectionScript, detectionLookback) + switch mode { + case config.ExportModeAlways: + reconcile(true) + continue + case config.ExportModeNever: + reconcile(false) + continue + } + + // auto mode: detection drives the state. 
+ log.Debug("Running detection script") execCtx, cancel := context.WithTimeout(ctx, scriptExecutionTimeout) - recordCount, err := pxl.ExecuteScript(execCtx, pxClient, clusterID, detectionPxl) + recordCount, err := pxl.ExecuteScript(execCtx, pxClient, clusterID, renderDetectionScript(cfg.ClickHouse(), detectionLookback)) cancel() - if err != nil { log.WithError(err).Error("failed to execute detection script") continue } - log.Debugf("Detection script returned %d records", recordCount) - // If we have records and plugin is not enabled, enable it - if recordCount > 0 && !pluginEnabled { - log.Info("Detection script returned records - enabling forensic export") - pluginCtx, pluginCancel := context.WithTimeout(ctx, 2*time.Minute) - if err := enableClickHousePlugin(pluginCtx, pluginClient, cfg, clusterID, clusterName); err != nil { - log.WithError(err).Error("failed to enable forensic export") - } else { - pluginEnabled = true - log.Info("Forensic export enabled successfully") + if recordCount > 0 { + quietStreak = 0 + reconcile(true) + } else { + quietStreak++ + if quietStreak >= quietTicks { + reconcile(false) } - pluginCancel() - } else if recordCount > 0 && pluginEnabled { - log.Info("Detection script returned records but forensic export already enabled, no action taken") } } } } +func disableClickHousePlugin(ctx context.Context, client *pixie.Client, cfg config.Config, clusterID string, clusterName string) error { + plugin, err := client.GetClickHousePlugin() + if err != nil { + return fmt.Errorf("getting data retention plugins failed: %w", err) + } + if !plugin.RetentionEnabled { + log.Info("ClickHouse plugin already disabled; removing any lingering ch-* scripts") + } else { + if err := client.DisableClickHousePlugin(plugin.LatestVersion); err != nil { + return fmt.Errorf("failed to disable ClickHouse plugin: %w", err) + } + } + + // Tear down the per-cluster ch-* retention scripts so the demo can be re-run cleanly. 
+ current, err := client.GetClusterScripts(clusterID, clusterName) + if err != nil { + return fmt.Errorf("failed to list retention scripts: %w", err) + } + var errs []error + for _, s := range current { + log.Infof("Deleting retention script %s", s.Name) + if err := client.DeleteDataRetentionScript(s.ScriptId); err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return fmt.Errorf("errors while deleting retention scripts: %v", errs) + } + return nil +} + func enableClickHousePlugin(ctx context.Context, client *pixie.Client, cfg config.Config, clusterID string, clusterName string) error { log.Info("Checking the current ClickHouse plugin configuration") plugin, err := client.GetClickHousePlugin() @@ -203,7 +287,7 @@ func enableClickHousePlugin(ctx context.Context, client *pixie.Client, cfg confi if err != nil { return fmt.Errorf("getting ClickHouse plugin config failed: %w", err) } - if config.ExportUrl != cfg.ClickHouse().DSN() { + if config.ExportURL != cfg.ClickHouse().DSN() { log.Info("ClickHouse plugin is configured with different DSN... Overwriting") enablePlugin = true } @@ -212,7 +296,7 @@ func enableClickHousePlugin(ctx context.Context, client *pixie.Client, cfg confi if enablePlugin { log.Info("Enabling ClickHouse plugin") err := client.EnableClickHousePlugin(&pixie.ClickHousePluginConfig{ - ExportUrl: cfg.ClickHouse().DSN(), + ExportURL: cfg.ClickHouse().DSN(), }, plugin.LatestVersion) if err != nil { return fmt.Errorf("failed to enable ClickHouse plugin: %w", err) @@ -227,7 +311,34 @@ func enableClickHousePlugin(ctx context.Context, client *pixie.Client, cfg confi return fmt.Errorf("failed to get preset scripts: %w", err) } + // Filter presets by an allow-list of case-insensitive substrings in the + // script name. 
Useful when the destination ClickHouse doesn't have every + // target table pre-created (Pixie's C++ ClickHouseExportSinkNode aborts + // kelvin on UNKNOWN_TABLE from CH — upstream bug), so we must not install + // retention scripts whose target table is missing. + // + // Example: ALLOWED_RETENTION_SCRIPTS="conn_stats" installs only the + // conn_stats preset (matches "conn_stats export"), skipping dc_snoop + + // stack_traces which target tables that don't exist in soc's schema.sql. + // + // Empty/unset = no filter (install every preset — the prior behavior). definitions := defsFromPixie + if allow := strings.TrimSpace(os.Getenv("ALLOWED_RETENTION_SCRIPTS")); allow != "" { + tokens := strings.Split(allow, ",") + filtered := make([]*script.ScriptDefinition, 0, len(defsFromPixie)) + for _, d := range defsFromPixie { + nameLower := strings.ToLower(d.Name) + for _, t := range tokens { + t = strings.ToLower(strings.TrimSpace(t)) + if t != "" && strings.Contains(nameLower, t) { + filtered = append(filtered, d) + break + } + } + } + log.Infof("ALLOWED_RETENTION_SCRIPTS=%q; filtered presets: %d of %d kept", allow, len(filtered), len(defsFromPixie)) + definitions = filtered + } log.Infof("Getting current scripts for cluster") currentScripts, err := client.GetClusterScripts(clusterID, clusterName) diff --git a/src/vizier/services/adaptive_export/internal/config/config.go b/src/vizier/services/adaptive_export/internal/config/config.go index fc500359dfe..7c518513d9a 100644 --- a/src/vizier/services/adaptive_export/internal/config/config.go +++ b/src/vizier/services/adaptive_export/internal/config/config.go @@ -33,20 +33,39 @@ import ( ) const ( - envVerbose = "VERBOSE" - envClickHouseDSN = "CLICKHOUSE_DSN" - envPixieClusterID = "PIXIE_CLUSTER_ID" - envPixieEndpoint = "PIXIE_ENDPOINT" - envPixieAPIKey = "PIXIE_API_KEY" - envClusterName = "CLUSTER_NAME" - envCollectInterval = "COLLECT_INTERVAL_SEC" - envDetectionInterval = "DETECTION_INTERVAL_SEC" - envDetectionLookback = 
"DETECTION_LOOKBACK_SEC" - defPixieHostname = "work.withpixie.ai:443" - boolTrue = "true" - defCollectInterval = 30 - defDetectionInterval = 10 - defDetectionLookback = 15 + envVerbose = "VERBOSE" + envClickHouseDSN = "CLICKHOUSE_DSN" + envClickHouseHost = "CLICKHOUSE_HOST" + envClickHousePort = "CLICKHOUSE_PORT" + envClickHouseUser = "CLICKHOUSE_USER" + envClickHousePass = "CLICKHOUSE_PASSWORD" + envClickHouseDB = "CLICKHOUSE_DATABASE" + envKubescapeTable = "KUBESCAPE_TABLE" + envPixieClusterID = "PIXIE_CLUSTER_ID" + envPixieEndpoint = "PIXIE_ENDPOINT" + envPixieAPIKey = "PIXIE_API_KEY" + envClusterName = "CLUSTER_NAME" + envCollectInterval = "COLLECT_INTERVAL_SEC" + envDetectionInterval = "DETECTION_INTERVAL_SEC" + envDetectionLookback = "DETECTION_LOOKBACK_SEC" + envExportMode = "EXPORT_MODE" + envExportQuietTicks = "EXPORT_QUIET_TICKS" + defPixieHostname = "work.pixie.austrianopencloudcommunity.org:443" + defClickHousePort = "9000" + defKubescapeTable = "kubescape_logs" + defExportMode = "auto" + defExportQuietTicks = 6 + boolTrue = "true" + defCollectInterval = 30 + defDetectionInterval = 10 + defDetectionLookback = 15 +) + +// ExportMode values. +const ( + ExportModeAuto = "auto" + ExportModeAlways = "always" + ExportModeNever = "never" ) var ( @@ -206,6 +225,32 @@ func setUpConfig() error { return err } + exportQuietTicks, err := getIntEnvWithDefault(envExportQuietTicks, defExportQuietTicks) + if err != nil { + return err + } + + exportMode := strings.ToLower(getEnvWithDefault(envExportMode, defExportMode)) + switch exportMode { + case ExportModeAuto, ExportModeAlways, ExportModeNever: + default: + return fmt.Errorf("invalid %s=%q (must be auto|always|never)", envExportMode, exportMode) + } + + // Parse the DSN into its parts; individual env vars override the parsed values. 
+ dsnHost, dsnPort, dsnUser, dsnPass, dsnDB := parseDSN(clickhouseDSN) + chHost := getEnvWithDefault(envClickHouseHost, dsnHost) + chPort := getEnvWithDefault(envClickHousePort, firstNonEmpty(dsnPort, defClickHousePort)) + chUser := getEnvWithDefault(envClickHouseUser, dsnUser) + chPass := getEnvWithDefault(envClickHousePass, dsnPass) + chDB := getEnvWithDefault(envClickHouseDB, dsnDB) + chTable := getEnvWithDefault(envKubescapeTable, defKubescapeTable) + + // If individual fields were provided but CLICKHOUSE_DSN was not, build one. + if clickhouseDSN == "" && chHost != "" && chUser != "" { + clickhouseDSN = fmt.Sprintf("%s:%s@%s:%s/%s", chUser, chPass, chHost, chPort, chDB) + } + instance = &config{ settings: &settings{ buildDate: buildDate, @@ -213,14 +258,22 @@ func setUpConfig() error { version: integrationVersion, }, worker: &worker{ - clusterName: clusterName, - pixieClusterID: pixieClusterID, - collectInterval: collectInterval, - detectionInterval: detectionInterval, - detectionLookback: detectionLookback, + clusterName: clusterName, + pixieClusterID: pixieClusterID, + collectInterval: collectInterval, + detectionInterval: detectionInterval, + detectionLookback: detectionLookback, + exportMode: exportMode, + exportQuietTicks: exportQuietTicks, }, clickhouse: &clickhouse{ dsn: clickhouseDSN, + host: chHost, + port: chPort, + user: chUser, + password: chPass, + database: chDB, + table: chTable, userAgent: "pixie-clickhouse/" + integrationVersion, }, pixie: &pixie{ @@ -232,6 +285,50 @@ func setUpConfig() error { return instance.validate() } +// parseDSN best-effort splits `user:pass@host:port/db`. Missing parts come back empty. 
+func parseDSN(dsn string) (string, string, string, string, string) { + if dsn == "" { + return "", "", "", "", "" + } + at := strings.LastIndex(dsn, "@") + if at < 0 { + return "", "", "", "", "" + } + creds := dsn[:at] + rest := dsn[at+1:] + + var user, pass string + if i := strings.Index(creds, ":"); i >= 0 { + user = creds[:i] + pass = creds[i+1:] + } else { + user = creds + } + + var db string + if i := strings.Index(rest, "/"); i >= 0 { + db = rest[i+1:] + rest = rest[:i] + } + var host, port string + if i := strings.Index(rest, ":"); i >= 0 { + host = rest[:i] + port = rest[i+1:] + } else { + host = rest + } + return host, port, user, pass, db +} + +func firstNonEmpty(vals ...string) string { + for _, v := range vals { + if v != "" { + return v + } + } + return "" +} + func getEnvWithDefault(key, defaultValue string) string { value := os.Getenv(key) if value == "" { @@ -325,29 +422,46 @@ func (s *settings) BuildDate() string { type ClickHouse interface { DSN() string + Host() string + Port() string + User() string + Password() string + Database() string + Table() string UserAgent() string validate() error } type clickhouse struct { dsn string + host string + port string + user string + password string + database string + table string userAgent string } func (c *clickhouse) validate() error { if c.dsn == "" { - return fmt.Errorf("missing required env variable '%s'", envClickHouseDSN) + return fmt.Errorf("missing required env variable '%s' (or provide %s/%s/%s/%s/%s)", + envClickHouseDSN, envClickHouseHost, envClickHousePort, envClickHouseUser, envClickHousePass, envClickHouseDB) + } + if c.host == "" || c.user == "" || c.database == "" { + return fmt.Errorf("ClickHouse host/user/database could not be derived from %s=%q", envClickHouseDSN, c.dsn) } return nil } -func (c *clickhouse) DSN() string { - return c.dsn -} - -func (c *clickhouse) UserAgent() string { - return c.userAgent -} +func (c *clickhouse) DSN() string { return c.dsn } +func (c *clickhouse) 
Host() string { return c.host } +func (c *clickhouse) Port() string { return c.port } +func (c *clickhouse) User() string { return c.user } +func (c *clickhouse) Password() string { return c.password } +func (c *clickhouse) Database() string { return c.database } +func (c *clickhouse) Table() string { return c.table } +func (c *clickhouse) UserAgent() string { return c.userAgent } type Pixie interface { APIKey() string @@ -390,15 +504,19 @@ type Worker interface { CollectInterval() int64 DetectionInterval() int64 DetectionLookback() int64 + ExportMode() string + ExportQuietTicks() int64 validate() error } type worker struct { - clusterName string - pixieClusterID string - collectInterval int64 - detectionInterval int64 - detectionLookback int64 + clusterName string + pixieClusterID string + collectInterval int64 + detectionInterval int64 + detectionLookback int64 + exportMode string + exportQuietTicks int64 } func (a *worker) validate() error { @@ -408,22 +526,10 @@ func (a *worker) validate() error { return nil } -func (a *worker) ClusterName() string { - return a.clusterName -} - -func (a *worker) PixieClusterID() string { - return a.pixieClusterID -} - -func (a *worker) CollectInterval() int64 { - return a.collectInterval -} - -func (a *worker) DetectionInterval() int64 { - return a.detectionInterval -} - -func (a *worker) DetectionLookback() int64 { - return a.detectionLookback -} +func (a *worker) ClusterName() string { return a.clusterName } +func (a *worker) PixieClusterID() string { return a.pixieClusterID } +func (a *worker) CollectInterval() int64 { return a.collectInterval } +func (a *worker) DetectionInterval() int64 { return a.detectionInterval } +func (a *worker) DetectionLookback() int64 { return a.detectionLookback } +func (a *worker) ExportMode() string { return a.exportMode } +func (a *worker) ExportQuietTicks() int64 { return a.exportQuietTicks } diff --git a/src/vizier/services/adaptive_export/internal/config/definition.go 
b/src/vizier/services/adaptive_export/internal/config/definition.go index fd772022753..2f663ac9422 100644 --- a/src/vizier/services/adaptive_export/internal/config/definition.go +++ b/src/vizier/services/adaptive_export/internal/config/definition.go @@ -17,7 +17,6 @@ package config import ( - "io/ioutil" "os" "path/filepath" "strings" @@ -35,7 +34,7 @@ func ReadScriptDefinitions(dir string) ([]*script.ScriptDefinition, error) { if _, err := os.Stat(dir); os.IsNotExist(err) { return nil, nil } - files, err := ioutil.ReadDir(dir) + files, err := os.ReadDir(dir) if err != nil { return nil, err } @@ -53,7 +52,7 @@ func ReadScriptDefinitions(dir string) ([]*script.ScriptDefinition, error) { } func readScriptDefinition(path string) (*script.ScriptDefinition, error) { - content, err := ioutil.ReadFile(path) + content, err := os.ReadFile(path) if err != nil { return nil, err } diff --git a/src/vizier/services/adaptive_export/internal/pixie/pixie.go b/src/vizier/services/adaptive_export/internal/pixie/pixie.go index 97e5bb8ae23..feb8cadd698 100644 --- a/src/vizier/services/adaptive_export/internal/pixie/pixie.go +++ b/src/vizier/services/adaptive_export/internal/pixie/pixie.go @@ -26,16 +26,16 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" + "px.dev/pixie/src/api/go/pxapi/utils" "px.dev/pixie/src/api/proto/cloudpb" "px.dev/pixie/src/api/proto/uuidpb" - "px.dev/pixie/src/vizier/services/adaptive_export/internal/script" ) const ( - clickhousePluginId = "clickhouse" - exportUrlConfig = "exportURL" + clickhousePluginID = "clickhouse" + exportURLConfig = "exportURL" ) type Client struct { @@ -88,40 +88,40 @@ func (c *Client) GetClickHousePlugin() (*cloudpb.Plugin, error) { return nil, err } for _, plugin := range resp.Plugins { - if plugin.Id == clickhousePluginId { + if plugin.Id == clickhousePluginID { return plugin, nil } } - return nil, fmt.Errorf("the %s plugin could not be found", clickhousePluginId) + return 
nil, fmt.Errorf("the %s plugin could not be found", clickhousePluginID) } type ClickHousePluginConfig struct { - ExportUrl string + ExportURL string } func (c *Client) GetClickHousePluginConfig() (*ClickHousePluginConfig, error) { req := &cloudpb.GetOrgRetentionPluginConfigRequest{ - PluginId: clickhousePluginId, + PluginId: clickhousePluginID, } resp, err := c.pluginClient.GetOrgRetentionPluginConfig(c.ctx, req) if err != nil { return nil, err } - exportUrl := resp.CustomExportUrl - if exportUrl == "" { - exportUrl, err = c.getDefaultClickHouseExportUrl() + exportURL := resp.CustomExportUrl + if exportURL == "" { + exportURL, err = c.getDefaultClickHouseExportURL() if err != nil { return nil, err } } return &ClickHousePluginConfig{ - ExportUrl: exportUrl, + ExportURL: exportURL, }, nil } -func (c *Client) getDefaultClickHouseExportUrl() (string, error) { +func (c *Client) getDefaultClickHouseExportURL() (string, error) { req := &cloudpb.GetRetentionPluginInfoRequest{ - PluginId: clickhousePluginId, + PluginId: clickhousePluginID, } info, err := c.pluginClient.GetRetentionPluginInfo(c.ctx, req) if err != nil { @@ -132,13 +132,13 @@ func (c *Client) getDefaultClickHouseExportUrl() (string, error) { func (c *Client) EnableClickHousePlugin(config *ClickHousePluginConfig, version string) error { req := &cloudpb.UpdateRetentionPluginConfigRequest{ - PluginId: clickhousePluginId, + PluginId: clickhousePluginID, Configs: map[string]string{ - exportUrlConfig: config.ExportUrl, + exportURLConfig: config.ExportURL, }, Enabled: &types.BoolValue{Value: true}, Version: &types.StringValue{Value: version}, - CustomExportUrl: &types.StringValue{Value: config.ExportUrl}, + CustomExportUrl: &types.StringValue{Value: config.ExportURL}, InsecureTLS: &types.BoolValue{Value: false}, DisablePresets: &types.BoolValue{Value: true}, } @@ -146,6 +146,18 @@ func (c *Client) EnableClickHousePlugin(config *ClickHousePluginConfig, version return err } +// DisableClickHousePlugin flips the 
retention plugin off without touching scripts. +// Scripts are expected to be removed separately via DeleteDataRetentionScript. +func (c *Client) DisableClickHousePlugin(version string) error { + req := &cloudpb.UpdateRetentionPluginConfigRequest{ + PluginId: clickhousePluginID, + Enabled: &types.BoolValue{Value: false}, + Version: &types.StringValue{Value: version}, + } + _, err := c.pluginClient.UpdateRetentionPluginConfig(c.ctx, req) + return err +} + func (c *Client) GetPresetScripts() ([]*script.ScriptDefinition, error) { resp, err := c.pluginClient.GetRetentionScripts(c.ctx, &cloudpb.GetRetentionScriptsRequest{}) if err != nil { @@ -153,7 +165,7 @@ func (c *Client) GetPresetScripts() ([]*script.ScriptDefinition, error) { } var l []*script.ScriptDefinition for _, s := range resp.Scripts { - if s.PluginId == clickhousePluginId && s.IsPreset { + if s.PluginId == clickhousePluginID && s.IsPreset { sd, err := c.getScriptDefinition(s) if err != nil { return nil, err @@ -164,14 +176,14 @@ func (c *Client) GetPresetScripts() ([]*script.ScriptDefinition, error) { return l, nil } -func (c *Client) GetClusterScripts(clusterId, clusterName string) ([]*script.Script, error) { +func (c *Client) GetClusterScripts(clusterID, clusterName string) ([]*script.Script, error) { resp, err := c.pluginClient.GetRetentionScripts(c.ctx, &cloudpb.GetRetentionScriptsRequest{}) if err != nil { return nil, err } var l []*script.Script for _, s := range resp.Scripts { - if s.PluginId == clickhousePluginId { + if s.PluginId == clickhousePluginID { sd, err := c.getScriptDefinition(s) if err != nil { return nil, err @@ -179,22 +191,22 @@ func (c *Client) GetClusterScripts(clusterId, clusterName string) ([]*script.Scr l = append(l, &script.Script{ ScriptDefinition: *sd, ScriptId: utils.ProtoToUUIDStr(s.ScriptID), - ClusterIds: getClusterIdsAsString(s.ClusterIDs), + ClusterIds: getClusterIDsAsString(s.ClusterIDs), }) } } return l, nil } -func getClusterIdsAsString(clusterIDs []*uuidpb.UUID) 
string { - scriptClusterId := "" +func getClusterIDsAsString(clusterIDs []*uuidpb.UUID) string { + scriptClusterID := "" for i, id := range clusterIDs { if i > 0 { - scriptClusterId = scriptClusterId + "," + scriptClusterID = scriptClusterID + "," } - scriptClusterId = scriptClusterId + utils.ProtoToUUIDStr(id) + scriptClusterID = scriptClusterID + utils.ProtoToUUIDStr(id) } - return scriptClusterId + return scriptClusterID } func (c *Client) getScriptDefinition(s *cloudpb.RetentionScript) (*script.ScriptDefinition, error) { @@ -211,36 +223,36 @@ func (c *Client) getScriptDefinition(s *cloudpb.RetentionScript) (*script.Script }, nil } -func (c *Client) AddDataRetentionScript(clusterId string, scriptName string, description string, frequencyS int64, contents string) error { +func (c *Client) AddDataRetentionScript(clusterID string, scriptName string, description string, frequencyS int64, contents string) error { req := &cloudpb.CreateRetentionScriptRequest{ ScriptName: scriptName, Description: description, FrequencyS: frequencyS, Contents: contents, - ClusterIDs: []*uuidpb.UUID{utils.ProtoFromUUIDStrOrNil(clusterId)}, - PluginId: clickhousePluginId, + ClusterIDs: []*uuidpb.UUID{utils.ProtoFromUUIDStrOrNil(clusterID)}, + PluginId: clickhousePluginID, } _, err := c.pluginClient.CreateRetentionScript(c.ctx, req) return err } -func (c *Client) UpdateDataRetentionScript(clusterId string, scriptId string, scriptName string, description string, frequencyS int64, contents string) error { +func (c *Client) UpdateDataRetentionScript(clusterID string, scriptID string, scriptName string, description string, frequencyS int64, contents string) error { req := &cloudpb.UpdateRetentionScriptRequest{ - ID: utils.ProtoFromUUIDStrOrNil(scriptId), + ID: utils.ProtoFromUUIDStrOrNil(scriptID), ScriptName: &types.StringValue{Value: scriptName}, Description: &types.StringValue{Value: description}, Enabled: &types.BoolValue{Value: true}, FrequencyS: &types.Int64Value{Value: frequencyS}, 
Contents: &types.StringValue{Value: contents}, - ClusterIDs: []*uuidpb.UUID{utils.ProtoFromUUIDStrOrNil(clusterId)}, + ClusterIDs: []*uuidpb.UUID{utils.ProtoFromUUIDStrOrNil(clusterID)}, } _, err := c.pluginClient.UpdateRetentionScript(c.ctx, req) return err } -func (c *Client) DeleteDataRetentionScript(scriptId string) error { +func (c *Client) DeleteDataRetentionScript(scriptID string) error { req := &cloudpb.DeleteRetentionScriptRequest{ - ID: utils.ProtoFromUUIDStrOrNil(scriptId), + ID: utils.ProtoFromUUIDStrOrNil(scriptID), } _, err := c.pluginClient.DeleteRetentionScript(c.ctx, req) return err