Skip to content

Commit fadc734

Browse files
authored
fix(indexstore): limit SQLite to single connection and add atomic replace with retry (#141)
GORM's default connection pool opened 22+ SQLite connections, so the pragmas (WAL, busy_timeout) were applied to only one of them. The additional connections triggered SQLITE_IOERR_BEGIN_ATOMIC (6410) from VFS-level write contention.

- Set SetMaxOpenConns(1) for SQLite in both indexstore and auth store
- Add ReplaceTestStats/ReplaceTestStatsBlockLogs for atomic delete+insert in a single transaction (prevents data loss on insert failure)
- Add withRetry helper for transient SQLite errors with exponential backoff
1 parent 2ab4295 commit fadc734

3 files changed

Lines changed: 155 additions & 14 deletions

File tree

pkg/api/indexer/indexer.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -496,11 +496,6 @@ func (idx *indexer) indexTestStats(
496496
entry *executor.IndexEntry,
497497
resultData []byte,
498498
) error {
499-
// Delete old test stats for this run before re-inserting.
500-
if err := idx.store.DeleteTestStatsForRun(ctx, runID); err != nil {
501-
return fmt.Errorf("deleting old test stats: %w", err)
502-
}
503-
504499
// Use AccumulateRunResult to extract per-test durations.
505500
suiteStats := make(executor.SuiteStats)
506501

@@ -576,8 +571,8 @@ func (idx *indexer) indexTestStats(
576571
}
577572
}
578573

579-
if err := idx.store.BulkUpsertTestStats(ctx, testStats); err != nil {
580-
return fmt.Errorf("bulk inserting test stats: %w", err)
574+
if err := idx.store.ReplaceTestStats(ctx, runID, testStats); err != nil {
575+
return fmt.Errorf("replacing test stats: %w", err)
581576
}
582577

583578
return nil
@@ -645,11 +640,6 @@ func (idx *indexer) indexTestStatsBlockLogs(
645640
suiteHash, runID, client string,
646641
data []byte,
647642
) error {
648-
// Delete old block logs for this run before re-inserting.
649-
if err := idx.store.DeleteTestStatsBlockLogsForRun(ctx, runID); err != nil {
650-
return fmt.Errorf("deleting old test stats block logs: %w", err)
651-
}
652-
653643
// The file is a map of test name -> single block log entry.
654644
var testMap map[string]blockLogEntry
655645
if err := json.Unmarshal(data, &testMap); err != nil {
@@ -698,8 +688,10 @@ func (idx *indexer) indexTestStatsBlockLogs(
698688
})
699689
}
700690

701-
if err := idx.store.BulkInsertTestStatsBlockLogs(ctx, logs); err != nil {
702-
return fmt.Errorf("bulk inserting test stats block logs: %w", err)
691+
if err := idx.store.ReplaceTestStatsBlockLogs(
692+
ctx, runID, logs,
693+
); err != nil {
694+
return fmt.Errorf("replacing test stats block logs: %w", err)
703695
}
704696

705697
return nil

pkg/api/indexstore/indexstore.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package indexstore
33
import (
44
"context"
55
"fmt"
6+
"strings"
7+
"time"
68

79
"github.com/ethpandaops/benchmarkoor/pkg/config"
810
"github.com/glebarez/sqlite"
@@ -28,6 +30,9 @@ type Store interface {
2830
BulkUpsertTestStats(
2931
ctx context.Context, stats []*TestStat,
3032
) error
33+
ReplaceTestStats(
34+
ctx context.Context, runID string, stats []*TestStat,
35+
) error
3136
ListTestStatsBySuite(
3237
ctx context.Context, suiteHash string,
3338
) ([]TestStat, error)
@@ -45,6 +50,9 @@ type Store interface {
4550
BulkInsertTestStatsBlockLogs(
4651
ctx context.Context, logs []*TestStatsBlockLog,
4752
) error
53+
ReplaceTestStatsBlockLogs(
54+
ctx context.Context, runID string, logs []*TestStatsBlockLog,
55+
) error
4856
DeleteTestStatsBlockLogsForRun(ctx context.Context, runID string) error
4957

5058
QueryRuns(ctx context.Context, params *QueryParams) (*QueryResult, error)
@@ -112,6 +120,18 @@ func (s *store) Start(ctx context.Context) error {
112120

113121
s.db = db
114122

123+
// SQLite requires a single connection to avoid write contention and
124+
// ensure pragmas are applied consistently. GORM's default pool opens
125+
// many connections, each with independent pragma state.
126+
if s.cfg.Driver == "sqlite" {
127+
sqlDB, err := db.DB()
128+
if err != nil {
129+
return fmt.Errorf("getting underlying sql.DB: %w", err)
130+
}
131+
132+
sqlDB.SetMaxOpenConns(1)
133+
}
134+
115135
// Set SQLite pragmas for performance and reliability.
116136
if s.cfg.Driver == "sqlite" {
117137
pragmas := []string{
@@ -418,6 +438,82 @@ func (s *store) DeleteTestStatsForRun(
418438
return nil
419439
}
420440

441+
// ReplaceTestStats atomically deletes old test stats for a run and inserts
442+
// new ones in a single transaction with retry for transient SQLite errors.
443+
func (s *store) ReplaceTestStats(
444+
ctx context.Context, runID string, stats []*TestStat,
445+
) error {
446+
return s.withRetry(func() error {
447+
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
448+
if err := tx.Where("run_id = ?", runID).
449+
Delete(&TestStat{}).Error; err != nil {
450+
return fmt.Errorf(
451+
"deleting test stats for run: %w", err,
452+
)
453+
}
454+
455+
if len(stats) == 0 {
456+
return nil
457+
}
458+
459+
const batchSize = 100
460+
for i := 0; i < len(stats); i += batchSize {
461+
end := min(i+batchSize, len(stats))
462+
batch := stats[i:end]
463+
464+
if err := tx.CreateInBatches(
465+
batch, len(batch),
466+
).Error; err != nil {
467+
return fmt.Errorf(
468+
"bulk inserting test stats: %w", err,
469+
)
470+
}
471+
}
472+
473+
return nil
474+
})
475+
})
476+
}
477+
478+
// ReplaceTestStatsBlockLogs atomically deletes old block logs for a run
479+
// and inserts new ones in a single transaction with retry.
480+
func (s *store) ReplaceTestStatsBlockLogs(
481+
ctx context.Context, runID string, logs []*TestStatsBlockLog,
482+
) error {
483+
return s.withRetry(func() error {
484+
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
485+
if err := tx.Where("run_id = ?", runID).
486+
Delete(&TestStatsBlockLog{}).Error; err != nil {
487+
return fmt.Errorf(
488+
"deleting test stats block logs for run: %w",
489+
err,
490+
)
491+
}
492+
493+
if len(logs) == 0 {
494+
return nil
495+
}
496+
497+
const batchSize = 100
498+
for i := 0; i < len(logs); i += batchSize {
499+
end := min(i+batchSize, len(logs))
500+
batch := logs[i:end]
501+
502+
if err := tx.CreateInBatches(
503+
batch, len(batch),
504+
).Error; err != nil {
505+
return fmt.Errorf(
506+
"bulk inserting test stats block logs: %w",
507+
err,
508+
)
509+
}
510+
}
511+
512+
return nil
513+
})
514+
})
515+
}
516+
421517
// BulkInsertTestStatsBlockLogs inserts multiple test stats block log records
422518
// in batches. Each batch is auto-committed independently to keep
423519
// WAL/journal pressure low. The caller deletes old records first.
@@ -638,6 +734,48 @@ func (s *store) QuerySuites(
638734
}, nil
639735
}
640736

737+
// withRetry retries fn up to 3 times on transient SQLite errors
738+
// (database locked, disk I/O) with exponential backoff.
739+
func (s *store) withRetry(fn func() error) error {
740+
const maxAttempts = 3
741+
742+
backoffs := [...]time.Duration{
743+
100 * time.Millisecond,
744+
500 * time.Millisecond,
745+
2500 * time.Millisecond,
746+
}
747+
748+
var err error
749+
750+
for attempt := range maxAttempts {
751+
err = fn()
752+
if err == nil {
753+
return nil
754+
}
755+
756+
if !isSQLiteTransient(err) {
757+
return err
758+
}
759+
760+
if attempt < maxAttempts-1 {
761+
s.log.WithError(err).WithField("attempt", attempt+1).
762+
Warn("Transient SQLite error, retrying")
763+
time.Sleep(backoffs[attempt])
764+
}
765+
}
766+
767+
return err
768+
}
769+
770+
// isSQLiteTransient reports whether err looks like a transient SQLite
// failure that may succeed on retry.
//
// Classification is by substring match on the error text: messages
// containing "database is locked" or "disk I/O error" are treated as
// transient. Matching on err.Error() means wrapped errors are recognised as
// long as the wrapper includes the underlying message. A nil error is never
// transient.
func isSQLiteTransient(err error) bool {
	// Guard against nil so callers can pass an unchecked error without
	// panicking on err.Error().
	if err == nil {
		return false
	}

	msg := err.Error()

	return strings.Contains(msg, "database is locked") ||
		strings.Contains(msg, "disk I/O error")
}
778+
641779
// scanMaps scans query results into []map[string]any so only the selected
642780
// columns appear in the JSON response.
643781
func scanMaps(

pkg/api/store/store.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,17 @@ func (s *store) Start(ctx context.Context) error {
118118
return fmt.Errorf("opening database: %w", err)
119119
}
120120

121+
// SQLite requires a single connection to avoid write contention and
122+
// ensure pragmas are applied consistently.
123+
if s.cfg.Driver == "sqlite" {
124+
sqlDB, dbErr := s.db.DB()
125+
if dbErr != nil {
126+
return fmt.Errorf("getting underlying sql.DB: %w", dbErr)
127+
}
128+
129+
sqlDB.SetMaxOpenConns(1)
130+
}
131+
121132
if err := s.db.WithContext(ctx).AutoMigrate(
122133
&User{},
123134
&Session{},

0 commit comments

Comments
 (0)