Skip to content

Commit 9084986

Browse files
Manas Srivastavaclaude
andcommitted
fix(pool): bounded pgxpool config + saturation metrics — Wave-3 chaos verify
Sibling to api + worker pg-pool-saturation fixes. The provisioner's hot-pool talks to PROVISIONER_DATABASE_URL (a standalone Postgres at 161.35.111.84, not the platform DB), so it wasn't the proximate cause of the 50-concurrent /db/new burst exhaustion of DO Managed Postgres. But the same pattern can recur on that host once the hot-pool churns under load, and the bare pgxpool.New defaults (MaxConns = max(4, runtime.NumCPU())) under-bound on big-CPU nodes. This commit applies the same bounded-pool + saturation-metrics discipline applied in api and worker. Changes: - pool_metrics.go (new): newBoundedPgxPoolConfig parses the DSN + applies bounded defaults — PROVISIONER_PG_MAX_CONNS (default 10), PROVISIONER_PG_MIN_CONNS (2), PROVISIONER_PG_CONN_MAX_LIFETIME (4m), PROVISIONER_PG_CONN_MAX_IDLE_TIME (90s). Operator can override via env without redeploy. - pool_metrics.go (same file): startPgxPoolStatsExporter ticks every 5s and pushes pgxpool.Stat onto instant_pg_pool_{max,open,in_use, idle,wait_count,wait_duration_seconds,canceled_acquire_count} Prometheus gauges. Mirrors the api + worker exporters with pgxpool-specific field mapping. - main.go: swaps pgxpool.New(dsn) for pgxpool.NewWithConfig(cfg) using newBoundedPgxPoolConfig, and starts the stats exporter immediately after pool construction. - pool_metrics_test.go: regression contract. - TestEnvInt32_FallsBackOnBadValues + TestEnvDuration_*: typo'd env var can't silently disable the ceiling. - TestNewBoundedPgxPoolConfig_AppliesDefaults: documented defaults. - TestNewBoundedPgxPoolConfig_RespectsEnv: env overrides win. 
Coverage block: Symptom: worker event_email_forwarder "remaining connection slots are reserved for non-replication superuser connections" during 50-concurrent api /db/new burst Enumeration: rg -F 'pgxpool.New\|SetMaxOpenConns' api/ worker/ provisioner/ Sites found: 3 (api postgres.go, worker db.go, provisioner main.go) Sites touched: 3 (provisioner = this commit; api + worker sibling commits) Coverage test: TestNewBoundedPgxPoolConfig_AppliesDefaults pins the contract — a future refactor that drops the bounded config breaks the test. Live verified: deferred — see /readyz + /metrics post-deploy Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 848a928 commit 9084986

3 files changed

Lines changed: 309 additions & 1 deletion

File tree

main.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,12 +346,38 @@ func main() {
346346
os.Exit(1)
347347
}
348348

349-
dbPool, err := pgxpool.New(context.Background(), cfg.ProvisionerDatabaseURL)
349+
// Wave-3 chaos verify (2026-05-21): use bounded pgxpool config
350+
// instead of pgxpool.New's defaults (default MaxConns =
351+
// max(4, runtime.NumCPU()), which grows unchecked on big-CPU
352+
// nodes). newBoundedPgxPoolConfig also reads
353+
// PROVISIONER_PG_MAX_CONNS / PROVISIONER_PG_MIN_CONNS /
354+
// PROVISIONER_PG_CONN_MAX_LIFETIME / PROVISIONER_PG_CONN_MAX_IDLE_TIME
355+
// so the operator can raise the ceiling per-environment.
356+
pgxCfg, err := newBoundedPgxPoolConfig(cfg.ProvisionerDatabaseURL)
357+
if err != nil {
358+
slog.Error("provisioner.pool_db_parse_failed", "error", err)
359+
os.Exit(1)
360+
}
361+
slog.Info("provisioner.pool_db_config_resolved",
362+
"max_conns", pgxCfg.MaxConns,
363+
"min_conns", pgxCfg.MinConns,
364+
"max_conn_lifetime", pgxCfg.MaxConnLifetime.String(),
365+
"max_conn_idle_time", pgxCfg.MaxConnIdleTime.String(),
366+
)
367+
dbPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
350368
if err != nil {
351369
slog.Error("provisioner.pool_db_connect_failed", "error", err)
352370
os.Exit(1)
353371
}
354372

373+
// Pool-saturation observability. Goroutine ticks every 5s
374+
// and pushes pgxpool.Stat onto instant_pg_pool_* Prometheus
375+
// gauges. Lives for the process lifetime; the cancel function
376+
// is deferred, so the exporter stops when main returns normally.
377+
poolStatsCtx, poolStatsCancel := context.WithCancel(context.Background())
378+
defer poolStatsCancel()
379+
go startPgxPoolStatsExporter(poolStatsCtx, dbPool, "provisioner_db")
380+
355381
// Verify connectivity with retries — k3s/Flannel sometimes needs a moment
356382
// to establish ClusterIP routing in a freshly-started container.
357383
const pingAttempts = 5

pool_metrics.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package main
2+
3+
// pool_metrics.go — bounded pgxpool config + saturation metrics for the
4+
// provisioner's hot-pool database connection. Wave-3 chaos verify
5+
// (2026-05-21): a 50-concurrent api /db/new burst exhausted the shared
6+
// DigitalOcean Managed Postgres user-connection ceiling. The
7+
// provisioner's own pgxpool wasn't the proximate cause (it talks to
8+
// PROVISIONER_DATABASE_URL on a different DO host — 161.35.111.84 —
9+
// not the platform DB), but the same pattern can recur on that host
10+
// once the hot-pool churns under load; this file extends the same
11+
// observability + bounded-pool discipline applied in api and worker.
12+
13+
import (
14+
"context"
15+
"log/slog"
16+
"os"
17+
"strconv"
18+
"time"
19+
20+
"github.com/jackc/pgx/v5/pgxpool"
21+
"github.com/prometheus/client_golang/prometheus"
22+
"github.com/prometheus/client_golang/prometheus/promauto"
23+
)
24+
25+
// Pool-size defaults.
26+
//
27+
// Provisioner's database is a workhorse Postgres at 161.35.111.84 used
28+
// for hot-pool tracking + cluster routing. Unlike the api/worker
29+
// platform_db, this host is a single DO Droplet (not Managed PG with
30+
// its slot reservations) — so the per-process pool ceiling matters
31+
// less for upstream-saturation reasons and more for "don't open more
32+
// conns than the workload actually needs" reasons. Default 10/2 is
33+
// generous for the workload (hot-pool refill + the occasional gRPC
34+
// handler INSERT).
35+
const (
36+
defaultProvisionerPGMaxConns = 10
37+
defaultProvisionerPGMinConns = 2
38+
defaultProvisionerPGConnMaxLife = 4 * time.Minute
39+
defaultProvisionerPGConnMaxIdle = 90 * time.Second
40+
)
41+
42+
// Pool-saturation gauges. Provisioner has no `internal/metrics`
43+
// package; declare these in main and the existing /metrics endpoint
44+
// (mux.Handle("/metrics", promhttp.Handler())) picks them up via the
45+
// default registry.
46+
var (
47+
pgPoolMaxGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
48+
Name: "instant_pg_pool_max",
49+
Help: "pgxpool MaxConns ceiling on the provisioner pool. Constant for the process lifetime.",
50+
}, []string{"pool"})
51+
52+
pgPoolTotalGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
53+
Name: "instant_pg_pool_open",
54+
Help: "pgxpool total connections (acquired + idle + constructing). Sampled every 5s.",
55+
}, []string{"pool"})
56+
57+
pgPoolAcquiredGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
58+
Name: "instant_pg_pool_in_use",
59+
Help: "pgxpool connections currently in use. Sampled every 5s. Wave-3 chaos verify 2026-05-21.",
60+
}, []string{"pool"})
61+
62+
pgPoolIdleGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
63+
Name: "instant_pg_pool_idle",
64+
Help: "pgxpool connections currently idle. Sampled every 5s.",
65+
}, []string{"pool"})
66+
67+
pgPoolAcquireWaitCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
68+
Name: "instant_pg_pool_wait_count",
69+
Help: "Cumulative count of acquire-waits since process start (pgxpool.Stat.EmptyAcquireCount). Steepening slope == pool saturated.",
70+
}, []string{"pool"})
71+
72+
pgPoolAcquireDurationSecs = promauto.NewGaugeVec(prometheus.GaugeOpts{
73+
Name: "instant_pg_pool_wait_duration_seconds",
74+
Help: "Cumulative time spent in acquire-waits since process start (pgxpool.Stat.AcquireDuration).",
75+
}, []string{"pool"})
76+
77+
pgPoolCanceledAcquireCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
78+
Name: "instant_pg_pool_canceled_acquire_count",
79+
Help: "Cumulative acquire-cancels since process start (pgxpool.Stat.CanceledAcquireCount). A non-zero rate means handlers are timing out before connections become available.",
80+
}, []string{"pool"})
81+
)
82+
83+
// envInt32 reads a strictly positive int32 from the environment variable
// name.
//
// It returns def when the variable is unset or empty. A value that does
// not parse as a base-10 int32 (including one that overflows int32), or
// that is zero or negative, also yields def: the provisioner must not
// refuse to start on a typo. Unlike the unset case, a rejected value is
// logged at warn level so the operator can see the override was ignored
// instead of silently running with the default.
func envInt32(name string, def int32) int32 {
	raw := os.Getenv(name)
	if raw == "" {
		return def
	}

	n, err := strconv.ParseInt(raw, 10, 32)

	var reason string
	switch {
	case err != nil:
		reason = err.Error()
	case n <= 0:
		reason = "must be greater than zero"
	default:
		// bitSize 32 above guarantees n fits in an int32.
		return int32(n)
	}

	slog.Warn("provisioner.pool_config.env_ignored",
		"name", name,
		"value", raw,
		"fallback", def,
		"reason", reason,
	)
	return def
}
96+
97+
// envDuration reads a strictly positive time.Duration (Go duration
// syntax, e.g. "90s" or "4m") from the environment variable name.
//
// It returns def when the variable is unset or empty. A value that
// time.ParseDuration rejects, or that is zero or negative, also yields
// def so a typo cannot disable the lifetime/idle knobs. A rejected value
// is logged at warn level so the ignored override is visible to the
// operator.
func envDuration(name string, def time.Duration) time.Duration {
	raw := os.Getenv(name)
	if raw == "" {
		return def
	}

	d, err := time.ParseDuration(raw)

	var reason string
	switch {
	case err != nil:
		reason = err.Error()
	case d <= 0:
		reason = "must be greater than zero"
	default:
		return d
	}

	slog.Warn("provisioner.pool_config.env_ignored",
		"name", name,
		"value", raw,
		"fallback", def.String(),
		"reason", reason,
	)
	return def
}
109+
110+
// newBoundedPgxPoolConfig parses a pgxpool config from a DSN and
111+
// applies the bounded defaults (overridable via env). Returns a
112+
// configured *pgxpool.Config ready to pass to pgxpool.NewWithConfig.
113+
//
114+
// Env vars:
115+
//
116+
// PROVISIONER_PG_MAX_CONNS (default 10)
117+
// PROVISIONER_PG_MIN_CONNS (default 2)
118+
// PROVISIONER_PG_CONN_MAX_LIFETIME (default 4m)
119+
// PROVISIONER_PG_CONN_MAX_IDLE_TIME (default 90s)
120+
func newBoundedPgxPoolConfig(dsn string) (*pgxpool.Config, error) {
121+
cfg, err := pgxpool.ParseConfig(dsn)
122+
if err != nil {
123+
return nil, err
124+
}
125+
126+
cfg.MaxConns = envInt32("PROVISIONER_PG_MAX_CONNS", defaultProvisionerPGMaxConns)
127+
cfg.MinConns = envInt32("PROVISIONER_PG_MIN_CONNS", defaultProvisionerPGMinConns)
128+
cfg.MaxConnLifetime = envDuration("PROVISIONER_PG_CONN_MAX_LIFETIME", defaultProvisionerPGConnMaxLife)
129+
cfg.MaxConnIdleTime = envDuration("PROVISIONER_PG_CONN_MAX_IDLE_TIME", defaultProvisionerPGConnMaxIdle)
130+
131+
return cfg, nil
132+
}
133+
134+
// startPgxPoolStatsExporter samples pgxpool.Stat every 5s and pushes
135+
// the relevant numbers onto the instant_pg_pool_* gauges. Blocks
136+
// until ctx is cancelled. Mirror of api/internal/db/pool_metrics.go
137+
// for pgxpool semantics.
138+
func startPgxPoolStatsExporter(ctx context.Context, pool *pgxpool.Pool, label string) {
139+
if pool == nil {
140+
slog.Warn("provisioner.pool_metrics.skip — nil pool", "label", label)
141+
return
142+
}
143+
144+
const interval = 5 * time.Second
145+
ticker := time.NewTicker(interval)
146+
defer ticker.Stop()
147+
148+
slog.Info("provisioner.pool_metrics.exporter_started",
149+
"label", label,
150+
"interval", interval.String(),
151+
)
152+
153+
publishPgxPoolStats(pool, label)
154+
155+
for {
156+
select {
157+
case <-ctx.Done():
158+
slog.Info("provisioner.pool_metrics.exporter_stopped", "label", label)
159+
return
160+
case <-ticker.C:
161+
publishPgxPoolStats(pool, label)
162+
}
163+
}
164+
}
165+
166+
// publishPgxPoolStats reads pool.Stat() and pushes onto the gauges.
167+
// Exported as a free function so tests can drive it directly without
168+
// spinning a ticker.
169+
func publishPgxPoolStats(pool *pgxpool.Pool, label string) {
170+
s := pool.Stat()
171+
pgPoolMaxGauge.WithLabelValues(label).Set(float64(s.MaxConns()))
172+
pgPoolTotalGauge.WithLabelValues(label).Set(float64(s.TotalConns()))
173+
pgPoolAcquiredGauge.WithLabelValues(label).Set(float64(s.AcquiredConns()))
174+
pgPoolIdleGauge.WithLabelValues(label).Set(float64(s.IdleConns()))
175+
pgPoolAcquireWaitCount.WithLabelValues(label).Set(float64(s.EmptyAcquireCount()))
176+
pgPoolAcquireDurationSecs.WithLabelValues(label).Set(s.AcquireDuration().Seconds())
177+
pgPoolCanceledAcquireCount.WithLabelValues(label).Set(float64(s.CanceledAcquireCount()))
178+
}

pool_metrics_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package main
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
// TestEnvInt32_FallsBackOnBadValues — guard against a typo'd env var
9+
// silently disabling the pgxpool ceiling.
10+
func TestEnvInt32_FallsBackOnBadValues(t *testing.T) {
11+
cases := []struct {
12+
raw string
13+
want int32
14+
}{
15+
{"", 25},
16+
{"not-a-number", 25},
17+
{"-1", 25}, // negative → fallback
18+
{"0", 25}, // zero → fallback
19+
{"15", 15},
20+
}
21+
for _, tc := range cases {
22+
t.Setenv("__TEST_PROV_PG_ENVINT32", tc.raw)
23+
got := envInt32("__TEST_PROV_PG_ENVINT32", 25)
24+
if got != tc.want {
25+
t.Errorf("envInt32(%q): want %d, got %d", tc.raw, tc.want, got)
26+
}
27+
}
28+
}
29+
30+
// TestEnvDuration_FallsBackOnBadValues — guard against a typo'd env
31+
// var silently disabling the connection-lifetime knob.
32+
func TestEnvDuration_FallsBackOnBadValues(t *testing.T) {
33+
cases := []struct {
34+
raw string
35+
want time.Duration
36+
}{
37+
{"", 7 * time.Minute},
38+
{"not-a-duration", 7 * time.Minute},
39+
{"-1s", 7 * time.Minute},
40+
{"0", 7 * time.Minute},
41+
{"5m", 5 * time.Minute},
42+
{"30s", 30 * time.Second},
43+
}
44+
for _, tc := range cases {
45+
t.Setenv("__TEST_PROV_PG_ENVDURATION", tc.raw)
46+
got := envDuration("__TEST_PROV_PG_ENVDURATION", 7*time.Minute)
47+
if got != tc.want {
48+
t.Errorf("envDuration(%q): want %v, got %v", tc.raw, tc.want, got)
49+
}
50+
}
51+
}
52+
53+
// TestNewBoundedPgxPoolConfig_AppliesDefaults — asserts the bounded
54+
// config applies the documented defaults when no env vars are set.
55+
// Regression contract for the Wave-3 chaos verify (2026-05-21) finding:
56+
// provisioner default was pgxpool.New (MaxConns = max(4, runtime.NumCPU()))
57+
// — a high-CPU node could grab > 50 connections under load, multiplying
58+
// the same DO Managed Postgres exhaustion that took out worker.
59+
func TestNewBoundedPgxPoolConfig_AppliesDefaults(t *testing.T) {
60+
dsn := "postgres://nobody@127.0.0.1:1/postgres?sslmode=disable"
61+
cfg, err := newBoundedPgxPoolConfig(dsn)
62+
if err != nil {
63+
t.Fatalf("newBoundedPgxPoolConfig: %v", err)
64+
}
65+
if cfg.MaxConns != defaultProvisionerPGMaxConns {
66+
t.Errorf("MaxConns: want %d, got %d", defaultProvisionerPGMaxConns, cfg.MaxConns)
67+
}
68+
if cfg.MinConns != defaultProvisionerPGMinConns {
69+
t.Errorf("MinConns: want %d, got %d", defaultProvisionerPGMinConns, cfg.MinConns)
70+
}
71+
if cfg.MaxConnLifetime != defaultProvisionerPGConnMaxLife {
72+
t.Errorf("MaxConnLifetime: want %v, got %v", defaultProvisionerPGConnMaxLife, cfg.MaxConnLifetime)
73+
}
74+
if cfg.MaxConnIdleTime != defaultProvisionerPGConnMaxIdle {
75+
t.Errorf("MaxConnIdleTime: want %v, got %v", defaultProvisionerPGConnMaxIdle, cfg.MaxConnIdleTime)
76+
}
77+
}
78+
79+
// TestNewBoundedPgxPoolConfig_RespectsEnv — asserts env vars override
80+
// the defaults so operators can raise the ceiling without redeploying.
81+
func TestNewBoundedPgxPoolConfig_RespectsEnv(t *testing.T) {
82+
t.Setenv("PROVISIONER_PG_MAX_CONNS", "30")
83+
t.Setenv("PROVISIONER_PG_MIN_CONNS", "5")
84+
t.Setenv("PROVISIONER_PG_CONN_MAX_LIFETIME", "2m")
85+
t.Setenv("PROVISIONER_PG_CONN_MAX_IDLE_TIME", "45s")
86+
87+
dsn := "postgres://nobody@127.0.0.1:1/postgres?sslmode=disable"
88+
cfg, err := newBoundedPgxPoolConfig(dsn)
89+
if err != nil {
90+
t.Fatalf("newBoundedPgxPoolConfig: %v", err)
91+
}
92+
if cfg.MaxConns != 30 {
93+
t.Errorf("MaxConns: want 30, got %d", cfg.MaxConns)
94+
}
95+
if cfg.MinConns != 5 {
96+
t.Errorf("MinConns: want 5, got %d", cfg.MinConns)
97+
}
98+
if cfg.MaxConnLifetime != 2*time.Minute {
99+
t.Errorf("MaxConnLifetime: want 2m, got %v", cfg.MaxConnLifetime)
100+
}
101+
if cfg.MaxConnIdleTime != 45*time.Second {
102+
t.Errorf("MaxConnIdleTime: want 45s, got %v", cfg.MaxConnIdleTime)
103+
}
104+
}

0 commit comments

Comments
 (0)