Skip to content

Commit d437f77

Browse files
milenkovicma
authored and alamb committed
feat: add resolved target to DmlStatement (to eliminate need for table lookup after deserialization) (apache#14631)
* feat: serialize table source to DML proto * Update datafusion/core/src/dataframe/mod.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * remove redundant comment --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 23ab15e commit d437f77

10 files changed

Lines changed: 111 additions & 30 deletions

File tree

datafusion/core/src/dataframe/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -30,7 +30,9 @@ use crate::arrow::util::pretty;
3030
use crate::datasource::file_format::csv::CsvFormatFactory;
3131
use crate::datasource::file_format::format_as_file_type;
3232
use crate::datasource::file_format::json::JsonFormatFactory;
33-
use crate::datasource::{provider_as_source, MemTable, TableProvider};
33+
use crate::datasource::{
34+
provider_as_source, DefaultTableSource, MemTable, TableProvider,
35+
};
3436
use crate::error::Result;
3537
use crate::execution::context::{SessionState, TaskContext};
3638
use crate::execution::FunctionRegistry;
@@ -62,6 +64,7 @@ use datafusion_functions_aggregate::expr_fn::{
6264

6365
use async_trait::async_trait;
6466
use datafusion_catalog::Session;
67+
use datafusion_sql::TableReference;
6568

6669
/// Contains options that control how data is
6770
/// written out from a DataFrame

datafusion/core/src/datasource/listing/table.rs

Lines changed: 3 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -1074,7 +1074,7 @@ mod tests {
10741074
use crate::datasource::file_format::json::JsonFormat;
10751075
#[cfg(feature = "parquet")]
10761076
use crate::datasource::file_format::parquet::ParquetFormat;
1077-
use crate::datasource::{provider_as_source, MemTable};
1077+
use crate::datasource::{provider_as_source, DefaultTableSource, MemTable};
10781078
use crate::execution::options::ArrowReadOptions;
10791079
use crate::prelude::*;
10801080
use crate::{
@@ -1904,6 +1904,8 @@ mod tests {
19041904
session_ctx.register_table("source", source_table.clone())?;
19051905
// Convert the source table into a provider so that it can be used in a query
19061906
let source = provider_as_source(source_table);
1907+
let target = session_ctx.table_provider("t").await?;
1908+
let target = Arc::new(DefaultTableSource::new(target));
19071909
// Create a table scan logical plan to read from the source table
19081910
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
19091911
.filter(filter_predicate)?

datafusion/core/src/datasource/memory.rs

Lines changed: 2 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -367,7 +367,7 @@ impl DataSink for MemSink {
367367
mod tests {
368368

369369
use super::*;
370-
use crate::datasource::provider_as_source;
370+
use crate::datasource::{provider_as_source, DefaultTableSource};
371371
use crate::physical_plan::collect;
372372
use crate::prelude::SessionContext;
373373

@@ -617,6 +617,7 @@ mod tests {
617617
// Create and register the initial table with the provided schema and data
618618
let initial_table = Arc::new(MemTable::try_new(schema.clone(), initial_data)?);
619619
session_ctx.register_table("t", initial_table.clone())?;
620+
let target = Arc::new(DefaultTableSource::new(initial_table.clone()));
620621
// Create and register the source table with the provided schema and inserted data
621622
let source_table = Arc::new(MemTable::try_new(schema.clone(), inserted_data)?);
622623
session_ctx.register_table("source", source_table.clone())?;

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -24,7 +24,7 @@ use std::sync::Arc;
2424
use crate::datasource::file_format::file_type_to_format;
2525
use crate::datasource::listing::ListingTableUrl;
2626
use crate::datasource::physical_plan::FileSinkConfig;
27-
use crate::datasource::source_as_provider;
27+
use crate::datasource::{source_as_provider, DefaultTableSource};
2828
use crate::error::{DataFusionError, Result};
2929
use crate::execution::context::{ExecutionProps, SessionState};
3030
use crate::logical_expr::utils::generate_sort_key;

datafusion/expr/src/logical_plan/dml.rs

Lines changed: 33 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -86,6 +86,39 @@ pub struct DmlStatement {
8686
/// The schema of the output relation
8787
pub output_schema: DFSchemaRef,
8888
}
89+
impl Eq for DmlStatement {}
90+
impl Hash for DmlStatement {
91+
fn hash<H: Hasher>(&self, state: &mut H) {
92+
self.table_name.hash(state);
93+
self.target.schema().hash(state);
94+
self.op.hash(state);
95+
self.input.hash(state);
96+
self.output_schema.hash(state);
97+
}
98+
}
99+
100+
impl PartialEq for DmlStatement {
101+
fn eq(&self, other: &Self) -> bool {
102+
self.table_name == other.table_name
103+
&& self.target.schema() == other.target.schema()
104+
&& self.op == other.op
105+
&& self.input == other.input
106+
&& self.output_schema == other.output_schema
107+
}
108+
}
109+
110+
impl Debug for DmlStatement {
111+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
112+
f.debug_struct("DmlStatement")
113+
.field("table_name", &self.table_name)
114+
.field("target", &"...")
115+
.field("target_schema", &self.target.schema())
116+
.field("op", &self.op)
117+
.field("input", &self.input)
118+
.field("output_schema", &self.output_schema)
119+
.finish()
120+
}
121+
}
89122

90123
impl Debug for DmlStatement {
91124
fn fmt(&self, f: &mut Formatter) -> fmt::Result {

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -276,7 +276,7 @@ message DmlNode{
276276
Type dml_type = 1;
277277
LogicalPlanNode input = 2;
278278
TableReference table_name = 3;
279-
datafusion_common.DfSchema schema = 4;
279+
LogicalPlanNode target = 5;
280280
}
281281

282282
message UnnestNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 12 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 51 additions & 8 deletions
Original file line number · Diff line number · Diff line change
@@ -55,8 +55,8 @@ use datafusion::{
5555
};
5656
use datafusion_common::file_options::file_type::FileType;
5757
use datafusion_common::{
58-
context, internal_datafusion_err, internal_err, not_impl_err, DataFusionError,
59-
Result, TableReference,
58+
context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
59+
DataFusionError, Result, TableReference, ToDFSchema,
6060
};
6161
use datafusion_expr::{
6262
dml,
@@ -66,10 +66,10 @@ use datafusion_expr::{
6666
EmptyRelation, Extension, Join, JoinConstraint, Limit, Prepare, Projection,
6767
Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
6868
},
69-
DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
70-
WindowUDF,
69+
AggregateUDF, ColumnUnnestList, DistinctOn, DmlStatement, DropView, Expr, FetchType,
70+
LogicalPlan, LogicalPlanBuilder, RecursiveQuery, ScalarUDF, SkipType, SortExpr,
71+
Statement, TableSource, Unnest, WindowUDF,
7172
};
72-
use datafusion_expr::{AggregateUDF, DmlStatement, RecursiveQuery, Unnest};
7373

7474
use self::to_proto::{serialize_expr, serialize_exprs};
7575
use crate::logical_plan::to_proto::serialize_sorts;
@@ -233,6 +233,45 @@ fn from_table_reference(
233233
Ok(table_ref.clone().try_into()?)
234234
}
235235

236+
/// Converts [LogicalPlan::TableScan] to [TableSource]
237+
/// method to be used to deserialize nodes
238+
/// serialized by [from_table_source]
239+
fn to_table_source(
240+
node: &Option<Box<LogicalPlanNode>>,
241+
ctx: &SessionContext,
242+
extension_codec: &dyn LogicalExtensionCodec,
243+
) -> Result<Arc<dyn TableSource>> {
244+
if let Some(node) = node {
245+
match node.try_into_logical_plan(ctx, extension_codec)? {
246+
LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
247+
_ => plan_err!("expected TableScan node"),
248+
}
249+
} else {
250+
plan_err!("LogicalPlanNode should be provided")
251+
}
252+
}
253+
254+
/// converts [TableSource] to [LogicalPlan::TableScan]
255+
/// using [LogicalPlan::TableScan] was the best approach to
256+
/// serialize [TableSource] to [LogicalPlan::TableScan]
257+
fn from_table_source(
258+
table_name: TableReference,
259+
target: Arc<dyn TableSource>,
260+
extension_codec: &dyn LogicalExtensionCodec,
261+
) -> Result<LogicalPlanNode> {
262+
let projected_schema = target.schema().to_dfschema_ref()?;
263+
let r = LogicalPlan::TableScan(TableScan {
264+
table_name,
265+
source: target,
266+
projection: None,
267+
projected_schema,
268+
filters: vec![],
269+
fetch: None,
270+
});
271+
272+
LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
273+
}
274+
236275
impl AsLogicalPlan for LogicalPlanNode {
237276
fn try_decode(buf: &[u8]) -> Result<Self>
238277
where
@@ -922,7 +961,7 @@ impl AsLogicalPlan for LogicalPlanNode {
922961
LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
923962
datafusion::logical_expr::DmlStatement::new(
924963
from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
925-
Arc::new(convert_required!(dml_node.schema)?),
964+
to_table_source(&dml_node.target, ctx, extension_codec)?,
926965
dml_node.dml_type().into(),
927966
Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
928967
),
@@ -1656,7 +1695,7 @@ impl AsLogicalPlan for LogicalPlanNode {
16561695
)),
16571696
LogicalPlan::Dml(DmlStatement {
16581697
table_name,
1659-
table_schema,
1698+
target,
16601699
op,
16611700
input,
16621701
..
@@ -1667,7 +1706,11 @@ impl AsLogicalPlan for LogicalPlanNode {
16671706
Ok(LogicalPlanNode {
16681707
logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
16691708
input: Some(Box::new(input)),
1670-
schema: Some(table_schema.try_into()?),
1709+
target: Some(Box::new(from_table_source(
1710+
table_name.clone(),
1711+
Arc::clone(target),
1712+
extension_codec,
1713+
)?)),
16711714
table_name: Some(table_name.clone().into()),
16721715
dml_type: dml_type.into(),
16731716
}))),

datafusion/sql/src/statement.rs

Lines changed: 2 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -1227,8 +1227,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
12271227
// Do a table lookup to verify the table exists
12281228
let table_ref = self.object_name_to_table_reference(table_name.clone())?;
12291229
let table_source = self.context_provider.get_table_source(table_ref.clone())?;
1230-
let schema = (*table_source.schema()).clone();
1231-
let schema = DFSchema::try_from(schema)?;
1230+
let schema = table_source.schema().to_dfschema_ref()?;
12321231
let scan = LogicalPlanBuilder::scan(
12331232
object_name_to_string(&table_name),
12341233
Arc::clone(&table_source),
@@ -1242,7 +1241,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
12421241
Some(predicate_expr) => {
12431242
let filter_expr =
12441243
self.sql_to_expr(predicate_expr, &schema, &mut planner_context)?;
1245-
let schema = Arc::new(schema.clone());
1244+
let schema = Arc::new(schema);
12461245
let mut using_columns = HashSet::new();
12471246
expr_to_columns(&filter_expr, &mut using_columns)?;
12481247
let filter_expr = normalize_col_with_schemas_and_ambiguity_check(

0 commit comments

Comments (0)