Skip to content

Commit aabb338

Browse files
committed
Merge branch 'datafusion-54' into df54
# Conflicts:
#   native/Cargo.lock
#   native/Cargo.toml
#   native/core/src/parquet/schema_adapter.rs
#   native/core/src/parquet/util/test_common/page_util.rs
#   native/spark-expr/src/agg_funcs/correlation.rs
#   native/spark-expr/src/agg_funcs/covariance.rs
#   native/spark-expr/src/agg_funcs/variance.rs
2 parents 16ac0f2 + bc96836 commit aabb338

7 files changed

Lines changed: 56 additions & 35 deletions

File tree

native/core/src/debug/debug_batch_stream.rs

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -15,13 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::any::Any;
1918
use std::fmt;
2019
use std::hash::{Hash, Hasher};
2120
use std::sync::Arc;
2221

2322
use arrow::array::RecordBatch;
2423
use arrow::datatypes::{DataType, Schema};
24+
use datafusion::common::tree_node::TreeNodeRecursion;
2525
use datafusion::common::Result;
2626
use datafusion::execution::SendableRecordBatchStream;
2727
use datafusion::logical_expr::ColumnarValue;
@@ -82,15 +82,18 @@ impl datafusion::physical_plan::ExecutionPlan for DebugExecutionDataStream {
8282
fn name(&self) -> &str {
8383
"DebugExecutionDataStream"
8484
}
85-
fn as_any(&self) -> &dyn std::any::Any {
86-
self
87-
}
8885
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
8986
self.inner.properties()
9087
}
9188
fn children(&self) -> Vec<&Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
9289
vec![&self.inner]
9390
}
91+
fn apply_expressions(
92+
&self,
93+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
94+
) -> Result<TreeNodeRecursion> {
95+
Ok(TreeNodeRecursion::Continue)
96+
}
9497
fn with_new_children(
9598
self: Arc<Self>,
9699
children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>,
@@ -155,9 +158,6 @@ impl Hash for DebugExecutionDataPhyExpr {
155158
}
156159

157160
impl PhysicalExpr for DebugExecutionDataPhyExpr {
158-
fn as_any(&self) -> &dyn Any {
159-
self
160-
}
161161
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
162162
self.inner.data_type(input_schema)
163163
}

native/core/src/execution/merge_as_partial.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
//! outputs state) but redirects `update_batch` calls to `merge_batch`, giving merge
2727
//! semantics with state output.
2828
29-
use std::any::Any;
3029
use std::fmt::Debug;
3130
use std::hash::{Hash, Hasher};
3231

@@ -100,10 +99,6 @@ impl MergeAsPartialUDF {
10099
}
101100

102101
impl AggregateUDFImpl for MergeAsPartialUDF {
103-
fn as_any(&self) -> &dyn Any {
104-
self
105-
}
106-
107102
fn name(&self) -> &str {
108103
&self.name
109104
}

native/core/src/execution/metrics/utils.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,19 @@ pub(crate) fn to_native_metric_node(
7171
children: Vec::with_capacity(children.len()),
7272
};
7373

74-
if let Some(metrics) = node_metrics {
75-
for m in metrics.iter() {
76-
let value = m.value();
77-
native_metric_node
78-
.metrics
79-
.insert(value.name().to_string(), value.as_usize() as i64);
80-
}
81-
}
74+
// Aggregate metrics by name using DataFusion's aggregate_by_name(), which
75+
// correctly handles duplicate metric names (e.g. BaselineMetrics registered
76+
// by both FileStream and ParquetMorselizer on the same ExecutionPlanMetricsSet).
77+
// The additional_native_plans branch below already does this.
78+
node_metrics
79+
.unwrap_or_default()
80+
.aggregate_by_name()
81+
.iter()
82+
.map(|m| m.value())
83+
.map(|m| (m.name(), m.as_usize() as i64))
84+
.for_each(|(name, value)| {
85+
native_metric_node.metrics.insert(name.to_string(), value);
86+
});
8287

8388
for child_plan in children {
8489
let child_node = to_native_metric_node(child_plan)?;

native/spark-expr/src/array_funcs/array_position.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use datafusion::logical_expr::{
2828
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
2929
};
3030
use num::Float;
31-
use std::any::Any;
3231
use std::sync::Arc;
3332

3433
/// Spark array_position() function that returns the 1-based position of an element in an array.
@@ -313,10 +312,6 @@ impl SparkArrayPositionFunc {
313312
}
314313

315314
impl ScalarUDFImpl for SparkArrayPositionFunc {
316-
fn as_any(&self) -> &dyn Any {
317-
self
318-
}
319-
320315
fn name(&self) -> &str {
321316
"spark_array_position"
322317
}

native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use datafusion::common::{utils::take_function_args, DataFusionError, Result, Sca
2424
use datafusion::logical_expr::{
2525
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
2626
};
27-
use std::any::Any;
2827
use std::sync::Arc;
2928

3029
const MICROS_PER_SECOND: i64 = 1_000_000;
@@ -61,10 +60,6 @@ impl Default for SparkSecondsToTimestamp {
6160
}
6261

6362
impl ScalarUDFImpl for SparkSecondsToTimestamp {
64-
fn as_any(&self) -> &dyn Any {
65-
self
66-
}
67-
6863
fn name(&self) -> &str {
6964
"seconds_to_timestamp"
7065
}

native/spark-expr/src/jvm_udf/mod.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::any::Any;
1918
use std::fmt::{Display, Formatter};
2019
use std::hash::{Hash, Hasher};
2120
use std::sync::Arc;
@@ -107,10 +106,6 @@ impl PartialEq for JvmScalarUdfExpr {
107106
impl Eq for JvmScalarUdfExpr {}
108107

109108
impl PhysicalExpr for JvmScalarUdfExpr {
110-
fn as_any(&self) -> &dyn Any {
111-
self
112-
}
113-
114109
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
115110
Display::fmt(self, f)
116111
}

spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,42 @@ class CometJoinSuite extends CometTestBase {
657657
}
658658
}
659659

660+
// Reproducer for SPARK-43113: full outer SMJ with a join filter that references
661+
// a nullable column should not match when the filter evaluates to NULL.
662+
test("SPARK-43113: Full outer SMJ with NULL in join filter") {
663+
withTempView("l", "r") {
664+
// testData2: (a, b) — all non-null
665+
Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2))
666+
.toDF("a", "b")
667+
.createOrReplaceTempView("l")
668+
669+
// testData3: (a, b) — b is nullable
670+
Seq((1, None), (2, Some(2)))
671+
.toDF("a", "b")
672+
.createOrReplaceTempView("r")
673+
674+
val query =
675+
"""select /*+ MERGE(r) */ *
676+
|from l
677+
|full outer join r
678+
|on l.a = r.a
679+
|and l.b < (r.b + 1)
680+
|and l.b < (r.a + 1)""".stripMargin
681+
682+
val expected = Seq(
683+
(Some(1), Some(1), None, None),
684+
(Some(1), Some(2), None, None),
685+
(None, None, Some(1), None),
686+
(Some(2), Some(1), Some(2), Some(2)),
687+
(Some(2), Some(2), Some(2), Some(2)),
688+
(Some(3), Some(1), None, None),
689+
(Some(3), Some(2), None, None)).toDF("a", "b", "a", "b")
690+
691+
val df = sql(query)
692+
checkAnswer(df, expected)
693+
}
694+
}
695+
660696
test("Broadcast exchange respects AQE shuffle partition coalescing") {
661697
// When a shuffle feeds into a broadcast exchange, AQE may coalesce the shuffle
662698
// partitions. The broadcast collect should execute through the AQEShuffleReadExec

0 commit comments

Comments
 (0)