Skip to content

Commit a45a4c4

Browse files
authored
Add serialization/deserialization and round-trip tests for all tpc-h queries (apache#16742)
* Add serialization/deserialization and round-trip tests for all tpc-h queries * Remove specific test for q16 because it is now a part of all tpch tests * Add bug ticket to the test comment
1 parent 36991ac commit a45a4c4

8 files changed

Lines changed: 88 additions & 29 deletions
7.55 KB
Binary file not shown.
10.8 KB
Binary file not shown.
4.26 KB
Binary file not shown.
7.05 KB
Binary file not shown.
5.33 KB
Binary file not shown.
2.76 KB
Binary file not shown.
6.81 KB
Binary file not shown.

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1738,45 +1738,104 @@ async fn roundtrip_physical_plan_node() {
17381738
let _ = plan.execute(0, ctx.task_ctx()).unwrap();
17391739
}
17401740

1741-
// Failing due to https://github.com/apache/datafusion/pull/16662
1742-
// Fixed: Column index mismatch during protobuf deserialization
1743-
#[tokio::test]
1744-
async fn test_tpch_part_in_list_query_with_real_parquet_data() -> Result<()> {
1745-
// Test the specific query: SELECT p_size FROM part WHERE p_size IN (14, 6, 5, 31)
1746-
//
1747-
// NOTE: This test uses a minimal subset of TPC-H part.parquet data (tpch_part_small.parquet)
1748-
// which contains only 20 rows with p_size values in [14, 6, 5, 31] to reproduce the bug.
1749-
// Using alltypes_plain.parquet does NOT reproduce the issue, suggesting the bug
1750-
// is specific to certain characteristics of TPC-H parquet files or their schema.
1751-
1741+
/// Helper function to create a SessionContext with all TPC-H tables registered as external tables
1742+
async fn tpch_context() -> Result<SessionContext> {
17521743
use datafusion_common::test_util::datafusion_test_data;
17531744

17541745
let ctx = SessionContext::new();
1755-
1756-
// Register the TPC-H part table using the local test data
17571746
let test_data = datafusion_test_data();
1758-
let table_sql = format!(
1759-
"CREATE EXTERNAL TABLE part STORED AS PARQUET LOCATION '{test_data}/tpch_part_small.parquet'"
17601747

1761-
);
1762-
ctx.sql(&table_sql).await.map_err(|e| {
1763-
DataFusionError::External(format!("Failed to create part table: {e}").into())
1748+
// TPC-H table names
1749+
let tables = [
1750+
"part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation",
1751+
"region",
1752+
];
1753+
1754+
// Create external tables for all TPC-H tables
1755+
for table in &tables {
1756+
let table_sql = format!(
1757+
"CREATE EXTERNAL TABLE {table} STORED AS PARQUET LOCATION '{test_data}/tpch_{table}_small.parquet'"
1758+
);
1759+
ctx.sql(&table_sql).await.map_err(|e| {
1760+
DataFusionError::External(
1761+
format!("Failed to create {table} table: {e}").into(),
1762+
)
1763+
})?;
1764+
}
1765+
1766+
Ok(ctx)
1767+
}
1768+
1769+
/// Helper function to get TPC-H query SQL
1770+
fn get_tpch_query_sql(query: usize) -> Result<Vec<String>> {
1771+
use std::fs;
1772+
1773+
if !(1..=22).contains(&query) {
1774+
return Err(DataFusionError::External(
1775+
format!("Invalid TPC-H query number: {query}").into(),
1776+
));
1777+
}
1778+
1779+
let filename = format!("../../benchmarks/queries/q{query}.sql");
1780+
let contents = fs::read_to_string(&filename).map_err(|e| {
1781+
DataFusionError::External(
1782+
format!("Failed to read query file {filename}: {e}").into(),
1783+
)
17641784
})?;
17651785

1766-
// Test the exact problematic query
1767-
let sql = "SELECT p_size FROM part WHERE p_size IN (14, 6, 5, 31)";
1786+
Ok(contents
1787+
.split(';')
1788+
.map(|s| s.trim())
1789+
.filter(|s| !s.is_empty())
1790+
.map(|s| s.to_string())
1791+
.collect())
1792+
}
17681793

1769-
let logical_plan = ctx.sql(sql).await?.into_unoptimized_plan();
1770-
let optimized_plan = ctx.state().optimize(&logical_plan)?;
1771-
let physical_plan = ctx.state().create_physical_plan(&optimized_plan).await?;
1794+
#[tokio::test]
1795+
async fn test_serialize_deserialize_tpch_queries() -> Result<()> {
1796+
// Create context with TPC-H tables
1797+
let ctx = tpch_context().await?;
1798+
1799+
// repeat to run all 22 queries
1800+
for query in 1..=22 {
1801+
// run all statements in the query
1802+
let sql = get_tpch_query_sql(query)?;
1803+
for stmt in sql {
1804+
let logical_plan = ctx.sql(&stmt).await?.into_unoptimized_plan();
1805+
let optimized_plan = ctx.state().optimize(&logical_plan)?;
1806+
let physical_plan = ctx.state().create_physical_plan(&optimized_plan).await?;
1807+
1808+
// serialize the physical plan
1809+
let codec = DefaultPhysicalExtensionCodec {};
1810+
let proto =
1811+
PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec)?;
1812+
1813+
// deserialize the physical plan
1814+
let _deserialized_plan =
1815+
proto.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?;
1816+
}
1817+
}
17721818

1773-
// Serialize the physical plan - bug may happen here already but not necessarily manifests
1774-
let codec = DefaultPhysicalExtensionCodec {};
1775-
let proto = PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec)?;
1819+
Ok(())
1820+
}
17761821

1777-
// This will fail with the bug, but should succeed when fixed
1778-
let _deserialized_plan =
1779-
proto.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?;
1822+
// bug: https://github.com/apache/datafusion/issues/16772
1823+
// Only 4 queries pass: q3, q5, q10, q12
1824+
// Ignore the test until the bug is fixed
1825+
#[ignore]
1826+
#[tokio::test]
1827+
async fn test_round_trip_tpch_queries() -> Result<()> {
1828+
// Create context with TPC-H tables
1829+
let ctx = tpch_context().await?;
1830+
1831+
// repeat to run all 22 queries
1832+
for query in 1..=22 {
1833+
// run all statements in the query
1834+
let sql = get_tpch_query_sql(query)?;
1835+
for stmt in sql {
1836+
roundtrip_test_sql_with_context(&stmt, &ctx).await?;
1837+
}
1838+
}
17801839

17811840
Ok(())
17821841
}

0 commit comments

Comments
 (0)