@@ -203,18 +203,58 @@ macro_rules! __sql_query {
203203 } ;
204204 ( [ $ctx: expr, @tx $tx: expr] $sql: expr, $( $bind: expr) ,* $( , ) ?) => {
205205 async {
206- let query = sqlx:: query( $crate:: __opt_indoc!( $sql) )
207- $(
208- . bind( $bind)
209- ) * ;
210-
211206 // Execute query
212207 $crate:: __sql_query_metrics_acquire!( _acquire) ;
213208 $crate:: __sql_query_metrics_start!( $ctx, execute, _acquire, _start) ;
214- let res = query. execute( & mut * * $tx) . await . map_err( Into :: <GlobalError >:: into) ;
209+
210+ let mut backoff = $crate:: __rivet_util:: Backoff :: new(
211+ 4 ,
212+ None ,
213+ $crate:: utils:: sql_query_macros:: QUERY_RETRY_MS ,
214+ 50
215+ ) ;
216+ let mut i = 0 ;
217+
218+ // Retry loop
219+ let res = loop {
220+ let query = sqlx:: query( $crate:: __opt_indoc!( $sql) )
221+ $(
222+ . bind( $bind)
223+ ) * ;
224+
225+ match query. execute( & mut * * $tx) . await {
226+ Err ( err) => {
227+ i += 1 ;
228+ if i > $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES {
229+ break Err (
230+ sqlx:: Error :: Io (
231+ std:: io:: Error :: new(
232+ std:: io:: ErrorKind :: Other ,
233+ $crate:: utils:: sql_query_macros:: Error :: MaxSqlRetries ( err) ,
234+ )
235+ )
236+ ) ;
237+ }
238+
239+ use sqlx:: Error :: * ;
240+ match & err {
241+ // Retry other errors with a backoff
242+ Database ( _) | Io ( _) | Tls ( _) | Protocol ( _) | PoolTimedOut | PoolClosed
243+ | WorkerCrashed => {
244+ tracing:: warn!( ?err, "query retry ({i}/{})" , $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES ) ;
245+ backoff. tick( ) . await ;
246+ }
247+ // Throw error
248+ _ => break Err ( err) ,
249+ }
250+ }
251+ x => break x,
252+ }
253+ } ;
254+
215255 $crate:: __sql_query_metrics_finish!( $ctx, execute, _start) ;
216256
217- res
257+ res. map_err ( Into :: < GlobalError > :: into )
218258 }
219259 . instrument( tracing:: info_span!( "sql_query" ) )
220260 } ;
@@ -229,39 +269,119 @@ macro_rules! __sql_query_as {
229269 async {
230270 use sqlx:: Acquire ;
231271
232- let query = sqlx:: query_as:: <_, $rv>( $crate:: __opt_indoc!( $sql) )
233- $(
234- . bind( $bind)
235- ) * ;
236-
237272 // Acquire connection
238273 $crate:: __sql_query_metrics_acquire!( _acquire) ;
239274 let driver = $driver;
240275 let mut conn = $crate:: __sql_acquire!( $ctx, driver) ;
241276
242277 // Execute query
243278 $crate:: __sql_query_metrics_start!( $ctx, $action, _acquire, _start) ;
244- let res = query. $action( & mut * conn) . await . map_err( Into :: <GlobalError >:: into) ;
279+
280+ let mut backoff = $crate:: __rivet_util:: Backoff :: new(
281+ 4 ,
282+ None ,
283+ $crate:: utils:: sql_query_macros:: QUERY_RETRY_MS ,
284+ 50
285+ ) ;
286+ let mut i = 0 ;
287+
288+ // Retry loop
289+ let res = loop {
290+ let query = sqlx:: query_as:: <_, $rv>( $crate:: __opt_indoc!( $sql) )
291+ $(
292+ . bind( $bind)
293+ ) * ;
294+
295+ match query. $action( & mut * conn) . await {
296+ Err ( err) => {
297+ i += 1 ;
298+ if i > $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES {
299+ break Err (
300+ sqlx:: Error :: Io (
301+ std:: io:: Error :: new(
302+ std:: io:: ErrorKind :: Other ,
303+ $crate:: utils:: sql_query_macros:: Error :: MaxSqlRetries ( err) ,
304+ )
305+ )
306+ ) ;
307+ }
308+
309+ use sqlx:: Error :: * ;
310+ match & err {
311+ // Retry other errors with a backoff
312+ Database ( _) | Io ( _) | Tls ( _) | Protocol ( _) | PoolTimedOut | PoolClosed
313+ | WorkerCrashed => {
314+ tracing:: warn!( ?err, "query retry ({i}/{})" , $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES ) ;
315+ backoff. tick( ) . await ;
316+ }
317+ // Throw error
318+ _ => break Err ( err) ,
319+ }
320+ }
321+ x => break x,
322+ }
323+ } ;
324+
245325 $crate:: __sql_query_metrics_finish!( $ctx, $action, _start) ;
246326
247- res
327+ res. map_err ( Into :: < GlobalError > :: into )
248328 }
249329 . instrument( tracing:: info_span!( "sql_query_as" ) )
250330 } ;
251331 ( [ $ctx: expr, $rv: ty, $action: ident, @tx $tx: expr] $sql: expr, $( $bind: expr) ,* $( , ) ?) => {
252332 async {
253- let query = sqlx:: query_as:: <_, $rv>( $crate:: __opt_indoc!( $sql) )
254- $(
255- . bind( $bind)
256- ) * ;
257-
258333 // Execute query
259334 $crate:: __sql_query_metrics_acquire!( _acquire) ;
260335 $crate:: __sql_query_metrics_start!( $ctx, $action, _acquire, _start) ;
261- let res = query. $action( & mut * * $tx) . await . map_err( Into :: <GlobalError >:: into) ;
336+
337+ let mut backoff = $crate:: __rivet_util:: Backoff :: new(
338+ 4 ,
339+ None ,
340+ $crate:: utils:: sql_query_macros:: QUERY_RETRY_MS ,
341+ 50
342+ ) ;
343+ let mut i = 0 ;
344+
345+ // Retry loop
346+ let res = loop {
347+ let query = sqlx:: query_as:: <_, $rv>( $crate:: __opt_indoc!( $sql) )
348+ $(
349+ . bind( $bind)
350+ ) * ;
351+
352+ match query. $action( & mut * * $tx) . await {
353+ Err ( err) => {
354+ i += 1 ;
355+ if i > $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES {
356+ break Err (
357+ sqlx:: Error :: Io (
358+ std:: io:: Error :: new(
359+ std:: io:: ErrorKind :: Other ,
360+ $crate:: utils:: sql_query_macros:: Error :: MaxSqlRetries ( err) ,
361+ )
362+ )
363+ ) ;
364+ }
365+
366+ use sqlx:: Error :: * ;
367+ match & err {
368+ // Retry other errors with a backoff
369+ Database ( _) | Io ( _) | Tls ( _) | Protocol ( _) | PoolTimedOut | PoolClosed
370+ | WorkerCrashed => {
371+ tracing:: warn!( ?err, "query retry ({i}/{})" , $crate:: utils:: sql_query_macros:: MAX_QUERY_RETRIES ) ;
372+ backoff. tick( ) . await ;
373+ }
374+ // Throw error
375+ _ => break Err ( err) ,
376+ }
377+ }
378+ x => break x,
379+ }
380+ } ;
381+
262382 $crate:: __sql_query_metrics_finish!( $ctx, $action, _start) ;
263383
264- res
384+ res. map_err ( Into :: < GlobalError > :: into )
265385 }
266386 . instrument( tracing:: info_span!( "sql_query_as" ) )
267387 } ;
0 commit comments