diff --git a/contrib/sample.sources.yaml b/contrib/sample.sources.yaml index aa1f74d4d6..9479cb7b84 100644 --- a/contrib/sample.sources.yaml +++ b/contrib/sample.sources.yaml @@ -1,3 +1,11 @@ +- name: postgres-exporter-prod + kind: prometheus + conn_str: "http://localhost:9187/metrics" + preset_metrics: postgres-exporter-basic # scrape cadence = min(30, 60) = 30 s + custom_tags: + env: production + is_enabled: true + - name: test1 # An arbitrary unique name for the monitored source conn_str: postgresql://postgres@localhost/postgres kind: postgres # One of the: diff --git a/docker/scripts/add-test-db.sh b/docker/scripts/add-test-db.sh index a827bf2171..492fa6e459 100755 --- a/docker/scripts/add-test-db.sh +++ b/docker/scripts/add-test-db.sh @@ -14,11 +14,14 @@ docker compose exec -T -i postgres psql -d pgwatch_metrics -v ON_ERROR_STOP=1 docker compose exec -T postgres psql -d pgwatch -v ON_ERROR_STOP=1 -c \ "TRUNCATE pgwatch.source CASCADE; INSERT INTO pgwatch.source - (name, dbtype, preset_config, connstr) + (name, dbtype, preset_config, connstr, custom_tags) VALUES - ('demo', 'postgres', 'debug', 'postgresql://pgwatch:pgwatchadmin@postgres/pgwatch'), - ('demo_metrics', 'postgres', 'full', 'postgresql://pgwatch:pgwatchadmin@postgres/pgwatch_metrics'), - ('demo_standby', 'postgres', 'full', 'postgresql://pgwatch:pgwatchadmin@postgres-standby/pgwatch'), - ('demo_patroni', 'patroni', 'basic', 'etcd://etcd1:2379,etcd2:2379,etcd3:2379/service/demo'), - ('demo_pgbouncer', 'pgbouncer', 'pgbouncer', 'postgresql://pgwatch:pgwatchadmin@pgbouncer/pgbouncer'), - ('demo_pgpool', 'pgpool', 'pgpool', 'postgresql://pgwatch:pgwatchadmin@pgpool/pgwatch');" + ('demo', 'postgres', 'debug', 'postgresql://pgwatch:pgwatchadmin@postgres/pgwatch', NULL), + ('demo_metrics', 'postgres', 'full', 'postgresql://pgwatch:pgwatchadmin@postgres/pgwatch_metrics', NULL), + ('demo_standby', 'postgres', 'full', 'postgresql://pgwatch:pgwatchadmin@postgres-standby/pgwatch', NULL), + ('demo_patroni', 
'patroni', 'basic', 'etcd://etcd1:2379,etcd2:2379,etcd3:2379/service/demo', NULL), + ('demo_pgbouncer', 'pgbouncer', 'pgbouncer', 'postgresql://pgwatch:pgwatchadmin@pgbouncer/pgbouncer', NULL), + ('demo_pgpool', 'pgpool', 'pgpool', 'postgresql://pgwatch:pgwatchadmin@pgpool/pgwatch', NULL), + ('patroni1-prom', 'prometheus', 'patroni', 'http://patroni1:8008/metrics', '{\"cluster\": \"demo\", \"node\": \"patroni1\"}'), + ('patroni2-prom', 'prometheus', 'patroni', 'http://patroni2:8008/metrics', '{\"cluster\": \"demo\", \"node\": \"patroni2\"}'), + ('patroni3-prom', 'prometheus', 'patroni', 'http://patroni3:8008/metrics', '{\"cluster\": \"demo\", \"node\": \"patroni3\"}');" diff --git a/docs/howto/monitor_prometheus_exporter.md b/docs/howto/monitor_prometheus_exporter.md new file mode 100644 index 0000000000..ffb5b99ef7 --- /dev/null +++ b/docs/howto/monitor_prometheus_exporter.md @@ -0,0 +1,152 @@ +--- +title: Monitoring a Prometheus Exporter +--- + +pgwatch's native strength is collecting metrics from PostgreSQL via SQL queries. For metrics that +cannot be expressed as SQL — such as HA cluster state, node roles, replication lag as seen by the +cluster manager, or OS-level metrics like CPU, memory, and I/O that are not accessible to Postgres +without special extensions — pgwatch can scrape any HTTP endpoint that exposes data in the +[Prometheus text exposition format](https://prometheus.io/docs/instrumenting/exposition_formats/). + +A typical use case is [Patroni](https://patroni.readthedocs.io/), which exposes cluster health +metrics (node role, WAL position, DCS connectivity, …) through its `GET /metrics` endpoint. These +metrics complement the SQL-based metrics already collected from the PostgreSQL instances themselves. 
+ +## Quick Start + +Add a source with `kind: prometheus`, point `conn_str` at Patroni's `/metrics` endpoint, and choose +the built-in `patroni` preset: + +```yaml +- name: patroni-prod-node1 + kind: prometheus + conn_str: "http://patroni-node1:8008/metrics" + preset_metrics: patroni + is_enabled: true +``` + +pgwatch periodically fetches the URL, parses the Prometheus text format, and forwards each metric +family to the configured sink. + +## Connection String Format + +The `conn_str` field is a plain HTTP or HTTPS URL. Optional query parameters control TLS +validation and are stripped before the request is sent to the exporter: + +| Query parameter | Description | +|---|---| +| `tlsskipverify=true` | Disable TLS certificate verification (use with caution). | +| `tlsrootcert=` | Absolute path to a PEM-encoded CA certificate file used to verify the server certificate. | + +Basic Auth credentials are embedded in the URL in the standard `user:password@host` form. + +### Examples + +```yaml +# Plain HTTP – no auth +conn_str: "http://patroni-node1:8008/metrics" + +# HTTPS with a custom CA certificate +conn_str: "https://patroni-node1:8008/metrics?tlsrootcert=/etc/ssl/certs/my-ca.pem" + +# HTTPS with Basic Auth and TLS certificate verification disabled +conn_str: "https://user:secret@patroni-node1:8008/metrics?tlsskipverify=true" +``` + +## Presets + +### `patroni` + +Covers the key metric families emitted by Patroni's `GET /metrics` endpoint (default port **8008**): + +| Metric family | What it measures | Interval | +|---|---|---| +| `patroni_postgres_running` | 1 if Postgres is running | 30 s | +| `patroni_primary` | 1 if this node is the primary | 30 s | +| `patroni_replica` | 1 if this node is a replica | 30 s | +| `patroni_standby_leader` | 1 if this node is standby leader | 30 s | +| `patroni_cluster_unlocked` | 1 if the cluster has no leader lock | 30 s | +| `patroni_xlog_location` | WAL location on primary | 30 s | +| `patroni_xlog_received_location` | received 
WAL on replica | 30 s | +| `patroni_xlog_replayed_location` | replayed WAL on replica | 30 s | +| `patroni_dcs_last_seen` | Unix epoch timestamp of the last successful DCS contact | 30 s | +| `patroni_pending_restart` | 1 if node needs a restart | 60 s | +| `patroni_is_paused` | 1 if auto-failover is disabled | 60 s | +| `patroni_postgres_timeline` | Postgres timeline | 60 s | + +### `postgres-exporter-basic` + +Covers the most important metric families emitted by +[postgres_exporter](https://github.com/prometheus-community/postgres_exporter): + +| Metric family | Interval | +|---|---| +| `pg_stat_activity_count` | 30 s | +| `pg_stat_bgwriter_checkpoints_timed` | 60 s | +| `pg_stat_replication_pg_wal_lsn_diff` | 30 s | + +## Custom Metrics + +You can specify individual metric families and their intervals with `custom_metrics`: + +```yaml +- name: patroni-prod-node1 + kind: prometheus + conn_str: "http://patroni-node1:8008/metrics" + custom_metrics: + patroni_primary: 30 + patroni_cluster_unlocked: 30 + patroni_dcs_last_seen: 30 + is_enabled: true +``` + +## Custom Tags + +Use `custom_tags` to attach arbitrary key-value pairs to every stored data point. This is +useful when multiple nodes feed the same sink and you need to distinguish them: + +```yaml +- name: patroni-prod-node1 + kind: prometheus + conn_str: "http://patroni-node1:8008/metrics" + preset_metrics: patroni + custom_tags: + cluster: prod + node: node1 + is_enabled: true +``` + +## Full Example + +The following snippet monitors all three nodes of a Patroni HA cluster. 
Each node's metrics are +tagged with the cluster name and node identifier so they can be queried independently: + +```yaml +- name: patroni-prod-node1 + kind: prometheus + conn_str: "http://patroni-node1:8008/metrics" + preset_metrics: patroni + custom_tags: + cluster: prod + node: node1 + is_enabled: true + +- name: patroni-prod-node2 + kind: prometheus + conn_str: "http://patroni-node2:8008/metrics" + preset_metrics: patroni + custom_tags: + cluster: prod + node: node2 + is_enabled: true + +- name: patroni-prod-node3 + kind: prometheus + conn_str: "http://patroni-node3:8008/metrics" + preset_metrics: patroni + custom_tags: + cluster: prod + node: node3 + is_enabled: true +``` + diff --git a/go.mod b/go.mod index cb280f2bdf..f0f63f405f 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( github.com/json-iterator/go v1.1.12 github.com/pashagolub/pgxmock/v4 v4.9.0 github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 + github.com/prometheus/common v0.67.5 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 github.com/sethvargo/go-retry v0.3.0 github.com/shirou/gopsutil/v4 v4.26.4 @@ -76,8 +78,6 @@ require ( github.com/opencontainers/image-spec v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect - github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/tklauser/go-sysconf v0.3.16 // indirect github.com/tklauser/numcpus v0.11.0 // indirect diff --git a/grafana/postgres/v12/patroni-overview.json b/grafana/postgres/v12/patroni-overview.json new file mode 100644 index 0000000000..2b39ffad2a --- /dev/null +++ b/grafana/postgres/v12/patroni-overview.json @@ -0,0 +1,1395 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "datasource", + "uid": "grafana" + }, 
+ "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "description": "Patroni cluster health, roles and replication monitoring. Requires prometheus-type sources configured in pgwatch.", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": 50, + "links": [], + "panels": [ + { + "collapsed": false, + "gridPos": {"h": 1, "w": 24, "x": 0, "y": 0}, + "id": 20, + "panels": [], + "title": "All Clusters Health", + "type": "row" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "description": "Total number of Patroni clusters currently monitored", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "decimals": 0, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "text", "value": null}]}, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": {"h": 3, "w": 6, "x": 0, "y": 1}, + "id": 21, + "options": { + "colorMode": "value", "graphMode": "none", "justifyMode": "center", + "orientation": "horizontal", "percentChangeColorMode": "standard", + "reduceOptions": {"calcs": ["lastNotNull"], "fields": "", "values": false}, + "showPercentChange": false, "textMode": "auto", "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "format": "table", "rawQuery": true, + "rawSql": "SELECT count(DISTINCT tag_data->>'scope')::float AS value\nFROM patroni_primary\nWHERE $__timeFilter(time)", + "refId": "A", "resultFormat": "table" + }], + "title": "Clusters Monitored", + "type": "stat" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "description": "Clusters missing a leader lock (split-brain risk). 
Should be 0.", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "decimals": 0, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}, {"color": "red", "value": 1}]}, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": {"h": 3, "w": 6, "x": 6, "y": 1}, + "id": 22, + "options": { + "colorMode": "background", "graphMode": "none", "justifyMode": "center", + "orientation": "horizontal", "percentChangeColorMode": "standard", + "reduceOptions": {"calcs": ["lastNotNull"], "fields": "", "values": false}, + "showPercentChange": false, "textMode": "auto", "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "format": "table", "rawQuery": true, + "rawSql": "SELECT count(*)::float AS value\nFROM (\n SELECT DISTINCT ON (tag_data->>'scope')\n (data->>'patroni_cluster_unlocked')::int AS unlocked\n FROM patroni_cluster_unlocked\n WHERE $__timeFilter(time)\n ORDER BY tag_data->>'scope', time DESC\n) latest\nWHERE unlocked = 1", + "refId": "A", "resultFormat": "table" + }], + "title": "Clusters Without Leader", + "type": "stat" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "description": "Clusters with auto-failover paused. 
Should be 0 in normal operation.", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "decimals": 0, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}, {"color": "yellow", "value": 1}]}, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": {"h": 3, "w": 6, "x": 12, "y": 1}, + "id": 23, + "options": { + "colorMode": "background", "graphMode": "none", "justifyMode": "center", + "orientation": "horizontal", "percentChangeColorMode": "standard", + "reduceOptions": {"calcs": ["lastNotNull"], "fields": "", "values": false}, + "showPercentChange": false, "textMode": "auto", "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "format": "table", "rawQuery": true, + "rawSql": "SELECT count(*)::float AS value\nFROM (\n SELECT DISTINCT ON (tag_data->>'scope')\n (data->>'patroni_is_paused')::int AS is_paused\n FROM patroni_is_paused\n WHERE $__timeFilter(time)\n ORDER BY tag_data->>'scope', time DESC\n) latest\nWHERE is_paused = 1", + "refId": "A", "resultFormat": "table" + }], + "title": "Paused Clusters", + "type": "stat" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "description": "Nodes requiring a Postgres restart to apply configuration changes", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "decimals": 0, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}, {"color": "yellow", "value": 1}]}, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": {"h": 3, "w": 6, "x": 18, "y": 1}, + "id": 24, + "options": { + "colorMode": "background", "graphMode": "none", "justifyMode": "center", + "orientation": "horizontal", "percentChangeColorMode": "standard", + "reduceOptions": {"calcs": ["lastNotNull"], "fields": "", "values": false}, + "showPercentChange": false, "textMode": "auto", 
"wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "format": "table", "rawQuery": true, + "rawSql": "SELECT count(*)::float AS value\nFROM (\n SELECT DISTINCT ON (dbname)\n (data->>'patroni_pending_restart')::int AS pending\n FROM patroni_pending_restart\n WHERE $__timeFilter(time)\n ORDER BY dbname, time DESC\n) latest\nWHERE pending = 1", + "refId": "A", "resultFormat": "table" + }], + "title": "Nodes Pending Restart", + "type": "stat" + }, + { + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "description": "Per-cluster health summary. Use the Cluster variable above to filter the node detail panels below.", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "custom": {"align": "auto", "cellOptions": {"type": "auto"}, "inspect": false}, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}]} + }, + "overrides": [ + { + "matcher": {"id": "byName", "options": "Has Leader"}, + "properties": [ + {"id": "custom.cellOptions", "value": {"type": "color-background"}}, + {"id": "mappings", "value": [{"options": {"NO": {"color": "red", "index": 0}, "YES": {"color": "green", "index": 1}}, "type": "value"}]} + ] + }, + { + "matcher": {"id": "byName", "options": "Paused"}, + "properties": [ + {"id": "custom.cellOptions", "value": {"type": "color-background"}}, + {"id": "mappings", "value": [{"options": {"NO": {"color": "green", "index": 0}, "YES": {"color": "yellow", "index": 1}}, "type": "value"}]} + ] + }, + { + "matcher": {"id": "byName", "options": "Pending Restart"}, + "properties": [ + {"id": "custom.cellOptions", "value": {"type": "color-background"}}, + {"id": "mappings", "value": [{"options": {"NO": {"color": "green", "index": 0}, "YES": {"color": "yellow", "index": 1}}, "type": "value"}]} + ] + } + ] + }, + "gridPos": {"h": 8, "w": 24, "x": 0, "y": 4}, + "id": 
25, + "options": { + "cellHeight": "sm", + "footer": {"countRows": false, "reducer": ["sum"], "show": false}, + "showHeader": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "format": "table", "rawQuery": true, + "rawSql": "WITH latest_primary AS (\n SELECT DISTINCT ON (dbname)\n tag_data->>'scope' AS cluster,\n dbname,\n (data->>'patroni_primary')::int AS is_primary\n FROM patroni_primary\n WHERE $__timeFilter(time)\n ORDER BY dbname, time DESC\n),\ncluster_unlocked AS (\n SELECT tag_data->>'scope' AS cluster,\n max((data->>'patroni_cluster_unlocked')::int) AS unlocked\n FROM patroni_cluster_unlocked\n WHERE $__timeFilter(time)\n GROUP BY 1\n),\ncluster_paused AS (\n SELECT tag_data->>'scope' AS cluster,\n max((data->>'patroni_is_paused')::int) AS is_paused\n FROM patroni_is_paused\n WHERE $__timeFilter(time)\n GROUP BY 1\n),\ncluster_pending AS (\n SELECT tag_data->>'scope' AS cluster,\n max((data->>'patroni_pending_restart')::int) AS pending_restart\n FROM patroni_pending_restart\n WHERE $__timeFilter(time)\n GROUP BY 1\n)\nSELECT\n lp.cluster AS \"Cluster\",\n count(*) AS \"Nodes\",\n sum(lp.is_primary)::int AS \"Primaries\",\n CASE WHEN cu.unlocked = 0 THEN 'YES' ELSE 'NO' END AS \"Has Leader\",\n CASE WHEN cp.is_paused = 1 THEN 'YES' ELSE 'NO' END AS \"Paused\",\n CASE WHEN cpe.pending_restart = 1 THEN 'YES' ELSE 'NO' END AS \"Pending Restart\"\nFROM latest_primary lp\nLEFT JOIN cluster_unlocked cu USING (cluster)\nLEFT JOIN cluster_paused cp USING (cluster)\nLEFT JOIN cluster_pending cpe USING (cluster)\nGROUP BY lp.cluster, cu.unlocked, cp.is_paused, cpe.pending_restart\nORDER BY lp.cluster", + "refId": "A", "resultFormat": "table" + }], + "title": "Cluster Health Summary", + "type": "table" + }, + { + "collapsed": false, + "gridPos": {"h": 1, "w": 24, "x": 0, "y": 12}, + "id": 26, + "panels": [], + "title": "Node Detail — $cluster", + "type": "row" + }, + { + 
"datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "Current role of each Patroni node: PRIMARY (1) or REPLICA (0)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "blue", + "index": 0, + "text": "REPLICA" + }, + "1": { + "color": "green", + "index": 1, + "text": "PRIMARY" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "blue", + "value": null + }, + { + "color": "green", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 0, + "y": 13 + }, + "id": 2, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_primary')::float AS value\nFROM patroni_primary\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "Node Role", + "type": "stat" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "Whether Postgres process is running on each node (1 = running)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "red", + "index": 
0, + "text": "STOPPED" + }, + "1": { + "color": "green", + "index": 1, + "text": "RUNNING" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "green", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 8, + "y": 13 + }, + "id": 3, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_postgres_running')::float AS value\nFROM patroni_postgres_running\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "Postgres Running", + "type": "stat" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "Whether the cluster has a distributed lock (leader). 
0 = has leader (healthy), 1 = no leader (split-brain risk)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 0, + "text": "HAS LEADER" + }, + "1": { + "color": "red", + "index": 1, + "text": "NO LEADER" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 16, + "y": 13 + }, + "id": 4, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_cluster_unlocked')::float AS value\nFROM patroni_cluster_unlocked\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "Cluster Leader Lock", + "type": "stat" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "Whether automatic failover is paused for the cluster (1 = paused)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 0, + "text": "ACTIVE" + }, + "1": { + "color": "yellow", + "index": 1, + 
"text": "PAUSED" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 0, + "y": 19 + }, + "id": 5, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_is_paused')::float AS value\nFROM patroni_is_paused\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "Failover State", + "type": "stat" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "Node requires Postgres restart to apply configuration changes (1 = restart needed)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 0, + "text": "OK" + }, + "1": { + "color": "yellow", + "index": 1, + "text": "RESTART NEEDED" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + 
"x": 6, + "y": 19 + }, + "id": 6, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_pending_restart')::float AS value\nFROM patroni_pending_restart\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "Pending Restart", + "type": "stat" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "Whether node is a standby leader (replica that manages the standby cluster, 1 = standby leader)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "text", + "index": 0, + "text": "NO" + }, + "1": { + "color": "yellow", + "index": 1, + "text": "YES" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "text", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 12, + "y": 19 + }, + "id": 7, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + 
"values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_standby_leader')::float AS value\nFROM patroni_standby_leader\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "Standby Leader", + "type": "stat" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "PostgreSQL WAL timeline number per node", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "text", + "value": null + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 18, + "y": 19 + }, + "id": 8, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_postgres_timeline')::float AS value\nFROM patroni_postgres_timeline\nWHERE 
$__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "WAL Timeline", + "type": "stat" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 24 + }, + "id": 9, + "panels": [], + "title": "WAL & Replication", + "type": "row" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "WAL write location in bytes. Non-zero only on the primary node.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 25 + }, + "id": 10, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": 
"none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_xlog_location')::float AS value\nFROM patroni_xlog_location\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "WAL Write Location (Primary)", + "type": "timeseries" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "WAL received location on replica nodes in bytes", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 33 + }, + "id": 11, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": 
"time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_xlog_received_location')::float AS value\nFROM patroni_xlog_received_location\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "WAL Received Location (Replicas)", + "type": "timeseries" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "WAL replayed location on replica nodes in bytes", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 33 + }, + "id": 12, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": 
"grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric, (data->>'patroni_xlog_replayed_location')::float AS value\nFROM patroni_xlog_replayed_location\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "WAL Replayed Location (Replicas)", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 41 + }, + "id": 13, + "panels": [], + "title": "DCS Health", + "type": "row" + }, + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "description": "Seconds since each node last successfully contacted the DCS (etcd/Consul/ZooKeeper). High values indicate DCS connectivity issues.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line", + "steps": [ + { + "color": "transparent", + "value": null + }, + { + "color": "red", + "value": 30 + } + ] + } + }, + "decimals": 1, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 30 + } + ] + }, 
+ "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 42 + }, + "id": 14, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "grafana-postgresql-datasource", + "uid": "pgwatch-metrics" + }, + "format": "time_series", + "group": [], + "groupBy": [], + "metricColumn": "none", + "orderByTime": "ASC", + "policy": "default", + "rawQuery": true, + "rawSql": "SELECT time, dbname AS metric,\n extract(epoch from time)::bigint - (data->>'patroni_dcs_last_seen')::bigint AS value\nFROM patroni_dcs_last_seen\nWHERE $__timeFilter(time) AND dbname IN ($dbname)\nORDER BY time", + "refId": "A", + "resultFormat": "time_series", + "tags": [], + "timeColumn": "time" + } + ], + "title": "DCS Last Seen Age", + "type": "timeseries" + }, + { + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 24, + "x": 0, + "y": 50 + }, + "id": 15, + "options": { + "code": { + "language": "plaintext", + "showLineNumbers": false, + "showMiniMap": false + }, + "content": "### Brought to you by\n\n[![Cybertec – The PostgreSQL Database Company](https://www.cybertec-postgresql.com/wp-content/uploads/2025/02/cybertec-logo-white-blue.svg)](https://www.cybertec-postgresql.com/en/)\n", + "mode": "markdown" + }, + "pluginVersion": "12.0.0", + "title": "", + "transparent": true, + "type": "text" + } + ], + "preload": false, + "refresh": "30s", + "schemaVersion": 41, + "tags": [ + "pgwatch" + ], + "templating": { + "list": [ + { + "current": {"text": "All", "value": ["$__all"]}, + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "definition": "", + "includeAll": true, + "label": "Cluster", + "multi": true, + "name": "cluster", + "options": [], + 
"query": "SELECT DISTINCT tag_data->>'scope' FROM patroni_primary WHERE tag_data IS NOT NULL ORDER BY 1", + "refresh": 1, + "regex": "", + "type": "query" + }, + { + "current": {"text": "All", "value": ["$__all"]}, + "datasource": {"type": "grafana-postgresql-datasource", "uid": "pgwatch-metrics"}, + "definition": "", + "includeAll": true, + "label": "Node", + "multi": true, + "name": "dbname", + "options": [], + "query": "SELECT DISTINCT dbname FROM patroni_primary WHERE tag_data->>'scope' IN ($cluster) ORDER BY 1", + "refresh": 2, + "regex": "", + "type": "query" + } + ] + }, + "time": { + "from": "now-3h", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Patroni Cluster Overview", + "uid": "patroni-overview", + "version": 1 +} diff --git a/grafana/prometheus/v12/patroni-overview-prometheus.json b/grafana/prometheus/v12/patroni-overview-prometheus.json new file mode 100644 index 0000000000..8aaf1df09d --- /dev/null +++ b/grafana/prometheus/v12/patroni-overview-prometheus.json @@ -0,0 +1,1410 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "datasource", + "uid": "grafana" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "description": "Patroni cluster health, roles and replication monitoring using Prometheus data source. 
Requires prometheus-type sources configured in pgwatch.", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": 51, + "links": [], + "panels": [ + { + "collapsed": false, + "gridPos": {"h": 1, "w": 24, "x": 0, "y": 0}, + "id": 20, + "panels": [], + "title": "All Clusters Health", + "type": "row" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "description": "Total number of Patroni clusters currently monitored", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "decimals": 0, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "text", "value": null}]}, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": {"h": 3, "w": 6, "x": 0, "y": 1}, + "id": 21, + "options": { + "colorMode": "value", "graphMode": "none", "justifyMode": "center", + "orientation": "horizontal", "percentChangeColorMode": "standard", + "reduceOptions": {"calcs": ["lastNotNull"], "fields": "", "values": false}, + "showPercentChange": false, "textMode": "auto", "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "count(count by (scope) (patroni_primary))", + "instant": true, + "legendFormat": "", + "refId": "A" + }], + "title": "Clusters Monitored", + "type": "stat" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "description": "Clusters missing a leader lock (split-brain risk). 
Should be 0.", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "decimals": 0, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}, {"color": "red", "value": 1}]}, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": {"h": 3, "w": 6, "x": 6, "y": 1}, + "id": 22, + "options": { + "colorMode": "background", "graphMode": "none", "justifyMode": "center", + "orientation": "horizontal", "percentChangeColorMode": "standard", + "reduceOptions": {"calcs": ["lastNotNull"], "fields": "", "values": false}, + "showPercentChange": false, "textMode": "auto", "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "count(max by (scope) (patroni_cluster_unlocked) == 1) OR vector(0)", + "instant": true, + "legendFormat": "", + "refId": "A" + }], + "title": "Clusters Without Leader", + "type": "stat" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "description": "Clusters with auto-failover paused. 
Should be 0 in normal operation.", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "decimals": 0, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}, {"color": "yellow", "value": 1}]}, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": {"h": 3, "w": 6, "x": 12, "y": 1}, + "id": 23, + "options": { + "colorMode": "background", "graphMode": "none", "justifyMode": "center", + "orientation": "horizontal", "percentChangeColorMode": "standard", + "reduceOptions": {"calcs": ["lastNotNull"], "fields": "", "values": false}, + "showPercentChange": false, "textMode": "auto", "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "count(max by (scope) (patroni_is_paused) == 1) OR vector(0)", + "instant": true, + "legendFormat": "", + "refId": "A" + }], + "title": "Paused Clusters", + "type": "stat" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "description": "Nodes requiring a Postgres restart to apply configuration changes", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "decimals": 0, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}, {"color": "yellow", "value": 1}]}, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": {"h": 3, "w": 6, "x": 18, "y": 1}, + "id": 24, + "options": { + "colorMode": "background", "graphMode": "none", "justifyMode": "center", + "orientation": "horizontal", "percentChangeColorMode": "standard", + "reduceOptions": {"calcs": ["lastNotNull"], "fields": "", "values": false}, + "showPercentChange": false, "textMode": "auto", "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [{ + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "sum(patroni_pending_restart) OR vector(0)", + 
"instant": true, + "legendFormat": "", + "refId": "A" + }], + "title": "Nodes Pending Restart", + "type": "stat" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "description": "Per-cluster health summary. Use the Cluster variable above to filter the node detail panels below.", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "custom": {"align": "auto", "cellOptions": {"type": "auto"}, "inspect": false}, + "mappings": [], + "thresholds": {"mode": "absolute", "steps": [{"color": "green", "value": null}]} + }, + "overrides": [ + { + "matcher": {"id": "byName", "options": "Has Leader"}, + "properties": [ + {"id": "custom.cellOptions", "value": {"type": "color-background"}}, + {"id": "mappings", "value": [{"options": {"0": {"color": "green", "index": 0, "text": "YES"}, "1": {"color": "red", "index": 1, "text": "NO"}}, "type": "value"}]} + ] + }, + { + "matcher": {"id": "byName", "options": "Paused"}, + "properties": [ + {"id": "custom.cellOptions", "value": {"type": "color-background"}}, + {"id": "mappings", "value": [{"options": {"0": {"color": "green", "index": 0, "text": "NO"}, "1": {"color": "yellow", "index": 1, "text": "YES"}}, "type": "value"}]} + ] + }, + { + "matcher": {"id": "byName", "options": "Pending Restart"}, + "properties": [ + {"id": "custom.cellOptions", "value": {"type": "color-background"}}, + {"id": "mappings", "value": [{"options": {"0": {"color": "green", "index": 0, "text": "NO"}, "1": {"color": "yellow", "index": 1, "text": "YES"}}, "type": "value"}]} + ] + } + ] + }, + "gridPos": {"h": 8, "w": 24, "x": 0, "y": 4}, + "id": 25, + "options": { + "cellHeight": "sm", + "footer": {"countRows": false, "reducer": ["sum"], "show": false}, + "showHeader": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "count by (scope) (patroni_primary)", + "format": "table", + "instant": true, + 
"legendFormat": "", + "refId": "A" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "sum by (scope) (patroni_primary)", + "format": "table", + "instant": true, + "legendFormat": "", + "refId": "B" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "max by (scope) (patroni_cluster_unlocked)", + "format": "table", + "instant": true, + "legendFormat": "", + "refId": "C" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "max by (scope) (patroni_is_paused)", + "format": "table", + "instant": true, + "legendFormat": "", + "refId": "D" + }, + { + "datasource": {"type": "prometheus", "uid": "pgwatch-prometheus"}, + "editorMode": "code", + "expr": "max by (scope) (patroni_pending_restart)", + "format": "table", + "instant": true, + "legendFormat": "", + "refId": "E" + } + ], + "title": "Cluster Health Summary", + "transformations": [ + {"id": "merge", "options": {}}, + { + "id": "organize", + "options": { + "excludeByName": {"Time": true}, + "renameByName": { + "scope": "Cluster", + "Value #A": "Nodes", + "Value #B": "Primaries", + "Value #C": "Has Leader", + "Value #D": "Paused", + "Value #E": "Pending Restart" + } + } + } + ], + "type": "table" + }, + { + "collapsed": false, + "gridPos": {"h": 1, "w": 24, "x": 0, "y": 12}, + "id": 26, + "panels": [], + "title": "Node Detail — $cluster", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "Current role of each Patroni node: PRIMARY (1) or REPLICA (0)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "blue", + "index": 0, + "text": "REPLICA" + }, + "1": { + "color": "green", + "index": 1, + "text": "PRIMARY" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": 
[ + { + "color": "blue", + "value": null + }, + { + "color": "green", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 0, + "y": 13 + }, + "id": 2, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_primary{dbname=~\"$dbname\"}", + "instant": true, + "legendFormat": "{{dbname}}", + "refId": "A" + } + ], + "title": "Node Role", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "Whether Postgres process is running on each node (1 = running)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "red", + "index": 0, + "text": "STOPPED" + }, + "1": { + "color": "green", + "index": 1, + "text": "RUNNING" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "green", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 8, + "y": 13 + }, + "id": 3, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + 
"type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_postgres_running{dbname=~\"$dbname\"}", + "instant": true, + "legendFormat": "{{dbname}}", + "refId": "A" + } + ], + "title": "Postgres Running", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "Whether the cluster has a distributed lock (leader). 0 = has leader (healthy), 1 = no leader (split-brain risk)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 0, + "text": "HAS LEADER" + }, + "1": { + "color": "red", + "index": 1, + "text": "NO LEADER" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 16, + "y": 13 + }, + "id": 4, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_cluster_unlocked{dbname=~\"$dbname\"}", + "instant": true, + "legendFormat": "{{dbname}}", + "refId": "A" + } + ], + "title": "Cluster Leader Lock", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "Whether automatic failover is paused for the cluster (1 = paused)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + 
"color": "green", + "index": 0, + "text": "ACTIVE" + }, + "1": { + "color": "yellow", + "index": 1, + "text": "PAUSED" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 0, + "y": 19 + }, + "id": 5, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_is_paused{dbname=~\"$dbname\"}", + "instant": true, + "legendFormat": "{{dbname}}", + "refId": "A" + } + ], + "title": "Failover State", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "Node requires Postgres restart to apply configuration changes (1 = restart needed)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 0, + "text": "OK" + }, + "1": { + "color": "yellow", + "index": 1, + "text": "RESTART NEEDED" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 6, + "y": 19 + }, + "id": 6, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + 
"reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_pending_restart{dbname=~\"$dbname\"}", + "instant": true, + "legendFormat": "{{dbname}}", + "refId": "A" + } + ], + "title": "Pending Restart", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "Whether node is a standby leader (1 = standby leader)", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "text", + "index": 0, + "text": "NO" + }, + "1": { + "color": "yellow", + "index": 1, + "text": "YES" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "text", + "value": null + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 12, + "y": 19 + }, + "id": 7, + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_standby_leader{dbname=~\"$dbname\"}", + "instant": true, + "legendFormat": "{{dbname}}", + "refId": "A" + } + ], + "title": "Standby Leader", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "PostgreSQL WAL 
timeline number per node", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "text", + "value": null + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 6, + "x": 18, + "y": 19 + }, + "id": 8, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "center", + "orientation": "horizontal", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "value_and_name", + "wideLayout": true + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_postgres_timeline{dbname=~\"$dbname\"}", + "instant": true, + "legendFormat": "{{dbname}}", + "refId": "A" + } + ], + "title": "WAL Timeline", + "type": "stat" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 24 + }, + "id": 9, + "panels": [], + "title": "WAL & Replication", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "WAL write location in bytes. 
Non-zero only on the primary node.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 25 + }, + "id": 10, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_xlog_location{dbname=~\"$dbname\"}", + "interval": "$agg_interval", + "legendFormat": "{{dbname}} xlog_location", + "refId": "A" + } + ], + "title": "WAL Write Location (Primary)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "WAL received location on replica nodes in bytes", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", 
+ "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 33 + }, + "id": 11, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_xlog_received_location{dbname=~\"$dbname\"}", + "interval": "$agg_interval", + "legendFormat": "{{dbname}} received", + "refId": "A" + } + ], + "title": "WAL Received Location (Replicas)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "WAL replayed location on replica nodes in bytes", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": 
false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "decimals": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 33 + }, + "id": 12, + "options": { + "legend": { + "calcs": [ + "lastNotNull" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "patroni_xlog_replayed_location{dbname=~\"$dbname\"}", + "interval": "$agg_interval", + "legendFormat": "{{dbname}} replayed", + "refId": "A" + } + ], + "title": "WAL Replayed Location (Replicas)", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 41 + }, + "id": 13, + "panels": [], + "title": "DCS Health", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "description": "Seconds since each node last successfully contacted the DCS (etcd/Consul/ZooKeeper). 
High values indicate DCS connectivity issues.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "line", + "steps": [ + { + "color": "transparent", + "value": null + }, + { + "color": "red", + "value": 30 + } + ] + } + }, + "decimals": 1, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 30 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 42 + }, + "id": 14, + "options": { + "legend": { + "calcs": [ + "lastNotNull", + "max" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "12.3.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "pgwatch-prometheus" + }, + "editorMode": "code", + "expr": "time() - patroni_dcs_last_seen{dbname=~\"$dbname\"}", + "interval": "$agg_interval", + "legendFormat": "{{dbname}}", + "refId": "A" + } + ], + "title": "DCS Last Seen Age", + "type": "timeseries" + }, + { + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 24, + "x": 0, + "y": 50 + }, + "id": 15, + "options": { + "code": { + "language": "plaintext", + "showLineNumbers": false, + "showMiniMap": false + }, 
+ "content": "### Brought to you by\n\n[![Cybertec – The PostgreSQL Database Company](https://www.cybertec-postgresql.com/wp-content/uploads/2025/02/cybertec-logo-white-blue.svg)](https://www.cybertec-postgresql.com/en/)\n", + "mode": "markdown" + }, + "pluginVersion": "12.0.0", + "title": "", + "transparent": true, + "type": "text" + } + ], + "preload": false, + "refresh": "30s", + "schemaVersion": 41, + "tags": [ + "pgwatch" + ], + "templating": { + "list": [ + { + "current": {"selected": false, "text": "All", "value": ["$__all"]}, + "datasource": {"type": "prometheus"}, + "definition": "label_values(patroni_primary, scope)", + "hide": 0, + "includeAll": true, + "label": "Cluster", + "multi": true, + "name": "cluster", + "options": [], + "query": "label_values(patroni_primary, scope)", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + }, + { + "current": {"selected": false, "text": "All", "value": ["$__all"]}, + "datasource": {"type": "prometheus"}, + "definition": "label_values(patroni_primary{scope=~\"$cluster\"}, dbname)", + "hide": 0, + "includeAll": true, + "label": "Node", + "multi": true, + "name": "dbname", + "options": [], + "query": "label_values(patroni_primary{scope=~\"$cluster\"}, dbname)", + "refresh": 2, + "regex": "", + "skipUrlSync": false, + "sort": 1, + "type": "query" + }, + { + "current": {"selected": false, "text": "5m", "value": "5m"}, + "hide": 0, + "includeAll": false, + "label": "Aggregation interval", + "multi": false, + "name": "agg_interval", + "options": [ + {"selected": false, "text": "1m", "value": "1m"}, + {"selected": true, "text": "5m", "value": "5m"}, + {"selected": false, "text": "10m", "value": "10m"}, + {"selected": false, "text": "30m", "value": "30m"}, + {"selected": false, "text": "1h", "value": "1h"} + ], + "query": "1m,5m,10m,30m,1h", + "queryValue": "", + "skipUrlSync": false, + "type": "custom" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + 
"timezone": "", + "title": "Patroni Cluster Overview (Prometheus)", + "uid": "patroni-overview-prometheus", + "version": 1, + "weekStart": "" +} diff --git a/internal/cmdopts/cmdsource.go b/internal/cmdopts/cmdsource.go index ff7a8f45a4..694ae8316d 100644 --- a/internal/cmdopts/cmdsource.go +++ b/internal/cmdopts/cmdsource.go @@ -6,6 +6,7 @@ import ( "fmt" "net/url" + "github.com/cybertec-postgresql/pgwatch/v5/internal/log" "github.com/cybertec-postgresql/pgwatch/v5/internal/sources" ) @@ -58,7 +59,11 @@ func (cmd *SourcePingCommand) Execute(args []string) error { _, e = sources.ResolveDatabasesFromPostgres(s) default: mdb := sources.NewSourceConn(s) - e = mdb.Connect(context.Background(), cmd.owner.Sources) + // we don't want to log connection errors here, so we use a noop logger in the context + ctx := log.WithLogger(context.Background(), log.NewNoopLogger()) + if e = mdb.Connect(ctx, cmd.owner.Sources); e == nil { + e = mdb.Ping(ctx) + } } if e != nil { fmt.Printf("FAIL:\t%s (%s)\n", s.Name, e) @@ -105,14 +110,19 @@ func (cmd *SourceResolveCommand) Execute(args []string) error { } var connstr url.URL connstr.Scheme = "postgresql" - for _, s := range conns { - if s.ConnStr > "" { - fmt.Printf("%s=%s\n", s.Name, s.ConnStr) + for _, conn := range conns { + s, ok := conn.(*sources.DbConn) + if !ok { + continue + } + src := s.GetSource() + if src.ConnStr > "" { + fmt.Printf("%s=%s\n", src.Name, src.ConnStr) } else { connstr.Host = fmt.Sprintf("%s:%d", s.ConnConfig.ConnConfig.Host, s.ConnConfig.ConnConfig.Port) connstr.User = url.UserPassword(s.ConnConfig.ConnConfig.User, s.ConnConfig.ConnConfig.Password) connstr.Path = s.ConnConfig.ConnConfig.Database - fmt.Printf("%s=%s\n", s.Name, connstr.String()) + fmt.Printf("%s=%s\n", src.Name, connstr.String()) } } cmd.owner.CompleteCommand(ExitCodeOK) diff --git a/internal/metrics/default_test.go b/internal/metrics/default_test.go index 8c2e128c09..4b7d73cf76 100644 --- a/internal/metrics/default_test.go +++ 
b/internal/metrics/default_test.go @@ -42,3 +42,26 @@ func TestDefaultMetricReaderUnsupportedOperations(t *testing.T) { assert.NotNil(t, metrics, "The metrics object should not be nil") assert.NoError(t, err, "GetMetrics should return default metrics") } + +// T047: preset postgres-exporter-basic is resolvable and contains the required metric families. +func TestDefaultMetrics_PostgresExporterBasicPreset(t *testing.T) { + m := metrics.GetDefaultMetrics() + if !assert.NotNil(t, m, "GetDefaultMetrics must not return nil") { + return + } + + preset, ok := m.PresetDefs["postgres-exporter-basic"] + if !assert.True(t, ok, "preset 'postgres-exporter-basic' must exist in default metrics") { + return + } + + requiredFamilies := []string{ + "pg_stat_activity_count", + "pg_stat_bgwriter_checkpoints_timed", + "pg_stat_replication_pg_wal_lsn_diff", + } + for _, family := range requiredFamilies { + assert.Contains(t, preset.Metrics, family, + "preset 'postgres-exporter-basic' must include metric family %q", family) + } +} diff --git a/internal/metrics/metrics.yaml b/internal/metrics/metrics.yaml index e358a898af..ba743fdcc2 100644 --- a/internal/metrics/metrics.yaml +++ b/internal/metrics/metrics.yaml @@ -4482,6 +4482,31 @@ presets: wal: 60 wal_receiver: 120 wal_size: 300 + # Prometheus-targeted presets + # These presets map Prometheus metric family names (as emitted by exporters such as + # postgres_exporter or node_exporter) to per-family emit intervals in seconds. + # They are only meaningful for sources with kind: prometheus. 
+ patroni: + description: "Core metrics from Patroni /metrics endpoint (port 8008)" + metrics: + patroni_postgres_running: 30 + patroni_primary: 30 + patroni_replica: 30 + patroni_standby_leader: 30 + patroni_cluster_unlocked: 30 + patroni_xlog_location: 30 + patroni_xlog_received_location: 30 + patroni_xlog_replayed_location: 30 + patroni_dcs_last_seen: 30 + patroni_pending_restart: 60 + patroni_is_paused: 60 + patroni_postgres_timeline: 60 + postgres-exporter-basic: + description: "Core metrics from postgres_exporter" + metrics: + pg_stat_activity_count: 30 + pg_stat_bgwriter_checkpoints_timed: 60 + pg_stat_replication_pg_wal_lsn_diff: 30 debug: description: all available metrics with 30 second intervals for debugging and development metrics: diff --git a/internal/metrics/postgres_schema.go b/internal/metrics/postgres_schema.go index f9fbd19117..79edd05a2e 100644 --- a/internal/metrics/postgres_schema.go +++ b/internal/metrics/postgres_schema.go @@ -65,7 +65,7 @@ func (dmrw *dbMetricReaderWriter) NeedsMigration() (bool, error) { } // MigrationsCount is the total number of migrations in pgwatch.migration table -const MigrationsCount = 2 +const MigrationsCount = 3 // migrations holds function returning all upgrade migrations needed var migrations func() migrator.Option = func() migrator.Option { @@ -153,8 +153,19 @@ var migrations func() migrator.Option = func() migrator.Option { }, }, - // adding new migration here, update "pgwatch"."migration" in "postgres_schema.sql" - // and "dbapi" variable in main.go! 
+ &migrator.Migration{ + Name: "01405 Add prometheus to source dbtype check constraint", + Func: func(ctx context.Context, tx pgx.Tx) error { + _, err := tx.Exec(ctx, ` + ALTER TABLE pgwatch.source + DROP CONSTRAINT IF EXISTS source_dbtype_check, + ADD CHECK (dbtype IN ('postgres', 'pgbouncer', 'postgres-continuous-discovery', 'patroni', 'pgpool', 'prometheus')); + `) + return err + }, + }, + + // adding new migration here, update "pgwatch"."migration" in "postgres_schema.sql"! // &migrator.Migration{ // Name: "000XX Short description of a migration", diff --git a/internal/metrics/postgres_schema.sql b/internal/metrics/postgres_schema.sql index 57c888609b..fceeffae8b 100644 --- a/internal/metrics/postgres_schema.sql +++ b/internal/metrics/postgres_schema.sql @@ -67,7 +67,7 @@ CREATE TABLE IF NOT EXISTS pgwatch.source( config_standby jsonb, CONSTRAINT preset_or_custom_config CHECK (COALESCE(preset_config, config::text) IS NOT NULL AND (preset_config IS NULL OR config IS NULL)), CONSTRAINT preset_or_custom_config_standby CHECK (preset_config_standby IS NULL OR config_standby IS NULL), - CHECK (dbtype IN ('postgres', 'pgbouncer', 'postgres-continuous-discovery', 'patroni', 'pgpool')), + CHECK (dbtype IN ('postgres', 'pgbouncer', 'postgres-continuous-discovery', 'patroni', 'pgpool', 'prometheus')), CHECK ("group" ~ E'\\w+') ); @@ -84,4 +84,5 @@ INSERT INTO pgwatch.migration (id, version) VALUES (0, '00179 Apply metrics migrations for v3'), - (1, '00824 Refactor recommendations metrics to use metric_storage_name'); + (1, '00824 Refactor recommendations metrics to use metric_storage_name'), + (2, '01405 Add prometheus to source dbtype check constraint'); diff --git a/internal/metrics/postgres_schema_test.go b/internal/metrics/postgres_schema_test.go index ac6f15520d..7d810b27a0 100644 --- a/internal/metrics/postgres_schema_test.go +++ b/internal/metrics/postgres_schema_test.go @@ -20,6 +20,9 @@ func TestMigrate(t *testing.T) { conn.ExpectBegin() conn.ExpectExec(`UPDATE 
pgwatch\.metric`).WillReturnResult(pgxmock.NewResult("UPDATE", 0)) // combined migration SQL without parameters conn.ExpectExec(`INSERT INTO`).WillReturnResult(pgxmock.NewResult("INSERT", 1)) + conn.ExpectBegin() + conn.ExpectExec(`ALTER TABLE`).WillReturnResult(pgxmock.NewResult("ALTER TABLE", 0)) + conn.ExpectExec(`INSERT INTO`).WillReturnResult(pgxmock.NewResult("INSERT", 1)) dmrw := &dbMetricReaderWriter{ctx, conn} err = dmrw.Migrate() diff --git a/internal/metrics/types.go b/internal/metrics/types.go index 95b0a6e91c..7fa098e74e 100644 --- a/internal/metrics/types.go +++ b/internal/metrics/types.go @@ -155,6 +155,7 @@ func (m Measurements) Touch() { type MeasurementEnvelope struct { DBName string MetricName string + SourceKind string CustomTags map[string]string Data Measurements } diff --git a/internal/metrics/types_test.go b/internal/metrics/types_test.go index 098193d1b0..3fd1455da8 100644 --- a/internal/metrics/types_test.go +++ b/internal/metrics/types_test.go @@ -202,3 +202,31 @@ func TestFilterByNames(t *testing.T) { }) } } + +func TestMeasurementEnvelope_SourceKind(t *testing.T) { + tests := []struct { + name string + env MeasurementEnvelope + want string + }{ + { + name: "explicit source kind", + env: MeasurementEnvelope{ + SourceKind: "prometheus", + }, + want: "prometheus", + }, + { + name: "zero value source kind", + env: MeasurementEnvelope{}, + want: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.NotNil(t, t.Context()) + assert.Equal(t, tt.want, tt.env.SourceKind) + }) + } +} diff --git a/internal/reaper/database.go b/internal/reaper/database.go index 7d80dbae5a..e53a8f804a 100644 --- a/internal/reaper/database.go +++ b/internal/reaper/database.go @@ -1,6 +1,7 @@ package reaper import ( + "cmp" "context" "errors" "fmt" @@ -14,7 +15,356 @@ import ( "github.com/jackc/pgx/v5" ) -func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, args ...any) (metrics.Measurements, error) { 
+const minTickInterval = 1 // seconds - floor for GCD to help handle zero/negative intervals
+
+// Compile-time check that DbConnReaper satisfies the Reaper interface.
+var _ Reaper = (*DbConnReaper)(nil)
+
+// DbConnReaper manages metric collection for a single monitored database source.
+// Instead of one goroutine per metric it runs a single GCD-based tick loop
+// and batches SQL queries via pgx.Batch when the source is a real Postgres
+// connection (non-pgbouncer, non-pgpool).
+type DbConnReaper struct {
+	reaper          *reaper
+	md              *sources.DbConn
+	lastFetch       map[string]time.Time // time of the last fetch attempt, keyed by metric name
+	lastUptimeS     int64                // last seen postmaster_uptime_s for restart detection
+	degradedMetrics map[string]struct{}  // metrics that failed individual retry; executed via fetchMetric until they recover
+}
+
+// NewDbConnReaper creates a DbConnReaper for the given source connection.
+// Both bookkeeping maps are initialised so the reaper can be used immediately.
+func NewDbConnReaper(r *reaper, md *sources.DbConn) *DbConnReaper {
+	return &DbConnReaper{
+		reaper:          r,
+		md:              md,
+		lastFetch:       make(map[string]time.Time),
+		degradedMetrics: make(map[string]struct{}),
+	}
+}
+
+// activeMetrics returns a snapshot copy of the currently active metric intervals
+// based on the source's recovery state. Copying under the lock prevents data
+// races when the caller iterates after the lock is released.
+// Intervals are configured in seconds and returned as time.Duration values.
+func (sr *DbConnReaper) activeMetrics() map[string]time.Duration {
+	sr.md.RLock()
+	defer sr.md.RUnlock()
+	am := sr.md.Metrics
+	// A standby uses its dedicated metric set only when one is configured.
+	if sr.md.IsInRecovery && len(sr.md.MetricsStandby) > 0 {
+		am = sr.md.MetricsStandby
+	}
+	c := make(map[string]time.Duration, len(am))
+	for k, v := range am {
+		c[k] = time.Duration(v) * time.Second
+	}
+	return c
+}
+
+// GCDSlice computes the greatest common divisor across a slice using the
+// Euclidean algorithm. It returns 0 for empty input (and for all-zero input).
+// The result is never negative: Go's % operator keeps the sign of the
+// dividend, so the sign is normalised before returning.
+func GCDSlice(vals []int) int {
+	g := 0
+	for _, v := range vals {
+		// gcd(0, v) == v, so starting from 0 handles the first element too.
+		for v != 0 {
+			g, v = v, g%v
+		}
+	}
+	if g < 0 {
+		return -g
+	}
+	return g
+}
+
+// calcTickInterval computes GCD of all metric intervals with a minimum floor.
+func (sr *DbConnReaper) calcTickInterval() time.Duration {
+	active := sr.activeMetrics()
+	secs := make([]int, 0, len(active))
+	for _, every := range active {
+		// Clamp each interval so zero/negative values cannot poison the GCD.
+		s := int(every.Seconds())
+		if s < minTickInterval {
+			s = minTickInterval
+		}
+		secs = append(secs, s)
+	}
+	// GCDSlice yields 0 for an empty metric set; the floor covers that as well.
+	tick := GCDSlice(secs)
+	if tick < minTickInterval {
+		tick = minTickInterval
+	}
+	return time.Duration(tick) * time.Second
+}
+
+// cacheKey builds the instance-level cache key ("<cluster identifier>:<metric>")
+// for the given metric. An empty string is returned when the metric is not
+// instance-level, caching is disabled (cache age <= 0), or the metric's
+// interval is not shorter than the cache age.
+func (sr *DbConnReaper) cacheKey(m metrics.Metric, name string) string {
+	age := sr.reaper.Metrics.CacheAge()
+	if !m.IsInstanceLevel || age <= 0 {
+		return ""
+	}
+	if interval := sr.md.GetMetricInterval(name); interval < age {
+		return fmt.Sprintf("%s:%s", sr.md.GetClusterIdentifier(), name)
+	}
+	return ""
+}
+
+// isRoleExcluded reports whether the metric must be skipped for the source's
+// current role: primary-only metrics are excluded on a node in recovery and
+// standby-only metrics are excluded on a node that is not in recovery.
+func (sr *DbConnReaper) isRoleExcluded(m metrics.Metric) bool {
+	sr.md.RLock()
+	defer sr.md.RUnlock()
+	if sr.md.IsInRecovery {
+		return m.PrimaryOnly()
+	}
+	return m.StandbyOnly()
+}
+
+// sendEnvelope logs the fetched row count, enriches the measurements with
+// sysinfo and pushes them to the measurement channel. The envelope is named
+// after storageName when it is set, otherwise after the metric name itself.
+func (sr *DbConnReaper) sendEnvelope(ctx context.Context, name, storageName string, data metrics.Measurements) {
+	logger := log.GetLogger(ctx).WithField("metric", name).WithField("rows", len(data))
+	logger.Info("measurements fetched")
+	sr.reaper.AddSysinfoToMeasurements(data, sr.md)
+	envelope := metrics.MeasurementEnvelope{
+		DBName:     sr.md.Name,
+		MetricName: cmp.Or(storageName, name),
+		Data:       data,
+		CustomTags: sr.md.CustomTags,
+	}
+	sr.reaper.measurementCh <- envelope
+}
+
+// dispatchMetricData handles the post-fetch workflow for a collected metric:
+// caching, sysinfo enrichment, sending, and restart detection.
+func (sr *DbConnReaper) dispatchMetricData(ctx context.Context, name string, metric metrics.Metric, data metrics.Measurements) {
+	// Only instance-level metrics eligible for caching produce a non-empty key.
+	if key := sr.cacheKey(metric, name); key != "" {
+		sr.reaper.measurementCache.Put(key, data)
+	}
+	sr.sendEnvelope(ctx, name, metric.StorageName, data)
+	// db_stats rows are inspected for a drop in postmaster uptime.
+	if name == "db_stats" {
+		sr.detectServerRestart(ctx, data)
+	}
+}
+
+// batchEntry holds the minimum info needed to execute and dispatch a metric query.
+type batchEntry struct {
+	name   string         // metric name as configured for the source
+	metric metrics.Metric // metric definition
+	sql    string         // SQL resolved for the source's server version
+}
+
+// Reap is the main loop for a single source. It replaces N per-metric goroutines
+// with one goroutine that batches SQL queries at GCD-aligned ticks.
+//
+// On every tick it refreshes the runtime info, walks the active metrics that
+// are due, handles special metrics (log parser, OS metrics, change events,
+// instance_up, cache hits, degraded metrics) inline, and queues the rest for
+// batch execution. The loop exits when ctx is cancelled.
+func (sr *DbConnReaper) Reap(ctx context.Context) {
+	l := log.GetLogger(ctx).WithField("source", sr.md.Name)
+	ctx = log.WithLogger(ctx, l)
+	for {
+		if err := sr.md.FetchRuntimeInfo(ctx, false); err != nil {
+			l.WithError(err).Warning("could not refresh runtime info")
+		}
+
+		now := time.Now()
+		var batch []batchEntry
+
+		for name, interval := range sr.activeMetrics() {
+			if interval <= 0 {
+				continue
+			}
+			if lf := sr.lastFetch[name]; !lf.IsZero() && now.Sub(lf) < interval {
+				continue
+			}
+
+			metric, ok := metricDefs.GetMetricDef(name)
+			if !ok || sr.isRoleExcluded(metric) {
+				continue
+			}
+
+			// err is scoped to this metric so that a failure of a previous
+			// metric (or of the runtime info refresh) is never reported
+			// against an unrelated metric name.
+			var err error
+			switch {
+			case name == specialMetricServerLogEventCounts:
+				if sr.lastFetch[name].IsZero() {
+					go func() {
+						if e := sr.runLogParser(ctx); e != nil {
+							l.WithError(e).Error("log parser error")
+						}
+					}()
+					// Record the launch; otherwise the zero check above stays
+					// true forever and a new parser goroutine starts every tick.
+					sr.lastFetch[name] = time.Now()
+				}
+			case IsDirectlyFetchableMetric(sr.md, name):
+				err = sr.fetchOSMetric(ctx, name)
+				sr.lastFetch[name] = time.Now()
+			case name == specialMetricChangeEvents || name == specialMetricInstanceUp:
+				err = sr.fetchSpecialMetric(ctx, name, metric.StorageName)
+				sr.lastFetch[name] = time.Now()
+			default:
+				if cached := sr.reaper.GetMeasurementCache(sr.cacheKey(metric, name)); len(cached) > 0 {
+					l.WithField("metric", name).Info("instance level cache hit")
+					sr.sendEnvelope(ctx, name, metric.StorageName, cached)
+					sr.lastFetch[name] = time.Now()
+					break
+				}
+				sr.md.RLock()
+				version := sr.md.Version
+				sr.md.RUnlock()
+				sql := metric.GetSQL(version)
+				if sql == "" {
+					l.WithField("source", sr.md.Name).WithField("metric", name).WithField("version", version).Warning("no SQL found for metric version")
+					sr.lastFetch[name] = time.Now()
+					break
+				}
+				if _, degraded := sr.degradedMetrics[name]; degraded {
+					// Degraded metrics run outside the batch until they succeed again.
+					// The failure is logged here only, not a second time below.
+					if derr := sr.fetchMetric(ctx, batchEntry{name: name, metric: metric, sql: sql}); derr != nil {
+						l.WithError(derr).WithField("metric", name).Error("degraded metric fetch failed")
+					} else {
+						l.WithField("metric", name).Info("degraded metric recovered, returning to batch execution")
+						delete(sr.degradedMetrics, name)
+					}
+					sr.lastFetch[name] = time.Now()
+					break
+				}
+				batch = append(batch, batchEntry{name: name, metric: metric, sql: sql})
+			}
+			if err != nil {
+				l.WithError(err).WithField("metric", name).Error("failed to fetch metric")
+			}
+		}
+
+		if len(batch) > 0 {
+			var err error
+			if sr.md.IsPostgresSource() {
+				err = sr.executeBatch(ctx, batch)
+			} else {
+				// Non-Postgres sources are queried one by one; keep every
+				// failure instead of only the last one.
+				var errs []error
+				for _, e := range batch {
+					if ferr := sr.fetchMetric(ctx, e); ferr != nil {
+						errs = append(errs, fmt.Errorf("failed to fetch metric %s: %w", e.name, ferr))
+					}
+				}
+				err = errors.Join(errs...)
+			}
+			if err != nil {
+				l.WithError(err).Error("failed to fetch metrics")
+			}
+
+			now := time.Now()
+			for _, e := range batch {
+				sr.lastFetch[e.name] = now
+			}
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(sr.calcTickInterval()):
+		}
+	}
+}
+
+// executeBatch sends all SQLs in a single pgx.Batch round-trip, dispatching
+// each result immediately as it arrives. If any query fails, PostgreSQL's
+// extended protocol aborts all subsequent queries in the same sync boundary
+// (cascade failure). Any entry that returns an error from the batch is retried
+// individually via fetchMetric to isolate real failures from cascade failures.
+// Entries that fail even after the individual retry are marked as degraded
+// so that subsequent runs use fetchMetric for them until they recover.
+func (sr *DbConnReaper) executeBatch(ctx context.Context, entries []batchEntry) error {
+	batch := &pgx.Batch{}
+	for _, e := range entries {
+		batch.Queue(e.sql)
+	}
+
+	br := sr.md.Conn.SendBatch(ctx, batch)
+	defer func() { _ = br.Close() }() // best effort: per-query errors are already collected below
+
+	var (
+		errs    []error
+		retries []batchEntry
+	)
+	// Results come back in the same order the queries were queued.
+	for _, e := range entries {
+		rows, err := br.Query()
+		if err != nil {
+			// May be a real error or a cascade from an earlier failure; retry individually.
+			retries = append(retries, e)
+			continue
+		}
+		if err := sr.CollectAndDispatch(ctx, rows, e.name, e.metric); err != nil {
+			errs = append(errs, fmt.Errorf("failed to collect metric %s: %w", e.name, err))
+		}
+	}
+
+	for _, e := range retries {
+		if err := sr.fetchMetric(ctx, e); err != nil {
+			// %w keeps the underlying error inspectable via errors.Is / errors.As.
+			errs = append(errs, fmt.Errorf("failed to fetch metric %s: %w", e.name, err))
+			log.GetLogger(ctx).WithField("metric", e.name).Warning("metric degraded after repeated failures, switching to individual fetch")
+			sr.degradedMetrics[e.name] = struct{}{}
+		}
+	}
+	return errors.Join(errs...)
+}
+
+// fetchMetric executes a single SQL query using the simple protocol and
+// dispatches the resulting measurements. It returns the query or row
+// collection error, if any.
+func (sr *DbConnReaper) fetchMetric(ctx context.Context, entry batchEntry) error {
+	rows, err := sr.md.Conn.Query(ctx, entry.sql, pgx.QueryExecModeSimpleProtocol)
+	if err != nil {
+		return err
+	}
+	return sr.CollectAndDispatch(ctx, rows, entry.name, entry.metric)
+}
+
+// CollectAndDispatch is a helper that collects rows from a pgx.Rows and dispatches them.
+// Nothing is dispatched when the result set is empty.
+func (sr *DbConnReaper) CollectAndDispatch(ctx context.Context, rows pgx.Rows, name string, metric metrics.Metric) error {
+	data, err := pgx.CollectRows(rows, metrics.RowToMeasurement)
+	if err != nil {
+		return err
+	}
+	if len(data) > 0 {
+		sr.dispatchMetricData(ctx, name, metric, data)
+	}
+	return nil
+}
+
+// fetchOSMetric handles gopsutil-based OS metrics.
+func (sr *DbConnReaper) fetchOSMetric(ctx context.Context, name string) error {
+	msg, err := sr.reaper.FetchStatsDirectlyFromOS(ctx, sr.md, name)
+	if err != nil {
+		return fmt.Errorf("could not read metric from OS: %w", err)
+	}
+	// A nil or empty envelope is not an error; there is simply nothing to send.
+	if msg != nil && len(msg.Data) > 0 {
+		log.GetLogger(ctx).WithField("metric", name).WithField("rows", len(msg.Data)).Info("measurements fetched")
+		sr.reaper.measurementCh <- *msg
+	}
+	return nil
+}
+
+// fetchSpecialMetric handles change_events and instance_up metrics.
+// Collected data is sent under storageName when it is set, otherwise under
+// name. Any other metric name yields no data and no error.
+func (sr *DbConnReaper) fetchSpecialMetric(ctx context.Context, name, storageName string) error {
+	var (
+		data metrics.Measurements
+		err  error
+	)
+	switch name {
+	case specialMetricChangeEvents:
+		data, err = sr.reaper.GetObjectChangesMeasurement(ctx, sr.md)
+	case specialMetricInstanceUp:
+		data, err = sr.reaper.GetInstanceUpMeasurement(ctx, sr.md)
+	}
+	if err != nil {
+		return fmt.Errorf("failed to fetch special metric: %w", err)
+	}
+	if len(data) > 0 {
+		sr.sendEnvelope(ctx, name, storageName, data)
+	}
+	return nil
+}
+
+// runLogParser launches the server log event counts parser.
+// It returns when the parser cannot be initialised or when ParseLogs returns an error.
+func (sr *DbConnReaper) runLogParser(ctx context.Context) error {
+	lp, err := NewLogParser(ctx, sr.md, sr.reaper.measurementCh)
+	if err != nil {
+		return fmt.Errorf("failed to initialize log parser: %w", err)
+	}
+	if err := lp.ParseLogs(); err != nil {
+		return fmt.Errorf("log parser error: %w", err)
+	}
+	return nil
+}
+
+// detectServerRestart checks for PostgreSQL server restarts via postmaster_uptime_s
+// in db_stats metric data and emits an object_changes measurement if detected.
+func (sr *DbConnReaper) detectServerRestart(ctx context.Context, data metrics.Measurements) {
+	const restartMsg = "Detected server restart (or failover)"
+
+	if len(data) == 0 {
+		return
+	}
+	// Only the first row is inspected; a missing or non-int64 value is ignored.
+	current, isInt := data[0]["postmaster_uptime_s"].(int64)
+	if !isInt {
+		return
+	}
+
+	// Remember the new uptime before deciding, so the next call compares against it.
+	previous := sr.lastUptimeS
+	sr.lastUptimeS = current
+
+	// No earlier positive sample, or uptime did not go backwards: nothing to report.
+	if previous <= 0 || current >= previous {
+		return
+	}
+
+	log.GetLogger(ctx).Warning(restartMsg)
+	change := metrics.NewMeasurement(data.GetEpoch())
+	change["details"] = restartMsg
+	sr.reaper.measurementCh <- metrics.MeasurementEnvelope{
+		DBName:     sr.md.Name,
+		MetricName: "object_changes",
+		Data:       metrics.Measurements{change},
+		CustomTags: sr.md.CustomTags,
+	}
+}
+
+func QueryMeasurements(ctx context.Context, md *sources.DbConn, sql string, args ...any) (metrics.Measurements, error) { if strings.TrimSpace(sql) == "" { return nil, errors.New("empty SQL") } @@ -30,7 +380,7 @@ func QueryMeasurements(ctx context.Context, md *sources.SourceConn, sql string, return nil, err } -func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) (changeCounts ChangeDetectionResults) { +func (r *reaper) DetectSprocChanges(ctx context.Context, md *sources.DbConn) (changeCounts ChangeDetectionResults) { detectedChanges := make(metrics.Measurements, 0) var firstRun bool l := log.GetLogger(ctx) @@ -106,7 +456,7 @@ func (r *Reaper) DetectSprocChanges(ctx context.Context, md *sources.SourceConn) return changeCounts } -func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults { +func (r *reaper) DetectTableChanges(ctx context.Context, md *sources.DbConn) ChangeDetectionResults { detectedChanges := make(metrics.Measurements, 0) var firstRun bool var changeCounts ChangeDetectionResults @@ -185,7 +535,7 @@ func (r *Reaper) DetectTableChanges(ctx context.Context, md *sources.SourceConn) return changeCounts } -func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults { +func
(r *reaper) DetectIndexChanges(ctx context.Context, md *sources.DbConn) ChangeDetectionResults { detectedChanges := make(metrics.Measurements, 0) var firstRun bool var changeCounts ChangeDetectionResults @@ -264,7 +614,7 @@ func (r *Reaper) DetectIndexChanges(ctx context.Context, md *sources.SourceConn) return changeCounts } -func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults { +func (r *reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.DbConn) ChangeDetectionResults { detectedChanges := make(metrics.Measurements, 0) var firstRun bool var changeCounts ChangeDetectionResults @@ -341,7 +691,7 @@ func (r *Reaper) DetectPrivilegeChanges(ctx context.Context, md *sources.SourceC return changeCounts } -func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.SourceConn) ChangeDetectionResults { +func (r *reaper) DetectConfigurationChanges(ctx context.Context, md *sources.DbConn) ChangeDetectionResults { detectedChanges := make(metrics.Measurements, 0) var firstRun bool var changeCounts ChangeDetectionResults @@ -412,7 +762,7 @@ func (r *Reaper) DetectConfigurationChanges(ctx context.Context, md *sources.Sou // GetInstanceUpMeasurement returns a single measurement with "instance_up" metric // used to detect if the instance is up or down -func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) { +func (r *reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.DbConn) (metrics.Measurements, error) { return metrics.Measurements{ metrics.Measurement{ metrics.EpochColumnName: time.Now().UnixNano(), @@ -426,7 +776,7 @@ func (r *Reaper) GetInstanceUpMeasurement(ctx context.Context, md *sources.Sourc }, nil // always return nil error for the status metric } -func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.SourceConn) (metrics.Measurements, error) { +func (r *reaper) 
GetObjectChangesMeasurement(ctx context.Context, md *sources.DbConn) (metrics.Measurements, error) { md.Lock() defer md.Unlock() spN := r.DetectSprocChanges(ctx, md) @@ -441,16 +791,17 @@ func (r *Reaper) GetObjectChangesMeasurement(ctx context.Context, md *sources.So m["details"] = strings.Join([]string{spN.String(), tblN.String(), idxN.String(), cnfN.String(), privN.String()}, " ") return metrics.Measurements{m}, nil } -func (r *Reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) { + +func (r *reaper) CloseResourcesForRemovedMonitoredDBs(hostsToShutDown map[string]bool) { for _, prevDB := range r.prevLoopMonitoredDBs { - if r.monitoredSources.GetMonitoredDatabase(prevDB.Name) == nil { // removed from config - prevDB.Conn.Close() - _ = r.SinksWriter.SyncMetric(prevDB.Name, "", sinks.DeleteOp) + if r.monitoredSources.GetMonitoredDatabase(prevDB.GetSource().Name) == nil { // removed from config + prevDB.Close() + _ = r.SinksWriter.SyncMetric(prevDB.GetSource().Name, "", sinks.DeleteOp) } } for toShutDownDB := range hostsToShutDown { if db := r.monitoredSources.GetMonitoredDatabase(toShutDownDB); db != nil { - db.Conn.Close() + db.Close() } _ = r.SinksWriter.SyncMetric(toShutDownDB, "", sinks.DeleteOp) } diff --git a/internal/reaper/source_reaper_integration_test.go b/internal/reaper/database_integration_test.go similarity index 92% rename from internal/reaper/source_reaper_integration_test.go rename to internal/reaper/database_integration_test.go index 898305b541..7c0a62455b 100644 --- a/internal/reaper/source_reaper_integration_test.go +++ b/internal/reaper/database_integration_test.go @@ -18,7 +18,7 @@ import ( // setupIntegrationDB starts a real Postgres container and returns a SourceConn // with a live pgxpool connection. The caller must call tearDown when done. 
-func setupIntegrationDB(t *testing.T) (*sources.SourceConn, func()) { +func setupIntegrationDB(t *testing.T) (*sources.DbConn, func()) { t.Helper() if testing.Short() { t.Skip("skipping integration test in short mode") @@ -33,7 +33,7 @@ func setupIntegrationDB(t *testing.T) (*sources.SourceConn, func()) { pool, err := db.New(testutil.TestContext, connStr) require.NoError(t, err, "failed to create connection pool") - md := sources.NewSourceConn(sources.Source{ + md := sources.NewDbConn(sources.Source{ Name: "integration_test", Kind: sources.SourcePostgres, }) @@ -72,7 +72,7 @@ func TestIntegration_ExecuteBatch(t *testing.T) { "integ_uptime": 60, } - r := &Reaper{ + r := &reaper{ Options: &cmdopts.Options{ Metrics: metrics.CmdOpts{}, Sinks: sinks.CmdOpts{}, @@ -80,7 +80,7 @@ func TestIntegration_ExecuteBatch(t *testing.T) { measurementCh: make(chan metrics.MeasurementEnvelope, 10), measurementCache: NewInstanceMetricCache(), } - sr := NewSourceReaper(r, md) + sr := NewDbConnReaper(r, md) err := sr.executeBatch(ctx, []batchEntry{ {name: "integ_version", metric: metricDefs.MetricDefs["integ_version"], sql: "SELECT version() AS pg_version"}, @@ -136,7 +136,7 @@ func TestIntegration_SourceReaper_RunCollectsMetrics(t *testing.T) { "integ_run_size": 5, } - r := &Reaper{ + r := &reaper{ Options: &cmdopts.Options{ Metrics: metrics.CmdOpts{}, Sinks: sinks.CmdOpts{}, @@ -144,13 +144,13 @@ func TestIntegration_SourceReaper_RunCollectsMetrics(t *testing.T) { measurementCh: make(chan metrics.MeasurementEnvelope, 20), measurementCache: NewInstanceMetricCache(), } - sr := NewSourceReaper(r, md) + sr := NewDbConnReaper(r, md) ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger())) done := make(chan struct{}) go func() { - sr.Run(ctx) + sr.Reap(ctx) close(done) }() @@ -187,24 +187,24 @@ func TestIntegration_SourceReaper_RunExcludesMetricsByNodeStatus(t *testing.T) { helperSetNodeStatus := func(status string) { metricDefs.MetricDefs["test_metric"] 
= metrics.Metric{ - SQLs: metrics.SQLs{0: "SELECT 1 AS value"}, + SQLs: metrics.SQLs{0: "SELECT 1 AS value"}, NodeStatus: status, } metricDefs.MetricDefs["server_log_event_counts"] = metrics.Metric{ - SQLs: metrics.SQLs{0: "SELECT 1 AS value"}, + SQLs: metrics.SQLs{0: "SELECT 1 AS value"}, NodeStatus: status, } metricDefs.MetricDefs["psutil_cpu"] = metrics.Metric{ - SQLs: metrics.SQLs{0: "SELECT 1 AS value"}, + SQLs: metrics.SQLs{0: "SELECT 1 AS value"}, NodeStatus: status, } metricDefs.MetricDefs[specialMetricInstanceUp] = metrics.Metric{ - SQLs: metrics.SQLs{0: "SELECT 1 AS value"}, + SQLs: metrics.SQLs{0: "SELECT 1 AS value"}, NodeStatus: status, } } - - r := &Reaper{ + + r := &reaper{ Options: &cmdopts.Options{ Metrics: metrics.CmdOpts{}, Sinks: sinks.CmdOpts{}, @@ -216,10 +216,10 @@ func TestIntegration_SourceReaper_RunExcludesMetricsByNodeStatus(t *testing.T) { // using psutil_*, server_log_event_counts, instance_up // to ensure specially-handled metrics have the same behaviour md.Metrics = metrics.MetricIntervals{ - "test_metric": 5, + "test_metric": 5, "server_log_event_counts": 5, - "psutil_cpu": 5, - specialMetricInstanceUp: 5, + "psutil_cpu": 5, + specialMetricInstanceUp: 5, } t.Run("primary-only/standby-only metrics get excluded when node is standby/primary", func(t *testing.T) { @@ -234,9 +234,9 @@ func TestIntegration_SourceReaper_RunExcludesMetricsByNodeStatus(t *testing.T) { helperSetNodeStatus(state) - sr := NewSourceReaper(r, md) + sr := NewDbConnReaper(r, md) go func() { - sr.Run(ctx) + sr.Reap(ctx) }() select { @@ -261,9 +261,9 @@ func TestIntegration_SourceReaper_RunExcludesMetricsByNodeStatus(t *testing.T) { helperSetNodeStatus(state) - sr := NewSourceReaper(r, md) + sr := NewDbConnReaper(r, md) go func() { - sr.Run(ctx) + sr.Reap(ctx) }() time.Sleep(2 * time.Second) diff --git a/internal/reaper/database_test.go b/internal/reaper/database_test.go index 3c87242ae7..485e20b922 100644 --- a/internal/reaper/database_test.go +++ 
b/internal/reaper/database_test.go @@ -3,21 +3,26 @@ package reaper import ( "context" "testing" + "testing/synctest" "time" + "github.com/cybertec-postgresql/pgwatch/v5/internal/cmdopts" + "github.com/cybertec-postgresql/pgwatch/v5/internal/log" "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics" + "github.com/cybertec-postgresql/pgwatch/v5/internal/sinks" "github.com/cybertec-postgresql/pgwatch/v5/internal/sources" + "github.com/jackc/pgx/v5" pgxmock "github.com/pashagolub/pgxmock/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // Helper function to create a test SourceConn with pgxmock -func createTestSourceConn(t *testing.T) (*sources.SourceConn, pgxmock.PgxPoolIface) { +func createTestSourceConn(t *testing.T) (*sources.DbConn, pgxmock.PgxPoolIface) { mock, err := pgxmock.NewPool() require.NoError(t, err) - md := &sources.SourceConn{ + md := &sources.DbConn{ Conn: mock, Source: sources.Source{Name: "testdb"}, RuntimeInfo: sources.RuntimeInfo{ @@ -42,7 +47,7 @@ func TestDetectSprocChanges(t *testing.T) { md, mock := createTestSourceConn(t) defer mock.Close() - reaper := &Reaper{ + reaper := &reaper{ measurementCh: make(chan metrics.MeasurementEnvelope, 10), } @@ -119,7 +124,7 @@ func TestDetectTableChanges(t *testing.T) { md, mock := createTestSourceConn(t) defer mock.Close() - reaper := &Reaper{ + reaper := &reaper{ measurementCh: make(chan metrics.MeasurementEnvelope, 10), } @@ -199,7 +204,7 @@ func TestDetectIndexChanges(t *testing.T) { md, mock := createTestSourceConn(t) defer mock.Close() - reaper := &Reaper{ + reaper := &reaper{ measurementCh: make(chan metrics.MeasurementEnvelope, 10), } @@ -279,7 +284,7 @@ func TestDetectPrivilegeChanges(t *testing.T) { md, mock := createTestSourceConn(t) defer mock.Close() - reaper := &Reaper{ + reaper := &reaper{ measurementCh: make(chan metrics.MeasurementEnvelope, 10), } @@ -349,7 +354,7 @@ func TestDetectConfigurationChanges(t *testing.T) { require.NoError(t, err) defer 
mock.Close() - md := &sources.SourceConn{ + md := &sources.DbConn{ Conn: mock, Source: sources.Source{Name: "testdb"}, RuntimeInfo: sources.RuntimeInfo{ @@ -358,7 +363,7 @@ func TestDetectConfigurationChanges(t *testing.T) { }, } - reaper := &Reaper{ + reaper := &reaper{ measurementCh: make(chan metrics.MeasurementEnvelope, 10), } @@ -416,7 +421,7 @@ func TestDetectConfigurationChanges(t *testing.T) { func TestGetInstanceUpMeasurement(t *testing.T) { ctx := context.Background() - reaper := &Reaper{} + reaper := &reaper{} testCases := []struct { name string @@ -473,3 +478,563 @@ func TestGetInstanceUpMeasurement(t *testing.T) { }) } } + +func TestGCDSlice(t *testing.T) { + tests := []struct { + name string + vals []int + want int + }{ + {"empty", nil, 0}, + {"single", []int{30}, 30}, + {"exhaustive preset intervals", []int{30, 60, 120, 180, 300, 600, 900, 3600, 7200}, 30}, + {"coprime", []int{7, 11, 13}, 1}, + {"all same", []int{60, 60, 60}, 60}, + {"basic preset", []int{60, 120}, 60}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, GCDSlice(tc.vals)) + }) + } +} + +func TestCalcTickInterval(t *testing.T) { + t.Run("exhaustive preset GCD is 30s", func(t *testing.T) { + sr := &DbConnReaper{ + md: &sources.DbConn{ + Source: sources.Source{ + Metrics: metrics.MetricIntervals{"m1": 30, "m2": 60, "m3": 120, "m4": 300}, + }, + }, + } + assert.Equal(t, 30*time.Second, sr.calcTickInterval()) + }) + + t.Run("GCD floors to minimum 1s", func(t *testing.T) { + sr := &DbConnReaper{ + md: &sources.DbConn{ + Source: sources.Source{ + Metrics: metrics.MetricIntervals{"m1": 3, "m2": 7}, + }, + }, + } + assert.Equal(t, time.Second, sr.calcTickInterval()) + }) + + t.Run("single metric", func(t *testing.T) { + sr := &DbConnReaper{ + md: &sources.DbConn{ + Source: sources.Source{ + Metrics: metrics.MetricIntervals{"m1": 60}, + }, + }, + } + assert.Equal(t, 60*time.Second, sr.calcTickInterval()) + }) + + t.Run("empty metrics", func(t 
*testing.T) { + sr := &DbConnReaper{ + md: &sources.DbConn{ + Source: sources.Source{ + Metrics: metrics.MetricIntervals{}, + }, + }, + } + assert.Equal(t, time.Second, sr.calcTickInterval()) + }) + + t.Run("standby metrics when in recovery", func(t *testing.T) { + sr := &DbConnReaper{ + md: &sources.DbConn{ + Source: sources.Source{ + Metrics: metrics.MetricIntervals{"m1": 30, "m2": 60}, + MetricsStandby: metrics.MetricIntervals{"m1": 120}, + }, + RuntimeInfo: sources.RuntimeInfo{IsInRecovery: true}, + }, + } + assert.Equal(t, 120*time.Second, sr.calcTickInterval()) + }) +} + +func TestNewSourceReaper(t *testing.T) { + r := &reaper{ + measurementCh: make(chan metrics.MeasurementEnvelope, 10), + measurementCache: NewInstanceMetricCache(), + } + md := &sources.DbConn{ + Source: sources.Source{ + Name: "testdb", + Kind: sources.SourcePostgres, + Metrics: metrics.MetricIntervals{"cpu": 30, "mem": 60, "disk": 120}, + }, + } + sr := NewDbConnReaper(r, md) + + assert.NotNil(t, sr.lastFetch) + assert.Empty(t, sr.lastFetch) + assert.Equal(t, r, sr.reaper) + assert.Equal(t, md, sr.md) +} + +func TestSourceReaper_ExecuteBatch(t *testing.T) { + ctx := log.WithLogger(context.Background(), log.NewNoopLogger()) + + metricDefs.MetricDefs["batch_metric_1"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1 as value, 100::bigint as epoch_ns"}, + } + metricDefs.MetricDefs["batch_metric_2"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 2 as value, 200::bigint as epoch_ns"}, + } + + mock, err := pgxmock.NewPool() + require.NoError(t, err) + defer mock.Close() + + md := &sources.DbConn{ + Source: sources.Source{ + Name: "test_source", + Kind: sources.SourcePostgres, + Metrics: metrics.MetricIntervals{"batch_metric_1": 30, "batch_metric_2": 30}, + }, + Conn: mock, + RuntimeInfo: sources.RuntimeInfo{ + Version: 120000, + ChangeState: make(map[string]map[string]string), + }, + } + + r := &reaper{ + Options: &cmdopts.Options{ + Metrics: metrics.CmdOpts{}, + Sinks: sinks.CmdOpts{}, + 
}, + measurementCh: make(chan metrics.MeasurementEnvelope, 10), + measurementCache: NewInstanceMetricCache(), + } + sr := NewDbConnReaper(r, md) + + rows1 := pgxmock.NewRows([]string{"epoch_ns", "value"}). + AddRow(time.Now().UnixNano(), int64(100)) + rows2 := pgxmock.NewRows([]string{"epoch_ns", "value"}). + AddRow(time.Now().UnixNano(), int64(200)) + eb := mock.ExpectBatch() + eb.ExpectQuery("SELECT 1").WillReturnRows(rows1) + eb.ExpectQuery("SELECT 2").WillReturnRows(rows2) + + err = sr.executeBatch(ctx, []batchEntry{ + {name: "batch_metric_1", metric: metricDefs.MetricDefs["batch_metric_1"], sql: "SELECT 1 as value, 100::bigint as epoch_ns"}, + {name: "batch_metric_2", metric: metricDefs.MetricDefs["batch_metric_2"], sql: "SELECT 2 as value, 200::bigint as epoch_ns"}, + }) + assert.NoError(t, err) + + received := 0 + for { + select { + case msg := <-r.measurementCh: + assert.Equal(t, "test_source", msg.DBName) + assert.True(t, msg.MetricName == "batch_metric_1" || msg.MetricName == "batch_metric_2") + received++ + default: + goto done + } + } +done: + assert.Equal(t, 2, received, "should have received 2 measurement envelopes") + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestSourceReaper_RunOneIteration(t *testing.T) { + ctx, cancel := context.WithCancel(log.WithLogger(context.Background(), log.NewNoopLogger())) + + metricDefs.MetricDefs["run_test_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT run_test"}, + } + + mock, err := pgxmock.NewPool() + require.NoError(t, err) + defer mock.Close() + + md := &sources.DbConn{ + Source: sources.Source{ + Name: "run_source", + Kind: sources.SourcePostgres, + Metrics: metrics.MetricIntervals{"run_test_metric": 5}, + }, + Conn: mock, + RuntimeInfo: sources.RuntimeInfo{ + Version: 120000, + ChangeState: make(map[string]map[string]string), + }, + } + + r := &reaper{ + Options: &cmdopts.Options{ + Metrics: metrics.CmdOpts{}, + Sinks: sinks.CmdOpts{}, + }, + measurementCh: make(chan 
metrics.MeasurementEnvelope, 10), + measurementCache: NewInstanceMetricCache(), + } + sr := NewDbConnReaper(r, md) + + // FetchRuntimeInfo sends a query + mock.ExpectQuery("select /\\* pgwatch_generated \\*/"). + WillReturnError(assert.AnError) + + rows := pgxmock.NewRows([]string{"epoch_ns", "value"}). + AddRow(time.Now().UnixNano(), int64(42)) + eb := mock.ExpectBatch() + eb.ExpectQuery("SELECT run_test").WillReturnRows(rows) + + go func() { + time.Sleep(200 * time.Millisecond) + cancel() + }() + + sr.Reap(ctx) + + select { + case msg := <-r.measurementCh: + assert.Equal(t, "run_source", msg.DBName) + assert.Equal(t, "run_test_metric", msg.MetricName) + case <-time.After(time.Second): + t.Error("Expected measurement but timed out") + } +} + +func TestSourceReaper_DetectServerRestart(t *testing.T) { + sr := &DbConnReaper{ + reaper: &reaper{ + measurementCh: make(chan metrics.MeasurementEnvelope, 10), + }, + md: &sources.DbConn{ + Source: sources.Source{Name: "restart_test"}, + }, + } + + // First observation — establish baseline + data := metrics.Measurements{ + {"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(1000)}, + } + sr.detectServerRestart(t.Context(), data) + assert.Equal(t, int64(1000), sr.lastUptimeS) + select { + case <-sr.reaper.measurementCh: + t.Error("should not emit restart event on first observation") + default: + } + + // Second observation — uptime increased (normal) + data = metrics.Measurements{ + {"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(2000)}, + } + sr.detectServerRestart(t.Context(), data) + assert.Equal(t, int64(2000), sr.lastUptimeS) + select { + case <-sr.reaper.measurementCh: + t.Error("should not emit restart event when uptime increases") + default: + } + + // Third observation — uptime decreased (restart!) 
+ data = metrics.Measurements{ + {"epoch_ns": time.Now().UnixNano(), "postmaster_uptime_s": int64(10)}, + } + sr.detectServerRestart(t.Context(), data) + assert.Equal(t, int64(10), sr.lastUptimeS) + select { + case msg := <-sr.reaper.measurementCh: + assert.Equal(t, "object_changes", msg.MetricName) + assert.Contains(t, msg.Data[0]["details"], "restart") + default: + t.Error("expected restart event") + } +} + +func TestSourceReaper_FetchSpecialMetric(t *testing.T) { + ctx := log.WithLogger(context.Background(), log.NewNoopLogger()) + + newSR := func(t *testing.T) (*DbConnReaper, *sources.DbConn, pgxmock.PgxPoolIface) { + t.Helper() + md, mock := createTestSourceConn(t) + r := &reaper{ + Options: &cmdopts.Options{ + Metrics: metrics.CmdOpts{}, + Sinks: sinks.CmdOpts{}, + }, + measurementCh: make(chan metrics.MeasurementEnvelope, 10), + measurementCache: NewInstanceMetricCache(), + } + return NewDbConnReaper(r, md), md, mock + } + + sr, _, mock := newSR(t) + defer mock.Close() + + t.Run("instance_up dispatches measurement on ping success", func(t *testing.T) { + mock.ExpectPing() + assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp, "")) + select { + case msg := <-sr.reaper.measurementCh: + assert.Equal(t, specialMetricInstanceUp, msg.MetricName) + assert.Len(t, msg.Data, 1) + assert.Equal(t, 1, msg.Data[0][specialMetricInstanceUp]) + default: + t.Error("expected measurement for instance_up") + } + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("instance_up uses storage name when set", func(t *testing.T) { + mock.ExpectPing() + assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricInstanceUp, "infra_up")) + select { + case msg := <-sr.reaper.measurementCh: + assert.Equal(t, "infra_up", msg.MetricName) + default: + t.Error("expected measurement") + } + assert.NoError(t, mock.ExpectationsWereMet()) + }) + + t.Run("change_events dispatches no measurement when no hash defs present", func(t *testing.T) { + // Doesn't contain additional 
defs for any of {"sproc_hashes", "table_hashes", "index_hashes", "configuration_hashes", "privilege_hashes"} + metricDefs.MetricDefs[specialMetricChangeEvents] = metrics.Metric{} + assert.NoError(t, sr.fetchSpecialMetric(ctx, specialMetricChangeEvents, "")) + select { + case <-sr.reaper.measurementCh: + t.Error("expected no measurement when no changes detected") + default: + } + assert.NoError(t, mock.ExpectationsWereMet()) + }) +} + +func TestSourceReaper_ExecuteBatch_DegradedOnPersistentFailure(t *testing.T) { + ctx := log.WithLogger(context.Background(), log.NewNoopLogger()) + + metricDefs.MetricDefs["good_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 1 as value, 100::bigint as epoch_ns"}, + } + metricDefs.MetricDefs["bad_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT bad"}, + } + + mock, err := pgxmock.NewPool() + require.NoError(t, err) + defer mock.Close() + + md := &sources.DbConn{ + Source: sources.Source{ + Name: "degrade_test", + Kind: sources.SourcePostgres, + Metrics: metrics.MetricIntervals{"good_metric": 30, "bad_metric": 30}, + }, + Conn: mock, + RuntimeInfo: sources.RuntimeInfo{ + Version: 120000, + ChangeState: make(map[string]map[string]string), + }, + } + r := &reaper{ + Options: &cmdopts.Options{ + Metrics: metrics.CmdOpts{}, + Sinks: sinks.CmdOpts{}, + }, + measurementCh: make(chan metrics.MeasurementEnvelope, 10), + measurementCache: NewInstanceMetricCache(), + } + sr := NewDbConnReaper(r, md) + + entries := []batchEntry{ + {name: "good_metric", metric: metricDefs.MetricDefs["good_metric"], sql: "SELECT 1 as value, 100::bigint as epoch_ns"}, + {name: "bad_metric", metric: metricDefs.MetricDefs["bad_metric"], sql: "SELECT bad"}, + } + + // batch: good_metric succeeds, bad_metric cascades → retry bad_metric individually → still fails + rows1 := pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(time.Now().UnixNano(), int64(1)) + eb := mock.ExpectBatch() + eb.ExpectQuery("SELECT 1").WillReturnRows(rows1) + 
eb.ExpectQuery("SELECT bad").WillReturnError(assert.AnError) // cascade + // individual retry of bad_metric + mock.ExpectQuery("SELECT bad").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError) + + err = sr.executeBatch(ctx, entries) + assert.Error(t, err) + assert.Contains(t, sr.degradedMetrics, "bad_metric", "bad_metric should be degraded after persistent failure") + assert.NotContains(t, sr.degradedMetrics, "good_metric", "good_metric should not be degraded") + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestSourceReaper_ExecuteBatch_CascadeRecovery(t *testing.T) { + // A metric that errors in the batch but succeeds on individual retry must NOT be marked degraded. + ctx := log.WithLogger(context.Background(), log.NewNoopLogger()) + + metricDefs.MetricDefs["cascade_victim"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 3 as value, 300::bigint as epoch_ns"}, + } + metricDefs.MetricDefs["cascade_trigger"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT fail"}, + } + + mock, err := pgxmock.NewPool() + require.NoError(t, err) + defer mock.Close() + + md := &sources.DbConn{ + Source: sources.Source{ + Name: "cascade_test", + Kind: sources.SourcePostgres, + Metrics: metrics.MetricIntervals{"cascade_trigger": 30, "cascade_victim": 30}, + }, + Conn: mock, + RuntimeInfo: sources.RuntimeInfo{ + Version: 120000, + ChangeState: make(map[string]map[string]string), + }, + } + r := &reaper{ + Options: &cmdopts.Options{ + Metrics: metrics.CmdOpts{}, + Sinks: sinks.CmdOpts{}, + }, + measurementCh: make(chan metrics.MeasurementEnvelope, 10), + measurementCache: NewInstanceMetricCache(), + } + sr := NewDbConnReaper(r, md) + + entries := []batchEntry{ + {name: "cascade_trigger", metric: metricDefs.MetricDefs["cascade_trigger"], sql: "SELECT fail"}, + {name: "cascade_victim", metric: metricDefs.MetricDefs["cascade_victim"], sql: "SELECT 3 as value, 300::bigint as epoch_ns"}, + } + + // batch: trigger fails, victim cascades → both retry 
individually + // trigger fails individually (real error), victim succeeds individually (was only a cascade) + eb := mock.ExpectBatch() + eb.ExpectQuery("SELECT fail").WillReturnError(assert.AnError) + eb.ExpectQuery("SELECT 3").WillReturnError(assert.AnError) // cascade in batch + // individual retries + mock.ExpectQuery("SELECT fail").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError) + mock.ExpectQuery("SELECT 3").WithArgs(pgx.QueryExecModeSimpleProtocol). + WillReturnRows(pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(time.Now().UnixNano(), int64(3))) + + err = sr.executeBatch(ctx, entries) + assert.Error(t, err, "cascade_trigger error should propagate") + assert.Contains(t, sr.degradedMetrics, "cascade_trigger", "real-failure metric should be degraded") + assert.NotContains(t, sr.degradedMetrics, "cascade_victim", "cascade-only victim must not be degraded") + assert.NoError(t, mock.ExpectationsWereMet()) +} + +func TestSourceReaper_DegradedMetricRecovery(t *testing.T) { + // Uses the real Run loop (via synctest fake clock) to verify the full degraded→recovered + // lifecycle: iteration 1 the degraded metric fails individually (stays degraded), + // iteration 2 it succeeds (removed from degradedMetrics). 
+ synctest.Test(t, func(t *testing.T) { + const ( + metricName = "recovering_metric_real" + metricInterval = 30 + ) + + metricDefs.MetricDefs[metricName] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT 7 as value, 700::bigint as epoch_ns"}, + } + + mock, err := pgxmock.NewPool() + require.NoError(t, err) + defer mock.Close() + + md := &sources.DbConn{ + Source: sources.Source{ + Name: "recovery_src", + Kind: sources.SourcePostgres, + Metrics: metrics.MetricIntervals{metricName: metricInterval}, + }, + Conn: mock, + RuntimeInfo: sources.RuntimeInfo{ + Version: 120000, + ChangeState: make(map[string]map[string]string), + }, + } + r := &reaper{ + Options: &cmdopts.Options{ + Metrics: metrics.CmdOpts{}, + Sinks: sinks.CmdOpts{}, + }, + measurementCh: make(chan metrics.MeasurementEnvelope, 10), + measurementCache: NewInstanceMetricCache(), + } + ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) + sr := NewDbConnReaper(r, md) + sr.degradedMetrics[metricName] = struct{}{} // pre-seed: metric already degraded + + // Iteration 1: FetchRuntimeInfo + degraded individual fetch → fails → stays degraded + mock.ExpectQuery("select /\\* pgwatch_generated \\*/").WillReturnError(assert.AnError) + mock.ExpectQuery("SELECT 7").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnError(assert.AnError) + + // Iteration 2: FetchRuntimeInfo + degraded individual fetch → succeeds → recovered + mock.ExpectQuery("select /\\* pgwatch_generated \\*/").WillReturnError(assert.AnError) + mock.ExpectQuery("SELECT 7").WithArgs(pgx.QueryExecModeSimpleProtocol). + WillReturnRows(pgxmock.NewRows([]string{"epoch_ns", "value"}).AddRow(int64(700_000_000_000), int64(7))) + + go sr.Reap(ctx) + + // Run goroutine completes iteration 1 (pgxmock is in-memory, no real I/O) then + // blocks on time.After — the only durably-blocking operation in the loop. 
+ synctest.Wait() + assert.Contains(t, sr.degradedMetrics, metricName, "should still be degraded after first failure") + + // Advance the fake clock past the interval to trigger iteration 2. + // The Run goroutine's time.After(30s) fires first; it runs iteration 2 and + // blocks again before the test goroutine's sleep finishes. + time.Sleep(time.Duration(metricInterval)*time.Second + time.Millisecond) + synctest.Wait() + assert.NotContains(t, sr.degradedMetrics, metricName, "should recover after successful fetchMetric") + + assert.NoError(t, mock.ExpectationsWereMet()) + }) +} + +func TestSourceReaper_NonPostgresSequential(t *testing.T) { + ctx := log.WithLogger(context.Background(), log.NewNoopLogger()) + + metricDefs.MetricDefs["seq_metric"] = metrics.Metric{ + SQLs: metrics.SQLs{0: "SELECT seq_value"}, + } + + mock, err := pgxmock.NewPool() + require.NoError(t, err) + defer mock.Close() + + md := &sources.DbConn{ + Source: sources.Source{ + Name: "seq_test_src", + Kind: sources.SourcePostgres, + Metrics: metrics.MetricIntervals{"seq_metric": 30}, + }, + Conn: mock, + RuntimeInfo: sources.RuntimeInfo{ + Version: 120000, + ChangeState: make(map[string]map[string]string), + }, + } + + r := &reaper{ + Options: &cmdopts.Options{ + Metrics: metrics.CmdOpts{}, + Sinks: sinks.CmdOpts{}, + }, + measurementCh: make(chan metrics.MeasurementEnvelope, 10), + measurementCache: NewInstanceMetricCache(), + } + sr := NewDbConnReaper(r, md) + + rows := pgxmock.NewRows([]string{"epoch_ns", "value"}). 
+ AddRow(time.Now().UnixNano(), int64(42)) + mock.ExpectQuery("SELECT seq_value").WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnRows(rows) + + err = sr.fetchMetric(ctx, batchEntry{name: "seq_metric", metric: metricDefs.MetricDefs["seq_metric"], sql: "SELECT seq_value"}) + assert.NoError(t, err) + assert.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/internal/reaper/file.go b/internal/reaper/file.go index e1a0e14b3e..23d1e01ea2 100644 --- a/internal/reaper/file.go +++ b/internal/reaper/file.go @@ -44,11 +44,11 @@ from pg_catalog.pg_tablespace where spcname !~ 'pg_.+'` var directlyFetchableOSMetrics = []string{metricPsutilCPU, metricPsutilDisk, metricPsutilDiskIoTotal, metricPsutilMem, metricCPULoad} -func IsDirectlyFetchableMetric(md *sources.SourceConn, metric string) bool { +func IsDirectlyFetchableMetric(md *sources.DbConn, metric string) bool { return slices.Contains(directlyFetchableOSMetrics, metric) && md.IsClientOnSameHost() } -func (r *Reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.SourceConn, metricName string) (*metrics.MeasurementEnvelope, error) { +func (r *reaper) FetchStatsDirectlyFromOS(ctx context.Context, md *sources.DbConn, metricName string) (*metrics.MeasurementEnvelope, error) { var data, pgDirs metrics.Measurements var err error diff --git a/internal/reaper/logparser.go b/internal/reaper/logparser.go index a2127073ce..573d6421ed 100644 --- a/internal/reaper/logparser.go +++ b/internal/reaper/logparser.go @@ -42,7 +42,7 @@ type LogParser struct { *LogConfig ctx context.Context LogsMatchRegex *regexp.Regexp - SourceConn *sources.SourceConn + SourceConn *sources.DbConn Interval time.Duration StoreCh chan<- metrics.MeasurementEnvelope eventCounts map[string]int64 // for the specific DB. 
[WARNING: 34, ERROR: 10, ...], zeroed on storage send @@ -59,7 +59,7 @@ type LogConfig struct { ServerMessagesLang string } -func NewLogParser(ctx context.Context, mdb *sources.SourceConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error) { +func NewLogParser(ctx context.Context, mdb *sources.DbConn, storeCh chan<- metrics.MeasurementEnvelope) (lp *LogParser, err error) { logger := log.GetLogger(ctx).WithField("source", mdb.Name).WithField("metric", specialMetricServerLogEventCounts) ctx = log.WithLogger(ctx, logger) @@ -140,7 +140,7 @@ func tryDetermineLogSettings(ctx context.Context, conn db.PgxIface) (cfg *LogCon return nil, err } -func checkHasRemotePrivileges(ctx context.Context, mdb *sources.SourceConn, logsDirPath string) error { +func checkHasRemotePrivileges(ctx context.Context, mdb *sources.DbConn, logsDirPath string) error { var logFile string err := mdb.Conn.QueryRow(ctx, "select name from pg_ls_logdir() limit 1").Scan(&logFile) if err != nil && err != pgx.ErrNoRows { diff --git a/internal/reaper/logparser_test.go b/internal/reaper/logparser_test.go index b596433bc5..ab2f3ac82d 100644 --- a/internal/reaper/logparser_test.go +++ b/internal/reaper/logparser_test.go @@ -25,7 +25,7 @@ func TestNewLogParser(t *testing.T) { require.NoError(t, err) defer mock.Close() - sourceConn := &sources.SourceConn{ + sourceConn := &sources.DbConn{ Source: sources.Source{ Name: "test-source", Metrics: metrics.MetricIntervals{specialMetricServerLogEventCounts: 60.0}, @@ -191,7 +191,7 @@ func TestCheckHasPrivileges(t *testing.T) { WillReturnError(assert.AnError) } - sourceConn := &sources.SourceConn{ + sourceConn := &sources.DbConn{ Source: sources.Source{ Name: "test-source", }, @@ -221,7 +221,7 @@ func TestCheckHasPrivileges(t *testing.T) { } func TestEventCountsToMetricStoreMessages(t *testing.T) { - mdb := &sources.SourceConn{ + mdb := &sources.DbConn{ Source: sources.Source{ Name: "test-db", Kind: sources.SourcePostgres, @@ -416,7 +416,7 @@ func 
TestLogParseLocal(t *testing.T) { pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true)) // Create a SourceConn for testing - sourceConn := &sources.SourceConn{ + sourceConn := &sources.DbConn{ Source: sources.Source{ Name: "test-source", }, @@ -593,7 +593,7 @@ func TestLogParseRemote(t *testing.T) { WillReturnRows(pgxmock.NewRows([]string{"name", "size", "modification"}). AddRow(logFileName, int32(len(logContent)), time.Now())) - sourceConn := &sources.SourceConn{ + sourceConn := &sources.DbConn{ Source: sources.Source{ Name: "test-source", Metrics: metrics.MetricIntervals{specialMetricServerLogEventCounts: 60}, // 60s interval - won't trigger during test @@ -654,7 +654,7 @@ func TestLogParseRemote(t *testing.T) { mock.ExpectQuery(`select name, size, modification from pg_ls_logdir\(\) where name like '%csv' order by modification desc limit 1;`). WillReturnError(assert.AnError) - sourceConn := &sources.SourceConn{ + sourceConn := &sources.DbConn{ Source: sources.Source{ Name: "test-source", Metrics: metrics.MetricIntervals{specialMetricServerLogEventCounts: 1}, @@ -718,7 +718,7 @@ incomplete line without proper fields WillReturnRows(pgxmock.NewRows([]string{"name", "size", "modification"}). AddRow(logFileName, int32(len(malformedContent)), time.Now())) - sourceConn := &sources.SourceConn{ + sourceConn := &sources.DbConn{ Source: sources.Source{ Name: "test-source", Metrics: metrics.MetricIntervals{specialMetricServerLogEventCounts: 60}, // Long interval @@ -787,7 +787,7 @@ incomplete line without proper fields WithArgs(filepath.Join(tempDir, logFileName), int32(0), int32(len(logContent))). 
WillReturnError(assert.AnError) - sourceConn := &sources.SourceConn{ + sourceConn := &sources.DbConn{ Source: sources.Source{ Name: "test-source", Metrics: metrics.MetricIntervals{specialMetricServerLogEventCounts: 1}, diff --git a/internal/reaper/metric.go b/internal/reaper/metric.go index 8bf87e5e2b..36ca4d4d06 100644 --- a/internal/reaper/metric.go +++ b/internal/reaper/metric.go @@ -75,7 +75,7 @@ type ExistingPartitionInfo struct { } // LoadMetrics loads metric definitions from the reader -func (r *Reaper) LoadMetrics() (err error) { +func (r *reaper) LoadMetrics() (err error) { var newDefs *metrics.Metrics if newDefs, err = r.MetricsReaderWriter.GetMetrics(); err != nil { return @@ -98,15 +98,16 @@ func (r *Reaper) LoadMetrics() (err error) { }(), "metrics and presets refreshed") // update the monitored sources with real metric definitions from presets for _, md := range r.monitoredSources { - if md.PresetMetrics > "" { - md.Lock() - md.Metrics = metricDefs.GetPresetMetrics(md.PresetMetrics) - md.Unlock() + var main, standby metrics.MetricIntervals + src := md.GetSource() + if src.PresetMetrics > "" { + main = metricDefs.GetPresetMetrics(src.PresetMetrics) } - if md.PresetMetricsStandby > "" { - md.Lock() - md.MetricsStandby = metricDefs.GetPresetMetrics(md.PresetMetricsStandby) - md.Unlock() + if src.PresetMetricsStandby > "" { + standby = metricDefs.GetPresetMetrics(src.PresetMetricsStandby) + } + if main != nil || standby != nil { + md.SetMetricIntervals(main, standby) } } return diff --git a/internal/reaper/metric_test.go b/internal/reaper/metric_test.go index 47d5c00ecf..12ac1a6bbb 100644 --- a/internal/reaper/metric_test.go +++ b/internal/reaper/metric_test.go @@ -48,12 +48,12 @@ var ( func TestReaper_FetchStatsDirectlyFromOS(t *testing.T) { a := assert.New(t) - r := &Reaper{Options: &cmdopts.Options{}} + r := &reaper{Options: &cmdopts.Options{}} t.Run("metrics directly fetchable when on same host", func(*testing.T) { conn, _ := 
pgxmock.NewPool(pgxmock.QueryMatcherOption(pgxmock.QueryMatcherEqual)) expq := conn.ExpectQuery("SELECT COALESCE(inet_client_addr(), inet_server_addr()) IS NULL") expq.Times(uint(len(directlyFetchableOSMetrics))) - md := &sources.SourceConn{Conn: conn} + md := &sources.DbConn{Conn: conn} for _, m := range directlyFetchableOSMetrics { expq.WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(true)) a.True(IsDirectlyFetchableMetric(md, m), "Expected %s to be directly fetchable", m) @@ -67,7 +67,7 @@ func TestReaper_FetchStatsDirectlyFromOS(t *testing.T) { remoteConn, _ := pgxmock.NewPool(pgxmock.QueryMatcherOption(pgxmock.QueryMatcherEqual)) remoteConn.ExpectQuery("SELECT COALESCE(inet_client_addr(), inet_server_addr()) IS NULL"). WillReturnRows(pgxmock.NewRows([]string{"is_unix_socket"}).AddRow(false)) - remoteMd := &sources.SourceConn{Conn: remoteConn} + remoteMd := &sources.DbConn{Conn: remoteConn} a.False(IsDirectlyFetchableMetric(remoteMd, metricCPULoad), "cpu_load should not be directly fetchable when pgwatch is not on the same host as PostgreSQL") }) @@ -126,7 +126,7 @@ func TestReaper_LoadMetrics(t *testing.T) { ctx := log.WithLogger(t.Context(), log.NewNoopLogger()) t.Run("returns error from GetMetrics", func(t *testing.T) { - r := NewReaper(ctx, &cmdopts.Options{ + r := newReaper(ctx, &cmdopts.Options{ MetricsReaderWriter: &testutil.MockMetricsReaderWriter{ GetMetricsFunc: func() (*metrics.Metrics, error) { return nil, assert.AnError @@ -141,7 +141,7 @@ func TestReaper_LoadMetrics(t *testing.T) { MetricDefs: metrics.MetricDefs{"m1": {Description: "M1"}}, PresetDefs: metrics.PresetDefs{"p1": {Description: "P1", Metrics: metrics.MetricIntervals{"m1": 1.0}}}, } - r := NewReaper(ctx, &cmdopts.Options{ + r := newReaper(ctx, &cmdopts.Options{ MetricsReaderWriter: &testutil.MockMetricsReaderWriter{ GetMetricsFunc: func() (*metrics.Metrics, error) { return defs, nil }, }, @@ -163,7 +163,7 @@ func TestReaper_LoadMetrics(t *testing.T) { PresetDefs: 
metrics.PresetDefs{}, } mock := &mockDefinerWriter{} - r := NewReaper(ctx, &cmdopts.Options{ + r := newReaper(ctx, &cmdopts.Options{ MetricsReaderWriter: &testutil.MockMetricsReaderWriter{ GetMetricsFunc: func() (*metrics.Metrics, error) { return defs, nil }, }, @@ -180,7 +180,7 @@ func TestReaper_LoadMetrics(t *testing.T) { PresetDefs: metrics.PresetDefs{}, } mock := &mockDefinerWriter{defineErr: assert.AnError} - r := NewReaper(ctx, &cmdopts.Options{ + r := newReaper(ctx, &cmdopts.Options{ MetricsReaderWriter: &testutil.MockMetricsReaderWriter{ GetMetricsFunc: func() (*metrics.Metrics, error) { return defs, nil }, }, @@ -201,13 +201,13 @@ func TestReaper_LoadMetrics(t *testing.T) { "standby1": {Metrics: metrics.MetricIntervals{"m2": 20.0}}, }, } - r := NewReaper(ctx, &cmdopts.Options{ + r := newReaper(ctx, &cmdopts.Options{ MetricsReaderWriter: &testutil.MockMetricsReaderWriter{ GetMetricsFunc: func() (*metrics.Metrics, error) { return defs, nil }, }, }) r.monitoredSources = sources.SourceConns{ - sources.NewSourceConn(sources.Source{ + sources.NewDbConn(sources.Source{ Name: "src1", PresetMetrics: "preset1", PresetMetricsStandby: "standby1", @@ -215,7 +215,7 @@ func TestReaper_LoadMetrics(t *testing.T) { } assert.NoError(t, r.LoadMetrics()) - sc := r.monitoredSources[0] + sc := r.monitoredSources[0].(*sources.DbConn) assert.Equal(t, metrics.MetricIntervals{"m1": 10.0}, sc.Metrics) assert.Equal(t, metrics.MetricIntervals{"m2": 20.0}, sc.MetricsStandby) }) @@ -226,19 +226,19 @@ func TestReaper_LoadMetrics(t *testing.T) { MetricDefs: metrics.MetricDefs{"cpu": {Description: "CPU"}}, PresetDefs: metrics.PresetDefs{}, } - r := NewReaper(ctx, &cmdopts.Options{ + r := newReaper(ctx, &cmdopts.Options{ MetricsReaderWriter: &testutil.MockMetricsReaderWriter{ GetMetricsFunc: func() (*metrics.Metrics, error) { return defs, nil }, }, }) r.monitoredSources = sources.SourceConns{ - sources.NewSourceConn(sources.Source{ + sources.NewDbConn(sources.Source{ Name: "src2", Metrics: 
customMetrics, }), } assert.NoError(t, r.LoadMetrics()) - assert.Equal(t, customMetrics, r.monitoredSources[0].Metrics, "custom metrics should be unchanged") + assert.Equal(t, customMetrics, r.monitoredSources[0].(*sources.DbConn).Metrics, "custom metrics should be unchanged") }) // Regression test for https://github.com/cybertec-postgresql/pgwatch/issues/1091 @@ -261,7 +261,7 @@ func TestReaper_LoadMetrics(t *testing.T) { PresetMetrics: "test_preset", } getMetricsFn := func() (*metrics.Metrics, error) { return initialDefs, nil } - r := NewReaper(ctx, &cmdopts.Options{ + r := newReaper(ctx, &cmdopts.Options{ SourcesReaderWriter: &testutil.MockSourcesReaderWriter{ GetSourcesFunc: func() (sources.Sources, error) { return sources.Sources{src}, nil }, }, @@ -272,14 +272,14 @@ func TestReaper_LoadMetrics(t *testing.T) { }) require.NoError(t, r.LoadSources(ctx)) require.NoError(t, r.LoadMetrics()) - assert.Equal(t, metrics.MetricIntervals{"test_metric": 1}, r.monitoredSources[0].Metrics) + assert.Equal(t, metrics.MetricIntervals{"test_metric": 1}, r.monitoredSources[0].(*sources.DbConn).Metrics) // Attach a mock connection so CloseResourcesForRemovedMonitoredDBs doesn't panic // when the custom_tags change triggers a full source restart. mockConn, err := pgxmock.NewPool() require.NoError(t, err) mockConn.ExpectClose() - r.monitoredSources[0].Conn = mockConn + r.monitoredSources[0].(*sources.DbConn).Conn = mockConn // Simulate what happens between two Reap iterations: // 1. 
source custom_tags change triggers restart @@ -295,7 +295,7 @@ func TestReaper_LoadMetrics(t *testing.T) { require.NoError(t, r.LoadSources(ctx)) require.NoError(t, r.LoadMetrics()) - assert.Equal(t, metrics.MetricIntervals{"test_metric": 2}, r.monitoredSources[0].Metrics, + assert.Equal(t, metrics.MetricIntervals{"test_metric": 2}, r.monitoredSources[0].(*sources.DbConn).Metrics, "preset interval should be updated after source config change triggered a restart") }) } diff --git a/internal/reaper/prometheus.go b/internal/reaper/prometheus.go new file mode 100644 index 0000000000..5cbf04c84f --- /dev/null +++ b/internal/reaper/prometheus.go @@ -0,0 +1,176 @@ +package reaper + +import ( + "context" + "errors" + "fmt" + "io" + "time" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + + "github.com/cybertec-postgresql/pgwatch/v5/internal/log" + "github.com/cybertec-postgresql/pgwatch/v5/internal/metrics" + "github.com/cybertec-postgresql/pgwatch/v5/internal/sources" +) + +const defaultScrapeInterval = 60 * time.Second + +var _ Reaper = (*PromReaper)(nil) + +// PromReaper drives metric scraping for a single Prometheus source. +// It runs a GCD-based tick loop and applies per-family emit-interval gating. +type PromReaper struct { + reaper *reaper + md *sources.PromConn + lastEmitted map[string]time.Time +} + +// NewPromSourceReaper creates a PromReaper for the given Prometheus source. +func NewPromSourceReaper(r *reaper, md *sources.PromConn) *PromReaper { + return &PromReaper{ + reaper: r, + md: md, + lastEmitted: make(map[string]time.Time), + } +} + +// calcScrapeInterval returns the GCD of all configured metric intervals. +// Defaults to defaultScrapeInterval when no metrics are configured (scrape-all mode). +// Individual intervals below minTickInterval are floored to minTickInterval. 
+func (pr *PromReaper) calcScrapeInterval() time.Duration {
+	// Take the map reference under the read lock; the loop below runs after
+	// the lock has been released.
+	pr.md.RLock()
+	m := pr.md.Metrics
+	pr.md.RUnlock()
+
+	if len(m) == 0 {
+		return defaultScrapeInterval
+	}
+	intervals := make([]int, 0, len(m))
+	for _, v := range m {
+		intervals = append(intervals, max(v, minTickInterval))
+	}
+	// The GCD of the floored values can still be smaller than minTickInterval
+	// (e.g. 10 and 15 give 5), so the result is clamped once more.
+	return time.Duration(max(GCDSlice(intervals), minTickInterval)) * time.Second
+}
+
+// Reap is the main loop for a Prometheus source. On every tick it calls
+// ScrapeAll and forwards the resulting envelopes to the reaper's measurement
+// channel, skipping families whose configured emit interval has not yet
+// elapsed since they were last sent. A failed scrape is logged as a warning
+// and retried on the next tick. The wait between ticks is recomputed with
+// calcScrapeInterval after each pass.
+//
+// Reap returns when ctx is cancelled, either while waiting for the next tick
+// or while waiting for room on the measurement channel.
+func (pr *PromReaper) Reap(ctx context.Context) {
+	l := log.GetLogger(ctx).WithField("source", pr.md.Name)
+	ctx = log.WithLogger(ctx, l)
+
+	wasScrapeAll := false
+	for {
+		// Re-evaluate the mode on every tick. calcScrapeInterval re-reads the
+		// metric map each time, so a mode captured once before the loop could
+		// go stale and leave per-family gating disabled after metrics have
+		// been configured.
+		pr.md.RLock()
+		scrapeAll := len(pr.md.Metrics) == 0
+		pr.md.RUnlock()
+
+		// Warn only when entering scrape-all mode, not on every tick.
+		if scrapeAll && !wasScrapeAll {
+			l.Warning("no metrics configured for prometheus source, using scrape-all mode with 60 s interval")
+		}
+		wasScrapeAll = scrapeAll
+
+		envelopes, err := pr.ScrapeAll(ctx)
+		if err != nil {
+			l.WithError(err).Warning("prometheus scrape failed")
+		} else {
+			now := time.Now()
+			for _, env := range envelopes {
+				if !scrapeAll {
+					emitInterval := pr.md.GetMetricInterval(env.MetricName)
+					if emitInterval > 0 {
+						// A zero lastEmitted entry means the family was never
+						// sent, so it is always emitted the first time.
+						if last := pr.lastEmitted[env.MetricName]; !last.IsZero() && now.Sub(last) < emitInterval {
+							continue
+						}
+					}
+				}
+				env.DBName = pr.md.Name
+				env.CustomTags = pr.md.CustomTags
+				env.SourceKind = string(sources.SourcePrometheus)
+
+				// Do not block forever on a stalled consumer: give up the
+				// send as soon as the context is cancelled.
+				select {
+				case pr.reaper.measurementCh <- env:
+					pr.lastEmitted[env.MetricName] = now
+				case <-ctx.Done():
+					return
+				}
+			}
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(pr.calcScrapeInterval()):
+		}
+	}
+}
+
+// ScrapeAll fetches Prometheus exposition metrics from pr.md and returns one
+// MeasurementEnvelope per metric family. Each sample becomes one Measurement
+// with tag_