Skip to content

Commit 670e7af

Browse files
committed
feat: centralize migration error handling with gRPC readiness middleware
Replaces scattered IsMissingTableError checks (15 files, 157 lines) with a single gRPC readiness middleware that blocks ALL requests until the datastore is migrated. Key improvements: - Single point of control vs copy-pasted checks everywhere - Impossible to miss code paths - all gRPC requests gated automatically - Clear error message: "Please run 'spicedb datastore migrate'" - Cached checks (500ms) with singleflight to prevent thundering herd - Health probes bypass the gate for Kubernetes compatibility Net: -81 lines, better coverage, consistent UX.
1 parent 599f956 commit 670e7af

23 files changed

Lines changed: 642 additions & 155 deletions

internal/datastore/crdb/caveat.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ import (
99
sq "github.com/Masterminds/squirrel"
1010
"github.com/jackc/pgx/v5"
1111

12-
dscommon "github.com/authzed/spicedb/internal/datastore/common"
1312
"github.com/authzed/spicedb/internal/datastore/crdb/schema"
14-
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
1513
"github.com/authzed/spicedb/internal/datastore/revisions"
1614
"github.com/authzed/spicedb/pkg/datastore"
1715
core "github.com/authzed/spicedb/pkg/proto/core/v1"
@@ -55,9 +53,6 @@ func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core.
5553
if errors.Is(err, pgx.ErrNoRows) {
5654
err = datastore.NewCaveatNameNotFoundErr(name)
5755
}
58-
if pgxcommon.IsMissingTableError(err) {
59-
err = dscommon.NewSchemaNotInitializedError(err)
60-
}
6156
return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, name, err)
6257
}
6358

@@ -114,9 +109,6 @@ func (cr *crdbReader) lookupCaveats(ctx context.Context, caveatNames []string) (
114109
return nil
115110
}, sql, args...)
116111
if err != nil {
117-
if pgxcommon.IsMissingTableError(err) {
118-
err = dscommon.NewSchemaNotInitializedError(err)
119-
}
120112
return nil, fmt.Errorf(errListCaveats, err)
121113
}
122114

internal/datastore/crdb/crdb.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,12 @@ func wrapError(err error) error {
413413
return err
414414
}
415415

416+
// ReadyState provides an indication of whether the datastore is ready to receive traffic.
417+
// It ensures that migrations have been run and that revisions look correct, and then checks
418+
// the number of available idle connections in the connection pool.
419+
// We use idle connections instead of total connections because we want all connetions
420+
// to be ready to receive traffic, and total connections counts connections in the constructing
421+
// state, which cannot receive traffic.
416422
func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState, error) {
417423
currentRevision, err := migrations.NewCRDBDriver(cds.dburl)
418424
if err != nil {
@@ -429,6 +435,12 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState,
429435
return state, nil
430436
}
431437

438+
// Wait only until the minimum number of connections is available before
439+
// reporting the datastore as ready.
440+
// NOTE: we do this manually here because new connections to CockroachDB are
441+
// added relatively slowly, and pgxpool doesn't seem to contain logic that
442+
// waits for a pool to be filled before the pool is treated as available/
443+
// ready.
432444
readMin := cds.readPool.MinConns()
433445
if readMin > 0 {
434446
readMin--
@@ -437,21 +449,21 @@ func (cds *crdbDatastore) ReadyState(ctx context.Context) (datastore.ReadyState,
437449
if writeMin > 0 {
438450
writeMin--
439451
}
440-
writeTotal, err := safecast.Convert[uint32](cds.writePool.Stat().TotalConns())
452+
writeIdle, err := safecast.Convert[uint32](cds.writePool.Stat().IdleConns())
441453
if err != nil {
442454
return datastore.ReadyState{}, spiceerrors.MustBugf("could not cast writeTotal to uint32: %v", err)
443455
}
444-
readTotal, err := safecast.Convert[uint32](cds.readPool.Stat().TotalConns())
456+
readIdle, err := safecast.Convert[uint32](cds.readPool.Stat().IdleConns())
445457
if err != nil {
446458
return datastore.ReadyState{}, spiceerrors.MustBugf("could not cast readTotal to uint32: %v", err)
447459
}
448-
if writeTotal < writeMin || readTotal < readMin {
460+
if writeIdle < writeMin || readIdle < readMin {
449461
return datastore.ReadyState{
450462
Message: fmt.Sprintf(
451463
"spicedb does not have the required minimum connection count to the datastore. Read: %d/%d, Write: %d/%d",
452-
readTotal,
464+
readIdle,
453465
readMin,
454-
writeTotal,
466+
writeIdle,
455467
writeMin,
456468
),
457469
IsReady: false,
@@ -465,7 +477,10 @@ func (cds *crdbDatastore) Close() error {
465477
cds.readPool.Close()
466478
cds.writePool.Close()
467479
for _, collector := range cds.collectors {
468-
_ = prometheus.Unregister(collector)
480+
ok := prometheus.Unregister(collector)
481+
if !ok {
482+
return errors.New("could not unregister collector for CRDB datastore")
483+
}
469484
}
470485
return nil
471486
}
@@ -573,11 +588,7 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
573588
}
574589

575590
features.Watch.Status = datastore.FeatureUnsupported
576-
if pgxcommon.IsMissingTableError(err) {
577-
features.Watch.Reason = "Database schema has not been initialized. Please run \"spicedb datastore migrate\": " + err.Error()
578-
} else {
579-
features.Watch.Reason = "Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: " + err.Error()
580-
}
591+
features.Watch.Reason = "Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: " + err.Error()
581592
return nil
582593
}, fmt.Sprintf(cds.beginChangefeedQuery, cds.schema.RelationshipTableName, head, "-1s"))
583594
} else {
@@ -642,23 +653,23 @@ func (cds *crdbDatastore) registerPrometheusCollectors(enablePrometheusStats boo
642653
return nil
643654
}
644655

645-
readCollector := pgxpoolprometheus.NewCollector(cds.writePool, map[string]string{
656+
readCollector := pgxpoolprometheus.NewCollector(cds.readPool, map[string]string{
646657
"db_name": "spicedb",
647658
"pool_usage": "read",
648659
})
649660

650661
if err := prometheus.Register(readCollector); err != nil {
651-
return err
662+
return fmt.Errorf("failed to register prometheus read collector: %w", err)
652663
}
653664
cds.collectors = append(cds.collectors, readCollector)
654665

655-
writeCollector := pgxpoolprometheus.NewCollector(cds.readPool, map[string]string{
666+
writeCollector := pgxpoolprometheus.NewCollector(cds.writePool, map[string]string{
656667
"db_name": "spicedb",
657668
"pool_usage": "write",
658669
})
659670

660671
if err := prometheus.Register(writeCollector); err != nil {
661-
return err
672+
return fmt.Errorf("failed to register prometheus write collector: %w", err)
662673
}
663674
cds.collectors = append(cds.collectors, writeCollector)
664675

internal/datastore/crdb/reader.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,6 @@ func (cr *crdbReader) CountRelationships(ctx context.Context, name string) (int,
128128
return row.Scan(&count)
129129
}, sql, args...)
130130
if err != nil {
131-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
132-
return 0, wrappedErr
133-
}
134131
return 0, err
135132
}
136133

@@ -195,9 +192,6 @@ func (cr *crdbReader) lookupCounters(ctx context.Context, optionalFilterName str
195192
return nil
196193
}, sql, args...)
197194
if err != nil {
198-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
199-
return nil, wrappedErr
200-
}
201195
return nil, err
202196
}
203197

@@ -213,9 +207,6 @@ func (cr *crdbReader) ReadNamespaceByName(
213207
if errors.As(err, &datastore.NamespaceNotFoundError{}) {
214208
return nil, datastore.NoRevision, err
215209
}
216-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
217-
return nil, datastore.NoRevision, wrappedErr
218-
}
219210
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
220211
}
221212

@@ -229,9 +220,6 @@ func (cr *crdbReader) ListAllNamespaces(ctx context.Context) ([]datastore.Revisi
229220

230221
nsDefs, sql, err := loadAllNamespaces(ctx, cr.query, addFromToQuery)
231222
if err != nil {
232-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
233-
return nil, wrappedErr
234-
}
235223
return nil, fmt.Errorf(errUnableToListNamespaces, err)
236224
}
237225
cr.assertHasExpectedAsOfSystemTime(sql)
@@ -244,9 +232,6 @@ func (cr *crdbReader) LookupNamespacesWithNames(ctx context.Context, nsNames []s
244232
}
245233
nsDefs, err := cr.lookupNamespaces(ctx, cr.query, nsNames)
246234
if err != nil {
247-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
248-
return nil, wrappedErr
249-
}
250235
return nil, fmt.Errorf(errUnableToListNamespaces, err)
251236
}
252237
return nsDefs, nil

internal/datastore/crdb/stats.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
3333
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
3434
return row.Scan(&uniqueID)
3535
}, sql, args...); err != nil {
36-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
37-
return "", wrappedErr
38-
}
3936
return "", fmt.Errorf("unable to query unique ID: %w", err)
4037
}
4138

@@ -62,9 +59,6 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
6259
return sb.From(tableName)
6360
})
6461
if err != nil {
65-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
66-
return wrappedErr
67-
}
6862
return fmt.Errorf("unable to read namespaces: %w", err)
6963
}
7064
return nil
@@ -75,9 +69,6 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
7569
if cds.analyzeBeforeStatistics {
7670
if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
7771
if _, err := tx.Exec(ctx, "ANALYZE "+cds.schema.RelationshipTableName); err != nil {
78-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
79-
return wrappedErr
80-
}
8172
return fmt.Errorf("unable to analyze tuple table: %w", err)
8273
}
8374

@@ -152,9 +143,6 @@ func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, erro
152143
log.Warn().Bool("has-rows", hasRows).Msg("unable to find row count in statistics query result")
153144
return nil
154145
}, "SHOW STATISTICS FOR TABLE "+cds.schema.RelationshipTableName); err != nil {
155-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
156-
return datastore.Stats{}, wrappedErr
157-
}
158146
return datastore.Stats{}, fmt.Errorf("unable to query unique estimated row count: %w", err)
159147
}
160148

internal/datastore/mysql/caveat.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
sq "github.com/Masterminds/squirrel"
1010

1111
"github.com/authzed/spicedb/internal/datastore/common"
12-
mysqlcommon "github.com/authzed/spicedb/internal/datastore/mysql/common"
1312
"github.com/authzed/spicedb/internal/datastore/revisions"
1413
"github.com/authzed/spicedb/pkg/datastore"
1514
core "github.com/authzed/spicedb/pkg/proto/core/v1"
@@ -42,9 +41,6 @@ func (mr *mysqlReader) ReadCaveatByName(ctx context.Context, name string) (*core
4241
if errors.Is(err, sql.ErrNoRows) {
4342
return nil, datastore.NoRevision, datastore.NewCaveatNameNotFoundErr(name)
4443
}
45-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
46-
return nil, datastore.NoRevision, wrappedErr
47-
}
4844
return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, err)
4945
}
5046
def := core.CaveatDefinition{}
@@ -86,9 +82,6 @@ func (mr *mysqlReader) lookupCaveats(ctx context.Context, caveatNames []string)
8682

8783
rows, err := tx.QueryContext(ctx, listSQL, listArgs...)
8884
if err != nil {
89-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
90-
return nil, wrappedErr
91-
}
9285
return nil, fmt.Errorf(errListCaveats, err)
9386
}
9487
defer common.LogOnError(ctx, rows.Close)

internal/datastore/mysql/reader.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
sq "github.com/Masterminds/squirrel"
1010

1111
"github.com/authzed/spicedb/internal/datastore/common"
12-
mysqlcommon "github.com/authzed/spicedb/internal/datastore/mysql/common"
1312
"github.com/authzed/spicedb/internal/datastore/revisions"
1413
"github.com/authzed/spicedb/pkg/datastore"
1514
"github.com/authzed/spicedb/pkg/datastore/options"
@@ -76,9 +75,6 @@ func (mr *mysqlReader) CountRelationships(ctx context.Context, name string) (int
7675
var count int
7776
rows, err := tx.QueryContext(ctx, sql, args...)
7877
if err != nil {
79-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
80-
return 0, wrappedErr
81-
}
8278
return 0, err
8379
}
8480
defer common.LogOnError(ctx, rows.Close)
@@ -126,9 +122,6 @@ func (mr *mysqlReader) lookupCounters(ctx context.Context, optionalName string)
126122

127123
rows, err := tx.QueryContext(ctx, sql, args...)
128124
if err != nil {
129-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
130-
return nil, wrappedErr
131-
}
132125
return nil, err
133126
}
134127
defer common.LogOnError(ctx, rows.Close)
@@ -229,9 +222,6 @@ func (mr *mysqlReader) ReadNamespaceByName(ctx context.Context, nsName string) (
229222
case err == nil:
230223
return loaded, version, nil
231224
default:
232-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
233-
return nil, datastore.NoRevision, wrappedErr
234-
}
235225
return nil, datastore.NoRevision, fmt.Errorf(errUnableToReadConfig, err)
236226
}
237227
}
@@ -274,9 +264,6 @@ func (mr *mysqlReader) ListAllNamespaces(ctx context.Context) ([]datastore.Revis
274264

275265
nsDefs, err := loadAllNamespaces(ctx, tx, query)
276266
if err != nil {
277-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
278-
return nil, wrappedErr
279-
}
280267
return nil, fmt.Errorf(errUnableToListNamespaces, err)
281268
}
282269

@@ -303,9 +290,6 @@ func (mr *mysqlReader) LookupNamespacesWithNames(ctx context.Context, nsNames []
303290

304291
nsDefs, err := loadAllNamespaces(ctx, tx, query)
305292
if err != nil {
306-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
307-
return nil, wrappedErr
308-
}
309293
return nil, fmt.Errorf(errUnableToListNamespaces, err)
310294
}
311295

internal/datastore/mysql/stats.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/ccoveille/go-safecast/v2"
1010

1111
"github.com/authzed/spicedb/internal/datastore/common"
12-
mysqlcommon "github.com/authzed/spicedb/internal/datastore/mysql/common"
1312
"github.com/authzed/spicedb/pkg/datastore"
1413
"github.com/authzed/spicedb/pkg/spiceerrors"
1514
)
@@ -27,9 +26,6 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
2726
if mds.analyzeBeforeStats {
2827
_, err := mds.db.ExecContext(ctx, "ANALYZE TABLE "+mds.driver.RelationTuple())
2928
if err != nil {
30-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
31-
return datastore.Stats{}, wrappedErr
32-
}
3329
return datastore.Stats{}, fmt.Errorf("unable to run ANALYZE TABLE: %w", err)
3430
}
3531
}
@@ -51,9 +47,6 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
5147
var count sql.NullInt64
5248
err = mds.db.QueryRowContext(ctx, query, args...).Scan(&count)
5349
if err != nil {
54-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
55-
return datastore.Stats{}, wrappedErr
56-
}
5750
return datastore.Stats{}, err
5851
}
5952

@@ -66,9 +59,6 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
6659
}
6760
err = mds.db.QueryRowContext(ctx, query, args...).Scan(&count)
6861
if err != nil {
69-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
70-
return datastore.Stats{}, wrappedErr
71-
}
7262
return datastore.Stats{}, err
7363
}
7464
}
@@ -83,9 +73,6 @@ func (mds *Datastore) Statistics(ctx context.Context) (datastore.Stats, error) {
8373

8474
nsDefs, err := loadAllNamespaces(ctx, tx, nsQuery)
8575
if err != nil {
86-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
87-
return datastore.Stats{}, wrappedErr
88-
}
8976
return datastore.Stats{}, fmt.Errorf("unable to load namespaces: %w", err)
9077
}
9178

@@ -110,9 +97,6 @@ func (mds *Datastore) UniqueID(ctx context.Context) (string, error) {
11097

11198
var uniqueID string
11299
if err := mds.db.QueryRowContext(ctx, sql, args...).Scan(&uniqueID); err != nil {
113-
if wrappedErr := mysqlcommon.WrapMissingTableError(err); wrappedErr != nil {
114-
return "", wrappedErr
115-
}
116100
return "", fmt.Errorf("unable to query unique ID: %w", err)
117101
}
118102
mds.uniqueID.Store(&uniqueID)

internal/datastore/postgres/caveat.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
sq "github.com/Masterminds/squirrel"
99
"github.com/jackc/pgx/v5"
1010

11-
pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common"
1211
"github.com/authzed/spicedb/internal/datastore/postgres/schema"
1312
"github.com/authzed/spicedb/pkg/datastore"
1413
"github.com/authzed/spicedb/pkg/genutil/mapz"
@@ -50,9 +49,6 @@ func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.Cav
5049
if errors.Is(err, pgx.ErrNoRows) {
5150
return nil, datastore.NoRevision, datastore.NewCaveatNameNotFoundErr(name)
5251
}
53-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
54-
return nil, datastore.NoRevision, wrappedErr
55-
}
5652
return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, err)
5753
}
5854
def := core.CaveatDefinition{}
@@ -110,9 +106,6 @@ func (r *pgReader) lookupCaveats(ctx context.Context, caveatNames []string) ([]d
110106
return rows.Err()
111107
}, sql, args...)
112108
if err != nil {
113-
if wrappedErr := pgxcommon.WrapMissingTableError(err); wrappedErr != nil {
114-
return nil, wrappedErr
115-
}
116109
return nil, fmt.Errorf(errListCaveats, err)
117110
}
118111

0 commit comments

Comments
 (0)