Skip to content

Commit d7d3a7a

Browse files
timsaucer authored and claude committed
Add missing DataFrame methods for set operations and query
Expose upstream DataFusion DataFrame methods that were not yet available in the Python API. Closes #1455. Set operations: - except_distinct: set difference with deduplication - intersect_distinct: set intersection with deduplication - union_by_name: union matching columns by name instead of position - union_by_name_distinct: union by name with deduplication Query: - distinct_on: deduplicate rows based on specific columns - sort_by: sort by expressions with ascending order and nulls last Note: show_limit is already covered by the existing show(num) method. explain_with_options and with_param_values are deferred as they require exposing additional types (ExplainOption, ParamValues). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 2499409 commit d7d3a7a

File tree

3 files changed

+231
-0
lines changed

3 files changed

+231
-0
lines changed

crates/core/src/dataframe.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,71 @@ impl PyDataFrame {
922922
Ok(Self::new(new_df))
923923
}
924924

925+
/// Calculate the set difference with deduplication
926+
fn except_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
927+
let new_df = self
928+
.df
929+
.as_ref()
930+
.clone()
931+
.except_distinct(py_df.df.as_ref().clone())?;
932+
Ok(Self::new(new_df))
933+
}
934+
935+
/// Calculate the intersection with deduplication
936+
fn intersect_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
937+
let new_df = self
938+
.df
939+
.as_ref()
940+
.clone()
941+
.intersect_distinct(py_df.df.as_ref().clone())?;
942+
Ok(Self::new(new_df))
943+
}
944+
945+
/// Union two DataFrames matching columns by name
946+
fn union_by_name(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
947+
let new_df = self
948+
.df
949+
.as_ref()
950+
.clone()
951+
.union_by_name(py_df.df.as_ref().clone())?;
952+
Ok(Self::new(new_df))
953+
}
954+
955+
/// Union two DataFrames by name with deduplication
956+
fn union_by_name_distinct(&self, py_df: PyDataFrame) -> PyDataFusionResult<Self> {
957+
let new_df = self
958+
.df
959+
.as_ref()
960+
.clone()
961+
.union_by_name_distinct(py_df.df.as_ref().clone())?;
962+
Ok(Self::new(new_df))
963+
}
964+
965+
/// Deduplicate rows based on specific columns, keeping the first row per group
966+
fn distinct_on(
967+
&self,
968+
on_expr: Vec<PyExpr>,
969+
select_expr: Vec<PyExpr>,
970+
sort_expr: Option<Vec<PySortExpr>>,
971+
) -> PyDataFusionResult<Self> {
972+
let on_expr = on_expr.into_iter().map(|e| e.into()).collect();
973+
let select_expr = select_expr.into_iter().map(|e| e.into()).collect();
974+
let sort_expr = sort_expr.map(to_sort_expressions);
975+
let df = self
976+
.df
977+
.as_ref()
978+
.clone()
979+
.distinct_on(on_expr, select_expr, sort_expr)?;
980+
Ok(Self::new(df))
981+
}
982+
983+
/// Sort by column expressions with ascending order and nulls last
984+
fn sort_by(&self, exprs: Vec<PyExpr>) -> PyDataFusionResult<Self> {
985+
let exprs = exprs.into_iter().map(|e| e.into()).collect();
986+
let df = self.df.as_ref().clone().sort_by(exprs)?;
987+
Ok(Self::new(df))
988+
}
989+
925990
/// Write a `DataFrame` to a CSV file.
926991
fn write_csv(
927992
&self,

python/datafusion/dataframe.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,6 +1036,109 @@ def except_all(self, other: DataFrame) -> DataFrame:
10361036
"""
10371037
return DataFrame(self.df.except_all(other.df))
10381038

1039+
def except_distinct(self, other: DataFrame) -> DataFrame:
    """Calculate the set difference with deduplication.

    Rows present in this DataFrame but absent from ``other`` are returned,
    with duplicates removed. Use :py:meth:`except_all` when duplicate rows
    should be preserved instead.

    The two :py:class:`DataFrame` must have exactly the same schema.

    Args:
        other: DataFrame to calculate exception with.

    Returns:
        DataFrame after set difference with deduplication.
    """
    result = self.df.except_distinct(other.df)
    return DataFrame(result)
1055+
1056+
def intersect_distinct(self, other: DataFrame) -> DataFrame:
    """Calculate the intersection with deduplication.

    Only rows that appear in both DataFrames are returned, each at most
    once. Use :py:meth:`intersect` when duplicate rows should be preserved
    instead.

    The two :py:class:`DataFrame` must have exactly the same schema.

    Args:
        other: DataFrame to intersect with.

    Returns:
        DataFrame after intersection with deduplication.
    """
    result = self.df.intersect_distinct(other.df)
    return DataFrame(result)
1071+
1072+
def union_by_name(self, other: DataFrame) -> DataFrame:
    """Union two :py:class:`DataFrame` matching columns by name.

    Whereas :py:meth:`union` pairs columns positionally, this method pairs
    them by their names, so DataFrames whose columns appear in different
    orders can still be combined.

    Args:
        other: DataFrame to union with.

    Returns:
        DataFrame after union by name.
    """
    result = self.df.union_by_name(other.df)
    return DataFrame(result)
1086+
1087+
def union_by_name_distinct(self, other: DataFrame) -> DataFrame:
    """Union two :py:class:`DataFrame` by name with deduplication.

    Behaves like :py:meth:`union_by_name` and additionally removes
    duplicate rows from the result.

    Args:
        other: DataFrame to union with.

    Returns:
        DataFrame after union by name with deduplication.
    """
    result = self.df.union_by_name_distinct(other.df)
    return DataFrame(result)
1099+
1100+
def distinct_on(
    self,
    on_expr: list[Expr],
    select_expr: list[SortKey] if False else list[Expr],  # noqa: placeholder
    sort_expr: list[SortKey] | None = None,
) -> DataFrame:
    """Deduplicate rows based on specific columns.

    Produces one row per unique combination of the ``on_expr`` values.
    When ``sort_expr`` is given, it determines which row within each group
    is kept (the first row under that ordering).

    Args:
        on_expr: Expressions that determine uniqueness.
        select_expr: Expressions to include in the output.
        sort_expr: Optional sort expressions to determine which row to keep.

    Returns:
        DataFrame after deduplication.
    """
    raw_on = expr_list_to_raw_expr_list(on_expr)
    raw_select = expr_list_to_raw_expr_list(select_expr)
    # An empty or missing sort list is forwarded as ``None``.
    raw_sort = sort_list_to_raw_sort_list(sort_expr) if sort_expr else None
    return DataFrame(self.df.distinct_on(raw_on, raw_select, raw_sort))
1124+
1125+
def sort_by(self, *exprs: Expr | str) -> DataFrame:
    """Sort the DataFrame by column expressions in ascending order.

    Every expression is sorted ascending with nulls last. Reach for
    :py:meth:`sort` instead when per-column direction or null placement
    must be controlled.

    Args:
        exprs: Expressions or column names to sort by.

    Returns:
        DataFrame after sorting.
    """
    # Bare strings are parsed as SQL expressions; Expr values pass through.
    parsed = [
        self.parse_sql_expr(item) if isinstance(item, str) else item
        for item in exprs
    ]
    return DataFrame(self.df.sort_by(expr_list_to_raw_expr_list(parsed)))
1141+
10391142
def write_csv(
10401143
self,
10411144
path: str | pathlib.Path,

python/tests/test_dataframe.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3569,3 +3569,66 @@ def test_read_parquet_file_sort_order(tmp_path, file_sort_order):
35693569
pa.parquet.write_table(table, path)
35703570
df = ctx.read_parquet(path, file_sort_order=file_sort_order)
35713571
assert df.collect()[0].column(0).to_pylist() == [1, 2]
3572+
3573+
3574+
def test_except_distinct():
    """except_distinct drops rows found in the other frame and dedups."""
    ctx = SessionContext()
    left = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [10, 20, 30, 10]})
    right = ctx.from_pydict({"a": [1, 2], "b": [10, 20]})
    batch = (
        left.except_distinct(right)
        .sort(column("a").sort(ascending=True))
        .collect()[0]
    )
    assert batch.column(0).to_pylist() == [3]
    assert batch.column(1).to_pylist() == [30]
3583+
3584+
3585+
def test_intersect_distinct():
    """intersect_distinct keeps only common rows, each at most once."""
    ctx = SessionContext()
    left = ctx.from_pydict({"a": [1, 2, 3, 1], "b": [10, 20, 30, 10]})
    right = ctx.from_pydict({"a": [1, 4], "b": [10, 40]})
    batch = left.intersect_distinct(right).collect()[0]
    assert batch.column(0).to_pylist() == [1]
    assert batch.column(1).to_pylist() == [10]
3592+
3593+
3594+
def test_union_by_name():
    """union_by_name pairs columns by name despite differing order."""
    ctx = SessionContext()
    first = ctx.from_pydict({"a": [1], "b": [10]})
    # Column order deliberately differs from ``first``.
    second = ctx.from_pydict({"b": [20], "a": [2]})
    batches = (
        first.union_by_name(second).sort(column("a").sort(ascending=True)).collect()
    )
    values = pa.concat_arrays([batch.column(0) for batch in batches]).to_pylist()
    assert values == [1, 2]
3602+
3603+
3604+
def test_union_by_name_distinct():
    """union_by_name_distinct collapses identical rows across both inputs."""
    ctx = SessionContext()
    left = ctx.from_pydict({"a": [1, 1], "b": [10, 10]})
    right = ctx.from_pydict({"b": [10], "a": [1]})
    batches = left.union_by_name_distinct(right).collect()
    # Every input row is identical, so exactly one should survive.
    assert sum(batch.num_rows for batch in batches) == 1
3611+
3612+
3613+
def test_distinct_on():
    """distinct_on keeps one row per key, chosen by the sort expressions."""
    ctx = SessionContext()
    frame = ctx.from_pydict({"a": [1, 1, 2, 2], "b": [10, 20, 30, 40]})
    deduped = frame.distinct_on(
        [column("a")],
        [column("a"), column("b")],
        [column("a").sort(ascending=True), column("b").sort(ascending=True)],
    )
    batch = deduped.sort(column("a").sort(ascending=True)).collect()[0]
    # The sort keys make the smallest ``b`` the surviving row per ``a``.
    assert batch.column(0).to_pylist() == [1, 2]
    assert batch.column(1).to_pylist() == [10, 30]
3628+
3629+
3630+
def test_sort_by():
    """sort_by orders rows ascending by the given expression."""
    ctx = SessionContext()
    frame = ctx.from_pydict({"a": [3, 1, 2]})
    batch = frame.sort_by(column("a")).collect()[0]
    assert batch.column(0).to_pylist() == [1, 2, 3]

0 commit comments

Comments
 (0)