Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 147 additions & 73 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -522,12 +522,12 @@ vergen-gix = { version = "1", features = ["build", "cargo", "rustc"] }

# Observability
env_logger = "0.11"
fastrace = { version = "0.7.14", features = ["enable"] }
fastrace-opentelemetry = "0.10"
fastrace = { version = "0.7.17", features = ["enable"] }
fastrace-opentelemetry = "0.16"
log = { version = "0.4.27", features = ["serde", "kv_serde", "kv_unstable_std"] }
logcall = "0.1.9"
opentelemetry = { version = "0.29", features = ["trace", "logs"] }
opentelemetry-otlp = { version = "0.29", default-features = false, features = [
opentelemetry = { version = "0.31", features = ["trace", "logs"] }
opentelemetry-otlp = { version = "0.31", default-features = false, features = [
"trace",
"logs",
"metrics",
Expand All @@ -537,7 +537,8 @@ opentelemetry-otlp = { version = "0.29", default-features = false, features = [
"http-proto",
"reqwest-client",
] }
opentelemetry_sdk = { version = "0.29", features = ["trace", "logs", "rt-tokio"] }
opentelemetry-proto = { version = "0.31", features = ["gen-tonic-messages", "trace", "with-serde"] }
opentelemetry_sdk = { version = "0.31", features = ["trace", "logs", "rt-tokio"] }
prometheus-client = "0.22"
prometheus-parse = "0.2.3"
tracing = "0.1.40"
Expand All @@ -549,6 +550,9 @@ sqllogictest = "0.28.4"

[workspace.lints.rust]
async_fn_in_trait = "allow"
# Temporary workaround for false positives from async_backtrace::framed and
# fastrace::trace macro expansion under `cargo clippy --workspace --all-targets -- -D warnings`.
unused_braces = "allow"

[workspace.lints.clippy]
useless_format = "allow"
Expand Down
1 change: 1 addition & 0 deletions agents/debug-and-validation.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- For Rust changes that will remain in the branch, use `cargo clippy` to confirm there are no compilation or lint errors.
- Start with partial verification when a full workspace pass is too expensive, but move toward stronger coverage before handoff when the resulting changes will remain in the branch.
- Use the trace-debug utilities under [`src/query/service/tests/it/trace_debug/README.md`](../src/query/service/tests/it/trace_debug/README.md) when the question is about query lifecycle or stage transitions, such as parser/planner/interpreter/pipeline flow, HTTP query state and pagination behavior, or which execution spans and attached events appear for a specific SQL path.
- For temporary investigation outputs that will not be submitted, keep validation minimal and purpose-driven. Run only the checks needed to establish the conclusion or unblock the investigation.
- If investigation work begins producing code, tests, or docs that should remain in the branch, raise the validation bar before handoff.

Expand Down
1 change: 0 additions & 1 deletion src/common/tracing/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ pub fn init_logging(
let reporter = rt.block_on(async {
fastrace_opentelemetry::OpenTelemetryReporter::new(
exporter,
opentelemetry::trace::SpanKind::Server,
Cow::Owned(resource),
opentelemetry::InstrumentationScope::builder(trace_name).build(),
)
Expand Down
23 changes: 20 additions & 3 deletions src/query/pipeline/src/core/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,19 @@ unsafe impl Send for ProcessorPtr {}

unsafe impl Sync for ProcessorPtr {}

thread_local! {
    // Building a processor span happens on the hot path, so the thread name is looked up
    // once per thread and kept here; threads without a name are recorded as "unnamed".
    static CURRENT_THREAD_NAME: String = {
        let current = std::thread::current();
        current
            .name()
            .map_or_else(|| String::from("unnamed"), String::from)
    };
}

impl ProcessorPtr {
/// Returns an owned copy of the name stored in the `CURRENT_THREAD_NAME` thread-local
/// for the calling thread.
fn current_thread_name() -> String {
    CURRENT_THREAD_NAME.with(String::clone)
}

#[allow(clippy::arc_with_non_send_sync)]
pub fn create(inner: Box<dyn Processor>) -> ProcessorPtr {
ProcessorPtr {
Expand Down Expand Up @@ -175,7 +187,8 @@ impl ProcessorPtr {
pub unsafe fn process(&self) -> Result<()> {
unsafe {
let span = LocalSpan::enter_with_local_parent(format!("{}::process", self.name()))
.with_property(|| ("graph-node-id", self.id().index().to_string()));
.with_property(|| ("graph-node-id", self.id().index().to_string()))
.with_property(|| ("thread_name", Self::current_thread_name()));

match (*self.inner.get()).process() {
Ok(_) => Ok(()),
Expand Down Expand Up @@ -215,8 +228,12 @@ impl ProcessorPtr {

let inner = self.inner.clone();
async move {
let span = Span::enter_with_local_parent(name)
.with_property(|| ("graph-node-id", id.index().to_string()));
let span = match SpanContext::current_local_parent() {
Some(parent) if parent.sampled => Span::enter_with_local_parent(name)
.with_property(|| ("graph-node-id", id.index().to_string()))
.with_property(|| ("thread_name", Self::current_thread_name())),
_ => Span::noop(),
};

match task.await {
Ok(_) => {
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,15 @@ zip = { workspace = true }
arrow-cast = { workspace = true }
databend-common-sql-test-support = { workspace = true }
databend-storages-common-pruner = { workspace = true }
fastrace-opentelemetry = { workspace = true }
geo = { workspace = true }
goldenfile = { workspace = true }
hex = { workspace = true }
hyper-util = { workspace = true }
jwt-simple = { workspace = true }
maplit = { workspace = true }
mysql_async = { workspace = true }
opentelemetry-proto = { workspace = true }
p256 = { workspace = true }
pretty_assertions = { workspace = true }
proptest = { workspace = true }
Expand Down
7 changes: 6 additions & 1 deletion src/query/service/src/interpreters/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use databend_common_sql::plans::Plan;
use databend_storages_common_cache::CacheManager;
use derive_visitor::DriveMut;
use derive_visitor::VisitorMut;
use fastrace::collector::SpanContext;
use md5::Digest;
use md5::Md5;

Expand Down Expand Up @@ -161,7 +162,11 @@ pub trait Interpreter: Sync + Send {
complete_executor.execute()?;
self.inject_result()
} else {
let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
let thread_span_parent = ctx
.get_executor_tracing_context()
.or_else(SpanContext::current_local_parent);
let pulling_executor =
PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?;

ctx.set_executor(pulling_executor.get_inner())?;
Ok(Box::pin(ProgressStream::try_create(
Expand Down
11 changes: 10 additions & 1 deletion src/query/service/src/interpreters/interpreter_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use databend_common_storages_basic::gen_result_cache_key;
use databend_common_storages_fuse::FuseLazyPartInfo;
use databend_common_storages_fuse::FuseTable;
use databend_common_users::UserApiProvider;
use fastrace::collector::SpanContext;
use serde::Serialize;
use serde_json;

Expand Down Expand Up @@ -552,7 +553,15 @@ impl ExplainInterpreter {
executor.execute()?;
}
false => {
let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
let thread_span_parent = self
.ctx
.get_executor_tracing_context()
.or_else(SpanContext::current_local_parent);
let mut executor = PipelinePullingExecutor::from_pipelines(
build_res,
settings,
thread_span_parent,
)?;
executor.start();
while (executor.pull_data()?).is_some() {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use databend_common_pipeline::core::always_callback;
use databend_common_sql::Planner;
use databend_meta_plugin_semaphore::acquirer::Permit;
use databend_meta_runtime::DatabendRuntime;
use fastrace::collector::SpanContext;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterFactory;
Expand Down Expand Up @@ -174,7 +175,11 @@ impl ExplainPerfInterpreter {
ctx.set_executor(executor.get_inner())?;
executor.execute()?;
} else {
let mut executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
let thread_span_parent = ctx
.get_executor_tracing_context()
.or_else(SpanContext::current_local_parent);
let mut executor =
PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?;
ctx.set_executor(executor.get_inner())?;
executor.start();
while (executor.pull_data()?).is_some() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use databend_common_base::runtime::drop_guard;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_pipeline::core::Pipeline;
use fastrace::func_path;
use fastrace::prelude::*;

use crate::pipelines::executor::ExecutorSettings;
Expand Down Expand Up @@ -101,11 +100,21 @@ impl PipelineCompleteExecutor {
.flatten()
}

fn thread_function(&self) -> impl Fn() -> Result<()> + use<> {
let span = Span::enter_with_local_parent(func_path!());
fn thread_function(&self) -> impl FnOnce() -> Result<()> + use<> {
let parent = SpanContext::current_local_parent();
let executor = self.executor.clone();

move || {
let span = if let Some(parent) = parent {
let thread_name = std::thread::current()
.name()
.unwrap_or("unnamed")
.to_string();
Span::root("PipelineCompleteExecutor::thread_function", parent)
.with_property(|| ("thread_name", thread_name))
} else {
Span::noop()
};
let _g = span.set_local_parent();
executor.execute()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use databend_common_pipeline::core::Processor;
use databend_common_pipeline::core::ProcessorPtr;
use databend_common_pipeline::sinks::Sink;
use databend_common_pipeline::sinks::Sinker;
use fastrace::func_path;
use fastrace::prelude::*;
use parking_lot::Condvar;
use parking_lot::Mutex;
Expand Down Expand Up @@ -98,6 +97,7 @@ pub struct PipelinePullingExecutor {
executor: Arc<PipelineExecutor>,
receiver: Receiver<DataBlock>,
tracking_payload: TrackingPayload,
thread_span_parent: Option<SpanContext>,
}

impl PipelinePullingExecutor {
Expand All @@ -122,6 +122,7 @@ impl PipelinePullingExecutor {
pub fn try_create(
mut pipeline: Pipeline,
settings: ExecutorSettings,
thread_span_parent: Option<SpanContext>,
) -> Result<PipelinePullingExecutor> {
let tracking_payload = ThreadTracker::new_tracking_payload();
let _guard = ThreadTracker::tracking(tracking_payload.clone());
Expand All @@ -136,12 +137,14 @@ impl PipelinePullingExecutor {
executor: Arc::new(executor),
state: State::create(),
tracking_payload,
thread_span_parent,
})
}

pub fn from_pipelines(
build_res: PipelineBuildResult,
settings: ExecutorSettings,
thread_span_parent: Option<SpanContext>,
) -> Result<PipelinePullingExecutor> {
let tracking_payload = ThreadTracker::new_tracking_payload();
let _guard = ThreadTracker::tracking(tracking_payload.clone());
Expand All @@ -159,6 +162,7 @@ impl PipelinePullingExecutor {
state: State::create(),
tracking_payload,
executor: Arc::new(executor),
thread_span_parent,
})
}

Expand All @@ -167,19 +171,18 @@ impl PipelinePullingExecutor {

let state = self.state.clone();
let threads_executor = self.executor.clone();
let thread_function = Self::thread_function(state, threads_executor);
#[allow(unused_mut)]
let mut thread_name = Some(String::from("PullingExecutor"));
let thread_function =
Self::thread_function(state, threads_executor, self.thread_span_parent);

#[cfg(debug_assertions)]
let thread_name = if cfg!(debug_assertions)
&& matches!(std::env::var("UNIT_TEST"), Ok(var_value) if var_value == "TRUE")
&& let Some(cur_thread_name) = std::thread::current().name()
{
// We need to pass the thread name in the unit test, because the thread name is the test name
if matches!(std::env::var("UNIT_TEST"), Ok(var_value) if var_value == "TRUE") {
if let Some(cur_thread_name) = std::thread::current().name() {
thread_name = Some(cur_thread_name.to_string());
}
}
}
Some(cur_thread_name.to_string())
} else {
Some(String::from("PullingExecutor"))
};

Thread::named_spawn(thread_name, thread_function);
}
Expand All @@ -188,9 +191,22 @@ impl PipelinePullingExecutor {
self.executor.clone()
}

fn thread_function(state: Arc<State>, executor: Arc<PipelineExecutor>) -> impl Fn() {
let span = Span::enter_with_local_parent(func_path!());
fn thread_function(
state: Arc<State>,
executor: Arc<PipelineExecutor>,
parent: Option<SpanContext>,
) -> impl FnOnce() {
move || {
let thread_name = std::thread::current()
.name()
.unwrap_or("unnamed")
.to_string();
let span = if let Some(parent) = parent {
Span::root("PipelinePullingExecutor::thread_function", parent)
} else {
Span::noop()
}
.with_property(|| ("thread_name", thread_name.clone()));
let _g = span.set_local_parent();
state.finished(executor.execute());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use databend_common_base::runtime::error_info::NodeErrorType;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_pipeline::core::WakeCallback;
use fastrace::func_path;
use fastrace::prelude::*;
use log::info;
use log::warn;
Expand Down Expand Up @@ -171,9 +170,15 @@ impl QueriesPipelineExecutor {
}
}

let span = Span::enter_with_local_parent(func_path!())
.with_property(|| ("thread_name", name.clone()));
let parent = SpanContext::current_local_parent();
let thread_name = name.clone();
thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe {
let span = if let Some(parent) = parent {
Span::root("QueriesPipelineExecutor::execute_threads", parent)
} else {
Span::noop()
}
.with_property(|| ("thread_name", thread_name.clone()));
let _g = span.set_local_parent();
let this_clone = this.clone();
let try_result = catch_unwind(|| this_clone.execute_single_thread(thread_num));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,15 @@ impl QueryPipelineExecutor {
}
}

let span = Span::enter_with_local_parent("QueryPipelineExecutor::execute_threads")
.with_property(|| ("thread_name", name.clone()));
let parent = SpanContext::current_local_parent();
let thread_name = name.clone();
thread_join_handles.push(Thread::named_spawn(Some(name), move || unsafe {
let span = if let Some(parent) = parent {
Span::root("QueryPipelineExecutor::execute_threads", parent)
} else {
Span::noop()
}
.with_property(|| ("thread_name", thread_name.clone()));
let _g = span.set_local_parent();
let this_clone = this.clone();
let try_result = catch_unwind(|| this_clone.execute_single_thread(thread_num));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use databend_common_sql::Symbol;
use databend_common_sql::plans::CreateTablePlan;
use databend_common_storages_basic::RecursiveCteMemoryTable;
use databend_storages_common_table_meta::table::OPT_KEY_RECURSIVE_CTE;
use fastrace::collector::SpanContext;
use futures_util::TryStreamExt;
use md5::Digest;
use md5::Md5;
Expand Down Expand Up @@ -207,7 +208,11 @@ impl TransformRecursiveCteSource {
QueryFinishHooks::nested().into_callback(ctx.clone()),
));
let settings = ExecutorSettings::try_create(ctx.clone())?;
let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?;
let thread_span_parent = ctx
.get_executor_tracing_context()
.or_else(SpanContext::current_local_parent);
let pulling_executor =
PipelinePullingExecutor::from_pipelines(build_res, settings, thread_span_parent)?;
ctx.set_executor(pulling_executor.get_inner())?;
let isolate_runtime = Runtime::with_worker_threads(2, Some("r-cte-source".to_string()))?;
let join_handle = isolate_runtime.spawn(async move {
Expand Down
Loading
Loading