Skip to content

Commit 71fcd03

Browse files
Weijun-HOmega359
andauthored
feat: add RESET statement for configuration variabless (#18408)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18384 ## Rationale for this change Without a SQL-level reset, clients that SET DataFusion options have to rebuild the session to recover defaults <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - Extended the config macros/traits so every namespace knows how to restore default values - Added the `ResetVariable` <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> Yes, SQL clients (including the CLI) can issue RESET --------- Co-authored-by: Bruce Ritchie <bruce.ritchie@veeva.com>
1 parent bd30fe2 commit 71fcd03

11 files changed

Lines changed: 410 additions & 19 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,31 @@ macro_rules! config_namespace {
185185
self.$field_name.visit(v, key.as_str(), desc);
186186
)*
187187
}
188+
189+
fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
190+
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
191+
match key {
192+
$(
193+
stringify!($field_name) => {
194+
#[allow(deprecated)]
195+
{
196+
if rem.is_empty() {
197+
let default_value: $field_type = $default;
198+
self.$field_name = default_value;
199+
Ok(())
200+
} else {
201+
self.$field_name.reset(rem)
202+
}
203+
}
204+
},
205+
)*
206+
_ => $crate::error::_config_err!(
207+
"Config value \"{}\" not found on {}",
208+
key,
209+
stringify!($struct_name)
210+
),
211+
}
212+
}
188213
}
189214
impl Default for $struct_name {
190215
fn default() -> Self {
@@ -1169,6 +1194,45 @@ impl ConfigField for ConfigOptions {
11691194
_ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
11701195
}
11711196
}
1197+
1198+
/// Reset a configuration option back to its default value
1199+
fn reset(&mut self, key: &str) -> Result<()> {
1200+
let Some((prefix, rest)) = key.split_once('.') else {
1201+
return _config_err!("could not find config namespace for key \"{key}\"");
1202+
};
1203+
1204+
if prefix != "datafusion" {
1205+
return _config_err!("Could not find config namespace \"{prefix}\"");
1206+
}
1207+
1208+
let (section, rem) = rest.split_once('.').unwrap_or((rest, ""));
1209+
if rem.is_empty() {
1210+
return _config_err!("could not find config field for key \"{key}\"");
1211+
}
1212+
1213+
match section {
1214+
"catalog" => self.catalog.reset(rem),
1215+
"execution" => self.execution.reset(rem),
1216+
"optimizer" => {
1217+
if rem == "enable_dynamic_filter_pushdown" {
1218+
let defaults = OptimizerOptions::default();
1219+
self.optimizer.enable_dynamic_filter_pushdown =
1220+
defaults.enable_dynamic_filter_pushdown;
1221+
self.optimizer.enable_topk_dynamic_filter_pushdown =
1222+
defaults.enable_topk_dynamic_filter_pushdown;
1223+
self.optimizer.enable_join_dynamic_filter_pushdown =
1224+
defaults.enable_join_dynamic_filter_pushdown;
1225+
Ok(())
1226+
} else {
1227+
self.optimizer.reset(rem)
1228+
}
1229+
}
1230+
"explain" => self.explain.reset(rem),
1231+
"sql_parser" => self.sql_parser.reset(rem),
1232+
"format" => self.format.reset(rem),
1233+
other => _config_err!("Config value \"{other}\" not found on ConfigOptions"),
1234+
}
1235+
}
11721236
}
11731237

11741238
impl ConfigOptions {
@@ -1477,6 +1541,10 @@ pub trait ConfigField {
14771541
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
14781542

14791543
fn set(&mut self, key: &str, value: &str) -> Result<()>;
1544+
1545+
fn reset(&mut self, key: &str) -> Result<()> {
1546+
_config_err!("Reset is not supported for this config field, key: {}", key)
1547+
}
14801548
}
14811549

14821550
impl<F: ConfigField + Default> ConfigField for Option<F> {
@@ -1490,6 +1558,15 @@ impl<F: ConfigField + Default> ConfigField for Option<F> {
14901558
fn set(&mut self, key: &str, value: &str) -> Result<()> {
14911559
self.get_or_insert_with(Default::default).set(key, value)
14921560
}
1561+
1562+
fn reset(&mut self, key: &str) -> Result<()> {
1563+
if key.is_empty() {
1564+
*self = Default::default();
1565+
Ok(())
1566+
} else {
1567+
self.get_or_insert_with(Default::default).reset(key)
1568+
}
1569+
}
14931570
}
14941571

14951572
/// Default transformation to parse a [`ConfigField`] for a string.
@@ -1554,6 +1631,19 @@ macro_rules! config_field {
15541631
*self = $transform;
15551632
Ok(())
15561633
}
1634+
1635+
fn reset(&mut self, key: &str) -> $crate::error::Result<()> {
1636+
if key.is_empty() {
1637+
*self = <$t as Default>::default();
1638+
Ok(())
1639+
} else {
1640+
$crate::error::_config_err!(
1641+
"Config field is a scalar {} and does not have nested field \"{}\"",
1642+
stringify!($t),
1643+
key
1644+
)
1645+
}
1646+
}
15571647
}
15581648
};
15591649
}
@@ -2562,7 +2652,7 @@ impl ConfigField for ConfigFileDecryptionProperties {
25622652
self.footer_signature_verification.set(rem, value.as_ref())
25632653
}
25642654
_ => _config_err!(
2565-
"Config value \"{}\" not found on ConfigFileEncryptionProperties",
2655+
"Config value \"{}\" not found on ConfigFileDecryptionProperties",
25662656
key
25672657
),
25682658
}
@@ -2876,7 +2966,6 @@ mod tests {
28762966
};
28772967
use std::any::Any;
28782968
use std::collections::HashMap;
2879-
use std::sync::Arc;
28802969

28812970
#[derive(Default, Debug, Clone)]
28822971
pub struct TestExtensionConfig {
@@ -2991,6 +3080,19 @@ mod tests {
29913080
assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
29923081
}
29933082

3083+
#[test]
3084+
fn reset_nested_scalar_reports_helpful_error() {
3085+
let mut value = true;
3086+
let err = <bool as ConfigField>::reset(&mut value, "nested").unwrap_err();
3087+
let message = err.to_string();
3088+
assert!(
3089+
message.starts_with(
3090+
"Invalid or Unsupported Configuration: Config field is a scalar bool and does not have nested field \"nested\""
3091+
),
3092+
"unexpected error message: {message}"
3093+
);
3094+
}
3095+
29943096
#[cfg(feature = "parquet")]
29953097
#[test]
29963098
fn parquet_table_options() {
@@ -3013,6 +3115,7 @@ mod tests {
30133115
};
30143116
use parquet::encryption::decrypt::FileDecryptionProperties;
30153117
use parquet::encryption::encrypt::FileEncryptionProperties;
3118+
use std::sync::Arc;
30163119

30173120
let footer_key = b"0123456789012345".to_vec(); // 128bit/16
30183121
let column_names = vec!["double_field", "float_field"];

datafusion/core/src/execution/context/mod.rs

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ use crate::{
4545
logical_expr::{
4646
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
4747
CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
48-
DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable,
49-
TableType, UNNAMED_TABLE,
48+
DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, ResetVariable,
49+
SetVariable, TableType, UNNAMED_TABLE,
5050
},
5151
physical_expr::PhysicalExpr,
5252
physical_plan::ExecutionPlan,
@@ -63,7 +63,7 @@ use datafusion_catalog::MemoryCatalogProvider;
6363
use datafusion_catalog::{
6464
DynamicFileCatalog, TableFunction, TableFunctionImpl, UrlTableFactory,
6565
};
66-
use datafusion_common::config::ConfigOptions;
66+
use datafusion_common::config::{ConfigField, ConfigOptions};
6767
use datafusion_common::metadata::ScalarAndMetadata;
6868
use datafusion_common::{
6969
config::{ConfigExtension, TableOptions},
@@ -72,7 +72,11 @@ use datafusion_common::{
7272
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
7373
DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference,
7474
};
75+
use datafusion_execution::cache::cache_manager::DEFAULT_METADATA_CACHE_LIMIT;
7576
pub use datafusion_execution::config::SessionConfig;
77+
use datafusion_execution::disk_manager::{
78+
DiskManagerBuilder, DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
79+
};
7680
use datafusion_execution::registry::SerializerRegistry;
7781
pub use datafusion_execution::TaskContext;
7882
pub use datafusion_expr::execution_props::ExecutionProps;
@@ -711,7 +715,12 @@ impl SessionContext {
711715
}
712716
// TODO what about the other statements (like TransactionStart and TransactionEnd)
713717
LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
714-
self.set_variable(stmt).await
718+
self.set_variable(stmt).await?;
719+
self.return_empty_dataframe()
720+
}
721+
LogicalPlan::Statement(Statement::ResetVariable(stmt)) => {
722+
self.reset_variable(stmt).await?;
723+
self.return_empty_dataframe()
715724
}
716725
LogicalPlan::Statement(Statement::Prepare(Prepare {
717726
name,
@@ -1069,7 +1078,7 @@ impl SessionContext {
10691078
exec_err!("Schema '{schemaref}' doesn't exist.")
10701079
}
10711080

1072-
async fn set_variable(&self, stmt: SetVariable) -> Result<DataFrame> {
1081+
async fn set_variable(&self, stmt: SetVariable) -> Result<()> {
10731082
let SetVariable {
10741083
variable, value, ..
10751084
} = stmt;
@@ -1099,11 +1108,37 @@ impl SessionContext {
10991108
for udf in udfs_to_update {
11001109
state.register_udf(udf)?;
11011110
}
1111+
}
11021112

1103-
drop(state);
1113+
Ok(())
1114+
}
1115+
1116+
async fn reset_variable(&self, stmt: ResetVariable) -> Result<()> {
1117+
let variable = stmt.variable;
1118+
if variable.starts_with("datafusion.runtime.") {
1119+
return self.reset_runtime_variable(&variable);
11041120
}
11051121

1106-
self.return_empty_dataframe()
1122+
let mut state = self.state.write();
1123+
state.config_mut().options_mut().reset(&variable)?;
1124+
1125+
// Refresh UDFs to ensure configuration-dependent behavior updates
1126+
let config_options = state.config().options();
1127+
let udfs_to_update: Vec<_> = state
1128+
.scalar_functions()
1129+
.values()
1130+
.filter_map(|udf| {
1131+
udf.inner()
1132+
.with_updated_config(config_options)
1133+
.map(Arc::new)
1134+
})
1135+
.collect();
1136+
1137+
for udf in udfs_to_update {
1138+
state.register_udf(udf)?;
1139+
}
1140+
1141+
Ok(())
11071142
}
11081143

11091144
fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> {
@@ -1127,6 +1162,37 @@ impl SessionContext {
11271162
builder.with_metadata_cache_limit(limit)
11281163
}
11291164
_ => return plan_err!("Unknown runtime configuration: {variable}"),
1165+
// Remember to update `reset_runtime_variable()` when adding new options
1166+
};
1167+
1168+
*state = SessionStateBuilder::from(state.clone())
1169+
.with_runtime_env(Arc::new(builder.build()?))
1170+
.build();
1171+
1172+
Ok(())
1173+
}
1174+
1175+
fn reset_runtime_variable(&self, variable: &str) -> Result<()> {
1176+
let key = variable.strip_prefix("datafusion.runtime.").unwrap();
1177+
1178+
let mut state = self.state.write();
1179+
1180+
let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env());
1181+
match key {
1182+
"memory_limit" => {
1183+
builder.memory_pool = None;
1184+
}
1185+
"max_temp_directory_size" => {
1186+
builder =
1187+
builder.with_max_temp_directory_size(DEFAULT_MAX_TEMP_DIRECTORY_SIZE);
1188+
}
1189+
"temp_directory" => {
1190+
builder.disk_manager_builder = Some(DiskManagerBuilder::default());
1191+
}
1192+
"metadata_cache_limit" => {
1193+
builder = builder.with_metadata_cache_limit(DEFAULT_METADATA_CACHE_LIMIT);
1194+
}
1195+
_ => return plan_err!("Unknown runtime configuration: {variable}"),
11301196
};
11311197

11321198
*state = SessionStateBuilder::from(state.clone())

datafusion/core/tests/macro_hygiene/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ mod config_field {
8585
impl std::error::Error for E {}
8686

8787
#[allow(dead_code)]
88+
#[derive(Default)]
8889
struct S;
8990

9091
impl std::str::FromStr for S {

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ impl CacheManager {
181181
}
182182
}
183183

184-
const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
184+
pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
185185

186186
#[derive(Clone)]
187187
pub struct CacheManagerConfig {

datafusion/execution/src/disk_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tempfile::{Builder, NamedTempFile, TempDir};
3030

3131
use crate::memory_pool::human_readable_size;
3232

33-
const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
33+
pub const DEFAULT_MAX_TEMP_DIRECTORY_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB
3434

3535
/// Builder pattern for the [DiskManager] structure
3636
#[derive(Clone, Debug)]

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ pub use plan::{
4545
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
4646
};
4747
pub use statement::{
48-
Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode,
49-
TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
48+
Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement,
49+
TransactionAccessMode, TransactionConclusion, TransactionEnd,
50+
TransactionIsolationLevel, TransactionStart,
5051
};
5152

5253
pub use datafusion_common::format::ExplainFormat;

datafusion/expr/src/logical_plan/statement.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ pub enum Statement {
3939
TransactionEnd(TransactionEnd),
4040
/// Set a Variable
4141
SetVariable(SetVariable),
42+
/// Reset a Variable
43+
ResetVariable(ResetVariable),
4244
/// Prepare a statement and find any bind parameters
4345
/// (e.g. `?`). This is used to implement SQL-prepared statements.
4446
Prepare(Prepare),
@@ -66,6 +68,7 @@ impl Statement {
6668
Statement::TransactionStart(_) => "TransactionStart",
6769
Statement::TransactionEnd(_) => "TransactionEnd",
6870
Statement::SetVariable(_) => "SetVariable",
71+
Statement::ResetVariable(_) => "ResetVariable",
6972
Statement::Prepare(_) => "Prepare",
7073
Statement::Execute(_) => "Execute",
7174
Statement::Deallocate(_) => "Deallocate",
@@ -109,6 +112,9 @@ impl Statement {
109112
}) => {
110113
write!(f, "SetVariable: set {variable:?} to {value:?}")
111114
}
115+
Statement::ResetVariable(ResetVariable { variable }) => {
116+
write!(f, "ResetVariable: reset {variable:?}")
117+
}
112118
Statement::Prepare(Prepare { name, fields, .. }) => {
113119
write!(
114120
f,
@@ -194,6 +200,12 @@ pub struct SetVariable {
194200
pub value: String,
195201
}
196202

203+
/// Reset a configuration variable to its default
204+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
205+
pub struct ResetVariable {
206+
/// The variable name
207+
pub variable: String,
208+
}
197209
/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
198210
/// `Expr::Placeholder` expressions that are filled in during execution
199211
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]

0 commit comments

Comments
 (0)