diff --git a/config/config.go b/config/config.go
new file mode 100644
index 0000000..1d8eba3
--- /dev/null
+++ b/config/config.go
@@ -0,0 +1,265 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package config defines the option names, default values, and parsed runtime
+// settings shared by the stackdriver_exporter CLI and its OpenTelemetry
+// collector component.
+package config
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"slices"
+	"time"
+
+	"github.com/PuerkitoBio/rehttp"
+	"github.com/prometheus-community/stackdriver_exporter/collectors"
+	"github.com/prometheus-community/stackdriver_exporter/utils"
+	"golang.org/x/oauth2/google"
+	"google.golang.org/api/compute/v1"
+	"google.golang.org/api/monitoring/v3"
+	"google.golang.org/api/option"
+)
+
+// Option describes one exporter setting: its CLI flag name, its OpenTelemetry
+// config key, and its default value (nil when the setting has no default).
+type Option struct {
+	CLIFlag string
+	OTelKey string
+	Default any
+}
+
+// Default values for the exporter settings. Durations are written as strings
+// in time.ParseDuration syntax.
+const (
+	DefaultUniverseDomain       = "googleapis.com"
+	DefaultMaxRetries           = 0
+	DefaultHTTPTimeout          = "10s"
+	DefaultMaxBackoff           = "5s"
+	DefaultBackoffJitter        = "1s"
+	DefaultMetricsInterval      = "5m"
+	DefaultMetricsOffset        = "0s"
+	DefaultMetricsIngest        = false
+	DefaultFillMissing          = true
+	DefaultDropDelegated        = false
+	DefaultAggregateDeltas      = false
+	DefaultDeltasTTL            = "30m"
+	DefaultDescriptorTTL        = "0s"
+	DefaultDescriptorGoogleOnly = true
+)
+
+// DefaultRetryStatuses must be treated as immutable after declaration.
+var DefaultRetryStatuses = []int{http.StatusServiceUnavailable}
+
+// Descriptors for every exporter setting, plus AllOptions listing them all.
+var (
+	ProjectIDs           = Option{CLIFlag: "google.project-ids", OTelKey: "project_ids"}
+	ProjectsFilter       = Option{CLIFlag: "google.projects.filter", OTelKey: "projects_filter"}
+	UniverseDomain       = Option{CLIFlag: "google.universe-domain", OTelKey: "universe_domain", Default: DefaultUniverseDomain}
+	MaxRetries           = Option{CLIFlag: "stackdriver.max-retries", OTelKey: "max_retries", Default: DefaultMaxRetries}
+	HTTPTimeout          = Option{CLIFlag: "stackdriver.http-timeout", OTelKey: "http_timeout", Default: DefaultHTTPTimeout}
+	MaxBackoff           = Option{CLIFlag: "stackdriver.max-backoff", OTelKey: "max_backoff", Default: DefaultMaxBackoff}
+	BackoffJitter        = Option{CLIFlag: "stackdriver.backoff-jitter", OTelKey: "backoff_jitter", Default: DefaultBackoffJitter}
+	RetryStatuses        = Option{CLIFlag: "stackdriver.retry-statuses", OTelKey: "retry_statuses", Default: DefaultRetryStatuses}
+	MetricsPrefixes      = Option{CLIFlag: "monitoring.metrics-prefixes", OTelKey: "metrics_prefixes"}
+	MetricsInterval      = Option{CLIFlag: "monitoring.metrics-interval", OTelKey: "metrics_interval", Default: DefaultMetricsInterval}
+	MetricsOffset        = Option{CLIFlag: "monitoring.metrics-offset", OTelKey: "metrics_offset", Default: DefaultMetricsOffset}
+	MetricsIngest        = Option{CLIFlag: "monitoring.metrics-ingest-delay", OTelKey: "metrics_ingest_delay", Default: DefaultMetricsIngest}
+	FillMissing          = Option{CLIFlag: "collector.fill-missing-labels", OTelKey: "fill_missing_labels", Default: DefaultFillMissing}
+	DropDelegated        = Option{CLIFlag: "monitoring.drop-delegated-projects", OTelKey: "drop_delegated_projects", Default: DefaultDropDelegated}
+	Filters              = Option{CLIFlag: "monitoring.filters", OTelKey: "filters"}
+	AggregateDeltas      = Option{CLIFlag: "monitoring.aggregate-deltas", OTelKey: "aggregate_deltas", Default: DefaultAggregateDeltas}
+	DeltasTTL            = Option{CLIFlag: "monitoring.aggregate-deltas-ttl", OTelKey: "aggregate_deltas_ttl", Default: DefaultDeltasTTL}
+	DescriptorTTL        = Option{CLIFlag: "monitoring.descriptor-cache-ttl", OTelKey: "descriptor_cache_ttl", Default: DefaultDescriptorTTL}
+	DescriptorGoogleOnly = Option{CLIFlag: "monitoring.descriptor-cache-only-google", OTelKey: "descriptor_cache_only_google", Default: DefaultDescriptorGoogleOnly}
+
+	AllOptions = []Option{
+		ProjectIDs,
+		ProjectsFilter,
+		UniverseDomain,
+		MaxRetries,
+		HTTPTimeout,
+		MaxBackoff,
+		BackoffJitter,
+		RetryStatuses,
+		MetricsPrefixes,
+		MetricsInterval,
+		MetricsOffset,
+		MetricsIngest,
+		FillMissing,
+		DropDelegated,
+		Filters,
+		AggregateDeltas,
+		DeltasTTL,
+		DescriptorTTL,
+		DescriptorGoogleOnly,
+	}
+)
+
+// RuntimeConfig is the exporter configuration with durations already parsed
+// into time.Duration values.
+type RuntimeConfig struct {
+	ProjectIDs           []string
+	ProjectsFilter       string
+	UniverseDomain       string
+	MaxRetries           int
+	HTTPTimeout          time.Duration
+	MaxBackoff           time.Duration
+	BackoffJitter        time.Duration
+	RetryStatuses        []int
+	MetricsPrefixes      []string
+	MetricsInterval      time.Duration
+	MetricsOffset        time.Duration
+	MetricsIngest        bool
+	FillMissing          bool
+	DropDelegated        bool
+	Filters              []string
+	AggregateDeltas      bool
+	DeltasTTL            time.Duration
+	DescriptorTTL        time.Duration
+	DescriptorGoogleOnly bool
+}
+
+// OTelComponentDefaults returns the default of every option that declares one,
+// keyed by OTel config key. Slice defaults are cloned so that a caller mutating
+// the returned values cannot corrupt the shared package-level defaults.
+func OTelComponentDefaults() map[string]any {
+	defaults := make(map[string]any, len(AllOptions))
+	for _, option := range AllOptions {
+		if option.Default == nil {
+			continue
+		}
+		value := option.Default
+		// A slice stored in an interface still aliases its backing array.
+		if statuses, ok := value.([]int); ok {
+			value = slices.Clone(statuses)
+		}
+		defaults[option.OTelKey] = value
+	}
+	return defaults
+}
+
+// ParseDuration parses raw with time.ParseDuration. On failure the returned
+// error names the offending setting and wraps the underlying parse error.
+func ParseDuration(name, raw string) (time.Duration, error) {
+	duration, err := time.ParseDuration(raw)
+	if err != nil {
+		return 0, fmt.Errorf("%s: invalid duration %q: %w", name, raw, err)
+	}
+	return duration, nil
+}
+
+// ValidateRetryStatuses returns an error for the first code outside the
+// 100-599 range; an empty or nil slice is valid.
+func ValidateRetryStatuses(codes []int) error {
+	for _, code := range codes {
+		if code < http.StatusContinue || code > 599 {
+			return fmt.Errorf("retry status %d is not a valid HTTP status code", code)
+		}
+	}
+	return nil
+}
+
+// ParseMetricPrefixes delegates to utils.ParseMetricTypePrefixes.
+func ParseMetricPrefixes(prefixes []string) []string {
+	return utils.ParseMetricTypePrefixes(prefixes)
+}
+
+// ParseMetricFilters delegates to collectors.ParseMetricExtraFilters.
+func ParseMetricFilters(filters []string) []collectors.MetricFilter {
+	return collectors.ParseMetricExtraFilters(filters)
+}
+
+// DeduplicateProjectIDs returns a sorted copy of projectIDs with duplicates
+// removed. The input slice is not modified.
+func DeduplicateProjectIDs(projectIDs []string) []string {
+	normalized := slices.Clone(projectIDs)
+	slices.Sort(normalized)
+	return slices.Compact(normalized)
+}
+
+// MonitoringCollectorOptions builds the collector options from c, parsing
+// c.MetricsPrefixes with ParseMetricPrefixes.
+func (c RuntimeConfig) MonitoringCollectorOptions() collectors.MonitoringCollectorOptions {
+	return c.MonitoringCollectorOptionsForPrefixes(ParseMetricPrefixes(c.MetricsPrefixes))
+}
+
+// MonitoringCollectorOptionsForPrefixes builds the collector options from c,
+// using metricPrefixes as given instead of parsing c.MetricsPrefixes.
+func (c RuntimeConfig) MonitoringCollectorOptionsForPrefixes(metricPrefixes []string) collectors.MonitoringCollectorOptions {
+	return collectors.MonitoringCollectorOptions{
+		MetricTypePrefixes:        metricPrefixes,
+		ExtraFilters:              ParseMetricFilters(c.Filters),
+		RequestInterval:           c.MetricsInterval,
+		RequestOffset:             c.MetricsOffset,
+		IngestDelay:               c.MetricsIngest,
+		FillMissingLabels:         c.FillMissing,
+		DropDelegatedProjects:     c.DropDelegated,
+		AggregateDeltas:           c.AggregateDeltas,
+		DescriptorCacheTTL:        c.DescriptorTTL,
+		DescriptorCacheOnlyGoogle: c.DescriptorGoogleOnly,
+	}
+}
+
+// CollectorCacheTTL returns the larger of c.DeltasTTL and c.DescriptorTTL when
+// delta aggregation or the descriptor cache (c.DescriptorTTL > 0) is enabled,
+// and a fixed two hours otherwise.
+func (c RuntimeConfig) CollectorCacheTTL() time.Duration {
+	if !c.AggregateDeltas && c.DescriptorTTL <= 0 {
+		return 2 * time.Hour
+	}
+	return max(c.DeltasTTL, c.DescriptorTTL)
+}
+
+// DiscoverDefaultProjectID returns the project ID attached to the application
+// default credentials, or an error when the credentials cannot be found or
+// carry no project ID.
+func DiscoverDefaultProjectID(ctx context.Context) (string, error) {
+	credentials, err := google.FindDefaultCredentials(ctx, compute.ComputeScope)
+	if err != nil {
+		return "", err
+	}
+	if credentials.ProjectID == "" {
+		return "", errors.New("unable to identify default GCP project")
+	}
+	return credentials.ProjectID, nil
+}
+
+// CreateMonitoringService builds a Cloud Monitoring client for c.UniverseDomain.
+// The underlying HTTP client uses c.HTTPTimeout and a rehttp transport
+// configured from c.MaxRetries, c.RetryStatuses, c.BackoffJitter, and
+// c.MaxBackoff.
+func (c RuntimeConfig) CreateMonitoringService(ctx context.Context) (*monitoring.Service, error) {
+	googleClient, err := google.DefaultClient(ctx, monitoring.MonitoringReadScope)
+	if err != nil {
+		return nil, fmt.Errorf("error creating Google client: %w", err)
+	}
+
+	googleClient.Timeout = c.HTTPTimeout
+	googleClient.Transport = rehttp.NewTransport(
+		googleClient.Transport,
+		rehttp.RetryAll(
+			rehttp.RetryMaxRetries(c.MaxRetries),
+			rehttp.RetryStatuses(c.RetryStatuses...),
+		),
+		rehttp.ExpJitterDelay(c.BackoffJitter, c.MaxBackoff),
+	)
+
+	service, err := monitoring.NewService(ctx, option.WithHTTPClient(googleClient), option.WithUniverseDomain(c.UniverseDomain))
+	if err != nil {
+		return nil, fmt.Errorf("error creating Google Stackdriver Monitoring service: %w", err)
+	}
+	return service, nil
+}
diff --git a/config/config_test.go b/config/config_test.go
new file mode 100644
index 0000000..c8299a3
--- /dev/null
+++ b/config/config_test.go
@@ -0,0 +1,224 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package config + +import ( + "reflect" + "testing" + "time" + + "github.com/prometheus-community/stackdriver_exporter/collectors" +) + +func TestParseDuration(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + raw string + want time.Duration + wantErr bool + }{ + { + name: "valid", + raw: "5m", + want: 5 * time.Minute, + }, + { + name: "invalid", + raw: "nope", + wantErr: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, err := ParseDuration("metrics_interval", tt.raw) + if (err != nil) != tt.wantErr { + t.Fatalf("ParseDuration() error = %v, wantErr %v", err, tt.wantErr) + } + if err == nil && got != tt.want { + t.Fatalf("ParseDuration() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestValidateRetryStatuses(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + codes []int + wantErr bool + }{ + { + name: "valid", + codes: []int{429, 503}, + }, + { + name: "too low", + codes: []int{99}, + wantErr: true, + }, + { + name: "too high", + codes: []int{600}, + wantErr: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := ValidateRetryStatuses(tt.codes) + if (err != nil) != tt.wantErr { + t.Fatalf("ValidateRetryStatuses() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestAllOptionsHaveUniqueKeys(t *testing.T) { + t.Parallel() + + cliFlags := make(map[string]struct{}, len(AllOptions)) + otelKeys := make(map[string]struct{}, len(AllOptions)) + + for _, option := range AllOptions { + if option.CLIFlag == "" { + t.Fatal("AllOptions contains an option with an empty CLI flag") + } + if _, ok := cliFlags[option.CLIFlag]; ok { + t.Fatalf("AllOptions contains duplicate CLI flag %q", option.CLIFlag) + } + cliFlags[option.CLIFlag] = struct{}{} + + if option.OTelKey == "" { + t.Fatal("AllOptions contains an option with an empty OTel key") + } + if _, ok := otelKeys[option.OTelKey]; 
ok { + t.Fatalf("AllOptions contains duplicate OTel key %q", option.OTelKey) + } + otelKeys[option.OTelKey] = struct{}{} + } +} + +func TestNormalizeProjectIDs(t *testing.T) { + t.Parallel() + + input := []string{"project-b", "project-a", "project-b"} + want := []string{"project-a", "project-b"} + + got := DeduplicateProjectIDs(input) + if !reflect.DeepEqual(got, want) { + t.Fatalf("NormalizeProjectIDs() = %#v, want %#v", got, want) + } + if !reflect.DeepEqual(input, []string{"project-b", "project-a", "project-b"}) { + t.Fatalf("NormalizeProjectIDs() mutated input = %#v", input) + } +} + +func TestRuntimeConfigMonitoringCollectorOptions(t *testing.T) { + t.Parallel() + + cfg := RuntimeConfig{ + MetricsPrefixes: []string{"pubsub.googleapis.com/topic/", "compute.googleapis.com/instance"}, + Filters: []string{"pubsub.googleapis.com/topic:resource.labels.topic_id=has_substring(\"prod\")"}, + MetricsInterval: 5 * time.Minute, + MetricsOffset: 30 * time.Second, + MetricsIngest: true, + FillMissing: true, + DropDelegated: true, + AggregateDeltas: true, + DescriptorTTL: 10 * time.Minute, + DescriptorGoogleOnly: true, + } + + want := collectors.MonitoringCollectorOptions{ + MetricTypePrefixes: ParseMetricPrefixes(cfg.MetricsPrefixes), + ExtraFilters: []collectors.MetricFilter{{TargetedMetricPrefix: "pubsub.googleapis.com/topic", FilterQuery: "resource.labels.topic_id=has_substring(\"prod\")"}}, + RequestInterval: 5 * time.Minute, + RequestOffset: 30 * time.Second, + IngestDelay: true, + FillMissingLabels: true, + DropDelegatedProjects: true, + AggregateDeltas: true, + DescriptorCacheTTL: 10 * time.Minute, + DescriptorCacheOnlyGoogle: true, + } + + got := cfg.MonitoringCollectorOptions() + if !reflect.DeepEqual(got, want) { + t.Fatalf("MonitoringCollectorOptions() = %#v, want %#v", got, want) + } +} + +func TestRuntimeConfigCollectorCacheTTL(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + cfg RuntimeConfig + want time.Duration + }{ + { + name: "default 
fallback", + cfg: RuntimeConfig{}, + want: 2 * time.Hour, + }, + { + name: "aggregate deltas uses deltas ttl", + cfg: RuntimeConfig{ + AggregateDeltas: true, + DeltasTTL: 30 * time.Minute, + }, + want: 30 * time.Minute, + }, + { + name: "descriptor ttl wins when larger", + cfg: RuntimeConfig{ + AggregateDeltas: true, + DeltasTTL: 30 * time.Minute, + DescriptorTTL: 45 * time.Minute, + }, + want: 45 * time.Minute, + }, + { + name: "descriptor cache alone enables cache ttl", + cfg: RuntimeConfig{ + DeltasTTL: 30 * time.Minute, + DescriptorTTL: 15 * time.Minute, + }, + want: 30 * time.Minute, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got := tt.cfg.CollectorCacheTTL() + if got != tt.want { + t.Fatalf("CollectorCacheTTL() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/otelcollector/config.go b/otelcollector/config.go index a0374ed..5e14efd 100644 --- a/otelcollector/config.go +++ b/otelcollector/config.go @@ -15,9 +15,10 @@ package otelcollector import ( "fmt" - "net/http" + "slices" "time" + "github.com/prometheus-community/stackdriver_exporter/config" prombridge "github.com/prometheus/opentelemetry-collector-bridge" ) @@ -48,43 +49,26 @@ var _ prombridge.Config = (*Config)(nil) func defaultConfig() *Config { return &Config{ - UniverseDomain: "googleapis.com", - MaxRetries: 0, - HTTPTimeout: "10s", - MaxBackoff: "5s", - BackoffJitter: "1s", - RetryStatuses: []int{503}, - MetricsInterval: "5m", - MetricsOffset: "0s", - MetricsIngest: false, - FillMissing: true, - DropDelegated: false, - AggregateDeltas: false, - DeltasTTL: "30m", - DescriptorTTL: "0s", - DescriptorGoogleOnly: true, + UniverseDomain: config.DefaultUniverseDomain, + MaxRetries: config.DefaultMaxRetries, + HTTPTimeout: config.DefaultHTTPTimeout, + MaxBackoff: config.DefaultMaxBackoff, + BackoffJitter: config.DefaultBackoffJitter, + RetryStatuses: slices.Clone(config.DefaultRetryStatuses), + MetricsInterval: 
config.DefaultMetricsInterval, + MetricsOffset: config.DefaultMetricsOffset, + MetricsIngest: config.DefaultMetricsIngest, + FillMissing: config.DefaultFillMissing, + DropDelegated: config.DefaultDropDelegated, + AggregateDeltas: config.DefaultAggregateDeltas, + DeltasTTL: config.DefaultDeltasTTL, + DescriptorTTL: config.DefaultDescriptorTTL, + DescriptorGoogleOnly: config.DefaultDescriptorGoogleOnly, } } func defaultComponentDefaults() map[string]interface{} { - cfg := defaultConfig() - return map[string]interface{}{ - "max_retries": cfg.MaxRetries, - "http_timeout": cfg.HTTPTimeout, - "max_backoff": cfg.MaxBackoff, - "backoff_jitter": cfg.BackoffJitter, - "retry_statuses": cfg.RetryStatuses, - "universe_domain": cfg.UniverseDomain, - "metrics_interval": cfg.MetricsInterval, - "metrics_offset": cfg.MetricsOffset, - "metrics_ingest_delay": cfg.MetricsIngest, - "fill_missing_labels": cfg.FillMissing, - "drop_delegated_projects": cfg.DropDelegated, - "aggregate_deltas": cfg.AggregateDeltas, - "aggregate_deltas_ttl": cfg.DeltasTTL, - "descriptor_cache_ttl": cfg.DescriptorTTL, - "descriptor_cache_only_google": cfg.DescriptorGoogleOnly, - } + return config.OTelComponentDefaults() } func (c *Config) Validate() error { @@ -97,10 +81,8 @@ func (c *Config) Validate() error { return err } - for _, code := range c.RetryStatuses { - if code < http.StatusContinue || code > 599 { - return fmt.Errorf("retry status %d is not a valid HTTP status code", code) - } + if err := config.ValidateRetryStatuses(c.RetryStatuses); err != nil { + return err } return nil @@ -117,39 +99,31 @@ type parsedConfig struct { } func (c *Config) parsedDurations() (parsedConfig, error) { - parse := func(name, raw string) (time.Duration, error) { - d, err := time.ParseDuration(raw) - if err != nil { - return 0, fmt.Errorf("%s: invalid duration %q: %w", name, raw, err) - } - return d, nil - } - - httpTimeout, err := parse("http_timeout", c.HTTPTimeout) + httpTimeout, err := 
config.ParseDuration("http_timeout", c.HTTPTimeout) if err != nil { return parsedConfig{}, err } - maxBackoff, err := parse("max_backoff", c.MaxBackoff) + maxBackoff, err := config.ParseDuration("max_backoff", c.MaxBackoff) if err != nil { return parsedConfig{}, err } - backoffJitter, err := parse("backoff_jitter", c.BackoffJitter) + backoffJitter, err := config.ParseDuration("backoff_jitter", c.BackoffJitter) if err != nil { return parsedConfig{}, err } - metricsInterval, err := parse("metrics_interval", c.MetricsInterval) + metricsInterval, err := config.ParseDuration("metrics_interval", c.MetricsInterval) if err != nil { return parsedConfig{}, err } - metricsOffset, err := parse("metrics_offset", c.MetricsOffset) + metricsOffset, err := config.ParseDuration("metrics_offset", c.MetricsOffset) if err != nil { return parsedConfig{}, err } - deltasTTL, err := parse("aggregate_deltas_ttl", c.DeltasTTL) + deltasTTL, err := config.ParseDuration("aggregate_deltas_ttl", c.DeltasTTL) if err != nil { return parsedConfig{}, err } - descriptorTTL, err := parse("descriptor_cache_ttl", c.DescriptorTTL) + descriptorTTL, err := config.ParseDuration("descriptor_cache_ttl", c.DescriptorTTL) if err != nil { return parsedConfig{}, err } @@ -165,6 +139,35 @@ func (c *Config) parsedDurations() (parsedConfig, error) { }, nil } +func (c *Config) runtimeConfig() (config.RuntimeConfig, error) { + parsed, err := c.parsedDurations() + if err != nil { + return config.RuntimeConfig{}, err + } + + return config.RuntimeConfig{ + ProjectIDs: slices.Clone(c.ProjectIDs), + ProjectsFilter: c.ProjectsFilter, + UniverseDomain: c.UniverseDomain, + MaxRetries: c.MaxRetries, + HTTPTimeout: parsed.HTTPTimeout, + MaxBackoff: parsed.MaxBackoff, + BackoffJitter: parsed.BackoffJitter, + RetryStatuses: slices.Clone(c.RetryStatuses), + MetricsPrefixes: slices.Clone(c.MetricsPrefixes), + MetricsInterval: parsed.MetricsInterval, + MetricsOffset: parsed.MetricsOffset, + MetricsIngest: c.MetricsIngest, + 
FillMissing: c.FillMissing, + DropDelegated: c.DropDelegated, + Filters: slices.Clone(c.Filters), + AggregateDeltas: c.AggregateDeltas, + DeltasTTL: parsed.DeltasTTL, + DescriptorTTL: parsed.DescriptorTTL, + DescriptorGoogleOnly: c.DescriptorGoogleOnly, + }, nil +} + type configUnmarshaler struct{} func (configUnmarshaler) GetConfigStruct() prombridge.Config { diff --git a/otelcollector/config_test.go b/otelcollector/config_test.go index 403c0ab..f5b5242 100644 --- a/otelcollector/config_test.go +++ b/otelcollector/config_test.go @@ -14,8 +14,11 @@ package otelcollector import ( + "reflect" "testing" "time" + + "github.com/prometheus-community/stackdriver_exporter/config" ) func TestConfig_Validate(t *testing.T) { @@ -164,3 +167,54 @@ func TestConfig_Durations(t *testing.T) { t.Fatalf("DescriptorTTL = %v, want %v", parsed.DescriptorTTL, 0*time.Second) } } + +func TestDefaultComponentDefaultsMatchOptionDefaults(t *testing.T) { + t.Parallel() + + defaults := defaultComponentDefaults() + + for _, option := range config.AllOptions { + value, ok := defaults[option.OTelKey] + if option.Default != nil { + if !ok { + t.Fatalf("defaultComponentDefaults() missing key %q", option.OTelKey) + } + if !reflect.DeepEqual(value, option.Default) { + t.Fatalf("defaultComponentDefaults()[%q] = %#v, want %#v", option.OTelKey, value, option.Default) + } + continue + } + + if ok { + t.Fatalf("defaultComponentDefaults() unexpectedly includes key %q", option.OTelKey) + } + } +} + +func TestConfigMapstructureTagsMatchAllOptions(t *testing.T) { + t.Parallel() + + cfgType := reflect.TypeOf(Config{}) + tags := make(map[string]struct{}, cfgType.NumField()) + for i := 0; i < cfgType.NumField(); i++ { + tag := cfgType.Field(i).Tag.Get("mapstructure") + if tag == "" { + continue + } + tags[tag] = struct{}{} + } + + optionKeys := make(map[string]struct{}, len(config.AllOptions)) + for _, option := range config.AllOptions { + optionKeys[option.OTelKey] = struct{}{} + if _, ok := 
tags[option.OTelKey]; !ok { + t.Fatalf("Config is missing mapstructure tag %q", option.OTelKey) + } + } + + for tag := range tags { + if _, ok := optionKeys[tag]; !ok { + t.Fatalf("AllOptions is missing config key %q", tag) + } + } +} diff --git a/otelcollector/go.mod b/otelcollector/go.mod index 187e0c6..b810941 100644 --- a/otelcollector/go.mod +++ b/otelcollector/go.mod @@ -3,7 +3,6 @@ module github.com/prometheus-community/stackdriver_exporter/otelcollector go 1.25.0 require ( - github.com/PuerkitoBio/rehttp v1.4.0 github.com/prometheus-community/stackdriver_exporter v0.0.0 github.com/prometheus/client_golang v1.23.2 github.com/prometheus/opentelemetry-collector-bridge v0.0.0-20260317204527-5fc426455618 @@ -11,7 +10,6 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.148.0 go.opentelemetry.io/collector/receiver v1.54.0 go.opentelemetry.io/collector/receiver/receivertest v0.148.0 - golang.org/x/oauth2 v0.34.0 google.golang.org/api v0.251.0 ) @@ -19,6 +17,7 @@ require ( cloud.google.com/go/auth v0.16.5 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect + github.com/PuerkitoBio/rehttp v1.4.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -64,11 +63,12 @@ require ( go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/crypto v0.48.0 // indirect golang.org/x/net v0.51.0 // indirect + golang.org/x/oauth2 v0.36.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.34.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect - google.golang.org/grpc v1.79.2 // indirect + google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/otelcollector/go.sum 
b/otelcollector/go.sum index 20b2bfc..4e6d78d 100644 --- a/otelcollector/go.sum +++ b/otelcollector/go.sum @@ -154,8 +154,8 @@ golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVo golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= -golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= -golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/oauth2 v0.36.0 h1:peZ/1z27fi9hUOFCAZaHyrpWG5lwe0RJEEEeH0ThlIs= +golang.org/x/oauth2 v0.36.0/go.mod h1:YDBUJMTkDnJS+A4BP4eZBjCqtokkg1hODuPjwiGPO7Q= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -177,8 +177,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1: google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409/go.mod h1:fl8J1IvUjCilwZzQowmw2b7HQB2eAuYBabMXzWurF+I= google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= -google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU= -google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= +google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf 
v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/otelcollector/lifecycle.go b/otelcollector/lifecycle.go index 2fa0020..08beb97 100644 --- a/otelcollector/lifecycle.go +++ b/otelcollector/lifecycle.go @@ -17,19 +17,15 @@ import ( "context" "fmt" "log/slog" - "slices" "time" - "github.com/PuerkitoBio/rehttp" "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/config" "github.com/prometheus-community/stackdriver_exporter/delta" "github.com/prometheus-community/stackdriver_exporter/utils" "github.com/prometheus/client_golang/prometheus" prombridge "github.com/prometheus/opentelemetry-collector-bridge" - "golang.org/x/oauth2/google" - "google.golang.org/api/compute/v1" "google.golang.org/api/monitoring/v3" - "google.golang.org/api/option" ) type collectorFactoryFunc func(projectID string, service *monitoring.Service, opts collectors.MonitoringCollectorOptions, deltasTTL time.Duration, logger *slog.Logger) (prometheus.Collector, error) @@ -37,7 +33,7 @@ type collectorFactoryFunc func(projectID string, service *monitoring.Service, op type lifecycleManager struct { logger *slog.Logger - monitoringServiceFactory func(ctx context.Context, parsed parsedConfig, cfg *Config) (*monitoring.Service, error) + monitoringServiceFactory func(ctx context.Context, cfg config.RuntimeConfig) (*monitoring.Service, error) collectorFactory collectorFactoryFunc filterProjectDiscoverer func(ctx context.Context, filter string) ([]string, error) defaultProjectDiscoverer func(ctx context.Context) (string, error) @@ -45,8 +41,10 @@ type lifecycleManager struct { func newLifecycleManager(logger *slog.Logger) *lifecycleManager { return &lifecycleManager{ - logger: logger, - monitoringServiceFactory: createMonitoringService, + logger: logger, + monitoringServiceFactory: func(ctx context.Context, 
cfg config.RuntimeConfig) (*monitoring.Service, error) { + return cfg.CreateMonitoringService(ctx) + }, collectorFactory: func(projectID string, service *monitoring.Service, opts collectors.MonitoringCollectorOptions, deltasTTL time.Duration, logger *slog.Logger) (prometheus.Collector, error) { return collectors.NewMonitoringCollector( projectID, @@ -58,7 +56,7 @@ func newLifecycleManager(logger *slog.Logger) *lifecycleManager { ) }, filterProjectDiscoverer: utils.GetProjectIDsFromFilter, - defaultProjectDiscoverer: discoverDefaultProjectID, + defaultProjectDiscoverer: config.DiscoverDefaultProjectID, } } @@ -68,7 +66,7 @@ func (m *lifecycleManager) Start(ctx context.Context, exporterConfig prombridge. return nil, fmt.Errorf("invalid exporter config type: %T", exporterConfig) } - parsed, err := cfg.parsedDurations() + runtimeCfg, err := cfg.runtimeConfig() if err != nil { return nil, err } @@ -78,30 +76,15 @@ func (m *lifecycleManager) Start(ctx context.Context, exporterConfig prombridge. 
return nil, err } - monitoringService, err := m.monitoringServiceFactory(ctx, parsed, cfg) + monitoringService, err := m.monitoringServiceFactory(ctx, runtimeCfg) if err != nil { return nil, err } registry := prometheus.NewRegistry() - metricPrefixes := utils.ParseMetricTypePrefixes(cfg.MetricsPrefixes) - extraFilters := collectors.ParseMetricExtraFilters(cfg.Filters) for _, projectID := range projectIDs { - opts := collectors.MonitoringCollectorOptions{ - MetricTypePrefixes: metricPrefixes, - ExtraFilters: extraFilters, - RequestInterval: parsed.MetricsInterval, - RequestOffset: parsed.MetricsOffset, - IngestDelay: cfg.MetricsIngest, - FillMissingLabels: cfg.FillMissing, - DropDelegatedProjects: cfg.DropDelegated, - AggregateDeltas: cfg.AggregateDeltas, - DescriptorCacheTTL: parsed.DescriptorTTL, - DescriptorCacheOnlyGoogle: cfg.DescriptorGoogleOnly, - } - - collector, err := m.collectorFactory(projectID, monitoringService, opts, parsed.DeltasTTL, m.logger) + collector, err := m.collectorFactory(projectID, monitoringService, runtimeCfg.MonitoringCollectorOptions(), runtimeCfg.DeltasTTL, m.logger) if err != nil { return nil, fmt.Errorf("failed to create collector for project %q: %w", projectID, err) } @@ -138,40 +121,5 @@ func (m *lifecycleManager) resolveProjectIDs(ctx context.Context, cfg *Config) ( projectIDs = append(projectIDs, projectID) } - slices.Sort(projectIDs) - return slices.Compact(projectIDs), nil -} - -func discoverDefaultProjectID(ctx context.Context) (string, error) { - credentials, err := google.FindDefaultCredentials(ctx, compute.ComputeScope) - if err != nil { - return "", err - } - if credentials.ProjectID == "" { - return "", fmt.Errorf("unable to identify default GCP project") - } - return credentials.ProjectID, nil -} - -func createMonitoringService(ctx context.Context, parsed parsedConfig, cfg *Config) (*monitoring.Service, error) { - googleClient, err := google.DefaultClient(ctx, monitoring.MonitoringReadScope) - if err != nil { - return 
nil, fmt.Errorf("error creating Google client: %w", err) - } - - googleClient.Timeout = parsed.HTTPTimeout - googleClient.Transport = rehttp.NewTransport( - googleClient.Transport, - rehttp.RetryAll( - rehttp.RetryMaxRetries(cfg.MaxRetries), - rehttp.RetryStatuses(cfg.RetryStatuses...), - ), - rehttp.ExpJitterDelay(parsed.BackoffJitter, parsed.MaxBackoff), - ) - - service, err := monitoring.NewService(ctx, option.WithHTTPClient(googleClient), option.WithUniverseDomain(cfg.UniverseDomain)) - if err != nil { - return nil, fmt.Errorf("error creating Google Stackdriver Monitoring service: %w", err) - } - return service, nil + return config.DeduplicateProjectIDs(projectIDs), nil } diff --git a/otelcollector/lifecycle_test.go b/otelcollector/lifecycle_test.go index 89729d6..43a3568 100644 --- a/otelcollector/lifecycle_test.go +++ b/otelcollector/lifecycle_test.go @@ -17,11 +17,13 @@ import ( "context" "errors" "log/slog" + "reflect" "strings" "testing" "time" "github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/config" "github.com/prometheus/client_golang/prometheus" "google.golang.org/api/monitoring/v3" ) @@ -42,12 +44,16 @@ func TestLifecycleManager_Start(t *testing.T) { } var createdProjects []string + var gotOpts collectors.MonitoringCollectorOptions + var gotDeltasTTL time.Duration mgr := newLifecycleManager(slog.Default()) - mgr.monitoringServiceFactory = func(context.Context, parsedConfig, *Config) (*monitoring.Service, error) { + mgr.monitoringServiceFactory = func(context.Context, config.RuntimeConfig) (*monitoring.Service, error) { return &monitoring.Service{}, nil } - mgr.collectorFactory = func(projectID string, _ *monitoring.Service, _ collectors.MonitoringCollectorOptions, _ time.Duration, _ *slog.Logger) (prometheus.Collector, error) { + mgr.collectorFactory = func(projectID string, _ *monitoring.Service, opts collectors.MonitoringCollectorOptions, deltasTTL time.Duration, _ *slog.Logger) 
(prometheus.Collector, error) { createdProjects = append(createdProjects, projectID) + gotOpts = opts + gotDeltasTTL = deltasTTL return prometheus.NewGauge(prometheus.GaugeOpts{ Name: "test_metric_" + strings.ReplaceAll(projectID, "-", "_"), Help: "test", @@ -64,6 +70,24 @@ func TestLifecycleManager_Start(t *testing.T) { if len(createdProjects) != 2 { t.Fatalf("collectorFactory called %d times, want 2", len(createdProjects)) } + wantOpts := collectors.MonitoringCollectorOptions{ + MetricTypePrefixes: config.ParseMetricPrefixes(cfg.MetricsPrefixes), + ExtraFilters: config.ParseMetricFilters(cfg.Filters), + RequestInterval: 5 * time.Minute, + RequestOffset: 0, + IngestDelay: false, + FillMissingLabels: false, + DropDelegatedProjects: false, + AggregateDeltas: false, + DescriptorCacheTTL: 0, + DescriptorCacheOnlyGoogle: false, + } + if !reflect.DeepEqual(gotOpts, wantOpts) { + t.Fatalf("collector options = %#v, want %#v", gotOpts, wantOpts) + } + if gotDeltasTTL != 30*time.Minute { + t.Fatalf("deltas TTL = %v, want %v", gotDeltasTTL, 30*time.Minute) + } // Ensure the created registry can gather metrics. 
if _, err := reg.Gather(); err != nil { @@ -89,7 +113,7 @@ func TestLifecycleManager_Start_UsesDefaultProjectDiscovery(t *testing.T) { mgr.defaultProjectDiscoverer = func(context.Context) (string, error) { return "auto-project", nil } - mgr.monitoringServiceFactory = func(context.Context, parsedConfig, *Config) (*monitoring.Service, error) { + mgr.monitoringServiceFactory = func(context.Context, config.RuntimeConfig) (*monitoring.Service, error) { return &monitoring.Service{}, nil } mgr.collectorFactory = func(projectID string, _ *monitoring.Service, _ collectors.MonitoringCollectorOptions, _ time.Duration, _ *slog.Logger) (prometheus.Collector, error) { @@ -120,7 +144,7 @@ func TestLifecycleManager_Start_ReturnsErrorFromCollectorFactory(t *testing.T) { } mgr := newLifecycleManager(slog.Default()) - mgr.monitoringServiceFactory = func(context.Context, parsedConfig, *Config) (*monitoring.Service, error) { + mgr.monitoringServiceFactory = func(context.Context, config.RuntimeConfig) (*monitoring.Service, error) { return &monitoring.Service{}, nil } mgr.collectorFactory = func(string, *monitoring.Service, collectors.MonitoringCollectorOptions, time.Duration, *slog.Logger) (prometheus.Collector, error) { diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index 516f5b0..e20242b 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -20,10 +20,9 @@ import ( "net/http" "os" "slices" + "strconv" "strings" - "time" - "github.com/PuerkitoBio/rehttp" "github.com/alecthomas/kingpin/v2" "github.com/prometheus/client_golang/prometheus" versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version" @@ -33,12 +32,10 @@ import ( "github.com/prometheus/common/version" "github.com/prometheus/exporter-toolkit/web" webflag "github.com/prometheus/exporter-toolkit/web/kingpinflag" - "golang.org/x/oauth2/google" - "google.golang.org/api/compute/v1" "google.golang.org/api/monitoring/v3" - "google.golang.org/api/option" 
"github.com/prometheus-community/stackdriver_exporter/collectors" + "github.com/prometheus-community/stackdriver_exporter/config" "github.com/prometheus-community/stackdriver_exporter/delta" "github.com/prometheus-community/stackdriver_exporter/utils" ) @@ -61,36 +58,36 @@ var ( ).String() projectIDs = kingpin.Flag( - "google.project-ids", "Repeatable flag of Google Project IDs", + config.ProjectIDs.CLIFlag, "Repeatable flag of Google Project IDs", ).Strings() projectsFilter = kingpin.Flag( - "google.projects.filter", "Google projects search filter.", + config.ProjectsFilter.CLIFlag, "Google projects search filter.", ).String() googleUniverseDomain = kingpin.Flag( - "google.universe-domain", "The Cloud universe to use.", - ).Default("googleapis.com").String() + config.UniverseDomain.CLIFlag, "The Cloud universe to use.", + ).Default(config.DefaultUniverseDomain).String() stackdriverMaxRetries = kingpin.Flag( - "stackdriver.max-retries", "Max number of retries that should be attempted on 503 errors from stackdriver.", - ).Default("0").Int() + config.MaxRetries.CLIFlag, "Max number of retries that should be attempted on 503 errors from stackdriver.", + ).Default(strconv.Itoa(config.DefaultMaxRetries)).Int() stackdriverHttpTimeout = kingpin.Flag( - "stackdriver.http-timeout", "How long should stackdriver_exporter wait for a result from the Stackdriver API.", - ).Default("10s").Duration() + config.HTTPTimeout.CLIFlag, "How long should stackdriver_exporter wait for a result from the Stackdriver API.", + ).Default(config.DefaultHTTPTimeout).Duration() stackdriverMaxBackoffDuration = kingpin.Flag( - "stackdriver.max-backoff", "Max time between each request in an exp backoff scenario.", - ).Default("5s").Duration() + config.MaxBackoff.CLIFlag, "Max time between each request in an exp backoff scenario.", + ).Default(config.DefaultMaxBackoff).Duration() stackdriverBackoffJitterBase = kingpin.Flag( - "stackdriver.backoff-jitter", "The amount of jitter to introduce in a exp 
backoff scenario.", - ).Default("1s").Duration() + config.BackoffJitter.CLIFlag, "The amount of jitter to introduce in a exp backoff scenario.", + ).Default(config.DefaultBackoffJitter).Duration() stackdriverRetryStatuses = kingpin.Flag( - "stackdriver.retry-statuses", "The HTTP statuses that should trigger a retry.", - ).Default("503").Ints() + config.RetryStatuses.CLIFlag, "The HTTP statuses that should trigger a retry.", + ).Default(defaultRetryStatuses()...).Ints() // Monitoring collector flags monitoringMetricsTypePrefixes = kingpin.Flag( @@ -98,99 +95,64 @@ var ( ).String() monitoringMetricsPrefixes = kingpin.Flag( - "monitoring.metrics-prefixes", "Google Stackdriver Monitoring Metric Type prefixes. Repeat this flag to scrape multiple prefixes.", + config.MetricsPrefixes.CLIFlag, "Google Stackdriver Monitoring Metric Type prefixes. Repeat this flag to scrape multiple prefixes.", ).Strings() monitoringMetricsInterval = kingpin.Flag( - "monitoring.metrics-interval", "Interval to request the Google Stackdriver Monitoring Metrics for. Only the most recent data point is used.", - ).Default("5m").Duration() + config.MetricsInterval.CLIFlag, "Interval to request the Google Stackdriver Monitoring Metrics for. 
Only the most recent data point is used.", + ).Default(config.DefaultMetricsInterval).Duration() monitoringMetricsOffset = kingpin.Flag( - "monitoring.metrics-offset", "Offset for the Google Stackdriver Monitoring Metrics interval into the past.", - ).Default("0s").Duration() + config.MetricsOffset.CLIFlag, "Offset for the Google Stackdriver Monitoring Metrics interval into the past.", + ).Default(config.DefaultMetricsOffset).Duration() monitoringMetricsIngestDelay = kingpin.Flag( - "monitoring.metrics-ingest-delay", "Offset for the Google Stackdriver Monitoring Metrics interval into the past by the ingest delay from the metric's metadata.", - ).Default("false").Bool() + config.MetricsIngest.CLIFlag, "Offset for the Google Stackdriver Monitoring Metrics interval into the past by the ingest delay from the metric's metadata.", + ).Default(strconv.FormatBool(config.DefaultMetricsIngest)).Bool() collectorFillMissingLabels = kingpin.Flag( - "collector.fill-missing-labels", "Fill missing metrics labels with empty string to avoid label dimensions inconsistent failure.", - ).Default("true").Bool() + config.FillMissing.CLIFlag, "Fill missing metrics labels with empty string to avoid label dimensions inconsistent failure.", + ).Default(strconv.FormatBool(config.DefaultFillMissing)).Bool() monitoringDropDelegatedProjects = kingpin.Flag( - "monitoring.drop-delegated-projects", "Drop metrics from attached projects and fetch `project_id` only.", - ).Default("false").Bool() + config.DropDelegated.CLIFlag, "Drop metrics from attached projects and fetch `project_id` only.", + ).Default(strconv.FormatBool(config.DefaultDropDelegated)).Bool() monitoringMetricsExtraFilter = kingpin.Flag( - "monitoring.filters", + config.Filters.CLIFlag, "Filters. 
i.e: pubsub.googleapis.com/subscription:resource.labels.subscription_id=monitoring.regex.full_match(\"my-subs-prefix.*\")", ).Strings() monitoringMetricsAggregateDeltas = kingpin.Flag( - "monitoring.aggregate-deltas", "If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge", - ).Default("false").Bool() + config.AggregateDeltas.CLIFlag, "If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge", + ).Default(strconv.FormatBool(config.DefaultAggregateDeltas)).Bool() monitoringMetricsDeltasTTL = kingpin.Flag( - "monitoring.aggregate-deltas-ttl", "How long should a delta metric continue to be exported after GCP stops producing a metric", - ).Default("30m").Duration() + config.DeltasTTL.CLIFlag, "How long should a delta metric continue to be exported after GCP stops producing a metric", + ).Default(config.DefaultDeltasTTL).Duration() monitoringDescriptorCacheTTL = kingpin.Flag( - "monitoring.descriptor-cache-ttl", "How long should the metric descriptors for a prefixed be cached for", - ).Default("0s").Duration() + config.DescriptorTTL.CLIFlag, "How long should the metric descriptors for a prefixed be cached for", + ).Default(config.DefaultDescriptorTTL).Duration() monitoringDescriptorCacheOnlyGoogle = kingpin.Flag( - "monitoring.descriptor-cache-only-google", "Only cache descriptors for *.googleapis.com metrics", - ).Default("true").Bool() + config.DescriptorGoogleOnly.CLIFlag, "Only cache descriptors for *.googleapis.com metrics", + ).Default(strconv.FormatBool(config.DefaultDescriptorGoogleOnly)).Bool() ) func init() { prometheus.MustRegister(versioncollector.NewCollector("stackdriver_exporter")) } -func getDefaultGCPProject(ctx context.Context) (*string, error) { - credentials, err := google.FindDefaultCredentials(ctx, compute.ComputeScope) - if err != nil { - return nil, err - } - if credentials.ProjectID == "" { - return nil, fmt.Errorf("unable to identify the gcloud project. 
Got empty string") - } - return &credentials.ProjectID, nil -} - -func createMonitoringService(ctx context.Context) (*monitoring.Service, error) { - googleClient, err := google.DefaultClient(ctx, monitoring.MonitoringReadScope) - if err != nil { - return nil, fmt.Errorf("error creating Google client: %v", err) - } - - googleClient.Timeout = *stackdriverHttpTimeout - googleClient.Transport = rehttp.NewTransport( - googleClient.Transport, // need to wrap DefaultClient transport - rehttp.RetryAll( - rehttp.RetryMaxRetries(*stackdriverMaxRetries), - rehttp.RetryStatuses(*stackdriverRetryStatuses...)), // Cloud support suggests retrying on 503 errors - rehttp.ExpJitterDelay(*stackdriverBackoffJitterBase, *stackdriverMaxBackoffDuration), // Set timeout to <10s as that is prom default timeout - ) - - monitoringService, err := monitoring.NewService(ctx, option.WithHTTPClient(googleClient), option.WithUniverseDomain(*googleUniverseDomain)) - if err != nil { - return nil, fmt.Errorf("error creating Google Stackdriver Monitoring service: %v", err) - } - - return monitoringService, nil -} - type handler struct { handler http.Handler logger *slog.Logger - projectIDs []string - metricsPrefixes []string - metricsExtraFilters []collectors.MetricFilter - additionalGatherer prometheus.Gatherer - m *monitoring.Service - collectors *collectors.CollectorCache + projectIDs []string + cfg config.RuntimeConfig + additionalGatherer prometheus.Gatherer + m *monitoring.Service + collectors *collectors.CollectorCache } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -208,28 +170,16 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.handler.ServeHTTP(w, r) } -func newHandler(projectIDs []string, metricPrefixes []string, metricExtraFilters []collectors.MetricFilter, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler { - var ttl time.Duration - // Add collector caching TTL as max of deltas aggregation or 
descriptor caching - if *monitoringMetricsAggregateDeltas || *monitoringDescriptorCacheTTL > 0 { - ttl = *monitoringMetricsDeltasTTL - if *monitoringDescriptorCacheTTL > ttl { - ttl = *monitoringDescriptorCacheTTL - } - } else { - ttl = 2 * time.Hour - } - - logger.Info("Creating collector cache", "ttl", ttl) +func newHandler(projectIDs []string, cfg config.RuntimeConfig, m *monitoring.Service, logger *slog.Logger, additionalGatherer prometheus.Gatherer) *handler { + logger.Info("Creating collector cache", "ttl", cfg.CollectorCacheTTL()) h := &handler{ - logger: logger, - projectIDs: projectIDs, - metricsPrefixes: metricPrefixes, - metricsExtraFilters: metricExtraFilters, - additionalGatherer: additionalGatherer, - m: m, - collectors: collectors.NewCollectorCache(ttl), + logger: logger, + projectIDs: projectIDs, + cfg: cfg, + additionalGatherer: additionalGatherer, + m: m, + collectors: collectors.NewCollectorCache(cfg.CollectorCacheTTL()), } h.handler = h.innerHandler(nil) @@ -244,18 +194,14 @@ func (h *handler) getCollector(project string, filters map[string]bool) (*collec return collector, nil } - collector, err := collectors.NewMonitoringCollector(project, h.m, collectors.MonitoringCollectorOptions{ - MetricTypePrefixes: filterdPrefixes, - ExtraFilters: h.metricsExtraFilters, - RequestInterval: *monitoringMetricsInterval, - RequestOffset: *monitoringMetricsOffset, - IngestDelay: *monitoringMetricsIngestDelay, - FillMissingLabels: *collectorFillMissingLabels, - DropDelegatedProjects: *monitoringDropDelegatedProjects, - AggregateDeltas: *monitoringMetricsAggregateDeltas, - DescriptorCacheTTL: *monitoringDescriptorCacheTTL, - DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle, - }, h.logger, delta.NewInMemoryCounterStore(h.logger, *monitoringMetricsDeltasTTL), delta.NewInMemoryHistogramStore(h.logger, *monitoringMetricsDeltasTTL)) + collector, err := collectors.NewMonitoringCollector( + project, + h.m, + 
h.cfg.MonitoringCollectorOptionsForPrefixes(filterdPrefixes), + h.logger, + delta.NewInMemoryCounterStore(h.logger, h.cfg.DeltasTTL), + delta.NewInMemoryHistogramStore(h.logger, h.cfg.DeltasTTL), + ) if err != nil { return nil, err } @@ -289,10 +235,10 @@ func (h *handler) innerHandler(filters map[string]bool) http.Handler { // filterMetricTypePrefixes filters the initial list of metric type prefixes, with the ones coming from an individual // prometheus collect request. func (h *handler) filterMetricTypePrefixes(filters map[string]bool) []string { - filteredPrefixes := h.metricsPrefixes + filteredPrefixes := h.cfg.MetricsPrefixes if len(filters) > 0 { filteredPrefixes = nil - for _, prefix := range h.metricsPrefixes { + for _, prefix := range h.cfg.MetricsPrefixes { for filter := range filters { if strings.HasPrefix(filter, prefix) { filteredPrefixes = append(filteredPrefixes, filter) @@ -300,7 +246,7 @@ func (h *handler) filterMetricTypePrefixes(filters map[string]bool) []string { } } } - return utils.ParseMetricTypePrefixes(filteredPrefixes) + return config.ParseMetricPrefixes(filteredPrefixes) } func main() { @@ -324,27 +270,27 @@ func main() { } ctx := context.Background() + runtimeCfg := collectorRuntimeConfigFromFlags() var discoveredProjectIDs []string - if len(*projectIDs) == 0 && *projectID == "" && *projectsFilter == "" { + if len(runtimeCfg.ProjectIDs) == 0 && *projectID == "" && runtimeCfg.ProjectsFilter == "" { logger.Info("Neither projectIDs nor projectsFilter was provided. 
Trying to discover it") - var err error - defaultProject, err := getDefaultGCPProject(ctx) + defaultProject, err := config.DiscoverDefaultProjectID(ctx) if err != nil { logger.Error("no explicit projectIDs and error trying to discover default GCloud project", "err", err) os.Exit(1) } - discoveredProjectIDs = append(discoveredProjectIDs, *defaultProject) + discoveredProjectIDs = append(discoveredProjectIDs, defaultProject) } - monitoringService, err := createMonitoringService(ctx) + monitoringService, err := runtimeCfg.CreateMonitoringService(ctx) if err != nil { logger.Error("failed to create monitoring service", "err", err) os.Exit(1) } - if *projectsFilter != "" { - projectIDsFromFilter, err := utils.GetProjectIDsFromFilter(ctx, *projectsFilter) + if runtimeCfg.ProjectsFilter != "" { + projectIDsFromFilter, err := utils.GetProjectIDsFromFilter(ctx, runtimeCfg.ProjectsFilter) if err != nil { logger.Error("failed to get project IDs from filter", "err", err) os.Exit(1) @@ -352,45 +298,35 @@ func main() { discoveredProjectIDs = append(discoveredProjectIDs, projectIDsFromFilter...) } - if len(*projectIDs) > 0 { - discoveredProjectIDs = append(discoveredProjectIDs, *projectIDs...) + if len(runtimeCfg.ProjectIDs) > 0 { + discoveredProjectIDs = append(discoveredProjectIDs, runtimeCfg.ProjectIDs...) } if *projectID != "" { discoveredProjectIDs = append(discoveredProjectIDs, strings.Split(*projectID, ",")...) } - var metricsPrefixes []string - if len(*monitoringMetricsPrefixes) > 0 { - metricsPrefixes = append(metricsPrefixes, *monitoringMetricsPrefixes...) - } if *monitoringMetricsTypePrefixes != "" { - metricsPrefixes = append(metricsPrefixes, strings.Split(*monitoringMetricsTypePrefixes, ",")...) + runtimeCfg.MetricsPrefixes = append(runtimeCfg.MetricsPrefixes, strings.Split(*monitoringMetricsTypePrefixes, ",")...) 
} logger.Info( "Starting stackdriver_exporter", "version", version.Info(), "build_context", version.BuildContext(), - "metric_prefixes", fmt.Sprintf("%v", metricsPrefixes), - "extra_filters", strings.Join(*monitoringMetricsExtraFilter, ","), + "metric_prefixes", fmt.Sprintf("%v", runtimeCfg.MetricsPrefixes), + "extra_filters", strings.Join(runtimeCfg.Filters, ","), "projectIDs", fmt.Sprintf("%v", discoveredProjectIDs), - "projectsFilter", *projectsFilter, + "projectsFilter", runtimeCfg.ProjectsFilter, ) - parsedMetricsPrefixes := utils.ParseMetricTypePrefixes(metricsPrefixes) - metricExtraFilters := collectors.ParseMetricExtraFilters(*monitoringMetricsExtraFilter) - // drop duplicate projects - slices.Sort(discoveredProjectIDs) - uniqueProjectIds := slices.Compact(discoveredProjectIDs) + uniqueProjectIds := config.DeduplicateProjectIDs(discoveredProjectIDs) if *metricsPath == *stackdriverMetricsPath { - handler := newHandler( - uniqueProjectIds, parsedMetricsPrefixes, metricExtraFilters, monitoringService, logger, prometheus.DefaultGatherer) + handler := newHandler(uniqueProjectIds, runtimeCfg, monitoringService, logger, prometheus.DefaultGatherer) http.Handle(*metricsPath, promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, handler)) } else { logger.Info("Serving Stackdriver metrics at separate path", "path", *stackdriverMetricsPath) - handler := newHandler( - uniqueProjectIds, parsedMetricsPrefixes, metricExtraFilters, monitoringService, logger, nil) + handler := newHandler(uniqueProjectIds, runtimeCfg, monitoringService, logger, nil) http.Handle(*stackdriverMetricsPath, promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, handler)) http.Handle(*metricsPath, promhttp.Handler()) } @@ -429,3 +365,35 @@ func main() { os.Exit(1) } } + +func collectorRuntimeConfigFromFlags() config.RuntimeConfig { + return config.RuntimeConfig{ + ProjectIDs: slices.Clone(*projectIDs), + ProjectsFilter: *projectsFilter, + UniverseDomain: *googleUniverseDomain, + 
MaxRetries: *stackdriverMaxRetries, + HTTPTimeout: *stackdriverHttpTimeout, + MaxBackoff: *stackdriverMaxBackoffDuration, + BackoffJitter: *stackdriverBackoffJitterBase, + RetryStatuses: slices.Clone(*stackdriverRetryStatuses), + MetricsPrefixes: slices.Clone(*monitoringMetricsPrefixes), + MetricsInterval: *monitoringMetricsInterval, + MetricsOffset: *monitoringMetricsOffset, + MetricsIngest: *monitoringMetricsIngestDelay, + FillMissing: *collectorFillMissingLabels, + DropDelegated: *monitoringDropDelegatedProjects, + Filters: slices.Clone(*monitoringMetricsExtraFilter), + AggregateDeltas: *monitoringMetricsAggregateDeltas, + DeltasTTL: *monitoringMetricsDeltasTTL, + DescriptorTTL: *monitoringDescriptorCacheTTL, + DescriptorGoogleOnly: *monitoringDescriptorCacheOnlyGoogle, + } +} + +func defaultRetryStatuses() []string { + defaults := make([]string, 0, len(config.DefaultRetryStatuses)) + for _, status := range config.DefaultRetryStatuses { + defaults = append(defaults, strconv.Itoa(status)) + } + return defaults +} diff --git a/stackdriver_exporter_test.go b/stackdriver_exporter_test.go index cabf195..2d9c0ba 100644 --- a/stackdriver_exporter_test.go +++ b/stackdriver_exporter_test.go @@ -16,6 +16,8 @@ package main import ( "reflect" "testing" + + "github.com/prometheus-community/stackdriver_exporter/config" ) func TestFilterMetricTypePrefixes(t *testing.T) { @@ -24,7 +26,9 @@ func TestFilterMetricTypePrefixes(t *testing.T) { } h := &handler{ - metricsPrefixes: metricPrefixes, + cfg: config.RuntimeConfig{ + MetricsPrefixes: metricPrefixes, + }, } inputFilters := map[string]bool{