Skip to content

Commit 37b9a46

Browse files
authored
feat: partition_statistics() for HashJoinExec (apache#20711)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#15873. ## Rationale for this change Copy of apache#16956 <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 1eb5206 commit 37b9a46

File tree

2 files changed

+298
-15
lines changed

2 files changed

+298
-15
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 251 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ mod test {
2727
use datafusion_catalog::TableProvider;
2828
use datafusion_common::Result;
2929
use datafusion_common::stats::Precision;
30-
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
30+
use datafusion_common::{
31+
ColumnStatistics, JoinType, NullEquality, ScalarValue, Statistics,
32+
};
3133
use datafusion_execution::TaskContext;
3234
use datafusion_execution::config::SessionConfig;
3335
use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
@@ -45,7 +47,9 @@ mod test {
4547
use datafusion_physical_plan::common::compute_record_batch_statistics;
4648
use datafusion_physical_plan::empty::EmptyExec;
4749
use datafusion_physical_plan::filter::FilterExec;
48-
use datafusion_physical_plan::joins::{CrossJoinExec, NestedLoopJoinExec};
50+
use datafusion_physical_plan::joins::{
51+
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
52+
};
4953
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
5054
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
5155
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
@@ -1354,4 +1358,249 @@ mod test {
13541358

13551359
Ok(())
13561360
}
1361+
1362+
#[tokio::test]
1363+
async fn test_hash_join_partition_statistics() -> Result<()> {
1364+
// Create left table scan and coalesce to 1 partition for CollectLeft mode
1365+
let left_scan = create_scan_exec_with_statistics(None, Some(2)).await;
1366+
let left_scan_coalesced = Arc::new(CoalescePartitionsExec::new(left_scan.clone()))
1367+
as Arc<dyn ExecutionPlan>;
1368+
1369+
// Create right table scan with different table name
1370+
let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL, date DATE) \
1371+
STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\
1372+
PARTITIONED BY (date) \
1373+
WITH ORDER (id ASC);";
1374+
let right_scan =
1375+
create_scan_exec_with_statistics(Some(right_create_table_sql), Some(2)).await;
1376+
1377+
// Create join condition: t1.id = t2.id
1378+
let on = vec![(
1379+
Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>,
1380+
Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>,
1381+
)];
1382+
1383+
// Test CollectLeft mode - left child must have 1 partition
1384+
let collect_left_join = Arc::new(HashJoinExec::try_new(
1385+
left_scan_coalesced,
1386+
Arc::clone(&right_scan),
1387+
on.clone(),
1388+
None,
1389+
&JoinType::Inner,
1390+
None,
1391+
PartitionMode::CollectLeft,
1392+
NullEquality::NullEqualsNothing,
1393+
false,
1394+
)?) as Arc<dyn ExecutionPlan>;
1395+
1396+
// Test partition statistics for CollectLeft mode
1397+
let statistics = (0..collect_left_join.output_partitioning().partition_count())
1398+
.map(|idx| collect_left_join.partition_statistics(Some(idx)))
1399+
.collect::<Result<Vec<_>>>()?;
1400+
1401+
// Check that we have the expected number of partitions
1402+
assert_eq!(statistics.len(), 2);
1403+
1404+
// For collect left mode, the min/max values are from the entire left table and the specific partition of the right table.
1405+
let expected_p0_statistics = Statistics {
1406+
num_rows: Precision::Inexact(2),
1407+
total_byte_size: Precision::Absent,
1408+
column_statistics: vec![
1409+
// Left id column: all partitions (id 1..4)
1410+
ColumnStatistics {
1411+
null_count: Precision::Exact(0),
1412+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
1413+
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1414+
sum_value: Precision::Absent,
1415+
distinct_count: Precision::Absent,
1416+
byte_size: Precision::Exact(16),
1417+
},
1418+
// Left date column: all partitions (2025-03-01..2025-03-04)
1419+
ColumnStatistics {
1420+
null_count: Precision::Exact(0),
1421+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1422+
DATE_2025_03_04,
1423+
))),
1424+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1425+
DATE_2025_03_01,
1426+
))),
1427+
sum_value: Precision::Absent,
1428+
distinct_count: Precision::Absent,
1429+
byte_size: Precision::Exact(16),
1430+
},
1431+
// Right id column: partition 0 only (id 3..4)
1432+
ColumnStatistics {
1433+
null_count: Precision::Exact(0),
1434+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
1435+
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
1436+
sum_value: Precision::Absent,
1437+
distinct_count: Precision::Absent,
1438+
byte_size: Precision::Exact(8),
1439+
},
1440+
// Right date column: partition 0 only (2025-03-01..2025-03-02)
1441+
ColumnStatistics {
1442+
null_count: Precision::Exact(0),
1443+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1444+
DATE_2025_03_02,
1445+
))),
1446+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1447+
DATE_2025_03_01,
1448+
))),
1449+
sum_value: Precision::Absent,
1450+
distinct_count: Precision::Absent,
1451+
byte_size: Precision::Exact(8),
1452+
},
1453+
],
1454+
};
1455+
assert_eq!(statistics[0], expected_p0_statistics);
1456+
1457+
// Test Partitioned mode
1458+
let partitioned_join = Arc::new(HashJoinExec::try_new(
1459+
Arc::clone(&left_scan),
1460+
Arc::clone(&right_scan),
1461+
on.clone(),
1462+
None,
1463+
&JoinType::Inner,
1464+
None,
1465+
PartitionMode::Partitioned,
1466+
NullEquality::NullEqualsNothing,
1467+
false,
1468+
)?) as Arc<dyn ExecutionPlan>;
1469+
1470+
// Test partition statistics for Partitioned mode
1471+
let statistics = (0..partitioned_join.output_partitioning().partition_count())
1472+
.map(|idx| partitioned_join.partition_statistics(Some(idx)))
1473+
.collect::<Result<Vec<_>>>()?;
1474+
1475+
// Check that we have the expected number of partitions
1476+
assert_eq!(statistics.len(), 2);
1477+
1478+
// For partitioned mode, the min/max values are from the specific partition for each side.
1479+
let expected_p0_statistics = Statistics {
1480+
num_rows: Precision::Inexact(2),
1481+
total_byte_size: Precision::Absent,
1482+
column_statistics: vec![
1483+
// Left id column: partition 0 only (id 3..4)
1484+
ColumnStatistics {
1485+
null_count: Precision::Exact(0),
1486+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
1487+
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
1488+
sum_value: Precision::Absent,
1489+
distinct_count: Precision::Absent,
1490+
byte_size: Precision::Exact(8),
1491+
},
1492+
// Left date column: partition 0 only (2025-03-01..2025-03-02)
1493+
ColumnStatistics {
1494+
null_count: Precision::Exact(0),
1495+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1496+
DATE_2025_03_02,
1497+
))),
1498+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1499+
DATE_2025_03_01,
1500+
))),
1501+
sum_value: Precision::Absent,
1502+
distinct_count: Precision::Absent,
1503+
byte_size: Precision::Exact(8),
1504+
},
1505+
// Right id column: partition 0 only (id 3..4)
1506+
ColumnStatistics {
1507+
null_count: Precision::Exact(0),
1508+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
1509+
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
1510+
sum_value: Precision::Absent,
1511+
distinct_count: Precision::Absent,
1512+
byte_size: Precision::Exact(8),
1513+
},
1514+
// Right date column: partition 0 only (2025-03-01..2025-03-02)
1515+
ColumnStatistics {
1516+
null_count: Precision::Exact(0),
1517+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1518+
DATE_2025_03_02,
1519+
))),
1520+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1521+
DATE_2025_03_01,
1522+
))),
1523+
sum_value: Precision::Absent,
1524+
distinct_count: Precision::Absent,
1525+
byte_size: Precision::Exact(8),
1526+
},
1527+
],
1528+
};
1529+
assert_eq!(statistics[0], expected_p0_statistics);
1530+
1531+
// Test Auto mode - should fall back to getting all partition statistics
1532+
let auto_join = Arc::new(HashJoinExec::try_new(
1533+
Arc::clone(&left_scan),
1534+
Arc::clone(&right_scan),
1535+
on,
1536+
None,
1537+
&JoinType::Inner,
1538+
None,
1539+
PartitionMode::Auto,
1540+
NullEquality::NullEqualsNothing,
1541+
false,
1542+
)?) as Arc<dyn ExecutionPlan>;
1543+
1544+
// Test partition statistics for Auto mode
1545+
let statistics = (0..auto_join.output_partitioning().partition_count())
1546+
.map(|idx| auto_join.partition_statistics(Some(idx)))
1547+
.collect::<Result<Vec<_>>>()?;
1548+
1549+
// Check that we have the expected number of partitions
1550+
assert_eq!(statistics.len(), 2);
1551+
1552+
// For auto mode, the min/max values are from the entire left and right tables.
1553+
let expected_p0_statistics = Statistics {
1554+
num_rows: Precision::Inexact(4),
1555+
total_byte_size: Precision::Absent,
1556+
column_statistics: vec![
1557+
// Left id column: all partitions (id 1..4)
1558+
ColumnStatistics {
1559+
null_count: Precision::Exact(0),
1560+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
1561+
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1562+
sum_value: Precision::Absent,
1563+
distinct_count: Precision::Absent,
1564+
byte_size: Precision::Exact(16),
1565+
},
1566+
// Left date column: all partitions (2025-03-01..2025-03-04)
1567+
ColumnStatistics {
1568+
null_count: Precision::Exact(0),
1569+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1570+
DATE_2025_03_04,
1571+
))),
1572+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1573+
DATE_2025_03_01,
1574+
))),
1575+
sum_value: Precision::Absent,
1576+
distinct_count: Precision::Absent,
1577+
byte_size: Precision::Exact(16),
1578+
},
1579+
// Right id column: all partitions (id 1..4)
1580+
ColumnStatistics {
1581+
null_count: Precision::Exact(0),
1582+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
1583+
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
1584+
sum_value: Precision::Absent,
1585+
distinct_count: Precision::Absent,
1586+
byte_size: Precision::Exact(16),
1587+
},
1588+
// Right date column: all partitions (2025-03-01..2025-03-04)
1589+
ColumnStatistics {
1590+
null_count: Precision::Exact(0),
1591+
max_value: Precision::Exact(ScalarValue::Date32(Some(
1592+
DATE_2025_03_04,
1593+
))),
1594+
min_value: Precision::Exact(ScalarValue::Date32(Some(
1595+
DATE_2025_03_01,
1596+
))),
1597+
sum_value: Precision::Absent,
1598+
distinct_count: Precision::Absent,
1599+
byte_size: Precision::Exact(16),
1600+
},
1601+
],
1602+
};
1603+
assert_eq!(statistics[0], expected_p0_statistics);
1604+
Ok(())
1605+
}
13571606
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1442,19 +1442,53 @@ impl ExecutionPlan for HashJoinExec {
14421442
}
14431443

14441444
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
1445-
if partition.is_some() {
1446-
return Ok(Statistics::new_unknown(&self.schema()));
1447-
}
1448-
// TODO stats: it is not possible in general to know the output size of joins
1449-
// There are some special cases though, for example:
1450-
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
1451-
let stats = estimate_join_statistics(
1452-
self.left.partition_statistics(None)?,
1453-
self.right.partition_statistics(None)?,
1454-
&self.on,
1455-
&self.join_type,
1456-
&self.join_schema,
1457-
)?;
1445+
let stats = match (partition, self.mode) {
1446+
// For CollectLeft mode, the left side is collected into a single partition,
1447+
// so all left partitions are available to each output partition.
1448+
// For the right side, we need the specific partition statistics.
1449+
(Some(partition), PartitionMode::CollectLeft) => {
1450+
let left_stats = self.left.partition_statistics(None)?;
1451+
let right_stats = self.right.partition_statistics(Some(partition))?;
1452+
1453+
estimate_join_statistics(
1454+
left_stats,
1455+
right_stats,
1456+
&self.on,
1457+
&self.join_type,
1458+
&self.join_schema,
1459+
)?
1460+
}
1461+
1462+
// For Partitioned mode, both sides are partitioned, so each output partition
1463+
// only has access to the corresponding partition from both sides.
1464+
(Some(partition), PartitionMode::Partitioned) => {
1465+
let left_stats = self.left.partition_statistics(Some(partition))?;
1466+
let right_stats = self.right.partition_statistics(Some(partition))?;
1467+
1468+
estimate_join_statistics(
1469+
left_stats,
1470+
right_stats,
1471+
&self.on,
1472+
&self.join_type,
1473+
&self.join_schema,
1474+
)?
1475+
}
1476+
1477+
// For Auto mode or when no specific partition is requested, fall back to
1478+
// the current behavior of getting all partition statistics.
1479+
(None, _) | (Some(_), PartitionMode::Auto) => {
1480+
// TODO stats: it is not possible in general to know the output size of joins
1481+
// There are some special cases though, for example:
1482+
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
1483+
estimate_join_statistics(
1484+
self.left.partition_statistics(None)?,
1485+
self.right.partition_statistics(None)?,
1486+
&self.on,
1487+
&self.join_type,
1488+
&self.join_schema,
1489+
)?
1490+
}
1491+
};
14581492
// Project statistics if there is a projection
14591493
let stats = stats.project(self.projection.as_ref());
14601494
// Apply fetch limit to statistics

0 commit comments

Comments
 (0)