From ba24a0c5fa99be1307511257487ae4806b01d1d2 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Sun, 26 Apr 2026 19:28:57 +0200 Subject: [PATCH 01/27] add spec for prometheus sources --- ...architecture-prometheus-exporter-source.md | 405 ++++++++++++++++++ 1 file changed, 405 insertions(+) create mode 100644 spec/architecture-prometheus-exporter-source.md diff --git a/spec/architecture-prometheus-exporter-source.md b/spec/architecture-prometheus-exporter-source.md new file mode 100644 index 0000000000..7779d74a4c --- /dev/null +++ b/spec/architecture-prometheus-exporter-source.md @@ -0,0 +1,405 @@ +--- +title: Prometheus Exporter as a pgwatch Source +version: 1.0 +date_created: 2026-04-26 +tags: [architecture, design, prometheus, sources, metrics, sinks] +--- + +# Introduction + +This specification defines how pgwatch fetches metrics from external Prometheus exporters +(e.g., `node_exporter`, `postgres_exporter`, custom exporters) and routes them through the +standard pgwatch pipeline. It covers the new source kind, the metric definition extension +that maps Prometheus metric families to pgwatch logical metrics, label-to-column conventions, +and a lightweight proxy path that avoids double-conversion when the configured sink is also +Prometheus. + +--- + +## 1. Purpose & Scope + +**Purpose**: Allow pgwatch to scrape arbitrary Prometheus exporters, apply metric grouping and +filtering, and write measurements to any configured sink — including acting as a low-overhead +proxy when the sink is a Prometheus endpoint. + +**In scope**: +- New source `Kind` value `prometheus`. +- Extension of `metrics.Metric` for Prometheus selector lists. +- HTTP scrape lifecycle inside `SourceConn` and the reaper. +- Measurement row structure for Prometheus samples. +- Prometheus-to-Prometheus (Prom→Prom) proxy optimisation in `PrometheusWriter`. +- Authentication and TLS configuration. + +**Out of scope**: +- Push-gateway integration. +- Prometheus alerting rules. +- Remote-write protocol (only the `/metrics` exposition format is targeted). +- Grafana dashboard changes. + +**Audience**: pgwatch maintainers implementing the feature. The document is also intended to be +consumed directly by AI coding assistants. + +--- + +## 2. Definitions + +| Term | Definition | +|---|---| +| **Prometheus exporter** | An HTTP service that exposes metrics in Prometheus exposition format (text or protobuf) at a `/metrics` endpoint. | +| **Metric family** | A single named group of Prometheus samples sharing the same `__name__` label (e.g., `pg_stat_activity_count`). | +| **Sample** | A single (label-set, float64 value, optional timestamp) triple within a metric family. | +| **pgwatch metric** | A named, schema-fixed set of measurements stored as one table/series in the sink. | +| **pgwatch source** | A configured entity from which measurements are collected. | +| **scrape** | An HTTP GET request to a Prometheus exporter's `/metrics` endpoint. | +| **selector list** | The ordered list of Prometheus metric family names that are grouped into one pgwatch metric. | +| **MeasurementEnvelope** | The internal pgwatch structure `{DBName, MetricName, CustomTags, Data Measurements}` passed from reaper to sinks. | +| **Prom→Prom path** | The code path activated when `source.Kind == "prometheus"` and the active sink is `PrometheusWriter`. | +| **tag column** | A pgwatch measurement column whose name starts with `tag_`. Treated as an indexed dimension. | +| **value column** | A numeric measurement column not prefixed `tag_`. | +| **epoch_ns** | Mandatory measurement column: Unix timestamp in nanoseconds (int64). | +| **ConnStr** | The `Source.ConnStr` field, repurposed as the scrape URL for `prometheus` sources. | + +--- + +## 3. Requirements, Constraints & Guidelines + +### Source Definition + +- **REQ-001**: `sources.Kind` MUST include a new constant `SourcePrometheus = "prometheus"`. +- **REQ-002**: A `prometheus` source MUST use `Source.ConnStr` as the HTTP(S) scrape URL (e.g., `http://host:9187/metrics`). No PostgreSQL DSN is required or expected. +- **REQ-003**: `Source.Metrics` (type `metrics.MetricIntervals`) MUST continue to map pgwatch metric name → scrape interval in seconds, identical to existing source kinds. +- **REQ-004**: `Source.PresetMetrics` MUST be supported for `prometheus` sources (preset resolves to a `MetricIntervals` map as usual). +- **CON-001**: Fields `Source.IncludePattern`, `Source.ExcludePattern`, `Source.OnlyIfMaster`, `Source.MetricsStandby`, and `Source.PresetMetricsStandby` are irrelevant for `prometheus` sources and MUST be ignored without error. +- **GUD-001**: `Source.Name` SHOULD be set to a human-readable identifier for the exporter (e.g., `node-exporter-prod-db01`). It maps to `MeasurementEnvelope.DBName`. +- **REQ-005**: `sources.Kinds` slice MUST include `SourcePrometheus` so that `Kind.IsValid()` returns `true`. + +### Authentication and TLS + +- **REQ-006**: The scrape URL in `ConnStr` MUST support standard URL userinfo for Basic Auth: `http://user:password@host:9187/metrics`. +- **REQ-007**: TLS options MUST be configurable via URL query parameters appended to `ConnStr`, following the same convention as the PostgreSQL DSN (`sslmode`, `sslrootcert`, etc.): + + | Query parameter | Description | + |---|---| + | `tlsrootcert=` | Path to a PEM-encoded CA certificate file used to verify the server certificate. Omit to use system roots. | + | `tlsskipverify=true` | When `true`, TLS certificate verification is disabled. Default: absent / `false`. | + + Example: `https://host:9187/metrics?tlsrootcert=/etc/ssl/ca.pem` + + The `ConnStr` URL MUST be stripped of these query parameters before being used as the actual HTTP request URL, so they are never forwarded to the exporter. + +- **SEC-001**: Passwords stored in `ConnStr` MUST NOT be logged. The logger MUST redact the userinfo component before printing any URL. +- **SEC-002**: `tlsskipverify=true` in the scrape URL MUST produce a warning-level log entry each time the HTTP client is constructed for that source. +- **CON-002**: Custom headers (e.g., `Authorization: Bearer`) are out of scope for v1.0. + +### Metric Definition Extension + +- **REQ-008**: `metrics.Metric` struct MUST have a new field `PromSelectors []string` (YAML tag `prom_selectors`). Example: + + ```yaml + metrics: + pg_connections: + description: "Connection state summary from postgres_exporter" + prom_selectors: + - pg_stat_activity_count + - pg_stat_activity_max_tx_duration + storage_name: pg_connections + ``` + +- **REQ-009**: `PromSelectors` MUST be ignored for non-`prometheus` source kinds. A metric that has both `SQLs` and `PromSelectors` is valid; `SQLs` is used for postgres-family sources and `PromSelectors` for `prometheus` sources. +- **REQ-010**: If a metric definition referenced by a `prometheus` source has neither `PromSelectors` nor `SQLs`, the reaper MUST log a warning and skip that metric for the affected source. +- **GUD-002**: Each entry in `PromSelectors` MUST be an exact Prometheus metric family name (no glob, no regex). Regex support may be added in a future version. +- **REQ-011**: Metric families present in the scrape response but NOT listed in any `PromSelectors` of the active metric set MUST be silently discarded. + +### Scrape Lifecycle + +- **REQ-012**: `SourceConn` MUST carry an `HTTPClient *http.Client` field alongside the existing `Conn db.PgxPoolIface`. For `prometheus` sources, `Conn` MUST remain `nil`; `HTTPClient` MUST be non-nil after `Connect()`. +- **REQ-013**: `SourceConn.Connect()` MUST parse TLS query parameters (`tlsrootcert`, `tlsskipverify`) from `ConnStr`, construct an `*http.Client` with the derived TLS config, and validate reachability by issuing a HEAD or GET to the scrape URL (with TLS parameters stripped). If the exporter is unreachable, `Connect()` MUST return an error. +- **REQ-014**: `SourceConn.Ping()` for a `prometheus` source MUST issue a HEAD request to the scrape URL and return an error if the status code is not 2xx. +- **REQ-015**: `SourceConn.IsPostgresSource()` MUST return `false` for `Kind == "prometheus"`. +- **REQ-016**: `SourceConn.FetchRuntimeInfo()` for `prometheus` sources MUST be a no-op that sets `RuntimeInfo.VersionStr = "prometheus"` and `RuntimeInfo.Version = 0` without error. +- **REQ-017**: The reaper MUST dispatch `reapMetricMeasurements` for `prometheus` sources using the same goroutine/interval mechanism used for postgres sources. +- **REQ-018**: Inside `reapMetricMeasurements`, when `source.Kind == "prometheus"`, the reaper MUST call a new function `ScrapeMeasurements(ctx, sourceConn, metricDef)` instead of `QueryMeasurements`. + +### `ScrapeMeasurements` Behaviour + +- **REQ-019**: `ScrapeMeasurements` MUST issue an HTTP GET to `source.ConnStr`, parse the Prometheus text exposition format, and return `metrics.Measurements`. +- **REQ-020**: The function signature MUST be: + ```go + func ScrapeMeasurements(ctx context.Context, md *sources.SourceConn, m metrics.Metric) (metrics.Measurements, error) + ``` +- **REQ-021**: The parser MUST use `github.com/prometheus/common/expfmt` (already an indirect dependency via the Prometheus client library). No new major dependencies are permitted. +- **REQ-022**: Only metric families whose `Name` appears in `m.PromSelectors` MUST be retained. All others MUST be discarded before constructing measurement rows. +- **REQ-023**: Samples from all selected metric families MUST be joined by their label set into a single measurement row per unique label combination. This mirrors how a SQL SELECT returns multiple named columns in one row: + - Each Prometheus label becomes a `tag_` column (string). The `__name__` label MUST be omitted. + - Each selected metric family name becomes a **value column** whose name is the metric family name and whose value is the sample's float64. + - `epoch_ns` (int64, Unix nanoseconds) is taken from the sample timestamp when present; otherwise `time.Now().UnixNano()`. + + Example: given `prom_selectors: [pg_stat_activity_count, pg_stat_activity_max_tx_duration]` and two samples both labelled `{datname="mydb", state="active"}`, the result is **one row**: + + | `epoch_ns` | `tag_datname` | `tag_state` | `pg_stat_activity_count` | `pg_stat_activity_max_tx_duration` | + |---|---|---|---|---| + | 1714123456… | mydb | active | 42.0 | 5.3 | + +- **REQ-023a**: When selected metric families have partially overlapping label sets, rows are formed by the **union** of all label names. Columns absent for a given label combination MUST be set to `0.0`. +- **REQ-024**: Non-finite values (`+Inf`, `-Inf`, `NaN`) in any value column MUST be preserved as-is. Sinks are responsible for handling them. +- **REQ-025**: Histogram and Summary metric families expose synthetic per-bucket/per-quantile samples with extra labels (`le`, `quantile`). Because these extra labels are part of the label set, different bucket samples already produce distinct rows naturally via the join-by-label-set rule in REQ-023. Users who select `my_histogram_bucket` in `prom_selectors` will receive one row per `(label_set ∪ {le})` combination. The `_sum` and `_count` families may be selected independently. +- **CON-003**: Protobuf exposition format (`application/vnd.google.apis.metrics.protobuf`) is out of scope for v1.0. The scrape request MUST only advertise `text/plain` in the `Accept` header. + +### Prometheus → Prometheus Proxy Path + +- **REQ-026**: A minimal field `SourceKind sources.Kind` MUST be added to `MeasurementEnvelope` to carry the originating source kind. The reaper MUST populate it when building envelopes from `prometheus` sources. All other sinks MUST ignore it. +- **REQ-027**: `PrometheusWriter.Write()` MUST detect a Prom-sourced envelope by checking `envelope.SourceKind == sources.SourcePrometheus`. +- **REQ-028**: When a Prom-sourced envelope is detected, `PrometheusWriter` MUST iterate over every non-`tag_*`, non-`epoch_ns` column in each row and emit one `prometheus.MustNewConstMetric` per column, using the **column name** as the Prometheus metric name. The column name is the original Prometheus metric family name, so the exporter's naming is preserved verbatim. +- **REQ-029**: All `tag_*` columns MUST become Prometheus label key-value pairs on every emitted metric. +- **REQ-030**: The column's float64 value MUST be used as the metric's value. The `epoch_ns` column MUST be converted to a `time.Time` and passed as the metric's timestamp. +- **REQ-031**: Each unique `(column_name, label_set)` within a single `Collect()` call MUST produce a distinct `prometheus.Desc`. Duplicates MUST be deduplicated (same mechanism already used for postgres-sourced metrics). +- **GUD-003**: The pgwatch metric name (from `MetricName` in the envelope) SHOULD be emitted as an additional label `pgwatch_metric` to allow Prometheus queries to filter by pgwatch grouping. This is a guideline, not a hard requirement; implementations MAY omit it. +- **GUD-004**: The Prometheus namespace (`PrometheusWriter.Namespace`) MUST NOT be prepended to column names in Prom-sourced envelopes, since those column names already carry the exporter's namespace (e.g., `pg_stat_activity_count`). Prepending would produce names like `pgwatch_pg_stat_activity_count`, which breaks existing dashboards. + +### Preset Support + +- **REQ-032**: A new built-in preset SHOULD be provided for common postgres_exporter metrics. Example: + + ```yaml + presets: + postgres-exporter-basic: + description: "Core metrics from postgres_exporter" + metrics: + pg_connections: 30 + pg_replication_lag: 30 + pg_stat_bgwriter: 60 + ``` + +- **GUD-005**: Built-in Prom-targeted metric definitions and presets SHOULD be placed in a separate `metrics_prometheus.yaml` file and merged at startup alongside the main `metrics.yaml`. + +### YAML / Configuration + +- **REQ-033**: The YAML representation of a `prometheus` source MUST follow the existing `Source` YAML schema with `kind: prometheus`. Example: + + ```yaml + sources: + - name: postgres-exporter-prod + kind: prometheus + # TLS options are query parameters; Basic Auth is URL userinfo + conn_str: "https://user:secret@localhost:9187/metrics?tlsrootcert=/etc/ssl/certs/my-ca.pem" + is_enabled: true + preset_metrics: postgres-exporter-basic + custom_tags: + env: production # enriches every measurement row + ``` + +- **REQ-034**: Existing YAML and database configuration readers MUST continue to function without modification for all existing source kinds. + +--- + +## 4. Interfaces & Data Contracts + +### 4.1 `sources.Kind` + +```go +// internal/sources/types.go +const ( + SourcePostgres Kind = "postgres" + SourcePostgresContinuous Kind = "postgres-continuous-discovery" + SourcePgBouncer Kind = "pgbouncer" + SourcePgPool Kind = "pgpool" + SourcePatroni Kind = "patroni" + SourcePrometheus Kind = "prometheus" // NEW +) +``` + +### 4.2 `sources.Source` (extended) + +```go +// internal/sources/types.go +type Source struct { + Name string `yaml:"name" db:"name"` + Group string `yaml:"group" db:"group"` + ConnStr string `yaml:"conn_str" db:"connstr"` + Metrics metrics.MetricIntervals `yaml:"custom_metrics" db:"config"` + MetricsStandby metrics.MetricIntervals `yaml:"custom_metrics_standby" db:"config_standby"` + Kind Kind `yaml:"kind" db:"dbtype"` + IncludePattern string `yaml:"include_pattern" db:"include_pattern"` + ExcludePattern string `yaml:"exclude_pattern" db:"exclude_pattern"` + PresetMetrics string `yaml:"preset_metrics" db:"preset_config"` + PresetMetricsStandby string `yaml:"preset_metrics_standby" db:"preset_config_standby"` + IsEnabled bool `yaml:"is_enabled" db:"is_enabled"` + CustomTags map[string]string `yaml:"custom_tags" db:"custom_tags"` + OnlyIfMaster bool `yaml:"only_if_master" db:"only_if_master"` + // No new fields required for prometheus TLS — options are encoded as URL query + // parameters in ConnStr (e.g. ?tlsrootcert=/ca.pem&tlsskipverify=true). +} +``` + +`CustomTags` values are propagated verbatim into every `MeasurementEnvelope.CustomTags` for that +source, enriching measurement rows with extra dimensions. They MUST NOT be used to pass +configuration parameters. + +### 4.3 `metrics.Metric` (extended) + +```go +// internal/metrics/types.go +type Metric struct { + SQLs SQLs `yaml:",omitempty"` + InitSQL string `yaml:"init_sql,omitempty"` + NodeStatus string `yaml:"node_status,omitempty"` + Gauges []string `yaml:",omitempty"` + IsInstanceLevel bool `yaml:"is_instance_level,omitempty"` + StorageName string `yaml:"storage_name,omitempty"` + Description string `yaml:"description,omitempty"` + PromSelectors []string `yaml:"prom_selectors,omitempty"` // NEW +} +``` + +### 4.4 `sources.SourceConn` (extended) + +```go +// internal/sources/conn.go +type SourceConn struct { + Source + Conn db.PgxPoolIface // nil for prometheus sources + ConnConfig *pgxpool.Config // nil for prometheus sources + HTTPClient *http.Client // NEW: non-nil only for prometheus sources + RuntimeInfo + sync.RWMutex +} +``` + +### 4.5 `ScrapeMeasurements` function signature + +```go +// internal/reaper/database.go (or new file internal/reaper/prometheus.go) +func ScrapeMeasurements(ctx context.Context, md *sources.SourceConn, m metrics.Metric) (metrics.Measurements, error) +``` + +### 4.6 Measurement row shape for a Prometheus metric + +Given `prom_selectors: [pg_stat_activity_count, pg_stat_activity_max_tx_duration]` and a scrape +containing both families with the same label set `{datname="mydb", state="active"}`, the result +is a **single row** — identical in structure to what a SQL metric query would return: + +```json +{ + "epoch_ns": 1714123456789000000, + "tag_datname": "mydb", + "tag_state": "active", + "pg_stat_activity_count": 42.0, + "pg_stat_activity_max_tx_duration": 5.3 +} +``` + +Each Prometheus metric family name becomes a value column. Prometheus labels become `tag_` columns. +There is no `value` column and no `tag_metric_family` column. This is the same shape as a pgwatch +SQL metric that returns multiple named columns. + +### 4.7 `MeasurementEnvelope` (extended) + +```go +// internal/metrics/types.go +type MeasurementEnvelope struct { + DBName string + MetricName string + CustomTags map[string]string + Data Measurements + SourceKind string // NEW: set to sources.SourcePrometheus for prometheus-sourced envelopes; empty otherwise +} +``` + +### 4.8 `PrometheusWriter` proxy detection predicate + +```go +// Detects whether the envelope originated from a Prometheus exporter source. +func isPromSourcedEnvelope(envelope metrics.MeasurementEnvelope) bool { + return envelope.SourceKind == string(sources.SourcePrometheus) +} +``` + +--- + +## 5. Acceptance Criteria + +- **AC-001**: Given a valid `prometheus` source pointing at a running exporter, when the reaper starts, then a scrape goroutine is launched per configured metric at the specified interval, and measurements appear in the sink. +- **AC-002**: Given a `prometheus` source with `preset_metrics: postgres-exporter-basic`, when the reaper resolves the preset, then only metric families listed under the resolved metric definitions are collected; all other families from the scrape are discarded. +- **AC-003**: Given `prom_selectors: [pg_stat_activity_count, pg_stat_activity_max_tx_duration]` and both families returning samples with `{datname="mydb", state="active"}`, when `ScrapeMeasurements` processes the scrape, then the result is **one row** containing `tag_datname = "mydb"`, `tag_state = "active"`, `pg_stat_activity_count = 42.0`, and `pg_stat_activity_max_tx_duration = 5.3` — not two separate rows. +- **AC-004**: Given `conn_str` containing `?tlsskipverify=true`, when the scrape HTTP client is constructed, then TLS verification is disabled and a warning is logged. +- **AC-005**: Given `conn_str: "http://user:secret@localhost:9187/metrics"`, when any log message referencing the URL is produced, then the password `secret` MUST NOT appear in the log output. +- **AC-006**: Given `source.Kind == "prometheus"` and `PrometheusWriter` as the active sink, when `Write()` is called with a Prom-sourced envelope, then each value column in each row is emitted as a separate Prometheus metric whose name is the column name (original metric family name) without the pgwatch namespace prefix. +- **AC-007**: Given a metric definition with `prom_selectors: [pg_stat_activity_count]` assigned to a `postgres` source, when the reaper collects that metric, then `PromSelectors` is ignored and the standard SQL path is used. +- **AC-008**: Given a `prometheus` source with a metric that has neither `PromSelectors` nor `SQLs`, when the reaper initialises, then a warning is logged and no goroutine is started for that metric. +- **AC-009**: Given the exporter is temporarily unreachable during a scrape, when `ScrapeMeasurements` returns an error, then the reaper logs the error, does not write to the sink for that interval, and retries on the next interval without crashing. +- **AC-010**: Given a Prometheus histogram metric family `my_histogram_bucket` in `prom_selectors`, when `ScrapeMeasurements` processes the scrape, then each bucket produces a distinct row because the `le` label is part of the label set used for joining, and `my_histogram_bucket` appears as a value column in each row. + +--- + +## 6. Test Automation Strategy + +- **Test Levels**: Unit, Integration. +- **Unit Tests**: + - `ScrapeMeasurements` with a table-driven test using a mock HTTP server (`httptest.NewServer`) serving pre-recorded Prometheus exposition text. Cover: normal gauges, counters, histograms, summaries, missing families (selector filtering), non-finite values, missing timestamp (defaults to `time.Now()`). + - `isPromSourcedEnvelope` with empty data and data containing/lacking `tag_metric_family`. + - `SourceConn.Connect()` with mock TLS server for Basic Auth and `tls_skip_verify` cases. + - `PrometheusWriter.Write()` with Prom-sourced envelopes: verify emitted metric names and absence of namespace prefix. +- **Integration Tests**: + - Start a minimal Prometheus text-format HTTP server in the test, register a `prometheus` source, run the reaper loop for N seconds, and assert measurements appear in the JSON sink file. + - Test Prom→Prom proxy: assert that the Prometheus scrape endpoint of pgwatch re-exposes the original metric family names. +- **Frameworks**: Standard `testing` package; `httptest`; `testify/assert` (already used in the codebase). +- **Coverage**: New packages/files introduced by this feature MUST achieve ≥ 80 % statement coverage. +- **CI**: Existing `go test ./...` pipeline covers new tests automatically. + +--- + +## 7. Rationale & Context + +### Why reuse `Source` struct instead of a new struct? + +All existing configuration readers (YAML, PostgreSQL config DB, gRPC API) operate on `Source`. +Adding a new struct would require changes in every reader and writer. The `prometheus` kind follows +the same pattern as `pgbouncer` and `pgpool`, which also reuse `Source` with non-standard connection strings. + +### Why selector lists instead of regex or auto-discovery? + +Auto-discovery (all families → individual pgwatch metrics) would create an unbounded, unpredictable +set of tables in the sink on first scrape. Selector lists make the schema explicit, predictable, and +consistent with the philosophy that a pgwatch metric is a deliberately designed measurement set. +Regex can be added later without breaking backward compatibility. + +### Why metric family names become value columns instead of producing one row per sample? + +A pgwatch metric is modelled after a SQL query result: multiple named value columns in a single row, +distinguished by their tag columns. For example, a SQL metric for connection state returns +`pg_stat_activity_count` and `pg_stat_activity_max_tx_duration` as two columns of the same row for +each `(datname, state)` pair. The Prometheus counterpart should produce the same shape so that sink +schemas, Grafana dashboards, and downstream tooling are identical regardless of whether the source +is a PostgreSQL connection or a Prometheus exporter. One-row-per-sample would produce a tall/narrow +table incompatible with this convention. + +### Why keep the same `MeasurementEnvelope` for the proxy path? + +Introducing a separate code path (e.g., a direct HTTP relay bypassing the reaper) would split the +lifecycle management, filtering, and custom-tag injection logic into two places. The chosen approach +(parse → `MeasurementEnvelope` → lighter Prom sink write) keeps a single pipeline. The performance +difference is negligible because the dominant cost is the HTTP scrape itself, not the in-process +row transformation. + +### Why not prepend the pgwatch namespace in Prom→Prom mode? + +When pgwatch proxies an existing exporter, users expect to query metrics under their original names +(e.g., `pg_stat_activity_count`, not `pgwatch_pg_stat_activity_count`). Prepending a namespace would +break existing Grafana dashboards and alerting rules that target those metrics. + +--- + +## 8. Dependencies & External Integrations + +### External Systems + +- **EXT-001**: Any HTTP(S) service exposing Prometheus text exposition format on a `/metrics` endpoint (e.g., `node_exporter`, `postgres_exporter`, `redis_exporter`). + +### Third-Party Services + +None beyond what is already used. + +### Infrastructure Dependencies + +- **INF-001**: `github.com/prometheus/common/expfmt` — Prometheus text format parser. Already present as an indirect dependency via `github.com/prometheus/client_golang`. Must be promoted to a direct dependency in `go.mod`. +- **INF-002**: Standard library `net/http` for scraping. No additional HTTP client library is required. From 0eecb70a2a7ad6c44f42bb4bedf28aa3a979fa20 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 8 May 2026 18:55:22 +0200 Subject: [PATCH 02/27] update spec with a simpler metric definitions --- ...architecture-prometheus-exporter-source.md | 384 +++++++++++------- 1 file changed, 230 insertions(+), 154 deletions(-) diff --git a/spec/architecture-prometheus-exporter-source.md b/spec/architecture-prometheus-exporter-source.md index 7779d74a4c..fa47cb2189 100644 --- a/spec/architecture-prometheus-exporter-source.md +++ b/spec/architecture-prometheus-exporter-source.md @@ -1,7 +1,8 @@ --- title: Prometheus Exporter as a pgwatch Source -version: 1.0 +version: 1.2 date_created: 2026-04-26 +date_updated: 2026-05-08 tags: [architecture, design, prometheus, sources, metrics, sinks] --- @@ -9,32 +10,37 @@ tags: [architecture, design, prometheus, sources, metrics, sinks] This specification defines how pgwatch fetches metrics from external Prometheus exporters (e.g., `node_exporter`, `postgres_exporter`, custom exporters) and routes them through the -standard pgwatch pipeline. It covers the new source kind, the metric definition extension -that maps Prometheus metric families to pgwatch logical metrics, label-to-column conventions, -and a lightweight proxy path that avoids double-conversion when the configured sink is also -Prometheus. +standard pgwatch pipeline. It covers the new source kind, the scrape lifecycle, +label-to-column conventions, and a lightweight proxy path that avoids double-conversion +when the configured sink is also Prometheus. + +**No dedicated pgwatch metric definitions are required for Prometheus sources.** The Prometheus +metric family names are used directly as pgwatch metric names via the standard `custom_metrics` +/ `preset_metrics` configuration on the source. --- ## 1. Purpose & Scope -**Purpose**: Allow pgwatch to scrape arbitrary Prometheus exporters, apply metric grouping and +**Purpose**: Allow pgwatch to scrape arbitrary Prometheus exporters, apply family-level filtering, and write measurements to any configured sink — including acting as a low-overhead proxy when the sink is a Prometheus endpoint. **In scope**: - New source `Kind` value `prometheus`. -- Extension of `metrics.Metric` for Prometheus selector lists. -- HTTP scrape lifecycle inside `SourceConn` and the reaper. +- Use of `custom_metrics` / `preset_metrics` to specify Prometheus family names and their emit intervals. +- Single-goroutine-per-source scrape lifecycle: one HTTP GET per tick, per-family emit cadence gated by `lastEmitted` state. - Measurement row structure for Prometheus samples. - Prometheus-to-Prometheus (Prom→Prom) proxy optimisation in `PrometheusWriter`. - Authentication and TLS configuration. +- "Scrape-all" mode when no metrics are configured. **Out of scope**: - Push-gateway integration. - Prometheus alerting rules. - Remote-write protocol (only the `/metrics` exposition format is targeted). - Grafana dashboard changes. +- Any extension to the `metrics.Metric` struct. **Audience**: pgwatch maintainers implementing the feature. The document is also intended to be consumed directly by AI coding assistants. @@ -48,14 +54,17 @@ consumed directly by AI coding assistants. | **Prometheus exporter** | An HTTP service that exposes metrics in Prometheus exposition format (text or protobuf) at a `/metrics` endpoint. | | **Metric family** | A single named group of Prometheus samples sharing the same `__name__` label (e.g., `pg_stat_activity_count`). | | **Sample** | A single (label-set, float64 value, optional timestamp) triple within a metric family. | -| **pgwatch metric** | A named, schema-fixed set of measurements stored as one table/series in the sink. | +| **pgwatch metric** | For `prometheus` sources: the Prometheus metric family name used as the key in `custom_metrics`. For other source kinds: a named, SQL-based measurement set. | | **pgwatch source** | A configured entity from which measurements are collected. | -| **scrape** | An HTTP GET request to a Prometheus exporter's `/metrics` endpoint. | -| **selector list** | The ordered list of Prometheus metric family names that are grouped into one pgwatch metric. | +| **scrape** | An HTTP GET request to a Prometheus exporter's `/metrics` endpoint. Always returns all families. | +| **scrape interval** | The HTTP request cadence for a `prometheus` source: `min` of all configured emit intervals (or 60 s in scrape-all mode). Drives how often the goroutine wakes and issues an HTTP GET. | +| **emit interval** | The per-family cadence at which measurements are forwarded to the sink. Configured as the value in `custom_metrics`. May be longer than the scrape interval. | +| **lastEmitted** | Per-family timestamp of the most recent successful sink write. Used to gate whether a family's measurements are forwarded on a given scrape tick. | +| **scrape-all mode** | Activated when a `prometheus` source has neither `custom_metrics` nor `preset_metrics` configured. The reaper scrapes all available families at a fixed 60 s interval and emits every family on every tick. | | **MeasurementEnvelope** | The internal pgwatch structure `{DBName, MetricName, CustomTags, Data Measurements}` passed from reaper to sinks. | | **Prom→Prom path** | The code path activated when `source.Kind == "prometheus"` and the active sink is `PrometheusWriter`. | | **tag column** | A pgwatch measurement column whose name starts with `tag_`. Treated as an indexed dimension. | -| **value column** | A numeric measurement column not prefixed `tag_`. | +| **value column** | A numeric measurement column not prefixed `tag_`. For `prometheus` sources, named after the metric family. | | **epoch_ns** | Mandatory measurement column: Unix timestamp in nanoseconds (int64). | | **ConnStr** | The `Source.ConnStr` field, repurposed as the scrape URL for `prometheus` sources. | @@ -67,16 +76,16 @@ consumed directly by AI coding assistants. - **REQ-001**: `sources.Kind` MUST include a new constant `SourcePrometheus = "prometheus"`. - **REQ-002**: A `prometheus` source MUST use `Source.ConnStr` as the HTTP(S) scrape URL (e.g., `http://host:9187/metrics`). No PostgreSQL DSN is required or expected. -- **REQ-003**: `Source.Metrics` (type `metrics.MetricIntervals`) MUST continue to map pgwatch metric name → scrape interval in seconds, identical to existing source kinds. -- **REQ-004**: `Source.PresetMetrics` MUST be supported for `prometheus` sources (preset resolves to a `MetricIntervals` map as usual). +- **REQ-003**: `Source.Metrics` (type `metrics.MetricIntervals`) MUST map Prometheus metric family name → emit interval in seconds. Each key is an exact family name; the value is how often that family's measurements are forwarded to the sink. +- **REQ-004**: `Source.PresetMetrics` MUST be supported for `prometheus` sources. A preset resolves to a `MetricIntervals` map whose keys are Prometheus family names and whose values are emit intervals. +- **REQ-005**: `sources.Kinds` slice MUST include `SourcePrometheus` so that `Kind.IsValid()` returns `true`. - **CON-001**: Fields `Source.IncludePattern`, `Source.ExcludePattern`, `Source.OnlyIfMaster`, `Source.MetricsStandby`, and `Source.PresetMetricsStandby` are irrelevant for `prometheus` sources and MUST be ignored without error. - **GUD-001**: `Source.Name` SHOULD be set to a human-readable identifier for the exporter (e.g., `node-exporter-prod-db01`). It maps to `MeasurementEnvelope.DBName`. -- **REQ-005**: `sources.Kinds` slice MUST include `SourcePrometheus` so that `Kind.IsValid()` returns `true`. ### Authentication and TLS - **REQ-006**: The scrape URL in `ConnStr` MUST support standard URL userinfo for Basic Auth: `http://user:password@host:9187/metrics`. -- **REQ-007**: TLS options MUST be configurable via URL query parameters appended to `ConnStr`, following the same convention as the PostgreSQL DSN (`sslmode`, `sslrootcert`, etc.): +- **REQ-007**: TLS options MUST be configurable via URL query parameters appended to `ConnStr`: | Query parameter | Description | |---|---| @@ -91,86 +100,74 @@ consumed directly by AI coding assistants. - **SEC-002**: `tlsskipverify=true` in the scrape URL MUST produce a warning-level log entry each time the HTTP client is constructed for that source. - **CON-002**: Custom headers (e.g., `Authorization: Bearer`) are out of scope for v1.0. -### Metric Definition Extension +### Metric Filtering -- **REQ-008**: `metrics.Metric` struct MUST have a new field `PromSelectors []string` (YAML tag `prom_selectors`). Example: - - ```yaml - metrics: - pg_connections: - description: "Connection state summary from postgres_exporter" - prom_selectors: - - pg_stat_activity_count - - pg_stat_activity_max_tx_duration - storage_name: pg_connections - ``` - -- **REQ-009**: `PromSelectors` MUST be ignored for non-`prometheus` source kinds. A metric that has both `SQLs` and `PromSelectors` is valid; `SQLs` is used for postgres-family sources and `PromSelectors` for `prometheus` sources. -- **REQ-010**: If a metric definition referenced by a `prometheus` source has neither `PromSelectors` nor `SQLs`, the reaper MUST log a warning and skip that metric for the affected source. -- **GUD-002**: Each entry in `PromSelectors` MUST be an exact Prometheus metric family name (no glob, no regex). Regex support may be added in a future version. -- **REQ-011**: Metric families present in the scrape response but NOT listed in any `PromSelectors` of the active metric set MUST be silently discarded. +- **REQ-008**: For `prometheus` sources, each key in the resolved `MetricIntervals` map (from `custom_metrics` or `preset_metrics`) MUST be treated as an exact Prometheus metric family name. No `metrics.Metric` definition file entry is required or consulted for `prometheus` sources. +- **REQ-009**: Metric families present in the scrape response but NOT listed in the resolved `MetricIntervals` map MUST be silently discarded when the map is non-empty. +- **REQ-010**: When both `Source.Metrics` and `Source.PresetMetrics` are empty, the source MUST operate in **scrape-all mode**: every metric family returned by the exporter is collected at a default interval of 60 seconds. The reaper MUST emit a warning-level log message when activating scrape-all mode, noting the potential for an unbounded number of measurement tables in the sink. +- **GUD-002**: Regex and glob patterns in family names are not supported in v1.0. Each name MUST be an exact match. ### Scrape Lifecycle -- **REQ-012**: `SourceConn` MUST carry an `HTTPClient *http.Client` field alongside the existing `Conn db.PgxPoolIface`. For `prometheus` sources, `Conn` MUST remain `nil`; `HTTPClient` MUST be non-nil after `Connect()`. -- **REQ-013**: `SourceConn.Connect()` MUST parse TLS query parameters (`tlsrootcert`, `tlsskipverify`) from `ConnStr`, construct an `*http.Client` with the derived TLS config, and validate reachability by issuing a HEAD or GET to the scrape URL (with TLS parameters stripped). If the exporter is unreachable, `Connect()` MUST return an error. -- **REQ-014**: `SourceConn.Ping()` for a `prometheus` source MUST issue a HEAD request to the scrape URL and return an error if the status code is not 2xx. -- **REQ-015**: `SourceConn.IsPostgresSource()` MUST return `false` for `Kind == "prometheus"`. -- **REQ-016**: `SourceConn.FetchRuntimeInfo()` for `prometheus` sources MUST be a no-op that sets `RuntimeInfo.VersionStr = "prometheus"` and `RuntimeInfo.Version = 0` without error. -- **REQ-017**: The reaper MUST dispatch `reapMetricMeasurements` for `prometheus` sources using the same goroutine/interval mechanism used for postgres sources. -- **REQ-018**: Inside `reapMetricMeasurements`, when `source.Kind == "prometheus"`, the reaper MUST call a new function `ScrapeMeasurements(ctx, sourceConn, metricDef)` instead of `QueryMeasurements`. - -### `ScrapeMeasurements` Behaviour - -- **REQ-019**: `ScrapeMeasurements` MUST issue an HTTP GET to `source.ConnStr`, parse the Prometheus text exposition format, and return `metrics.Measurements`. -- **REQ-020**: The function signature MUST be: +- **REQ-011**: `SourceConn` MUST carry an `HTTPClient *http.Client` field alongside the existing `Conn db.PgxPoolIface`. For `prometheus` sources, `Conn` MUST remain `nil`; `HTTPClient` MUST be non-nil after `Connect()`. +- **REQ-012**: `SourceConn.Connect()` MUST parse TLS query parameters (`tlsrootcert`, `tlsskipverify`) from `ConnStr`, construct an `*http.Client` with the derived TLS config, and validate reachability by issuing a HEAD or GET to the scrape URL (with TLS parameters stripped). If the exporter is unreachable, `Connect()` MUST return an error. +- **REQ-013**: `SourceConn.Ping()` for a `prometheus` source MUST issue a HEAD request to the scrape URL and return an error if the status code is not 2xx. +- **REQ-014**: `SourceConn.IsPostgresSource()` MUST return `false` for `Kind == "prometheus"`. +- **REQ-015**: `SourceConn.FetchRuntimeInfo()` for `prometheus` sources MUST be a no-op that sets `RuntimeInfo.VersionStr = "prometheus"` and `RuntimeInfo.Version = 0` without error. +- **REQ-016**: The reaper MUST launch **one goroutine per `prometheus` source** (not one per family). The goroutine MUST NOT be shared with postgres-style per-metric goroutines. +- **REQ-017**: The goroutine MUST compute the **scrape interval** as `min` of all emit interval values in the resolved `MetricIntervals` map. In scrape-all mode the scrape interval MUST default to 60 seconds. +- **REQ-018**: On every scrape tick the goroutine MUST issue a single HTTP GET to the exporter (via `ScrapeAll`) and receive measurements for all available families in one response. +- **REQ-019**: The goroutine MUST maintain a `lastEmitted map[string]time.Time` keyed by family name, initialised empty. After a successful scrape, for each family present in the response: + - In **filtered mode**: skip families not present in the resolved `MetricIntervals` map. + - Emit a `MeasurementEnvelope` to the sink for family `f` only when `time.Since(lastEmitted[f]) >= emitInterval[f]`. On first scrape (zero `lastEmitted` value) the family MUST always be emitted. + - Update `lastEmitted[f]` to the current time after a successful sink write. + - In **scrape-all mode**: all families are emitted on every tick (emit interval equals the 60 s scrape interval for all). +- **REQ-020**: If `ScrapeAll` returns an error, the goroutine MUST log the error at warning level, leave all `lastEmitted` values unchanged, and retry on the next tick without crashing. + +### `ScrapeAll` Behaviour + +- **REQ-021**: `ScrapeAll` MUST issue a single HTTP GET to the scrape URL (derived from `source.ConnStr` with TLS parameters stripped), parse the full Prometheus text exposition response, and return measurements for every discovered family. +- **REQ-022**: The function signature MUST be: ```go - func ScrapeMeasurements(ctx context.Context, md *sources.SourceConn, m metrics.Metric) (metrics.Measurements, error) + // internal/reaper/prometheus.go + + // ScrapeAll fetches the exporter's /metrics endpoint and returns one + // Measurements slice per discovered metric family, keyed by family name. + func ScrapeAll(ctx context.Context, sc *sources.SourceConn) (map[string]metrics.Measurements, error) ``` -- **REQ-021**: The parser MUST use `github.com/prometheus/common/expfmt` (already an indirect dependency via the Prometheus client library). No new major dependencies are permitted. -- **REQ-022**: Only metric families whose `Name` appears in `m.PromSelectors` MUST be retained. All others MUST be discarded before constructing measurement rows. -- **REQ-023**: Samples from all selected metric families MUST be joined by their label set into a single measurement row per unique label combination. This mirrors how a SQL SELECT returns multiple named columns in one row: +- **REQ-023**: The parser MUST use `github.com/prometheus/common/expfmt` (already an indirect dependency via the Prometheus client library). No new major dependencies are permitted. +- **REQ-024**: Each sample from a metric family becomes **one measurement row**: - Each Prometheus label becomes a `tag_` column (string). The `__name__` label MUST be omitted. - - Each selected metric family name becomes a **value column** whose name is the metric family name and whose value is the sample's float64. + - A single value column named after the metric family holds the sample's float64 value. - `epoch_ns` (int64, Unix nanoseconds) is taken from the sample timestamp when present; otherwise `time.Now().UnixNano()`. - - Example: given `prom_selectors: [pg_stat_activity_count, pg_stat_activity_max_tx_duration]` and two samples both labelled `{datname="mydb", state="active"}`, the result is **one row**: - - | `epoch_ns` | `tag_datname` | `tag_state` | `pg_stat_activity_count` | `pg_stat_activity_max_tx_duration` | - |---|---|---|---|---| - | 1714123456… | mydb | active | 42.0 | 5.3 | - -- **REQ-023a**: When selected metric families have partially overlapping label sets, rows are formed by the **union** of all label names. Columns absent for a given label combination MUST be set to `0.0`. -- **REQ-024**: Non-finite values (`+Inf`, `-Inf`, `NaN`) in any value column MUST be preserved as-is. Sinks are responsible for handling them. -- **REQ-025**: Histogram and Summary metric families expose synthetic per-bucket/per-quantile samples with extra labels (`le`, `quantile`). Because these extra labels are part of the label set, different bucket samples already produce distinct rows naturally via the join-by-label-set rule in REQ-023. Users who select `my_histogram_bucket` in `prom_selectors` will receive one row per `(label_set ∪ {le})` combination. The `_sum` and `_count` families may be selected independently. +- **REQ-025**: Non-finite values (`+Inf`, `-Inf`, `NaN`) in the value column MUST be preserved as-is. Sinks are responsible for handling them. - **CON-003**: Protobuf exposition format (`application/vnd.google.apis.metrics.protobuf`) is out of scope for v1.0. The scrape request MUST only advertise `text/plain` in the `Accept` header. ### Prometheus → Prometheus Proxy Path -- **REQ-026**: A minimal field `SourceKind sources.Kind` MUST be added to `MeasurementEnvelope` to carry the originating source kind. The reaper MUST populate it when building envelopes from `prometheus` sources. All other sinks MUST ignore it. -- **REQ-027**: `PrometheusWriter.Write()` MUST detect a Prom-sourced envelope by checking `envelope.SourceKind == sources.SourcePrometheus`. -- **REQ-028**: When a Prom-sourced envelope is detected, `PrometheusWriter` MUST iterate over every non-`tag_*`, non-`epoch_ns` column in each row and emit one `prometheus.MustNewConstMetric` per column, using the **column name** as the Prometheus metric name. The column name is the original Prometheus metric family name, so the exporter's naming is preserved verbatim. +- **REQ-026**: A field `SourceKind string` MUST be added to `MeasurementEnvelope` to carry the originating source kind. The reaper MUST set it to `string(sources.SourcePrometheus)` for `prometheus`-sourced envelopes. All other sinks MUST ignore it. +- **REQ-027**: `PrometheusWriter.Write()` MUST detect a Prom-sourced envelope by checking `envelope.SourceKind == string(sources.SourcePrometheus)`. +- **REQ-028**: When a Prom-sourced envelope is detected, `PrometheusWriter` MUST emit one `prometheus.MustNewConstMetric` per row, using the **value column name** (the family name) as the Prometheus metric name. Since each envelope carries exactly one family's measurements, the value column name equals `envelope.MetricName`. - **REQ-029**: All `tag_*` columns MUST become Prometheus label key-value pairs on every emitted metric. - **REQ-030**: The column's float64 value MUST be used as the metric's value. The `epoch_ns` column MUST be converted to a `time.Time` and passed as the metric's timestamp. -- **REQ-031**: Each unique `(column_name, label_set)` within a single `Collect()` call MUST produce a distinct `prometheus.Desc`. Duplicates MUST be deduplicated (same mechanism already used for postgres-sourced metrics). -- **GUD-003**: The pgwatch metric name (from `MetricName` in the envelope) SHOULD be emitted as an additional label `pgwatch_metric` to allow Prometheus queries to filter by pgwatch grouping. This is a guideline, not a hard requirement; implementations MAY omit it. -- **GUD-004**: The Prometheus namespace (`PrometheusWriter.Namespace`) MUST NOT be prepended to column names in Prom-sourced envelopes, since those column names already carry the exporter's namespace (e.g., `pg_stat_activity_count`). Prepending would produce names like `pgwatch_pg_stat_activity_count`, which breaks existing dashboards. +- **REQ-031**: Each unique `(metric_name, label_set)` within a single `Collect()` call MUST produce a distinct `prometheus.Desc`. Duplicates MUST be deduplicated (same mechanism already used for postgres-sourced metrics). +- **GUD-003**: The Prometheus namespace (`PrometheusWriter.Namespace`) MUST NOT be prepended to metric names in Prom-sourced envelopes, since those names already carry the exporter's namespace (e.g., `pg_stat_activity_count`). Prepending would produce names like `pgwatch_pg_stat_activity_count`, which breaks existing dashboards. ### Preset Support -- **REQ-032**: A new built-in preset SHOULD be provided for common postgres_exporter metrics. Example: +- **REQ-032**: A new built-in preset SHOULD be provided for common `postgres_exporter` metric families. Example: ```yaml presets: postgres-exporter-basic: description: "Core metrics from postgres_exporter" metrics: - pg_connections: 30 - pg_replication_lag: 30 - pg_stat_bgwriter: 60 + pg_stat_activity_count: 30 + pg_stat_bgwriter_checkpoints_timed: 60 + pg_stat_replication_pg_wal_lsn_diff: 30 ``` -- **GUD-005**: Built-in Prom-targeted metric definitions and presets SHOULD be placed in a separate `metrics_prometheus.yaml` file and merged at startup alongside the main `metrics.yaml`. +- **GUD-004**: Built-in Prom-targeted presets SHOULD be kept in a dedicated section or file separate from SQL-based presets to avoid confusion. ### YAML / Configuration @@ -180,12 +177,19 @@ consumed directly by AI coding assistants. sources: - name: postgres-exporter-prod kind: prometheus - # TLS options are query parameters; Basic Auth is URL userinfo conn_str: "https://user:secret@localhost:9187/metrics?tlsrootcert=/etc/ssl/certs/my-ca.pem" is_enabled: true - preset_metrics: postgres-exporter-basic + # emit intervals differ per family; HTTP scrape cadence = min(30, 60) = 30 s + custom_metrics: + pg_stat_activity_count: 30 + pg_stat_bgwriter_checkpoints_timed: 60 custom_tags: - env: production # enriches every measurement row + env: production + + - name: node-exporter-prod # scrape-all: no metrics configured, scrapes every 60 s + kind: prometheus + conn_str: "http://localhost:9100/metrics" + is_enabled: true ``` - **REQ-034**: Existing YAML and database configuration readers MUST continue to function without modification for all existing source kinds. @@ -208,10 +212,14 @@ const ( ) ``` -### 4.2 `sources.Source` (extended) +### 4.2 `sources.Source` (unchanged) + +No new fields are required. The existing `Metrics` (`custom_metrics`) and `PresetMetrics` fields +carry Prometheus family names and intervals for `prometheus` sources exactly as they carry SQL +metric names and intervals for postgres sources. ```go -// internal/sources/types.go +// internal/sources/types.go — no structural changes type Source struct { Name string `yaml:"name" db:"name"` Group string `yaml:"group" db:"group"` @@ -226,30 +234,17 @@ type Source struct { IsEnabled bool `yaml:"is_enabled" db:"is_enabled"` CustomTags map[string]string `yaml:"custom_tags" db:"custom_tags"` OnlyIfMaster bool `yaml:"only_if_master" db:"only_if_master"` - // No new fields required for prometheus TLS — options are encoded as URL query - // parameters in ConnStr (e.g. ?tlsrootcert=/ca.pem&tlsskipverify=true). } ``` `CustomTags` values are propagated verbatim into every `MeasurementEnvelope.CustomTags` for that -source, enriching measurement rows with extra dimensions. They MUST NOT be used to pass -configuration parameters. +source, enriching measurement rows with extra dimensions. -### 4.3 `metrics.Metric` (extended) +### 4.3 `metrics.Metric` (unchanged) -```go -// internal/metrics/types.go -type Metric struct { - SQLs SQLs `yaml:",omitempty"` - InitSQL string `yaml:"init_sql,omitempty"` - NodeStatus string `yaml:"node_status,omitempty"` - Gauges []string `yaml:",omitempty"` - IsInstanceLevel bool `yaml:"is_instance_level,omitempty"` - StorageName string `yaml:"storage_name,omitempty"` - Description string `yaml:"description,omitempty"` - PromSelectors []string `yaml:"prom_selectors,omitempty"` // NEW -} -``` +No extension to `metrics.Metric` is required. `prometheus` sources do not reference any +`metrics.Metric` definition; the family name from `custom_metrics` is used directly as both +the collection key and the value column name in measurement rows. ### 4.4 `sources.SourceConn` (extended) @@ -257,42 +252,86 @@ type Metric struct { // internal/sources/conn.go type SourceConn struct { Source - Conn db.PgxPoolIface // nil for prometheus sources - ConnConfig *pgxpool.Config // nil for prometheus sources - HTTPClient *http.Client // NEW: non-nil only for prometheus sources + Conn db.PgxPoolIface // nil for prometheus sources + ConnConfig *pgxpool.Config // nil for prometheus sources + HTTPClient *http.Client // NEW: non-nil only for prometheus sources RuntimeInfo sync.RWMutex } ``` -### 4.5 `ScrapeMeasurements` function signature +### 4.5 `ScrapeAll` function signature ```go -// internal/reaper/database.go (or new file internal/reaper/prometheus.go) -func ScrapeMeasurements(ctx context.Context, md *sources.SourceConn, m metrics.Metric) (metrics.Measurements, error) +// internal/reaper/prometheus.go + +// ScrapeAll fetches the exporter's /metrics endpoint and returns one +// Measurements slice per discovered metric family, keyed by family name. +// It is the only HTTP call made per scrape tick; family filtering and +// per-family emit gating are performed by the caller. +func ScrapeAll(ctx context.Context, sc *sources.SourceConn) (map[string]metrics.Measurements, error) ``` -### 4.6 Measurement row shape for a Prometheus metric +### 4.6 Scrape goroutine pseudo-code + +```go +// One goroutine per prometheus source, started by the reaper. +func runPrometheusSource(ctx context.Context, sc *sources.SourceConn, intervals metrics.MetricIntervals) { + scrapeInterval := minInterval(intervals) // 60 s when intervals is empty (scrape-all) + lastEmitted := map[string]time.Time{} + ticker := time.NewTicker(scrapeInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + allFamilies, err := ScrapeAll(ctx, sc) + if err != nil { + log.Warn("scrape failed", "source", sc.Name, "err", err) + continue + } + for family, measurements := range allFamilies { + emitInterval := intervals[family] // zero duration when scrape-all + if emitInterval == 0 { + emitInterval = scrapeInterval // scrape-all: emit every tick + } else if last, ok := lastEmitted[family]; ok && now.Sub(last) < emitInterval { + continue // not yet due + } + sink.Write(buildEnvelope(sc, family, measurements)) + lastEmitted[family] = now + } + } + } +} +``` -Given `prom_selectors: [pg_stat_activity_count, pg_stat_activity_max_tx_duration]` and a scrape -containing both families with the same label set `{datname="mydb", state="active"}`, the result -is a **single row** — identical in structure to what a SQL metric query would return: +### 4.7 Measurement row shape + +For family `pg_stat_activity_count` with label set `{datname="mydb", state="active"}`: ```json { "epoch_ns": 1714123456789000000, "tag_datname": "mydb", "tag_state": "active", - "pg_stat_activity_count": 42.0, - "pg_stat_activity_max_tx_duration": 5.3 + "pg_stat_activity_count": 42.0 } ``` -Each Prometheus metric family name becomes a value column. Prometheus labels become `tag_` columns. -There is no `value` column and no `tag_metric_family` column. This is the same shape as a pgwatch -SQL metric that returns multiple named columns. +Each sample produces one row. The single value column is named after the metric family. +Different label combinations within the same family produce separate rows. + +For a histogram family `my_histogram_bucket`, each bucket is a distinct row because `le` is a +label and therefore becomes `tag_le`: -### 4.7 `MeasurementEnvelope` (extended) +| `epoch_ns` | `tag_datname` | `tag_le` | `my_histogram_bucket` | +|---|---|---|---| +| 1714… | mydb | 0.005 | 0.0 | +| 1714… | mydb | 0.01 | 3.0 | +| 1714… | mydb | +Inf | 42.0 | + +### 4.8 `MeasurementEnvelope` (extended) ```go // internal/metrics/types.go @@ -301,11 +340,11 @@ type MeasurementEnvelope struct { MetricName string CustomTags map[string]string Data Measurements - SourceKind string // NEW: set to sources.SourcePrometheus for prometheus-sourced envelopes; empty otherwise + SourceKind string // NEW: "prometheus" for prometheus-sourced envelopes; empty otherwise } ``` -### 4.8 `PrometheusWriter` proxy detection predicate +### 4.9 `PrometheusWriter` proxy detection predicate ```go // Detects whether the envelope originated from a Prometheus exporter source. @@ -318,16 +357,17 @@ func isPromSourcedEnvelope(envelope metrics.MeasurementEnvelope) bool { ## 5. Acceptance Criteria -- **AC-001**: Given a valid `prometheus` source pointing at a running exporter, when the reaper starts, then a scrape goroutine is launched per configured metric at the specified interval, and measurements appear in the sink. -- **AC-002**: Given a `prometheus` source with `preset_metrics: postgres-exporter-basic`, when the reaper resolves the preset, then only metric families listed under the resolved metric definitions are collected; all other families from the scrape are discarded. -- **AC-003**: Given `prom_selectors: [pg_stat_activity_count, pg_stat_activity_max_tx_duration]` and both families returning samples with `{datname="mydb", state="active"}`, when `ScrapeMeasurements` processes the scrape, then the result is **one row** containing `tag_datname = "mydb"`, `tag_state = "active"`, `pg_stat_activity_count = 42.0`, and `pg_stat_activity_max_tx_duration = 5.3` — not two separate rows. -- **AC-004**: Given `conn_str` containing `?tlsskipverify=true`, when the scrape HTTP client is constructed, then TLS verification is disabled and a warning is logged. -- **AC-005**: Given `conn_str: "http://user:secret@localhost:9187/metrics"`, when any log message referencing the URL is produced, then the password `secret` MUST NOT appear in the log output. -- **AC-006**: Given `source.Kind == "prometheus"` and `PrometheusWriter` as the active sink, when `Write()` is called with a Prom-sourced envelope, then each value column in each row is emitted as a separate Prometheus metric whose name is the column name (original metric family name) without the pgwatch namespace prefix. -- **AC-007**: Given a metric definition with `prom_selectors: [pg_stat_activity_count]` assigned to a `postgres` source, when the reaper collects that metric, then `PromSelectors` is ignored and the standard SQL path is used. -- **AC-008**: Given a `prometheus` source with a metric that has neither `PromSelectors` nor `SQLs`, when the reaper initialises, then a warning is logged and no goroutine is started for that metric. -- **AC-009**: Given the exporter is temporarily unreachable during a scrape, when `ScrapeMeasurements` returns an error, then the reaper logs the error, does not write to the sink for that interval, and retries on the next interval without crashing. -- **AC-010**: Given a Prometheus histogram metric family `my_histogram_bucket` in `prom_selectors`, when `ScrapeMeasurements` processes the scrape, then each bucket produces a distinct row because the `le` label is part of the label set used for joining, and `my_histogram_bucket` appears as a value column in each row. +- **AC-001**: Given a valid `prometheus` source pointing at a running exporter, when the reaper starts, then **exactly one** goroutine is launched for that source; measurements appear in the sink with one row per sample. +- **AC-002**: Given `custom_metrics: {pg_stat_activity_count: 30, pg_stat_bgwriter_checkpoints_timed: 60}`, when the reaper runs, then the HTTP scrape cadence is 30 s (the minimum); `pg_stat_activity_count` measurements are forwarded every 30 s and `pg_stat_bgwriter_checkpoints_timed` measurements are forwarded every 60 s, but only one HTTP GET is made per 30 s tick. +- **AC-003**: Given `custom_metrics: {pg_stat_activity_count: 30}` and a scrape returning samples with `{datname="mydb", state="active"}` and `{datname="mydb", state="idle"}`, when the goroutine processes the scrape, then the result is **two rows** — one per label combination — each with a single value column `pg_stat_activity_count`. +- **AC-004**: Given `custom_metrics: {pg_stat_activity_count: 30}` and a scrape response that also contains `pg_stat_bgwriter_checkpoints_timed`, when the goroutine processes the scrape, then `pg_stat_bgwriter_checkpoints_timed` measurements are discarded and never forwarded to the sink. +- **AC-005**: Given `conn_str` containing `?tlsskipverify=true`, when the scrape HTTP client is constructed, then TLS verification is disabled and a warning is logged. +- **AC-006**: Given `conn_str: "http://user:secret@localhost:9187/metrics"`, when any log message referencing the URL is produced, then the password `secret` MUST NOT appear in the log output. +- **AC-007**: Given `source.Kind == "prometheus"` and `PrometheusWriter` as the active sink, when `Write()` is called with a Prom-sourced envelope for family `pg_stat_activity_count`, then the emitted Prometheus metric is named `pg_stat_activity_count` without any pgwatch namespace prefix. +- **AC-008**: Given a `prometheus` source with neither `custom_metrics` nor `preset_metrics`, when the reaper initialises, then a warning is logged and all families from each scrape are written to the sink as separate envelopes at a 60 s cadence (scrape-all mode). +- **AC-009**: Given the exporter is temporarily unreachable during a scrape tick, when `ScrapeAll` returns an error, then the reaper logs the error, leaves all `lastEmitted` values unchanged, does not write to the sink for that tick, and retries on the next tick without crashing. +- **AC-010**: Given a `prometheus` source with `preset_metrics: postgres-exporter-basic`, when the reaper resolves the preset, then only the metric families listed in that preset's `metrics` map are forwarded to the sink. +- **AC-011**: Given a Prometheus histogram family `my_histogram_bucket` in `custom_metrics`, when the goroutine processes the scrape, then each bucket produces a distinct row with `tag_le` holding the bucket boundary and `my_histogram_bucket` as the single value column. --- @@ -335,13 +375,15 @@ func isPromSourcedEnvelope(envelope metrics.MeasurementEnvelope) bool { - **Test Levels**: Unit, Integration. - **Unit Tests**: - - `ScrapeMeasurements` with a table-driven test using a mock HTTP server (`httptest.NewServer`) serving pre-recorded Prometheus exposition text. Cover: normal gauges, counters, histograms, summaries, missing families (selector filtering), non-finite values, missing timestamp (defaults to `time.Now()`). - - `isPromSourcedEnvelope` with empty data and data containing/lacking `tag_metric_family`. - - `SourceConn.Connect()` with mock TLS server for Basic Auth and `tls_skip_verify` cases. - - `PrometheusWriter.Write()` with Prom-sourced envelopes: verify emitted metric names and absence of namespace prefix. + - `ScrapeAll` with a table-driven test using a mock HTTP server (`httptest.NewServer`) serving pre-recorded Prometheus exposition text. Cover: normal gauges, counters, histograms (verify `le` → `tag_le`), non-finite values, missing timestamp (defaults to `time.Now()`), all families returned in the map. + - Scrape goroutine emit-gating logic: mock `ScrapeAll` to return two families with intervals `{A: 30s, B: 60s}`; advance a fake clock and assert that after 30 s only A is emitted, after 60 s both are emitted, and only one `ScrapeAll` call is made per 30 s tick. + - Filtered mode: assert families not in `MetricIntervals` are discarded after `ScrapeAll`. + - `SourceConn.Connect()` with mock TLS server: Basic Auth redaction in logs, `tlsskipverify` warning. + - `PrometheusWriter.Write()` with Prom-sourced envelopes: verify emitted metric names match family names and the pgwatch namespace prefix is absent. - **Integration Tests**: - - Start a minimal Prometheus text-format HTTP server in the test, register a `prometheus` source, run the reaper loop for N seconds, and assert measurements appear in the JSON sink file. + - Start a minimal Prometheus text-format HTTP server in the test, register a `prometheus` source with two families at different intervals, run the reaper loop, and assert the correct per-family emit cadence in the JSON sink file with a single HTTP call per tick. - Test Prom→Prom proxy: assert that the Prometheus scrape endpoint of pgwatch re-exposes the original metric family names. + - Test scrape-all mode: register a `prometheus` source with no metrics configured, verify all families appear in the sink and a warning was logged. - **Frameworks**: Standard `testing` package; `httptest`; `testify/assert` (already used in the codebase). - **Coverage**: New packages/files introduced by this feature MUST achieve ≥ 80 % statement coverage. - **CI**: Existing `go test ./...` pipeline covers new tests automatically. @@ -350,42 +392,76 @@ func isPromSourcedEnvelope(envelope metrics.MeasurementEnvelope) bool { ## 7. Rationale & Context +### Why one goroutine per source, not one per family? + +A Prometheus `/metrics` endpoint is a single HTTP resource that always returns all metric +families in one response body. There is no protocol mechanism to request individual families. +Launching one goroutine per configured family would therefore issue `N` full HTTP GET requests +per collection cycle where each response is identical — O(N) bandwidth and O(N) exporter load +for the same data. With a busy exporter (e.g., `node_exporter` with 200+ families) and small +intervals this is a significant waste. The single-goroutine model issues exactly one HTTP GET +per `min(intervals)` tick and fans out the result in memory, which is the correct abstraction +for a scrape-based data source. + +### Why `min(intervals)` as the scrape cadence? + +Scraping less frequently than the shortest configured emit interval would cause data to be +stale by more than the operator intended. Scraping at `min(intervals)` guarantees that every +family can be forwarded to the sink as soon as its emit interval elapses. Families with longer +intervals are simply filtered at sink-write time using `lastEmitted` state; the extra scrape +response data they produce is discarded in memory at negligible cost. + +### Why use `custom_metrics` keys as family names instead of a dedicated selector field? + +Prometheus exporters expose a fixed set of metric families determined by the exporter binary — +pgwatch cannot define or alter them. Introducing a `PromSelectors` field in `metrics.Metric` +would require a parallel metric definition file for Prometheus sources, adding schema complexity +with no practical benefit. The `custom_metrics` / `preset_metrics` mechanism already provides +exactly the right abstraction: a named set of things to collect at a given interval. For +`prometheus` sources, "things to collect" are family names rather than SQL metric names; the +data model is identical, and no new struct fields are needed. + +### Why one row per sample instead of joining families into wide rows? + +An earlier design grouped multiple Prometheus families sharing a label set into a single wide +row (one row per unique label combination, multiple value columns). This was abandoned because: + +1. **Unnecessary coupling**: Prometheus families are independently named, versioned, and may + appear or disappear between scrapes. Joining by label set forces choosing a primary family + and handling partial overlap with `0.0` fill — complexity with no added value. +2. **Natural sink granularity**: one family → one table in PostgreSQL/TimescaleDB is the + correct granularity. It matches how `postgres_exporter` itself models metrics and how + existing Grafana dashboards query them. +3. **Prom→Prom fidelity**: in the proxy path the original family name must be the emitted + metric name. With one-row-per-sample, the value column name IS the family name, making the + proxy path trivial. Wide rows would require reconstructing the family name from context. + +### Why allow scrape-all mode instead of requiring explicit metric lists? + +Exploration is a common workflow: operators want to quickly connect pgwatch to an unfamiliar +exporter and see what it exposes before committing to a fixed list. Requiring explicit +configuration for every family would make exploration impossible. The mandatory warning log +ensures operators understand the schema implications before deploying to production. + ### Why reuse `Source` struct instead of a new struct? All existing configuration readers (YAML, PostgreSQL config DB, gRPC API) operate on `Source`. -Adding a new struct would require changes in every reader and writer. The `prometheus` kind follows -the same pattern as `pgbouncer` and `pgpool`, which also reuse `Source` with non-standard connection strings. - -### Why selector lists instead of regex or auto-discovery? - -Auto-discovery (all families → individual pgwatch metrics) would create an unbounded, unpredictable -set of tables in the sink on first scrape. Selector lists make the schema explicit, predictable, and -consistent with the philosophy that a pgwatch metric is a deliberately designed measurement set. -Regex can be added later without breaking backward compatibility. - -### Why metric family names become value columns instead of producing one row per sample? - -A pgwatch metric is modelled after a SQL query result: multiple named value columns in a single row, -distinguished by their tag columns. For example, a SQL metric for connection state returns -`pg_stat_activity_count` and `pg_stat_activity_max_tx_duration` as two columns of the same row for -each `(datname, state)` pair. The Prometheus counterpart should produce the same shape so that sink -schemas, Grafana dashboards, and downstream tooling are identical regardless of whether the source -is a PostgreSQL connection or a Prometheus exporter. One-row-per-sample would produce a tall/narrow -table incompatible with this convention. +The `prometheus` kind follows the same pattern as `pgbouncer` and `pgpool`, which also reuse +`Source` with non-standard connection strings. ### Why keep the same `MeasurementEnvelope` for the proxy path? -Introducing a separate code path (e.g., a direct HTTP relay bypassing the reaper) would split the -lifecycle management, filtering, and custom-tag injection logic into two places. The chosen approach -(parse → `MeasurementEnvelope` → lighter Prom sink write) keeps a single pipeline. The performance -difference is negligible because the dominant cost is the HTTP scrape itself, not the in-process -row transformation. +Introducing a separate code path (e.g., a direct HTTP relay bypassing the reaper) would split +lifecycle management, filtering, and custom-tag injection into two places. The chosen approach +(parse → `MeasurementEnvelope` → lighter Prom sink write) keeps a single pipeline. The +performance difference is negligible because the dominant cost is the HTTP scrape itself, not +the in-process row transformation. ### Why not prepend the pgwatch namespace in Prom→Prom mode? -When pgwatch proxies an existing exporter, users expect to query metrics under their original names -(e.g., `pg_stat_activity_count`, not `pgwatch_pg_stat_activity_count`). Prepending a namespace would -break existing Grafana dashboards and alerting rules that target those metrics. +When pgwatch proxies an existing exporter, users expect to query metrics under their original +names (e.g., `pg_stat_activity_count`, not `pgwatch_pg_stat_activity_count`). Prepending a +namespace would break existing Grafana dashboards and alerting rules. --- From 16ceca834defa98d503ed2659bf5d07ab4497db2 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 8 May 2026 18:56:31 +0200 Subject: [PATCH 03/27] add spec to refactor SourceConn as interface --- spec/refactor-sourceconn-interface.md | 408 ++++++++++++++++++++++++++ 1 file changed, 408 insertions(+) create mode 100644 spec/refactor-sourceconn-interface.md diff --git a/spec/refactor-sourceconn-interface.md b/spec/refactor-sourceconn-interface.md new file mode 100644 index 0000000000..fc2f0b44bb --- /dev/null +++ b/spec/refactor-sourceconn-interface.md @@ -0,0 +1,408 @@ +--- +title: Refactor SourceConn from Concrete Struct to Interface Hierarchy +version: 1.0 +date_created: 2026-05-08 +tags: [architecture, refactoring, sources, interfaces] +--- + +# Introduction + +This specification defines the refactoring of `sources.SourceConn` from a single concrete struct +with optional nil fields into a `SourceConn` interface implemented by two concrete types: +`DbConn` (postgres, pgbouncer, pgpool, patroni) and `PromConn` (prometheus). The refactoring +eliminates nil-field hazards, removes the runtime `IsPostgresSource()` discriminator need at +call sites, and provides clean extension points for future source kinds (e.g., `RestConn`). + +This refactoring is a **prerequisite or parallel track** to implementing the Prometheus exporter +source feature described in `architecture-prometheus-exporter-source.md`. The prometheus spec +assumes this interface hierarchy is in place. + +--- + +## 1. Purpose & Scope + +**Purpose**: Replace the single `SourceConn` struct — which mixes DB-specific fields (`Conn`, +`ConnConfig`, `RuntimeInfo`) with an HTTP-specific field (`HTTPClient`) added for prometheus +support — with a clean interface and two focused implementations. + +**In scope**: +- New `SourceConn` interface in `internal/sources/`. +- `DbConn` concrete type replacing the existing `SourceConn` struct for all DB-backed sources. +- `PromConn` concrete type for prometheus sources. +- Update of `SourceConns` collection type. +- Update of all call sites in `internal/reaper/`, `internal/sources/`, and any other package + that holds `*sources.SourceConn`. +- Constructor functions `NewDbConn` and `NewPromConn`. + +**Out of scope**: +- Changes to `Source` (the configuration struct) — it remains unchanged. +- Changes to sink implementations. +- The `ScrapeAll` function body and prometheus goroutine logic (specified separately). +- Any new source kind beyond `DbConn` and `PromConn`. + +**Audience**: pgwatch maintainers. Also intended for direct consumption by AI coding assistants. + +--- + +## 2. Motivation — Current Problems + +| Problem | Location | +|---|---| +| `SourceConn.Conn` is `nil` for prometheus sources | `internal/sources/conn.go` | +| `SourceConn.ConnConfig` is `nil` for prometheus sources | `internal/sources/conn.go` | +| `SourceConn.RuntimeInfo` is meaningless for prometheus sources | `internal/sources/conn.go` | +| `HTTPClient` (added for prometheus) is `nil` for all DB sources | `internal/sources/conn.go` | +| `IsPostgresSource()` is a runtime type-discriminator on a concrete struct | `internal/sources/conn.go` | +| `FetchRuntimeInfo` is a no-op for prometheus, yet must be implemented | `internal/sources/conn.go` | +| `ParseConfig`, `GetClusterIdentifier`, `SetDatabaseName` panic if called on prometheus source | `internal/sources/conn.go` | +| `FunctionExists`, `TryCreateMissingExtensions`, `TryCreateMetricsHelpers` are unreachable for prometheus | `internal/sources/conn.go` | + +--- + +## 3. Requirements + +### Interface Definition + +- **REQ-001**: A new `SourceConn` interface MUST be defined in `internal/sources/` with the + following method set — the minimum needed by the reaper's shared dispatch loop: + + ```go + type SourceConn interface { + // Connect establishes or validates the underlying transport (DB pool or HTTP client). + Connect(ctx context.Context, opts cmdopts.CmdOpts) error + + // Ping checks liveness of the underlying transport. + Ping(ctx context.Context) error + + // IsPostgresSource returns true for kinds that speak the PostgreSQL wire protocol. + IsPostgresSource() bool + + // GetSource returns the configuration struct for this connection. + GetSource() Source + + // GetMetricInterval returns the effective collection interval for the named metric, + // taking standby configuration into account. + GetMetricInterval(name string) time.Duration + } + ``` + +- **REQ-002**: The existing `SourceConn` struct MUST be renamed to `DbConn`. All its existing + fields and methods that are DB-specific MUST be preserved on `DbConn` unchanged. + +- **REQ-003**: A new `PromConn` struct MUST be defined for prometheus sources. It MUST contain + only fields relevant to HTTP scraping: + + ```go + type PromConn struct { + Source + HTTPClient *http.Client + sync.RWMutex + } + ``` + +- **REQ-004**: Both `DbConn` and `PromConn` MUST implement the `SourceConn` interface. The Go + compiler MUST be able to verify this statically (e.g., via `var _ SourceConn = (*DbConn)(nil)` + and `var _ SourceConn = (*PromConn)(nil)` compile-time assertions in the package). + +- **REQ-005**: `SourceConns` MUST be redefined as `[]SourceConn` (interface slice), replacing + the current `[]*SourceConn` (concrete pointer slice). `GetMonitoredDatabase` MUST be updated + accordingly. + +### `DbConn` + +- **REQ-006**: `DbConn` MUST retain all existing fields: + - `Source` (embedded) + - `Conn db.PgxPoolIface` + - `ConnConfig *pgxpool.Config` + - `RuntimeInfo` (embedded) + - `sync.RWMutex` (embedded) + +- **REQ-007**: All methods currently on `SourceConn` that access `Conn`, `ConnConfig`, or + `RuntimeInfo` MUST remain on `DbConn` with identical signatures: + `ParseConfig`, `GetClusterIdentifier`, `GetDatabaseName`, `SetDatabaseName`, + `IsClientOnSameHost`, `FetchRuntimeInfo`, `FetchVersion`, `DiscoverPlatform`, + `FetchApproxSize`, `FunctionExists`, `TryCreateMissingExtensions`, `TryCreateMetricsHelpers`. + +- **REQ-008**: `DbConn.IsPostgresSource()` MUST return `true` unless `Kind` is `SourcePgBouncer` + or `SourcePgPool` (identical to the current implementation). + +- **REQ-009**: `DbConn.GetSource()` MUST return a copy of the embedded `Source` struct. + +- **REQ-010**: `NewDbConn(s Source) *DbConn` MUST replace the current `NewSourceConn` constructor, + initialising `RuntimeInfo.Extensions` and `RuntimeInfo.ChangeState` as before. + +### `PromConn` + +- **REQ-011**: `PromConn.Connect(ctx, opts)` MUST: + 1. Parse TLS query parameters (`tlsrootcert`, `tlsskipverify`) from `Source.ConnStr`. + 2. Construct an `*http.Client` with the derived `tls.Config` and store it in `HTTPClient`. + 3. Issue a HEAD request to the scrape URL (stripped of TLS query parameters) to validate + reachability. Return an error if the response status is not 2xx or the request fails. + 4. Log a warning if `tlsskipverify=true` is present (SEC-002 from the prometheus spec). + +- **REQ-012**: `PromConn.Ping(ctx)` MUST issue a HEAD request to the scrape URL and return an + error if the status is not 2xx. + +- **REQ-013**: `PromConn.IsPostgresSource()` MUST return `false`. + +- **REQ-014**: `PromConn.GetSource()` MUST return a copy of the embedded `Source` struct. + +- **REQ-015**: `PromConn.GetMetricInterval(name)` MUST return + `time.Duration(Source.Metrics[name]) * time.Second`. Standby metrics are not applicable to + prometheus sources; `Source.MetricsStandby` MUST be ignored. + +- **REQ-016**: `NewPromConn(s Source) *PromConn` MUST be the constructor. `HTTPClient` MUST be + `nil` until `Connect` is called. + +### Constructor Dispatch + +- **REQ-017**: A factory function `NewSourceConn(s Source) SourceConn` MUST be provided that + returns `NewPromConn(s)` when `s.Kind == SourcePrometheus` and `NewDbConn(s)` otherwise. This + preserves a single call site for callers that create connections from a `Source` config + without knowing the kind in advance. + +### Call-Site Updates + +- **REQ-018**: Every function in `internal/reaper/` that currently accepts `*sources.SourceConn` + MUST be updated to the most specific type it actually needs: + - Functions that only issue SQL queries (`QueryMeasurements`, `DetectSprocChanges`, + `DetectTableChanges`, `DetectIndexChanges`, `DetectPrivilegeChanges`, + `DetectConfigurationChanges`, `GetInstanceUpMeasurement`, `GetObjectChangesMeasurement`, + `AddSysinfoToMeasurements`, `CreateSourceHelpers`, `FetchStatsDirectlyFromOS`, + `NewLogParser`, `checkHasRemotePrivileges`) MUST accept `*sources.DbConn`. + - The new `ScrapeAll` function (prometheus spec) MUST accept `*sources.PromConn`. + - Functions that handle both kinds at the dispatch level (`reapMetricMeasurements`, + `FetchMetric`) MUST accept `sources.SourceConn` (the interface). + +- **REQ-019**: `SourceConns.GetMonitoredDatabase` MUST continue to accept a name string and + return `SourceConn` (interface), searching by `sc.GetSource().Name`. + +- **REQ-020**: Any type assertion or type switch on `sources.SourceConn` in the reaper (e.g., + to obtain `*DbConn` for DB-specific operations) MUST be contained within the dispatch + functions identified in REQ-018, not scattered across helper functions. + +--- + +## 4. Interfaces & Data Contracts + +### 4.1 `SourceConn` interface + +```go +// internal/sources/conn.go + +// SourceConn is the runtime handle for a monitored source. Implementations +// are DbConn (all DB-backed kinds) and PromConn (prometheus). +type SourceConn interface { + Connect(ctx context.Context, opts cmdopts.CmdOpts) error + Ping(ctx context.Context) error + IsPostgresSource() bool + GetSource() Source + GetMetricInterval(name string) time.Duration +} +``` + +### 4.2 `DbConn` struct + +```go +// internal/sources/conn.go + +// DbConn is the runtime handle for sources that use the PostgreSQL wire +// protocol: postgres, postgres-continuous-discovery, pgbouncer, pgpool, patroni. +type DbConn struct { + Source + Conn db.PgxPoolIface + ConnConfig *pgxpool.Config + RuntimeInfo + sync.RWMutex +} + +func NewDbConn(s Source) *DbConn { + return &DbConn{ + Source: s, + RuntimeInfo: RuntimeInfo{ + Extensions: make(map[string]int), + ChangeState: make(map[string]map[string]string), + }, + } +} + +// compile-time assertion +var _ SourceConn = (*DbConn)(nil) +``` + +`DbConn` retains all methods currently on `SourceConn` (see §2 for the full list). Their +signatures are identical; only the receiver type changes from `*SourceConn` to `*DbConn`. + +### 4.3 `PromConn` struct + +```go +// internal/sources/conn.go + +// PromConn is the runtime handle for prometheus sources. HTTPClient is nil +// until Connect is called. +type PromConn struct { + Source + HTTPClient *http.Client + sync.RWMutex +} + +func NewPromConn(s Source) *PromConn { + return &PromConn{Source: s} +} + +// compile-time assertion +var _ SourceConn = (*PromConn)(nil) +``` + +### 4.4 `NewSourceConn` factory + +```go +// internal/sources/conn.go + +// NewSourceConn returns the correct SourceConn implementation for the given Source. +func NewSourceConn(s Source) SourceConn { + if s.Kind == SourcePrometheus { + return NewPromConn(s) + } + return NewDbConn(s) +} +``` + +### 4.5 `SourceConns` collection + +```go +// internal/sources/conn.go + +type SourceConns []SourceConn + +func (mds SourceConns) GetMonitoredDatabase(name string) SourceConn { + for _, sc := range mds { + if sc.GetSource().Name == name { + return sc + } + } + return nil +} +``` + +### 4.6 Reaper call-site summary + +| Function | Old parameter type | New parameter type | +|---|---|---| +| `QueryMeasurements` | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectSprocChanges` | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectTableChanges` | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectIndexChanges` | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectPrivilegeChanges` | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectConfigurationChanges` | `*sources.SourceConn` | `*sources.DbConn` | +| `GetInstanceUpMeasurement` | `*sources.SourceConn` | `*sources.DbConn` | +| `GetObjectChangesMeasurement` | `*sources.SourceConn` | `*sources.DbConn` | +| `AddSysinfoToMeasurements` | `*sources.SourceConn` | `*sources.DbConn` | +| `CreateSourceHelpers` | `*sources.SourceConn` | `*sources.DbConn` | +| `FetchStatsDirectlyFromOS` | `*sources.SourceConn` | `*sources.DbConn` | +| `NewLogParser` | `*sources.SourceConn` | `*sources.DbConn` | +| `checkHasRemotePrivileges` | `*sources.SourceConn` | `*sources.DbConn` | +| `ScrapeAll` (new) | — | `*sources.PromConn` | +| `reapMetricMeasurements` | `*sources.SourceConn` | `sources.SourceConn` | +| `FetchMetric` | `*sources.SourceConn` | `sources.SourceConn` | + +--- + +## 5. Acceptance Criteria + +- **AC-001**: `var _ sources.SourceConn = (*sources.DbConn)(nil)` compiles without error. +- **AC-002**: `var _ sources.SourceConn = (*sources.PromConn)(nil)` compiles without error. +- **AC-003**: `sources.NewSourceConn(Source{Kind: SourcePrometheus})` returns a `*PromConn`. +- **AC-004**: `sources.NewSourceConn(Source{Kind: SourcePostgres})` returns a `*DbConn`. +- **AC-005**: `(*PromConn).Conn` does not exist as a field; attempting to access it is a + compile-time error. +- **AC-006**: `(*DbConn).HTTPClient` does not exist as a field; attempting to access it is a + compile-time error. +- **AC-007**: `PromConn.IsPostgresSource()` returns `false`. +- **AC-008**: `DbConn.IsPostgresSource()` returns `false` only for `SourcePgBouncer` and + `SourcePgPool`, and `true` for all other kinds. +- **AC-009**: All existing unit tests in `internal/sources/` and `internal/reaper/` pass + without modification to their test logic (only type references updated). +- **AC-010**: `go vet ./...` and `golangci-lint run` produce no new errors after the refactor. +- **AC-011**: `SourceConns.GetMonitoredDatabase` returns `nil` (not panics) when the name is + not found. + +--- + +## 6. Test Automation Strategy + +- **Approach**: This is a pure refactoring — no new behaviour is introduced. The primary test + strategy is **compilation + existing test suite green**. +- **Compile-time assertions**: Add `var _ SourceConn = (*DbConn)(nil)` and + `var _ SourceConn = (*PromConn)(nil)` in `internal/sources/conn.go` so any interface + drift is caught immediately. +- **Unit tests to add**: + - `TestNewSourceConnDispatch`: verify `NewSourceConn` returns the correct concrete type for + each `Kind` value. + - `TestPromConnIsNotPostgresSource`: verify `PromConn.IsPostgresSource()` is `false`. + - `TestDbConnIsPostgresSource`: table-driven test across all `Kind` values. + - `TestSourceConnsGetMonitoredDatabase`: verify nil return for missing name, correct return + for existing name, works with mixed `DbConn` / `PromConn` in the slice. +- **Coverage**: The `internal/sources` package already has tests; no coverage regression is + expected. New test functions MUST maintain the existing ≥ 80 % coverage level. + +--- + +## 7. Rationale + +### Why an interface instead of a union struct with nil fields? + +The nil-field pattern provides no compile-time safety: calling `DbConn`-specific methods on a +`PromConn` value (once the prometheus source is active) would panic at runtime rather than fail +at compile time. An interface moves the error to compile time: a function that accepts +`*DbConn` cannot be accidentally called with a `*PromConn`. This is the standard Go idiom for +polymorphic types with disjoint behaviour. + +### Why two concrete types instead of more granular splitting (e.g., one per Kind)? + +`pgbouncer`, `pgpool`, `patroni`, and `postgres` all share the same connection mechanism +(`pgx` pool), the same `RuntimeInfo`, and the same `FetchRuntimeInfo` logic with minor +branching on `Kind`. Splitting them further would increase code duplication with no type-safety +gain. The meaningful semantic boundary is "speaks PostgreSQL wire protocol" vs "speaks HTTP + +Prometheus text format", which maps cleanly to `DbConn` vs `PromConn`. A future `RestConn` +would add a third implementation if needed. + +### Why keep `IsPostgresSource()` in the interface? + +Several places in the reaper need to know whether to call `FetchRuntimeInfo` or skip it, and +whether to use standby metrics. Rather than scattering type-switch boilerplate, a single +interface method keeps that logic readable. The method is cheap and side-effect-free, so the +interface overhead is negligible. + +### Why `GetSource() Source` instead of embedding `Source`? + +Embedding `Source` in the interface is not possible in Go; interfaces cannot embed structs. +`GetSource()` is the idiomatic alternative. It returns a value copy, which is safe: callers +that need to mutate the source configuration already hold a concrete type pointer. + +### Why rename the constructor to `NewDbConn` instead of keeping `NewSourceConn`? + +`NewSourceConn` is repurposed as the factory function (REQ-017) that returns the correct +implementation based on `Kind`. Reusing the name for the `DbConn` constructor would conflict. +`NewDbConn` is explicit and self-documenting. + +### Blast radius assessment + +All affected call sites are within two packages: `internal/sources` and `internal/reaper`. No +external packages (sinks, webserver, cmd) hold `*SourceConn` directly — they receive +`SourceConns` or interact via higher-level reaper APIs. The change is therefore contained and +can be reviewed in a single PR. + +--- + +## 8. Migration Guide + +1. Rename `SourceConn` struct → `DbConn` in `internal/sources/conn.go`. +2. Change all `*SourceConn` receiver types on methods → `*DbConn`. +3. Rename `NewSourceConn` → `NewDbConn`; add the new `NewSourceConn` factory. +4. Add `PromConn` struct and its interface methods. +5. Redefine `SourceConns` as `[]SourceConn`; update `GetMonitoredDatabase`. +6. Add compile-time assertions. +7. Update `internal/reaper/` call sites per the table in §4.6. +8. Run `go build ./...` and fix any remaining type errors. +9. Run `go test ./...` — no test logic changes should be required, only type references. From 1b8bae265a6036bcb6ed7a66d30114c1fb3cf145 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 8 May 2026 21:07:03 +0200 Subject: [PATCH 04/27] rebase specs on batch metric fetching --- ...architecture-prometheus-exporter-source.md | 102 ++++++++++--- spec/refactor-sourceconn-interface.md | 141 ++++++++++++++---- 2 files changed, 190 insertions(+), 53 deletions(-) diff --git a/spec/architecture-prometheus-exporter-source.md b/spec/architecture-prometheus-exporter-source.md index fa47cb2189..04cf7e11ce 100644 --- a/spec/architecture-prometheus-exporter-source.md +++ b/spec/architecture-prometheus-exporter-source.md @@ -1,6 +1,6 @@ --- title: Prometheus Exporter as a pgwatch Source -version: 1.2 +version: 1.3 date_created: 2026-04-26 date_updated: 2026-05-08 tags: [architecture, design, prometheus, sources, metrics, sinks] @@ -29,7 +29,7 @@ proxy when the sink is a Prometheus endpoint. **In scope**: - New source `Kind` value `prometheus`. - Use of `custom_metrics` / `preset_metrics` to specify Prometheus family names and their emit intervals. -- Single-goroutine-per-source scrape lifecycle: one HTTP GET per tick, per-family emit cadence gated by `lastEmitted` state. +- `PromSourceReaper`: a dedicated runner type implementing the same `SourceRunner` interface as `SourceReaper` (DB sources), with a GCD-based tick loop, one HTTP GET per tick, and per-family emit cadence gated by `lastEmitted` state. - Measurement row structure for Prometheus samples. - Prometheus-to-Prometheus (Prom→Prom) proxy optimisation in `PrometheusWriter`. - Authentication and TLS configuration. @@ -114,9 +114,9 @@ consumed directly by AI coding assistants. - **REQ-013**: `SourceConn.Ping()` for a `prometheus` source MUST issue a HEAD request to the scrape URL and return an error if the status code is not 2xx. - **REQ-014**: `SourceConn.IsPostgresSource()` MUST return `false` for `Kind == "prometheus"`. - **REQ-015**: `SourceConn.FetchRuntimeInfo()` for `prometheus` sources MUST be a no-op that sets `RuntimeInfo.VersionStr = "prometheus"` and `RuntimeInfo.Version = 0` without error. -- **REQ-016**: The reaper MUST launch **one goroutine per `prometheus` source** (not one per family). The goroutine MUST NOT be shared with postgres-style per-metric goroutines. -- **REQ-017**: The goroutine MUST compute the **scrape interval** as `min` of all emit interval values in the resolved `MetricIntervals` map. In scrape-all mode the scrape interval MUST default to 60 seconds. -- **REQ-018**: On every scrape tick the goroutine MUST issue a single HTTP GET to the exporter (via `ScrapeAll`) and receive measurements for all available families in one response. +- **REQ-016**: The reaper MUST instantiate a `PromSourceReaper` for each `prometheus` source (not one per family) and start it via `go psr.Run(sourceCtx)`, mirroring the `NewSourceReaper` / `go sr.Run(sourceCtx)` pattern used for DB sources. `PromSourceReaper` MUST implement a `Run(ctx context.Context)` method satisfying the `SourceRunner` interface (see `refactor-sourceconn-interface.md` §4.6). +- **REQ-017**: `PromSourceReaper.Run` MUST compute the **scrape interval** using the same `GCDSlice` helper as `SourceReaper.calcTickInterval`: `min` of all emit interval values in the resolved `MetricIntervals` map, floored at `minTickInterval`. In scrape-all mode the scrape interval MUST default to 60 seconds. +- **REQ-018**: On every scrape tick `PromSourceReaper.Run` MUST issue a single HTTP GET to the exporter (via `ScrapeAll`) and receive measurements for all available families in one response. - **REQ-019**: The goroutine MUST maintain a `lastEmitted map[string]time.Time` keyed by family name, initialised empty. After a successful scrape, for each family present in the response: - In **filtered mode**: skip families not present in the resolved `MetricIntervals` map. - Emit a `MeasurementEnvelope` to the sink for family `f` only when `time.Since(lastEmitted[f]) >= emitInterval[f]`. On first scrape (zero `lastEmitted` value) the family MUST always be emitted. @@ -133,7 +133,7 @@ consumed directly by AI coding assistants. // ScrapeAll fetches the exporter's /metrics endpoint and returns one // Measurements slice per discovered metric family, keyed by family name. - func ScrapeAll(ctx context.Context, sc *sources.SourceConn) (map[string]metrics.Measurements, error) + func ScrapeAll(ctx context.Context, sc *sources.PromConn) (map[string]metrics.Measurements, error) ``` - **REQ-023**: The parser MUST use `github.com/prometheus/common/expfmt` (already an indirect dependency via the Prometheus client library). No new major dependencies are permitted. - **REQ-024**: Each sample from a metric family becomes **one measurement row**: @@ -269,37 +269,84 @@ type SourceConn struct { // Measurements slice per discovered metric family, keyed by family name. // It is the only HTTP call made per scrape tick; family filtering and // per-family emit gating are performed by the caller. -func ScrapeAll(ctx context.Context, sc *sources.SourceConn) (map[string]metrics.Measurements, error) +func ScrapeAll(ctx context.Context, sc *sources.PromConn) (map[string]metrics.Measurements, error) ``` -### 4.6 Scrape goroutine pseudo-code +### 4.6 `PromSourceReaper` struct and `Run` pseudo-code ```go -// One goroutine per prometheus source, started by the reaper. -func runPrometheusSource(ctx context.Context, sc *sources.SourceConn, intervals metrics.MetricIntervals) { - scrapeInterval := minInterval(intervals) // 60 s when intervals is empty (scrape-all) - lastEmitted := map[string]time.Time{} +// internal/reaper/prom_source_reaper.go + +// PromSourceReaper manages metric collection for a single prometheus source. +// It mirrors the structure of SourceReaper (DB sources) but issues HTTP scrapes +// instead of SQL queries, using per-family emit gating via lastEmitted. +type PromSourceReaper struct { + reaper *Reaper + md *sources.PromConn + lastEmitted map[string]time.Time +} + +func NewPromSourceReaper(r *Reaper, md *sources.PromConn) *PromSourceReaper { + return &PromSourceReaper{ + reaper: r, + md: md, + lastEmitted: make(map[string]time.Time), + } +} + +// calcScrapeInterval computes GCD of all emit intervals using the same +// GCDSlice helper as SourceReaper.calcTickInterval. +func (psr *PromSourceReaper) calcScrapeInterval() time.Duration { + intervals := make([]int, 0, len(psr.md.Metrics)) + for _, v := range psr.md.Metrics { + intervals = append(intervals, max(v, minTickInterval)) + } + return time.Duration(max(GCDSlice(intervals), minTickInterval)) * time.Second +} + +// Run is the main loop for a single prometheus source. Satisfies SourceRunner. +func (psr *PromSourceReaper) Run(ctx context.Context) { + l := log.GetLogger(ctx).WithField("source", psr.md.Name) + ctx = log.WithLogger(ctx, l) + + intervals := psr.md.Metrics // family name → emit interval in seconds + if len(intervals) == 0 { + l.Warning("no metrics configured for prometheus source, activating scrape-all mode at 60s") + } + + scrapeInterval := psr.calcScrapeInterval() ticker := time.NewTicker(scrapeInterval) defer ticker.Stop() + for { select { case <-ctx.Done(): return case now := <-ticker.C: - allFamilies, err := ScrapeAll(ctx, sc) + allFamilies, err := ScrapeAll(ctx, psr.md) if err != nil { - log.Warn("scrape failed", "source", sc.Name, "err", err) + l.WithError(err).Warning("scrape failed") continue } for family, measurements := range allFamilies { - emitInterval := intervals[family] // zero duration when scrape-all + emitSecs := intervals[family] + emitInterval := time.Duration(emitSecs) * time.Second + if emitInterval == 0 && len(intervals) > 0 { + continue // family not in configured list; discard + } if emitInterval == 0 { emitInterval = scrapeInterval // scrape-all: emit every tick - } else if last, ok := lastEmitted[family]; ok && now.Sub(last) < emitInterval { + } else if last, ok := psr.lastEmitted[family]; ok && now.Sub(last) < emitInterval { continue // not yet due } - sink.Write(buildEnvelope(sc, family, measurements)) - lastEmitted[family] = now + psr.reaper.measurementCh <- metrics.MeasurementEnvelope{ + DBName: psr.md.Name, + MetricName: family, + Data: measurements, + CustomTags: psr.md.CustomTags, + SourceKind: string(sources.SourcePrometheus), + } + psr.lastEmitted[family] = now } } } @@ -376,9 +423,9 @@ func isPromSourcedEnvelope(envelope metrics.MeasurementEnvelope) bool { - **Test Levels**: Unit, Integration. - **Unit Tests**: - `ScrapeAll` with a table-driven test using a mock HTTP server (`httptest.NewServer`) serving pre-recorded Prometheus exposition text. Cover: normal gauges, counters, histograms (verify `le` → `tag_le`), non-finite values, missing timestamp (defaults to `time.Now()`), all families returned in the map. - - Scrape goroutine emit-gating logic: mock `ScrapeAll` to return two families with intervals `{A: 30s, B: 60s}`; advance a fake clock and assert that after 30 s only A is emitted, after 60 s both are emitted, and only one `ScrapeAll` call is made per 30 s tick. + - `PromSourceReaper` emit-gating logic: mock `ScrapeAll` to return two families with intervals `{A: 30s, B: 60s}`; advance a fake clock and assert that after 30 s only A is emitted, after 60 s both are emitted, and only one `ScrapeAll` call is made per 30 s tick. - Filtered mode: assert families not in `MetricIntervals` are discarded after `ScrapeAll`. - - `SourceConn.Connect()` with mock TLS server: Basic Auth redaction in logs, `tlsskipverify` warning. + - `PromConn.Connect()` with mock TLS server: Basic Auth redaction in logs, `tlsskipverify` warning. - `PrometheusWriter.Write()` with Prom-sourced envelopes: verify emitted metric names match family names and the pgwatch namespace prefix is absent. - **Integration Tests**: - Start a minimal Prometheus text-format HTTP server in the test, register a `prometheus` source with two families at different intervals, run the reaper loop, and assert the correct per-family emit cadence in the JSON sink file with a single HTTP call per tick. @@ -411,6 +458,21 @@ family can be forwarded to the sink as soon as its emit interval elapses. Famili intervals are simply filtered at sink-write time using `lastEmitted` state; the extra scrape response data they produce is discarded in memory at negligible cost. +### Why `PromSourceReaper` instead of a standalone goroutine function? + +The existing `SourceReaper` consolidates N per-metric goroutines into a single per-source +goroutine with a GCD-based tick loop and `pgx.Batch` SQL execution. Introducing a parallel +`PromSourceReaper` type follows the same pattern for HTTP sources: one goroutine per source, +GCD-based cadence, per-family emit gating. Keeping the same structural shape means: + +1. **Uniform dispatch**: `Reaper.sourceReapers` holds `SourceRunner` values regardless of + kind. Start/stop lifecycle management in `ShutdownOldWorkers` requires zero kind-specific + branching. +2. **Shared helpers**: `GCDSlice`, `minTickInterval`, and the cancel-func map are reused + without duplication. +3. **Testability**: `PromSourceReaper` can be unit-tested in isolation via a mock `Reaper` + and mock HTTP server, exactly as `SourceReaper` is tested with a mock DB. + ### Why use `custom_metrics` keys as family names instead of a dedicated selector field? Prometheus exporters expose a fixed set of metric families determined by the exporter binary — diff --git a/spec/refactor-sourceconn-interface.md b/spec/refactor-sourceconn-interface.md index fc2f0b44bb..d7cadd89be 100644 --- a/spec/refactor-sourceconn-interface.md +++ b/spec/refactor-sourceconn-interface.md @@ -1,7 +1,8 @@ --- title: Refactor SourceConn from Concrete Struct to Interface Hierarchy -version: 1.0 +version: 1.1 date_created: 2026-05-08 +date_updated: 2026-05-08 tags: [architecture, refactoring, sources, interfaces] --- @@ -163,23 +164,43 @@ support — with a clean interface and two focused implementations. ### Call-Site Updates -- **REQ-018**: Every function in `internal/reaper/` that currently accepts `*sources.SourceConn` - MUST be updated to the most specific type it actually needs: - - Functions that only issue SQL queries (`QueryMeasurements`, `DetectSprocChanges`, - `DetectTableChanges`, `DetectIndexChanges`, `DetectPrivilegeChanges`, - `DetectConfigurationChanges`, `GetInstanceUpMeasurement`, `GetObjectChangesMeasurement`, - `AddSysinfoToMeasurements`, `CreateSourceHelpers`, `FetchStatsDirectlyFromOS`, - `NewLogParser`, `checkHasRemotePrivileges`) MUST accept `*sources.DbConn`. +- **REQ-018**: Every function and struct field in `internal/reaper/` that currently references + `*sources.SourceConn` MUST be updated to the most specific type it actually needs: + - `SourceReaper.md` struct field and `NewSourceReaper` constructor parameter MUST be typed + as `*sources.DbConn`. `SourceReaper` is exclusively a DB-source runner; all its field + accesses (`Conn`, `RuntimeInfo`, `RLock`/`RUnlock`) are valid on `*DbConn` without any + type assertion. + - Functions that only issue SQL queries or access DB-specific runtime info + (`QueryMeasurements`, `DetectSprocChanges`, `DetectTableChanges`, `DetectIndexChanges`, + `DetectPrivilegeChanges`, `DetectConfigurationChanges`, `GetInstanceUpMeasurement`, + `GetObjectChangesMeasurement`, `AddSysinfoToMeasurements`, `CreateSourceHelpers`, + `FetchStatsDirectlyFromOS`, `NewLogParser`, `checkHasRemotePrivileges`) MUST accept + `*sources.DbConn`. - The new `ScrapeAll` function (prometheus spec) MUST accept `*sources.PromConn`. - - Functions that handle both kinds at the dispatch level (`reapMetricMeasurements`, - `FetchMetric`) MUST accept `sources.SourceConn` (the interface). - **REQ-019**: `SourceConns.GetMonitoredDatabase` MUST continue to accept a name string and return `SourceConn` (interface), searching by `sc.GetSource().Name`. -- **REQ-020**: Any type assertion or type switch on `sources.SourceConn` in the reaper (e.g., - to obtain `*DbConn` for DB-specific operations) MUST be contained within the dispatch - functions identified in REQ-018, not scattered across helper functions. +- **REQ-020**: Any type assertion from `sources.SourceConn` (interface) to `*sources.DbConn` + or `*sources.PromConn` MUST only appear in the reaper dispatch block (REQ-023), not + scattered across helper functions or within `SourceReaper` methods. + +### Reaper Dispatch Structure + +- **REQ-021**: A `SourceRunner` interface MUST be defined in `internal/reaper/` with a single + method `Run(ctx context.Context)`. Compile-time assertions + (`var _ SourceRunner = (*SourceReaper)(nil)` and + `var _ SourceRunner = (*PromSourceReaper)(nil)`) MUST be present. + +- **REQ-022**: `Reaper.sourceReapers` MUST be retyped from `map[string]*SourceReaper` to + `map[string]SourceRunner`. + +- **REQ-023**: In `Reaper.Reap()`, the runner creation block MUST dispatch to + `NewSourceReaper(*DbConn)` for DB sources and `NewPromSourceReaper(*PromConn)` for + prometheus sources, selected via a `switch` on `source.Kind`. + +- **REQ-024**: Type assertions from `sources.SourceConn` (interface) to concrete types MUST + only appear in the dispatch block described in REQ-023. --- @@ -285,26 +306,66 @@ func (mds SourceConns) GetMonitoredDatabase(name string) SourceConn { } ``` -### 4.6 Reaper call-site summary - -| Function | Old parameter type | New parameter type | -|---|---|---| -| `QueryMeasurements` | `*sources.SourceConn` | `*sources.DbConn` | -| `DetectSprocChanges` | `*sources.SourceConn` | `*sources.DbConn` | -| `DetectTableChanges` | `*sources.SourceConn` | `*sources.DbConn` | -| `DetectIndexChanges` | `*sources.SourceConn` | `*sources.DbConn` | -| `DetectPrivilegeChanges` | `*sources.SourceConn` | `*sources.DbConn` | -| `DetectConfigurationChanges` | `*sources.SourceConn` | `*sources.DbConn` | -| `GetInstanceUpMeasurement` | `*sources.SourceConn` | `*sources.DbConn` | -| `GetObjectChangesMeasurement` | `*sources.SourceConn` | `*sources.DbConn` | -| `AddSysinfoToMeasurements` | `*sources.SourceConn` | `*sources.DbConn` | -| `CreateSourceHelpers` | `*sources.SourceConn` | `*sources.DbConn` | -| `FetchStatsDirectlyFromOS` | `*sources.SourceConn` | `*sources.DbConn` | -| `NewLogParser` | `*sources.SourceConn` | `*sources.DbConn` | -| `checkHasRemotePrivileges` | `*sources.SourceConn` | `*sources.DbConn` | -| `ScrapeAll` (new) | — | `*sources.PromConn` | -| `reapMetricMeasurements` | `*sources.SourceConn` | `sources.SourceConn` | -| `FetchMetric` | `*sources.SourceConn` | `sources.SourceConn` | +### 4.6 `SourceRunner` interface + +```go +// internal/reaper/source_runner.go + +// SourceRunner is the common interface for per-source collection goroutines. +// It allows Reaper to manage SourceReaper (DB sources) and PromSourceReaper +// (prometheus sources) through a single map without kind-specific branching. +type SourceRunner interface { + Run(ctx context.Context) +} + +// compile-time assertions +var _ SourceRunner = (*SourceReaper)(nil) +var _ SourceRunner = (*PromSourceReaper)(nil) +``` + +`Reaper.sourceReapers` MUST be declared as `map[string]SourceRunner`. The runner creation +block in `Reaper.Reap()` type-asserts the `SourceConn` interface value to the correct concrete +type and passes it to the appropriate constructor: + +```go +if _, exists := r.sourceReapers[source.Name]; !exists { + var runner SourceRunner + switch source.Kind { + case sources.SourcePrometheus: + runner = NewPromSourceReaper(r, monitoredSource.(*sources.PromConn)) + default: + runner = NewSourceReaper(r, monitoredSource.(*sources.DbConn)) + } + sourceCtx, cancel := context.WithCancel(ctx) + r.cancelFuncs[source.Name] = cancel + r.sourceReapers[source.Name] = runner + go runner.Run(sourceCtx) +} +``` + +### 4.7 Reaper call-site summary + +The following functions and struct fields change type. + +| Symbol | Kind | Old type | New type | +|---|---|---|---| +| `SourceReaper.md` | struct field | `*sources.SourceConn` | `*sources.DbConn` | +| `NewSourceReaper` | constructor param | `*sources.SourceConn` | `*sources.DbConn` | +| `Reaper.sourceReapers` | struct field | `map[string]*SourceReaper` | `map[string]SourceRunner` | +| `QueryMeasurements` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectSprocChanges` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectTableChanges` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectIndexChanges` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectPrivilegeChanges` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `DetectConfigurationChanges` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `GetInstanceUpMeasurement` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `GetObjectChangesMeasurement` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `AddSysinfoToMeasurements` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `CreateSourceHelpers` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `FetchStatsDirectlyFromOS` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `NewLogParser` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `checkHasRemotePrivileges` | function param | `*sources.SourceConn` | `*sources.DbConn` | +| `ScrapeAll` (new) | function param | — | `*sources.PromConn` | --- @@ -326,6 +387,10 @@ func (mds SourceConns) GetMonitoredDatabase(name string) SourceConn { - **AC-010**: `go vet ./...` and `golangci-lint run` produce no new errors after the refactor. - **AC-011**: `SourceConns.GetMonitoredDatabase` returns `nil` (not panics) when the name is not found. +- **AC-012**: `var _ SourceRunner = (*SourceReaper)(nil)` compiles without error. +- **AC-013**: `var _ SourceRunner = (*PromSourceReaper)(nil)` compiles without error. +- **AC-014**: `Reaper.sourceReapers` is declared as `map[string]SourceRunner`; assigning a + `*PromSourceReaper` value to it compiles without error. --- @@ -393,6 +458,16 @@ external packages (sinks, webserver, cmd) hold `*SourceConn` directly — they r `SourceConns` or interact via higher-level reaper APIs. The change is therefore contained and can be reviewed in a single PR. +### Why a `SourceRunner` interface in the reaper instead of a type switch? + +`Reaper.sourceReapers` previously held `map[string]*SourceReaper`. Adding prometheus support +requires storing runners of different concrete types. Two alternatives: (a) `map[string]any` +with runtime type switches at every call site — fragile and error-prone; (b) two separate +maps — duplicates lifecycle management code. The `SourceRunner` interface with a single +`Run(ctx)` method eliminates both problems: the map stays strongly typed, start/stop +lifecycle code in `ShutdownOldWorkers` requires zero kind-specific branching, and future +source kinds (REST, etc.) plug in by implementing only `Run`. + --- ## 8. Migration Guide From f67df2e40d980bf695e8b4025ba3a20d1870e0d3 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 8 May 2026 23:25:28 +0200 Subject: [PATCH 05/27] add `NewFakeExporter()` to `testutil` --- internal/testutil/http.go | 21 ++ spec/tasks/prometheus-exporter-source.md | 286 +++++++++++++++++++++++ spec/tasks/template.md | 245 +++++++++++++++++++ 3 files changed, 552 insertions(+) create mode 100644 internal/testutil/http.go create mode 100644 spec/tasks/prometheus-exporter-source.md create mode 100644 spec/tasks/template.md diff --git a/internal/testutil/http.go b/internal/testutil/http.go new file mode 100644 index 0000000000..d925af2cef --- /dev/null +++ b/internal/testutil/http.go @@ -0,0 +1,21 @@ +package testutil + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +// NewFakeExporter starts an httptest.Server that responds to any request with +// the provided Prometheus exposition-format body and Content-Type +// "text/plain; version=0.0.4". The server is automatically closed when the +// test finishes via t.Cleanup. +func NewFakeExporter(t *testing.T, body string) *httptest.Server { + t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; version=0.0.4") + _, _ = w.Write([]byte(body)) + })) + t.Cleanup(srv.Close) + return srv +} diff --git a/spec/tasks/prometheus-exporter-source.md b/spec/tasks/prometheus-exporter-source.md new file mode 100644 index 0000000000..543c508269 --- /dev/null +++ b/spec/tasks/prometheus-exporter-source.md @@ -0,0 +1,286 @@ +--- +description: "Task list for Prometheus Exporter as a pgwatch Source" +--- + +# Tasks: Prometheus Exporter as a pgwatch Source + +**Input**: `spec/architecture-prometheus-exporter-source.md`, `spec/refactor-sourceconn-interface.md` +**Prerequisites**: Both spec files must be read before implementation begins. + +**Tests**: TDD — write failing tests first, then implement. All test tasks are mandatory. + +**Organization**: Tasks are grouped by delivery phase. The interface refactoring (Phase 2) is a +hard prerequisite for all subsequent phases. + +## Format: `[ID] [P?] [Story] Description` + +- **[P]**: Can run in parallel (different files, no dependencies) +- **[Story]**: Which user story or spec section this task belongs to + +--- + +## Phase 1: Setup + +**Purpose**: Verify the build is clean and add the test infrastructure helpers that all phases share. + +- [x] T001 [P] Confirm `go build ./...` passes on current `main` branch as baseline +- [x] T002 [P] Add `httptest` helper `newFakeExporter(t, body string) *httptest.Server` to `internal/testutil/` for reuse across reaper and sources tests + +--- + +## Phase 2: Foundation — `SourceConn` Interface Refactoring + +**Purpose**: Replace the single `SourceConn` concrete struct with a `SourceConn` interface, a +`DbConn` implementation for DB-backed sources, and a `PromConn` implementation for prometheus +sources. No user-facing behaviour changes; all existing tests must still pass. + +**Spec reference**: `spec/refactor-sourceconn-interface.md` + +**⚠️ CRITICAL**: Phases 3–6 depend on `PromConn` and `DbConn` being in place. No phase 3+ work +can begin until this phase is complete and the full test suite is green. + +### Tests for Phase 2 + +> **Write these tests FIRST; ensure they FAIL before implementation.** + +- [ ] T003 [P] Write compile-time interface checks (these will fail until T008–T011 are done): + - `var _ sources.SourceConn = (*sources.DbConn)(nil)` in `internal/sources/conn_test.go` + - `var _ sources.SourceConn = (*sources.PromConn)(nil)` in `internal/sources/conn_test.go` + - `var _ SourceRunner = (*SourceReaper)(nil)` in `internal/reaper/source_reaper_test.go` +- [ ] T004 [P] Table-driven test for `DbConn.IsPostgresSource()` returns `true` for all DB kinds in `internal/sources/conn_test.go` +- [ ] T005 [P] Table-driven test for `PromConn.IsPostgresSource()` returns `false` in `internal/sources/conn_test.go` +- [ ] T006 [P] Test that `PromConn.FetchRuntimeInfo()` sets `VersionStr = "prometheus"` and `Version = 0` in `internal/sources/conn_test.go` + +### Implementation for Phase 2 + +- [ ] T007 Define `SourceConn` interface in `internal/sources/conn.go` with methods: `Connect`, `Ping`, `IsPostgresSource`, `GetSource`, `GetMetricInterval` (REQ-001 from refactor spec) +- [ ] T008 Rename existing `SourceConn` struct → `DbConn` in `internal/sources/conn.go`; preserve all DB-specific fields and methods unchanged (REQ-002) +- [ ] T009 [P] Create `PromConn` struct in `internal/sources/conn.go` with `Source`, `HTTPClient *http.Client`, `sync.RWMutex`; add `NewPromConn` constructor (REQ-003) +- [ ] T010 [P] Add `NewDbConn` constructor aliasing the existing `NewSourceConn`; update `SourceConns` to `[]SourceConn` (interface slice) (REQ-004/REQ-005) +- [ ] T011 Update all call sites in `internal/reaper/`, `internal/sources/`, and other packages that hold `*sources.SourceConn` to use the interface or concrete type as required +- [ ] T012 Define `SourceRunner` interface in `internal/reaper/runner.go`: + ```go + type SourceRunner interface { Run(ctx context.Context) } + var _ SourceRunner = (*SourceReaper)(nil) + ``` +- [ ] T013 Change `Reaper.sourceReapers` field type from `map[string]*SourceReaper` to `map[string]SourceRunner` in `internal/reaper/reaper.go` + +**Checkpoint**: `go test ./...` is green; interface checks in T003 compile and pass. + +--- + +## Phase 3: Source Kind Registration (US1 — Source Definition) + +**Purpose**: Register `SourcePrometheus` as a valid `Kind`, add it to `Kinds`, and ensure YAML +round-trip works. Covers REQ-001, REQ-004, REQ-005, REQ-034. + +### Tests for Phase 3 + +> **Write FIRST; ensure they FAIL before implementation.** + +- [ ] T014 [P] Table-driven test: `Kind("prometheus").IsValid()` returns `true`; existing kinds still valid; `Kind("unknown").IsValid()` returns `false` — in `internal/sources/types_test.go` +- [ ] T015 [P] YAML unmarshal test: a `prometheus` source with `conn_str`, `custom_metrics`, and `custom_tags` round-trips correctly through `sources.Sources.Validate()` — in `internal/sources/yaml_test.go` +- [ ] T016 [P] Test `sources.Validate()` with `kind: prometheus` does NOT error on `IncludePattern`, `ExcludePattern`, `OnlyIfMaster`, `MetricsStandby`, `PresetMetricsStandby` being empty — in `internal/sources/types_test.go` (CON-001) + +### Implementation for Phase 3 + +- [ ] T017 Add `SourcePrometheus Kind = "prometheus"` constant and append it to `Kinds` slice in `internal/sources/types.go` (REQ-001, REQ-005) +- [ ] T018 Update `Sources.Validate()` in `internal/sources/types.go` to accept `kind: prometheus`; no special validation needed for ignored fields (CON-001) + +**Checkpoint**: `go test ./internal/sources/...` is green. + +--- + +## Phase 4: PromConn Connection Lifecycle (US2 — Authentication & TLS) + +**Purpose**: Implement `PromConn.Connect()`, `Ping()`, `FetchRuntimeInfo()` with TLS query-parameter +parsing, Basic Auth from userinfo, and password redaction. Covers REQ-006, REQ-007, REQ-011–REQ-015, +SEC-001, SEC-002. + +### Tests for Phase 4 + +> **Write FIRST; ensure they FAIL before implementation.** + +- [ ] T019 [P] Test `PromConn.Connect()` against `httptest.NewTLSServer` with `tlsskipverify=true` in `internal/sources/conn_test.go`: + - Connection succeeds + - TLS params are stripped from the actual request URL + - Warning log entry is emitted (SEC-002) +- [ ] T020 [P] Test `PromConn.Connect()` with `http://user:secret@host/metrics`: Basic Auth header sent, password not logged (SEC-001); use a capturing handler +- [ ] T021 [P] Test `PromConn.Connect()` returns error when exporter is unreachable (REQ-012) +- [ ] T022 [P] Test `PromConn.Ping()` returns nil for 2xx, error for 4xx/5xx in `internal/sources/conn_test.go` (REQ-013) +- [ ] T023 [P] Test `redactURL(url)` helper strips password from userinfo component, leaves URL otherwise unchanged — in `internal/sources/conn_test.go` (SEC-001) + +### Implementation for Phase 4 + +- [ ] T024 Add `redactURL(rawURL string) string` helper to `internal/sources/conn.go` that clears the password from `url.URL.User` before returning the URL string (SEC-001) +- [ ] T025 Implement `PromConn.Connect(ctx, opts)` in `internal/sources/conn.go`: + - Parse `tlsrootcert` and `tlsskipverify` query params from `ConnStr` + - Log warning when `tlsskipverify=true` (SEC-002) + - Build `*http.Client` with derived `tls.Config` + - Strip TLS params from URL before making reachability HEAD/GET + - Validate reachability; return error if unreachable (REQ-012) +- [ ] T026 Implement `PromConn.Ping(ctx)` — HEAD request; error if status ≥ 300 (REQ-013) +- [ ] T027 Implement `PromConn.FetchRuntimeInfo()` — no-op setting `VersionStr = "prometheus"`, `Version = 0` (REQ-015); note: `PromConn` has no `RuntimeInfo` field, so this may just be a method stub returning these values + +**Checkpoint**: `go test ./internal/sources/...` is green. + +--- + +## Phase 5: `ScrapeAll` Core (US3 — Scrape & Measurement Rows) + +**Purpose**: Implement `ScrapeAll` in `internal/reaper/prometheus.go` and extend +`MeasurementEnvelope` with `SourceKind`. Covers REQ-021–REQ-026, REQ-023, REQ-024, REQ-025. + +### Tests for Phase 5 + +> **Write FIRST; ensure they FAIL before implementation.** + +- [ ] T028 [P] Table-driven test `TestScrapeAll` in `internal/reaper/prometheus_test.go`: + - Serve fixture exposition text via `httptest.NewServer` + - Assert each sample maps to one `Measurement` with `tag_