@@ -84,20 +84,12 @@ func ReadTxOpt() TxOptions {
8484 }
8585}
8686
87- // BaseQuerier is a generic interface that represents the base methods that any
88- // database backend implementation which uses a Querier for its operations must
89- // implement.
90- type BaseQuerier interface {
91- // Backend returns the type of the database backend used.
92- Backend () BackendType
93- }
94-
9587// BatchedTx is a generic interface that represents the ability to execute
9688// several operations to a given storage interface in a single atomic
9789// transaction. Typically, Q here will be some subset of the main sqlc.Querier
9890// interface allowing it to only depend on the routines it needs to implement
9991// any additional business logic.
100- type BatchedTx [Q BaseQuerier ] interface {
92+ type BatchedTx [Q any ] interface {
10193 // ExecTx will execute the passed txBody, operating upon generic
10294 // parameter Q (usually a storage interface) in a single transaction.
10395 //
@@ -137,6 +129,9 @@ type BatchedQuerier interface {
137129 // BeginTx creates a new database transaction given the set of
138130 // transaction options.
139131 BeginTx (ctx context.Context , options TxOptions ) (* sql.Tx , error )
132+
133+ // Backend returns the type of the database backend used.
134+ Backend () BackendType
140135}
141136
142137// txExecutorOptions is a struct that holds the options for the transaction
@@ -158,12 +153,6 @@ func defaultTxExecutorOptions() *txExecutorOptions {
158153 }
159154}
160155
161- // randRetryDelay returns a random retry delay between 0 and the configured max
162- // delay.
163- func (t * txExecutorOptions ) randRetryDelay () time.Duration {
164- return time .Duration (rand .Int63n (int64 (t .maxRetryDelay ))) //nolint:gosec
165- }
166-
167156// TxExecutorOption is a functional option that allows us to pass in optional
168157// argument when creating the executor.
169158type TxExecutorOption func (* txExecutorOptions )
@@ -188,18 +177,22 @@ func WithTxRetryDelay(delay time.Duration) TxExecutorOption {
188177// query a type needs to run under a database transaction, and also the set of
189178// options for that transaction. The QueryCreator is used to create a query
190179// given a database transaction created by the BatchedQuerier.
191- type TransactionExecutor [Query BaseQuerier ] struct {
180+ type TransactionExecutor [Query any ] struct {
192181 BatchedQuerier
193182
194183 createQuery QueryCreator [Query ]
195184
196185 opts * txExecutorOptions
197186}
198187
188+ // A compile-time assertion to ensure TransactionExecutor satisfies the
189+ // batched transaction interface.
190+ var _ BatchedTx [any ] = (* TransactionExecutor [any ])(nil )
191+
199192// NewTransactionExecutor creates a new instance of a TransactionExecutor given
200193// a Querier query object and a concrete type for the type of transactions the
201194// Querier understands.
202- func NewTransactionExecutor [Querier BaseQuerier ](db BatchedQuerier ,
195+ func NewTransactionExecutor [Querier any ](db BatchedQuerier ,
203196 createQuery QueryCreator [Querier ],
204197 opts ... TxExecutorOption ) * TransactionExecutor [Querier ] {
205198
@@ -215,6 +208,11 @@ func NewTransactionExecutor[Querier BaseQuerier](db BatchedQuerier,
215208 }
216209}
217210
211+ // Backend returns the type of database backend used by the executor.
212+ func (t * TransactionExecutor [Q ]) Backend () BackendType {
213+ return t .BatchedQuerier .Backend ()
214+ }
215+
218216// randRetryDelay returns a random retry delay between -50% and +50% of the
219217// configured delay that is doubled for each attempt and capped at a max value.
220218func randRetryDelay (initialRetryDelay , maxRetryDelay time.Duration ,
@@ -268,6 +266,55 @@ type RollbackTx func(tx Tx) error
268266// the delay before the next retry.
269267type OnBackoff func (retry int , delay time.Duration )
270268
269+ // executeTxAttempt runs a single transaction attempt and reports whether the
270+ // caller should retry it.
271+ func executeTxAttempt (tx Tx , txBody TxBody , rollbackTx RollbackTx ,
272+ waitBeforeRetry func (int ) bool , attempt int ) (bool , error ) {
273+
274+ // Rollback is safe to call even if the tx is already closed, so if the tx
275+ // commits successfully, this is a no-op.
276+ defer func () {
277+ _ = tx .Rollback ()
278+ }()
279+
280+ if bodyErr := txBody (tx ); bodyErr != nil {
281+ log .Tracef ("Error in txBody: %v" , bodyErr )
282+
283+ // Roll back the transaction, then attempt a random backoff and try
284+ // again if the error was a serialization error.
285+ if err := rollbackTx (tx ); err != nil {
286+ return false , MapSQLError (err )
287+ }
288+
289+ dbErr := MapSQLError (bodyErr )
290+ if IsSerializationOrDeadlockError (dbErr ) {
291+ return waitBeforeRetry (attempt ), dbErr
292+ }
293+
294+ return false , dbErr
295+ }
296+
297+ // Commit transaction.
298+ if commitErr := tx .Commit (); commitErr != nil {
299+ log .Tracef ("Failed to commit tx: %v" , commitErr )
300+
301+ // Roll back the transaction, then attempt a random backoff and try
302+ // again if the error was a serialization error.
303+ if err := rollbackTx (tx ); err != nil {
304+ return false , MapSQLError (err )
305+ }
306+
307+ dbErr := MapSQLError (commitErr )
308+ if IsSerializationOrDeadlockError (dbErr ) {
309+ return waitBeforeRetry (attempt ), dbErr
310+ }
311+
312+ return false , dbErr
313+ }
314+
315+ return false , nil
316+ }
317+
271318// ExecuteSQLTransactionWithRetry is a helper function that executes a
272319// transaction with retry logic. It will retry the transaction if it fails with
273320// a serialization error. The function will return an error if the transaction
@@ -316,51 +363,15 @@ func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
316363 return dbErr
317364 }
318365
319- // Rollback is safe to call even if the tx is already closed,
320- // so if the tx commits successfully, this is a no-op.
321- defer func () {
322- _ = tx .Rollback ()
323- }()
324-
325- if bodyErr := txBody (tx ); bodyErr != nil {
326- log .Tracef ("Error in txBody: %v" , bodyErr )
327-
328- // Roll back the transaction, then attempt a random
329- // backoff and try again if the error was a
330- // serialization error.
331- if err := rollbackTx (tx ); err != nil {
332- return MapSQLError (err )
333- }
334-
335- dbErr := MapSQLError (bodyErr )
336- if IsSerializationOrDeadlockError (dbErr ) {
337- if waitBeforeRetry (i ) {
338- continue
339- }
340- }
341-
342- return dbErr
366+ retry , err := executeTxAttempt (
367+ tx , txBody , rollbackTx , waitBeforeRetry , i ,
368+ )
369+ if retry {
370+ // Transient serialization error, discard this attempt and retry.
371+ continue
343372 }
344-
345- // Commit transaction.
346- if commitErr := tx .Commit (); commitErr != nil {
347- log .Tracef ("Failed to commit tx: %v" , commitErr )
348-
349- // Roll back the transaction, then attempt a random
350- // backoff and try again if the error was a
351- // serialization error.
352- if err := rollbackTx (tx ); err != nil {
353- return MapSQLError (err )
354- }
355-
356- dbErr := MapSQLError (commitErr )
357- if IsSerializationOrDeadlockError (dbErr ) {
358- if waitBeforeRetry (i ) {
359- continue
360- }
361- }
362-
363- return dbErr
373+ if err != nil {
374+ return err
364375 }
365376
366377 return nil
0 commit comments