Skip to content

Commit c49d542

Browse files
committed
test(query): add trace_debug OTLP dump utilities for direct and HTTP query tracing
1 parent 18aca7c commit c49d542

File tree

25 files changed

+1452
-108
lines changed

25 files changed

+1452
-108
lines changed

Cargo.lock

Lines changed: 147 additions & 73 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -521,12 +521,12 @@ vergen-gix = { version = "1", features = ["build", "cargo", "rustc"] }
521521

522522
# Observability
523523
env_logger = "0.11"
524-
fastrace = { version = "0.7.14", features = ["enable"] }
525-
fastrace-opentelemetry = "0.10"
524+
fastrace = { version = "0.7.17", features = ["enable"] }
525+
fastrace-opentelemetry = "0.16"
526526
log = { version = "0.4.27", features = ["serde", "kv_serde", "kv_unstable_std"] }
527527
logcall = "0.1.9"
528-
opentelemetry = { version = "0.29", features = ["trace", "logs"] }
529-
opentelemetry-otlp = { version = "0.29", default-features = false, features = [
528+
opentelemetry = { version = "0.31", features = ["trace", "logs"] }
529+
opentelemetry-otlp = { version = "0.31", default-features = false, features = [
530530
"trace",
531531
"logs",
532532
"metrics",
@@ -536,7 +536,8 @@ opentelemetry-otlp = { version = "0.29", default-features = false, features = [
536536
"http-proto",
537537
"reqwest-client",
538538
] }
539-
opentelemetry_sdk = { version = "0.29", features = ["trace", "logs", "rt-tokio"] }
539+
opentelemetry-proto = { version = "0.31", features = ["gen-tonic-messages", "trace", "with-serde"] }
540+
opentelemetry_sdk = { version = "0.31", features = ["trace", "logs", "rt-tokio"] }
540541
prometheus-client = "0.22"
541542
prometheus-parse = "0.2.3"
542543
tracing = "0.1.40"
@@ -548,6 +549,9 @@ sqllogictest = "0.28.4"
548549

549550
[workspace.lints.rust]
550551
async_fn_in_trait = "allow"
552+
# Temporary workaround for `unused_braces` false positives triggered by `async_backtrace::framed` and
553+
# `fastrace::trace` macro expansion under `cargo clippy --workspace --all-targets -- -D warnings`.
554+
unused_braces = "allow"
551555

552556
[workspace.lints.clippy]
553557
useless_format = "allow"

agents/debug-and-validation.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
- For Rust changes that will remain in the branch, use `cargo clippy` to confirm there are no compilation or lint errors.
44
- Start with partial verification when a full workspace pass is too expensive, but move toward stronger coverage before handoff when the resulting changes will remain in the branch.
5+
- Use the trace-debug utilities documented in [`src/query/service/tests/it/trace_debug/README.md`](../src/query/service/tests/it/trace_debug/README.md) when investigating query lifecycle or stage transitions, such as parser/planner/interpreter/pipeline flow, HTTP query state and pagination behavior, or which execution spans and attached events appear for a specific SQL path.
56
- For temporary investigation outputs that will not be submitted, keep validation minimal and purpose-driven. Run only the checks needed to establish the conclusion or unblock the investigation.
67
- If investigation work begins producing code, tests, or docs that should remain in the branch, raise the validation bar before handoff.
78

src/common/tracing/src/init.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ pub fn init_logging(
198198
let reporter = rt.block_on(async {
199199
fastrace_opentelemetry::OpenTelemetryReporter::new(
200200
exporter,
201-
opentelemetry::trace::SpanKind::Server,
202201
Cow::Owned(resource),
203202
opentelemetry::InstrumentationScope::builder(trace_name).build(),
204203
)

src/query/pipeline/src/core/processor.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,13 @@ unsafe impl Send for ProcessorPtr {}
125125
unsafe impl Sync for ProcessorPtr {}
126126

127127
impl ProcessorPtr {
128+
fn current_thread_name() -> String {
129+
std::thread::current()
130+
.name()
131+
.unwrap_or("unnamed")
132+
.to_string()
133+
}
134+
128135
#[allow(clippy::arc_with_non_send_sync)]
129136
pub fn create(inner: Box<dyn Processor>) -> ProcessorPtr {
130137
ProcessorPtr {
@@ -174,8 +181,10 @@ impl ProcessorPtr {
174181
/// # Safety
175182
pub unsafe fn process(&self) -> Result<()> {
176183
unsafe {
184+
let thread_name = Self::current_thread_name();
177185
let span = LocalSpan::enter_with_local_parent(format!("{}::process", self.name()))
178-
.with_property(|| ("graph-node-id", self.id().index().to_string()));
186+
.with_property(|| ("graph-node-id", self.id().index().to_string()))
187+
.with_property(|| ("thread_name", thread_name.clone()));
179188

180189
match (*self.inner.get()).process() {
181190
Ok(_) => Ok(()),
@@ -201,6 +210,7 @@ impl ProcessorPtr {
201210
let id = self.id();
202211
let mut name = self.name();
203212
name.push_str("::async_process");
213+
let thread_name = Self::current_thread_name();
204214

205215
let task = (*self.inner.get()).async_process();
206216

@@ -216,7 +226,8 @@ impl ProcessorPtr {
216226
let inner = self.inner.clone();
217227
async move {
218228
let span = Span::enter_with_local_parent(name)
219-
.with_property(|| ("graph-node-id", id.index().to_string()));
229+
.with_property(|| ("graph-node-id", id.index().to_string()))
230+
.with_property(|| ("thread_name", thread_name.clone()));
220231

221232
match task.await {
222233
Ok(_) => {

src/query/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,15 @@ zip = { workspace = true }
182182
arrow-cast = { workspace = true }
183183
databend-common-sql-test-support = { workspace = true }
184184
databend-storages-common-pruner = { workspace = true }
185+
fastrace-opentelemetry = { workspace = true }
185186
geo = { workspace = true }
186187
goldenfile = { workspace = true }
187188
hex = { workspace = true }
188189
hyper-util = { workspace = true }
189190
jwt-simple = { workspace = true }
190191
maplit = { workspace = true }
191192
mysql_async = { workspace = true }
193+
opentelemetry-proto = { workspace = true }
192194
p256 = { workspace = true }
193195
pretty_assertions = { workspace = true }
194196
proptest = { workspace = true }

src/query/service/src/interpreters/interpreter.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use databend_common_sql::plans::Plan;
4141
use databend_storages_common_cache::CacheManager;
4242
use derive_visitor::DriveMut;
4343
use derive_visitor::VisitorMut;
44+
use fastrace::collector::SpanContext;
4445
use md5::Digest;
4546
use md5::Md5;
4647

@@ -161,7 +162,11 @@ pub trait Interpreter: Sync + Send {
161162
complete_executor.execute()?;
162163
self.inject_result()
163164
} else {
164-
let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
165+
let thread_span_parent = ctx
166+
.get_executor_tracing_context()
167+
.or_else(SpanContext::current_local_parent);
168+
let pulling_executor =
169+
PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?;
165170

166171
ctx.set_executor(pulling_executor.get_inner())?;
167172
Ok(Box::pin(ProgressStream::try_create(

src/query/service/src/interpreters/interpreter_explain.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use databend_common_storages_basic::gen_result_cache_key;
4343
use databend_common_storages_fuse::FuseLazyPartInfo;
4444
use databend_common_storages_fuse::FuseTable;
4545
use databend_common_users::UserApiProvider;
46+
use fastrace::collector::SpanContext;
4647
use serde::Serialize;
4748
use serde_json;
4849

@@ -552,7 +553,15 @@ impl ExplainInterpreter {
552553
executor.execute()?;
553554
}
554555
false => {
555-
let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
556+
let thread_span_parent = self
557+
.ctx
558+
.get_executor_tracing_context()
559+
.or_else(SpanContext::current_local_parent);
560+
let mut executor = PipelinePullingExecutor::from_pipelines(
561+
build_res,
562+
settings,
563+
thread_span_parent,
564+
)?;
556565
executor.start();
557566
while (executor.pull_data()?).is_some() {}
558567
}

src/query/service/src/interpreters/interpreter_explain_perf.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use databend_common_pipeline::core::always_callback;
3434
use databend_common_sql::Planner;
3535
use databend_meta_plugin_semaphore::acquirer::Permit;
3636
use databend_meta_runtime::DatabendRuntime;
37+
use fastrace::collector::SpanContext;
3738

3839
use crate::interpreters::Interpreter;
3940
use crate::interpreters::InterpreterFactory;
@@ -174,7 +175,11 @@ impl ExplainPerfInterpreter {
174175
ctx.set_executor(executor.get_inner())?;
175176
executor.execute()?;
176177
} else {
177-
let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
178+
let thread_span_parent = ctx
179+
.get_executor_tracing_context()
180+
.or_else(SpanContext::current_local_parent);
181+
let mut executor =
182+
PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?;
178183
ctx.set_executor(executor.get_inner())?;
179184
executor.start();
180185
while (executor.pull_data()?).is_some() {}

src/query/service/src/pipelines/executor/pipeline_complete_executor.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use databend_common_base::runtime::drop_guard;
2121
use databend_common_exception::ErrorCode;
2222
use databend_common_exception::Result;
2323
use databend_common_pipeline::core::Pipeline;
24-
use fastrace::func_path;
2524
use fastrace::prelude::*;
2625

2726
use crate::pipelines::executor::ExecutorSettings;
@@ -101,11 +100,21 @@ impl PipelineCompleteExecutor {
101100
.flatten()
102101
}
103102

104-
fn thread_function(&self) -> impl Fn() -> Result<()> + use<> {
105-
let span = Span::enter_with_local_parent(func_path!());
103+
fn thread_function(&self) -> impl FnOnce() -> Result<()> + use<> {
104+
let parent = SpanContext::current_local_parent();
106105
let executor = self.executor.clone();
107106

108107
move || {
108+
let span = if let Some(parent) = parent {
109+
let thread_name = std::thread::current()
110+
.name()
111+
.unwrap_or("unnamed")
112+
.to_string();
113+
Span::root("PipelineCompleteExecutor::thread_function", parent)
114+
.with_property(|| ("thread_name", thread_name))
115+
} else {
116+
Span::noop()
117+
};
109118
let _g = span.set_local_parent();
110119
executor.execute()
111120
}

0 commit comments

Comments
 (0)