Skip to content

Commit 07b6dd6

Browse files
fix(queue): circuit breaker on NATS/queue backend (sweep #2) (#42)
Queue/NATS was the only provisioner backend with no circuit breaker: a wedged shared-NATS account/JWT call (or a kube-apiserver stall on the dedicated path) could pile up with no fast-fail and no /readyz signal, unlike postgres/redis/mongo which all trip after 5 consecutive failures. Add a QueueAdmin breaker (BackendQueueAdmin = "queue_admin"), a breakerForQueue(useDedicated) helper (shared → QueueAdmin, dedicated → K8sAPI, mirroring breakerForMongo), and wrap all 4 queue dispatch sites (Provision shared + dedicated, Deprovision shared + dedicated) in callBackend/callBackendVoid. Register QueueAdmin in collectBreakerInspectors so a tripped queue breaker surfaces as backend_queue_admin on /readyz. Tests: TestServer_QueueCircuitTripsOnRepeatedFailures (5 failures trip the breaker; 6th short-circuits to Unavailable WITHOUT invoking the backend, asserted via a call counter). The two registry-drift guards (TestBreakers_EveryStructFieldHasAnInspector, _AllBackendsSurfaced) caught the new breaker and are updated — they enforce that a new breaker is wired into BOTH the inspector slice and /readyz. Full `go test ./... -short` green. Co-authored-by: Manas Srivastava <[email protected]> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent ece0fce commit 07b6dd6

6 files changed

Lines changed: 118 additions & 39 deletions

File tree

coverage_registry_test.go

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,31 +37,32 @@ import (
3737
// collectBreakerInspectors fails this test.
3838
//
3939
// COVERAGE BLOCK (rule 17):
40-
// Symptom: a new circuit-protected backend is added to
41-
// *circuit.Breakers (new field, e.g. NATSAdmin) but
42-
// collectBreakerInspectors in main.go is not
43-
// extended. The new backend's circuit state never
44-
// surfaces on /readyz. A future tripped breaker
45-
// doesn't show up as a degraded check, the NR alert
46-
// filter (which keys on `circuit.opened` log lines)
47-
// fires but operators have no /readyz signal to
48-
// cross-reference.
49-
// Enumeration: reflect over the *circuit.Breakers struct fields
50-
// whose type is *circuit.Breaker. This IS the
51-
// registry.
52-
// Sites found: N fields on Breakers (currently 5: PostgresAdmin,
53-
// PostgresK8s, RedisAdmin, MongoAdmin, K8sAPI).
54-
// Sites touched: each must be findable in the inspector slice
55-
// collectBreakerInspectors returns. The match is
56-
// by .Name() — every breaker carries the backend
57-
// name as its identity.
58-
// Coverage test: missing-from-inspectors and missing-from-struct
59-
// both fail.
60-
// Live verified: provisioner /readyz output today shows
61-
// backend_postgres_admin / backend_postgres_k8s /
62-
// backend_redis_admin / backend_mongo_admin /
63-
// backend_k8s_api — five degraded checks. This test
64-
// pins that count to whatever the struct says.
40+
//
41+
// Symptom: a new circuit-protected backend is added to
42+
// *circuit.Breakers (new field, e.g. NATSAdmin) but
43+
// collectBreakerInspectors in main.go is not
44+
// extended. The new backend's circuit state never
45+
// surfaces on /readyz. A future tripped breaker
46+
// doesn't show up as a degraded check, the NR alert
47+
// filter (which keys on `circuit.opened` log lines)
48+
// fires but operators have no /readyz signal to
49+
// cross-reference.
50+
// Enumeration: reflect over the *circuit.Breakers struct fields
51+
// whose type is *circuit.Breaker. This IS the
52+
// registry.
53+
// Sites found: N fields on Breakers (currently 6: PostgresAdmin,
54+
// PostgresK8s, RedisAdmin, MongoAdmin, QueueAdmin, K8sAPI).
55+
// Sites touched: each must be findable in the inspector slice
56+
// collectBreakerInspectors returns. The match is
57+
// by .Name() — every breaker carries the backend
58+
// name as its identity.
59+
// Coverage test: missing-from-inspectors and missing-from-struct
60+
// both fail.
61+
// Live verified: provisioner /readyz output today shows
62+
// backend_postgres_admin / backend_postgres_k8s /
63+
// backend_redis_admin / backend_mongo_admin /
64+
// backend_k8s_api — five degraded checks. This test
65+
// pins that count to whatever the struct says.
6566
func TestBreakers_EveryStructFieldHasAnInspector(t *testing.T) {
6667
bs := circuit.NewBreakers()
6768
got := collectBreakerInspectors(bs)
@@ -90,6 +91,7 @@ func TestBreakers_EveryStructFieldHasAnInspector(t *testing.T) {
9091
"PostgresAdmin": circuit.BackendPostgresAdmin,
9192
"RedisAdmin": circuit.BackendRedisAdmin,
9293
"MongoAdmin": circuit.BackendMongoAdmin,
94+
"QueueAdmin": circuit.BackendQueueAdmin,
9395
"K8sAPI": circuit.BackendK8sAPI,
9496
}
9597

internal/circuit/breakers.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const (
1515
BackendPostgresAdmin = "postgres_admin" // shared postgres-customers CREATE DATABASE / CREATE USER (local.go)
1616
BackendRedisAdmin = "redis_admin" // shared redis-provision ACL SETUSER / namespace ops
1717
BackendMongoAdmin = "mongo_admin" // shared mongo admin CREATE USER / role grants
18+
BackendQueueAdmin = "queue_admin" // shared NATS account/JWT provisioning + deprovision
1819
BackendK8sAPI = "k8s_api" // raw kube-apiserver client calls (kubectl-equivalent)
1920
)
2021

@@ -40,6 +41,7 @@ type Breakers struct {
4041
PostgresAdmin *Breaker
4142
RedisAdmin *Breaker
4243
MongoAdmin *Breaker
44+
QueueAdmin *Breaker
4345
K8sAPI *Breaker
4446
}
4547

@@ -56,6 +58,7 @@ func NewBreakers() *Breakers {
5658
PostgresAdmin: newDefault(BackendPostgresAdmin),
5759
RedisAdmin: newDefault(BackendRedisAdmin),
5860
MongoAdmin: newDefault(BackendMongoAdmin),
61+
QueueAdmin: newDefault(BackendQueueAdmin),
5962
K8sAPI: newDefault(BackendK8sAPI),
6063
}
6164
}

internal/server/server.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,18 @@ func (s *Server) breakerForMongo(useDedicated bool) *circuit.Breaker {
268268
return s.breakers.MongoAdmin
269269
}
270270

271+
// breakerForQueue picks the circuit breaker that guards a queue backend call.
272+
// Shared NATS path (account/JWT provisioning on the shared cluster) → QueueAdmin
273+
// (`queue_admin`); dedicated k8s path (kube-apiserver calls) → K8sAPI (`k8s_api`),
274+
// the same split as Mongo. Queue was the only backend with no breaker (sweep #2):
275+
// a wedged NATS / kube-apiserver call could pile up with no fast-fail and no /readyz signal.
276+
func (s *Server) breakerForQueue(useDedicated bool) *circuit.Breaker {
277+
if useDedicated {
278+
return s.breakers.K8sAPI // dedicated path: failures count against the kube-apiserver breaker
279+
}
280+
return s.breakers.QueueAdmin // shared path: failures count against the queue_admin breaker
281+
}
282+
271283
// callBackend wraps a backend invocation behind the given breaker. Returns
272284
// circuit.ErrOpen verbatim when the breaker is open so the caller can map
273285
// it to a gRPC Unavailable via mapError. The breaker's caller-deadline
@@ -584,7 +596,9 @@ func (s *Server) provisionQueue(ctx context.Context, req *provisionerv1.Provisio
584596
// Pro and team tiers with a dedicated k8s backend get their own NATS pod.
585597
if isDedicatedTier(req.Tier) && s.dedicatedQueueBackend != nil {
586598
slog.Info("server.provisionQueue: using dedicated backend", "token", req.Token, "tier", req.Tier)
587-
creds, err := s.dedicatedQueueBackend.Provision(ctx, req.Token, req.Tier)
599+
creds, err := callBackend(s.breakerForQueue(true), func() (*queue.Credentials, error) {
600+
return s.dedicatedQueueBackend.Provision(ctx, req.Token, req.Tier)
601+
})
588602
if err != nil {
589603
return nil, mapError("ProvisionResource.queue.dedicated", err)
590604
}
@@ -611,7 +625,9 @@ func (s *Server) provisionQueue(ctx context.Context, req *provisionerv1.Provisio
611625
}
612626

613627
slog.Info("server.provisionQueue: pool miss, provisioning live", "token", req.Token)
614-
creds, err := s.queueBackend.Provision(ctx, req.Token, req.Tier)
628+
creds, err := callBackend(s.breakerForQueue(false), func() (*queue.Credentials, error) {
629+
return s.queueBackend.Provision(ctx, req.Token, req.Tier)
630+
})
615631
if err != nil {
616632
return nil, mapError("ProvisionResource.queue", err)
617633
}
@@ -718,12 +734,16 @@ func (s *Server) DeprovisionResource(ctx context.Context, req *provisionerv1.Dep
718734
!strings.HasPrefix(req.ProviderResourceId, "instant-customer-") {
719735
slog.Info("server.DeprovisionResource: queue using dedicated backend",
720736
"token", req.Token, "provider_resource_id", req.ProviderResourceId)
721-
if err := s.dedicatedQueueBackend.Deprovision(ctx, req.Token, req.ProviderResourceId); err != nil {
737+
if err := callBackendVoid(s.breakerForQueue(true), func() error {
738+
return s.dedicatedQueueBackend.Deprovision(ctx, req.Token, req.ProviderResourceId)
739+
}); err != nil {
722740
return nil, mapError("DeprovisionResource.queue.dedicated", err)
723741
}
724742
return &provisionerv1.DeprovisionResponse{Deprovisioned: true}, nil
725743
}
726-
if err := s.queueBackend.Deprovision(ctx, req.Token, req.ProviderResourceId); err != nil {
744+
if err := callBackendVoid(s.breakerForQueue(false), func() error {
745+
return s.queueBackend.Deprovision(ctx, req.Token, req.ProviderResourceId)
746+
}); err != nil {
727747
return nil, mapError("DeprovisionResource.queue", err)
728748
}
729749
return &provisionerv1.DeprovisionResponse{Deprovisioned: true}, nil

internal/server/server_coverage_test.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"google.golang.org/grpc/status"
2222
"google.golang.org/grpc/test/bufconn"
2323

24+
commonv1 "instant.dev/proto/common/v1"
25+
provisionerv1 "instant.dev/proto/provisioner/v1"
2426
"instant.dev/provisioner/internal/backend/mongo"
2527
"instant.dev/provisioner/internal/backend/postgres"
2628
"instant.dev/provisioner/internal/backend/queue"
@@ -29,8 +31,6 @@ import (
2931
"instant.dev/provisioner/internal/config"
3032
"instant.dev/provisioner/internal/pool"
3133
"instant.dev/provisioner/internal/server"
32-
commonv1 "instant.dev/proto/common/v1"
33-
provisionerv1 "instant.dev/proto/provisioner/v1"
3434
)
3535

3636
// --- fake pool claimer for exercising the s.pool != nil branches ---
@@ -342,6 +342,58 @@ func TestProvisionResource_Queue_DedicatedFailure_ReturnsError(t *testing.T) {
342342
assertCode(t, err, codes.InvalidArgument)
343343
}
344344

345+
// countingFailQueueBackend is a queue backend test double: Provision and Deprovision
346+
// always fail, and each invocation increments calls, so a test can assert that a
347+
// tripped breaker did NOT invoke the backend. The error is intended to be non-retryable
348+
// so mapError classifies it Internal, not the Unavailable of an open breaker (mapError not shown here — confirm).
349+
type countingFailQueueBackend struct{ calls int } // calls is a plain int with no synchronization
350+
351+
func (b *countingFailQueueBackend) Provision(context.Context, string, string) (*queue.Credentials, error) {
352+
b.calls++ // counted before failing so a short-circuited call (no increment) is detectable
353+
return nil, errors.New("permission denied: nsc account create")
354+
}
355+
356+
func (b *countingFailQueueBackend) Deprovision(context.Context, string, string) error {
357+
b.calls++ // shares the same counter as Provision
358+
return errors.New("permission denied: nsc account delete")
359+
}
360+
361+
// TestServer_QueueCircuitTripsOnRepeatedFailures verifies the shared-queue breaker
362+
// (sweep #2: queue/NATS was the only backend with no circuit breaker). It drives five
363+
// failing hobby-tier queue provisions through an always-failing backend, then checks
364+
// that the sixth returns Unavailable WITHOUT invoking the backend (call count stays 5).
365+
func TestServer_QueueCircuitTripsOnRepeatedFailures(t *testing.T) {
366+
qb := &countingFailQueueBackend{}
367+
srv := server.NewWithBackends(
368+
&config.Config{},
369+
&mockPostgresBackend{}, &mockRedisBackend{}, &mockMongoBackend{}, qb,
370+
nil, nil, nil, nil, nil, nil,
371+
)
372+
srv.SetBreakers(freshBreakers()) // presumably default-threshold (5) breakers in closed state — confirm against freshBreakers
373+
374+
for i := 0; i < 5; i++ {
375+
_, err := srv.ProvisionResource(context.Background(), &provisionerv1.ProvisionRequest{
376+
Token: "tok", Tier: "hobby", ResourceType: commonv1.ResourceType_RESOURCE_TYPE_QUEUE,
377+
})
378+
if err == nil {
379+
t.Fatalf("attempt %d: expected queue backend error, got nil", i+1)
380+
}
381+
}
382+
if qb.calls != 5 { // every one of the first five requests must have reached the backend
383+
t.Fatalf("queue backend should have been called 5 times before tripping, got %d", qb.calls)
384+
}
385+
386+
// 6th call: the QueueAdmin breaker is expected to be open, so the request must
387+
// come back as Unavailable and the backend call counter must not move.
388+
_, err := srv.ProvisionResource(context.Background(), &provisionerv1.ProvisionRequest{
389+
Token: "tok", Tier: "hobby", ResourceType: commonv1.ResourceType_RESOURCE_TYPE_QUEUE,
390+
})
391+
assertCode(t, err, codes.Unavailable)
392+
if qb.calls != 5 { // unchanged count proves the breaker short-circuited before the backend
393+
t.Fatalf("open QueueAdmin breaker should have short-circuited; backend called %d times (expected 5)", qb.calls)
394+
}
395+
}
396+
345397
// Dedicated provision error → mapError'd.
346398
func TestProvisionResource_Redis_DedicatedFailure_ReturnsError(t *testing.T) {
347399
srv := server.NewWithBackends(

main.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
// Observability — relocated 2026-05-12 from the api repo's reference scaffold
66
// (track B2 of the observability rollout). What this file wires up:
77
//
8-
// 1. slog default handler decorated with instant.dev/common/logctx so every
9-
// log line carries service / commit_id / trace_id / tid / team_id.
10-
// 2. New Relic Go agent (fail-open: an unset NEW_RELIC_LICENSE_KEY logs a
11-
// warning and returns nil; a nil app is safe to pass to nrgrpc).
12-
// 3. nrgrpc.UnaryServerInterceptor chained with a trace-id stamper so
13-
// W3C-propagated trace IDs reach downstream slog calls in handlers.
14-
// 4. HTTP sidecar on :8092 exposing /healthz with build metadata JSON.
15-
// Same shape as api and worker /healthz so a single jq filter works.
8+
// 1. slog default handler decorated with instant.dev/common/logctx so every
9+
// log line carries service / commit_id / trace_id / tid / team_id.
10+
// 2. New Relic Go agent (fail-open: an unset NEW_RELIC_LICENSE_KEY logs a
11+
// warning and returns nil; a nil app is safe to pass to nrgrpc).
12+
// 3. nrgrpc.UnaryServerInterceptor chained with a trace-id stamper so
13+
// W3C-propagated trace IDs reach downstream slog calls in handlers.
14+
// 4. HTTP sidecar on :8092 exposing /healthz with build metadata JSON.
15+
// Same shape as api and worker /healthz so a single jq filter works.
1616
package main
1717

1818
import (
@@ -77,6 +77,7 @@ func collectBreakerInspectors(bs *circuit.Breakers) []handlers.CircuitInspector
7777
breakerAdapter{b: bs.PostgresK8s},
7878
breakerAdapter{b: bs.RedisAdmin},
7979
breakerAdapter{b: bs.MongoAdmin},
80+
breakerAdapter{b: bs.QueueAdmin},
8081
breakerAdapter{b: bs.K8sAPI},
8182
}
8283
}

main_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ func TestCollectBreakerInspectors_AllBackendsSurfaced(t *testing.T) {
431431
"postgres_k8s": false,
432432
"redis_admin": false,
433433
"mongo_admin": false,
434+
"queue_admin": false,
434435
"k8s_api": false,
435436
}
436437
for _, ins := range got {

0 commit comments

Comments
 (0)