Skip to content

Commit 9074748

Browse files
committed
Drive down CRAP
1 parent e87be51 commit 9074748

4 files changed

Lines changed: 675 additions & 35 deletions

File tree

drivers/pg/optimize.go

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -182,17 +182,25 @@ type indexCandidate struct {
182182
// precedes btree reindex so reclaimed space is reflected in the bloat
183183
// measurement that drives the rebuild decision.
184184
func (s *Driver) Optimize(ctx context.Context) error {
185-
if installed, err := s.pgstattupleInstalled(ctx); err != nil {
185+
return optimize(ctx, s.pool)
186+
}
187+
188+
// optimize is the package-level implementation that the four maintenance
189+
// phases dispatch through. Splitting Optimize from its receiver lets unit
190+
// tests drive the phase wiring against a fake driver without standing up a
191+
// real *pgxpool.Pool; the *Driver method is a forwarder.
192+
func optimize(ctx context.Context, db driver) error {
193+
if installed, err := pgstattupleInstalled(ctx, db); err != nil {
186194
return fmt.Errorf("checking pgstattuple extension: %w", err)
187195
} else if !installed {
188196
slog.WarnContext(ctx, "Index optimization skipped: pgstattuple extension is not installed; verify the DAWGS schema bootstrap completed successfully")
189197
return nil
190198
}
191199

192-
s.cleanupOrphanedReindexArtifacts(ctx)
193-
s.flushGinPendingLists(ctx)
194-
s.vacuumGraphPartitions(ctx)
195-
s.reindexBloatedBtreeIndexes(ctx)
200+
cleanupOrphanedReindexArtifacts(ctx, db)
201+
flushGinPendingLists(ctx, db)
202+
vacuumGraphPartitions(ctx, db)
203+
reindexBloatedBtreeIndexes(ctx, db)
196204
return nil
197205
}
198206

@@ -216,8 +224,8 @@ func needsReindex(c indexCandidate) (string, bool) {
216224
// fatal; the loop honors ctx cancellation between candidates. Candidates are
217225
// processed smallest first so that an early cancellation still produces the
218226
// maximum number of completed rebuilds.
219-
func (s *Driver) reindexBloatedBtreeIndexes(ctx context.Context) {
220-
indexes, err := s.listGraphPartitionBtreeIndexes(ctx)
227+
func reindexBloatedBtreeIndexes(ctx context.Context, db driver) {
228+
indexes, err := listGraphPartitionBtreeIndexes(ctx, db)
221229
if err != nil {
222230
slog.WarnContext(ctx, fmt.Sprintf("Btree reindex scan failed; continuing: %v", err))
223231
return
@@ -230,7 +238,7 @@ func (s *Driver) reindexBloatedBtreeIndexes(ctx context.Context) {
230238
totalBytes int64
231239
)
232240
for _, idx := range indexes {
233-
density, fragmentation, err := s.measureIndexBloat(ctx, idx.oid)
241+
density, fragmentation, err := measureIndexBloat(ctx, db, idx.oid)
234242
if err != nil {
235243
slog.WarnContext(ctx, fmt.Sprintf("Skipping bloat assessment for index %s.%s: %v", idx.schema, idx.index, err))
236244
continue
@@ -258,19 +266,19 @@ func (s *Driver) reindexBloatedBtreeIndexes(ctx context.Context) {
258266
return candidates[i].sizeBytes < candidates[j].sizeBytes
259267
})
260268

261-
s.reindexCandidates(ctx, candidates)
269+
reindexCandidates(ctx, db, candidates)
262270
}
263271

264-
func (s *Driver) pgstattupleInstalled(ctx context.Context) (bool, error) {
272+
func pgstattupleInstalled(ctx context.Context, db driver) (bool, error) {
265273
var installed bool
266-
if err := s.pool.QueryRow(ctx, sqlPgstattupleInstalled).Scan(&installed); err != nil {
274+
if err := db.QueryRow(ctx, sqlPgstattupleInstalled).Scan(&installed); err != nil {
267275
return false, err
268276
}
269277
return installed, nil
270278
}
271279

272-
func (s *Driver) listGraphPartitionBtreeIndexes(ctx context.Context) ([]indexRow, error) {
273-
rows, err := s.pool.Query(ctx, sqlSelectGraphPartitionBtreeIndexes)
280+
func listGraphPartitionBtreeIndexes(ctx context.Context, db driver) ([]indexRow, error) {
281+
rows, err := db.Query(ctx, sqlSelectGraphPartitionBtreeIndexes)
274282
if err != nil {
275283
return nil, err
276284
}
@@ -287,9 +295,9 @@ func (s *Driver) listGraphPartitionBtreeIndexes(ctx context.Context) ([]indexRow
287295
return out, rows.Err()
288296
}
289297

290-
func (s *Driver) measureIndexBloat(ctx context.Context, indexOID uint32) (float64, float64, error) {
298+
func measureIndexBloat(ctx context.Context, db driver, indexOID uint32) (float64, float64, error) {
291299
var density, fragmentation float64
292-
if err := s.pool.QueryRow(ctx, sqlSelectIndexBloatMetrics, indexOID).Scan(&density, &fragmentation); err != nil {
300+
if err := db.QueryRow(ctx, sqlSelectIndexBloatMetrics, indexOID).Scan(&density, &fragmentation); err != nil {
293301
return 0, 0, err
294302
}
295303
return density, fragmentation, nil
@@ -306,8 +314,8 @@ type orphanedReindexArtifact struct {
306314
// left behind by previously aborted REINDEX CONCURRENTLY runs. Failures are
307315
// logged at WARN and never fatal: an orphan wastes disk but does not block
308316
// productive rebuilds, so a stuck cleanup must not gate the rest of the pass.
309-
func (s *Driver) cleanupOrphanedReindexArtifacts(ctx context.Context) {
310-
orphans, err := s.listOrphanedReindexArtifacts(ctx)
317+
func cleanupOrphanedReindexArtifacts(ctx context.Context, db driver) {
318+
orphans, err := listOrphanedReindexArtifacts(ctx, db)
311319
if err != nil {
312320
slog.WarnContext(ctx, fmt.Sprintf("Index optimization cleanup: failed to scan for orphaned reindex artifacts; continuing: %v", err))
313321
return
@@ -321,16 +329,16 @@ func (s *Driver) cleanupOrphanedReindexArtifacts(ctx context.Context) {
321329

322330
slog.InfoContext(ctx, fmt.Sprintf("Orphan reindex cleanup assessment complete: %d artifact(s) to drop", len(orphans)))
323331
for _, o := range orphans {
324-
if _, err := s.pool.Exec(ctx, buildDropInvalidIndexSQL(o.schema, o.name)); err != nil {
332+
if _, err := db.Exec(ctx, buildDropInvalidIndexSQL(o.schema, o.name)); err != nil {
325333
slog.WarnContext(ctx, fmt.Sprintf("Index optimization cleanup: failed to drop orphaned reindex artifact %s.%s; continuing: %v", o.schema, o.name, err))
326334
continue
327335
}
328336
slog.InfoContext(ctx, fmt.Sprintf("Index optimization cleanup: dropped orphaned reindex artifact %s.%s", o.schema, o.name))
329337
}
330338
}
331339

332-
func (s *Driver) listOrphanedReindexArtifacts(ctx context.Context) ([]orphanedReindexArtifact, error) {
333-
rows, err := s.pool.Query(ctx, sqlSelectOrphanedReindexArtifacts)
340+
func listOrphanedReindexArtifacts(ctx context.Context, db driver) ([]orphanedReindexArtifact, error) {
341+
rows, err := db.Query(ctx, sqlSelectOrphanedReindexArtifacts)
334342
if err != nil {
335343
return nil, err
336344
}
@@ -352,7 +360,7 @@ func (s *Driver) listOrphanedReindexArtifacts(ctx context.Context) ([]orphanedRe
352360
// Per-candidate failures are logged at WARN and the loop continues; ctx
353361
// cancellation aborts further candidates but in-flight REINDEX statements
354362
// must run to completion in Postgres to avoid leaving _ccnew artifacts.
355-
func (s *Driver) reindexCandidates(ctx context.Context, candidates []indexCandidate) {
363+
func reindexCandidates(ctx context.Context, db driver, candidates []indexCandidate) {
356364
for _, c := range candidates {
357365
if err := ctx.Err(); err != nil {
358366
slog.WarnContext(ctx, fmt.Sprintf("Index optimization rebuild cancelled before processing %s.%s: %v", c.schema, c.index, err))
@@ -365,7 +373,7 @@ func (s *Driver) reindexCandidates(ctx context.Context, candidates []indexCandid
365373
))
366374

367375
started := time.Now()
368-
if _, err := s.pool.Exec(ctx, buildReindexSQL(c.schema, c.index)); err != nil {
376+
if _, err := db.Exec(ctx, buildReindexSQL(c.schema, c.index)); err != nil {
369377
slog.WarnContext(ctx, fmt.Sprintf(
370378
"Index optimization rebuild failed for %s.%s after %s; continuing with next candidate: %v",
371379
c.schema, c.index, time.Since(started), err,
@@ -432,8 +440,8 @@ func needsGinFlush(c ginFlushCandidate) (string, bool) {
432440
// threshold. Per-candidate failures are logged at WARN and never fatal: a
433441
// stuck flush wastes time on the next pass but does not block the rest of
434442
// the optimization phases.
435-
func (s *Driver) flushGinPendingLists(ctx context.Context) {
436-
indexes, err := s.listGraphPartitionGinIndexes(ctx)
443+
func flushGinPendingLists(ctx context.Context, db driver) {
444+
indexes, err := listGraphPartitionGinIndexes(ctx, db)
437445
if err != nil {
438446
slog.WarnContext(ctx, fmt.Sprintf("Index optimization GIN scan failed; continuing: %v", err))
439447
return
@@ -446,7 +454,7 @@ func (s *Driver) flushGinPendingLists(ctx context.Context) {
446454
totalPendingPages int64
447455
)
448456
for _, idx := range indexes {
449-
pages, tuples, err := s.measureGinPending(ctx, idx.oid)
457+
pages, tuples, err := measureGinPending(ctx, db, idx.oid)
450458
if err != nil {
451459
slog.WarnContext(ctx, fmt.Sprintf("Skipping GIN pending-list assessment for index %s.%s: %v", idx.schema, idx.index, err))
452460
continue
@@ -480,7 +488,7 @@ func (s *Driver) flushGinPendingLists(ctx context.Context) {
480488
}
481489

482490
started := time.Now()
483-
if _, err := s.pool.Exec(ctx, sqlCleanGinPendingList, c.oid); err != nil {
491+
if _, err := db.Exec(ctx, sqlCleanGinPendingList, c.oid); err != nil {
484492
slog.WarnContext(ctx, fmt.Sprintf(
485493
"GIN flush failed for %s.%s after %s; continuing with next candidate: %v",
486494
c.schema, c.index, time.Since(started), err,
@@ -495,8 +503,8 @@ func (s *Driver) flushGinPendingLists(ctx context.Context) {
495503
}
496504
}
497505

498-
func (s *Driver) listGraphPartitionGinIndexes(ctx context.Context) ([]ginIndexRow, error) {
499-
rows, err := s.pool.Query(ctx, sqlSelectGraphPartitionGinIndexes)
506+
func listGraphPartitionGinIndexes(ctx context.Context, db driver) ([]ginIndexRow, error) {
507+
rows, err := db.Query(ctx, sqlSelectGraphPartitionGinIndexes)
500508
if err != nil {
501509
return nil, err
502510
}
@@ -513,9 +521,9 @@ func (s *Driver) listGraphPartitionGinIndexes(ctx context.Context) ([]ginIndexRo
513521
return out, rows.Err()
514522
}
515523

516-
func (s *Driver) measureGinPending(ctx context.Context, indexOID uint32) (int64, int64, error) {
524+
func measureGinPending(ctx context.Context, db driver, indexOID uint32) (int64, int64, error) {
517525
var pendingPages, pendingTuples int64
518-
if err := s.pool.QueryRow(ctx, sqlSelectGinPendingMetrics, indexOID).Scan(&pendingPages, &pendingTuples); err != nil {
526+
if err := db.QueryRow(ctx, sqlSelectGinPendingMetrics, indexOID).Scan(&pendingPages, &pendingTuples); err != nil {
519527
return 0, 0, err
520528
}
521529
return pendingPages, pendingTuples, nil
@@ -592,8 +600,8 @@ func vacuumAssessment(r vacuumStatsRow, now time.Time) (vacuumCandidate, bool) {
592600
// flagged by vacuumAssessment. Per-candidate failures are logged at WARN and
593601
// never fatal. The loop honors ctx cancellation between partitions; an
594602
// in-flight VACUUM aborts at the next safe point when ctx is cancelled.
595-
func (s *Driver) vacuumGraphPartitions(ctx context.Context) {
596-
stats, err := s.listGraphPartitionVacuumStats(ctx)
603+
func vacuumGraphPartitions(ctx context.Context, db driver) {
604+
stats, err := listGraphPartitionVacuumStats(ctx, db)
597605
if err != nil {
598606
slog.WarnContext(ctx, fmt.Sprintf("Vacuum assessment scan failed; continuing: %v", err))
599607
return
@@ -639,7 +647,7 @@ func (s *Driver) vacuumGraphPartitions(ctx context.Context) {
639647
}
640648

641649
started := time.Now()
642-
if _, err := s.pool.Exec(ctx, buildVacuumSQL(c.schema, c.table)); err != nil {
650+
if _, err := db.Exec(ctx, buildVacuumSQL(c.schema, c.table)); err != nil {
643651
slog.WarnContext(ctx, fmt.Sprintf(
644652
"Vacuum failed for %s.%s after %s; continuing with next candidate: %v",
645653
c.schema, c.table, time.Since(started), err,
@@ -654,8 +662,8 @@ func (s *Driver) vacuumGraphPartitions(ctx context.Context) {
654662
}
655663
}
656664

657-
func (s *Driver) listGraphPartitionVacuumStats(ctx context.Context) ([]vacuumStatsRow, error) {
658-
rows, err := s.pool.Query(ctx, sqlSelectGraphPartitionVacuumStats)
665+
func listGraphPartitionVacuumStats(ctx context.Context, db driver) ([]vacuumStatsRow, error) {
666+
rows, err := db.Query(ctx, sqlSelectGraphPartitionVacuumStats)
659667
if err != nil {
660668
return nil, err
661669
}

drivers/pg/optimize_fake_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package pg
2+
3+
import (
	"context"
	"fmt"
	"reflect"
	"strings"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgconn"
)
11+
12+
// fakeDriver is a hand-rolled stand-in for the package-private `driver`
13+
// interface used by optimize.go. It lets unit tests exercise the phase
14+
// wiring (list -> assess -> exec) without standing up a *pgxpool.Pool.
15+
//
16+
// Tests register canned Query/QueryRow responses keyed by a substring of
17+
// the SQL statement; the first registered substring that matches a given
18+
// SQL is consumed. Exec responses are similarly keyed by substring; every
19+
// Exec call is appended to execCalls for assertion.
20+
type fakeDriver struct {
21+
queryRules []queryRule
22+
queryRowRules []queryRowRule
23+
execRules []execRule
24+
execCalls []execCall
25+
}
26+
27+
type queryRule struct {
28+
match string
29+
response func() (pgx.Rows, error)
30+
}
31+
32+
type queryRowRule struct {
33+
match string
34+
response func() pgx.Row
35+
}
36+
37+
type execRule struct {
38+
match string
39+
response func() (pgconn.CommandTag, error)
40+
}
41+
42+
type execCall struct {
43+
sql string
44+
args []any
45+
}
46+
47+
func (s *fakeDriver) Exec(_ context.Context, sql string, args ...any) (pgconn.CommandTag, error) {
48+
s.execCalls = append(s.execCalls, execCall{sql: sql, args: args})
49+
for i := range s.execRules {
50+
if strings.Contains(sql, s.execRules[i].match) {
51+
return s.execRules[i].response()
52+
}
53+
}
54+
return pgconn.CommandTag{}, nil
55+
}
56+
57+
func (s *fakeDriver) Query(_ context.Context, sql string, _ ...any) (pgx.Rows, error) {
58+
for i, r := range s.queryRules {
59+
if strings.Contains(sql, r.match) {
60+
s.queryRules = append(s.queryRules[:i], s.queryRules[i+1:]...)
61+
return r.response()
62+
}
63+
}
64+
return &fakeRows{}, nil
65+
}
66+
67+
func (s *fakeDriver) QueryRow(_ context.Context, sql string, _ ...any) pgx.Row {
68+
for i, r := range s.queryRowRules {
69+
if strings.Contains(sql, r.match) {
70+
s.queryRowRules = append(s.queryRowRules[:i], s.queryRowRules[i+1:]...)
71+
return r.response()
72+
}
73+
}
74+
return &fakeRow{err: pgx.ErrNoRows}
75+
}
76+
77+
// fakeRows implements pgx.Rows over an in-memory slice of value tuples.
//
// Each tuple is a []any whose element types must match the pointer
// destinations passed to Scan; values are copied across with reflect.
type fakeRows struct {
	// values holds the canned rows, served in order by Next/Scan.
	values [][]any
	// pos counts the rows Next has handed out; the current row is
	// values[pos-1].
	pos int
	// scanErr, when non-nil, is returned by every Scan call.
	scanErr error
	// closeErr is what Err reports. Err returns it unconditionally, so it
	// can simulate a failure surfaced after iteration (a partial read).
	closeErr error
	// closed is set by Close.
	closed bool
}
88+
89+
func (s *fakeRows) Close() { s.closed = true }
90+
func (s *fakeRows) Err() error { return s.closeErr }
91+
func (s *fakeRows) CommandTag() pgconn.CommandTag { return pgconn.CommandTag{} }
92+
func (s *fakeRows) FieldDescriptions() []pgconn.FieldDescription { return nil }
93+
func (s *fakeRows) Values() ([]any, error) { return nil, nil }
94+
func (s *fakeRows) RawValues() [][]byte { return nil }
95+
func (s *fakeRows) Conn() *pgx.Conn { return nil }
96+
97+
func (s *fakeRows) Next() bool {
98+
if s.pos >= len(s.values) {
99+
return false
100+
}
101+
s.pos++
102+
return true
103+
}
104+
105+
func (s *fakeRows) Scan(dest ...any) error {
106+
if s.scanErr != nil {
107+
return s.scanErr
108+
}
109+
row := s.values[s.pos-1]
110+
return assignRow(row, dest)
111+
}
112+
113+
// fakeRow implements pgx.Row for a single QueryRow response.
114+
type fakeRow struct {
115+
values []any
116+
err error
117+
}
118+
119+
func (s *fakeRow) Scan(dest ...any) error {
120+
if s.err != nil {
121+
return s.err
122+
}
123+
return assignRow(s.values, dest)
124+
}
125+
126+
// assignRow copies row[i] into the value pointed to by dest[i] using reflect,
// bridging the typed pointer destinations passed to Scan with the loosely
// typed []any values registered by the test.
//
// Every form of misuse is reported as a *scanMismatchError instead of a
// reflect panic:
//   - len(row) != len(dest);
//   - a destination that is not a non-nil pointer;
//   - a value whose type is not assignable to the destination's element type;
//   - a nil value for a destination whose element type cannot hold nil.
//
// A nil value assigned to a pointer, interface, slice, map, chan or func
// destination sets it to its zero value. Destinations before the failing
// index have already been written when an error is returned.
func assignRow(row []any, dest []any) error {
	if len(row) != len(dest) {
		return &scanMismatchError{want: len(row), got: len(dest)}
	}
	for i, v := range row {
		ptr := reflect.ValueOf(dest[i])
		// IsNil is only reached for pointer kinds, where it is valid.
		if ptr.Kind() != reflect.Ptr || ptr.IsNil() {
			return &scanMismatchError{
				want:   len(row),
				got:    len(dest),
				reason: fmt.Sprintf("destination %d (%T) is not a non-nil pointer", i, dest[i]),
			}
		}
		target := ptr.Elem()

		if v == nil {
			// reflect.ValueOf(nil) is the invalid Value and cannot be
			// passed to Set; use the zero value where nil is legal.
			switch target.Kind() {
			case reflect.Ptr, reflect.Interface, reflect.Slice, reflect.Map, reflect.Chan, reflect.Func:
				target.Set(reflect.Zero(target.Type()))
				continue
			}
			return &scanMismatchError{
				want:   len(row),
				got:    len(dest),
				reason: fmt.Sprintf("cannot assign nil to destination %d of type %s", i, target.Type()),
			}
		}

		val := reflect.ValueOf(v)
		if !val.Type().AssignableTo(target.Type()) {
			return &scanMismatchError{
				want:   len(row),
				got:    len(dest),
				reason: fmt.Sprintf("cannot assign value %d of type %s to destination of type %s", i, val.Type(), target.Type()),
			}
		}
		target.Set(val)
	}
	return nil
}

// scanMismatchError describes a failed fake Scan. want is the number of
// values in the canned row and got is the number of destinations supplied.
// reason, when set, names the specific per-column problem; when empty the
// error is a plain count mismatch.
type scanMismatchError struct {
	want, got int
	reason    string
}

// Error renders the mismatch. Every message starts with "fake scan mismatch".
func (s *scanMismatchError) Error() string {
	if s.reason != "" {
		return "fake scan mismatch: " + s.reason
	}
	return fmt.Sprintf("fake scan mismatch: row has %d value(s) but Scan received %d destination(s)", s.want, s.got)
}

0 commit comments

Comments
 (0)