Skip to content

Commit 76150b2

Browse files
committed
fix: raise AmbiguousReference error for duplicate column names in subquery
1 parent 5ba06ac commit 76150b2

File tree

7 files changed

+351
-6
lines changed

7 files changed

+351
-6
lines changed

datafusion/common/src/column.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,37 @@ impl Column {
237237
.collect::<Vec<_>>();
238238
match qualified_fields.len() {
239239
0 => continue,
240-
1 => return Ok(Column::from(qualified_fields[0])),
240+
1 => {
241+
// Even a single structural match must be rejected when the
242+
// schema itself has flagged the name as ambiguous (e.g. a
243+
// derived-table subquery that contained two columns with
244+
// the same unqualified name).
245+
let is_ambiguous = schema_level
246+
.iter()
247+
.any(|s| s.ambiguous_names().contains(&self.name));
248+
if is_ambiguous {
249+
return _schema_err!(SchemaError::AmbiguousReference {
250+
field: Box::new(Column::new_unqualified(&self.name)),
251+
})
252+
.map_err(|err| {
253+
let mut diagnostic = Diagnostic::new_error(
254+
format!("column '{}' is ambiguous", &self.name),
255+
self.spans().first(),
256+
);
257+
let columns = schema_level
258+
.iter()
259+
.flat_map(|s| s.columns_with_unqualified_name(&self.name))
260+
.collect::<Vec<_>>();
261+
add_possible_columns_to_diag(
262+
&mut diagnostic,
263+
&Column::new_unqualified(&self.name),
264+
&columns,
265+
);
266+
err.with_diagnostic(diagnostic)
267+
});
268+
}
269+
return Ok(Column::from(qualified_fields[0]));
270+
}
241271
_ => {
242272
// More than one field in this schema has its name set to self.name.
243273
//

datafusion/common/src/dfschema.rs

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
//! fields with optional relation names.
2020
2121
use std::collections::{BTreeSet, HashMap, HashSet};
22-
use std::fmt::{Display, Formatter};
22+
use std::fmt::{self, Display, Formatter};
2323
use std::hash::Hash;
2424
use std::sync::Arc;
2525

@@ -108,7 +108,7 @@ pub type DFSchemaRef = Arc<DFSchema>;
108108
/// let schema: &Schema = df_schema.as_arrow();
109109
/// assert_eq!(schema.fields().len(), 1);
110110
/// ```
111-
#[derive(Debug, Clone, PartialEq, Eq)]
111+
#[derive(Clone, PartialEq, Eq)]
112112
pub struct DFSchema {
113113
/// Inner Arrow schema reference.
114114
inner: SchemaRef,
@@ -117,6 +117,26 @@ pub struct DFSchema {
117117
field_qualifiers: Vec<Option<TableReference>>,
118118
/// Stores functional dependencies in the schema.
119119
functional_dependencies: FunctionalDependencies,
120+
/// Field names that are ambiguous in this schema because the underlying
121+
/// source (e.g. a derived-table subquery) contained multiple columns with
122+
/// the same unqualified name. Any attempt to reference these names without
123+
/// a qualifier should produce an [`SchemaError::AmbiguousReference`] error.
124+
ambiguous_names: Option<Arc<HashSet<String>>>,
125+
}
126+
127+
impl fmt::Debug for DFSchema {
128+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129+
// Print the ambiguous-names set as `{}` when it is empty or absent, so that
130+
// Debug output shows a plain set rather than the Option/Arc wrapper.
131+
let empty = HashSet::new();
132+
let ambiguous = self.ambiguous_names.as_deref().unwrap_or(&empty);
133+
f.debug_struct("DFSchema")
134+
.field("inner", &self.inner)
135+
.field("field_qualifiers", &self.field_qualifiers)
136+
.field("functional_dependencies", &self.functional_dependencies)
137+
.field("ambiguous_names", ambiguous)
138+
.finish()
139+
}
120140
}
121141

122142
impl DFSchema {
@@ -126,6 +146,7 @@ impl DFSchema {
126146
inner: Arc::new(Schema::new([])),
127147
field_qualifiers: vec![],
128148
functional_dependencies: FunctionalDependencies::empty(),
149+
ambiguous_names: None,
129150
}
130151
}
131152

@@ -157,6 +178,7 @@ impl DFSchema {
157178
inner: schema,
158179
field_qualifiers: qualifiers,
159180
functional_dependencies: FunctionalDependencies::empty(),
181+
ambiguous_names: None,
160182
};
161183
dfschema.check_names()?;
162184
Ok(dfschema)
@@ -173,6 +195,7 @@ impl DFSchema {
173195
inner: schema,
174196
field_qualifiers: vec![None; field_count],
175197
functional_dependencies: FunctionalDependencies::empty(),
198+
ambiguous_names: None,
176199
};
177200
dfschema.check_names()?;
178201
Ok(dfschema)
@@ -191,6 +214,7 @@ impl DFSchema {
191214
inner: schema.clone().into(),
192215
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
193216
functional_dependencies: FunctionalDependencies::empty(),
217+
ambiguous_names: None,
194218
};
195219
schema.check_names()?;
196220
Ok(schema)
@@ -205,6 +229,7 @@ impl DFSchema {
205229
inner: Arc::clone(schema),
206230
field_qualifiers: qualifiers,
207231
functional_dependencies: FunctionalDependencies::empty(),
232+
ambiguous_names: None,
208233
};
209234
dfschema.check_names()?;
210235
Ok(dfschema)
@@ -226,6 +251,7 @@ impl DFSchema {
226251
inner: Arc::clone(&self.inner),
227252
field_qualifiers: qualifiers,
228253
functional_dependencies: self.functional_dependencies.clone(),
254+
ambiguous_names: self.ambiguous_names.clone(),
229255
})
230256
}
231257

@@ -275,6 +301,35 @@ impl DFSchema {
275301
}
276302
}
277303

304+
/// Marks the given field names as ambiguous.
305+
///
306+
/// Ambiguous names correspond to fields that originated from multiple
307+
/// source columns with the same unqualified name (e.g. both sides of a
308+
/// JOIN having an `age` column). Any attempt to resolve such a name
309+
/// without a table qualifier will produce an
310+
/// [`SchemaError::AmbiguousReference`] error.
311+
pub fn with_ambiguous_names(mut self, names: HashSet<String>) -> Self {
312+
self.ambiguous_names = if names.is_empty() {
313+
None
314+
} else {
315+
Some(Arc::new(names))
316+
};
317+
self
318+
}
319+
320+
/// Returns the set of field names that are considered ambiguous in this
321+
/// schema. See [`Self::with_ambiguous_names`].
322+
///
323+
/// Returns a reference to an empty set when no ambiguous names have been
324+
/// recorded (the common case).
325+
pub fn ambiguous_names(&self) -> &HashSet<String> {
326+
static EMPTY: std::sync::OnceLock<HashSet<String>> =
327+
std::sync::OnceLock::new();
328+
self.ambiguous_names
329+
.as_deref()
330+
.unwrap_or_else(|| EMPTY.get_or_init(HashSet::new))
331+
}
332+
278333
/// Create a new schema that contains the fields from this schema followed by the fields
279334
/// from the supplied schema. An error will be returned if there are duplicate field names.
280335
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
@@ -294,6 +349,7 @@ impl DFSchema {
294349
inner: Arc::new(new_schema_with_metadata),
295350
field_qualifiers: new_qualifiers,
296351
functional_dependencies: FunctionalDependencies::empty(),
352+
ambiguous_names: None,
297353
};
298354
new_self.check_names()?;
299355
Ok(new_self)
@@ -350,6 +406,22 @@ impl DFSchema {
350406
let finished_with_metadata = finished.with_metadata(metadata);
351407
self.inner = finished_with_metadata.into();
352408
self.field_qualifiers.extend(qualifiers);
409+
// Propagate ambiguous names from the other schema so that names marked
410+
// as ambiguous (e.g. by a JOIN) are not silently dropped when schemas
411+
// are merged for ORDER BY / HAVING resolution.
412+
if let Some(other_names) = &other_schema.ambiguous_names {
413+
match &mut self.ambiguous_names {
414+
Some(self_names) => {
415+
// Build a new combined set: the existing one is shared behind an Arc, so it is not mutated in place.
416+
let mut combined = (**self_names).clone();
417+
combined.extend(other_names.iter().cloned());
418+
self.ambiguous_names = Some(Arc::new(combined));
419+
}
420+
None => {
421+
self.ambiguous_names = Some(Arc::clone(other_names));
422+
}
423+
}
424+
}
353425
}
354426

355427
/// Get a list of fields for this schema
@@ -506,6 +578,18 @@ impl DFSchema {
506578
&self,
507579
name: &str,
508580
) -> Result<(Option<&TableReference>, &FieldRef)> {
581+
// If this field name was marked as ambiguous at schema creation time
582+
// (e.g. because a derived-table subquery produced duplicate column
583+
// names), refuse to resolve it without an explicit qualifier.
584+
if self
585+
.ambiguous_names
586+
.as_ref()
587+
.is_some_and(|s| s.contains(name))
588+
{
589+
return _schema_err!(SchemaError::AmbiguousReference {
590+
field: Box::new(Column::new_unqualified(name.to_string()))
591+
});
592+
}
509593
let matches = self.qualified_fields_with_unqualified_name(name);
510594
match matches.len() {
511595
0 => Err(unqualified_field_not_found(name, self)),
@@ -845,6 +929,7 @@ impl DFSchema {
845929
field_qualifiers: vec![None; self.inner.fields.len()],
846930
inner: self.inner,
847931
functional_dependencies: self.functional_dependencies,
932+
ambiguous_names: self.ambiguous_names,
848933
}
849934
}
850935

@@ -855,6 +940,7 @@ impl DFSchema {
855940
field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
856941
inner: self.inner,
857942
functional_dependencies: self.functional_dependencies,
943+
ambiguous_names: self.ambiguous_names,
858944
}
859945
}
860946

@@ -1126,6 +1212,7 @@ impl TryFrom<SchemaRef> for DFSchema {
11261212
inner: schema,
11271213
field_qualifiers: vec![None; field_count],
11281214
functional_dependencies: FunctionalDependencies::empty(),
1215+
ambiguous_names: None,
11291216
};
11301217
// Without checking names, because schema here may have duplicate field names.
11311218
// For example, Partial AggregateMode will generate duplicate field names from
@@ -1187,6 +1274,7 @@ impl ToDFSchema for Vec<Field> {
11871274
inner: schema.into(),
11881275
field_qualifiers: vec![None; field_count],
11891276
functional_dependencies: FunctionalDependencies::empty(),
1277+
ambiguous_names: None,
11901278
};
11911279
Ok(dfschema)
11921280
}
@@ -1578,6 +1666,7 @@ mod tests {
15781666
inner: Arc::clone(&arrow_schema_ref),
15791667
field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
15801668
functional_dependencies: FunctionalDependencies::empty(),
1669+
ambiguous_names: None,
15811670
};
15821671
let df_schema_ref = Arc::new(df_schema.clone());
15831672

@@ -1624,6 +1713,7 @@ mod tests {
16241713
inner: Arc::clone(&schema),
16251714
field_qualifiers: vec![None; schema.fields.len()],
16261715
functional_dependencies: FunctionalDependencies::empty(),
1716+
ambiguous_names: None,
16271717
};
16281718

16291719
assert_eq!(df_schema.inner.metadata(), schema.metadata())

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3570,7 +3570,7 @@ mod tests {
35703570
.expect_err("planning error")
35713571
.strip_backtrace();
35723572

3573-
insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
3573+
insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32 }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] }, ambiguous_names: {} }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32 }], metadata: {} }"#);
35743574
}
35753575

35763576
#[tokio::test]

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1745,7 +1745,30 @@ pub fn build_join_schema(
17451745
.collect();
17461746

17471747
let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
1748-
dfschema.with_functional_dependencies(func_dependencies)
1748+
let dfschema = dfschema.with_functional_dependencies(func_dependencies)?;
1749+
1750+
// Propagate ambiguous names from both input schemas. A name that was
1751+
// already ambiguous on either side of the join (e.g. because the left
1752+
// input is itself a subquery that wrapped a JOIN) remains ambiguous in
1753+
// the output. We only propagate names that actually appear as field
1754+
// names in the output schema so we don't accumulate stale entries.
1755+
let output_field_names: HashSet<&str> = dfschema
1756+
.fields()
1757+
.iter()
1758+
.map(|f| f.name().as_str())
1759+
.collect();
1760+
let inherited_ambiguous: HashSet<String> = left
1761+
.ambiguous_names()
1762+
.iter()
1763+
.chain(right.ambiguous_names())
1764+
.filter(|n| output_field_names.contains(n.as_str()))
1765+
.cloned()
1766+
.collect();
1767+
if inherited_ambiguous.is_empty() {
1768+
Ok(dfschema)
1769+
} else {
1770+
Ok(dfschema.with_ambiguous_names(inherited_ambiguous))
1771+
}
17491772
}
17501773

17511774
/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2375,6 +2375,29 @@ pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result<Arc<DFSc
23752375
exprs, input,
23762376
)?)?;
23772377

2378+
// Propagate ambiguous names from the input for any column passed through
2379+
// unchanged. This prevents a `SELECT * FROM (...) AS alias` wrapper from
2380+
// silently dropping the ambiguity marker set by an inner JOIN or alias.
2381+
let input_ambiguous = input.schema().ambiguous_names();
2382+
if !input_ambiguous.is_empty() {
2383+
// A column is a pass-through when it is `Expr::Column(c)` and `c.name`
2384+
// appears in the input's ambiguous set.
2385+
let inherited: HashSet<String> = exprs
2386+
.iter()
2387+
.filter_map(|e| {
2388+
if let Expr::Column(col) = e
2389+
&& input_ambiguous.contains(&col.name)
2390+
{
2391+
return Some(col.name.clone());
2392+
}
2393+
None
2394+
})
2395+
.collect();
2396+
if !inherited.is_empty() {
2397+
return Ok(Arc::new(schema.with_ambiguous_names(inherited)));
2398+
}
2399+
}
2400+
23782401
Ok(Arc::new(schema))
23792402
}
23802403

@@ -2406,6 +2429,39 @@ impl SubqueryAlias {
24062429
let aliases = unique_field_aliases(plan.schema().fields());
24072430
let is_projection_needed = aliases.iter().any(Option::is_some);
24082431

2432+
// Collect unqualified field names that are ambiguous in this alias's
2433+
// output schema. `unique_field_aliases` renames duplicates (e.g. to
2434+
// "id:1") to keep Arrow happy, but outer queries must still be
2435+
// prevented from referencing those names without qualification.
2436+
// We also inherit names already marked ambiguous by the input schema
2437+
// so nested `SELECT * FROM (...) AS sN` wrappers don't lose the marker.
2438+
let ambiguous_names: HashSet<String> = {
2439+
let mut name_counts: HashMap<&str, usize> = HashMap::new();
2440+
for field in plan.schema().fields() {
2441+
*name_counts.entry(field.name().as_str()).or_insert(0) += 1;
2442+
}
2443+
let mut names: HashSet<String> = name_counts
2444+
.into_iter()
2445+
.filter(|&(_, count)| count >= 2)
2446+
.map(|(name, _)| name.to_string())
2447+
.collect();
2448+
2449+
// Inherit names still visible in the output (the first occurrence
2450+
// of a renamed duplicate like "id:1" still keeps the name "id").
2451+
let output_field_names: HashSet<&str> = plan
2452+
.schema()
2453+
.fields()
2454+
.iter()
2455+
.map(|f| f.name().as_str())
2456+
.collect();
2457+
for inherited in plan.schema().ambiguous_names() {
2458+
if output_field_names.contains(inherited.as_str()) {
2459+
names.insert(inherited.clone());
2460+
}
2461+
}
2462+
names
2463+
};
2464+
24092465
// Insert a projection node, if needed, to make sure aliases are applied.
24102466
let plan = if is_projection_needed {
24112467
let projection_expressions = aliases
@@ -2438,7 +2494,8 @@ impl SubqueryAlias {
24382494

24392495
let schema = DFSchemaRef::new(
24402496
DFSchema::try_from_qualified_schema(alias.clone(), schema)?
2441-
.with_functional_dependencies(func_dependencies)?,
2497+
.with_functional_dependencies(func_dependencies)?
2498+
.with_ambiguous_names(ambiguous_names),
24422499
);
24432500
Ok(SubqueryAlias {
24442501
input: plan,

0 commit comments

Comments
 (0)