diff --git a/src/common/base/src/runtime/runtime_tracker.rs b/src/common/base/src/runtime/runtime_tracker.rs
index f6221e8ef18cf..1232ee585e0bf 100644
--- a/src/common/base/src/runtime/runtime_tracker.rs
+++ b/src/common/base/src/runtime/runtime_tracker.rs
@@ -134,6 +134,7 @@ impl CaptureLogSettings {
 #[derive(Clone)]
 pub struct TrackingPayload {
     pub query_id: Option<String>,
+    pub warehouse_id: Option<String>,
     pub profile: Option<Arc<Profile>>,
     pub mem_stat: Option<Arc<MemStat>>,
     pub metrics: Option<Arc<ScopedRegistry>>,
@@ -216,6 +217,7 @@ impl ThreadTracker {
                 time_series_profile: None,
                 local_time_series_profile: None,
                 workload_group_resource: None,
+                warehouse_id: None,
             }),
         }
     }
@@ -314,6 +316,19 @@ impl ThreadTracker {
             .unwrap_or(None)
     }
 
+    pub fn warehouse_id() -> Option<&'static String> {
+        TRACKER
+            .try_with(|tracker| {
+                tracker
+                    .borrow()
+                    .payload
+                    .warehouse_id
+                    .as_ref()
+                    .map(|warehouse_id| unsafe { &*(warehouse_id as *const String) })
+            })
+            .unwrap_or(None)
+    }
+
     pub fn capture_log_settings() -> Option<&'static Arc<CaptureLogSettings>> {
         TRACKER
             .try_with(|tracker| {
diff --git a/src/common/tracing/src/predefined_tables/history_tables.toml b/src/common/tracing/src/predefined_tables/history_tables.toml
index b504f9619d333..5b97ae9126229 100644
--- a/src/common/tracing/src/predefined_tables/history_tables.toml
+++ b/src/common/tracing/src/predefined_tables/history_tables.toml
@@ -9,8 +9,8 @@ delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hour
 [[tables]]
 name = "query_history"
 target = "databend::log::query"
-create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL, session_id STRING NULL) CLUSTER BY LINEAR(event_time, query_id)"
-transform = "settings (timezone='Etc/UTC') MERGE INTO system_history.query_history AS target USING (SELECT f['log_type'] AS log_type, f['log_type_name'] AS log_type_name, f['handler_type'] AS handler_type, f['tenant_id'] AS tenant_id, f['cluster_id'] AS cluster_id, f['node_id'] AS node_id, f['sql_user'] AS sql_user, f['sql_user_quota'] AS sql_user_quota, f['sql_user_privileges'] AS sql_user_privileges, f['query_id'] AS query_id, f['query_kind'] AS query_kind, f['query_text'] AS query_text, f['query_hash'] AS query_hash, f['query_parameterized_hash'] AS query_parameterized_hash, f['event_date'] AS event_date, f['event_time'] AS event_time, f['query_start_time'] AS query_start_time, f['query_duration_ms'] AS query_duration_ms, f['query_queued_duration_ms'] AS query_queued_duration_ms, f['current_database'] AS current_database, f['written_rows'] AS written_rows, f['written_bytes'] AS written_bytes, f['join_spilled_rows'] AS join_spilled_rows, f['join_spilled_bytes'] AS join_spilled_bytes, f['agg_spilled_rows'] AS agg_spilled_rows, f['agg_spilled_bytes'] AS agg_spilled_bytes, f['group_by_spilled_rows'] AS group_by_spilled_rows, f['group_by_spilled_bytes'] AS group_by_spilled_bytes, f['written_io_bytes'] AS written_io_bytes, f['written_io_bytes_cost_ms'] AS written_io_bytes_cost_ms, f['scan_rows'] AS scan_rows, f['scan_bytes'] AS scan_bytes, f['scan_io_bytes'] AS scan_io_bytes, f['scan_io_bytes_cost_ms'] AS scan_io_bytes_cost_ms, f['scan_partitions'] AS scan_partitions, f['total_partitions'] AS total_partitions, f['result_rows'] AS result_rows, f['result_bytes'] AS result_bytes, f['bytes_from_remote_disk'] AS bytes_from_remote_disk, f['bytes_from_local_disk'] AS bytes_from_local_disk, f['bytes_from_memory'] AS bytes_from_memory, f['client_address'] AS client_address, f['user_agent'] AS user_agent, f['exception_code'] AS exception_code, f['exception_text'] AS exception_text, f['server_version'] AS server_version, f['query_tag'] AS query_tag, f['has_profile'] AS has_profile, f['peek_memory_usage'] AS peek_memory_usage, f['session_id'] AS session_id FROM (SELECT ARG_MAX(m, m['log_type']) AS f FROM (SELECT parse_json(message) AS m FROM system_history.log_history WHERE target = 'databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}) AS parsed_data GROUP BY m['query_id'])) AS source ON target.query_id = source.query_id WHEN MATCHED AND source.log_type IN (2, 3, 4, 5) THEN UPDATE * WHEN NOT MATCHED THEN INSERT *;"
+create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL, session_id STRING NULL, warehouse_id STRING NULL) CLUSTER BY LINEAR(event_time, query_id)"
+transform = "settings (timezone='Etc/UTC') MERGE INTO system_history.query_history AS target USING (SELECT f['log_type'] AS log_type, f['log_type_name'] AS log_type_name, f['handler_type'] AS handler_type, f['tenant_id'] AS tenant_id, f['cluster_id'] AS cluster_id, f['node_id'] AS node_id, f['sql_user'] AS sql_user, f['sql_user_quota'] AS sql_user_quota, f['sql_user_privileges'] AS sql_user_privileges, f['query_id'] AS query_id, f['query_kind'] AS query_kind, f['query_text'] AS query_text, f['query_hash'] AS query_hash, f['query_parameterized_hash'] AS query_parameterized_hash, f['event_date'] AS event_date, f['event_time'] AS event_time, f['query_start_time'] AS query_start_time, f['query_duration_ms'] AS query_duration_ms, f['query_queued_duration_ms'] AS query_queued_duration_ms, f['current_database'] AS current_database, f['written_rows'] AS written_rows, f['written_bytes'] AS written_bytes, f['join_spilled_rows'] AS join_spilled_rows, f['join_spilled_bytes'] AS join_spilled_bytes, f['agg_spilled_rows'] AS agg_spilled_rows, f['agg_spilled_bytes'] AS agg_spilled_bytes, f['group_by_spilled_rows'] AS group_by_spilled_rows, f['group_by_spilled_bytes'] AS group_by_spilled_bytes, f['written_io_bytes'] AS written_io_bytes, f['written_io_bytes_cost_ms'] AS written_io_bytes_cost_ms, f['scan_rows'] AS scan_rows, f['scan_bytes'] AS scan_bytes, f['scan_io_bytes'] AS scan_io_bytes, f['scan_io_bytes_cost_ms'] AS scan_io_bytes_cost_ms, f['scan_partitions'] AS scan_partitions, f['total_partitions'] AS total_partitions, f['result_rows'] AS result_rows, f['result_bytes'] AS result_bytes, f['bytes_from_remote_disk'] AS bytes_from_remote_disk, f['bytes_from_local_disk'] AS bytes_from_local_disk, f['bytes_from_memory'] AS bytes_from_memory, f['client_address'] AS client_address, f['user_agent'] AS user_agent, f['exception_code'] AS exception_code, f['exception_text'] AS exception_text, f['server_version'] AS server_version, f['query_tag'] AS query_tag, f['has_profile'] AS has_profile, f['peek_memory_usage'] AS peek_memory_usage, f['session_id'] AS session_id, f['warehouse_id'] AS warehouse_id FROM (SELECT ARG_MAX(m, m['log_type']) AS f FROM (SELECT parse_json(message) AS m FROM system_history.log_history WHERE target = 'databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}) AS parsed_data GROUP BY m['query_id'])) AS source ON target.query_id = source.query_id WHEN MATCHED AND source.log_type IN (2, 3, 4, 5) THEN UPDATE * WHEN NOT MATCHED THEN INSERT *;"
 delete = "DELETE FROM system_history.query_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"
 
 [[tables]]
diff --git a/src/common/tracing/src/remote_log.rs b/src/common/tracing/src/remote_log.rs
index 4e545e2583360..58b0c024c3d37 100644
--- a/src/common/tracing/src/remote_log.rs
+++ b/src/common/tracing/src/remote_log.rs
@@ -58,7 +58,6 @@ use crate::GlobalLogger;
 pub struct RemoteLog {
     cluster_id: String,
     node_id: String,
-    warehouse_id: Option<String>,
     buffer: Arc<LogBuffer>,
 }
 
@@ -113,13 +112,9 @@ impl RemoteLog {
         let node_id = labels.get("node_id").cloned().unwrap_or_default();
         let rt = Runtime::with_worker_threads(2, Some("remote-log-writer".to_string()))?;
         let (tx, rx) = bounded(1);
-        // warehouse_id need to be specified after `create warehouse`
-        // TODO: inject warehouse_id like query_id
-        let warehouse_id = None;
         let remote_log = RemoteLog {
             cluster_id: labels.get("cluster_id").cloned().unwrap_or_default(),
             node_id: node_id.clone(),
-            warehouse_id,
             buffer: Arc::new(LogBuffer::new(tx.clone(), interval as u64)),
         };
         rt.spawn(async move { RemoteLog::work(rx, &stage_name).await });
@@ -208,6 +203,7 @@ impl RemoteLog {
 
     pub fn prepare_log_element(&self, record: &Record) -> RemoteLogElement {
         let query_id = ThreadTracker::query_id().cloned();
+        let warehouse_id = ThreadTracker::warehouse_id().cloned();
         let mut fields = Map::new();
         let target = record.target().to_string();
         let message = record.args().to_string();
@@ -235,7 +231,7 @@ impl RemoteLog {
             target,
             cluster_id: self.cluster_id.clone(),
             node_id: self.node_id.clone(),
-            warehouse_id: self.warehouse_id.clone(),
+            warehouse_id,
             query_id,
             log_level,
             message,
diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs
index a90f487ee5131..c9d73694371bc 100644
--- a/src/query/service/src/clusters/cluster.rs
+++ b/src/query/service/src/clusters/cluster.rs
@@ -583,6 +583,21 @@ impl ClusterDiscovery {
         heartbeat.start(node_info, seq);
         Ok(())
     }
+
+    pub async fn get_current_warehouse_id(&self) -> Result<Option<String>> {
+        let config = GlobalConfig::instance();
+        let cluster = self.discover(&config).await?;
+        for node in cluster.nodes.iter() {
+            if node.id == cluster.local_id {
+                if node.warehouse_id.is_empty() {
+                    break;
+                } else {
+                    return Ok(Some(node.warehouse_id.clone()));
+                }
+            }
+        }
+        Ok(None)
+    }
 }
 
 struct ClusterHeartbeat {
diff --git a/src/query/service/src/interpreters/common/query_log.rs b/src/query/service/src/interpreters/common/query_log.rs
index 3b5ae6903be9c..426eb59fe8e69 100644
--- a/src/query/service/src/interpreters/common/query_log.rs
+++ b/src/query/service/src/interpreters/common/query_log.rs
@@ -17,6 +17,7 @@ use std::fmt::Write;
 use std::sync::Arc;
 use std::time::SystemTime;
 
+use databend_common_base::runtime::ThreadTracker;
 use databend_common_config::GlobalConfig;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
@@ -162,6 +163,9 @@ impl InterpreterQueryLog {
         let txn_state = format!("{:?}", guard.state());
         let txn_id = guard.txn_id().to_string();
         drop(guard);
+
+        let warehouse_id = ThreadTracker::warehouse_id().cloned().unwrap_or_default();
+
         Self::write_log(QueryLogElement {
             log_type,
             log_type_name,
@@ -228,6 +232,7 @@ impl InterpreterQueryLog {
             peek_memory_usage: HashMap::new(),
 
             session_id,
+            warehouse_id,
         })
     }
 
@@ -342,6 +347,8 @@ impl InterpreterQueryLog {
 
         let peek_memory_usage = ctx.get_node_peek_memory_usage();
 
+        let warehouse_id = ThreadTracker::warehouse_id().cloned().unwrap_or_default();
+
         Self::write_log(QueryLogElement {
             log_type,
             log_type_name,
@@ -407,6 +414,7 @@ impl InterpreterQueryLog {
             txn_id,
             peek_memory_usage,
             session_id,
+            warehouse_id,
         })
     }
 }
diff --git a/src/query/service/src/servers/flight/v1/actions/init_query_env.rs b/src/query/service/src/servers/flight/v1/actions/init_query_env.rs
index 93ecb10ed3416..7479f0648708c 100644
--- a/src/query/service/src/servers/flight/v1/actions/init_query_env.rs
+++ b/src/query/service/src/servers/flight/v1/actions/init_query_env.rs
@@ -23,6 +23,7 @@ use databend_common_config::GlobalConfig;
 use databend_common_exception::Result;
 use databend_common_management::WorkloadGroupResourceManager;
 
+use crate::clusters::ClusterDiscovery;
 use crate::servers::flight::v1::exchange::DataExchangeManager;
 use crate::servers::flight::v1::packets::QueryEnv;
 
@@ -44,9 +45,15 @@ pub async fn init_query_env(mut env: QueryEnv) -> Result<()> {
 
     let name = Some(env.query_id.clone());
     let query_mem_stat = MemStat::create_child(name, 0, parent_mem_stat);
+    let warehouse_id = ClusterDiscovery::instance()
+        .get_current_warehouse_id()
+        .await
+        .ok()
+        .flatten();
     let mut tracking_payload = ThreadTracker::new_tracking_payload();
     tracking_payload.query_id = Some(env.query_id.clone());
+    tracking_payload.warehouse_id = warehouse_id;
     tracking_payload.mem_stat = Some(query_mem_stat.clone());
     tracking_payload.workload_group_resource = tracking_workload_group;
     let _guard = ThreadTracker::tracking(tracking_payload);
 
diff --git a/src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs b/src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs
index 171f56b5f8deb..f7bb88801d009 100644
--- a/src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs
+++ b/src/query/service/src/servers/flight/v1/actions/init_query_fragments.rs
@@ -17,6 +17,7 @@ use databend_common_base::runtime::TrySpawn;
 use databend_common_exception::Result;
 use log::debug;
 
+use crate::clusters::ClusterDiscovery;
 use crate::servers::flight::v1::exchange::DataExchangeManager;
 use crate::servers::flight::v1::packets::QueryFragments;
 
@@ -24,10 +25,16 @@ pub static INIT_QUERY_FRAGMENTS: &str = "/actions/init_query_fragments";
 
 pub async fn init_query_fragments(fragments: QueryFragments) -> Result<()> {
     let ctx = DataExchangeManager::instance().get_query_ctx(&fragments.query_id)?;
-
+    let warehouse = ClusterDiscovery::instance()
+        .get_current_warehouse_id()
+        .await
+        .ok()
+        .flatten();
     let mut tracking_payload = ThreadTracker::new_tracking_payload();
     tracking_payload.mem_stat = ctx.get_query_memory_tracking();
     tracking_payload.query_id = Some(fragments.query_id.clone());
+    tracking_payload.warehouse_id = warehouse;
+
     let _guard = ThreadTracker::tracking(tracking_payload);
 
     debug!("init query fragments with {:?}", fragments);
diff --git a/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs b/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs
index 7a760c33dea65..c833dff54fa54 100644
--- a/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs
+++ b/src/query/service/src/servers/flight/v1/actions/start_prepared_query.rs
@@ -16,6 +16,7 @@ use databend_common_base::runtime::ThreadTracker;
 use databend_common_exception::Result;
 use log::debug;
 
+use crate::clusters::ClusterDiscovery;
 use crate::servers::flight::v1::exchange::DataExchangeManager;
 
 pub static START_PREPARED_QUERY: &str = "/actions/start_prepared_query";
@@ -23,10 +24,15 @@ pub static START_PREPARED_QUERY: &str = "/actions/start_prepared_query";
 pub async fn start_prepared_query(id: String) -> Result<()> {
     let id = id.replace('-', "");
     let ctx = DataExchangeManager::instance().get_query_ctx(&id)?;
-
+    let warehouse_id = ClusterDiscovery::instance()
+        .get_current_warehouse_id()
+        .await
+        .ok()
+        .flatten();
     let mut tracking_payload = ThreadTracker::new_tracking_payload();
     tracking_payload.query_id = Some(id.clone());
     tracking_payload.mem_stat = ctx.get_query_memory_tracking();
+    tracking_payload.warehouse_id = warehouse_id;
     let _guard = ThreadTracker::tracking(tracking_payload);
 
     debug!("start prepared query {}", id);
diff --git a/src/query/service/src/servers/http/v1/http_query_handlers.rs b/src/query/service/src/servers/http/v1/http_query_handlers.rs
index 941ddb8809bdd..a177f705557aa 100644
--- a/src/query/service/src/servers/http/v1/http_query_handlers.rs
+++ b/src/query/service/src/servers/http/v1/http_query_handlers.rs
@@ -418,11 +418,17 @@ async fn query_page_handler(
         let _t = SlowRequestLogTracker::new(ctx);
         query_page_handle.in_span(root)
     };
+    let warehouse_id = ClusterDiscovery::instance()
+        .get_current_warehouse_id()
+        .await
+        .ok()
+        .flatten();
     let query_page_handle = {
         let mut tracking_payload = ThreadTracker::new_tracking_payload();
         tracking_payload.mem_stat = query_mem_stat;
         tracking_payload.query_id = Some(query_id.clone());
+        tracking_payload.warehouse_id = warehouse_id;
         let _tracking_guard = ThreadTracker::tracking(tracking_payload);
         ThreadTracker::tracking_future(query_page_handle)
     };
 
@@ -534,11 +540,16 @@ pub(crate) async fn query_handler(
         parent_mem_stat = ParentMemStat::Normal(workload_group.mem_stat.clone());
         tracking_workload_group = Some(workload_group);
     }
-
+    let warehouse_id = ClusterDiscovery::instance()
+        .get_current_warehouse_id()
+        .await
+        .ok()
+        .flatten();
     let name = Some(ctx.query_id.clone());
     let query_mem_stat = MemStat::create_child(name, 0, parent_mem_stat);
     let mut tracking_payload = ThreadTracker::new_tracking_payload();
     tracking_payload.query_id = Some(ctx.query_id.clone());
+    tracking_payload.warehouse_id = warehouse_id;
     tracking_payload.mem_stat = Some(query_mem_stat.clone());
     tracking_payload.workload_group_resource = tracking_workload_group;
     let _tracking_guard = ThreadTracker::tracking(tracking_payload);
diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
index 9ac973836d88c..0b078173a70f1 100644
--- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
+++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs
@@ -52,6 +52,7 @@ use rand::RngCore;
 use uuid::Uuid;
 
 use crate::auth::CredentialType;
+use crate::clusters::ClusterDiscovery;
 use crate::interpreters::interpreter_plan_sql;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterFactory;
@@ -224,9 +225,14 @@ impl AsyncMysqlShim for InteractiveWorke
             SpanContext::new(TraceId(query_id.as_u128()), SpanId::default()).sampled(sampled);
         let root = Span::root(func_path!(), span_context)
             .with_properties(|| self.base.session.to_fastrace_properties());
-
+        let warehouse_id = ClusterDiscovery::instance()
+            .get_current_warehouse_id()
+            .await
+            .ok()
+            .flatten();
         let mut tracking_payload = ThreadTracker::new_tracking_payload();
         tracking_payload.query_id = Some(query_id_str.clone());
+        tracking_payload.warehouse_id = warehouse_id;
         tracking_payload.mem_stat = Some(MemStat::create(query_id_str.to_string()));
         let _guard = ThreadTracker::tracking(tracking_payload);
 
@@ -492,9 +498,14 @@ impl InteractiveWorkerBase {
 
         let query_id = Uuid::new_v4().to_string();
         let init_query = format!("USE `{}`;", database_name);
-
+        let warehouse_id = ClusterDiscovery::instance()
+            .get_current_warehouse_id()
+            .await
+            .ok()
+            .flatten();
         let mut tracking_payload = ThreadTracker::new_tracking_payload();
         tracking_payload.query_id = Some(query_id.clone());
+        tracking_payload.warehouse_id = warehouse_id;
         tracking_payload.mem_stat = Some(MemStat::create(query_id.clone()));
         let _guard = ThreadTracker::tracking(tracking_payload);
 
diff --git a/src/query/storages/system/src/query_log_table.rs b/src/query/storages/system/src/query_log_table.rs
index c6a6fd0c77463..9669cc5d2be09 100644
--- a/src/query/storages/system/src/query_log_table.rs
+++ b/src/query/storages/system/src/query_log_table.rs
@@ -168,4 +168,6 @@ pub struct QueryLogElement {
     pub peek_memory_usage: HashMap<String, usize>,
 
     pub session_id: String,
+
+    pub warehouse_id: String,
 }
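
Note on the mechanism. The patch threads `warehouse_id` through the same thread-local `TrackingPayload` that already carries `query_id`: each entry point looks the id up via `ClusterDiscovery::get_current_warehouse_id()`, stores it in a payload, and installs the payload with a `ThreadTracker::tracking` guard, so anything that logs on that thread (e.g. `RemoteLog::prepare_log_element`) can read it without changing any call signatures. This also resolves the old TODO in `remote_log.rs`, which always sent `warehouse_id = None`. Below is a minimal standalone sketch of that payload-plus-guard pattern; `Payload`, `Guard`, and `current_warehouse_id` are illustrative stand-ins, not the real Databend types, and the real `ThreadTracker::warehouse_id()` avoids the clone by returning `Option<&'static String>` through an unsafe pointer cast into the thread-local.

```rust
use std::cell::RefCell;

// Illustrative stand-in for TrackingPayload (not the real Databend type):
// per-query context that rides along in a thread-local.
#[derive(Clone, Default)]
struct Payload {
    query_id: Option<String>,
    warehouse_id: Option<String>,
}

thread_local! {
    static TRACKER: RefCell<Payload> = RefCell::new(Payload::default());
}

// RAII guard modeling ThreadTracker::tracking: installs a payload and
// restores the previous one when dropped.
struct Guard {
    saved: Payload,
}

impl Guard {
    fn tracking(new: Payload) -> Guard {
        // Swap the new payload in, remembering whatever was active before.
        let saved = TRACKER.with(|t| std::mem::replace(&mut *t.borrow_mut(), new));
        Guard { saved }
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Restore the previous payload when the tracked scope ends.
        TRACKER.with(|t| *t.borrow_mut() = std::mem::take(&mut self.saved));
    }
}

// Counterpart of ThreadTracker::warehouse_id(), simplified to clone.
fn current_warehouse_id() -> Option<String> {
    TRACKER.with(|t| t.borrow().warehouse_id.clone())
}

fn main() {
    let payload = Payload {
        query_id: Some("q-1".to_string()),
        warehouse_id: Some("wh-42".to_string()), // hypothetical id
    };
    let _guard = Guard::tracking(payload);

    // Any code running on this thread while the guard is alive, such as a
    // log appender, can pick up the id without it being plumbed through
    // every function signature.
    assert_eq!(current_warehouse_id().as_deref(), Some("wh-42"));
}
```

One caveat this sketch makes visible: the payload is per thread and per guard scope, so work spawned onto other threads or futures only sees the id when it is wrapped the way the patch does with `ThreadTracker::tracking_future` in `query_page_handler`.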