Skip to content

Commit 4a1ecdc

Browse files
committed
feat: add gRPC readiness middleware with migration error detection
Add readiness gate that blocks gRPC requests until the datastore is ready. Uses singleflight to deduplicate concurrent checks and caches results briefly. Only blocks for migration-related issues; transient states like connection pool warmup are allowed through.
1 parent f6706a4 commit 4a1ecdc

7 files changed

Lines changed: 616 additions & 2 deletions

File tree

pkg/cmd/server/defaults.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"github.com/authzed/spicedb/pkg/datastore"
4141
consistencymw "github.com/authzed/spicedb/pkg/middleware/consistency"
4242
logmw "github.com/authzed/spicedb/pkg/middleware/logging"
43+
"github.com/authzed/spicedb/pkg/middleware/readiness"
4344
"github.com/authzed/spicedb/pkg/middleware/requestid"
4445
"github.com/authzed/spicedb/pkg/middleware/serverversion"
4546
"github.com/authzed/spicedb/pkg/releases"
@@ -175,6 +176,7 @@ const (
175176
DefaultMiddlewareGRPCProm = "grpcprom"
176177
DefaultMiddlewareServerVersion = "serverversion"
177178
DefaultMiddlewareMemoryProtection = "memoryprotection"
179+
DefaultMiddlewareReadiness = "readiness"
178180

179181
DefaultInternalMiddlewareDispatch = "dispatch"
180182
DefaultInternalMiddlewareDatastore = "datastore"
@@ -196,6 +198,7 @@ type MiddlewareOption struct {
196198
MismatchingZedTokenOption consistencymw.MismatchingTokenOption `debugmap:"visible"`
197199

198200
MemoryUsageProvider memoryprotection.MemoryUsageProvider `debugmap:"hidden"`
201+
ReadinessChecker readiness.ReadinessChecker `debugmap:"hidden"`
199202

200203
unaryDatastoreMiddleware *ReferenceableMiddleware[grpc.UnaryServerInterceptor] `debugmap:"hidden"`
201204
streamDatastoreMiddleware *ReferenceableMiddleware[grpc.StreamServerInterceptor] `debugmap:"hidden"`
@@ -310,6 +313,12 @@ func DefaultUnaryMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.UnaryS
310313
WithInterceptor(grpcMetricsUnaryInterceptor).
311314
Done(),
312315

316+
NewUnaryMiddleware().
317+
WithName(DefaultMiddlewareReadiness).
318+
WithInterceptor(readiness.NewGate(opts.ReadinessChecker).UnaryServerInterceptor()).
319+
EnsureAlreadyExecuted(DefaultMiddlewareGRPCProm). // so that prom middleware reports blocked requests
320+
Done(),
321+
313322
NewUnaryMiddleware().
314323
WithName(DefaultMiddlewareMemoryProtection).
315324
WithInterceptor(selector.UnaryServerInterceptor(
@@ -391,6 +400,12 @@ func DefaultStreamingMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.St
391400
WithInterceptor(grpcMetricsStreamingInterceptor).
392401
Done(),
393402

403+
NewStreamMiddleware().
404+
WithName(DefaultMiddlewareReadiness).
405+
WithInterceptor(readiness.NewGate(opts.ReadinessChecker).StreamServerInterceptor()).
406+
EnsureInterceptorAlreadyExecuted(DefaultMiddlewareGRPCProm). // so that prom middleware reports blocked requests
407+
Done(),
408+
394409
NewStreamMiddleware().
395410
WithName(DefaultMiddlewareMemoryProtection).
396411
WithInterceptor(memoryProtectionStreamInterceptor.StreamServerInterceptor()).

pkg/cmd/server/defaults_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ func TestWithDatastore(t *testing.T) {
3535
"service",
3636
consistency.TreatMismatchingTokensAsError,
3737
memoryprotection.NewNoopMemoryUsageProvider(),
38+
nil, // ReadinessChecker
3839
nil,
3940
nil,
4041
}
@@ -78,6 +79,7 @@ func TestWithDatastoreMiddleware(t *testing.T) {
7879
"service",
7980
consistency.TreatMismatchingTokensAsError,
8081
memoryprotection.NewNoopMemoryUsageProvider(),
82+
nil, // ReadinessChecker
8183
nil,
8284
nil,
8385
}

pkg/cmd/server/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
383383
MiddlewareServiceLabel: serverName,
384384
MismatchingZedTokenOption: mismatchZedTokenOption,
385385
MemoryUsageProvider: memoryUsageProvider,
386+
ReadinessChecker: ds,
386387
}
387388
opts = opts.WithDatastore(ds)
388389

pkg/cmd/server/server_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ func TestModifyUnaryMiddleware(t *testing.T) {
527527
},
528528
}}
529529

530-
opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", consistency.TreatMismatchingTokensAsFullConsistency, memoryprotection.NewNoopMemoryUsageProvider(), nil, nil}
530+
opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", consistency.TreatMismatchingTokensAsFullConsistency, memoryprotection.NewNoopMemoryUsageProvider(), nil, nil, nil}
531531
opt = opt.WithDatastore(nil)
532532

533533
defaultMw, err := DefaultUnaryMiddleware(opt)
@@ -555,7 +555,7 @@ func TestModifyStreamingMiddleware(t *testing.T) {
555555
},
556556
}}
557557

558-
opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", consistency.TreatMismatchingTokensAsFullConsistency, memoryprotection.NewNoopMemoryUsageProvider(), nil, nil}
558+
opt := MiddlewareOption{logging.Logger, nil, false, nil, false, false, false, "testing", consistency.TreatMismatchingTokensAsFullConsistency, memoryprotection.NewNoopMemoryUsageProvider(), nil, nil, nil}
559559
opt = opt.WithDatastore(nil)
560560

561561
defaultMw, err := DefaultStreamingMiddleware(opt)

pkg/cmd/server/zz_generated.middlewareoption.go

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package readiness
2+
3+
import (
4+
"context"
5+
"strings"
6+
"sync"
7+
"time"
8+
9+
"google.golang.org/grpc"
10+
"google.golang.org/grpc/codes"
11+
"google.golang.org/grpc/status"
12+
"resenje.org/singleflight"
13+
14+
"github.com/authzed/spicedb/pkg/datastore"
15+
)
16+
17+
const (
18+
// healthCheckPrefix is the gRPC method prefix for health checks.
19+
// We bypass readiness checks for health endpoints so Kubernetes probes work.
20+
healthCheckPrefix = "/grpc.health.v1.Health/"
21+
22+
// readyCacheTTL is how long to cache a positive ready state.
23+
readyCacheTTL = 500 * time.Millisecond
24+
25+
// notReadyCacheTTL is how long to cache a negative ready state.
26+
// Shorter than readyCacheTTL to allow faster recovery detection.
27+
notReadyCacheTTL = 100 * time.Millisecond
28+
29+
// readinessCheckTimeout is the maximum time to wait for a readiness check.
30+
// We use a dedicated context with this timeout inside singleflight to ensure
31+
// consistent behavior regardless of which request's context triggered the check.
32+
readinessCheckTimeout = 5 * time.Second
33+
)
34+
35+
// ReadinessChecker is the interface for checking datastore readiness.
36+
type ReadinessChecker interface {
37+
ReadyState(ctx context.Context) (datastore.ReadyState, error)
38+
}
39+
40+
// Gate blocks gRPC requests until the datastore is ready.
41+
// It caches the ready state briefly to avoid overwhelming the datastore
42+
// with readiness checks on every request.
43+
type Gate struct {
44+
checker ReadinessChecker
45+
46+
// singleflight prevents thundering herd when cache expires
47+
sfGroup singleflight.Group[string, readinessResult]
48+
49+
mu sync.RWMutex
50+
cachedReady bool // GUARDED_BY(mu)
51+
cachedMessage string // GUARDED_BY(mu)
52+
cacheTime time.Time // GUARDED_BY(mu)
53+
}
54+
55+
// NewGate creates a new readiness gate with the given checker.
56+
// If checker is nil, the gate will pass through all requests without checking.
57+
func NewGate(checker ReadinessChecker) *Gate {
58+
return &Gate{checker: checker}
59+
}
60+
61+
// readinessResult holds the result of a readiness check for singleflight.
62+
type readinessResult struct {
63+
ready bool
64+
message string
65+
}
66+
67+
// isMigrationIssue returns true if the not-ready message indicates
68+
// the database schema hasn't been migrated. Other not-ready reasons
69+
// (like connection pool warmup) are transient and shouldn't block requests.
70+
func isMigrationIssue(msg string) bool {
71+
return strings.Contains(msg, "not migrated") || strings.Contains(msg, "migration")
72+
}
73+
74+
// isReady checks if the datastore is ready, using a cached value if available.
75+
// Uses singleflight to prevent thundering herd on cache expiry.
76+
// Only blocks requests for migration-related issues; transient states like
77+
// connection pool warmup are allowed through.
78+
func (g *Gate) isReady(ctx context.Context) (bool, string) {
79+
// If no checker is configured, pass through
80+
if g.checker == nil {
81+
return true, ""
82+
}
83+
84+
// Fast path: check cache with read lock
85+
g.mu.RLock()
86+
elapsed := time.Since(g.cacheTime)
87+
ttl := readyCacheTTL
88+
if !g.cachedReady {
89+
ttl = notReadyCacheTTL
90+
}
91+
if elapsed < ttl {
92+
ready, msg := g.cachedReady, g.cachedMessage
93+
g.mu.RUnlock()
94+
return ready, msg
95+
}
96+
g.mu.RUnlock()
97+
98+
// Slow path: use singleflight to deduplicate concurrent checks
99+
result, _, _ := g.sfGroup.Do(ctx, "readiness", func(ctx context.Context) (readinessResult, error) {
100+
// Double-check cache after acquiring singleflight
101+
g.mu.RLock()
102+
elapsed := time.Since(g.cacheTime)
103+
ttl := readyCacheTTL
104+
if !g.cachedReady {
105+
ttl = notReadyCacheTTL
106+
}
107+
if elapsed < ttl {
108+
ready, msg := g.cachedReady, g.cachedMessage
109+
g.mu.RUnlock()
110+
return readinessResult{ready: ready, message: msg}, nil
111+
}
112+
g.mu.RUnlock()
113+
114+
// Use an independent context with timeout for the readiness check.
115+
// This ensures consistent behavior when multiple requests are coalesced
116+
// by singleflight - we don't want the check to fail because the first
117+
// request's context was cancelled.
118+
checkCtx, cancel := context.WithTimeout(context.Background(), readinessCheckTimeout)
119+
defer cancel()
120+
121+
state, err := g.checker.ReadyState(checkCtx)
122+
if err != nil {
123+
// On error checking readiness, allow requests through.
124+
// If the datastore is truly unavailable, requests will fail
125+
// with appropriate errors from the datastore layer.
126+
return readinessResult{ready: true, message: ""}, nil
127+
}
128+
129+
// Only block requests for migration-related issues.
130+
// Transient states (connection pool warmup, etc.) should not block.
131+
ready := state.IsReady || !isMigrationIssue(state.Message)
132+
133+
// Update cache
134+
g.mu.Lock()
135+
g.cachedReady = ready
136+
g.cachedMessage = state.Message
137+
g.cacheTime = time.Now()
138+
g.mu.Unlock()
139+
140+
return readinessResult{ready: ready, message: state.Message}, nil
141+
})
142+
143+
return result.ready, result.message
144+
}
145+
146+
// formatNotReadyError creates a user-friendly error message based on the readiness failure reason.
147+
// TODO(authzed/api#159): Once ERROR_REASON_DATASTORE_NOT_MIGRATED is available in the API,
148+
// use spiceerrors.WithCodeAndReason to include the structured error reason.
149+
func formatNotReadyError(msg string) error {
150+
// Check if this is a migration-related issue
151+
if strings.Contains(msg, "not migrated") || strings.Contains(msg, "migration") {
152+
return status.Errorf(codes.FailedPrecondition,
153+
"SpiceDB datastore is not migrated. Please run 'spicedb datastore migrate'. Details: %s", msg)
154+
}
155+
// Generic not-ready message for other cases (connection issues, pool not ready, etc.)
156+
return status.Errorf(codes.FailedPrecondition,
157+
"SpiceDB datastore is not ready. Details: %s", msg)
158+
}
159+
160+
// UnaryServerInterceptor returns a gRPC unary interceptor that blocks
161+
// requests until the datastore is ready.
162+
func (g *Gate) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
163+
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
164+
// Bypass health checks so Kubernetes probes work
165+
if strings.HasPrefix(info.FullMethod, healthCheckPrefix) {
166+
return handler(ctx, req)
167+
}
168+
169+
ready, msg := g.isReady(ctx)
170+
if !ready {
171+
return nil, formatNotReadyError(msg)
172+
}
173+
174+
return handler(ctx, req)
175+
}
176+
}
177+
178+
// StreamServerInterceptor returns a gRPC stream interceptor that blocks
179+
// streams until the datastore is ready.
180+
func (g *Gate) StreamServerInterceptor() grpc.StreamServerInterceptor {
181+
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
182+
// Bypass health checks so Kubernetes probes work
183+
if strings.HasPrefix(info.FullMethod, healthCheckPrefix) {
184+
return handler(srv, ss)
185+
}
186+
187+
ready, msg := g.isReady(ss.Context())
188+
if !ready {
189+
return formatNotReadyError(msg)
190+
}
191+
192+
return handler(srv, ss)
193+
}
194+
}

0 commit comments

Comments
 (0)