11use crate :: { connection:: LogSettings , sql_str:: SqlStr } ;
2+ use std:: pin:: Pin ;
3+ use std:: task:: { Context , Poll } ;
24use std:: time:: Instant ;
35
6+ use futures_core:: Stream ;
7+ use pin_project_lite:: pin_project;
8+ use tracing:: Span ;
9+
410// Yes these look silly. `tracing` doesn't currently support dynamic levels
511// https://github.com/tokio-rs/tracing/issues/372
612#[ doc( hidden) ]
@@ -66,19 +72,58 @@ pub struct QueryLogger {
6672 rows_affected : u64 ,
6773 start : Instant ,
6874 settings : LogSettings ,
75+ span : Span ,
6976}
7077
7178impl QueryLogger {
7279 pub fn new ( sql : SqlStr , settings : LogSettings ) -> Self {
80+ // Hardcoded INFO level per maintainer review of #3313: libraries should pick a
81+ // level and let consumers filter via `EnvFilter`. Field names follow the OTel
82+ // database span semantic conventions
83+ // (https://opentelemetry.io/docs/specs/semconv/database/database-spans/).
84+ // `otel.kind = "client"` is the magic field that `tracing-opentelemetry` reads
85+ // to set the exported `SpanKind`. `db.system.name` is declared empty here and
86+ // filled in by drivers via `with_db_system_name`, so adding the field doesn't
87+ // force a signature break on `QueryLogger::new`.
88+ let summary = parse_query_summary ( sql. as_str ( ) ) ;
89+ let operation = summary
90+ . split_whitespace ( )
91+ . next ( )
92+ . map ( str:: to_owned)
93+ . unwrap_or_default ( ) ;
94+ let span = tracing:: info_span!(
95+ target: "sqlx::query" ,
96+ "db.query" ,
97+ "db.system.name" = tracing:: field:: Empty ,
98+ "db.operation.name" = operation,
99+ "db.query.summary" = summary,
100+ "db.query.text" = sql. as_str( ) ,
101+ "db.response.returned_rows" = tracing:: field:: Empty ,
102+ "db.response.affected_rows" = tracing:: field:: Empty ,
103+ "otel.kind" = "client" ,
104+ ) ;
105+
73106 Self {
74107 sql,
75108 rows_returned : 0 ,
76109 rows_affected : 0 ,
77110 start : Instant :: now ( ) ,
78111 settings,
112+ span,
79113 }
80114 }
81115
116+ /// Records the OTel `db.system.name` attribute on the query span.
117+ ///
118+ /// Drivers should call this with their canonical OTel system identifier
119+ /// (`"postgresql"`, `"mysql"`, `"sqlite"`, etc. — see the OTel database span
120+ /// semantic conventions). Separate from `new` so adding the field doesn't break
121+ /// callers that construct `QueryLogger` directly.
122+ pub fn with_db_system_name ( self , name : & ' static str ) -> Self {
123+ self . span . record ( "db.system.name" , name) ;
124+ self
125+ }
126+
82127 pub fn increment_rows_returned ( & mut self ) {
83128 self . rows_returned += 1 ;
84129 }
@@ -91,9 +136,25 @@ impl QueryLogger {
91136 & self . sql
92137 }
93138
139+ /// Clone the span attached to this query.
140+ ///
141+ /// Use with [`InstrumentedStream`] (or `Future::instrument` for plain futures) to
142+ /// attribute child events emitted during query execution to the query's span. The
143+ /// `Span` is `Send`; never store an `EnteredSpan` here (see #3176).
144+ pub fn span ( & self ) -> Span {
145+ self . span . clone ( )
146+ }
147+
94148 pub fn finish ( & self ) {
95149 let elapsed = self . start . elapsed ( ) ;
96150
151+ // Record the per-query result counts on the span before it closes so OTel
152+ // exporters see them as span attributes.
153+ self . span
154+ . record ( "db.response.returned_rows" , self . rows_returned ) ;
155+ self . span
156+ . record ( "db.response.affected_rows" , self . rows_affected ) ;
157+
97158 let was_slow = elapsed >= self . settings . slow_statements_duration ;
98159
99160 let lvl = if was_slow {
@@ -117,38 +178,43 @@ impl QueryLogger {
117178 String :: new ( )
118179 } ;
119180
120- if was_slow {
121- private_tracing_dynamic_event ! (
122- target: "sqlx::query" ,
123- tracing_level,
124- summary,
125- db. statement = sql,
126- rows_affected = self . rows_affected,
127- rows_returned = self . rows_returned,
128- // Human-friendly - includes units (usually ms). Also kept for backward compatibility
129- ?elapsed,
130- // Search friendly - numeric
131- elapsed_secs = elapsed. as_secs_f64( ) ,
132- // When logging to JSON, one can trigger alerts from the presence of this field.
133- slow_threshold=?self . settings. slow_statements_duration,
134- // Make sure to use "slow" in the message as that's likely
135- // what people will grep for.
136- "slow statement: execution time exceeded alert threshold"
137- ) ;
138- } else {
139- private_tracing_dynamic_event ! (
140- target: "sqlx::query" ,
141- tracing_level,
142- summary,
143- db. statement = sql,
144- rows_affected = self . rows_affected,
145- rows_returned = self . rows_returned,
146- // Human-friendly - includes units (usually ms). Also kept for backward compatibility
147- ?elapsed,
148- // Search friendly - numeric
149- elapsed_secs = elapsed. as_secs_f64( ) ,
150- ) ;
151- }
181+ // Emit the existing close-time event inside the query span so consumers
182+ // see both the span (for OTel correlation) and the event (for the
183+ // backwards-compatible `rows_affected`/`elapsed_secs` fields).
184+ self . span . in_scope ( || {
185+ if was_slow {
186+ private_tracing_dynamic_event ! (
187+ target: "sqlx::query" ,
188+ tracing_level,
189+ summary,
190+ db. statement = sql,
191+ rows_affected = self . rows_affected,
192+ rows_returned = self . rows_returned,
193+ // Human-friendly - includes units (usually ms). Also kept for backward compatibility
194+ ?elapsed,
195+ // Search friendly - numeric
196+ elapsed_secs = elapsed. as_secs_f64( ) ,
197+ // When logging to JSON, one can trigger alerts from the presence of this field.
198+ slow_threshold=?self . settings. slow_statements_duration,
199+ // Make sure to use "slow" in the message as that's likely
200+ // what people will grep for.
201+ "slow statement: execution time exceeded alert threshold"
202+ ) ;
203+ } else {
204+ private_tracing_dynamic_event ! (
205+ target: "sqlx::query" ,
206+ tracing_level,
207+ summary,
208+ db. statement = sql,
209+ rows_affected = self . rows_affected,
210+ rows_returned = self . rows_returned,
211+ // Human-friendly - includes units (usually ms). Also kept for backward compatibility
212+ ?elapsed,
213+ // Search friendly - numeric
214+ elapsed_secs = elapsed. as_secs_f64( ) ,
215+ ) ;
216+ }
217+ } ) ;
152218 }
153219 }
154220 }
@@ -160,10 +226,190 @@ impl Drop for QueryLogger {
160226 }
161227}
162228
229+ pin_project ! {
230+ /// Wraps a [`Stream`] so each `poll_next` runs inside the given [`Span`].
231+ ///
232+ /// This is the `Stream` counterpart to `tracing::Instrument` for futures. It
233+ /// re-enters the span on every poll and drops the guard before yielding, so no
234+ /// `EnteredSpan` is ever held across an await point — fixing the `!Send` issue
235+ /// that sank #3176. The inner stream is projected via `pin-project-lite`, so this
236+ /// adds no allocation and keeps the module free of `unsafe` pin code.
237+ pub struct InstrumentedStream <S > {
238+ #[ pin]
239+ inner: S ,
240+ span: Span ,
241+ }
242+ }
243+
244+ impl < S > InstrumentedStream < S > {
245+ pub fn new ( inner : S , span : Span ) -> Self {
246+ Self { inner, span }
247+ }
248+ }
249+
250+ impl < S : Stream > Stream for InstrumentedStream < S > {
251+ type Item = S :: Item ;
252+
253+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
254+ let this = self . project ( ) ;
255+ let _enter = this. span . enter ( ) ;
256+ this. inner . poll_next ( cx)
257+ }
258+ }
259+
163260pub fn parse_query_summary ( sql : & str ) -> String {
164261 // For now, just take the first 4 words
165262 sql. split_whitespace ( )
166263 . take ( 4 )
167264 . collect :: < Vec < & str > > ( )
168265 . join ( " " )
169266}
267+
268+ #[ cfg( test) ]
269+ mod tests {
270+ use super :: * ;
271+ use crate :: sql_str:: SqlSafeStr ;
272+ use std:: sync:: { Arc , Mutex } ;
273+ use tracing:: field:: { Field , Visit } ;
274+ use tracing:: span:: { Attributes , Record } ;
275+ use tracing:: subscriber:: { with_default, Subscriber } ;
276+ use tracing:: { Event , Id , Metadata } ;
277+
278+ struct CapturedSpan {
279+ name : & ' static str ,
280+ target : String ,
281+ level : tracing:: Level ,
282+ fields : std:: collections:: HashMap < String , String > ,
283+ closed : bool ,
284+ contained_events : usize ,
285+ }
286+
287+ #[ derive( Default ) ]
288+ struct CaptureSubscriber {
289+ next_id : std:: sync:: atomic:: AtomicU64 ,
290+ spans : Mutex < std:: collections:: HashMap < u64 , CapturedSpan > > ,
291+ current : Mutex < Vec < u64 > > ,
292+ }
293+
294+ struct StringVisitor < ' a > ( & ' a mut std:: collections:: HashMap < String , String > ) ;
295+ impl Visit for StringVisitor < ' _ > {
296+ fn record_debug ( & mut self , field : & Field , value : & dyn std:: fmt:: Debug ) {
297+ self . 0 . insert ( field. name ( ) . to_string ( ) , format ! ( "{value:?}" ) ) ;
298+ }
299+ fn record_str ( & mut self , field : & Field , value : & str ) {
300+ self . 0 . insert ( field. name ( ) . to_string ( ) , value. to_string ( ) ) ;
301+ }
302+ fn record_u64 ( & mut self , field : & Field , value : u64 ) {
303+ self . 0 . insert ( field. name ( ) . to_string ( ) , value. to_string ( ) ) ;
304+ }
305+ fn record_i64 ( & mut self , field : & Field , value : i64 ) {
306+ self . 0 . insert ( field. name ( ) . to_string ( ) , value. to_string ( ) ) ;
307+ }
308+ fn record_bool ( & mut self , field : & Field , value : bool ) {
309+ self . 0 . insert ( field. name ( ) . to_string ( ) , value. to_string ( ) ) ;
310+ }
311+ }
312+
313+ impl Subscriber for CaptureSubscriber {
314+ fn enabled ( & self , _metadata : & Metadata < ' _ > ) -> bool {
315+ true
316+ }
317+ fn new_span ( & self , attrs : & Attributes < ' _ > ) -> Id {
318+ let id = self
319+ . next_id
320+ . fetch_add ( 1 , std:: sync:: atomic:: Ordering :: SeqCst )
321+ + 1 ;
322+ let mut span = CapturedSpan {
323+ name : attrs. metadata ( ) . name ( ) ,
324+ target : attrs. metadata ( ) . target ( ) . to_string ( ) ,
325+ level : * attrs. metadata ( ) . level ( ) ,
326+ fields : std:: collections:: HashMap :: new ( ) ,
327+ closed : false ,
328+ contained_events : 0 ,
329+ } ;
330+ attrs. record ( & mut StringVisitor ( & mut span. fields ) ) ;
331+ self . spans . lock ( ) . unwrap ( ) . insert ( id, span) ;
332+ Id :: from_u64 ( id)
333+ }
334+ fn record ( & self , span : & Id , values : & Record < ' _ > ) {
335+ if let Some ( s) = self . spans . lock ( ) . unwrap ( ) . get_mut ( & span. into_u64 ( ) ) {
336+ values. record ( & mut StringVisitor ( & mut s. fields ) ) ;
337+ }
338+ }
339+ fn record_follows_from ( & self , _: & Id , _: & Id ) { }
340+ fn event ( & self , _event : & Event < ' _ > ) {
341+ let current = self . current . lock ( ) . unwrap ( ) ;
342+ if let Some ( & id) = current. last ( ) {
343+ if let Some ( s) = self . spans . lock ( ) . unwrap ( ) . get_mut ( & id) {
344+ s. contained_events += 1 ;
345+ }
346+ }
347+ }
348+ fn enter ( & self , span : & Id ) {
349+ self . current . lock ( ) . unwrap ( ) . push ( span. into_u64 ( ) ) ;
350+ }
351+ fn exit ( & self , _span : & Id ) {
352+ self . current . lock ( ) . unwrap ( ) . pop ( ) ;
353+ }
354+ fn try_close ( & self , id : Id ) -> bool {
355+ if let Some ( s) = self . spans . lock ( ) . unwrap ( ) . get_mut ( & id. into_u64 ( ) ) {
356+ s. closed = true ;
357+ }
358+ true
359+ }
360+ }
361+
362+ #[ test]
363+ fn query_logger_opens_and_closes_span_with_expected_fields ( ) {
364+ let subscriber = Arc :: new ( CaptureSubscriber :: default ( ) ) ;
365+ with_default ( subscriber. clone ( ) , || {
366+ let settings = LogSettings :: default ( ) ;
367+ let sql = "SELECT id, name FROM users WHERE id = 1" . into_sql_str ( ) ;
368+ let mut logger = QueryLogger :: new ( sql, settings) . with_db_system_name ( "postgresql" ) ;
369+ logger. increment_rows_returned ( ) ;
370+ logger. increment_rows_returned ( ) ;
371+ logger. increase_rows_affected ( 2 ) ;
372+ drop ( logger) ;
373+ } ) ;
374+
375+ let spans = subscriber. spans . lock ( ) . unwrap ( ) ;
376+ assert_eq ! ( spans. len( ) , 1 , "exactly one span should be opened" ) ;
377+ let span = spans. values ( ) . next ( ) . unwrap ( ) ;
378+
379+ assert_eq ! ( span. name, "db.query" ) ;
380+ assert_eq ! ( span. target, "sqlx::query" ) ;
381+ assert_eq ! ( span. level, tracing:: Level :: INFO ) ;
382+ assert ! ( span. closed, "span must close on QueryLogger drop" ) ;
383+ assert ! (
384+ span. contained_events >= 1 ,
385+ "the close-time event should fire inside the span"
386+ ) ;
387+
388+ assert_eq ! (
389+ span. fields. get( "db.system.name" ) . map( String :: as_str) ,
390+ Some ( "postgresql" )
391+ ) ;
392+ assert_eq ! (
393+ span. fields. get( "db.operation.name" ) . map( String :: as_str) ,
394+ Some ( "SELECT" )
395+ ) ;
396+ assert_eq ! (
397+ span. fields. get( "otel.kind" ) . map( String :: as_str) ,
398+ Some ( "client" )
399+ ) ;
400+ assert ! ( span
401+ . fields
402+ . get( "db.query.text" )
403+ . is_some_and( |s| s. contains( "SELECT id, name FROM users" ) ) ) ;
404+ assert_eq ! (
405+ span. fields. get( "db.response.returned_rows" ) . map( String :: as_str) ,
406+ Some ( "2" ) ,
407+ "rows_returned must be recorded on the span before close"
408+ ) ;
409+ assert_eq ! (
410+ span. fields. get( "db.response.affected_rows" ) . map( String :: as_str) ,
411+ Some ( "2" ) ,
412+ "rows_affected must be recorded on the span before close"
413+ ) ;
414+ }
415+ }
0 commit comments