|
| 1 | +package postgres |
| 2 | + |
| 3 | +// backend_seam_test.go — coverage for the NewBackend factory, the goredis |
| 4 | +// helper aliases, k8sEnv/k8sEnvInt, and the cluster-router paths the existing |
| 5 | +// tests don't reach (at-capacity Pick, refreshCounts/dbCount, pollLoop, |
| 6 | +// ProviderResourceID). |
| 7 | + |
| 8 | +import ( |
| 9 | + "context" |
| 10 | + "os" |
| 11 | + "testing" |
| 12 | + "time" |
| 13 | +) |
| 14 | + |
| 15 | +// osWriteFileBackend writes a kubeconfig fixture for the NewBackend k8s tests. |
| 16 | +func osWriteFileBackend(path, content string) error { |
| 17 | + return os.WriteFile(path, []byte(content), 0o600) |
| 18 | +} |
| 19 | + |
| 20 | +func TestK8sEnv_Seam(t *testing.T) { |
| 21 | + t.Setenv("K8S_TEST_KEY", "v") |
| 22 | + if k8sEnv("K8S_TEST_KEY", "fb") != "v" { |
| 23 | + t.Error("should return env value") |
| 24 | + } |
| 25 | + if k8sEnv("K8S_UNSET_KEY_XYZ", "fb") != "fb" { |
| 26 | + t.Error("should return fallback") |
| 27 | + } |
| 28 | +} |
| 29 | + |
| 30 | +func TestK8sEnvInt_Seam(t *testing.T) { |
| 31 | + t.Setenv("K8S_INT_KEY", "42") |
| 32 | + if k8sEnvInt("K8S_INT_KEY", 7) != 42 { |
| 33 | + t.Error("should parse env int") |
| 34 | + } |
| 35 | + t.Setenv("K8S_INT_BAD", "notanint") |
| 36 | + if k8sEnvInt("K8S_INT_BAD", 7) != 7 { |
| 37 | + t.Error("bad int should fall back") |
| 38 | + } |
| 39 | + if k8sEnvInt("K8S_INT_UNSET_XYZ", 9) != 9 { |
| 40 | + t.Error("unset should fall back") |
| 41 | + } |
| 42 | +} |
| 43 | + |
| 44 | +func TestGoredisAliases_Seam(t *testing.T) { |
| 45 | + if _, err := goredisParseURL("not-a-redis-url"); err == nil { |
| 46 | + t.Error("expected parse error") |
| 47 | + } |
| 48 | + opt, err := goredisParseURL("redis://127.0.0.1:6379") |
| 49 | + if err != nil { |
| 50 | + t.Fatalf("parse: %v", err) |
| 51 | + } |
| 52 | + if c := goredisNewClient(opt); c == nil { |
| 53 | + t.Error("nil client") |
| 54 | + } |
| 55 | +} |
| 56 | + |
| 57 | +func TestNewBackend_Neon_Seam(t *testing.T) { |
| 58 | + b := NewBackend("neon", "", "", "apikey", "region") |
| 59 | + if _, ok := b.(*NeonBackend); !ok { |
| 60 | + t.Errorf("want *NeonBackend, got %T", b) |
| 61 | + } |
| 62 | +} |
| 63 | + |
| 64 | +func TestNewBackend_DefaultLocal(t *testing.T) { |
| 65 | + b := NewBackend("", "postgres://u@h/db", "", "", "") |
| 66 | + if _, ok := b.(*LocalBackend); !ok { |
| 67 | + t.Errorf("want *LocalBackend, got %T", b) |
| 68 | + } |
| 69 | +} |
| 70 | + |
| 71 | +func TestNewBackend_ClusterURLs(t *testing.T) { |
| 72 | + b := NewBackend("", "", "url0,url1, ,url2,", "", "") |
| 73 | + lb, ok := b.(*LocalBackend) |
| 74 | + if !ok { |
| 75 | + t.Fatalf("want *LocalBackend, got %T", b) |
| 76 | + } |
| 77 | + // trailing empty + whitespace entries filtered → 3 clusters |
| 78 | + if len(lb.router.adminURLs) != 3 { |
| 79 | + t.Errorf("adminURLs = %v; want 3 filtered", lb.router.adminURLs) |
| 80 | + } |
| 81 | +} |
| 82 | + |
| 83 | +func TestNewBackend_ClusterURLs_AllEmpty_FallsBack(t *testing.T) { |
| 84 | + b := NewBackend("", "cust", " , ,", "", "") |
| 85 | + lb := b.(*LocalBackend) |
| 86 | + if len(lb.router.adminURLs) != 1 { |
| 87 | + t.Errorf("all-empty cluster list should fall back to single; got %v", lb.router.adminURLs) |
| 88 | + } |
| 89 | +} |
| 90 | + |
| 91 | +// k8s backend with no kubeconfig + no in-cluster config → newK8sBackend fails → |
| 92 | +// NewBackend falls back to local. Covers the fallback branch in the factory. |
| 93 | +func TestNewBackend_K8s_FallbackToLocal(t *testing.T) { |
| 94 | + t.Setenv("K8S_KUBECONFIG", "/nonexistent/kubeconfig-path") |
| 95 | + b := NewBackend("k8s", "cust-url", "", "", "") |
| 96 | + if _, ok := b.(*LocalBackend); !ok { |
| 97 | + t.Errorf("k8s init failure should fall back to *LocalBackend, got %T", b) |
| 98 | + } |
| 99 | +} |
| 100 | + |
| 101 | +// NewBackend("k8s") with a valid kubeconfig + a parseable REDIS_URL_FOR_ROUTES |
| 102 | +// exercises the route-registry-enabled block (the only sub-95 gap in NewBackend). |
| 103 | +func TestNewBackend_K8s_RouteRegistryEnabled(t *testing.T) { |
| 104 | + dir := t.TempDir() |
| 105 | + kc := dir + "/kubeconfig" |
| 106 | + if err := osWriteFileBackend(kc, minimalKubeconfig); err != nil { |
| 107 | + t.Fatalf("write kubeconfig: %v", err) |
| 108 | + } |
| 109 | + t.Setenv("K8S_KUBECONFIG", kc) |
| 110 | + t.Setenv("REDIS_URL_FOR_ROUTES", "redis://127.0.0.1:6379") |
| 111 | + b := NewBackend("k8s", "cust", "", "", "") |
| 112 | + kb, ok := b.(*K8sBackend) |
| 113 | + if !ok { |
| 114 | + t.Fatalf("want *K8sBackend, got %T", b) |
| 115 | + } |
| 116 | + if kb.rdb == nil { |
| 117 | + t.Error("route registry should be enabled when REDIS_URL_FOR_ROUTES parses") |
| 118 | + } |
| 119 | +} |
| 120 | + |
| 121 | +// NewBackend("k8s") with a valid kubeconfig but an UNPARSEABLE route Redis URL |
| 122 | +// exercises the route-registry-disabled (warn) branch. |
| 123 | +func TestNewBackend_K8s_RouteRegistryBadURL(t *testing.T) { |
| 124 | + dir := t.TempDir() |
| 125 | + kc := dir + "/kubeconfig" |
| 126 | + if err := osWriteFileBackend(kc, minimalKubeconfig); err != nil { |
| 127 | + t.Fatalf("write kubeconfig: %v", err) |
| 128 | + } |
| 129 | + t.Setenv("K8S_KUBECONFIG", kc) |
| 130 | + t.Setenv("REDIS_URL_FOR_ROUTES", "::::not-a-redis-url") |
| 131 | + b := NewBackend("k8s", "cust", "", "", "") |
| 132 | + kb, ok := b.(*K8sBackend) |
| 133 | + if !ok { |
| 134 | + t.Fatalf("want *K8sBackend, got %T", b) |
| 135 | + } |
| 136 | + if kb.rdb != nil { |
| 137 | + t.Error("route registry should stay disabled when the URL fails to parse") |
| 138 | + } |
| 139 | +} |
| 140 | + |
| 141 | +func TestNewDedicatedBackend_Seam(t *testing.T) { |
| 142 | + b := NewDedicatedBackend("dsn", "") |
| 143 | + if _, ok := b.(*DedicatedProvider); !ok { |
| 144 | + t.Errorf("want *DedicatedProvider, got %T", b) |
| 145 | + } |
| 146 | +} |
| 147 | + |
| 148 | +// --- cluster_router uncovered paths --- |
| 149 | + |
| 150 | +func TestProviderResourceID(t *testing.T) { |
| 151 | + r := newClusterRouter([]string{"u0", "u1"}, 0) |
| 152 | + if r.ProviderResourceID(1) != "local:1" { |
| 153 | + t.Errorf("got %q", r.ProviderResourceID(1)) |
| 154 | + } |
| 155 | +} |
| 156 | + |
| 157 | +func TestPick_AllAtCapacity_FallsBackToZero(t *testing.T) { |
| 158 | + r := newClusterRouter([]string{"u0", "u1"}, 1) |
| 159 | + // Saturate both clusters' counts so headroom <= 0 everywhere. |
| 160 | + r.mu.Lock() |
| 161 | + r.counts[0] = 5 |
| 162 | + r.counts[1] = 5 |
| 163 | + r.mu.Unlock() |
| 164 | + idx, url, err := r.Pick() |
| 165 | + if err != nil { |
| 166 | + t.Fatalf("Pick: %v", err) |
| 167 | + } |
| 168 | + if idx != 0 || url != "u0" { |
| 169 | + t.Errorf("at-capacity Pick should fall back to index 0; got %d/%q", idx, url) |
| 170 | + } |
| 171 | +} |
| 172 | + |
| 173 | +func TestPick_AllURLsEmpty_BestNegativeFallback(t *testing.T) { |
| 174 | + // Non-empty slice but every URL blank → loop never sets best → best<0 path. |
| 175 | + r := newClusterRouter([]string{"", ""}, 0) |
| 176 | + idx, _, err := r.Pick() |
| 177 | + if err != nil { |
| 178 | + t.Fatalf("Pick: %v", err) |
| 179 | + } |
| 180 | + if idx != 0 { |
| 181 | + t.Errorf("best<0 fallback should pick index 0; got %d", idx) |
| 182 | + } |
| 183 | +} |
| 184 | + |
| 185 | +func TestPick_NoClusters_Error(t *testing.T) { |
| 186 | + r := newClusterRouter(nil, 0) |
| 187 | + if _, _, err := r.Pick(); err == nil { |
| 188 | + t.Error("expected no-clusters error") |
| 189 | + } |
| 190 | +} |
| 191 | + |
| 192 | +func TestRefreshCounts_ConnectFails_KeepsPrevious(t *testing.T) { |
| 193 | + // Unreachable admin URL → dbCount errors → previous count retained. |
| 194 | + r := newClusterRouter([]string{"postgres://x@127.0.0.1:1/none", ""}, 0) |
| 195 | + r.mu.Lock() |
| 196 | + r.counts[0] = 3 |
| 197 | + r.mu.Unlock() |
| 198 | + r.refreshCounts(context.Background()) |
| 199 | + r.mu.RLock() |
| 200 | + got := r.counts[0] |
| 201 | + r.mu.RUnlock() |
| 202 | + if got != 3 { |
| 203 | + t.Errorf("count after failed poll = %d; want previous 3", got) |
| 204 | + } |
| 205 | +} |
| 206 | + |
| 207 | +func TestDbCount_ConnectError(t *testing.T) { |
| 208 | + r := newClusterRouter([]string{"x"}, 0) |
| 209 | + if _, err := r.dbCount(context.Background(), "postgres://x@127.0.0.1:1/none"); err == nil { |
| 210 | + t.Error("expected connect error") |
| 211 | + } |
| 212 | +} |
| 213 | + |
| 214 | +func TestDbCount_Success_ViaSeam(t *testing.T) { |
| 215 | + fc := &fakePGConn{scanInt64: 11} |
| 216 | + withPGXConnect(t, fc, nil) |
| 217 | + r := newClusterRouter([]string{"u0"}, 0) |
| 218 | + n, err := r.dbCount(context.Background(), "u0") |
| 219 | + if err != nil || n != 11 { |
| 220 | + t.Errorf("dbCount = %d, %v", n, err) |
| 221 | + } |
| 222 | +} |
| 223 | + |
| 224 | +func TestDbCount_ScanError_ViaSeam(t *testing.T) { |
| 225 | + fc := &fakePGConn{queryRowErr: errSeam} |
| 226 | + withPGXConnect(t, fc, nil) |
| 227 | + r := newClusterRouter([]string{"u0"}, 0) |
| 228 | + if _, err := r.dbCount(context.Background(), "u0"); err == nil { |
| 229 | + t.Error("expected scan error") |
| 230 | + } |
| 231 | +} |
| 232 | + |
| 233 | +// pollLoop ticker branch: shrink the poll interval so ticker.C fires and the |
| 234 | +// periodic refreshCounts runs, then cancel. |
| 235 | +func TestPollLoop_TickerFires(t *testing.T) { |
| 236 | + fc := &fakePGConn{scanInt64: 1} |
| 237 | + withPGXConnect(t, fc, nil) |
| 238 | + r := newClusterRouter([]string{"u0"}, 0) |
| 239 | + r.pollInterval = 5 * time.Millisecond // per-instance, no shared-global race |
| 240 | + ctx, cancel := context.WithCancel(context.Background()) |
| 241 | + done := make(chan struct{}) |
| 242 | + go func() { r.pollLoop(ctx); close(done) }() |
| 243 | + time.Sleep(40 * time.Millisecond) // let several ticks fire |
| 244 | + cancel() |
| 245 | + select { |
| 246 | + case <-done: |
| 247 | + case <-time.After(2 * time.Second): |
| 248 | + t.Fatal("pollLoop did not return after cancel") |
| 249 | + } |
| 250 | +} |
| 251 | + |
| 252 | +// pollLoop with a non-positive pollInterval falls back to the default — covers |
| 253 | +// the interval<=0 guard. We cancel immediately after start so the default-60s |
| 254 | +// ticker never actually fires (we only need the guard line executed). |
| 255 | +func TestPollLoop_ZeroIntervalFallsBackToDefault(t *testing.T) { |
| 256 | + fc := &fakePGConn{scanInt64: 1} |
| 257 | + withPGXConnect(t, fc, nil) |
| 258 | + r := newClusterRouter([]string{"u0"}, 0) |
| 259 | + r.pollInterval = 0 // → guard sets interval = defaultClusterPollInterval |
| 260 | + ctx, cancel := context.WithCancel(context.Background()) |
| 261 | + cancel() // pre-cancel: after the immediate refresh + ticker setup, return |
| 262 | + done := make(chan struct{}) |
| 263 | + go func() { r.pollLoop(ctx); close(done) }() |
| 264 | + select { |
| 265 | + case <-done: |
| 266 | + case <-time.After(2 * time.Second): |
| 267 | + t.Fatal("pollLoop did not return") |
| 268 | + } |
| 269 | +} |
| 270 | + |
| 271 | +// pollLoop ctx.Done() return path: call pollLoop directly with an |
| 272 | +// already-cancelled context and a fresh (never-closed) done channel so the |
| 273 | +// select can only fire on ctx.Done(). Deterministic — no race with done. |
| 274 | +func TestPollLoop_CtxDoneReturns(t *testing.T) { |
| 275 | + fc := &fakePGConn{scanInt64: 1} |
| 276 | + withPGXConnect(t, fc, nil) |
| 277 | + r := newClusterRouter([]string{"u0"}, 0) |
| 278 | + ctx, cancel := context.WithCancel(context.Background()) |
| 279 | + cancel() // pre-cancel: after the immediate refresh, select hits ctx.Done() |
| 280 | + done := make(chan struct{}) |
| 281 | + go func() { r.pollLoop(ctx); close(done) }() |
| 282 | + select { |
| 283 | + case <-done: |
| 284 | + case <-time.After(2 * time.Second): |
| 285 | + t.Fatal("pollLoop did not return on ctx.Done()") |
| 286 | + } |
| 287 | +} |
| 288 | + |
| 289 | +// pollLoop done-channel return path: drive directly and signal exit via |
| 290 | +// Shutdown (closes done), joining the goroutine so it can't leak. |
| 291 | +func TestPollLoop_ShutdownReturns(t *testing.T) { |
| 292 | + fc := &fakePGConn{scanInt64: 1} |
| 293 | + withPGXConnect(t, fc, nil) |
| 294 | + r := newClusterRouter([]string{"u0"}, 0) |
| 295 | + done := make(chan struct{}) |
| 296 | + go func() { r.pollLoop(context.Background()); close(done) }() |
| 297 | + // Wait until the poller is up, then Shutdown (closes r.done) → return. |
| 298 | + deadline := time.Now().Add(2 * time.Second) |
| 299 | + for r.pollStarts.Load() == 0 && time.Now().Before(deadline) { |
| 300 | + time.Sleep(time.Millisecond) |
| 301 | + } |
| 302 | + r.Shutdown() |
| 303 | + select { |
| 304 | + case <-done: |
| 305 | + case <-time.After(2 * time.Second): |
| 306 | + t.Fatal("pollLoop did not return after Shutdown") |
| 307 | + } |
| 308 | +} |
0 commit comments