Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use std::fs;
use std::io::{IsTerminal, stderr, stdout};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[cfg(feature = "postgres")]
Expand Down Expand Up @@ -76,6 +76,19 @@ struct FileTiming {
elapsed: Duration,
}

/// Shared, thread-safe list of configuration-change error messages collected
/// while running SLT files.
type DataFusionConfigChangeErrors = Arc<Mutex<Vec<String>>>;

/// Folds the accumulated configuration-change errors into a single result:
/// `Ok(())` when no test file left the configuration modified, otherwise one
/// external error joining every recorded message.
fn config_change_result(
    config_change_errors: &DataFusionConfigChangeErrors,
) -> Result<()> {
    let errors = config_change_errors.lock().unwrap();
    match errors.as_slice() {
        [] => Ok(()),
        messages => Err(DataFusionError::External(messages.join("\n\n").into())),
    }
}

pub fn main() -> Result<()> {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
Expand Down Expand Up @@ -482,7 +495,7 @@ async fn run_test_file_substrait_round_trip(
runner.with_column_validator(strict_column_validator);
runner.with_normalizer(value_normalizer);
runner.with_validator(validator);
let res = run_file_in_runner(path, runner, filters, colored_output).await;
let res = run_file_in_runner(path, &mut runner, filters, colored_output).await;
pb.finish_and_clear();
res
}
Expand Down Expand Up @@ -512,26 +525,37 @@ async fn run_test_file(
pb.set_style(mp_style);
pb.set_message(format!("{:?}", &relative_path));

// If DataFusion configuration has changed during test file runs, errors will be
// pushed to this vec.
// HACK: managed externally because `sqllogictest` is an external dependency, and
// it doesn't have an API to directly access the inner runner.
let config_change_errors = Arc::new(Mutex::new(Vec::new()));
let mut runner = sqllogictest::Runner::new(|| async {
Ok(DataFusion::new(
test_ctx.session_ctx().clone(),
relative_path.clone(),
pb.clone(),
)
.with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone()))
.with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone())
.with_config_change_errors(Arc::clone(&config_change_errors)))
});
runner.add_label("Datafusion");
runner.with_column_validator(strict_column_validator);
runner.with_normalizer(value_normalizer);
runner.with_validator(validator);
let result = run_file_in_runner(path, runner, filters, colored_output).await;
let result = run_file_in_runner(path, &mut runner, filters, colored_output).await;
pb.finish_and_clear();
result

result?;

// If there was no correctness error, check that the config is unchanged.
runner.shutdown_async().await;
config_change_result(&config_change_errors)
}

async fn run_file_in_runner<D: AsyncDB, M: MakeConnection<Conn = D>>(
path: PathBuf,
mut runner: sqllogictest::Runner<D, M>,
runner: &mut sqllogictest::Runner<D, M>,
filters: &[Filter],
colored_output: bool,
) -> Result<()> {
Expand Down Expand Up @@ -644,7 +668,7 @@ async fn run_test_file_with_postgres(
runner.with_column_validator(strict_column_validator);
runner.with_normalizer(value_normalizer);
runner.with_validator(validator);
let result = run_file_in_runner(path, runner, filters, false).await;
let result = run_file_in_runner(path, &mut runner, filters, false).await;
pb.finish_and_clear();
result
}
Expand Down Expand Up @@ -688,13 +712,15 @@ async fn run_complete_file(
pb.set_style(mp_style);
pb.set_message(format!("{:?}", &relative_path));

let config_change_errors = Arc::new(Mutex::new(Vec::new()));
let mut runner = sqllogictest::Runner::new(|| async {
Ok(DataFusion::new(
test_ctx.session_ctx().clone(),
relative_path.clone(),
pb.clone(),
)
.with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone()))
.with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone())
.with_config_change_errors(Arc::clone(&config_change_errors)))
});

let col_separator = " ";
Expand All @@ -712,7 +738,9 @@ async fn run_complete_file(

pb.finish_and_clear();

res
res?;
runner.shutdown_async().await;
config_change_result(&config_change_errors)
}

#[cfg(feature = "postgres")]
Expand Down
125 changes: 82 additions & 43 deletions datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::{path::PathBuf, time::Duration};

use super::{DFSqlLogicTestError, error::Result, normalize};
Expand All @@ -40,6 +40,7 @@ pub struct DataFusion {
pb: ProgressBar,
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
default_config: HashMap<String, Option<String>>,
config_change_errors: Option<Arc<Mutex<Vec<String>>>>,
}

impl DataFusion {
Expand All @@ -59,6 +60,7 @@ impl DataFusion {
pb,
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(),
default_config,
config_change_errors: None,
}
}

Expand All @@ -73,6 +75,14 @@ impl DataFusion {
self
}

pub fn with_config_change_errors(
mut self,
config_change_errors: Arc<Mutex<Vec<String>>>,
) -> Self {
self.config_change_errors = Some(config_change_errors);
self
}

fn update_slow_count(&self) {
let msg = self.pb.message();
let split: Vec<&str> = msg.split(" ").collect();
Expand All @@ -88,6 +98,43 @@ impl DataFusion {
self.pb
.set_message(format!("{} - {} took > 500 ms", split[0], current_count));
}

/// Compares the session's current configuration against the snapshot taken at
/// construction time (`self.default_config`) and returns an error describing
/// every option that was changed, or removed, by the SLT file.
///
/// NOTE(review): this drains `self.default_config` via `remove`, so the method
/// is effectively single-use — a second call would report every option as
/// removed. It appears to be invoked only once, from `shutdown`; confirm if
/// reused elsewhere.
pub fn validate_config_unchanged(&mut self) -> Result<()> {
let mut changed = false;
// Header line; per-option diff lines are appended below. The message is
// only returned when at least one change was detected.
let mut message = format!(
"SLT file {} left modified configuration",
self.relative_path.display()
);

// Walk every currently-set option and pop its baseline value out of the
// snapshot; whatever remains afterwards must have been removed by the test.
for entry in self.ctx.state().config().options().entries() {
let default_entry = self.default_config.remove(&entry.key);

// Option existed at startup but now holds a different value.
if let Some(default_entry) = default_entry
&& default_entry.as_ref() != entry.value.as_ref()
{
changed = true;

// Unset values are rendered as "NULL" for readability.
let default = default_entry.as_deref().unwrap_or("NULL");
let current = entry.value.as_deref().unwrap_or("NULL");

message
.push_str(&format!("\n {}: {} -> {}", entry.key, default, current));
}
}

// Leftover snapshot entries existed at startup but are gone now, i.e. the
// SLT file unset them entirely.
for (key, value) in &self.default_config {
changed = true;

let default = value.as_deref().unwrap_or("NULL");
message.push_str(&format!("\n {key}: {default} -> NULL"));
}

if changed {
Err(DFSqlLogicTestError::Other(message))
} else {
Ok(())
}
}
}

#[async_trait]
Expand Down Expand Up @@ -142,48 +189,12 @@ impl sqllogictest::AsyncDB for DataFusion {
tokio::time::sleep(dur).await;
}

async fn shutdown(&mut self) {}
}

impl Drop for DataFusion {
fn drop(&mut self) {
let mut changed = false;

for e in self.ctx.state().config().options().entries() {
let default_entry = self.default_config.remove(&e.key);

if let Some(default_entry) = default_entry
&& default_entry.as_ref() != e.value.as_ref()
{
if !changed {
changed = true;
self.pb.println(format!(
"SLT file {} left modified configuration",
self.relative_path.display()
));
}

let default = default_entry.as_deref().unwrap_or("NULL");
let current = e.value.as_deref().unwrap_or("NULL");

self.pb
.println(format!(" {}: {} -> {}", e.key, default, current));
}
}

// Any remaining entries were present initially but removed during execution
for (key, value) in &self.default_config {
if !changed {
changed = true;
self.pb.println(format!(
"SLT file {} left modified configuration",
self.relative_path.display()
));
}

let default = value.as_deref().unwrap_or("NULL");

self.pb.println(format!(" {key}: {default} -> NULL"));
/// Shutdown and check no DataFusion configuration has changed during test
async fn shutdown(&mut self) {
if let Some(config_change_errors) = self.config_change_errors.clone()
&& let Err(error) = self.validate_config_unchanged()
{
config_change_errors.lock().unwrap().push(error.to_string());
}
}
}
Expand All @@ -209,3 +220,31 @@ async fn run_query(
Ok(DBOutput::Rows { types, rows })
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use sqllogictest::AsyncDB;

    /// Running a `SET` statement through the engine must make
    /// `validate_config_unchanged` fail with a message naming the file, the
    /// option, and the value transition.
    #[tokio::test]
    async fn validate_config_unchanged_detects_modified_config() {
        let ctx = SessionContext::new();
        let original_batch_size = ctx.state().config().options().execution.batch_size;

        let mut df =
            DataFusion::new(ctx, PathBuf::from("test.slt"), ProgressBar::hidden());

        // Mutate the session configuration exactly as an SLT file would.
        <DataFusion as AsyncDB>::run(
            &mut df,
            "SET datafusion.execution.batch_size = 2048",
        )
        .await
        .unwrap();

        let msg = df.validate_config_unchanged().unwrap_err().to_string();

        assert!(msg.contains("test.slt left modified configuration"));
        assert!(msg.contains("datafusion.execution.batch_size"));
        assert!(msg.contains(&format!("{original_batch_size} -> 2048")));
    }
}
9 changes: 6 additions & 3 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -8778,9 +8778,12 @@ ORDER BY g;
2 2 1.5 1.5 1.5 1.5 1.5 1.5 1.5 1.5
3 2 1.5 1.5 1.5 1.5 1.5 1.5 1.5 1.5

statement ok
DROP TABLE stream_test;
# Config reset

# Restore default target partitions
# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
set datafusion.execution.target_partitions = 4;

statement ok
DROP TABLE stream_test;
7 changes: 7 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate_repartition.slt
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,10 @@ physical_plan
01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
02)--AggregateExec: mode=Single, gby=[env@0 as env], aggr=[count(Int64(1))]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet

# Config reset

# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
SET datafusion.execution.target_partitions = 4;
45 changes: 45 additions & 0 deletions datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,21 @@ c true false NULL
d NULL false NULL
e true false NULL

# Config reset
statement ok
reset datafusion.execution.batch_size;

# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
set datafusion.execution.target_partitions = 4;

statement ok
reset datafusion.execution.skip_partial_aggregation_probe_ratio_threshold;

statement ok
reset datafusion.execution.skip_partial_aggregation_probe_rows_threshold;

# Prepare settings to always skip aggregation after couple of batches
statement ok
set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10;
Expand Down Expand Up @@ -693,6 +708,21 @@ ORDER BY i;
2 66
3 33

# Config reset
statement ok
reset datafusion.execution.batch_size;

# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
set datafusion.execution.target_partitions = 4;

statement ok
reset datafusion.execution.skip_partial_aggregation_probe_ratio_threshold;

statement ok
reset datafusion.execution.skip_partial_aggregation_probe_rows_threshold;

statement ok
DROP TABLE decimal_table;

Expand Down Expand Up @@ -738,5 +768,20 @@ SELECT bool_and(c1), bool_and(c2), bool_and(c3), bool_and(c4), bool_and(c5), boo
----
true false false false false true false NULL

# Config reset
statement ok
reset datafusion.execution.batch_size;

# The SLT runner sets `target_partitions` to 4 instead of using the default, so
# reset it explicitly.
statement ok
set datafusion.execution.target_partitions = 4;

statement ok
reset datafusion.execution.skip_partial_aggregation_probe_ratio_threshold;

statement ok
reset datafusion.execution.skip_partial_aggregation_probe_rows_threshold;

statement ok
DROP TABLE aggregate_test_100_bool
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/arrow_files.slt
Original file line number Diff line number Diff line change
Expand Up @@ -388,3 +388,7 @@ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/
# querying corrupted stream format should result in error
query error DataFusion error: Arrow error: Parser error: Unsupported message header type in IPC stream: 'NONE'
SELECT * FROM arrow_stream_corrupted_metadata_length

# Config reset
statement ok
RESET datafusion.sql_parser.map_string_types_to_utf8view;
4 changes: 4 additions & 0 deletions datafusion/sqllogictest/test_files/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,7 @@ SELECT id, string_col, int_col, bigint_col FROM alltypes_plain ORDER BY id LIMIT
2 0 0 0
3 1 1 10
4 0 0 0

# Config reset
statement ok
reset datafusion.sql_parser.map_string_types_to_utf8view;
Loading
Loading