Skip to content

Commit c8859ca

Browse files
committed
Open per-query tracing span in QueryLogger
QueryLogger now opens a hardcoded INFO-level `db.query` span in `new()` that closes on drop, with field names following the OTel database span semantic conventions (`db.system.name`, `db.query.text`, `db.response.returned_rows`, `db.response.affected_rows`, `otel.kind = "client"`). The existing close-time event is emitted inside the span via `Span::in_scope` so callers get both — the span for OTel correlation, the event for the existing `rows_affected`/`elapsed_secs` fields under whatever level `LogSettings` configures. `QueryLogger::new` keeps its old signature; drivers attach the system name via the additive `with_db_system_name` builder, which records the `db.system.name` attribute on the already-open span. QueryLogger stores `Span`, not `EnteredSpan`, so it stays `Send` across the postgres/mysql `try_stream!` paths that broke #3176. A small `InstrumentedStream` wrapper (using `pin-project-lite`, already present transitively via the async runtimes) enters the span on each `poll_next` and drops the guard before returning, so no guard is held across an await. SQLite's synchronous `Iterator` path just relies on the close-event in-scope emission. Design constraints come from abonander's 2026-04-14 review on #3313: current OTel semconv field names, no `tracing::enabled!()` (broken per tokio-rs/tracing#2448), hardcoded verbosity rather than runtime- configurable span levels.
1 parent 6956cef commit c8859ca

6 files changed

Lines changed: 302 additions & 41 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqlx-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ futures-util = { version = "0.3.32", default-features = false, features = ["allo
8888
log = { version = "0.4.18", default-features = false }
8989
memchr = { version = "2.5.0", default-features = false }
9090
percent-encoding = "2.3.0"
91+
pin-project-lite = "0.2.13"
9192
serde = { version = "1.0.219", features = ["derive", "rc"], optional = true }
9293
serde_json = { version = "1.0.142", features = ["raw_value"], optional = true }
9394
toml = { version = "0.8.16", optional = true }

sqlx-core/src/logger.rs

Lines changed: 283 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
use crate::{connection::LogSettings, sql_str::SqlStr};
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
24
use std::time::Instant;
35

6+
use futures_core::Stream;
7+
use pin_project_lite::pin_project;
8+
use tracing::Span;
9+
410
// Yes these look silly. `tracing` doesn't currently support dynamic levels
511
// https://github.com/tokio-rs/tracing/issues/372
612
#[doc(hidden)]
@@ -66,19 +72,58 @@ pub struct QueryLogger {
6672
rows_affected: u64,
6773
start: Instant,
6874
settings: LogSettings,
75+
span: Span,
6976
}
7077

7178
impl QueryLogger {
7279
pub fn new(sql: SqlStr, settings: LogSettings) -> Self {
80+
// Hardcoded INFO level per maintainer review of #3313: libraries should pick a
81+
// level and let consumers filter via `EnvFilter`. Field names follow the OTel
82+
// database span semantic conventions
83+
// (https://opentelemetry.io/docs/specs/semconv/database/database-spans/).
84+
// `otel.kind = "client"` is the magic field that `tracing-opentelemetry` reads
85+
// to set the exported `SpanKind`. `db.system.name` is declared empty here and
86+
// filled in by drivers via `with_db_system_name`, so adding the field doesn't
87+
// force a signature break on `QueryLogger::new`.
88+
let summary = parse_query_summary(sql.as_str());
89+
let operation = summary
90+
.split_whitespace()
91+
.next()
92+
.map(str::to_owned)
93+
.unwrap_or_default();
94+
let span = tracing::info_span!(
95+
target: "sqlx::query",
96+
"db.query",
97+
"db.system.name" = tracing::field::Empty,
98+
"db.operation.name" = operation,
99+
"db.query.summary" = summary,
100+
"db.query.text" = sql.as_str(),
101+
"db.response.returned_rows" = tracing::field::Empty,
102+
"db.response.affected_rows" = tracing::field::Empty,
103+
"otel.kind" = "client",
104+
);
105+
73106
Self {
74107
sql,
75108
rows_returned: 0,
76109
rows_affected: 0,
77110
start: Instant::now(),
78111
settings,
112+
span,
79113
}
80114
}
81115

116+
/// Records the OTel `db.system.name` attribute on the query span.
117+
///
118+
/// Drivers should call this with their canonical OTel system identifier
119+
/// (`"postgresql"`, `"mysql"`, `"sqlite"`, etc. — see the OTel database span
120+
/// semantic conventions). Separate from `new` so adding the field doesn't break
121+
/// callers that construct `QueryLogger` directly.
122+
pub fn with_db_system_name(self, name: &'static str) -> Self {
123+
self.span.record("db.system.name", name);
124+
self
125+
}
126+
82127
pub fn increment_rows_returned(&mut self) {
83128
self.rows_returned += 1;
84129
}
@@ -91,9 +136,25 @@ impl QueryLogger {
91136
&self.sql
92137
}
93138

139+
/// Clone the span attached to this query.
140+
///
141+
/// Use with [`InstrumentedStream`] (or `Future::instrument` for plain futures) to
142+
/// attribute child events emitted during query execution to the query's span. The
143+
/// `Span` is `Send`; never store an `EnteredSpan` here (see #3176).
144+
pub fn span(&self) -> Span {
145+
self.span.clone()
146+
}
147+
94148
pub fn finish(&self) {
95149
let elapsed = self.start.elapsed();
96150

151+
// Record the per-query result counts on the span before it closes so OTel
152+
// exporters see them as span attributes.
153+
self.span
154+
.record("db.response.returned_rows", self.rows_returned);
155+
self.span
156+
.record("db.response.affected_rows", self.rows_affected);
157+
97158
let was_slow = elapsed >= self.settings.slow_statements_duration;
98159

99160
let lvl = if was_slow {
@@ -117,38 +178,43 @@ impl QueryLogger {
117178
String::new()
118179
};
119180

120-
if was_slow {
121-
private_tracing_dynamic_event!(
122-
target: "sqlx::query",
123-
tracing_level,
124-
summary,
125-
db.statement = sql,
126-
rows_affected = self.rows_affected,
127-
rows_returned = self.rows_returned,
128-
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
129-
?elapsed,
130-
// Search friendly - numeric
131-
elapsed_secs = elapsed.as_secs_f64(),
132-
// When logging to JSON, one can trigger alerts from the presence of this field.
133-
slow_threshold=?self.settings.slow_statements_duration,
134-
// Make sure to use "slow" in the message as that's likely
135-
// what people will grep for.
136-
"slow statement: execution time exceeded alert threshold"
137-
);
138-
} else {
139-
private_tracing_dynamic_event!(
140-
target: "sqlx::query",
141-
tracing_level,
142-
summary,
143-
db.statement = sql,
144-
rows_affected = self.rows_affected,
145-
rows_returned = self.rows_returned,
146-
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
147-
?elapsed,
148-
// Search friendly - numeric
149-
elapsed_secs = elapsed.as_secs_f64(),
150-
);
151-
}
181+
// Emit the existing close-time event inside the query span so consumers
182+
// see both the span (for OTel correlation) and the event (for the
183+
// backwards-compatible `rows_affected`/`elapsed_secs` fields).
184+
self.span.in_scope(|| {
185+
if was_slow {
186+
private_tracing_dynamic_event!(
187+
target: "sqlx::query",
188+
tracing_level,
189+
summary,
190+
db.statement = sql,
191+
rows_affected = self.rows_affected,
192+
rows_returned = self.rows_returned,
193+
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
194+
?elapsed,
195+
// Search friendly - numeric
196+
elapsed_secs = elapsed.as_secs_f64(),
197+
// When logging to JSON, one can trigger alerts from the presence of this field.
198+
slow_threshold=?self.settings.slow_statements_duration,
199+
// Make sure to use "slow" in the message as that's likely
200+
// what people will grep for.
201+
"slow statement: execution time exceeded alert threshold"
202+
);
203+
} else {
204+
private_tracing_dynamic_event!(
205+
target: "sqlx::query",
206+
tracing_level,
207+
summary,
208+
db.statement = sql,
209+
rows_affected = self.rows_affected,
210+
rows_returned = self.rows_returned,
211+
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
212+
?elapsed,
213+
// Search friendly - numeric
214+
elapsed_secs = elapsed.as_secs_f64(),
215+
);
216+
}
217+
});
152218
}
153219
}
154220
}
@@ -160,10 +226,195 @@ impl Drop for QueryLogger {
160226
}
161227
}
162228

229+
pin_project! {
230+
/// Wraps a [`Stream`] so each `poll_next` runs inside the given [`Span`].
231+
///
232+
/// This is the `Stream` counterpart to `tracing::Instrument` for futures. It
233+
/// re-enters the span on every poll and drops the guard before yielding, so no
234+
/// `EnteredSpan` is ever held across an await point — fixing the `!Send` issue
235+
/// that sank #3176. The inner stream is projected via `pin-project-lite`, so this
236+
/// adds no allocation and keeps the module free of `unsafe` pin code.
237+
pub struct InstrumentedStream<S> {
238+
#[pin]
239+
inner: S,
240+
span: Span,
241+
}
242+
}
243+
244+
impl<S> InstrumentedStream<S> {
245+
pub fn new(inner: S, span: Span) -> Self {
246+
Self { inner, span }
247+
}
248+
}
249+
250+
impl<S: Stream> Stream for InstrumentedStream<S> {
251+
type Item = S::Item;
252+
253+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254+
let this = self.project();
255+
let _enter = this.span.enter();
256+
this.inner.poll_next(cx)
257+
}
258+
}
259+
163260
pub fn parse_query_summary(sql: &str) -> String {
164261
// For now, just take the first 4 words
165262
sql.split_whitespace()
166263
.take(4)
167264
.collect::<Vec<&str>>()
168265
.join(" ")
169266
}
267+
268+
#[cfg(test)]
269+
mod tests {
270+
use super::*;
271+
use crate::sql_str::SqlSafeStr;
272+
use std::sync::{Arc, Mutex};
273+
use tracing::field::{Field, Visit};
274+
use tracing::span::{Attributes, Record};
275+
use tracing::subscriber::{with_default, Subscriber};
276+
use tracing::{Event, Id, Metadata};
277+
278+
struct CapturedSpan {
279+
name: &'static str,
280+
target: String,
281+
level: tracing::Level,
282+
fields: std::collections::HashMap<String, String>,
283+
closed: bool,
284+
contained_events: usize,
285+
}
286+
287+
#[derive(Default)]
288+
struct CaptureSubscriber {
289+
next_id: std::sync::atomic::AtomicU64,
290+
spans: Mutex<std::collections::HashMap<u64, CapturedSpan>>,
291+
current: Mutex<Vec<u64>>,
292+
}
293+
294+
struct StringVisitor<'a>(&'a mut std::collections::HashMap<String, String>);
295+
impl Visit for StringVisitor<'_> {
296+
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
297+
self.0
298+
.insert(field.name().to_string(), format!("{value:?}"));
299+
}
300+
fn record_str(&mut self, field: &Field, value: &str) {
301+
self.0.insert(field.name().to_string(), value.to_string());
302+
}
303+
fn record_u64(&mut self, field: &Field, value: u64) {
304+
self.0.insert(field.name().to_string(), value.to_string());
305+
}
306+
fn record_i64(&mut self, field: &Field, value: i64) {
307+
self.0.insert(field.name().to_string(), value.to_string());
308+
}
309+
fn record_bool(&mut self, field: &Field, value: bool) {
310+
self.0.insert(field.name().to_string(), value.to_string());
311+
}
312+
}
313+
314+
impl Subscriber for CaptureSubscriber {
315+
fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
316+
true
317+
}
318+
fn new_span(&self, attrs: &Attributes<'_>) -> Id {
319+
let id = self
320+
.next_id
321+
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
322+
+ 1;
323+
let mut span = CapturedSpan {
324+
name: attrs.metadata().name(),
325+
target: attrs.metadata().target().to_string(),
326+
level: *attrs.metadata().level(),
327+
fields: std::collections::HashMap::new(),
328+
closed: false,
329+
contained_events: 0,
330+
};
331+
attrs.record(&mut StringVisitor(&mut span.fields));
332+
self.spans.lock().unwrap().insert(id, span);
333+
Id::from_u64(id)
334+
}
335+
fn record(&self, span: &Id, values: &Record<'_>) {
336+
if let Some(s) = self.spans.lock().unwrap().get_mut(&span.into_u64()) {
337+
values.record(&mut StringVisitor(&mut s.fields));
338+
}
339+
}
340+
fn record_follows_from(&self, _: &Id, _: &Id) {}
341+
fn event(&self, _event: &Event<'_>) {
342+
let current = self.current.lock().unwrap();
343+
if let Some(&id) = current.last() {
344+
if let Some(s) = self.spans.lock().unwrap().get_mut(&id) {
345+
s.contained_events += 1;
346+
}
347+
}
348+
}
349+
fn enter(&self, span: &Id) {
350+
self.current.lock().unwrap().push(span.into_u64());
351+
}
352+
fn exit(&self, _span: &Id) {
353+
self.current.lock().unwrap().pop();
354+
}
355+
fn try_close(&self, id: Id) -> bool {
356+
if let Some(s) = self.spans.lock().unwrap().get_mut(&id.into_u64()) {
357+
s.closed = true;
358+
}
359+
true
360+
}
361+
}
362+
363+
#[test]
364+
fn query_logger_opens_and_closes_span_with_expected_fields() {
365+
let subscriber = Arc::new(CaptureSubscriber::default());
366+
with_default(subscriber.clone(), || {
367+
let settings = LogSettings::default();
368+
let sql = "SELECT id, name FROM users WHERE id = 1".into_sql_str();
369+
let mut logger = QueryLogger::new(sql, settings).with_db_system_name("postgresql");
370+
logger.increment_rows_returned();
371+
logger.increment_rows_returned();
372+
logger.increase_rows_affected(2);
373+
drop(logger);
374+
});
375+
376+
let spans = subscriber.spans.lock().unwrap();
377+
assert_eq!(spans.len(), 1, "exactly one span should be opened");
378+
let span = spans.values().next().unwrap();
379+
380+
assert_eq!(span.name, "db.query");
381+
assert_eq!(span.target, "sqlx::query");
382+
assert_eq!(span.level, tracing::Level::INFO);
383+
assert!(span.closed, "span must close on QueryLogger drop");
384+
assert!(
385+
span.contained_events >= 1,
386+
"the close-time event should fire inside the span"
387+
);
388+
389+
assert_eq!(
390+
span.fields.get("db.system.name").map(String::as_str),
391+
Some("postgresql")
392+
);
393+
assert_eq!(
394+
span.fields.get("db.operation.name").map(String::as_str),
395+
Some("SELECT")
396+
);
397+
assert_eq!(
398+
span.fields.get("otel.kind").map(String::as_str),
399+
Some("client")
400+
);
401+
assert!(span
402+
.fields
403+
.get("db.query.text")
404+
.is_some_and(|s| s.contains("SELECT id, name FROM users")));
405+
assert_eq!(
406+
span.fields
407+
.get("db.response.returned_rows")
408+
.map(String::as_str),
409+
Some("2"),
410+
"rows_returned must be recorded on the span before close"
411+
);
412+
assert_eq!(
413+
span.fields
414+
.get("db.response.affected_rows")
415+
.map(String::as_str),
416+
Some("2"),
417+
"rows_affected must be recorded on the span before close"
418+
);
419+
}
420+
}

0 commit comments

Comments
 (0)