diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index 19a2d774a0..7d15c761ca 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -156,6 +156,31 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_isFeatureEnabled( }) } +/// JNI method: does object_store recognize this URL's scheme? +/// +/// This is the source of truth for the JVM planner's "can Comet's native reader handle this +/// filesystem?" check. Comet's `prepare_object_store_with_configs` dispatches non-hdfs/non-s3 +/// schemes to object_store's `parse_url`, which is driven by `ObjectStoreScheme::parse`; an +/// unrecognized scheme (e.g. a custom Hadoop FileSystem) fails there at execution time. By +/// answering from `ObjectStoreScheme::parse` here, the planner can decline early without +/// hardcoding -- and drifting from -- the object_store-supported scheme set. (hdfs / libhdfs +/// schemes are handled separately on the JVM side via the user's libhdfs scheme config.) +#[no_mangle] +pub extern "system" fn Java_org_apache_comet_NativeBase_isObjectStoreSchemeSupported( + env: EnvUnowned, + _: JClass, + url: JString, +) -> jni::sys::jboolean { + try_unwrap_or_throw(&env, |env| { + let url_str: String = url.try_to_string(env)?; + let supported = url::Url::parse(&url_str) + .ok() + .map(|u| object_store::ObjectStoreScheme::parse(&u).is_ok()) + .unwrap_or(false); + Ok(supported) + }) +} + // Creates a default log4rs config, which logs to console with log level. fn default_logger_config(log_level: &str) -> CometResult { let console_append = ConsoleAppender::builder() diff --git a/spark/src/main/java/org/apache/comet/NativeBase.java b/spark/src/main/java/org/apache/comet/NativeBase.java index 074a4b1625..e2fcbb24a7 100644 --- a/spark/src/main/java/org/apache/comet/NativeBase.java +++ b/spark/src/main/java/org/apache/comet/NativeBase.java @@ -300,4 +300,15 @@ private static String resourceName() { * @return true if the feature is enabled, false otherwise */ public static native boolean isFeatureEnabled(String featureName); + + /** + * Check whether Comet's native object_store layer recognizes the given URL's scheme (i.e. the + * scan would be natively readable rather than failing at execution with "Unable to recognise + * URL"). This is the authoritative answer from object_store's own scheme parser, so the JVM + * planner never has to hardcode (and drift from) the set of supported schemes. + * + * @param url a fully-qualified URL whose scheme should be checked (e.g. "s3://bucket/path") + * @return true if object_store can construct a store for this scheme, false otherwise + */ + public static native boolean isObjectStoreSchemeSupported(String url); } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 6dfcdcff25..3c32b661e3 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, DataTypeSupport} +import org.apache.comet.{CometConf, DataTypeSupport, NativeBase} import org.apache.comet.CometConf._ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark35Plus, withFallbackReason, withFallbackReasons} import org.apache.comet.DataTypeSupport.isComplexType @@ -201,6 +201,40 @@ case class CometScanRule(session: SparkSession) s"Native Parquet scan requires ${COMET_EXEC_ENABLED.key} to be enabled") return None } + // Comet's native readers go through object_store, which only understands a fixed set of URL + // schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs..impl) would + // surface at execution time as `Generic URL error: Unable to recognise URL "..."`. Decline here + // so Spark's reader -- which goes through the Hadoop FS API and can resolve custom schemes -- + // handles the scan. Whether object_store recognizes a scheme is answered by the native layer + // itself (`NativeBase.isObjectStoreSchemeSupported`) rather than a hardcoded list, so the + // planner can't drift from object_store's actual support. + // + // EXCEPT schemes the user routes through libhdfs via `spark.hadoop.fs.comet.libhdfs.schemes` + // (e.g. `hdfs`, or a test `fake`): those ARE natively readable through the libhdfs object_store + // bridge, so they must NOT be declined here (regression guarded by + // ParquetReadFromFakeHadoopFsSuite). + val libhdfsSchemes: Set[String] = COMET_LIBHDFS_SCHEMES.get() match { + case Some(s) => + s.split(",").map(_.trim.toLowerCase(java.util.Locale.ROOT)).filter(_.nonEmpty).toSet + case None => Set.empty + } + val unsupportedFsSchemes = r.location.rootPaths + .map(_.toUri) + .filter { uri => + val sch = uri.getScheme + sch != null && { + val sl = sch.toLowerCase(java.util.Locale.ROOT) + !libhdfsSchemes.contains(sl) && !CometScanRule.isNativelyReadableScheme(uri) + } + } + .map(_.getScheme.toLowerCase(java.util.Locale.ROOT)) + .toSet + if (unsupportedFsSchemes.nonEmpty) { + withInfo( + scanExec, + s"Unsupported filesystem schemes: ${unsupportedFsSchemes.mkString(", ")}") + return None + } // Disabling the vectorized reader opts into parquet-mr's permissive behavior // (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent // backend, so by default fall back to Spark. Users can opt in to letting Comet @@ -728,6 +762,33 @@ case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim { object CometScanRule extends Logging { + // Per-scheme memo of `NativeBase.isObjectStoreSchemeSupported`. The answer depends only on the + // URL scheme, so we cache by scheme and never re-cross the JNI boundary for a repeated scheme. + private val schemeSupportCache = + new java.util.concurrent.ConcurrentHashMap[String, java.lang.Boolean]() + + /** + * True when Comet's native object_store layer recognizes this URI's scheme (so the scan is + * natively readable). Delegates to the native layer -- the source of truth -- instead of a + * hardcoded scheme list. On any failure to consult native (e.g. the library isn't loaded on + * this JVM, or predates this method) we assume the scheme IS supported: the scheme gate is an + * early-fallback optimization, and a build without a working native library can't run Comet's + * native scan anyway, so declining here would only over-restrict. + */ + private[rules] def isNativelyReadableScheme(uri: java.net.URI): Boolean = { + val scheme = uri.getScheme + if (scheme == null) return true + schemeSupportCache + .computeIfAbsent( + scheme.toLowerCase(java.util.Locale.ROOT), + _ => + try java.lang.Boolean.valueOf(NativeBase.isObjectStoreSchemeSupported(uri.toString)) + catch { + case _: Throwable => java.lang.Boolean.TRUE + }) + .booleanValue() + } + /** * Tag set on a scan (`FileSourceScanExec` or `BatchScanExec`) that should be left as a plain * Spark scan rather than converted to a Comet scan. Written by diff --git a/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala new file mode 100644 index 0000000000..e7ab7ffc6c --- /dev/null +++ b/spark/src/test/scala/org/apache/comet/rules/CometScanSchemeFallbackSuite.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.rules + +import java.io.File +import java.nio.file.Files +import java.util.UUID + +import org.apache.commons.io.FileUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{CometTestBase, SaveMode} +import org.apache.spark.sql.comet.CometScanExec +import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} + +import org.apache.comet.CometConf +import org.apache.comet.hadoop.fs.FakeHDFSFileSystem + +/** + * Comet's native readers go through object_store, which only understands a fixed set of URL + * schemes. A custom Hadoop FileSystem scheme that object_store can't parse (here `fake://`) must + * NOT be claimed by the native scan -- it would fail at execution with "Unable to recognise URL". + * `CometScanRule` must decline it so Spark's Hadoop-FS-aware reader handles the scan. + * + * Unlike `ParquetReadFromFakeHadoopFsSuite`, this suite does NOT route the `fake` scheme through + * libhdfs (`spark.hadoop.fs.comet.libhdfs.schemes`), so it exercises the decline path. The test + * applies the rule directly to the physical plan and asserts fallback -- no query execution, so + * it doesn't depend on the native reader actually attempting (and failing on) the scheme. + */ +class CometScanSchemeFallbackSuite extends CometTestBase { + + private var fakeRootDir: File = _ + + override protected def sparkConf: SparkConf = { + val conf = super.sparkConf + conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem") + conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX) + // Intentionally NOT setting CometConf.COMET_LIBHDFS_SCHEMES -- `fake` is not natively readable. + conf + } + + override def beforeAll(): Unit = { + fakeRootDir = Files.createTempDirectory(s"comet_scheme_${UUID.randomUUID().toString}").toFile + super.beforeAll() + } + + protected override def afterAll(): Unit = { + if (fakeRootDir != null) FileUtils.deleteDirectory(fakeRootDir) + super.afterAll() + } + + test("native scan declines a filesystem scheme object_store can't read (fake://)") { + val path = s"${FakeHDFSFileSystem.PREFIX}${fakeRootDir.getAbsolutePath}/data" + spark.range(0, 10).toDF("id").write.format("parquet").mode(SaveMode.Overwrite).save(path) + + // Obtain a clean Spark physical plan (Comet disabled) with the FileSourceScanExec, then apply + // CometScanRule directly. No execution -- we only check whether the rule claims the scan. + val sparkPlan: SparkPlan = withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + spark.read.parquet(path).queryExecution.executedPlan + } + + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + val transformed = CometScanRule(spark).apply(stripAQEPlan(sparkPlan)) + + val cometScans = transformed.collect { case s: CometScanExec => s } + val sparkScans = transformed.collect { case s: FileSourceScanExec => s } + assert( + cometScans.isEmpty, + s"`fake://` is not object_store-readable; the native scan must fall back to Spark, " + + s"but Comet claimed it:\n$transformed") + assert( + sparkScans.size == 1, + s"expected the scan to remain a Spark FileSourceScanExec:\n$transformed") + } + } +}