diff --git a/native/Cargo.lock b/native/Cargo.lock index a0e9481399..5fdb7968fc 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -189,7 +189,7 @@ dependencies = [ "miniz_oxide", "num-bigint", "quad-rand", - "rand 0.9.2", + "rand 0.9.4", "regex-lite", "serde", "serde_bytes", @@ -1847,7 +1847,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand 0.9.2", + "rand 0.9.4", "regex", "sqlparser", "tempfile", @@ -2139,7 +2139,7 @@ dependencies = [ "liblzma", "log", "object_store", - "rand 0.9.2", + "rand 0.9.4", "tokio", "tokio-util", "url", @@ -2272,7 +2272,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand 0.9.2", + "rand 0.9.4", "tempfile", "url", ] @@ -2337,7 +2337,7 @@ dependencies = [ "md-5", "memchr", "num-traits", - "rand 0.9.2", + "rand 0.9.4", "regex", "sha2", "unicode-segmentation", @@ -2634,7 +2634,7 @@ dependencies = [ "datafusion-functions-nested", "log", "percent-encoding", - "rand 0.9.2", + "rand 0.9.4", "serde_json", "sha1", "sha2", @@ -2863,7 +2863,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3478,7 +3478,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=477a1e5#477a1e525b4915895388a4f45557b825ea541ef2" +source = "git+https://github.com/apache/iceberg-rust?rev=a2f067d#a2f067d0225d66ab88b8a18ec25b8a0953e35082" dependencies = [ "aes-gcm", "anyhow", @@ -3511,7 +3511,7 @@ dependencies = [ "once_cell", "ordered-float 4.6.0", "parquet", - "rand 0.8.5", + "rand 0.9.4", "reqwest", "roaring", "serde", @@ -3533,7 +3533,7 @@ dependencies = [ [[package]] name = "iceberg-storage-opendal" version = "0.9.0" -source = "git+https://github.com/apache/iceberg-rust?rev=477a1e5#477a1e525b4915895388a4f45557b825ea541ef2" +source = 
"git+https://github.com/apache/iceberg-rust?rev=a2f067d#a2f067d0225d66ab88b8a18ec25b8a0953e35082" dependencies = [ "anyhow", "async-trait", @@ -3754,7 +3754,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3810,7 +3810,7 @@ dependencies = [ "portable-atomic-util", "serde_core", "wasm-bindgen", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4157,7 +4157,7 @@ dependencies = [ "log-mdc", "mock_instant", "parking_lot", - "rand 0.9.2", + "rand 0.9.4", "serde", "serde-value", "serde_json", @@ -5233,7 +5233,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand 0.9.2", + "rand 0.9.4", "ring", "rustc-hash 2.1.2", "rustls", @@ -5256,7 +5256,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.59.0", ] [[package]] @@ -5293,9 +5293,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.9.2" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.5", @@ -5651,7 +5651,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6243,10 +6243,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6567,7 +6567,7 @@ version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" dependencies = [ - "rand 
0.9.2", + "rand 0.9.4", ] [[package]] @@ -6958,7 +6958,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -7053,15 +7053,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.60.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" -dependencies = [ - "windows-targets 0.53.5", -] - [[package]] name = "windows-sys" version = "0.61.2" @@ -7095,30 +7086,13 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm 0.52.6", + "windows_i686_gnullvm", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows-targets" -version = "0.53.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" -dependencies = [ - "windows-link", - "windows_aarch64_gnullvm 0.53.1", - "windows_aarch64_msvc 0.53.1", - "windows_i686_gnu 0.53.1", - "windows_i686_gnullvm 0.53.1", - "windows_i686_msvc 0.53.1", - "windows_x86_64_gnu 0.53.1", - "windows_x86_64_gnullvm 0.53.1", - "windows_x86_64_msvc 0.53.1", -] - [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -7131,12 +7105,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" - [[package]] name = "windows_aarch64_msvc" 
version = "0.42.2" @@ -7149,12 +7117,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_aarch64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" - [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -7167,24 +7129,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" -[[package]] -name = "windows_i686_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" - [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" - [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -7197,12 +7147,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_i686_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" - [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -7215,12 +7159,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = 
"windows_x86_64_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" - [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -7233,12 +7171,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" - [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -7251,12 +7183,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "windows_x86_64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" - [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/native/Cargo.toml b/native/Cargo.toml index b71bc0c73c..7d0f26c9ba 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -58,8 +58,8 @@ object_store = { version = "0.13.1", features = ["gcp", "azure", "aws", "http"] url = "2.2" aws-config = "1.8.14" aws-credential-types = "1.2.13" -iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "477a1e5" } -iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "477a1e5", features = ["opendal-all"] } +iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "a2f067d" } +iceberg-storage-opendal = { git = "https://github.com/apache/iceberg-rust", rev = "a2f067d", features = ["opendal-all"] } [profile.release] debug = true diff --git a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala index 033b634e0f..62c8844f72 100644 --- a/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala @@ -22,9 +22,12 @@ package org.apache.comet import java.io.File import java.nio.file.Files +import scala.jdk.CollectionConverters._ + import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.comet.CometIcebergNativeScanExec import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, TimestampType} import org.apache.comet.iceberg.RESTCatalogHelper @@ -2242,6 +2245,162 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } + // Regression test for https://github.com/apache/datafusion-comet/issues/3856 + // Fixed in https://github.com/apache/iceberg-rust/pull/2301 + test("migration - INT96 timestamp") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator} + import org.apache.spark.sql.functions.monotonically_increasing_id + import org.apache.spark.sql.types._ + + val dataPath = s"${warehouseDir.getAbsolutePath}/int96_data" + val numRows = 50 + val r = new scala.util.Random(42) + + // Exercise INT96 coercion in flat columns, structs, arrays, and maps + val fuzzSchema = StructType( + Seq( + StructField("ts", TimestampType, nullable = true), + StructField("value", DoubleType, nullable = true), + StructField( + "ts_struct", + 
StructType( + Seq( + StructField("inner_ts", TimestampType, nullable = true), + StructField("inner_val", DoubleType, nullable = true))), + nullable = true), + StructField( + "ts_array", + ArrayType(TimestampType, containsNull = true), + nullable = true), + StructField("ts_map", MapType(IntegerType, TimestampType), nullable = true))) + + // Default FuzzDataGenerator baseDate is year 3333, outside the i64 nanosecond + // range (~1677-2262). This triggers the INT96 overflow bug if coercion is missing. + val dataGenOptions = DataGenOptions(allowNull = false) + val fuzzDf = + FuzzDataGenerator.generateDataFrame(r, spark, fuzzSchema, numRows, dataGenOptions) + + val df = fuzzDf.withColumn("id", monotonically_increasing_id()) + + // Write Parquet with INT96 timestamps + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { + df.write.mode("overwrite").parquet(dataPath) + } + + // Verify all timestamp columns in the Parquet file use INT96 + val parquetFiles = new java.io.File(dataPath) + .listFiles() + .filter(f => f.getName.endsWith(".parquet")) + assert(parquetFiles.nonEmpty, "Expected at least one Parquet file") + + val parquetFile = parquetFiles.head + val reader = org.apache.parquet.hadoop.ParquetFileReader.open( + org.apache.parquet.hadoop.util.HadoopInputFile.fromPath( + new org.apache.hadoop.fs.Path(parquetFile.getAbsolutePath), + spark.sessionState.newHadoopConf())) + try { + val parquetSchema = reader.getFooter.getFileMetaData.getSchema + val int96Columns = parquetSchema.getColumns.asScala + .filter(_.getPrimitiveType.getPrimitiveTypeName == + org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96) + .map(_.getPath.mkString(".")) + // Expect INT96 for: ts, ts_struct.inner_ts, ts_array.list.element, ts_map.value + assert( + int96Columns.size >= 4, + s"Expected at least 4 INT96 columns but found ${int96Columns.size}: ${int96Columns.mkString(", ")}") + } finally { + reader.close() + } + + // Create an unpartitioned Iceberg table and import the 
Parquet files + spark.sql("CREATE NAMESPACE IF NOT EXISTS test_cat.db") + spark.sql(""" + CREATE TABLE test_cat.db.int96_test ( + ts TIMESTAMP, + value DOUBLE, + ts_struct STRUCT<inner_ts: TIMESTAMP, inner_val: DOUBLE>, + ts_array ARRAY<TIMESTAMP>, + ts_map MAP<INT, TIMESTAMP>, + id BIGINT + ) USING iceberg + """) + + try { + val tableUtilClass = Class.forName("org.apache.iceberg.spark.SparkTableUtil") + val sparkCatalog = spark.sessionState.catalogManager + .catalog("test_cat") + .asInstanceOf[org.apache.iceberg.spark.SparkCatalog] + val ident = + org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), "int96_test") + val sparkTable = sparkCatalog + .loadTable(ident) + .asInstanceOf[org.apache.iceberg.spark.source.SparkTable] + val table = sparkTable.table() + + val stagingDir = s"${warehouseDir.getAbsolutePath}/staging" + + spark.sql(s"""CREATE TABLE parquet_temp USING parquet LOCATION '$dataPath'""") + val sourceIdent = new org.apache.spark.sql.catalyst.TableIdentifier("parquet_temp") + + val importMethod = tableUtilClass.getMethod( + "importSparkTable", + classOf[org.apache.spark.sql.SparkSession], + classOf[org.apache.spark.sql.catalyst.TableIdentifier], + classOf[org.apache.iceberg.Table], + classOf[String]) + importMethod.invoke(null, spark, sourceIdent, table, stagingDir) + + val distinctCount = spark + .sql("SELECT COUNT(DISTINCT id) FROM test_cat.db.int96_test") + .collect()(0) + .getLong(0) + assert( + distinctCount == numRows, + s"Expected $numRows distinct IDs but got $distinctCount") + + // Spark's Iceberg reader returns null for INT96 timestamps inside structs, + // so we can't use checkIcebergNativeScan (which compares against Spark) for + // ts_struct. Instead, compare Comet's read against the raw Parquet source.
+ checkIcebergNativeScan( + "SELECT id, ts, value, ts_array, ts_map FROM test_cat.db.int96_test ORDER BY id") + checkIcebergNativeScan("SELECT id, ts FROM test_cat.db.int96_test ORDER BY id") + checkIcebergNativeScan("SELECT id, ts_array FROM test_cat.db.int96_test ORDER BY id") + checkIcebergNativeScan("SELECT id, ts_map FROM test_cat.db.int96_test ORDER BY id") + + // Validate ts_struct against raw Parquet since Spark's Iceberg reader can't read it + val icebergStructDf = spark + .sql("SELECT id, ts_struct FROM test_cat.db.int96_test ORDER BY id") + .collect() + val parquetStructDf = spark.read + .parquet(dataPath) + .select("id", "ts_struct") + .orderBy("id") + .collect() + assert( + icebergStructDf.sameElements(parquetStructDf), + "ts_struct mismatch between Comet Iceberg read and raw Parquet") + + spark.sql("DROP TABLE test_cat.db.int96_test") + spark.sql("DROP TABLE parquet_temp") + } catch { + case _: ClassNotFoundException => + cancel("SparkTableUtil not available") + } + } + } + } + test("REST catalog with native Iceberg scan") { assume(icebergAvailable, "Iceberg not available in classpath") @@ -2521,4 +2680,88 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper { } } } + + // Regression test for https://github.com/apache/datafusion-comet/issues/3860 + // Fixed in https://github.com/apache/iceberg-rust/pull/2307 + test("filter with nested types in migrated table") { + assume(icebergAvailable, "Iceberg not available in classpath") + + withTempIcebergDir { warehouseDir => + withSQLConf( + "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog", + "spark.sql.catalog.test_cat.type" -> "hadoop", + "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath, + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true", + CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") { + + val dataPath = s"${warehouseDir.getAbsolutePath}/nested_data" + + // Write Parquet WITHOUT Iceberg 
(simulates pre-migration data) + // id is last so its leaf index is after all nested type leaves + spark + .sql(""" + SELECT + named_struct('age', id * 10, 'score', id * 1.5) AS info, + array(id, id + 1) AS tags, + map('key', id) AS props, + id + FROM range(10) + """) + .write + .parquet(dataPath) + + spark.sql("CREATE NAMESPACE IF NOT EXISTS test_cat.db") + spark.sql(""" + CREATE TABLE test_cat.db.nested_migrate ( + info STRUCT<age: BIGINT, score: DOUBLE>, + tags ARRAY<BIGINT>, + props MAP<STRING, BIGINT>, + id BIGINT + ) USING iceberg + """) + + try { + val tableUtilClass = Class.forName("org.apache.iceberg.spark.SparkTableUtil") + val sparkCatalog = spark.sessionState.catalogManager + .catalog("test_cat") + .asInstanceOf[org.apache.iceberg.spark.SparkCatalog] + val ident = + org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), "nested_migrate") + val sparkTable = sparkCatalog + .loadTable(ident) + .asInstanceOf[org.apache.iceberg.spark.source.SparkTable] + val table = sparkTable.table() + + val stagingDir = s"${warehouseDir.getAbsolutePath}/staging" + spark.sql(s"""CREATE TABLE parquet_temp USING parquet LOCATION '$dataPath'""") + val sourceIdent = new org.apache.spark.sql.catalyst.TableIdentifier("parquet_temp") + + val importMethod = tableUtilClass.getMethod( + "importSparkTable", + classOf[org.apache.spark.sql.SparkSession], + classOf[org.apache.spark.sql.catalyst.TableIdentifier], + classOf[org.apache.iceberg.Table], + classOf[String]) + importMethod.invoke(null, spark, sourceIdent, table, stagingDir) + + // Select only flat columns to avoid Spark's Iceberg reader returning + // null for struct fields in migrated tables (separate Spark bug) + checkIcebergNativeScan("SELECT id FROM test_cat.db.nested_migrate ORDER BY id") + + // Filter on root column with nested types in migrated table: + // Parquet files lack Iceberg field IDs, so iceberg-rust falls back to + // name mapping where column_map resolution was broken for nested types + checkIcebergNativeScan( + "SELECT id FROM
test_cat.db.nested_migrate WHERE id > 5 ORDER BY id") + + spark.sql("DROP TABLE test_cat.db.nested_migrate") + spark.sql("DROP TABLE parquet_temp") + } catch { + case _: ClassNotFoundException => + cancel("SparkTableUtil not available") + } + } + } + } }