-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Fix sqldb/v2 regressions
#10703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
yyforyongyu
merged 14 commits into
lightningnetwork:master
from
yyforyongyu:10697-review-fixes
Apr 10, 2026
Merged
Fix sqldb/v2 regressions
#10703
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
093c1c7
sqldb/v2: fix postgres time rewrite
yyforyongyu cfb7ae3
sqldb/v2: align test helper build tags
yyforyongyu ab7f36f
sqldb/v2: fix no_sqlite target builds
yyforyongyu 2be43f4
sqldb/v2: enforce require ssl mode
yyforyongyu 82e3ce2
sqldb/v2: restore sqlite conn limit
yyforyongyu d8734aa
sqldb/v2: fix sqlite migration errors
yyforyongyu a0a5273
sqldb/v2: align test helper args
yyforyongyu 76fc686
sqldb/v2: use BaseDB skip flag
yyforyongyu 8be8964
sqldb/v2: add executor backend
yyforyongyu 5c067e7
sqldb/v2: drop dead retry helper
yyforyongyu 275fe49
sqldb/v2: harden fixture names
yyforyongyu 70ab2fc
sqldb/v2: validate migration sets
yyforyongyu 115daef
sqldb/v2: scope retry rollbacks
yyforyongyu 7074419
sqldb/v2: align sqlite idle defaults
yyforyongyu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| package sqldb | ||
|
|
||
| import ( | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| // TestSqliteConfigMaxConns verifies that SQLite keeps the low default | ||
| // connection limit unless the caller overrides it explicitly. | ||
| func TestSqliteConfigMaxConns(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| testCases := []struct { | ||
| name string | ||
| maxConns int | ||
| expectedConn int | ||
| }{ | ||
| { | ||
| name: "default limit", | ||
| expectedConn: DefaultSqliteMaxConns, | ||
| }, | ||
| { | ||
| name: "explicit limit", | ||
| maxConns: 7, | ||
| expectedConn: 7, | ||
| }, | ||
| } | ||
|
|
||
| for _, testCase := range testCases { | ||
| testCase := testCase | ||
|
|
||
| t.Run(testCase.name, func(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| cfg := &SqliteConfig{ | ||
| MaxConnections: testCase.maxConns, | ||
| } | ||
|
|
||
| require.Equal(t, testCase.expectedConn, cfg.MaxConns()) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // TestSqliteConfigMaxIdleConns verifies that SQLite defaults its idle | ||
| // connections to the open connection limit unless the caller overrides it. | ||
| func TestSqliteConfigMaxIdleConns(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| testCases := []struct { | ||
| name string | ||
| maxConns int | ||
| maxIdleConns int | ||
| expectedIdleConn int | ||
| }{ | ||
| { | ||
| name: "default idle limit", | ||
| expectedIdleConn: DefaultSqliteMaxConns, | ||
| }, | ||
| { | ||
| name: "inherits explicit open limit", | ||
| maxConns: 4, | ||
| expectedIdleConn: 4, | ||
| }, | ||
| { | ||
| name: "explicit idle limit", | ||
| maxConns: 4, | ||
| maxIdleConns: 3, | ||
| expectedIdleConn: 3, | ||
| }, | ||
| } | ||
|
|
||
| for _, testCase := range testCases { | ||
| testCase := testCase | ||
|
|
||
| t.Run(testCase.name, func(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| cfg := &SqliteConfig{ | ||
| MaxConnections: testCase.maxConns, | ||
| MaxIdleConnections: testCase.maxIdleConns, | ||
| } | ||
|
|
||
| require.Equal(t, testCase.expectedIdleConn, | ||
| cfg.MaxIdleConns()) | ||
| }) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -84,20 +84,12 @@ func ReadTxOpt() TxOptions { | |
| } | ||
| } | ||
|
|
||
| // BaseQuerier is a generic interface that represents the base methods that any | ||
| // database backend implementation which uses a Querier for its operations must | ||
| // implement. | ||
| type BaseQuerier interface { | ||
| // Backend returns the type of the database backend used. | ||
| Backend() BackendType | ||
| } | ||
|
|
||
| // BatchedTx is a generic interface that represents the ability to execute | ||
| // several operations to a given storage interface in a single atomic | ||
| // transaction. Typically, Q here will be some subset of the main sqlc.Querier | ||
| // interface allowing it to only depend on the routines it needs to implement | ||
| // any additional business logic. | ||
| type BatchedTx[Q BaseQuerier] interface { | ||
| type BatchedTx[Q any] interface { | ||
| // ExecTx will execute the passed txBody, operating upon generic | ||
| // parameter Q (usually a storage interface) in a single transaction. | ||
| // | ||
|
|
@@ -137,6 +129,9 @@ type BatchedQuerier interface { | |
| // BeginTx creates a new database transaction given the set of | ||
| // transaction options. | ||
| BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error) | ||
|
|
||
| // Backend returns the type of the database backend used. | ||
| Backend() BackendType | ||
| } | ||
|
|
||
| // txExecutorOptions is a struct that holds the options for the transaction | ||
|
|
@@ -158,12 +153,6 @@ func defaultTxExecutorOptions() *txExecutorOptions { | |
| } | ||
| } | ||
|
|
||
| // randRetryDelay returns a random retry delay between 0 and the configured max | ||
| // delay. | ||
| func (t *txExecutorOptions) randRetryDelay() time.Duration { | ||
| return time.Duration(rand.Int63n(int64(t.maxRetryDelay))) //nolint:gosec | ||
| } | ||
|
|
||
| // TxExecutorOption is a functional option that allows us to pass in optional | ||
| // argument when creating the executor. | ||
| type TxExecutorOption func(*txExecutorOptions) | ||
|
|
@@ -188,18 +177,22 @@ func WithTxRetryDelay(delay time.Duration) TxExecutorOption { | |
| // query a type needs to run under a database transaction, and also the set of | ||
| // options for that transaction. The QueryCreator is used to create a query | ||
| // given a database transaction created by the BatchedQuerier. | ||
| type TransactionExecutor[Query BaseQuerier] struct { | ||
| type TransactionExecutor[Query any] struct { | ||
|
yyforyongyu marked this conversation as resolved.
|
||
| BatchedQuerier | ||
|
|
||
| createQuery QueryCreator[Query] | ||
|
|
||
| opts *txExecutorOptions | ||
| } | ||
|
|
||
| // A compile-time assertion to ensure TransactionExecutor satisfies the | ||
| // batched transaction interface. | ||
| var _ BatchedTx[any] = (*TransactionExecutor[any])(nil) | ||
|
|
||
| // NewTransactionExecutor creates a new instance of a TransactionExecutor given | ||
| // a Querier query object and a concrete type for the type of transactions the | ||
| // Querier understands. | ||
| func NewTransactionExecutor[Querier BaseQuerier](db BatchedQuerier, | ||
| func NewTransactionExecutor[Querier any](db BatchedQuerier, | ||
| createQuery QueryCreator[Querier], | ||
| opts ...TxExecutorOption) *TransactionExecutor[Querier] { | ||
|
|
||
|
|
@@ -215,6 +208,11 @@ func NewTransactionExecutor[Querier BaseQuerier](db BatchedQuerier, | |
| } | ||
| } | ||
|
|
||
| // Backend returns the type of database backend used by the executor. | ||
| func (t *TransactionExecutor[Q]) Backend() BackendType { | ||
|
yyforyongyu marked this conversation as resolved.
|
||
| return t.BatchedQuerier.Backend() | ||
| } | ||
|
|
||
| // randRetryDelay returns a random retry delay between -50% and +50% of the | ||
| // configured delay that is doubled for each attempt and capped at a max value. | ||
| func randRetryDelay(initialRetryDelay, maxRetryDelay time.Duration, | ||
|
|
@@ -268,6 +266,55 @@ type RollbackTx func(tx Tx) error | |
| // the delay before the next retry. | ||
| type OnBackoff func(retry int, delay time.Duration) | ||
|
|
||
| // executeTxAttempt runs a single transaction attempt and reports whether the | ||
| // caller should retry it. | ||
| func executeTxAttempt(tx Tx, txBody TxBody, rollbackTx RollbackTx, | ||
| waitBeforeRetry func(int) bool, attempt int) (bool, error) { | ||
|
|
||
| // Rollback is safe to call even if the tx is already closed, so if the tx | ||
| // commits successfully, this is a no-op. | ||
| defer func() { | ||
| _ = tx.Rollback() | ||
| }() | ||
|
|
||
| if bodyErr := txBody(tx); bodyErr != nil { | ||
| log.Tracef("Error in txBody: %v", bodyErr) | ||
|
|
||
| // Roll back the transaction, then attempt a random backoff and try | ||
| // again if the error was a serialization error. | ||
| if err := rollbackTx(tx); err != nil { | ||
| return false, MapSQLError(err) | ||
| } | ||
|
|
||
| dbErr := MapSQLError(bodyErr) | ||
| if IsSerializationOrDeadlockError(dbErr) { | ||
| return waitBeforeRetry(attempt), dbErr | ||
| } | ||
|
|
||
| return false, dbErr | ||
| } | ||
|
|
||
| // Commit transaction. | ||
| if commitErr := tx.Commit(); commitErr != nil { | ||
| log.Tracef("Failed to commit tx: %v", commitErr) | ||
|
|
||
| // Roll back the transaction, then attempt a random backoff and try | ||
| // again if the error was a serialization error. | ||
| if err := rollbackTx(tx); err != nil { | ||
| return false, MapSQLError(err) | ||
| } | ||
|
|
||
| dbErr := MapSQLError(commitErr) | ||
| if IsSerializationOrDeadlockError(dbErr) { | ||
| return waitBeforeRetry(attempt), dbErr | ||
| } | ||
|
|
||
| return false, dbErr | ||
| } | ||
|
|
||
| return false, nil | ||
| } | ||
|
|
||
| // ExecuteSQLTransactionWithRetry is a helper function that executes a | ||
| // transaction with retry logic. It will retry the transaction if it fails with | ||
| // a serialization error. The function will return an error if the transaction | ||
|
|
@@ -316,51 +363,15 @@ func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx, | |
| return dbErr | ||
| } | ||
|
|
||
| // Rollback is safe to call even if the tx is already closed, | ||
| // so if the tx commits successfully, this is a no-op. | ||
| defer func() { | ||
| _ = tx.Rollback() | ||
| }() | ||
|
|
||
| if bodyErr := txBody(tx); bodyErr != nil { | ||
| log.Tracef("Error in txBody: %v", bodyErr) | ||
|
|
||
| // Roll back the transaction, then attempt a random | ||
| // backoff and try again if the error was a | ||
| // serialization error. | ||
| if err := rollbackTx(tx); err != nil { | ||
| return MapSQLError(err) | ||
| } | ||
|
|
||
| dbErr := MapSQLError(bodyErr) | ||
| if IsSerializationOrDeadlockError(dbErr) { | ||
| if waitBeforeRetry(i) { | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| return dbErr | ||
| retry, err := executeTxAttempt( | ||
| tx, txBody, rollbackTx, waitBeforeRetry, i, | ||
| ) | ||
| if retry { | ||
| // Transient serialization error, discard this attempt and retry. | ||
| continue | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe add a comment here: |
||
| } | ||
|
|
||
| // Commit transaction. | ||
| if commitErr := tx.Commit(); commitErr != nil { | ||
| log.Tracef("Failed to commit tx: %v", commitErr) | ||
|
|
||
| // Roll back the transaction, then attempt a random | ||
| // backoff and try again if the error was a | ||
| // serialization error. | ||
| if err := rollbackTx(tx); err != nil { | ||
| return MapSQLError(err) | ||
| } | ||
|
|
||
| dbErr := MapSQLError(commitErr) | ||
| if IsSerializationOrDeadlockError(dbErr) { | ||
| if waitBeforeRetry(i) { | ||
| continue | ||
| } | ||
| } | ||
|
|
||
| return dbErr | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return nil | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| package sqldb | ||
|
|
||
| import ( | ||
| "context" | ||
| "database/sql" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| // testQuerier is a minimal query wrapper used to instantiate the generic | ||
| // transaction executor in tests. | ||
| type testQuerier struct { | ||
| } | ||
|
|
||
| // testBatchedQuerier is a minimal BatchedQuerier implementation used to verify | ||
| // that TransactionExecutor forwards backend identity. | ||
| type testBatchedQuerier struct { | ||
| backend BackendType | ||
| } | ||
|
|
||
| // BeginTx is a stub implementation used to satisfy the BatchedQuerier | ||
| // interface in tests. | ||
| func (t testBatchedQuerier) BeginTx(context.Context, | ||
| TxOptions) (*sql.Tx, error) { | ||
|
|
||
| return nil, nil | ||
| } | ||
|
|
||
| // Backend returns the backend type used by the test batched querier. | ||
| func (t testBatchedQuerier) Backend() BackendType { | ||
| return t.backend | ||
| } | ||
|
|
||
| // TestTransactionExecutorBackend verifies that the executor forwards the | ||
| // backend type from its batched querier. | ||
| func TestTransactionExecutorBackend(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| executor := NewTransactionExecutor[testQuerier]( | ||
| testBatchedQuerier{backend: BackendTypePostgres}, | ||
| func(*sql.Tx) testQuerier { | ||
| return testQuerier{} | ||
| }, | ||
| ) | ||
|
|
||
| require.Equal(t, BackendTypePostgres, executor.Backend()) | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.