15 changes: 15 additions & 0 deletions src/common/base/src/runtime/runtime_tracker.rs
@@ -134,6 +134,7 @@ impl CaptureLogSettings {
#[derive(Clone)]
pub struct TrackingPayload {
pub query_id: Option<String>,
pub warehouse_id: Option<String>,
pub profile: Option<Arc<Profile>>,
pub mem_stat: Option<Arc<MemStat>>,
pub metrics: Option<Arc<ScopedRegistry>>,
@@ -216,6 +217,7 @@ impl ThreadTracker {
time_series_profile: None,
local_time_series_profile: None,
workload_group_resource: None,
warehouse_id: None,
}),
}
}
@@ -314,6 +316,19 @@ impl ThreadTracker {
.unwrap_or(None)
}

pub fn warehouse_id() -> Option<&'static String> {
TRACKER
.try_with(|tracker| {
tracker
.borrow()
.payload
.warehouse_id
.as_ref()
.map(|warehouse_id| unsafe { &*(warehouse_id as *const String) })
})
.unwrap_or(None)
}

pub fn capture_log_settings() -> Option<&'static Arc<CaptureLogSettings>> {
TRACKER
.try_with(|tracker| {
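Note on the accessor above: `warehouse_id()` mirrors the existing `query_id()` accessor. It borrows the thread-local `TrackingPayload` and stretches the borrow to `'static` with an unsafe pointer cast, which is sound only while a tracking guard keeps the payload installed for the duration of the call. A minimal, safe (clone-based) sketch of the guard-plus-accessor pattern; the names are illustrative stand-ins, not the actual Databend types:

```rust
use std::cell::RefCell;

#[derive(Clone, Default)]
struct TrackingPayload {
    query_id: Option<String>,
    warehouse_id: Option<String>,
}

thread_local! {
    static TRACKER: RefCell<TrackingPayload> = RefCell::new(TrackingPayload::default());
}

/// Installs a payload for the current scope; the previous payload is
/// restored when the guard is dropped.
struct TrackingGuard {
    saved: TrackingPayload,
}

impl TrackingGuard {
    fn tracking(payload: TrackingPayload) -> TrackingGuard {
        let saved = TRACKER.with(|t| t.replace(payload));
        TrackingGuard { saved }
    }
}

impl Drop for TrackingGuard {
    fn drop(&mut self) {
        TRACKER.with(|t| t.replace(std::mem::take(&mut self.saved)));
    }
}

/// Clone-based analogue of ThreadTracker::warehouse_id(); the real
/// accessor avoids the clone by returning Option<&'static String>.
fn warehouse_id() -> Option<String> {
    TRACKER.with(|t| t.borrow().warehouse_id.clone())
}

fn main() {
    let payload = TrackingPayload {
        query_id: Some("q-1".to_string()),
        warehouse_id: Some("wh-42".to_string()),
    };
    let _guard = TrackingGuard::tracking(payload);
    assert_eq!(warehouse_id().as_deref(), Some("wh-42"));
}
```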
4 changes: 2 additions & 2 deletions src/common/tracing/src/predefined_tables/history_tables.toml
@@ -9,8 +9,8 @@ delete = "DELETE FROM system_history.log_history WHERE timestamp < subtract_hour
[[tables]]
name = "query_history"
target = "databend::log::query"
create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL, session_id STRING NULL) CLUSTER BY LINEAR(event_time, query_id)"
transform = "settings (timezone='Etc/UTC') MERGE INTO system_history.query_history AS target USING (SELECT f['log_type'] AS log_type, f['log_type_name'] AS log_type_name, f['handler_type'] AS handler_type, f['tenant_id'] AS tenant_id, f['cluster_id'] AS cluster_id, f['node_id'] AS node_id, f['sql_user'] AS sql_user, f['sql_user_quota'] AS sql_user_quota, f['sql_user_privileges'] AS sql_user_privileges, f['query_id'] AS query_id, f['query_kind'] AS query_kind, f['query_text'] AS query_text, f['query_hash'] AS query_hash, f['query_parameterized_hash'] AS query_parameterized_hash, f['event_date'] AS event_date, f['event_time'] AS event_time, f['query_start_time'] AS query_start_time, f['query_duration_ms'] AS query_duration_ms, f['query_queued_duration_ms'] AS query_queued_duration_ms, f['current_database'] AS current_database, f['written_rows'] AS written_rows, f['written_bytes'] AS written_bytes, f['join_spilled_rows'] AS join_spilled_rows, f['join_spilled_bytes'] AS join_spilled_bytes, f['agg_spilled_rows'] AS agg_spilled_rows, f['agg_spilled_bytes'] AS agg_spilled_bytes, f['group_by_spilled_rows'] AS group_by_spilled_rows, f['group_by_spilled_bytes'] AS group_by_spilled_bytes, f['written_io_bytes'] AS written_io_bytes, f['written_io_bytes_cost_ms'] AS written_io_bytes_cost_ms, f['scan_rows'] AS scan_rows, f['scan_bytes'] AS scan_bytes, f['scan_io_bytes'] AS scan_io_bytes, f['scan_io_bytes_cost_ms'] AS scan_io_bytes_cost_ms, f['scan_partitions'] AS scan_partitions, f['total_partitions'] AS total_partitions, f['result_rows'] AS result_rows, f['result_bytes'] AS result_bytes, f['bytes_from_remote_disk'] AS bytes_from_remote_disk, f['bytes_from_local_disk'] AS bytes_from_local_disk, f['bytes_from_memory'] AS bytes_from_memory, f['client_address'] AS client_address, f['user_agent'] AS user_agent, f['exception_code'] AS exception_code, f['exception_text'] AS exception_text, f['server_version'] AS server_version, f['query_tag'] AS query_tag, f['has_profile'] AS has_profile, f['peek_memory_usage'] AS peek_memory_usage, f['session_id'] AS session_id FROM (SELECT ARG_MAX(m, m['log_type']) AS f FROM (SELECT parse_json(message) AS m FROM system_history.log_history WHERE target = 'databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}) AS parsed_data GROUP BY m['query_id'])) AS source ON target.query_id = source.query_id WHEN MATCHED AND source.log_type IN (2, 3, 4, 5) THEN UPDATE * WHEN NOT MATCHED THEN INSERT *;"
create = "CREATE TABLE IF NOT EXISTS system_history.query_history (log_type Int8 NULL, log_type_name STRING NULL, handler_type STRING NULL, tenant_id STRING NULL, cluster_id STRING NULL, node_id STRING NULL, sql_user STRING NULL, sql_user_quota STRING NULL, sql_user_privileges STRING NULL, query_id STRING NULL, query_kind STRING NULL, query_text STRING NULL, query_hash STRING NULL, query_parameterized_hash STRING NULL, event_date DATE NULL, event_time TIMESTAMP NULL, query_start_time TIMESTAMP NULL, query_duration_ms Int64 NULL, query_queued_duration_ms Int64 NULL, current_database STRING NULL, written_rows UInt64 NULL, written_bytes UInt64 NULL, join_spilled_rows UInt64 NULL, join_spilled_bytes UInt64 NULL, agg_spilled_rows UInt64 NULL, agg_spilled_bytes UInt64 NULL, group_by_spilled_rows UInt64 NULL, group_by_spilled_bytes UInt64 NULL, written_io_bytes UInt64 NULL, written_io_bytes_cost_ms UInt64 NULL, scan_rows UInt64 NULL, scan_bytes UInt64 NULL, scan_io_bytes UInt64 NULL, scan_io_bytes_cost_ms UInt64 NULL, scan_partitions UInt64 NULL, total_partitions UInt64 NULL, result_rows UInt64 NULL, result_bytes UInt64 NULL, bytes_from_remote_disk UInt64 NULL, bytes_from_local_disk UInt64 NULL, bytes_from_memory UInt64 NULL, client_address STRING NULL, user_agent STRING NULL, exception_code Int32 NULL, exception_text STRING NULL, server_version STRING NULL, query_tag STRING NULL, has_profile BOOLEAN NULL, peek_memory_usage VARIANT NULL, session_id STRING NULL, warehouse_id STRING NULL) CLUSTER BY LINEAR(event_time, query_id)"
transform = "settings (timezone='Etc/UTC') MERGE INTO system_history.query_history AS target USING (SELECT f['log_type'] AS log_type, f['log_type_name'] AS log_type_name, f['handler_type'] AS handler_type, f['tenant_id'] AS tenant_id, f['cluster_id'] AS cluster_id, f['node_id'] AS node_id, f['sql_user'] AS sql_user, f['sql_user_quota'] AS sql_user_quota, f['sql_user_privileges'] AS sql_user_privileges, f['query_id'] AS query_id, f['query_kind'] AS query_kind, f['query_text'] AS query_text, f['query_hash'] AS query_hash, f['query_parameterized_hash'] AS query_parameterized_hash, f['event_date'] AS event_date, f['event_time'] AS event_time, f['query_start_time'] AS query_start_time, f['query_duration_ms'] AS query_duration_ms, f['query_queued_duration_ms'] AS query_queued_duration_ms, f['current_database'] AS current_database, f['written_rows'] AS written_rows, f['written_bytes'] AS written_bytes, f['join_spilled_rows'] AS join_spilled_rows, f['join_spilled_bytes'] AS join_spilled_bytes, f['agg_spilled_rows'] AS agg_spilled_rows, f['agg_spilled_bytes'] AS agg_spilled_bytes, f['group_by_spilled_rows'] AS group_by_spilled_rows, f['group_by_spilled_bytes'] AS group_by_spilled_bytes, f['written_io_bytes'] AS written_io_bytes, f['written_io_bytes_cost_ms'] AS written_io_bytes_cost_ms, f['scan_rows'] AS scan_rows, f['scan_bytes'] AS scan_bytes, f['scan_io_bytes'] AS scan_io_bytes, f['scan_io_bytes_cost_ms'] AS scan_io_bytes_cost_ms, f['scan_partitions'] AS scan_partitions, f['total_partitions'] AS total_partitions, f['result_rows'] AS result_rows, f['result_bytes'] AS result_bytes, f['bytes_from_remote_disk'] AS bytes_from_remote_disk, f['bytes_from_local_disk'] AS bytes_from_local_disk, f['bytes_from_memory'] AS bytes_from_memory, f['client_address'] AS client_address, f['user_agent'] AS user_agent, f['exception_code'] AS exception_code, f['exception_text'] AS exception_text, f['server_version'] AS server_version, f['query_tag'] AS query_tag, f['has_profile'] AS has_profile, f['peek_memory_usage'] AS peek_memory_usage, f['session_id'] AS session_id, f['warehouse_id'] as warehouse_id FROM (SELECT ARG_MAX(m, m['log_type']) AS f FROM (SELECT parse_json(message) AS m FROM system_history.log_history WHERE target = 'databend::log::query' AND batch_number >= {batch_begin} AND batch_number < {batch_end}) AS parsed_data GROUP BY m['query_id'])) AS source ON target.query_id = source.query_id WHEN MATCHED AND source.log_type IN (2, 3, 4, 5) THEN UPDATE * WHEN NOT MATCHED THEN INSERT *;"
delete = "DELETE FROM system_history.query_history WHERE event_time < subtract_hours(NOW(), {retention_hours})"

[[tables]]
8 changes: 2 additions & 6 deletions src/common/tracing/src/remote_log.rs
@@ -58,7 +58,6 @@ use crate::GlobalLogger;
pub struct RemoteLog {
cluster_id: String,
node_id: String,
warehouse_id: Option<String>,
buffer: Arc<LogBuffer>,
}

@@ -113,13 +112,9 @@ impl RemoteLog {
let node_id = labels.get("node_id").cloned().unwrap_or_default();
let rt = Runtime::with_worker_threads(2, Some("remote-log-writer".to_string()))?;
let (tx, rx) = bounded(1);
// warehouse_id need to be specified after `create warehouse`
// TODO: inject warehouse_id like query_id
let warehouse_id = None;
let remote_log = RemoteLog {
cluster_id: labels.get("cluster_id").cloned().unwrap_or_default(),
node_id: node_id.clone(),
warehouse_id,
buffer: Arc::new(LogBuffer::new(tx.clone(), interval as u64)),
};
rt.spawn(async move { RemoteLog::work(rx, &stage_name).await });
@@ -208,6 +203,7 @@ impl RemoteLog {

pub fn prepare_log_element(&self, record: &Record) -> RemoteLogElement {
let query_id = ThreadTracker::query_id().cloned();
let warehouse_id = ThreadTracker::warehouse_id().cloned();
let mut fields = Map::new();
let target = record.target().to_string();
let message = record.args().to_string();
@@ -235,7 +231,7 @@
target,
cluster_id: self.cluster_id.clone(),
node_id: self.node_id.clone(),
warehouse_id: self.warehouse_id.clone(),
warehouse_id,
query_id,
log_level,
message,
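Design note on this file: `warehouse_id` was previously captured once at logger construction, and per the removed TODO it was in fact always `None`, because a node's warehouse can be assigned after the logger starts (`create warehouse`). It is now resolved per record from the ambient tracking payload, so a record emitted inside a tracked query scope carries that query's warehouse while background records keep a NULL column. A reduced sketch of the emit-time behavior, with illustrative types rather than the real `RemoteLogElement`:

```rust
/// Illustrative stand-in for the remote log element; only the fields
/// relevant to this change are shown.
#[derive(Debug)]
struct LogElementSketch {
    warehouse_id: Option<String>, // NULL in the history table when None
    message: String,
}

/// Emit-time resolution: the warehouse comes from whatever tracking
/// scope is active, not from state frozen into the logger at startup.
fn prepare_log_element(message: &str, ambient: Option<&String>) -> LogElementSketch {
    LogElementSketch {
        warehouse_id: ambient.cloned(),
        message: message.to_string(),
    }
}

fn main() {
    let wh = "wh-42".to_string();
    let tracked = prepare_log_element("query spilled to disk", Some(&wh));
    let background = prepare_log_element("compaction tick", None);
    assert_eq!(tracked.warehouse_id.as_deref(), Some("wh-42"));
    assert!(background.warehouse_id.is_none());
}
```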
15 changes: 15 additions & 0 deletions src/query/service/src/clusters/cluster.rs
@@ -583,6 +583,21 @@ impl ClusterDiscovery {
heartbeat.start(node_info, seq);
Ok(())
}

pub async fn get_current_warehouse_id(&self) -> Result<Option<String>> {
let config = GlobalConfig::instance();
let cluster = self.discover(&config).await?;
for node in cluster.nodes.iter() {
if node.id == cluster.local_id {
if node.warehouse_id.is_empty() {
break;
} else {
return Ok(Some(node.warehouse_id.clone()));
}
}
}
Ok(None)
}
}

struct ClusterHeartbeat {
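`get_current_warehouse_id` scans the discovered cluster for the local node and returns its `warehouse_id` only when non-empty. Callers throughout this PR consume it with `.ok().flatten()`, collapsing `Result<Option<String>>` into `Option<String>` so that a discovery failure degrades to "no warehouse" instead of failing the request. A small self-contained sketch of that collapse:

```rust
use std::io::{Error, ErrorKind};

/// Collapse Result<Option<String>> to Option<String>: both a lookup
/// error and an unassigned node map to None.
fn flatten_lookup(result: Result<Option<String>, Error>) -> Option<String> {
    result.ok().flatten()
}

fn main() {
    let assigned: Result<Option<String>, Error> = Ok(Some("wh-42".into()));
    let unassigned: Result<Option<String>, Error> = Ok(None);
    let failed: Result<Option<String>, Error> =
        Err(Error::new(ErrorKind::Other, "meta service unreachable"));

    assert_eq!(flatten_lookup(assigned).as_deref(), Some("wh-42"));
    assert_eq!(flatten_lookup(unassigned), None);
    assert_eq!(flatten_lookup(failed), None);
}
```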
8 changes: 8 additions & 0 deletions src/query/service/src/interpreters/common/query_log.rs
@@ -17,6 +17,7 @@ use std::fmt::Write;
use std::sync::Arc;
use std::time::SystemTime;

use databend_common_base::runtime::ThreadTracker;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
@@ -162,6 +163,9 @@ impl InterpreterQueryLog {
let txn_state = format!("{:?}", guard.state());
let txn_id = guard.txn_id().to_string();
drop(guard);

let warehouse_id = ThreadTracker::warehouse_id().cloned().unwrap_or_default();

Self::write_log(QueryLogElement {
log_type,
log_type_name,
@@ -228,6 +232,7 @@ impl InterpreterQueryLog {
peek_memory_usage: HashMap::new(),

session_id,
warehouse_id,
})
}

@@ -342,6 +347,8 @@ impl InterpreterQueryLog {

let peek_memory_usage = ctx.get_node_peek_memory_usage();

let warehouse_id = ThreadTracker::warehouse_id().cloned().unwrap_or_default();

Self::write_log(QueryLogElement {
log_type,
log_type_name,
Expand Down Expand Up @@ -407,6 +414,7 @@ impl InterpreterQueryLog {
txn_id,
peek_memory_usage,
session_id,
warehouse_id,
})
}
}
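One subtlety in the two hunks above: `QueryLogElement::warehouse_id` is a plain `String`, so `unwrap_or_default()` records a missing warehouse as an empty string in the query log, whereas the remote-log path keeps it as a nullable `Option`. A one-liner showing the fallback:

```rust
fn main() {
    // Option<String> -> String: None becomes "" rather than NULL.
    let missing: Option<String> = None;
    assert_eq!(missing.unwrap_or_default(), "");

    let present = Some("wh-42".to_string());
    assert_eq!(present.unwrap_or_default(), "wh-42");
}
```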
@@ -23,6 +23,7 @@ use databend_common_config::GlobalConfig;
use databend_common_exception::Result;
use databend_common_management::WorkloadGroupResourceManager;

use crate::clusters::ClusterDiscovery;
use crate::servers::flight::v1::exchange::DataExchangeManager;
use crate::servers::flight::v1::packets::QueryEnv;

@@ -44,9 +45,15 @@ pub async fn init_query_env(mut env: QueryEnv) -> Result<()> {

let name = Some(env.query_id.clone());
let query_mem_stat = MemStat::create_child(name, 0, parent_mem_stat);
let warehouse_id = ClusterDiscovery::instance()
.get_current_warehouse_id()
.await
.ok()
.flatten();

let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(env.query_id.clone());
tracking_payload.warehouse_id = warehouse_id;
tracking_payload.mem_stat = Some(query_mem_stat.clone());
tracking_payload.workload_group_resource = tracking_workload_group;
let _guard = ThreadTracker::tracking(tracking_payload);
@@ -17,17 +17,24 @@ use databend_common_base::runtime::TrySpawn;
use databend_common_exception::Result;
use log::debug;

use crate::clusters::ClusterDiscovery;
use crate::servers::flight::v1::exchange::DataExchangeManager;
use crate::servers::flight::v1::packets::QueryFragments;

pub static INIT_QUERY_FRAGMENTS: &str = "/actions/init_query_fragments";

pub async fn init_query_fragments(fragments: QueryFragments) -> Result<()> {
let ctx = DataExchangeManager::instance().get_query_ctx(&fragments.query_id)?;

let warehouse = ClusterDiscovery::instance()
.get_current_warehouse_id()
.await
.ok()
.flatten();
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.mem_stat = ctx.get_query_memory_tracking();
tracking_payload.query_id = Some(fragments.query_id.clone());
tracking_payload.warehouse_id = warehouse;

let _guard = ThreadTracker::tracking(tracking_payload);

debug!("init query fragments with {:?}", fragments);
@@ -16,17 +16,23 @@ use databend_common_base::runtime::ThreadTracker;
use databend_common_exception::Result;
use log::debug;

use crate::clusters::ClusterDiscovery;
use crate::servers::flight::v1::exchange::DataExchangeManager;

pub static START_PREPARED_QUERY: &str = "/actions/start_prepared_query";

pub async fn start_prepared_query(id: String) -> Result<()> {
let id = id.replace('-', "");
let ctx = DataExchangeManager::instance().get_query_ctx(&id)?;

let warehouse_id = ClusterDiscovery::instance()
.get_current_warehouse_id()
.await
.ok()
.flatten();
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(id.clone());
tracking_payload.mem_stat = ctx.get_query_memory_tracking();
tracking_payload.warehouse_id = warehouse_id;
let _guard = ThreadTracker::tracking(tracking_payload);

debug!("start prepared query {}", id);
13 changes: 12 additions & 1 deletion src/query/service/src/servers/http/v1/http_query_handlers.rs
@@ -418,11 +418,17 @@ async fn query_page_handler(
let _t = SlowRequestLogTracker::new(ctx);
query_page_handle.in_span(root)
};
let warehouse_id = ClusterDiscovery::instance()
.get_current_warehouse_id()
.await
.ok()
.flatten();

let query_page_handle = {
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.mem_stat = query_mem_stat;
tracking_payload.query_id = Some(query_id.clone());
tracking_payload.warehouse_id = warehouse_id;
let _tracking_guard = ThreadTracker::tracking(tracking_payload);
ThreadTracker::tracking_future(query_page_handle)
};
@@ -534,11 +540,16 @@ pub(crate) async fn query_handler(
parent_mem_stat = ParentMemStat::Normal(workload_group.mem_stat.clone());
tracking_workload_group = Some(workload_group);
}

let warehouse_id = ClusterDiscovery::instance()
.get_current_warehouse_id()
.await
.ok()
.flatten();
let name = Some(ctx.query_id.clone());
let query_mem_stat = MemStat::create_child(name, 0, parent_mem_stat);
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(ctx.query_id.clone());
tracking_payload.warehouse_id = warehouse_id;
tracking_payload.mem_stat = Some(query_mem_stat.clone());
tracking_payload.workload_group_resource = tracking_workload_group;
let _tracking_guard = ThreadTracker::tracking(tracking_payload);
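Note the ordering in `query_page_handler`: the payload is installed before the handler future is wrapped with `ThreadTracker::tracking_future`, so the payload travels with the task across await points instead of staying pinned to the OS thread that created it. A reduced sketch of that wrap-then-await shape, using tokio's task-locals purely for illustration (Databend has its own tracking-future machinery):

```rust
// cargo: tokio = { version = "1", features = ["macros", "rt"] }
tokio::task_local! {
    static WAREHOUSE_ID: Option<String>;
}

/// Any await point inside this future still sees the payload installed
/// by the scope below.
async fn handle_page() -> Option<String> {
    tokio::task::yield_now().await; // cross an await point
    WAREHOUSE_ID.with(|w| w.clone())
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let seen = WAREHOUSE_ID
        .scope(Some("wh-42".to_string()), handle_page())
        .await;
    assert_eq!(seen.as_deref(), Some("wh-42"));
}
```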
15 changes: 13 additions & 2 deletions src/query/service/src/servers/mysql/mysql_interactive_worker.rs
@@ -52,6 +52,7 @@ use rand::RngCore;
use uuid::Uuid;

use crate::auth::CredentialType;
use crate::clusters::ClusterDiscovery;
use crate::interpreters::interpreter_plan_sql;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterFactory;
@@ -224,9 +225,14 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for InteractiveWorke
SpanContext::new(TraceId(query_id.as_u128()), SpanId::default()).sampled(sampled);
let root = Span::root(func_path!(), span_context)
.with_properties(|| self.base.session.to_fastrace_properties());

let warehouse_id = ClusterDiscovery::instance()
.get_current_warehouse_id()
.await
.ok()
.flatten();
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(query_id_str.clone());
tracking_payload.warehouse_id = warehouse_id;
tracking_payload.mem_stat = Some(MemStat::create(query_id_str.to_string()));
let _guard = ThreadTracker::tracking(tracking_payload);

@@ -492,9 +498,14 @@ impl InteractiveWorkerBase {

let query_id = Uuid::new_v4().to_string();
let init_query = format!("USE `{}`;", database_name);

let warehouse_id = ClusterDiscovery::instance()
.get_current_warehouse_id()
.await
.ok()
.flatten();
let mut tracking_payload = ThreadTracker::new_tracking_payload();
tracking_payload.query_id = Some(query_id.clone());
tracking_payload.warehouse_id = warehouse_id;
tracking_payload.mem_stat = Some(MemStat::create(query_id.clone()));
let _guard = ThreadTracker::tracking(tracking_payload);

2 changes: 2 additions & 0 deletions src/query/storages/system/src/query_log_table.rs
@@ -168,4 +168,6 @@ pub struct QueryLogElement {
pub peek_memory_usage: HashMap<String, usize>,

pub session_id: String,

pub warehouse_id: String,
}