diff --git a/Cargo.lock b/Cargo.lock index aac946db8ee27..eab0f68d50eed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -579,7 +579,7 @@ dependencies = [ "futures", "once_cell", "paste", - "prost", + "prost 0.13.5", "prost-types", "tonic 0.13.1", ] @@ -2690,6 +2690,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "const-hex" +version = "1.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "531185e432bb31db1ecda541e9e7ab21468d4d844ad7505e0546a49b4945d49b" +dependencies = [ + "cfg-if", + "cpufeatures", + "proptest", + "serde_core", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -3534,7 +3546,7 @@ dependencies = [ "databend-common-exception", "hyper-util", "lenient_semver", - "prost", + "prost 0.13.5", "prost-build", "semver", "serde", @@ -3643,7 +3655,7 @@ dependencies = [ "opendal", "parquet 56.2.0", "paste", - "prost", + "prost 0.13.5", "redis", "reqwest", "rustc-demangle", @@ -3954,7 +3966,7 @@ dependencies = [ "fastrace", "futures", "log", - "prost", + "prost 0.13.5", "seq-marked", "serde", "serde_json", @@ -3983,7 +3995,7 @@ dependencies = [ "log", "logcall", "maplit", - "prost", + "prost 0.13.5", "rand 0.8.5", "seq-marked", "serde", @@ -4018,7 +4030,7 @@ dependencies = [ "num-derive", "num-traits", "paste", - "prost", + "prost 0.13.5", "serde", "serde_json", "sha1", @@ -4056,7 +4068,7 @@ dependencies = [ "futures", "mlua", "openraft", - "prost", + "prost 0.13.5", "reqwest", "serde", "serde_json", @@ -4079,7 +4091,7 @@ dependencies = [ "databend-meta", "databend-meta-client 260205.4.0", "openraft", - "prost", + "prost 0.13.5", "regex", "serde", "serde_json", @@ -4258,7 +4270,7 @@ dependencies = [ "maplit", "num", "pretty_assertions", - "prost", + "prost 0.13.5", "thiserror 1.0.69", ] @@ -4269,7 +4281,7 @@ dependencies = [ "lenient_semver", "num-derive", "num-traits", - "prost", + "prost 0.13.5", "prost-build", "semver", "tonic 0.13.1", @@ -4935,9 +4947,9 @@ dependencies = [ "log", "logforth", "opendal", - 
"opentelemetry 0.29.1", - "opentelemetry-otlp 0.29.0", - "opentelemetry_sdk 0.29.0", + "opentelemetry 0.31.0", + "opentelemetry-otlp 0.31.1", + "opentelemetry_sdk 0.31.0", "parquet 56.2.0", "serde", "serde_json", @@ -5325,7 +5337,7 @@ dependencies = [ "openraft", "peel-off", "prometheus-client 0.22.3", - "prost", + "prost 0.13.5", "raft-log", "rustls 0.23.36", "seq-marked", @@ -5465,7 +5477,7 @@ dependencies = [ "logcall", "once_cell", "parking_lot 0.12.3", - "prost", + "prost 0.13.5", "serde", "serde_json", "thiserror 1.0.69", @@ -5499,7 +5511,7 @@ dependencies = [ "logcall", "once_cell", "parking_lot 0.12.3", - "prost", + "prost 0.13.5", "serde", "serde_json", "thiserror 1.0.69", @@ -5656,7 +5668,7 @@ dependencies = [ "num-derive", "num-traits", "openraft", - "prost", + "prost 0.13.5", "prost-build", "serde", "serde_json", @@ -5799,7 +5811,7 @@ dependencies = [ "num_cpus", "openraft", "pretty_assertions", - "prost", + "prost 0.13.5", "prost-build", "rotbl", "serde", @@ -5829,7 +5841,7 @@ dependencies = [ "num_cpus", "openraft", "pretty_assertions", - "prost", + "prost 0.13.5", "rotbl", "serde", "serde_json", @@ -5969,6 +5981,7 @@ dependencies = [ "either", "ethnum", "fastrace", + "fastrace-opentelemetry", "flatbuffers", "futures", "futures-util", @@ -5995,8 +6008,9 @@ dependencies = [ "num_cpus", "opendal", "opensrv-mysql", - "opentelemetry 0.29.1", - "opentelemetry_sdk 0.29.0", + "opentelemetry 0.31.0", + "opentelemetry-proto 0.31.0", + "opentelemetry_sdk 0.31.0", "p256", "parking_lot 0.12.3", "parquet 56.2.0", @@ -6006,7 +6020,7 @@ dependencies = [ "pretty_assertions", "prometheus-client 0.22.3", "proptest", - "prost", + "prost 0.13.5", "rand 0.8.5", "recursive", "redis", @@ -7236,9 +7250,9 @@ dependencies = [ [[package]] name = "fastrace" -version = "0.7.14" +version = "0.7.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "318783b9fefe06130ab664ff1779215657586b004c0c7f3d6ece16d658936d06" +checksum = 
"2130caec636d7a1d23b173576674ced1af967228642ceaeb6a1b4705c282b00e" dependencies = [ "fastant", "fastrace-macro", @@ -7251,9 +7265,9 @@ dependencies = [ [[package]] name = "fastrace-macro" -version = "0.7.14" +version = "0.7.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7079009cf129d63c850dee732b58d7639d278a47ad99c607954ac94cfd57ef4" +checksum = "0b35f67e02527fca6515ff61f922360df781f477daf6a806fff16bd59525dee5" dependencies = [ "proc-macro-error2", "proc-macro2", @@ -7263,14 +7277,14 @@ dependencies = [ [[package]] name = "fastrace-opentelemetry" -version = "0.10.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c1cc020aea54a15f53ca1d4e42321d40f9e767f42d222ea1fdbe9c43ea49d15" +checksum = "4644a8a7ce6d20d83e73d9562f323388e4d817470d40bcb51fa2fe328db58cd2" dependencies = [ "fastrace", "log", - "opentelemetry 0.29.1", - "opentelemetry_sdk 0.29.0", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", "pollster", ] @@ -10579,7 +10593,7 @@ dependencies = [ "num_cpus", "object_store", "pin-project", - "prost", + "prost 0.13.5", "rand 0.9.2", "roaring 0.10.12", "serde_json", @@ -10619,7 +10633,7 @@ dependencies = [ "log", "lz4", "num-traits", - "prost", + "prost 0.13.5", "prost-build", "prost-types", "protobuf-src", @@ -10658,7 +10672,7 @@ dependencies = [ "log", "num-traits", "object_store", - "prost", + "prost 0.13.5", "prost-build", "prost-types", "protobuf-src", @@ -10695,7 +10709,7 @@ dependencies = [ "object_store", "path_abs", "pin-project", - "prost", + "prost 0.13.5", "rand 0.9.2", "serde", "shellexpand 3.1.2", @@ -10755,7 +10769,7 @@ dependencies = [ "lance-io", "log", "object_store", - "prost", + "prost 0.13.5", "prost-build", "prost-types", "protobuf-src", @@ -12174,7 +12188,7 @@ dependencies = [ "moka", "percent-encoding", "prometheus-client 0.23.1", - "prost", + "prost 0.13.5", "quick-xml 0.38.4", "reqsign", "reqwest", @@ -12328,9 +12342,9 @@ dependencies = [ [[package]] 
name = "opentelemetry" -version = "0.29.1" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" dependencies = [ "futures-core", "futures-sink", @@ -12356,16 +12370,15 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", "http 1.3.1", - "opentelemetry 0.29.1", + "opentelemetry 0.31.0", "reqwest", - "tracing", ] [[package]] @@ -12381,7 +12394,7 @@ dependencies = [ "opentelemetry-http 0.28.0", "opentelemetry-proto 0.28.0", "opentelemetry_sdk 0.28.0", - "prost", + "prost 0.13.5", "reqwest", "serde_json", "thiserror 2.0.18", @@ -12392,21 +12405,20 @@ dependencies = [ [[package]] name = "opentelemetry-otlp" -version = "0.29.0" +version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" +checksum = "1f69cd6acbb9af919df949cd1ec9e5e7fdc2ef15d234b6b795aaa525cc02f71f" dependencies = [ - "futures-core", "http 1.3.1", - "opentelemetry 0.29.1", - "opentelemetry-http 0.29.0", - "opentelemetry-proto 0.29.0", - "opentelemetry_sdk 0.29.0", - "prost", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-proto 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.3", "reqwest", "thiserror 2.0.18", "tokio", - "tonic 0.12.3", + "tonic 0.14.5", "tracing", ] @@ -12420,21 +12432,26 @@ dependencies = [ "hex", "opentelemetry 0.28.0", "opentelemetry_sdk 0.28.0", - "prost", + "prost 0.13.5", "serde", "tonic 0.12.3", ] [[package]] name = "opentelemetry-proto" -version = "0.29.0" 
+version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ - "opentelemetry 0.29.1", - "opentelemetry_sdk 0.29.0", - "prost", - "tonic 0.12.3", + "base64 0.22.1", + "const-hex", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.3", + "serde", + "serde_json", + "tonic 0.14.5", + "tonic-prost", ] [[package]] @@ -12460,22 +12477,19 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" dependencies = [ "futures-channel", "futures-executor", "futures-util", - "glob", - "opentelemetry 0.29.1", + "opentelemetry 0.31.0", "percent-encoding", "rand 0.9.2", - "serde_json", "thiserror 2.0.18", "tokio", "tokio-stream", - "tracing", ] [[package]] @@ -12496,7 +12510,7 @@ dependencies = [ "lz4_flex", "lzokay-native", "num", - "prost", + "prost 0.13.5", "snafu", "snap", "tokio", @@ -13496,7 +13510,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +dependencies = [ + "bytes", + "prost-derive 0.14.3", ] [[package]] @@ -13512,7 +13536,7 @@ dependencies = [ "once_cell", "petgraph", "prettyplease", - "prost", + "prost 0.13.5", "prost-types", "regex", "syn 2.0.106", @@ -13532,13 +13556,26 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = 
"prost-derive" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "prost-types" version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" dependencies = [ - "prost", + "prost 0.13.5", ] [[package]] @@ -17119,7 +17156,6 @@ dependencies = [ "axum 0.7.9", "base64 0.22.1", "bytes", - "flate2", "h2 0.4.13", "http 1.3.1", "http-body 1.0.1", @@ -17129,7 +17165,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "socket2 0.5.9", "tokio", "tokio-stream", @@ -17158,7 +17194,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "rustls-native-certs 0.8.1", "socket2 0.5.9", "tokio", @@ -17170,6 +17206,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fec7c61a0695dc1887c1b53952990f3ad2e3a31453e1f49f10e75424943a93ec" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "flate2", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.8.1", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.13.1" @@ -17184,13 +17247,24 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "tonic-prost" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55376a0bbaa4975a3f10d009ad763d8f4108f067c7c2e74f3001fb49778d309" +dependencies = [ + "bytes", + "prost 0.14.3", + "tonic 0.14.5", +] + [[package]] 
name = "tonic-reflection" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1" dependencies = [ - "prost", + "prost 0.13.5", "prost-types", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index dc95056e7c8d1..f192e18f7ef9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -522,12 +522,12 @@ vergen-gix = { version = "1", features = ["build", "cargo", "rustc"] } # Observability env_logger = "0.11" -fastrace = { version = "0.7.14", features = ["enable"] } -fastrace-opentelemetry = "0.10" +fastrace = { version = "0.7.17", features = ["enable"] } +fastrace-opentelemetry = "0.16" log = { version = "0.4.27", features = ["serde", "kv_serde", "kv_unstable_std"] } logcall = "0.1.9" -opentelemetry = { version = "0.29", features = ["trace", "logs"] } -opentelemetry-otlp = { version = "0.29", default-features = false, features = [ +opentelemetry = { version = "0.31", features = ["trace", "logs"] } +opentelemetry-otlp = { version = "0.31", default-features = false, features = [ "trace", "logs", "metrics", @@ -537,7 +537,8 @@ opentelemetry-otlp = { version = "0.29", default-features = false, features = [ "http-proto", "reqwest-client", ] } -opentelemetry_sdk = { version = "0.29", features = ["trace", "logs", "rt-tokio"] } +opentelemetry-proto = { version = "0.31", features = ["gen-tonic-messages", "trace", "with-serde"] } +opentelemetry_sdk = { version = "0.31", features = ["trace", "logs", "rt-tokio"] } prometheus-client = "0.22" prometheus-parse = "0.2.3" tracing = "0.1.40" @@ -549,6 +550,9 @@ sqllogictest = "0.28.4" [workspace.lints.rust] async_fn_in_trait = "allow" +# Temporary workaround for false positives from async_backtrace::framed and +# fastrace::trace macro expansion under `cargo clippy --workspace --all-targets -- -D warnings`. 
+unused_braces = "allow" [workspace.lints.clippy] useless_format = "allow" diff --git a/agents/debug-and-validation.md b/agents/debug-and-validation.md index 207eb586980ac..87fab6be02bc9 100644 --- a/agents/debug-and-validation.md +++ b/agents/debug-and-validation.md @@ -2,6 +2,7 @@ - For Rust changes that will remain in the branch, use `cargo clippy` to confirm there are no compilation or lint errors. - Start with partial verification when a full workspace pass is too expensive, but move toward stronger coverage before handoff when the resulting changes will remain in the branch. +- Use the trace-debug utilities under [`src/query/service/tests/it/trace_debug/README.md`](../src/query/service/tests/it/trace_debug/README.md) when the question is about query lifecycle or stage transitions, such as parser/planner/interpreter/pipeline flow, HTTP query state and pagination behavior, or which execution spans and attached events appear for a specific SQL path. - For temporary investigation outputs that will not be submitted, keep validation minimal and purpose-driven. Run only the checks needed to establish the conclusion or unblock the investigation. - If investigation work begins producing code, tests, or docs that should remain in the branch, raise the validation bar before handoff. 
diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index 0dff1dc9a9b87..33ea030ced0f9 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -198,7 +198,6 @@ pub fn init_logging( let reporter = rt.block_on(async { fastrace_opentelemetry::OpenTelemetryReporter::new( exporter, - opentelemetry::trace::SpanKind::Server, Cow::Owned(resource), opentelemetry::InstrumentationScope::builder(trace_name).build(), ) diff --git a/src/query/pipeline/src/core/processor.rs b/src/query/pipeline/src/core/processor.rs index f63c6d7a67984..437aee16a6db7 100644 --- a/src/query/pipeline/src/core/processor.rs +++ b/src/query/pipeline/src/core/processor.rs @@ -124,7 +124,19 @@ unsafe impl Send for ProcessorPtr {} unsafe impl Sync for ProcessorPtr {} +thread_local! { + // Processor spans are created on the hot path, so cache the current thread name per thread. + static CURRENT_THREAD_NAME: String = std::thread::current() + .name() + .unwrap_or("unnamed") + .to_string(); +} + impl ProcessorPtr { + fn current_thread_name() -> String { + CURRENT_THREAD_NAME.with(|thread_name| thread_name.clone()) + } + #[allow(clippy::arc_with_non_send_sync)] pub fn create(inner: Box) -> ProcessorPtr { ProcessorPtr { @@ -175,7 +187,8 @@ impl ProcessorPtr { pub unsafe fn process(&self) -> Result<()> { unsafe { let span = LocalSpan::enter_with_local_parent(format!("{}::process", self.name())) - .with_property(|| ("graph-node-id", self.id().index().to_string())); + .with_property(|| ("graph-node-id", self.id().index().to_string())) + .with_property(|| ("thread_name", Self::current_thread_name())); match (*self.inner.get()).process() { Ok(_) => Ok(()), @@ -215,8 +228,12 @@ impl ProcessorPtr { let inner = self.inner.clone(); async move { - let span = Span::enter_with_local_parent(name) - .with_property(|| ("graph-node-id", id.index().to_string())); + let span = match SpanContext::current_local_parent() { + Some(parent) if parent.sampled => 
Span::enter_with_local_parent(name) + .with_property(|| ("graph-node-id", id.index().to_string())) + .with_property(|| ("thread_name", Self::current_thread_name())), + _ => Span::noop(), + }; match task.await { Ok(_) => { diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 547d06c4f83af..d767857a5ae60 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -182,6 +182,7 @@ zip = { workspace = true } arrow-cast = { workspace = true } databend-common-sql-test-support = { workspace = true } databend-storages-common-pruner = { workspace = true } +fastrace-opentelemetry = { workspace = true } geo = { workspace = true } goldenfile = { workspace = true } hex = { workspace = true } @@ -189,6 +190,7 @@ hyper-util = { workspace = true } jwt-simple = { workspace = true } maplit = { workspace = true } mysql_async = { workspace = true } +opentelemetry-proto = { workspace = true } p256 = { workspace = true } pretty_assertions = { workspace = true } proptest = { workspace = true } diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index f324c6df91ab6..24a1348166597 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -41,6 +41,7 @@ use databend_common_sql::plans::Plan; use databend_storages_common_cache::CacheManager; use derive_visitor::DriveMut; use derive_visitor::VisitorMut; +use fastrace::collector::SpanContext; use md5::Digest; use md5::Md5; @@ -161,7 +162,11 @@ pub trait Interpreter: Sync + Send { complete_executor.execute()?; self.inject_result() } else { - let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + let thread_span_parent = ctx + .get_executor_tracing_context() + .or_else(SpanContext::current_local_parent); + let pulling_executor = + PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?; 
ctx.set_executor(pulling_executor.get_inner())?; Ok(Box::pin(ProgressStream::try_create( diff --git a/src/query/service/src/interpreters/interpreter_explain.rs b/src/query/service/src/interpreters/interpreter_explain.rs index 87eb981bf43b3..709f02c365342 100644 --- a/src/query/service/src/interpreters/interpreter_explain.rs +++ b/src/query/service/src/interpreters/interpreter_explain.rs @@ -43,6 +43,7 @@ use databend_common_storages_basic::gen_result_cache_key; use databend_common_storages_fuse::FuseLazyPartInfo; use databend_common_storages_fuse::FuseTable; use databend_common_users::UserApiProvider; +use fastrace::collector::SpanContext; use serde::Serialize; use serde_json; @@ -552,7 +553,15 @@ impl ExplainInterpreter { executor.execute()?; } false => { - let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + let thread_span_parent = self + .ctx + .get_executor_tracing_context() + .or_else(SpanContext::current_local_parent); + let mut executor = PipelinePullingExecutor::from_pipelines( + build_res, + settings, + thread_span_parent, + )?; executor.start(); while (executor.pull_data()?).is_some() {} } diff --git a/src/query/service/src/interpreters/interpreter_explain_perf.rs b/src/query/service/src/interpreters/interpreter_explain_perf.rs index 95aba4d704e4f..b5b2b5f08bec8 100644 --- a/src/query/service/src/interpreters/interpreter_explain_perf.rs +++ b/src/query/service/src/interpreters/interpreter_explain_perf.rs @@ -34,6 +34,7 @@ use databend_common_pipeline::core::always_callback; use databend_common_sql::Planner; use databend_meta_plugin_semaphore::acquirer::Permit; use databend_meta_runtime::DatabendRuntime; +use fastrace::collector::SpanContext; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterFactory; @@ -174,7 +175,11 @@ impl ExplainPerfInterpreter { ctx.set_executor(executor.get_inner())?; executor.execute()?; } else { - let mut executor = PipelinePullingExecutor::from_pipelines(build_res, 
settings)?; + let thread_span_parent = ctx + .get_executor_tracing_context() + .or_else(SpanContext::current_local_parent); + let mut executor = + PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?; ctx.set_executor(executor.get_inner())?; executor.start(); while (executor.pull_data()?).is_some() {} diff --git a/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs b/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs index 7b6469d195772..58f5aab6131fb 100644 --- a/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_complete_executor.rs @@ -21,7 +21,6 @@ use databend_common_base::runtime::drop_guard; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_pipeline::core::Pipeline; -use fastrace::func_path; use fastrace::prelude::*; use crate::pipelines::executor::ExecutorSettings; @@ -101,11 +100,21 @@ impl PipelineCompleteExecutor { .flatten() } - fn thread_function(&self) -> impl Fn() -> Result<()> + use<> { - let span = Span::enter_with_local_parent(func_path!()); + fn thread_function(&self) -> impl FnOnce() -> Result<()> + use<> { + let parent = SpanContext::current_local_parent(); let executor = self.executor.clone(); move || { + let span = if let Some(parent) = parent { + let thread_name = std::thread::current() + .name() + .unwrap_or("unnamed") + .to_string(); + Span::root("PipelineCompleteExecutor::thread_function", parent) + .with_property(|| ("thread_name", thread_name)) + } else { + Span::noop() + }; let _g = span.set_local_parent(); executor.execute() } diff --git a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs index b975aea68895d..184875446b744 100644 --- a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs +++ 
b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs @@ -34,7 +34,6 @@ use databend_common_pipeline::core::Processor; use databend_common_pipeline::core::ProcessorPtr; use databend_common_pipeline::sinks::Sink; use databend_common_pipeline::sinks::Sinker; -use fastrace::func_path; use fastrace::prelude::*; use parking_lot::Condvar; use parking_lot::Mutex; @@ -98,6 +97,7 @@ pub struct PipelinePullingExecutor { executor: Arc, receiver: Receiver, tracking_payload: TrackingPayload, + thread_span_parent: Option, } impl PipelinePullingExecutor { @@ -122,6 +122,7 @@ impl PipelinePullingExecutor { pub fn try_create( mut pipeline: Pipeline, settings: ExecutorSettings, + thread_span_parent: Option, ) -> Result { let tracking_payload = ThreadTracker::new_tracking_payload(); let _guard = ThreadTracker::tracking(tracking_payload.clone()); @@ -136,12 +137,14 @@ impl PipelinePullingExecutor { executor: Arc::new(executor), state: State::create(), tracking_payload, + thread_span_parent, }) } pub fn from_pipelines( build_res: PipelineBuildResult, settings: ExecutorSettings, + thread_span_parent: Option, ) -> Result { let tracking_payload = ThreadTracker::new_tracking_payload(); let _guard = ThreadTracker::tracking(tracking_payload.clone()); @@ -159,6 +162,7 @@ impl PipelinePullingExecutor { state: State::create(), tracking_payload, executor: Arc::new(executor), + thread_span_parent, }) } @@ -167,19 +171,18 @@ impl PipelinePullingExecutor { let state = self.state.clone(); let threads_executor = self.executor.clone(); - let thread_function = Self::thread_function(state, threads_executor); - #[allow(unused_mut)] - let mut thread_name = Some(String::from("PullingExecutor")); + let thread_function = + Self::thread_function(state, threads_executor, self.thread_span_parent); - #[cfg(debug_assertions)] + let thread_name = if cfg!(debug_assertions) + && matches!(std::env::var("UNIT_TEST"), Ok(var_value) if var_value == "TRUE") + && let Some(cur_thread_name) = 
std::thread::current().name() { // We need to pass the thread name in the unit test, because the thread name is the test name - if matches!(std::env::var("UNIT_TEST"), Ok(var_value) if var_value == "TRUE") { - if let Some(cur_thread_name) = std::thread::current().name() { - thread_name = Some(cur_thread_name.to_string()); - } - } - } + Some(cur_thread_name.to_string()) + } else { + Some(String::from("PullingExecutor")) + }; Thread::named_spawn(thread_name, thread_function); } @@ -188,9 +191,22 @@ impl PipelinePullingExecutor { self.executor.clone() } - fn thread_function(state: Arc, executor: Arc) -> impl Fn() { - let span = Span::enter_with_local_parent(func_path!()); + fn thread_function( + state: Arc, + executor: Arc, + parent: Option, + ) -> impl FnOnce() { move || { + let thread_name = std::thread::current() + .name() + .unwrap_or("unnamed") + .to_string(); + let span = if let Some(parent) = parent { + Span::root("PipelinePullingExecutor::thread_function", parent) + } else { + Span::noop() + } + .with_property(|| ("thread_name", thread_name.clone())); let _g = span.set_local_parent(); state.finished(executor.execute()); } diff --git a/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs b/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs index 3aa8f26febb3c..2a4de1aadc222 100644 --- a/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/queries_pipeline_executor.rs @@ -27,7 +27,6 @@ use databend_common_base::runtime::error_info::NodeErrorType; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_pipeline::core::WakeCallback; -use fastrace::func_path; use fastrace::prelude::*; use log::info; use log::warn; @@ -171,9 +170,15 @@ impl QueriesPipelineExecutor { } } - let span = Span::enter_with_local_parent(func_path!()) - .with_property(|| ("thread_name", name.clone())); + let parent = 
SpanContext::current_local_parent(); + let thread_name = name.clone(); thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe { + let span = if let Some(parent) = parent { + Span::root("QueriesPipelineExecutor::execute_threads", parent) + } else { + Span::noop() + } + .with_property(|| ("thread_name", thread_name.clone())); let _g = span.set_local_parent(); let this_clone = this.clone(); let try_result = catch_unwind(|| this_clone.execute_single_thread(thread_num)); diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index 37e1d07614683..1e637144d775f 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -411,9 +411,15 @@ impl QueryPipelineExecutor { } } - let span = Span::enter_with_local_parent("QueryPipelineExecutor::execute_threads") - .with_property(|| ("thread_name", name.clone())); + let parent = SpanContext::current_local_parent(); + let thread_name = name.clone(); thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe { + let span = if let Some(parent) = parent { + Span::root("QueryPipelineExecutor::execute_threads", parent) + } else { + Span::noop() + } + .with_property(|| ("thread_name", thread_name.clone())); let _g = span.set_local_parent(); let this_clone = this.clone(); let try_result = catch_unwind(|| this_clone.execute_single_thread(thread_num)); diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs index 9636bbf01e4d5..30b6b1c1d03e2 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs @@ -46,6 +46,7 @@ use databend_common_sql::Symbol; use 
databend_common_sql::plans::CreateTablePlan; use databend_common_storages_basic::RecursiveCteMemoryTable; use databend_storages_common_table_meta::table::OPT_KEY_RECURSIVE_CTE; +use fastrace::collector::SpanContext; use futures_util::TryStreamExt; use md5::Digest; use md5::Md5; @@ -207,7 +208,11 @@ impl TransformRecursiveCteSource { QueryFinishHooks::nested().into_callback(ctx.clone()), )); let settings = ExecutorSettings::try_create(ctx.clone())?; - let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + let thread_span_parent = ctx + .get_executor_tracing_context() + .or_else(SpanContext::current_local_parent); + let pulling_executor = + PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?; ctx.set_executor(pulling_executor.get_inner())?; let isolate_runtime = Runtime::with_worker_threads(2, Some("r-cte-source".to_string()))?; let join_handle = isolate_runtime.spawn(async move { diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index 7c4585dfc43f1..43711f6ec21ac 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -19,6 +19,7 @@ use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_sql::Planner; use databend_common_sql::planner::QueryExecutor; +use fastrace::collector::SpanContext; use futures_util::TryStreamExt; use crate::interpreters::InterpreterFactory; @@ -146,7 +147,12 @@ impl ServiceQueryExecutor { let result = async { let build_res = build_query_pipeline_without_render_result_set(&self.ctx, plan).await?; let settings = ExecutorSettings::try_create(self.ctx.clone())?; - let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + let thread_span_parent = self + .ctx + .get_executor_tracing_context() + .or_else(SpanContext::current_local_parent); + let pulling_executor = + 
PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?; self.ctx.set_executor(pulling_executor.get_inner())?; PullingExecutorStream::create(pulling_executor)? diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index 86d31fdf80b93..720b1b89a0c20 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -31,6 +31,7 @@ use databend_common_settings::Settings; use databend_common_storage::DataOperator; use databend_storages_common_cache::TempDirManager; use databend_storages_common_session::TxnManagerRef; +use fastrace::collector::SpanContext; use futures::StreamExt; use log::debug; use log::error; @@ -474,6 +475,7 @@ impl ExecuteState { executor: Arc>, ) -> Result<(), ExecutionError> { let make_error = || format!("failed to execute {}", interpreter.name()); + ctx.set_executor_tracing_context(SpanContext::current_local_parent()); let mut data_stream = interpreter .execute(ctx.clone()) diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 338607438064d..9f43e05c6a8be 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -145,6 +145,7 @@ use databend_storages_common_table_meta::meta::TableMetaTimestamps; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::table::OPT_KEY_RECURSIVE_CTE; use databend_storages_common_table_meta::table::OPT_KEY_TEMP_PREFIX; +use fastrace::collector::SpanContext; use jiff::Zoned; use jiff::tz::TimeZone; use log::debug; @@ -486,6 +487,7 @@ impl QueryContext { .unload_callbacked .store(false, Ordering::Release); self.shared.cluster_spill_progress.write().clear(); + self.shared.set_executor_tracing_context(None); *self.shared.init_query_id.write() = id; } @@ -493,6 +495,14 @@ impl QueryContext 
{ self.shared.set_executor(weak_ptr) } + pub fn set_executor_tracing_context(&self, span_context: Option) { + self.shared.set_executor_tracing_context(span_context); + } + + pub fn get_executor_tracing_context(&self) -> Option { + self.shared.get_executor_tracing_context() + } + pub fn attach_stage(&self, attachment: StageAttachment) { self.shared.attach_stage(attachment); } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index ab29737362d0b..4d8295d3a3426 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -69,6 +69,7 @@ use databend_common_storages_stream::stream_table::StreamTable; use databend_common_users::UserApiProvider; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::TableMetaTimestamps; +use fastrace::collector::SpanContext; use parking_lot::Mutex; use parking_lot::RwLock; use uuid::Uuid; @@ -116,6 +117,7 @@ pub struct QueryContextShared { pub(super) session: Arc, runtime: Arc>>>, pub(super) init_query_id: Arc>, + executor_tracing_context: Arc>>, cluster_cache: Arc>>, warehouse_cache: Arc>>>, running_query: Arc>>, @@ -222,6 +224,7 @@ impl QueryContextShared { cluster_cache: Arc::new(RwLock::new(cluster_cache)), data_operator: DataOperator::instance(), init_query_id: Arc::new(RwLock::new(Uuid::new_v4().to_string())), + executor_tracing_context: Arc::new(RwLock::new(None)), total_scan_values: Arc::new(Progress::create()), scan_progress: Arc::new(Progress::create()), result_progress: Arc::new(Progress::create()), @@ -757,6 +760,14 @@ impl QueryContextShared { } } + pub fn set_executor_tracing_context(&self, span_context: Option) { + *self.executor_tracing_context.write() = span_context; + } + + pub fn get_executor_tracing_context(&self) -> Option { + *self.executor_tracing_context.read() + } + pub fn get_stage_attachment(&self) -> Option { 
self.stage_attachment.read().clone() } diff --git a/src/query/service/src/stream/table_read_block_stream.rs b/src/query/service/src/stream/table_read_block_stream.rs index 15b9dfbfdac3f..13fc71c146c73 100644 --- a/src/query/service/src/stream/table_read_block_stream.rs +++ b/src/query/service/src/stream/table_read_block_stream.rs @@ -18,6 +18,7 @@ use databend_common_catalog::plan::DataSourcePlan; use databend_common_exception::Result; use databend_common_expression::SendableDataBlockStream; use databend_common_pipeline::core::Pipeline; +use fastrace::collector::SpanContext; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelinePullingExecutor; @@ -50,7 +51,11 @@ impl ReadDataBlockStream for T { let settings = ctx.get_settings(); pipeline.set_max_threads(settings.get_max_threads()? as usize); let executor_settings = ExecutorSettings::try_create(ctx.clone())?; - let executor = PipelinePullingExecutor::try_create(pipeline, executor_settings)?; + let thread_span_parent = ctx + .get_executor_tracing_context() + .or_else(SpanContext::current_local_parent); + let executor = + PipelinePullingExecutor::try_create(pipeline, executor_settings, thread_span_parent)?; ctx.set_executor(executor.get_inner())?; Ok(Box::pin(PullingExecutorStream::create(executor)?)) } diff --git a/src/query/service/tests/it/main.rs b/src/query/service/tests/it/main.rs index 692f89c12952a..e109072106c47 100644 --- a/src/query/service/tests/it/main.rs +++ b/src/query/service/tests/it/main.rs @@ -38,3 +38,4 @@ mod sql; mod storages; mod table_functions; mod tests; +mod trace_debug; diff --git a/src/query/service/tests/it/trace_debug/README.md b/src/query/service/tests/it/trace_debug/README.md new file mode 100644 index 0000000000000..19dfe7816dcbe --- /dev/null +++ b/src/query/service/tests/it/trace_debug/README.md @@ -0,0 +1,135 @@ +# Query Service Trace Debugging + +## Structure + +The trace debugging support under `src/query/service/tests/it/trace_debug/` is 
intentionally split +into two layers: + +- Infrastructure: shared capture, tree formatting, raw span formatting, and runtime helpers. +- Application: concrete debugging-oriented test entry points that export complete trace data. + +Files: + +- `infra.rs` + - Common tracing test infrastructure. +- `direct.rs` + - Direct SQL trace debugging test. +- `http.rs` + - Self-contained HTTP query trace debugging OTLP dump utility. + +## HTTP Query Trace Debugging + +### Test Name + +- `test_dump_http_query_trace_debug` + +### Purpose + +Inspect real HTTP query tracing, including: + +- `POST /v1/query`; +- follow-up page pulls; +- query finalization; +- executor spans under the HTTP request path. +- No trace-shape assertions; this utility writes complete OTLP JSON traces. + +### How To Run + +```bash +cargo test -p databend-query --test it test_dump_http_query_trace_debug -- --ignored --nocapture +``` + +### Environment Variables + +- `DATABEND_TRACE_DEBUG_SQLS` +- `DATABEND_TRACE_DEBUG_WAIT_SECS` +- `DATABEND_TRACE_DEBUG_MAX_ROWS_PER_PAGE` +- `DATABEND_TRACE_DEBUG_MAX_PAGES` +- `DATABEND_TRACE_DEBUG_OUTPUT_DIR` + +## Direct SQL Trace Debugging + +### Test Name + +- `test_dump_direct_sql_trace_debug` + +### Purpose + +Bypass HTTP and inspect tracing across: + +- SQL parsing; +- planning; +- interpreter construction; +- pipeline execution; +- result stream collection. +- The output is a complete OTLP JSON export request instead of an ad hoc summary file. + +### How To Run + +```bash +cargo test -p databend-query --test it test_dump_direct_sql_trace_debug -- --ignored --nocapture +``` + +### Environment Variables + +- `DATABEND_DIRECT_TRACE_DEBUG_SQLS` +- `DATABEND_TRACE_DEBUG_OUTPUT_DIR` + +## Notes + +- Trace-debug tests enable `config.log.structlog.on` in their test fixture so regular log records can + be bridged into fastrace span events without requiring an external OTLP tracing exporter. 
+- In practice, most attached OTLP events come from log-to-fastrace bridging through + `logforth::append::FastraceEvent`, not from the pipeline `Processor::event()` scheduling enum. +- Event-rich spans are usually planner, binder, optimizer, interpreter, HTTP query state, and some + processor summary spans. Many processor spans still expose only span names plus attributes. +- Shared raw span output is configured through `RawSpanFormatter` and `RawSpanFormatOptions`. +- Raw span output can include attached events via `RawSpanFormatOptions::with_events(true)`. +- Trace-debug tests always write the complete output directly to a file. +- The default output directory is `${TMPDIR:-/tmp}/databend-trace-debug/`. +- Output files use short prefixes plus a timestamp, for example: + `direct-1743341019-851.otlp.json`, + `http-1743341028-104.otlp.json`. +- The persisted payload uses OpenTelemetry OTLP JSON `ExportTraceServiceRequest`. +- Direct and HTTP helpers keep extra debugging context inside standard spans and attributes so the + dump remains a complete trace export instead of a private summary schema. +- Shared test capture is protected by a process-local mutex to avoid reporter interference. + +## Inspecting Events + +List spans that contain attached OTLP events: + +```bash +jq -r ' + .resourceSpans[].scopeSpans[].spans[] + | select((.events | length) > 0) + | "events=\(.events | length)\t\(.name)" +' /tmp/databend-trace-debug/direct-*.otlp.json +``` + +Expand the attached event messages and their attributes: + +```bash +jq -r ' + .resourceSpans[].scopeSpans[].spans[] + | select((.events | length) > 0) + | . 
as $span + | .events[] + | [ + $span.name, + .name, + ((.attributes // []) + | map("\(.key)=\(.value.stringValue // .value.intValue // .value.boolValue // .value.doubleValue // "")") + | join(",")) + ] + | @tsv +' /tmp/databend-trace-debug/direct-*.otlp.json +``` + +Typical event payloads include: + +- binder status updates and CTE/reference diagnostics +- optimizer summary statistics +- physical plan dumps from `SelectInterpreter` +- HTTP query lifecycle logs such as query creation, first response, stop/finalize, and close reason +- processor summary logs such as aggregate input/output row counts and throughput diff --git a/src/query/service/tests/it/trace_debug/direct.rs b/src/query/service/tests/it/trace_debug/direct.rs new file mode 100644 index 0000000000000..8b45a468b231a --- /dev/null +++ b/src/query/service/tests/it/trace_debug/direct.rs @@ -0,0 +1,192 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::env; + +use databend_common_catalog::table_context::TableContext; +use databend_common_expression::DataBlock; +use databend_query::interpreters::InterpreterFactory; +use databend_query::interpreters::interpreter_plan_sql; +use databend_query::test_kits::TestFixture; +use fastrace::collector::SpanRecord; +use fastrace::future::FutureExt; +use fastrace::prelude::*; +use futures_util::TryStreamExt; + +use super::infra::TraceDebugRuntimeFlavor; +use super::infra::build_trace_debug_otlp_export; +use super::infra::persist_trace_debug_output; +use super::infra::run_future_in_named_thread; +use super::infra::setup_trace_debug_fixture; +use super::infra::trace_capture_handle; +use super::infra::trace_test_lock; + +#[derive(Debug, Clone)] +struct DirectTraceDebugConfig { + sqls: Vec, +} + +impl DirectTraceDebugConfig { + fn from_env() -> Self { + Self { + sqls: parse_direct_trace_debug_sqls(), + } + } +} + +fn parse_direct_trace_debug_sqls() -> Vec { + const DEFAULT_SQLS: &[&str] = &["select number from numbers(10)", "show databases"]; + + env::var("DATABEND_DIRECT_TRACE_DEBUG_SQLS") + .ok() + .map(|sqls| { + sqls.split(";;") + .map(str::trim) + .filter(|sql| !sql.is_empty()) + .map(ToOwned::to_owned) + .collect::>() + }) + .filter(|sqls| !sqls.is_empty()) + .unwrap_or_else(|| DEFAULT_SQLS.iter().map(|sql| sql.to_string()).collect()) +} + +#[derive(Debug)] +enum DirectExecutionOutcome { + Succeeded { blocks: usize, rows: usize }, + Failed { stage: &'static str, error: String }, +} + +async fn dump_direct_sql_trace( + fixture: &TestFixture, + sql: &str, +) -> anyhow::Result> { + let capture = trace_capture_handle(); + capture.reset(); + + let ctx = fixture.new_query_ctx().await?; + let query_id = ctx.get_id(); + let sql_text = sql.to_string(); + let query_id_for_span = query_id.clone(); + let sql_text_for_span = sql_text.clone(); + let root = Span::root("db.query", SpanContext::random()) + .with_property(|| ("db.system", "databend")) + .with_property(|| 
("db.statement", sql_text_for_span.clone())) + .with_property(|| ("query_id", query_id.clone())) + .with_property(|| ("databend.trace.entry", "direct")); + + async move { + let (plan, _, _queue_guard) = match interpreter_plan_sql(ctx.clone(), sql, true, None).await + { + Ok(plan_res) => plan_res, + Err(err) => { + let outcome = DirectExecutionOutcome::Failed { + stage: "plan_sql", + error: err.to_string(), + }; + record_direct_execution_outcome(&query_id_for_span, &outcome); + return outcome; + } + }; + + let interpreter = match InterpreterFactory::get(ctx.clone(), &plan).await { + Ok(interpreter) => interpreter, + Err(err) => { + let outcome = DirectExecutionOutcome::Failed { + stage: "build_interpreter", + error: err.to_string(), + }; + record_direct_execution_outcome(&query_id_for_span, &outcome); + return outcome; + } + }; + + let stream = match interpreter.execute(ctx.clone()).await { + Ok(stream) => stream, + Err(err) => { + let outcome = DirectExecutionOutcome::Failed { + stage: "execute", + error: err.to_string(), + }; + record_direct_execution_outcome(&query_id_for_span, &outcome); + return outcome; + } + }; + + let outcome = match stream.try_collect::>().await { + Ok(blocks) => DirectExecutionOutcome::Succeeded { + blocks: blocks.len(), + rows: blocks.iter().map(DataBlock::num_rows).sum(), + }, + Err(err) => DirectExecutionOutcome::Failed { + stage: "collect_stream", + error: err.to_string(), + }, + }; + record_direct_execution_outcome(&query_id_for_span, &outcome); + outcome + } + .in_span(root) + .await; + + fastrace::flush(); + Ok(capture.snapshot()) +} + +fn record_direct_execution_outcome(query_id: &str, outcome: &DirectExecutionOutcome) { + let span = LocalSpan::enter_with_local_parent("databend.trace_debug.execution"); + let span = span.with_property(|| ("query_id", query_id.to_string())); + + match outcome { + DirectExecutionOutcome::Succeeded { blocks, rows } => { + let _span = span + .with_property(|| ("span.status_code", "ok")) + 
.with_property(|| ("databend.query.status", "succeeded")) + .with_property(|| ("databend.query.blocks", blocks.to_string())) + .with_property(|| ("databend.query.rows", rows.to_string())); + } + DirectExecutionOutcome::Failed { stage, error } => { + let _span = span + .with_property(|| ("span.status_code", "error")) + .with_property(|| ("span.status_description", error.clone())) + .with_property(|| ("databend.query.status", "failed")) + .with_property(|| ("databend.query.stage", (*stage).to_string())) + .with_property(|| ("exception.message", error.clone())); + } + } +} + +#[test] +#[ignore = "debug utility: dumps parser/planner/interpreter/pipeline tracing without HTTP"] +fn test_dump_direct_sql_trace_debug() -> anyhow::Result<()> { + let _guard = trace_test_lock().lock().unwrap(); + run_future_in_named_thread( + "trace-debug-direct", + TraceDebugRuntimeFlavor::MultiThread, + || async move { + let config = DirectTraceDebugConfig::from_env(); + let fixture = setup_trace_debug_fixture().await?; + let _capture = trace_capture_handle(); + let mut spans = Vec::new(); + + for sql in &config.sqls { + spans.extend(dump_direct_sql_trace(&fixture, sql).await?); + } + + let export = build_trace_debug_otlp_export(&spans); + let _path = persist_trace_debug_output("direct", &export)?; + + Ok(()) + }, + ) +} diff --git a/src/query/service/tests/it/trace_debug/http.rs b/src/query/service/tests/it/trace_debug/http.rs new file mode 100644 index 0000000000000..55a0ab6e89078 --- /dev/null +++ b/src/query/service/tests/it/trace_debug/http.rs @@ -0,0 +1,323 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::env; + +use databend_query::servers::admin::v1::instance_status::instance_status_handler; +use databend_query::servers::http::error::QueryError; +use databend_query::servers::http::middleware::json_response; +use databend_query::servers::http::v1::ExecuteStateKind; +use databend_query::servers::http::v1::HttpSessionConf; +use databend_query::servers::http::v1::QueryResponseField; +use databend_query::servers::http::v1::QueryStats; +use databend_query::servers::http::v1::query_route; +use databend_query::sessions::QueryAffect; +use fastrace::collector::SpanRecord; +use fastrace::future::FutureExt; +use fastrace::prelude::*; +use headers::HeaderMapExt; +use http::HeaderMap; +use http::HeaderValue; +use http::Method; +use http::StatusCode; +use http::header; +use poem::Endpoint; +use poem::EndpointExt; +use poem::Request; +use poem::Route; +use poem::get; +use serde::Deserialize; + +use super::infra::TraceDebugRuntimeFlavor; +use super::infra::build_trace_debug_otlp_export; +use super::infra::persist_trace_debug_output; +use super::infra::run_future_in_named_thread; +use super::infra::setup_trace_debug_fixture; +use super::infra::trace_capture_handle; +use super::infra::trace_test_lock; + +type EndpointType = Route; + +#[allow(dead_code)] +#[derive(Deserialize, Debug, Clone)] +struct TestQueryResponse { + id: String, + session_id: Option, + node_id: String, + state: ExecuteStateKind, + session: Option, + error: Option, + warnings: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + has_result_set: Option, + schema: Vec, + data: 
Vec>>, + affect: Option, + result_timeout_secs: Option, + stats: QueryStats, + stats_uri: Option, + final_uri: Option, + next_uri: Option, + kill_uri: Option, +} + +#[derive(Debug, Clone)] +struct TraceDebugConfig { + sqls: Vec, + wait_time_secs: u32, + max_rows_per_page: usize, + max_pages: usize, +} + +impl TraceDebugConfig { + fn from_env() -> Self { + Self { + sqls: parse_trace_debug_sqls(), + wait_time_secs: parse_trace_debug_env("DATABEND_TRACE_DEBUG_WAIT_SECS", 2), + max_rows_per_page: parse_trace_debug_env("DATABEND_TRACE_DEBUG_MAX_ROWS_PER_PAGE", 2), + max_pages: parse_trace_debug_env("DATABEND_TRACE_DEBUG_MAX_PAGES", 16), + } + } +} + +fn parse_trace_debug_sqls() -> Vec { + const DEFAULT_SQLS: &[&str] = &["select number from numbers(10)", "show databases"]; + + env::var("DATABEND_TRACE_DEBUG_SQLS") + .ok() + .map(|sqls| { + sqls.split(";;") + .map(str::trim) + .filter(|sql| !sql.is_empty()) + .map(ToOwned::to_owned) + .collect::>() + }) + .filter(|sqls| !sqls.is_empty()) + .unwrap_or_else(|| DEFAULT_SQLS.iter().map(|sql| sql.to_string()).collect()) +} + +fn parse_trace_debug_env(key: &str, default: T) -> T +where T: std::str::FromStr + Copy { + env::var(key) + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(default) +} + +fn create_endpoint() -> databend_common_exception::Result { + Ok(Route::new() + .nest("/v1", query_route().around(json_response)) + .at("/v1_status", get(instance_status_handler))) +} + +fn root_auth_header() -> HeaderValue { + let mut headers = HeaderMap::new(); + headers.typed_insert(headers::Authorization::basic("root", "")); + headers["authorization"].clone() +} + +async fn send_debug_request( + ep: &EndpointType, + body: &[u8], + method: Method, + uri: &str, +) -> anyhow::Result<(StatusCode, Option, String)> { + let req = Request::builder() + .uri(uri.parse().unwrap()) + .method(method) + .header(header::CONTENT_TYPE, "application/json") + .header(header::AUTHORIZATION, root_auth_header()) + .body(body.to_vec()); + + 
let resp = ep.call(req).await?; + let status_code = resp.status(); + let body = resp.into_body().into_string().await?; + let query_resp = serde_json::from_str::(&body) + .map(Some) + .unwrap_or_default(); + + Ok((status_code, query_resp, body)) +} + +fn record_http_exchange( + method: &Method, + uri: &str, + request_body: &str, + status: StatusCode, + response: Option<&TestQueryResponse>, + body: &str, +) { + let span = LocalSpan::enter_with_local_parent("databend.trace_debug.http_exchange") + .with_property(|| ("http.request.method", method.as_str().to_string())) + .with_property(|| ("url.path", uri.to_string())) + .with_property(|| ("http.response.status_code", status.as_u16().to_string())) + .with_property(|| ("http.request.body", request_body.to_string())) + .with_property(|| ("http.response.body", body.to_string())); + + match response { + Some(response) => { + let error_text = response + .error + .as_ref() + .map(|error| format!("{error:?}")) + .unwrap_or_default(); + let span = span + .with_property(|| ("query_id", response.id.clone())) + .with_property(|| ("databend.http.state", format!("{:?}", response.state))) + .with_property(|| ("databend.http.rows", response.data.len().to_string())) + .with_property(|| ("databend.http.warnings", response.warnings.join("\n"))) + .with_property(|| { + ( + "databend.http.next_uri", + response.next_uri.clone().unwrap_or_default(), + ) + }) + .with_property(|| { + ( + "databend.http.final_uri", + response.final_uri.clone().unwrap_or_default(), + ) + }) + .with_property(|| ("databend.http.error", error_text.clone())); + + if status.is_success() && response.error.is_none() { + let _span = span.with_property(|| ("span.status_code", "ok")); + } else { + let _span = span + .with_property(|| ("span.status_code", "error")) + .with_property(|| ("span.status_description", error_text)); + } + } + None => { + let _span = span + .with_property(|| ("span.status_code", "error")) + .with_property(|| { + ( + "span.status_description", + 
"http response body did not match TestQueryResponse", + ) + }); + } + } +} + +async fn dump_http_query_trace( + sql: &str, + config: &TraceDebugConfig, +) -> anyhow::Result> { + let ep = create_endpoint()?; + let capture = trace_capture_handle(); + capture.reset(); + + let sql_text = sql.to_string(); + let root = Span::root("db.query", SpanContext::random()) + .with_property(|| ("db.system", "databend")) + .with_property(|| ("db.statement", sql_text.clone())) + .with_property(|| ("databend.trace.entry", "http")) + .with_property(|| { + ( + "databend.http.wait_time_secs", + config.wait_time_secs.to_string(), + ) + }) + .with_property(|| { + ( + "databend.http.max_rows_per_page", + config.max_rows_per_page.to_string(), + ) + }) + .with_property(|| ("databend.http.max_pages", config.max_pages.to_string())); + + let json = serde_json::json!({ + "sql": sql.to_string(), + "pagination": { + "wait_time_secs": config.wait_time_secs, + "max_rows_per_page": config.max_rows_per_page, + }, + "session": { "settings": {} } + }); + let request_body_bytes = serde_json::to_vec(&json)?; + let request_body_text = String::from_utf8(request_body_bytes.clone())?; + + async { + let (status, begin_resp, body) = + send_debug_request(&ep, &request_body_bytes, Method::POST, "/v1/query").await?; + record_http_exchange( + &Method::POST, + "/v1/query", + &request_body_text, + status, + begin_resp.as_ref(), + &body, + ); + + let mut next_uri = begin_resp.as_ref().and_then(|resp| resp.next_uri.clone()); + let mut fetched_pages = 0usize; + while let Some(current_next_uri) = next_uri.clone() { + if fetched_pages >= config.max_pages { + let _span = + LocalSpan::enter_with_local_parent("databend.trace_debug.http_pagination_stop") + .with_property(|| ("databend.http.reason", "max_pages_reached")) + .with_property(|| ("databend.http.max_pages", config.max_pages.to_string())) + .with_property(|| ("databend.http.next_uri", current_next_uri.clone())); + break; + } + + let (status, next_resp, body) = + 
send_debug_request(&ep, &request_body_bytes, Method::GET, ¤t_next_uri) + .await?; + record_http_exchange( + &Method::GET, + ¤t_next_uri, + &request_body_text, + status, + next_resp.as_ref(), + &body, + ); + next_uri = next_resp.as_ref().and_then(|resp| resp.next_uri.clone()); + fetched_pages += 1; + } + + Ok::<(), anyhow::Error>(()) + } + .in_span(root) + .await?; + + fastrace::flush(); + Ok(capture.snapshot()) +} + +#[test] +#[ignore = "debug utility: dumps HTTP/query tracing without assertions"] +fn test_dump_http_query_trace_debug() -> anyhow::Result<()> { + let _guard = trace_test_lock().lock().unwrap(); + run_future_in_named_thread( + "trace-debug-http", + TraceDebugRuntimeFlavor::CurrentThread, + || async move { + let _fixture = setup_trace_debug_fixture().await?; + let config = TraceDebugConfig::from_env(); + let mut spans = Vec::new(); + + for sql in &config.sqls { + spans.extend(dump_http_query_trace(sql, &config).await?); + } + + let export = build_trace_debug_otlp_export(&spans); + let _path = persist_trace_debug_output("http", &export)?; + + Ok(()) + }, + ) +} diff --git a/src/query/service/tests/it/trace_debug/infra.rs b/src/query/service/tests/it/trace_debug/infra.rs new file mode 100644 index 0000000000000..763b75a55a071 --- /dev/null +++ b/src/query/service/tests/it/trace_debug/infra.rs @@ -0,0 +1,493 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::borrow::Cow; +use std::fs; +use std::future::Future; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::OnceLock; +use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; + +use databend_query::test_kits::ConfigBuilder; +use databend_query::test_kits::TestFixture; +use fastrace::collector::Config as CollectorConfig; +use fastrace::collector::Reporter; +use fastrace::collector::SpanRecord; +use fastrace_opentelemetry::OpenTelemetryReporter; +use opentelemetry::InstrumentationScope; +use opentelemetry::KeyValue; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema; +use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::trace::SpanData; +use opentelemetry_sdk::trace::SpanExporter; + +static TRACE_TEST_LOCK: Mutex<()> = Mutex::new(()); +static TRACE_REPORTER: OnceLock = OnceLock::new(); + +pub(crate) fn trace_test_lock() -> &'static Mutex<()> { + &TRACE_TEST_LOCK +} + +pub(crate) fn persist_trace_debug_output( + output_name: &str, + content: &T, +) -> anyhow::Result { + let output_dir = env_trace_debug_output_dir(); + fs::create_dir_all(&output_dir)?; + let file_path = output_dir.join(format!( + "{output_name}-{}.otlp.json", + trace_debug_timestamp() + )); + + fs::write(&file_path, serde_json::to_vec_pretty(content)?)?; + Ok(file_path) +} + +fn env_trace_debug_output_dir() -> PathBuf { + std::env::var("DATABEND_TRACE_DEBUG_OUTPUT_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| std::env::temp_dir().join("databend-trace-debug")) +} + +fn trace_debug_timestamp() -> String { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + let secs = now.as_secs(); + let millis = now.subsec_millis(); + format!("{secs}-{millis:03}") +} + 
+#[derive(Clone, Copy, Debug)] +pub(crate) enum TraceDebugRuntimeFlavor { + CurrentThread, + MultiThread, +} + +pub(crate) fn run_future_in_named_thread( + thread_name: &str, + runtime_flavor: TraceDebugRuntimeFlavor, + task: F, +) -> anyhow::Result +where + F: FnOnce() -> Fut + Send + 'static, + Fut: Future> + 'static, + T: Send + 'static, +{ + let thread_name = thread_name.to_string(); + let handle = std::thread::Builder::new() + .name(thread_name.clone()) + .spawn(move || -> anyhow::Result { + let runtime = match runtime_flavor { + TraceDebugRuntimeFlavor::CurrentThread => { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()? + } + TraceDebugRuntimeFlavor::MultiThread => { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + builder.enable_all(); + builder.thread_name(thread_name.clone()); + builder.build()? + } + }; + + runtime.block_on(task()) + })?; + + match handle.join() { + Ok(result) => result, + Err(cause) => { + let message = if let Some(message) = cause.downcast_ref::<&'static str>() { + (*message).to_string() + } else if let Some(message) = cause.downcast_ref::() { + message.clone() + } else { + "unknown panic message".to_string() + }; + Err(anyhow::anyhow!("trace debug thread panicked: {message}")) + } + } +} + +#[derive(Clone, Default)] +pub(crate) struct TraceCaptureHandle { + inner: Arc, +} + +#[derive(Default)] +struct TraceCaptureState { + spans: Mutex>, +} + +impl TraceCaptureHandle { + pub(crate) fn reset(&self) { + self.inner.spans.lock().unwrap().clear(); + } + + pub(crate) fn snapshot(&self) -> Vec { + self.inner.spans.lock().unwrap().clone() + } +} + +struct TraceCaptureReporter { + state: Arc, +} + +impl TraceCaptureReporter { + fn new(handle: &TraceCaptureHandle) -> Self { + Self { + state: handle.inner.clone(), + } + } +} + +impl Reporter for TraceCaptureReporter { + fn report(&mut self, spans: Vec) { + self.state.spans.lock().unwrap().extend(spans); + } +} + +#[derive(Debug, Clone, Default)] 
+struct CapturingSpanExporter { + spans: Arc>>, +} + +impl CapturingSpanExporter { + fn new() -> (Self, Arc>>) { + let spans = Arc::new(Mutex::new(Vec::new())); + ( + Self { + spans: Arc::clone(&spans), + }, + spans, + ) + } +} + +impl SpanExporter for CapturingSpanExporter { + fn export(&self, batch: Vec) -> impl Future + Send { + self.spans.lock().unwrap().extend(batch); + std::future::ready(Ok(())) + } +} + +pub(crate) fn trace_capture_handle() -> TraceCaptureHandle { + TRACE_REPORTER + .get_or_init(|| { + let handle = TraceCaptureHandle::default(); + fastrace::set_reporter( + TraceCaptureReporter::new(&handle), + CollectorConfig::default().report_interval(Duration::from_secs(3600)), + ); + handle + }) + .clone() +} + +pub(crate) async fn setup_trace_debug_fixture() -> anyhow::Result { + let mut config = ConfigBuilder::create().config(); + config.log.tracing.on = false; + config.log.structlog.on = true; + TestFixture::setup_with_config(&config) + .await + .map_err(Into::into) +} + +pub(crate) fn build_trace_debug_otlp_export(spans: &[SpanRecord]) -> ExportTraceServiceRequest { + if spans.is_empty() { + return ExportTraceServiceRequest { + resource_spans: Vec::new(), + }; + } + + let mut spans = spans.to_vec(); + spans.sort_by_key(|span| { + ( + span.trace_id.0, + span.begin_time_unix_ns, + span.duration_ns, + span.span_id.0, + ) + }); + + let resource = Resource::builder_empty() + .with_attributes([ + KeyValue::new("service.name", "databend-query"), + KeyValue::new("service.namespace", "databend"), + ]) + .build(); + let resource_attrs = ResourceAttributesWithSchema::from(&resource); + + let (exporter, exported_spans) = CapturingSpanExporter::new(); + let scope = InstrumentationScope::builder("databend-query.trace-debug").build(); + let mut reporter = OpenTelemetryReporter::new(exporter, Cow::Owned(resource), scope); + reporter.report(spans); + + ExportTraceServiceRequest { + resource_spans: group_spans_by_resource_and_scope( + 
exported_spans.lock().unwrap().clone(),
+            &resource_attrs,
+        ),
+    }
+}
+
+fn span_property<'a>(span: &'a SpanRecord, key: &str) -> Option<&'a str> {
+    span.properties
+        .iter()
+        .find(|(k, _)| k.as_ref() == key)
+        .map(|(_, v)| v.as_ref())
+}
+
+#[derive(Debug)]
+pub(crate) struct QueryTraceTree {
+    pub(crate) spans: Vec<SpanRecord>,
+    pub(crate) roots: Vec<usize>,
+    pub(crate) children: Vec<Vec<usize>>,
+    #[allow(dead_code)]
+    pub(crate) parents: Vec<Option<usize>>,
+}
+
+pub(crate) fn span_end_time(span: &SpanRecord) -> u64 {
+    span.begin_time_unix_ns.saturating_add(span.duration_ns)
+}
+
+fn format_relative_time_ns(base_time: u64, timestamp: u64) -> String {
+    let delta_us = timestamp.saturating_sub(base_time) / 1_000;
+    format!("+{delta_us}us")
+}
+
+fn format_duration_ns(duration_ns: u64) -> String {
+    format!("{}us", duration_ns / 1_000)
+}
+
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+pub(crate) struct QueryTraceFormatOptions {
+    pub(crate) show_duration: bool,
+    pub(crate) show_links: bool,
+    pub(crate) show_thread_name: bool,
+    pub(crate) show_ids: bool,
+    pub(crate) show_properties: bool,
+    pub(crate) show_events: bool,
+}
+
+impl Default for QueryTraceFormatOptions {
+    fn default() -> Self {
+        Self {
+            show_duration: true,
+            show_links: true,
+            show_thread_name: true,
+            show_ids: false,
+            show_properties: false,
+            show_events: false,
+        }
+    }
+}
+
+#[allow(dead_code)]
+impl QueryTraceFormatOptions {
+    pub(crate) fn with_duration(mut self, enabled: bool) -> Self {
+        self.show_duration = enabled;
+        self
+    }
+
+    pub(crate) fn with_links(mut self, enabled: bool) -> Self {
+        self.show_links = enabled;
+        self
+    }
+
+    pub(crate) fn with_thread_name(mut self, enabled: bool) -> Self {
+        self.show_thread_name = enabled;
+        self
+    }
+
+    pub(crate) fn with_ids(mut self, enabled: bool) -> Self {
+        self.show_ids = enabled;
+        self
+    }
+
+    pub(crate) fn with_properties(mut self, enabled: bool) -> Self {
+        self.show_properties = enabled;
+        self
+    }
+
+    pub(crate) fn with_events(mut self, enabled: bool) -> Self {
+        self.show_events = enabled;
+        self
+    }
+}
+
+pub(crate) struct QueryTraceTreeFormatter<'a> {
+    tree: &'a QueryTraceTree,
+    options: QueryTraceFormatOptions,
+}
+
+#[allow(dead_code)]
+impl<'a> QueryTraceTreeFormatter<'a> {
+    pub(crate) fn new(tree: &'a QueryTraceTree) -> Self {
+        Self {
+            tree,
+            options: QueryTraceFormatOptions::default(),
+        }
+    }
+
+    pub(crate) fn with_options(mut self, options: QueryTraceFormatOptions) -> Self {
+        self.options = options;
+        self
+    }
+
+    pub(crate) fn format(&self) -> String {
+        let Some(root_index) = self.tree.roots.first().copied() else {
+            return "".to_string();
+        };
+
+        let base_time = self.tree.spans[root_index].begin_time_unix_ns;
+        let mut lines = Vec::new();
+
+        for &tree_root_index in &self.tree.roots {
+            self.walk(tree_root_index, 0, base_time, &mut lines);
+        }
+
+        lines.join("\n")
+    }
+
+    fn walk(&self, index: usize, depth: usize, base_time: u64, lines: &mut Vec<String>) {
+        let span = &self.tree.spans[index];
+        let indent = " ".repeat(depth);
+        let mut suffixes = vec![format!(
+            "{} -> {}",
+            format_relative_time_ns(base_time, span.begin_time_unix_ns),
+            format_relative_time_ns(base_time, span_end_time(span)),
+        )];
+
+        if self.options.show_duration {
+            suffixes.push(format!("dur={}", format_duration_ns(span.duration_ns)));
+        }
+
+        if self.options.show_links && !span.links.is_empty() {
+            suffixes.push(format!("links={}", span.links.len()));
+        }
+
+        if self.options.show_thread_name
+            && let Some(thread_name) = span_property(span, "thread_name")
+        {
+            suffixes.push(format!("thread={thread_name}"));
+        }
+
+        if self.options.show_ids {
+            suffixes.push(format!("trace={:?}", span.trace_id));
+            suffixes.push(format!("span={:?}", span.span_id));
+            suffixes.push(format!("parent={:?}", span.parent_id));
+        }
+
+        if self.options.show_properties && !span.properties.is_empty() {
+            let properties = span
+                .properties
+                .iter()
+                .filter(|(key, _)| !self.options.show_thread_name || key.as_ref() != "thread_name")
+                .map(|(key, value)| format!("{key}={value}"))
+                .collect::<Vec<_>>();
+            if !properties.is_empty() {
+                suffixes.push(format!("properties=[{}]", properties.join(", ")));
+            }
+        }
+
+        lines.push(format!("{indent}{} [{}]", span.name, suffixes.join(", ")));
+
+        if self.options.show_events {
+            for event in &span.events {
+                let event_indent = " ".repeat(depth + 1);
+                let mut event_suffixes = vec![format!(
+                    "at={}",
+                    format_relative_time_ns(base_time, event.timestamp_unix_ns)
+                )];
+                if !event.properties.is_empty() {
+                    let properties = event
+                        .properties
+                        .iter()
+                        .map(|(key, value)| format!("{key}={value}"))
+                        .collect::<Vec<_>>()
+                        .join(", ");
+                    event_suffixes.push(format!("properties=[{properties}]"));
+                }
+                lines.push(format!(
+                    "{event_indent}event {} [{}]",
+                    event.name,
+                    event_suffixes.join(", ")
+                ));
+            }
+        }
+
+        for &child_index in &self.tree.children[index] {
+            self.walk(child_index, depth + 1, base_time, lines);
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct RawSpanFormatOptions {
+    pub(crate) show_ids: bool,
+    pub(crate) show_links: bool,
+    pub(crate) show_thread_name: bool,
+    pub(crate) show_properties: bool,
+    pub(crate) show_events: bool,
+}
+
+impl Default for RawSpanFormatOptions {
+    fn default() -> Self {
+        Self {
+            show_ids: true,
+            show_links: true,
+            show_thread_name: true,
+            show_properties: false,
+            show_events: false,
+        }
+    }
+}
+
+#[allow(dead_code)]
+impl RawSpanFormatOptions {
+    pub(crate) fn with_properties(mut self, enabled: bool) -> Self {
+        self.show_properties = enabled;
+        self
+    }
+
+    pub(crate) fn with_thread_name(mut self, enabled: bool) -> Self {
+        self.show_thread_name = enabled;
+        self
+    }
+
+    pub(crate) fn with_ids(mut self, enabled: bool) -> Self {
+        self.show_ids = enabled;
+        self
+    }
+
+    pub(crate) fn with_links(mut self, enabled: bool) -> Self {
+        self.show_links = enabled;
+        self
+    }
+
+    pub(crate) fn with_events(mut self, enabled: bool) -> Self {
+        self.show_events = enabled;
+        self
+    }
+}
diff --git a/src/query/service/tests/it/trace_debug/mod.rs b/src/query/service/tests/it/trace_debug/mod.rs
new file mode 100644
index 0000000000000..d6581b464be01
--- /dev/null
+++ b/src/query/service/tests/it/trace_debug/mod.rs
@@ -0,0 +1,17 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod direct;
+mod http;
+pub(crate) mod infra;