Skip to content

Commit 7e689ca

Browse files
author
Rudolf Lorenz
committed
dml: fixing cherry-pick errors
1 parent d437f77 commit 7e689ca

7 files changed

Lines changed: 16 additions & 121 deletions

File tree

datafusion/core/src/dataframe/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ use crate::arrow::util::pretty;
3030
use crate::datasource::file_format::csv::CsvFormatFactory;
3131
use crate::datasource::file_format::format_as_file_type;
3232
use crate::datasource::file_format::json::JsonFormatFactory;
33-
use crate::datasource::{
34-
provider_as_source, DefaultTableSource, MemTable, TableProvider,
35-
};
33+
use crate::datasource::{provider_as_source, MemTable, TableProvider};
3634
use crate::error::Result;
3735
use crate::execution::context::{SessionState, TaskContext};
3836
use crate::execution::FunctionRegistry;
@@ -64,7 +62,6 @@ use datafusion_functions_aggregate::expr_fn::{
6462

6563
use async_trait::async_trait;
6664
use datafusion_catalog::Session;
67-
use datafusion_sql::TableReference;
6865

6966
/// Contains options that control how data is
7067
/// written out from a DataFrame

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::sync::Arc;
2424
use crate::datasource::file_format::file_type_to_format;
2525
use crate::datasource::listing::ListingTableUrl;
2626
use crate::datasource::physical_plan::FileSinkConfig;
27-
use crate::datasource::{source_as_provider, DefaultTableSource};
27+
use crate::datasource::source_as_provider;
2828
use crate::error::{DataFusionError, Result};
2929
use crate::execution::context::{ExecutionProps, SessionState};
3030
use crate::logical_expr::utils::generate_sort_key;

datafusion/expr/src/logical_plan/dml.rs

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -86,39 +86,6 @@ pub struct DmlStatement {
8686
/// The schema of the output relation
8787
pub output_schema: DFSchemaRef,
8888
}
89-
impl Eq for DmlStatement {}
90-
impl Hash for DmlStatement {
91-
fn hash<H: Hasher>(&self, state: &mut H) {
92-
self.table_name.hash(state);
93-
self.target.schema().hash(state);
94-
self.op.hash(state);
95-
self.input.hash(state);
96-
self.output_schema.hash(state);
97-
}
98-
}
99-
100-
impl PartialEq for DmlStatement {
101-
fn eq(&self, other: &Self) -> bool {
102-
self.table_name == other.table_name
103-
&& self.target.schema() == other.target.schema()
104-
&& self.op == other.op
105-
&& self.input == other.input
106-
&& self.output_schema == other.output_schema
107-
}
108-
}
109-
110-
impl Debug for DmlStatement {
111-
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
112-
f.debug_struct("DmlStatement")
113-
.field("table_name", &self.table_name)
114-
.field("target", &"...")
115-
.field("target_schema", &self.target.schema())
116-
.field("op", &self.op)
117-
.field("input", &self.input)
118-
.field("output_schema", &self.output_schema)
119-
.finish()
120-
}
121-
}
12289

12390
impl Debug for DmlStatement {
12491
fn fmt(&self, f: &mut Formatter) -> fmt::Result {

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use datafusion_common::{
2222
exec_datafusion_err, internal_err, plan_datafusion_err, Result, ScalarValue,
2323
TableReference, UnnestOptions,
2424
};
25-
use datafusion_expr::dml::InsertOp;
25+
2626
use datafusion_expr::expr::{Alias, Placeholder, Sort};
2727
use datafusion_expr::expr::{Unnest, WildcardOptions};
2828
use datafusion_expr::{
@@ -230,11 +230,11 @@ impl From<protobuf::dml_node::Type> for WriteOp {
230230
match t {
231231
protobuf::dml_node::Type::Update => WriteOp::Update,
232232
protobuf::dml_node::Type::Delete => WriteOp::Delete,
233-
protobuf::dml_node::Type::InsertAppend => WriteOp::Insert(InsertOp::Append),
234-
protobuf::dml_node::Type::InsertOverwrite => {
235-
WriteOp::Insert(InsertOp::Overwrite)
236-
}
237-
protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
233+
// TODO: support for insert append
234+
protobuf::dml_node::Type::InsertAppend => WriteOp::InsertInto,
235+
protobuf::dml_node::Type::InsertOverwrite => WriteOp::InsertOverwrite,
236+
// TODO: support for insert replace
237+
protobuf::dml_node::Type::InsertReplace => WriteOp::InsertOverwrite,
238238
protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
239239
}
240240
}

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ use std::sync::Arc;
2121

2222
use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
2323
use crate::protobuf::{
24-
dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25-
CustomTableScanNode, DmlNode, SortExprNodeCollection,
24+
dml_node, CteWorkTableScanNode, CustomTableScanNode, DmlNode, SortExprNodeCollection,
2625
};
2726
use crate::{
2827
convert_required, into_required,
@@ -66,9 +65,9 @@ use datafusion_expr::{
6665
EmptyRelation, Extension, Join, JoinConstraint, Limit, Prepare, Projection,
6766
Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
6867
},
69-
AggregateUDF, ColumnUnnestList, DistinctOn, DmlStatement, DropView, Expr, FetchType,
70-
LogicalPlan, LogicalPlanBuilder, RecursiveQuery, ScalarUDF, SkipType, SortExpr,
71-
Statement, TableSource, Unnest, WindowUDF,
68+
AggregateUDF, DistinctOn, DmlStatement, DropView, Expr, LogicalPlan,
69+
LogicalPlanBuilder, RecursiveQuery, ScalarUDF, SortExpr, TableSource, Unnest,
70+
WindowUDF,
7271
};
7372

7473
use self::to_proto::{serialize_expr, serialize_exprs};
@@ -1695,7 +1694,7 @@ impl AsLogicalPlan for LogicalPlanNode {
16951694
)),
16961695
LogicalPlan::Dml(DmlStatement {
16971696
table_name,
1698-
target,
1697+
dst,
16991698
op,
17001699
input,
17011700
..
@@ -1708,7 +1707,7 @@ impl AsLogicalPlan for LogicalPlanNode {
17081707
input: Some(Box::new(input)),
17091708
target: Some(Box::new(from_table_source(
17101709
table_name.clone(),
1711-
Arc::clone(target),
1710+
Arc::clone(dst),
17121711
extension_codec,
17131712
)?)),
17141713
table_name: Some(table_name.clone().into()),

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
//! processes.
2121
2222
use datafusion_common::{TableReference, UnnestOptions};
23-
use datafusion_expr::dml::InsertOp;
2423
use datafusion_expr::expr::{
2524
self, Alias, Between, BinaryExpr, Cast, GroupingSet, InList, Like, Placeholder,
2625
ScalarFunction, Unnest,
@@ -706,11 +705,8 @@ impl From<JoinConstraint> for protobuf::JoinConstraint {
706705
impl From<&WriteOp> for protobuf::dml_node::Type {
707706
fn from(t: &WriteOp) -> Self {
708707
match t {
709-
WriteOp::Insert(InsertOp::Append) => protobuf::dml_node::Type::InsertAppend,
710-
WriteOp::Insert(InsertOp::Overwrite) => {
711-
protobuf::dml_node::Type::InsertOverwrite
712-
}
713-
WriteOp::Insert(InsertOp::Replace) => protobuf::dml_node::Type::InsertReplace,
708+
WriteOp::InsertInto => protobuf::dml_node::Type::InsertAppend,
709+
WriteOp::InsertOverwrite => protobuf::dml_node::Type::InsertOverwrite,
714710
WriteOp::Delete => protobuf::dml_node::Type::Delete,
715711
WriteOp::Update => protobuf::dml_node::Type::Update,
716712
WriteOp::Ctas => protobuf::dml_node::Type::Ctas,

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 0 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -341,70 +341,6 @@ async fn roundtrip_logical_plan_aggregation() -> Result<()> {
341341
Ok(())
342342
}
343343

344-
#[tokio::test]
345-
async fn roundtrip_logical_plan_sort() -> Result<()> {
346-
let ctx = SessionContext::new();
347-
348-
let schema = Schema::new(vec![
349-
Field::new("a", DataType::Int64, true),
350-
Field::new("b", DataType::Decimal128(15, 2), true),
351-
]);
352-
353-
ctx.register_csv(
354-
"t1",
355-
"tests/testdata/test.csv",
356-
CsvReadOptions::default().schema(&schema),
357-
)
358-
.await?;
359-
360-
let query = "SELECT a, b FROM t1 ORDER BY b LIMIT 5";
361-
let plan = ctx.sql(query).await?.into_optimized_plan()?;
362-
363-
let bytes = logical_plan_to_bytes(&plan)?;
364-
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
365-
assert_eq!(format!("{plan}"), format!("{logical_round_trip}"));
366-
367-
Ok(())
368-
}
369-
370-
#[tokio::test]
371-
async fn roundtrip_logical_plan_dml() -> Result<()> {
372-
let ctx = SessionContext::new();
373-
let schema = Schema::new(vec![
374-
Field::new("a", DataType::Int64, true),
375-
Field::new("b", DataType::Decimal128(15, 2), true),
376-
]);
377-
378-
ctx.register_csv(
379-
"t1",
380-
"tests/testdata/test.csv",
381-
CsvReadOptions::default().schema(&schema),
382-
)
383-
.await?;
384-
let queries = [
385-
"INSERT INTO T1 VALUES (1, null)",
386-
"INSERT OVERWRITE T1 VALUES (1, null)",
387-
"REPLACE INTO T1 VALUES (1, null)",
388-
"INSERT OR REPLACE INTO T1 VALUES (1, null)",
389-
"DELETE FROM T1",
390-
"UPDATE T1 SET a = 1",
391-
"CREATE TABLE T2 AS SELECT * FROM T1",
392-
];
393-
for query in queries {
394-
let plan = ctx.sql(query).await?.into_optimized_plan()?;
395-
let bytes = logical_plan_to_bytes(&plan)?;
396-
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
397-
assert_eq!(
398-
format!("{plan}"),
399-
format!("{logical_round_trip}"),
400-
"failed query roundtrip: {}",
401-
query
402-
);
403-
}
404-
405-
Ok(())
406-
}
407-
408344
#[tokio::test]
409345
async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
410346
let ctx = SessionContext::new();

0 commit comments

Comments
 (0)