Skip to content

Commit 0da2267

Browse files
committed
chore: refine benchmark profile details
1 parent 8219fbd commit 0da2267

1 file changed

Lines changed: 22 additions & 81 deletions

File tree

benchmark/src/bin/benchmark_cloud.rs

Lines changed: 22 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,15 @@ struct ServerStatsRecord {
8585
spill_bytes: usize,
8686
}
8787

88-
#[derive(Serialize)]
89-
struct QueryExecutionRecord {
90-
node: String,
91-
query_id: String,
92-
process_rows: u64,
93-
process_time_in_micros: u64,
94-
}
95-
9688
#[derive(Serialize)]
9789
struct SystemHistoryRecord {
9890
#[serde(skip_serializing_if = "Option::is_none")]
9991
query_history: Option<serde_json::Value>,
10092
#[serde(skip_serializing_if = "Vec::is_empty")]
10193
profile_history: Vec<serde_json::Value>,
10294
#[serde(skip_serializing_if = "Option::is_none")]
95+
profile_statistics_desc: Option<serde_json::Value>,
96+
#[serde(skip_serializing_if = "Option::is_none")]
10397
error: Option<String>,
10498
}
10599

@@ -116,8 +110,6 @@ struct QueryAttempt {
116110
server_stats: Option<ServerStatsRecord>,
117111
#[serde(skip_serializing_if = "Vec::is_empty")]
118112
stats_samples: Vec<ServerStatsRecord>,
119-
#[serde(skip_serializing_if = "Vec::is_empty")]
120-
query_execution: Vec<QueryExecutionRecord>,
121113
#[serde(skip_serializing_if = "Option::is_none")]
122114
system_history: Option<SystemHistoryRecord>,
123115
#[serde(skip_serializing_if = "Option::is_none")]
@@ -172,6 +164,9 @@ fn load_config() -> Result<BenchmarkConfig> {
172164
if version.is_empty() {
173165
bail!("Please set BENCHMARK_VERSION to run the benchmark.");
174166
}
167+
if dataset == "load" {
168+
bail!("BENCHMARK_DATASET=load is not supported by benchmark-cloud");
169+
}
175170

176171
let tries: usize = tries_raw
177172
.parse()
@@ -352,13 +347,6 @@ async fn run_query_attempt(conn: &Connection, sql: &str, attempt: usize) -> Quer
352347

353348
let client_wall_ms = start.elapsed().as_millis();
354349
let query_id = conn.last_query_id();
355-
let query_execution = if let Some(query_id) = &query_id {
356-
collect_query_execution(conn, query_id)
357-
.await
358-
.unwrap_or_default()
359-
} else {
360-
Vec::new()
361-
};
362350
let elapsed_seconds = final_stats
363351
.as_ref()
364352
.map(|stats| round3(stats.running_time_ms / 1000.0))
@@ -372,52 +360,15 @@ async fn run_query_attempt(conn: &Connection, sql: &str, attempt: usize) -> Quer
372360
query_id,
373361
server_stats: final_stats,
374362
stats_samples,
375-
query_execution,
376363
system_history: None,
377364
error,
378365
}
379366
}
380367

381-
async fn collect_query_execution(
382-
conn: &Connection,
383-
query_id: &str,
384-
) -> Result<Vec<QueryExecutionRecord>> {
385-
let query_id_literal = quote_literal(query_id);
386-
let sql = format!(
387-
"select node, query_id, process_rows, process_time_in_micros \
388-
from system.query_execution where query_id = '{query_id_literal}' order by node"
389-
);
390-
391-
for delay in [0_u64, 100, 500, 1000, 2000] {
392-
if delay > 0 {
393-
tokio::time::sleep(Duration::from_millis(delay)).await;
394-
}
395-
let rows = conn.query_all(&sql).await?;
396-
if !rows.is_empty() {
397-
let mut records = Vec::with_capacity(rows.len());
398-
for row in rows {
399-
let (node, query_id, process_rows, process_time_in_micros): (
400-
String,
401-
String,
402-
u64,
403-
u64,
404-
) = row.try_into().map_err(|err: String| anyhow!(err))?;
405-
records.push(QueryExecutionRecord {
406-
node,
407-
query_id,
408-
process_rows,
409-
process_time_in_micros,
410-
});
411-
}
412-
return Ok(records);
413-
}
414-
}
415-
Ok(Vec::new())
416-
}
417-
418368
async fn collect_system_history(conn: &Connection, query_id: &str) -> SystemHistoryRecord {
419369
let mut query_history = None;
420370
let mut profile_history = Vec::new();
371+
let mut profile_statistics_desc = None;
421372
let mut error = None;
422373

423374
for delay in [0_u64, 500, 1000, 2000] {
@@ -435,9 +386,10 @@ async fn collect_system_history(conn: &Connection, query_id: &str) -> SystemHist
435386
}
436387

437388
match collect_profile_history(conn, query_id).await {
438-
Ok(records) => {
389+
Ok((records, statistics_desc)) => {
439390
if !records.is_empty() {
440391
profile_history = records;
392+
profile_statistics_desc = statistics_desc;
441393
}
442394
}
443395
Err(err) => {
@@ -454,6 +406,7 @@ async fn collect_system_history(conn: &Connection, query_id: &str) -> SystemHist
454406
SystemHistoryRecord {
455407
query_history,
456408
profile_history,
409+
profile_statistics_desc,
457410
error,
458411
}
459412
}
@@ -506,6 +459,7 @@ fn history_error(error: String) -> SystemHistoryRecord {
506459
SystemHistoryRecord {
507460
query_history: None,
508461
profile_history: Vec::new(),
462+
profile_statistics_desc: None,
509463
error: Some(error),
510464
}
511465
}
@@ -597,16 +551,16 @@ async fn collect_query_history(
597551
async fn collect_profile_history(
598552
conn: &Connection,
599553
query_id: &str,
600-
) -> Result<Vec<serde_json::Value>> {
554+
) -> Result<(Vec<serde_json::Value>, Option<serde_json::Value>)> {
601555
let query_id_literal = quote_literal(query_id);
602556
let sql = format!(
603557
r#"
604558
SELECT to_string(object_construct(
605559
'timestamp', timestamp,
606560
'query_id', query_id,
607-
'profiles', profiles,
608-
'statistics_desc', statistics_desc
609-
))
561+
'profiles', profiles
562+
)),
563+
to_string(statistics_desc)
610564
FROM system_history.profile_history
611565
WHERE query_id = '{query_id_literal}'
612566
ORDER BY timestamp
@@ -615,13 +569,19 @@ async fn collect_profile_history(
615569

616570
let rows = conn.query_all(&sql).await?;
617571
let mut records = Vec::with_capacity(rows.len());
572+
let mut statistics_desc = None;
618573
for row in rows {
619-
let (raw,): (Option<String>,) = row.try_into().map_err(|err: String| anyhow!(err))?;
574+
let (raw, statistics_desc_raw): (Option<String>, Option<String>) =
575+
row.try_into().map_err(|err: String| anyhow!(err))?;
620576
if let Some(raw) = raw {
621577
records.push(json_or_raw(&raw));
622578
}
579+
if statistics_desc.is_none() {
580+
statistics_desc = statistics_desc_raw.map(|raw| json_or_raw(&raw));
581+
}
623582
}
624-
Ok(records)
583+
584+
Ok((records, statistics_desc))
625585
}
626586

627587
fn json_or_raw(raw: &str) -> serde_json::Value {
@@ -724,7 +684,6 @@ async fn main() -> Result<()> {
724684
timing_source: "server_stats.running_time_ms".to_string(),
725685
detail_sources: vec![
726686
"databend-driver query_iter_ext".to_string(),
727-
"system.query_execution".to_string(),
728687
"system_history.query_history".to_string(),
729688
"system_history.profile_history".to_string(),
730689
],
@@ -756,16 +715,6 @@ async fn main() -> Result<()> {
756715
execute_sql(&admin_conn, "SHOW WAREHOUSES;").await?;
757716
wait_for_warehouse(&admin_conn, &config.warehouse).await?;
758717

759-
let warehouse_conn = connect(build_dsn(&config, None, Some(&config.warehouse), false)).await?;
760-
if config.dataset == "load" {
761-
println!("Creating database {} for load dataset...", config.database);
762-
execute_sql(
763-
&warehouse_conn,
764-
&format!("CREATE DATABASE {}", config.database),
765-
)
766-
.await?;
767-
}
768-
769718
let query_conn = connect(build_dsn(
770719
&config,
771720
Some(&config.database),
@@ -878,14 +827,6 @@ async fn main() -> Result<()> {
878827

879828
async fn cleanup(config: &BenchmarkConfig) -> Result<()> {
880829
let cleanup_conn = connect(build_dsn(config, None, Some("default"), true)).await?;
881-
if config.dataset == "load" {
882-
println!("Dropping database {}...", config.database);
883-
execute_sql(
884-
&cleanup_conn,
885-
&format!("DROP DATABASE IF EXISTS {}", config.database),
886-
)
887-
.await?;
888-
}
889830
println!("Dropping warehouse {}...", config.warehouse);
890831
execute_sql(
891832
&cleanup_conn,

0 commit comments

Comments
 (0)