Skip to content
63 changes: 58 additions & 5 deletions datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::{path::PathBuf, time::Duration};

Expand All @@ -38,29 +39,38 @@ pub struct DataFusion {
relative_path: PathBuf,
pb: ProgressBar,
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
default_config: HashMap<String, Option<String>>,
}

impl DataFusion {
pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self {
let default_config = ctx
.state()
.config()
.options()
.entries()
.iter()
.map(|e| (e.key.clone(), e.value.clone()))
.collect();

Self {
ctx,
relative_path,
pb,
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(),
default_config,
}
}

/// Add a tracker that will track the currently executed SQL statement.
///
/// This is useful for logging and debugging purposes.
pub fn with_currently_executing_sql_tracker(
self,
mut self,
currently_executing_sql_tracker: CurrentlyExecutingSqlTracker,
) -> Self {
Self {
currently_executing_sql_tracker,
..self
}
self.currently_executing_sql_tracker = currently_executing_sql_tracker;
self
}

fn update_slow_count(&self) {
Expand Down Expand Up @@ -135,6 +145,49 @@ impl sqllogictest::AsyncDB for DataFusion {
async fn shutdown(&mut self) {}
}

impl Drop for DataFusion {
fn drop(&mut self) {
let mut changed = false;

for e in self.ctx.state().config().options().entries() {
let default_entry = self.default_config.remove(&e.key);

if let Some(default_entry) = default_entry
&& default_entry.as_ref() != e.value.as_ref()
{
if !changed {
changed = true;
self.pb.println(format!(
"SLT file {} left modified configuration",
self.relative_path.display()
));
}

let default = default_entry.as_deref().unwrap_or("NULL");
let current = e.value.as_deref().unwrap_or("NULL");

self.pb
.println(format!(" {}: {} -> {}", e.key, default, current));
}
}

// Any remaining entries were present initially but removed during execution
for (key, value) in &self.default_config {
if !changed {
changed = true;
self.pb.println(format!(
"SLT file {} left modified configuration",
self.relative_path.display()
));
}

let default = value.as_deref().unwrap_or("NULL");

self.pb.println(format!(" {key}: {default} -> NULL"));
}
}
}

async fn run_query(
ctx: &SessionContext,
is_spark_path: bool,
Expand Down
Loading