Skip to content

Commit 59e95e5

Browse files
authored
fix(store): separate SQLite read/write connections for concurrent reads (#142)
The previous fix set SetMaxOpenConns(1) to prevent write contention, but this serialized all operations including reads, causing timeouts for API users. SQLite WAL mode supports concurrent readers alongside a single writer, so split into two GORM instances: a single-connection write pool and a multi-connection read pool.
1 parent fadc734 commit 59e95e5

2 files changed

Lines changed: 222 additions & 87 deletions

File tree

pkg/api/indexstore/indexstore.go

Lines changed: 115 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ type Store interface {
7171
var _ Store = (*store)(nil)
7272

7373
type store struct {
74-
log logrus.FieldLogger
75-
cfg *config.APIDatabaseConfig
76-
db *gorm.DB
74+
log logrus.FieldLogger
75+
cfg *config.APIDatabaseConfig
76+
db *gorm.DB // write-only connection (single conn for SQLite)
77+
readDB *gorm.DB // read-only connection pool (concurrent readers)
7778
}
7879

7980
// NewStore creates a new index Store backed by the configured database driver.
@@ -89,15 +90,15 @@ func NewStore(
8990

9091
// Start opens the database connection and runs migrations.
9192
func (s *store) Start(ctx context.Context) error {
92-
var dialector gorm.Dialector
93-
9493
gormCfg := &gorm.Config{
9594
Logger: logger.Discard,
9695
}
9796

9897
switch s.cfg.Driver {
9998
case "sqlite":
100-
dialector = sqlite.Open(s.cfg.SQLite.Path)
99+
if err := s.openSQLite(gormCfg); err != nil {
100+
return err
101+
}
101102
case "postgres":
102103
dsn := fmt.Sprintf(
103104
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
@@ -108,44 +109,16 @@ func (s *store) Start(ctx context.Context) error {
108109
s.cfg.Postgres.Database,
109110
s.cfg.Postgres.SSLMode,
110111
)
111-
dialector = postgres.Open(dsn)
112-
default:
113-
return fmt.Errorf("unsupported database driver: %s", s.cfg.Driver)
114-
}
115-
116-
db, err := gorm.Open(dialector, gormCfg)
117-
if err != nil {
118-
return fmt.Errorf("opening index database: %w", err)
119-
}
120112

121-
s.db = db
122-
123-
// SQLite requires a single connection to avoid write contention and
124-
// ensure pragmas are applied consistently. GORM's default pool opens
125-
// many connections, each with independent pragma state.
126-
if s.cfg.Driver == "sqlite" {
127-
sqlDB, err := db.DB()
113+
db, err := gorm.Open(postgres.Open(dsn), gormCfg)
128114
if err != nil {
129-
return fmt.Errorf("getting underlying sql.DB: %w", err)
115+
return fmt.Errorf("opening index database: %w", err)
130116
}
131117

132-
sqlDB.SetMaxOpenConns(1)
133-
}
134-
135-
// Set SQLite pragmas for performance and reliability.
136-
if s.cfg.Driver == "sqlite" {
137-
pragmas := []string{
138-
"PRAGMA journal_mode=WAL",
139-
"PRAGMA synchronous=NORMAL",
140-
"PRAGMA busy_timeout=5000",
141-
"PRAGMA foreign_keys=ON",
142-
"PRAGMA temp_store=MEMORY",
143-
}
144-
for _, p := range pragmas {
145-
if err := s.db.Exec(p).Error; err != nil {
146-
return fmt.Errorf("setting pragma %q: %w", p, err)
147-
}
148-
}
118+
s.db = db
119+
s.readDB = db
120+
default:
121+
return fmt.Errorf("unsupported database driver: %s", s.cfg.Driver)
149122
}
150123

151124
if err := s.db.WithContext(ctx).AutoMigrate(
@@ -170,8 +143,79 @@ func (s *store) Start(ctx context.Context) error {
170143
return nil
171144
}
172145

173-
// Stop closes the underlying database connection.
146+
// openSQLite opens the write and read GORM connections for SQLite.
147+
// The write connection is limited to a single connection to prevent
148+
// write contention. The read connection allows concurrent readers
149+
// via WAL mode. For in-memory databases both point to the same
150+
// instance since separate connections would create independent DBs.
151+
func (s *store) openSQLite(gormCfg *gorm.Config) error {
152+
writeDB, err := gorm.Open(sqlite.Open(s.cfg.SQLite.Path), gormCfg)
153+
if err != nil {
154+
return fmt.Errorf("opening index database (write): %w", err)
155+
}
156+
157+
writeSQLDB, err := writeDB.DB()
158+
if err != nil {
159+
return fmt.Errorf("getting underlying sql.DB (write): %w", err)
160+
}
161+
162+
// Single writer prevents "database is locked" contention.
163+
writeSQLDB.SetMaxOpenConns(1)
164+
165+
if err := applySQLitePragmas(writeDB); err != nil {
166+
return err
167+
}
168+
169+
s.db = writeDB
170+
171+
// In-memory databases cannot share state across separate
172+
// connections, so both reads and writes use the same instance.
173+
if s.cfg.SQLite.Path == ":memory:" ||
174+
strings.Contains(s.cfg.SQLite.Path, "mode=memory") {
175+
s.readDB = writeDB
176+
177+
return nil
178+
}
179+
180+
// File-backed SQLite: open a separate read pool so concurrent
181+
// readers are not blocked behind the single-writer connection.
182+
readDB, err := gorm.Open(
183+
sqlite.Open(s.cfg.SQLite.Path), gormCfg,
184+
)
185+
if err != nil {
186+
return fmt.Errorf("opening index database (read): %w", err)
187+
}
188+
189+
readSQLDB, err := readDB.DB()
190+
if err != nil {
191+
return fmt.Errorf("getting underlying sql.DB (read): %w", err)
192+
}
193+
194+
readSQLDB.SetMaxOpenConns(4)
195+
196+
if err := applySQLitePragmas(readDB); err != nil {
197+
return err
198+
}
199+
200+
s.readDB = readDB
201+
202+
return nil
203+
}
204+
205+
// Stop closes the underlying database connections.
174206
func (s *store) Stop() error {
207+
// Close the read pool first (if it's a separate instance).
208+
if s.readDB != nil && s.readDB != s.db {
209+
readSQL, err := s.readDB.DB()
210+
if err != nil {
211+
return fmt.Errorf("getting underlying read db: %w", err)
212+
}
213+
214+
if err := readSQL.Close(); err != nil {
215+
return fmt.Errorf("closing read db: %w", err)
216+
}
217+
}
218+
175219
if s.db == nil {
176220
return nil
177221
}
@@ -203,7 +247,7 @@ func (s *store) ListRuns(
203247
ctx context.Context, discoveryPath string,
204248
) ([]Run, error) {
205249
var runs []Run
206-
if err := s.db.WithContext(ctx).
250+
if err := s.readDB.WithContext(ctx).
207251
Where("discovery_path = ?", discoveryPath).
208252
Order("timestamp DESC").
209253
Find(&runs).Error; err != nil {
@@ -216,7 +260,7 @@ func (s *store) ListRuns(
216260
// ListAllRuns returns all runs across all discovery paths.
217261
func (s *store) ListAllRuns(ctx context.Context) ([]Run, error) {
218262
var runs []Run
219-
if err := s.db.WithContext(ctx).
263+
if err := s.readDB.WithContext(ctx).
220264
Order("timestamp DESC").
221265
Find(&runs).Error; err != nil {
222266
return nil, fmt.Errorf("listing all runs: %w", err)
@@ -230,7 +274,7 @@ func (s *store) GetRunByRunID(
230274
ctx context.Context, runID string,
231275
) (*Run, error) {
232276
var run Run
233-
if err := s.db.WithContext(ctx).
277+
if err := s.readDB.WithContext(ctx).
234278
Where("run_id = ?", runID).
235279
First(&run).Error; err != nil {
236280
return nil, fmt.Errorf("getting run by run_id: %w", err)
@@ -336,7 +380,7 @@ func (s *store) ListRunIDs(
336380
ctx context.Context, discoveryPath string,
337381
) ([]string, error) {
338382
var ids []string
339-
if err := s.db.WithContext(ctx).
383+
if err := s.readDB.WithContext(ctx).
340384
Model(&Run{}).
341385
Where("discovery_path = ?", discoveryPath).
342386
Pluck("run_id", &ids).Error; err != nil {
@@ -357,7 +401,7 @@ func (s *store) ListIncompleteRunIDs(
357401
ctx context.Context, discoveryPath string,
358402
) ([]string, error) {
359403
var ids []string
360-
if err := s.db.WithContext(ctx).
404+
if err := s.readDB.WithContext(ctx).
361405
Model(&Run{}).
362406
Where("discovery_path = ? AND has_result = ? AND status != '' AND status NOT IN ?",
363407
discoveryPath, false, terminalStatuses).
@@ -416,7 +460,7 @@ func (s *store) ListTestStatsBySuite(
416460
ctx context.Context, suiteHash string,
417461
) ([]TestStat, error) {
418462
var stats []TestStat
419-
if err := s.db.WithContext(ctx).
463+
if err := s.readDB.WithContext(ctx).
420464
Where("suite_hash = ?", suiteHash).
421465
Find(&stats).Error; err != nil {
422466
return nil, fmt.Errorf("listing test stats: %w", err)
@@ -576,7 +620,7 @@ func (s *store) UpsertSuite(ctx context.Context, suite *Suite) error {
576620
func (s *store) QueryRuns(
577621
ctx context.Context, params *QueryParams,
578622
) (*QueryResult, error) {
579-
q := applyQuery(s.db.WithContext(ctx), &Run{}, params)
623+
q := applyQuery(s.readDB.WithContext(ctx), &Run{}, params)
580624

581625
// When select is specified, scan into maps so the JSON response
582626
// only contains the requested columns (no zero-valued extras).
@@ -616,7 +660,7 @@ func (s *store) QueryTestStats(
616660
ctx context.Context, params *QueryParams,
617661
) (*QueryResult, error) {
618662
q := applyQuery(
619-
s.db.WithContext(ctx), &TestStat{}, params,
663+
s.readDB.WithContext(ctx), &TestStat{}, params,
620664
)
621665

622666
// When select is specified, scan into maps so the JSON response
@@ -657,7 +701,7 @@ func (s *store) QueryTestStatsBlockLogs(
657701
ctx context.Context, params *QueryParams,
658702
) (*QueryResult, error) {
659703
q := applyQuery(
660-
s.db.WithContext(ctx), &TestStatsBlockLog{}, params,
704+
s.readDB.WithContext(ctx), &TestStatsBlockLog{}, params,
661705
)
662706

663707
// When select is specified, scan into maps so the JSON response
@@ -701,7 +745,7 @@ func (s *store) QueryTestStatsBlockLogs(
701745
func (s *store) QuerySuites(
702746
ctx context.Context, params *QueryParams,
703747
) (*QueryResult, error) {
704-
q := applyQuery(s.db.WithContext(ctx), &Suite{}, params)
748+
q := applyQuery(s.readDB.WithContext(ctx), &Suite{}, params)
705749

706750
// When select is specified, scan into maps so the JSON response
707751
// only contains the requested columns (no zero-valued extras).
@@ -767,6 +811,26 @@ func (s *store) withRetry(fn func() error) error {
767811
return err
768812
}
769813

814+
// applySQLitePragmas sets performance and reliability pragmas on a
815+
// SQLite GORM connection.
816+
func applySQLitePragmas(db *gorm.DB) error {
817+
pragmas := []string{
818+
"PRAGMA journal_mode=WAL",
819+
"PRAGMA synchronous=NORMAL",
820+
"PRAGMA busy_timeout=5000",
821+
"PRAGMA foreign_keys=ON",
822+
"PRAGMA temp_store=MEMORY",
823+
}
824+
825+
for _, p := range pragmas {
826+
if err := db.Exec(p).Error; err != nil {
827+
return fmt.Errorf("setting pragma %q: %w", p, err)
828+
}
829+
}
830+
831+
return nil
832+
}
833+
770834
// isSQLiteTransient returns true for transient SQLite errors that may
771835
// succeed on retry.
772836
func isSQLiteTransient(err error) bool {

0 commit comments

Comments
 (0)