|
20 | 20 | package org.apache.comet.parquet |
21 | 21 |
|
22 | 22 | import java.io.File |
23 | | -import java.math.BigDecimal |
| 23 | +import java.math.{BigDecimal, BigInteger} |
24 | 24 | import java.time.{ZoneId, ZoneOffset} |
25 | 25 |
|
26 | 26 | import scala.reflect.ClassTag |
@@ -1730,6 +1730,66 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper { |
1730 | 1730 | } |
1731 | 1731 | } |
1732 | 1732 |
|
// Writes a 1000-row parquet file via makeParquetFileAllPrimitiveTypes (dictionary
// encoding off, nulls off) and then, with the native DataFusion scan selected, runs
// one equality predicate per column with Parquet filter pushdown both enabled and
// disabled. For every predicate the test:
//   1. checks the result against Spark through checkSparkAnswer,
//   2. requires exactly one Comet scan node whose dataFilters are non-empty,
//   3. compares the scan's "output_rows" metric with the expected row count when
//      pushdown is on, or with the full row count when it is off.
test("test V1 parquet scan filter pushdown of primitive types") {
  withTempPath { dir =>
    val path = new Path(dir.toURI.toString, "test1.parquet")
    val rows = 1000
    withSQLConf(CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key -> "true") {
      makeParquetFileAllPrimitiveTypes(
        path,
        dictionaryEnabled = false,
        0,
        rows,
        nullEnabled = false)
    }

    // Literals are built up front so the predicate strings stay readable.
    val repeatedOnes = "1" * 48
    // NOTE(review): presumably mirrors how the file generator encodes column _17 —
    // confirm against makeParquetFileAllPrimitiveTypes.
    val wideDecimal = new BigDecimal(new BigInteger(("1" * 16).getBytes), 37)
    val epochPlusOneMicro = DateTimeUtils.toJavaTimestamp(1)

    // (predicate, rows the scan should emit when the filter is pushed down).
    // The element type is spelled out: mixing Double and Int literals would otherwise
    // widen the second component to AnyVal, and the comparison with the Long metric
    // would only succeed through boxed numeric equality.
    val predicates: Seq[(String, Long)] = Seq(
      ("_1 = true", Math.ceil(rows.toDouble / 2).toLong), // Boolean
      ("_2 = 1", Math.ceil(rows.toDouble / 256).toLong), // Byte
      ("_3 = 1", 1L), // Short
      ("_4 = 1", 1L), // Integer
      ("_5 = 1", 1L), // Long
      ("_6 = 1.0", 1L), // Float
      ("_7 = 1.0", 1L), // Double
      (s"_8 = '$repeatedOnes'", 1L), // String
      ("_21 = to_binary('1', 'utf-8')", 1L), // binary
      ("_15 = 0.0", 1L), // DECIMAL(5, 2)
      ("_16 = 0.0", 1L), // DECIMAL(18, 10)
      (s"_17 = $wideDecimal", Math.ceil(rows.toDouble / 10).toLong), // DECIMAL(38, 37)
      (s"_19 = TIMESTAMP '$epochPlusOneMicro'", 1L), // Timestamp
      ("_20 = DATE '1970-01-02'", 1L) // Date
    )

    Seq(true, false).foreach { pushDown =>
      withSQLConf(
        CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown.toString) {
        predicates.foreach { case (whereClause, expectedRows) =>
          val df = spark.read
            .parquet(path.toString)
            .where(whereClause)
          val (_, cometPlan) = checkSparkAnswer(df)

          // Either Comet scan flavour is accepted; both must still carry the data filter.
          val scans = collect(cometPlan) {
            case p: CometScanExec =>
              assert(p.dataFilters.nonEmpty)
              p
            case p: CometNativeScanExec =>
              assert(p.dataFilters.nonEmpty)
              p
          }
          assert(scans.size == 1)

          // Without pushdown the scan is expected to emit every row of the file.
          val expectedOutput = if (pushDown) expectedRows else rows.toLong
          val actualOutput = scans.head.metrics("output_rows").value
          assert(
            actualOutput == expectedOutput,
            s"unexpected output_rows for predicate [$whereClause] with pushDown=$pushDown")
        }
      }
    }
  }
}
| 1792 | + |
1733 | 1793 | test("read basic complex types") { |
1734 | 1794 | Seq(true, false).foreach(dictionaryEnabled => { |
1735 | 1795 | withTempPath { dir => |
|
0 commit comments