Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,37 @@ impl Column {
.collect::<Vec<_>>();
match qualified_fields.len() {
0 => continue,
1 => return Ok(Column::from(qualified_fields[0])),
1 => {
// Even a single structural match must be rejected when the
// schema itself has flagged the name as ambiguous (e.g. a
// derived-table subquery that contained two columns with
// the same unqualified name).
let is_ambiguous = schema_level
.iter()
.any(|s| s.ambiguous_names().contains(&self.name));
if is_ambiguous {
return _schema_err!(SchemaError::AmbiguousReference {
field: Box::new(Column::new_unqualified(&self.name)),
})
.map_err(|err| {
let mut diagnostic = Diagnostic::new_error(
format!("column '{}' is ambiguous", &self.name),
self.spans().first(),
);
let columns = schema_level
.iter()
.flat_map(|s| s.columns_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
add_possible_columns_to_diag(
&mut diagnostic,
&Column::new_unqualified(&self.name),
&columns,
);
err.with_diagnostic(diagnostic)
});
}
return Ok(Column::from(qualified_fields[0]));
}
_ => {
// More than one field in this schema has its name set to self.name.
//
Expand Down
99 changes: 94 additions & 5 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! fields with optional relation names.

use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::fmt::{self, Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;

Expand Down Expand Up @@ -108,7 +108,7 @@ pub type DFSchemaRef = Arc<DFSchema>;
/// let schema: &Schema = df_schema.as_arrow();
/// assert_eq!(schema.fields().len(), 1);
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct DFSchema {
/// Inner Arrow schema reference.
inner: SchemaRef,
Expand All @@ -117,6 +117,26 @@ pub struct DFSchema {
field_qualifiers: Vec<Option<TableReference>>,
/// Stores functional dependencies in the schema.
functional_dependencies: FunctionalDependencies,
/// Field names that are ambiguous in this schema because the underlying
/// source (e.g. a derived-table subquery) contained multiple columns with
/// the same unqualified name. Any attempt to reference these names without
/// a qualifier should produce an [`SchemaError::AmbiguousReference`] error.
ambiguous_names: Option<Arc<HashSet<String>>>,
}

/// Hand-written `Debug` for [`DFSchema`].
///
/// The output mirrors what `#[derive(Debug)]` would produce for the first
/// three fields. `ambiguous_names` is rendered as a plain set of names rather
/// than as the underlying `Option<Arc<HashSet<_>>>`, so an absent set prints
/// as `{}`.
impl fmt::Debug for DFSchema {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // `HashSet` iteration order is arbitrary and differs between
        // processes, which would make the Debug text (and any snapshot that
        // embeds it) unstable as soon as more than one name is recorded.
        // Collecting into a `BTreeSet` prints the names in sorted order; an
        // absent or empty set still renders as `{}`.
        let ambiguous: BTreeSet<&str> = self
            .ambiguous_names
            .iter()
            .flat_map(|names| names.iter())
            .map(String::as_str)
            .collect();
        f.debug_struct("DFSchema")
            .field("inner", &self.inner)
            .field("field_qualifiers", &self.field_qualifiers)
            .field("functional_dependencies", &self.functional_dependencies)
            .field("ambiguous_names", &ambiguous)
            .finish()
    }
}

impl DFSchema {
Expand All @@ -126,6 +146,7 @@ impl DFSchema {
inner: Arc::new(Schema::new([])),
field_qualifiers: vec![],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
}
}

Expand Down Expand Up @@ -157,6 +178,7 @@ impl DFSchema {
inner: schema,
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};
dfschema.check_names()?;
Ok(dfschema)
Expand All @@ -173,6 +195,7 @@ impl DFSchema {
inner: schema,
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};
dfschema.check_names()?;
Ok(dfschema)
Expand All @@ -191,6 +214,7 @@ impl DFSchema {
inner: schema.clone().into(),
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};
schema.check_names()?;
Ok(schema)
Expand All @@ -205,6 +229,7 @@ impl DFSchema {
inner: Arc::clone(schema),
field_qualifiers: qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};
dfschema.check_names()?;
Ok(dfschema)
Expand All @@ -226,6 +251,7 @@ impl DFSchema {
inner: Arc::clone(&self.inner),
field_qualifiers: qualifiers,
functional_dependencies: self.functional_dependencies.clone(),
ambiguous_names: self.ambiguous_names.clone(),
})
}

Expand Down Expand Up @@ -275,6 +301,34 @@ impl DFSchema {
}
}

/// Marks the given field names as ambiguous.
///
/// Ambiguous names correspond to fields that originated from multiple
/// source columns with the same unqualified name (e.g. both sides of a
/// JOIN having an `age` column). Any attempt to resolve such a name
/// without a table qualifier will produce an
/// [`SchemaError::AmbiguousReference`] error.
pub fn with_ambiguous_names(mut self, names: HashSet<String>) -> Self {
    // An empty set is stored as `None`; only a non-empty set is wrapped in
    // an `Arc`.
    self.ambiguous_names = (!names.is_empty()).then(|| Arc::new(names));
    self
}

/// Returns the set of field names that are considered ambiguous in this
/// schema. See [`Self::with_ambiguous_names`].
///
/// Returns a reference to an empty set when no ambiguous names have been
/// recorded (the common case).
pub fn ambiguous_names(&self) -> &HashSet<String> {
    // Lazily-initialised shared empty set handed out when this schema has
    // no ambiguous names recorded.
    static NO_NAMES: std::sync::OnceLock<HashSet<String>> = std::sync::OnceLock::new();
    match self.ambiguous_names.as_deref() {
        Some(names) => names,
        None => NO_NAMES.get_or_init(HashSet::new),
    }
}

/// Create a new schema that contains the fields from this schema followed by the fields
/// from the supplied schema. An error will be returned if there are duplicate field names.
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
Expand All @@ -294,6 +348,7 @@ impl DFSchema {
inner: Arc::new(new_schema_with_metadata),
field_qualifiers: new_qualifiers,
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};
new_self.check_names()?;
Ok(new_self)
Expand Down Expand Up @@ -350,6 +405,22 @@ impl DFSchema {
let finished_with_metadata = finished.with_metadata(metadata);
self.inner = finished_with_metadata.into();
self.field_qualifiers.extend(qualifiers);
// Propagate ambiguous names from the other schema so that names marked
// as ambiguous (e.g. by a JOIN) are not silently dropped when schemas
// are merged for ORDER BY / HAVING resolution.
if let Some(other_names) = &other_schema.ambiguous_names {
match &mut self.ambiguous_names {
Some(self_names) => {
// Build a new combined set (Arc prevents in-place mutation).
let mut combined = (**self_names).clone();
combined.extend(other_names.iter().cloned());
self.ambiguous_names = Some(Arc::new(combined));
}
None => {
self.ambiguous_names = Some(Arc::clone(other_names));
}
}
}
}

/// Get a list of fields for this schema
Expand Down Expand Up @@ -506,6 +577,18 @@ impl DFSchema {
&self,
name: &str,
) -> Result<(Option<&TableReference>, &FieldRef)> {
// If this field name was marked as ambiguous at schema creation time
// (e.g. because a derived-table subquery produced duplicate column
// names), refuse to resolve it without an explicit qualifier.
if self
.ambiguous_names
.as_ref()
.is_some_and(|s| s.contains(name))
{
return _schema_err!(SchemaError::AmbiguousReference {
field: Box::new(Column::new_unqualified(name.to_string()))
});
}
let matches = self.qualified_fields_with_unqualified_name(name);
match matches.len() {
0 => Err(unqualified_field_not_found(name, self)),
Expand Down Expand Up @@ -845,6 +928,7 @@ impl DFSchema {
field_qualifiers: vec![None; self.inner.fields.len()],
inner: self.inner,
functional_dependencies: self.functional_dependencies,
ambiguous_names: self.ambiguous_names,
}
}

Expand All @@ -855,6 +939,7 @@ impl DFSchema {
field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
inner: self.inner,
functional_dependencies: self.functional_dependencies,
ambiguous_names: self.ambiguous_names,
}
}

Expand Down Expand Up @@ -1126,6 +1211,7 @@ impl TryFrom<SchemaRef> for DFSchema {
inner: schema,
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};
// Without checking names, because schema here may have duplicate field names.
// For example, Partial AggregateMode will generate duplicate field names from
Expand Down Expand Up @@ -1187,13 +1273,14 @@ impl ToDFSchema for Vec<Field> {
inner: schema.into(),
field_qualifiers: vec![None; field_count],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};
Ok(dfschema)
}
}

impl Display for DFSchema {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"fields:[{}], metadata:{:?}",
Expand All @@ -1211,7 +1298,7 @@ impl Display for DFSchema {
///
/// Note that this trait is implemented for &[DFSchema] which is
/// widely used in the DataFusion codebase.
pub trait ExprSchema: std::fmt::Debug {
pub trait ExprSchema: fmt::Debug {
/// Is this column reference nullable?
fn nullable(&self, col: &Column) -> Result<bool> {
Ok(self.field_from_column(col)?.is_nullable())
Expand All @@ -1238,7 +1325,7 @@ pub trait ExprSchema: std::fmt::Debug {
}

// Implement `ExprSchema` for `Arc<DFSchema>`
impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
impl<P: AsRef<DFSchema> + fmt::Debug> ExprSchema for P {
fn nullable(&self, col: &Column) -> Result<bool> {
self.as_ref().nullable(col)
}
Expand Down Expand Up @@ -1578,6 +1665,7 @@ mod tests {
inner: Arc::clone(&arrow_schema_ref),
field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};
let df_schema_ref = Arc::new(df_schema.clone());

Expand Down Expand Up @@ -1624,6 +1712,7 @@ mod tests {
inner: Arc::clone(&schema),
field_qualifiers: vec![None; schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
ambiguous_names: None,
};

assert_eq!(df_schema.inner.metadata(), schema.metadata())
Expand Down
14 changes: 13 additions & 1 deletion datafusion/core/benches/sql_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext {
format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")
};

let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'");
let sql =
format!("CREATE EXTERNAL TABLE hits_raw STORED AS PARQUET LOCATION '{path}'");

// ClickBench partitioned dataset was written by an ancient version of pyarrow
// that wrote strings with the wrong logical type. To read it correctly, we must
Expand All @@ -139,6 +140,17 @@ fn register_clickbench_hits_table(rt: &Runtime) -> SessionContext {
.unwrap();
rt.block_on(ctx.sql(&sql)).unwrap();

// ClickBench stores EventDate as UInt16 (days since 1970-01-01). Create a view
// that exposes it as SQL DATE so that queries comparing it with date literals
// (e.g. "EventDate >= '2013-07-01'") work correctly during planning.
rt.block_on(ctx.sql(
"CREATE VIEW hits AS \
SELECT * EXCEPT (\"EventDate\"), \
CAST(CAST(\"EventDate\" AS INTEGER) AS DATE) AS \"EventDate\" \
FROM hits_raw",
))
.unwrap();

let count =
rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() });
assert!(count > 0);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3570,7 +3570,7 @@ mod tests {
.expect_err("planning error")
.strip_backtrace();

insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] }, ambiguous_names: {} }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
}

#[tokio::test]
Expand Down
25 changes: 24 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,30 @@ pub fn build_join_schema(
.collect();

let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
dfschema.with_functional_dependencies(func_dependencies)
let dfschema = dfschema.with_functional_dependencies(func_dependencies)?;

// Propagate ambiguous names from both input schemas. A name that was
// already ambiguous on either side of the join (e.g. because the left
// input is itself a subquery that wrapped a JOIN) remains ambiguous in
// the output. We only propagate names that actually appear as field
// names in the output schema so we don't accumulate stale entries.
let output_field_names: HashSet<&str> = dfschema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
let inherited_ambiguous: HashSet<String> = left
.ambiguous_names()
.iter()
.chain(right.ambiguous_names())
.filter(|n| output_field_names.contains(n.as_str()))
.cloned()
.collect();
if inherited_ambiguous.is_empty() {
Ok(dfschema)
} else {
Ok(dfschema.with_ambiguous_names(inherited_ambiguous))
}
}

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
Expand Down
Loading
Loading