Commit 5293212

Authored by timsaucer, claude, and Copilot
Add missing Dataframe functions (#1472)
* Add missing DataFrame methods for set operations and query

  Expose upstream DataFusion DataFrame methods that were not yet available in the Python API. Closes #1455.

  Set operations:
  - except_distinct: set difference with deduplication
  - intersect_distinct: set intersection with deduplication
  - union_by_name: union matching columns by name instead of position
  - union_by_name_distinct: union by name with deduplication

  Query:
  - distinct_on: deduplicate rows based on specific columns
  - sort_by: sort by expressions with ascending order and nulls last

  Note: show_limit is already covered by the existing show(num) method. explain_with_options and with_param_values are deferred as they require exposing additional types (ExplainOption, ParamValues).

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add ExplainFormat enum and format option to DataFrame.explain()

  Extend the existing explain() method with an optional format parameter instead of adding a separate explain_with_options() method. This keeps the API simple while exposing all upstream ExplainOption functionality. Available formats: indent (default), tree, pgjson, graphviz. The ExplainFormat enum is exported from the top-level datafusion module.

* Add DataFrame.window() and unnest recursion options

  Expose remaining DataFrame methods from upstream DataFusion. Closes #1456.

  - window(*exprs): apply window function expressions and append results as new columns
  - unnest_column/unnest_columns: add optional recursions parameter for controlling unnest depth via (input_column, output_column, depth) tuples

  Note: drop_columns is already exposed as the existing drop() method.

* Update docstring

  Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Improve docstrings and test robustness for new DataFrame methods

  Clarify the except_distinct/intersect_distinct docstrings, add a deterministic sort to test_window, add a sort_by ascending verification test, and add smoke tests for the PGJSON and GRAPHVIZ explain formats.

* Consolidate new DataFrame tests into parametrized tests

  Combine the set operation tests (except_distinct, intersect_distinct, union_by_name, union_by_name_distinct) into a single parametrized test_set_operations_distinct. Merge the sort_by tests and convert the explain format tests to parametrized form.

* Add doctest examples to new DataFrame method docstrings

  Add >>> style usage examples for window, explain, except_distinct, intersect_distinct, union_by_name, union_by_name_distinct, distinct_on, sort_by, and unnest_columns to match existing docstring conventions.

* Improve error messages, tests, and API hygiene from PR review

  - Provide an actionable error message for invalid explain format strings
  - Remove the recursions param from the deprecated unnest_column (use unnest_columns)
  - Add a null-handling test case for sort_by to verify nulls-last behavior
  - Add format-specific assertions to the explain tests (TREE, PGJSON, GRAPHVIZ)
  - Add a deep recursion test for unnest_columns with depth > 1
  - Add a multi-expression window test to verify variadic *exprs

* Consolidate window and unnest tests into parametrized tests

  Combine test_window and test_window_multiple_expressions into a single parametrized test. Merge the unnest recursion tests into one parametrized test covering the basic, explicit depth 1, and deep recursion cases.

* Address PR review feedback for DataFrame operations

  - Use the upstream parse error for explain format instead of hardcoded options
  - Fix sort_by to use column name resolution consistent with sort()
  - Use ExplainFormat enum members directly in tests instead of string lookup
  - Merge union_by_name_distinct into union_by_name(distinct=False) for a more Pythonic API
  - Update the check-upstream skill to note union_by_name_distinct coverage

* Add DataFrame.column(), col(), and find_qualified_columns() methods

  Expose upstream find_qualified_columns to resolve unqualified column names into fully qualified column expressions. This is especially useful for disambiguating columns after joins.

  - find_qualified_columns(*names) on the Rust side calls upstream directly
  - DataFrame.column(name) and the col(name) alias on the Python side
  - Update the join and join_on docstrings to reference DataFrame.col()
  - Add a "Disambiguating Columns with DataFrame.col()" section to the joins docs
  - Add tests for qualified column resolution, ambiguity, and join usage

* Merge union_by_name and union_by_name_distinct into a single method with a distinct flag

* Converting into a Python dict loses a column when the names are identical

* Consolidate except_all/except_distinct and intersect/intersect_distinct into single methods with a distinct flag

  Follows the same pattern as union(distinct=) and union_by_name(distinct=). Also deprecates union_distinct() in favor of union(distinct=True).

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 898d73d commit 5293212

File tree

6 files changed (+767, −52 lines)

.ai/skills/check-upstream/SKILL.md (1 addition, 0 deletions)

@@ -109,6 +109,7 @@ The user may specify an area via `$ARGUMENTS`. If no area is specified or "all"
 **Evaluated and not requiring separate Python exposure:**
 - `show_limit` — already covered by `DataFrame.show()`, which provides the same functionality with a simpler API
 - `with_param_values` — already covered by the `param_values` argument on `SessionContext.sql()`, which accomplishes the same thing more robustly
+- `union_by_name_distinct` — already covered by `DataFrame.union_by_name(distinct=True)`, which provides a more Pythonic API
 
 **How to check:**
 1. Fetch the upstream DataFrame documentation page listing all methods

crates/core/src/dataframe.rs (126 additions, 31 deletions)

@@ -582,6 +582,14 @@ impl PyDataFrame {
         Ok(Self::new(df))
     }
 
+    /// Apply window function expressions to the DataFrame
+    #[pyo3(signature = (*exprs))]
+    fn window(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
+        let window_exprs = exprs.into_iter().map(|e| e.into()).collect();
+        let df = self.df.as_ref().clone().window(window_exprs)?;
+        Ok(Self::new(df))
+    }
+
     fn filter(&self, predicate: PyExpr) -> PyDataFusionResult<Self> {
         let df = self.df.as_ref().clone().filter(predicate.into())?;
         Ok(Self::new(df))
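The `window()` method above appends each window expression's result as a new column without collapsing rows. As a plain-Python sketch of those semantics — not the DataFusion API; `with_row_number` is a hypothetical helper illustrating what a row-number window expression produces per partition:

```python
from collections import defaultdict

def with_row_number(rows, partition_key):
    """Append a row_number column per partition value, mirroring the
    effect of a row-number window expression passed to window()."""
    counters = defaultdict(int)
    out = []
    for row in rows:
        counters[row[partition_key]] += 1
        # original columns are kept; the window result is appended
        out.append({**row, "row_number": counters[row[partition_key]]})
    return out

rows = [{"grp": "a", "v": 10}, {"grp": "a", "v": 20}, {"grp": "b", "v": 30}]
numbered = with_row_number(rows, "grp")
```

The key point, matching the commit message, is that the input rows survive unchanged and only gain columns.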
@@ -804,9 +812,27 @@ impl PyDataFrame {
     }
 
     /// Print the query plan
-    #[pyo3(signature = (verbose=false, analyze=false))]
-    fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyDataFusionResult<()> {
-        let df = self.df.as_ref().clone().explain(verbose, analyze)?;
+    #[pyo3(signature = (verbose=false, analyze=false, format=None))]
+    fn explain(
+        &self,
+        py: Python,
+        verbose: bool,
+        analyze: bool,
+        format: Option<&str>,
+    ) -> PyDataFusionResult<()> {
+        let explain_format = match format {
+            Some(f) => f
+                .parse::<datafusion::common::format::ExplainFormat>()
+                .map_err(|e| {
+                    PyDataFusionError::Common(format!("Invalid explain format '{}': {}", f, e))
+                })?,
+            None => datafusion::common::format::ExplainFormat::Indent,
+        };
+        let opts = datafusion::logical_expr::ExplainOption::default()
+            .with_verbose(verbose)
+            .with_analyze(analyze)
+            .with_format(explain_format);
+        let df = self.df.as_ref().clone().explain_with_options(opts)?;
         print_dataframe(py, df)
     }
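The Rust code above parses the optional `format` string and falls back to `Indent` when none is given. A minimal Python sketch of that dispatch, using the format names listed in the commit message (indent, tree, pgjson, graphviz) — `resolve_explain_format` is illustrative only, not part of the library:

```python
VALID_FORMATS = {"indent", "tree", "pgjson", "graphviz"}

def resolve_explain_format(format=None):
    """Mirror the match in explain(): None -> default, invalid -> error."""
    if format is None:
        return "indent"  # ExplainFormat::Indent is the default
    if format not in VALID_FORMATS:
        # the real code surfaces the upstream parse error instead
        raise ValueError(f"Invalid explain format '{format}'")
    return format
```

Note that per the later review commit, the actual error message comes from upstream's parse failure rather than a hardcoded list.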

@@ -864,22 +890,14 @@ impl PyDataFrame {
         Ok(Self::new(new_df))
     }
 
-    /// Calculate the distinct union of two `DataFrame`s. The
-    /// two `DataFrame`s must have exactly the same schema
-    fn union_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
-        let new_df = self
-            .df
-            .as_ref()
-            .clone()
-            .union_distinct(py_df.df.as_ref().clone())?;
-        Ok(Self::new(new_df))
-    }
-
-    #[pyo3(signature = (column, preserve_nulls=true))]
-    fn unnest_column(&self, column: &str, preserve_nulls: bool) -> PyDataFusionResult<Self> {
-        // TODO: expose RecursionUnnestOptions
-        // REF: https://github.com/apache/datafusion/pull/11577
-        let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
+    #[pyo3(signature = (column, preserve_nulls=true, recursions=None))]
+    fn unnest_column(
+        &self,
+        column: &str,
+        preserve_nulls: bool,
+        recursions: Option<Vec<(String, String, usize)>>,
+    ) -> PyDataFusionResult<Self> {
+        let unnest_options = build_unnest_options(preserve_nulls, recursions);
         let df = self
             .df
             .as_ref()

@@ -888,15 +906,14 @@ impl PyDataFrame {
         Ok(Self::new(df))
     }
 
-    #[pyo3(signature = (columns, preserve_nulls=true))]
+    #[pyo3(signature = (columns, preserve_nulls=true, recursions=None))]
     fn unnest_columns(
         &self,
         columns: Vec<String>,
         preserve_nulls: bool,
+        recursions: Option<Vec<(String, String, usize)>>,
     ) -> PyDataFusionResult<Self> {
-        // TODO: expose RecursionUnnestOptions
-        // REF: https://github.com/apache/datafusion/pull/11577
-        let unnest_options = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
+        let unnest_options = build_unnest_options(preserve_nulls, recursions);
         let cols = columns.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
         let df = self
             .df
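The `recursions` tuples `(input_column, output_column, depth)` control how many nesting levels a single unnest call flattens. A plain-Python sketch of depth-limited unnesting of one list column — illustrative only, not the DataFusion implementation:

```python
def unnest_to_depth(values, depth):
    """Flatten nested lists up to `depth` levels, like unnesting a list
    column with a RecursionUnnestOption of the given depth."""
    for _ in range(depth):
        flat = []
        for v in values:
            if isinstance(v, list):
                flat.extend(v)  # one level of unnesting
            else:
                flat.append(v)  # already scalar at this level
        values = flat
    return values

nested = [[1, [2, 3]], [[4]]]
```

With `depth=1` only the outer lists are expanded; `depth=2` fully flattens this example, matching the "deep recursion" test case described in the commit message.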
@@ -907,21 +924,79 @@ impl PyDataFrame {
     }
 
     /// Calculate the intersection of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
-    fn intersect(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
-        let new_df = self
-            .df
-            .as_ref()
-            .clone()
-            .intersect(py_df.df.as_ref().clone())?;
+    #[pyo3(signature = (py_df, distinct=false))]
+    fn intersect(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
+        let base = self.df.as_ref().clone();
+        let other = py_df.df.as_ref().clone();
+        let new_df = if distinct {
+            base.intersect_distinct(other)?
+        } else {
+            base.intersect(other)?
+        };
         Ok(Self::new(new_df))
     }
 
     /// Calculate the exception of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
-    fn except_all(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
-        let new_df = self.df.as_ref().clone().except(py_df.df.as_ref().clone())?;
+    #[pyo3(signature = (py_df, distinct=false))]
+    fn except_all(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
+        let base = self.df.as_ref().clone();
+        let other = py_df.df.as_ref().clone();
+        let new_df = if distinct {
+            base.except_distinct(other)?
+        } else {
+            base.except(other)?
+        };
         Ok(Self::new(new_df))
     }
 
+    /// Union two DataFrames matching columns by name
+    #[pyo3(signature = (py_df, distinct=false))]
+    fn union_by_name(&self, py_df: PyDataFrame, distinct: bool) -> PyDataFusionResult<Self> {
+        let base = self.df.as_ref().clone();
+        let other = py_df.df.as_ref().clone();
+        let new_df = if distinct {
+            base.union_by_name_distinct(other)?
+        } else {
+            base.union_by_name(other)?
+        };
+        Ok(Self::new(new_df))
+    }
+
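In each method above, the `distinct` flag switches between the ALL and DISTINCT variants of the SQL set operation. Those multiset semantics can be sketched in plain Python with `collections.Counter` — a sketch of the semantics only, not of DataFusion's execution:

```python
from collections import Counter

def except_rows(left, right, distinct=False):
    """EXCEPT ALL subtracts multiplicities; EXCEPT (distinct) also dedupes."""
    if distinct:
        return sorted(set(left) - set(right))
    diff = Counter(left) - Counter(right)  # per-value count subtraction
    return sorted(diff.elements())

def intersect_rows(left, right, distinct=False):
    """INTERSECT ALL keeps the minimum multiplicity; distinct keeps one each."""
    common = Counter(left) & Counter(right)  # per-value minimum count
    if distinct:
        return sorted(common)
    return sorted(common.elements())

left, right = [1, 1, 2, 3], [1, 3, 3]
```

For example, `except_rows(left, right)` keeps one of the two 1s (2 − 1 occurrences), while the distinct variant removes 1 entirely.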
+    /// Deduplicate rows based on specific columns, keeping the first row per group
+    fn distinct_on(
+        &self,
+        on_expr: Vec<PyExpr>,
+        select_expr: Vec<PyExpr>,
+        sort_expr: Option<Vec<PySortExpr>>,
+    ) -> PyDataFusionResult<Self> {
+        let on_expr = on_expr.into_iter().map(|e| e.into()).collect();
+        let select_expr = select_expr.into_iter().map(|e| e.into()).collect();
+        let sort_expr = sort_expr.map(to_sort_expressions);
+        let df = self
+            .df
+            .as_ref()
+            .clone()
+            .distinct_on(on_expr, select_expr, sort_expr)?;
+        Ok(Self::new(df))
+    }
+
+    /// Sort by column expressions with ascending order and nulls last
+    fn sort_by(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
+        let exprs = exprs.into_iter().map(|e| e.into()).collect();
+        let df = self.df.as_ref().clone().sort_by(exprs)?;
+        Ok(Self::new(df))
+    }
+
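Per the docstrings above, `distinct_on` keeps the first row per group and `sort_by` sorts ascending with nulls last. Both behaviors sketched in plain Python — illustrative helpers, not the library API:

```python
def distinct_on(rows, key):
    """Keep the first row seen for each key value, like DISTINCT ON."""
    seen, out = set(), []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            out.append(row)
    return out

def sort_by_asc_nulls_last(values):
    """Ascending sort with None (null) values placed last, like sort_by.

    The key tuple sorts non-null values first, then by value; None maps
    to a dummy 0 so tuples never compare None against None.
    """
    return sorted(values, key=lambda v: (v is None, 0 if v is None else v))

rows = [{"k": 1, "v": "a"}, {"k": 1, "v": "b"}, {"k": 2, "v": "c"}]
```

The nulls-last behavior here is exactly what the commit's added sort_by null-handling test verifies.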
+    /// Return fully qualified column expressions for the given column names
+    fn find_qualified_columns(&self, names: Vec<String>) -> PyDataFusionResult<Vec<PyExpr>> {
+        let name_refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
+        let qualified = self.df.find_qualified_columns(&name_refs)?;
+        Ok(qualified
+            .into_iter()
+            .map(|q| Expr::Column(Column::from(q)).into())
+            .collect())
+    }
+
     /// Write a `DataFrame` to a CSV file.
     fn write_csv(
         &self,
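`find_qualified_columns` turns bare names into table-qualified column expressions so that a name shared by both sides of a join stays unambiguous. A toy resolver conveys the idea — the `schema` dict and this function are hypothetical; the real method consults the DataFrame's actual schema:

```python
def find_qualified_columns(schema, names):
    """Resolve bare column names to 'qualifier.name' strings.

    `schema` maps qualifier -> list of column names. Raises when a name
    matches columns under zero or multiple qualifiers (missing/ambiguous).
    """
    out = []
    for name in names:
        matches = [f"{q}.{name}" for q, cols in schema.items() if name in cols]
        if len(matches) != 1:
            raise ValueError(f"column '{name}' is ambiguous or missing: {matches}")
        out.append(matches[0])
    return out

schema = {"left": ["id", "val"], "right": ["other"]}
```

The joins documentation added in this commit shows the real Python-side entry point, `DataFrame.col()`, doing this resolution before a `join_on`.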
@@ -1295,6 +1370,26 @@ impl PyDataFrameWriteOptions {
     }
 }
 
+fn build_unnest_options(
+    preserve_nulls: bool,
+    recursions: Option<Vec<(String, String, usize)>>,
+) -> UnnestOptions {
+    let mut opts = UnnestOptions::default().with_preserve_nulls(preserve_nulls);
+    if let Some(recs) = recursions {
+        opts.recursions = recs
+            .into_iter()
+            .map(
+                |(input, output, depth)| datafusion::common::RecursionUnnestOption {
+                    input_column: datafusion::common::Column::from(input.as_str()),
+                    output_column: datafusion::common::Column::from(output.as_str()),
+                    depth,
+                },
+            )
+            .collect();
+    }
+    opts
+}
+
 /// Print DataFrame
 fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
     // Get string representation of record batches

docs/source/user-guide/common-operations/joins.rst (33 additions, 0 deletions)

@@ -134,3 +134,36 @@ In contrast to the above example, if we wish to get both columns:
 .. ipython:: python
 
     left.join(right, "id", how="inner", coalesce_duplicate_keys=False)
+
+Disambiguating Columns with ``DataFrame.col()``
+------------------------------------------------
+
+When both DataFrames contain non-key columns with the same name, you can use
+:py:meth:`~datafusion.dataframe.DataFrame.col` on each DataFrame **before** the
+join to create fully qualified column references. These references can then be
+used in the join predicate and when selecting from the result.
+
+This is especially useful with :py:meth:`~datafusion.dataframe.DataFrame.join_on`,
+which accepts expression-based predicates.
+
+.. ipython:: python
+
+    left = ctx.from_pydict(
+        {
+            "id": [1, 2, 3],
+            "val": [10, 20, 30],
+        }
+    )
+
+    right = ctx.from_pydict(
+        {
+            "id": [1, 2, 3],
+            "val": [40, 50, 60],
+        }
+    )
+
+    joined = left.join_on(
+        right, left.col("id") == right.col("id"), how="inner"
+    )
+
+    joined.select(left.col("id"), left.col("val"), right.col("val"))

python/datafusion/__init__.py (2 additions, 0 deletions)

@@ -47,6 +47,7 @@
 from .dataframe import (
     DataFrame,
     DataFrameWriteOptions,
+    ExplainFormat,
     InsertOp,
     ParquetColumnOptions,
     ParquetWriterOptions,

@@ -82,6 +83,7 @@
     "DataFrameWriteOptions",
     "Database",
     "ExecutionPlan",
+    "ExplainFormat",
     "Expr",
     "InsertOp",
     "LogicalPlan",
