Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions native/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,31 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_isFeatureEnabled(
})
}

/// JNI method: does object_store recognize this URL's scheme?
///
/// This is the source of truth for the JVM planner's "can Comet's native reader handle this
/// filesystem?" check. Comet's `prepare_object_store_with_configs` dispatches non-hdfs/non-s3
/// schemes to object_store's `parse_url`, which is driven by `ObjectStoreScheme::parse`; an
/// unrecognized scheme (e.g. a custom Hadoop FileSystem) fails there at execution time. By
/// answering from `ObjectStoreScheme::parse` here, the planner can decline early without
/// hardcoding -- and drifting from -- the object_store-supported scheme set. (hdfs / libhdfs
/// schemes are handled separately on the JVM side via the user's libhdfs scheme config.)
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_NativeBase_isObjectStoreSchemeSupported(
env: EnvUnowned,
_: JClass,
url: JString,
) -> jni::sys::jboolean {
try_unwrap_or_throw(&env, |env| {
let url_str: String = url.try_to_string(env)?;
let supported = url::Url::parse(&url_str)
.ok()
.map(|u| object_store::ObjectStoreScheme::parse(&u).is_ok())
.unwrap_or(false);
Ok(supported)
})
}

// Creates a default log4rs config, which logs to console with log level.
fn default_logger_config(log_level: &str) -> CometResult<Config> {
let console_append = ConsoleAppender::builder()
Expand Down
11 changes: 11 additions & 0 deletions spark/src/main/java/org/apache/comet/NativeBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,15 @@ private static String resourceName() {
* @return true if the feature is enabled, false otherwise
*/
public static native boolean isFeatureEnabled(String featureName);

/**
* Check whether Comet's native object_store layer recognizes the given URL's scheme (i.e. the
* scan would be natively readable rather than failing at execution with "Unable to recognise
* URL"). This is the authoritative answer from object_store's own scheme parser, so the JVM
* planner never has to hardcode (and drift from) the set of supported schemes.
*
* @param url a fully-qualified URL whose scheme should be checked (e.g. "s3://bucket/path")
* @return true if object_store can construct a store for this scheme, false otherwise
*/
public static native boolean isObjectStoreSchemeSupported(String url);
}
63 changes: 62 additions & 1 deletion spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.comet.{CometConf, DataTypeSupport}
import org.apache.comet.{CometConf, DataTypeSupport, NativeBase}
import org.apache.comet.CometConf._
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark35Plus, withFallbackReason, withFallbackReasons}
import org.apache.comet.DataTypeSupport.isComplexType
Expand Down Expand Up @@ -201,6 +201,40 @@ case class CometScanRule(session: SparkSession)
s"Native Parquet scan requires ${COMET_EXEC_ENABLED.key} to be enabled")
return None
}
// Comet's native readers go through object_store, which only understands a fixed set of URL
// schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs.<scheme>.impl) would
// surface at execution time as `Generic URL error: Unable to recognise URL "..."`. Decline here
// so Spark's reader -- which goes through the Hadoop FS API and can resolve custom schemes --
// handles the scan. Whether object_store recognizes a scheme is answered by the native layer
// itself (`NativeBase.isObjectStoreSchemeSupported`) rather than a hardcoded list, so the
// planner can't drift from object_store's actual support.
//
// EXCEPT schemes the user routes through libhdfs via `spark.hadoop.fs.comet.libhdfs.schemes`
// (e.g. `hdfs`, or a test `fake`): those ARE natively readable through the libhdfs object_store
// bridge, so they must NOT be declined here (regression guarded by
// ParquetReadFromFakeHadoopFsSuite).
val libhdfsSchemes: Set[String] = COMET_LIBHDFS_SCHEMES.get() match {
case Some(s) =>
s.split(",").map(_.trim.toLowerCase(java.util.Locale.ROOT)).filter(_.nonEmpty).toSet
case None => Set.empty
}
val unsupportedFsSchemes = r.location.rootPaths
.map(_.toUri)
.filter { uri =>
val sch = uri.getScheme
sch != null && {
val sl = sch.toLowerCase(java.util.Locale.ROOT)
!libhdfsSchemes.contains(sl) && !CometScanRule.isNativelyReadableScheme(uri)
}
}
.map(_.getScheme.toLowerCase(java.util.Locale.ROOT))
.toSet
if (unsupportedFsSchemes.nonEmpty) {
withInfo(
scanExec,
s"Unsupported filesystem schemes: ${unsupportedFsSchemes.mkString(", ")}")
return None
}
// Disabling the vectorized reader opts into parquet-mr's permissive behavior
// (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent
// backend, so by default fall back to Spark. Users can opt in to letting Comet
Expand Down Expand Up @@ -728,6 +762,33 @@ case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim {

object CometScanRule extends Logging {

// Per-scheme memo of `NativeBase.isObjectStoreSchemeSupported`. The answer depends only on the
// URL scheme, so we cache by scheme and never re-cross the JNI boundary for a repeated scheme.
private val schemeSupportCache =
new java.util.concurrent.ConcurrentHashMap[String, java.lang.Boolean]()

/**
* True when Comet's native object_store layer recognizes this URI's scheme (so the scan is
* natively readable). Delegates to the native layer -- the source of truth -- instead of a
* hardcoded scheme list. On any failure to consult native (e.g. the library isn't loaded on
* this JVM, or predates this method) we assume the scheme IS supported: the scheme gate is an
* early-fallback optimization, and a build without a working native library can't run Comet's
* native scan anyway, so declining here would only over-restrict.
*/
private[rules] def isNativelyReadableScheme(uri: java.net.URI): Boolean = {
val scheme = uri.getScheme
if (scheme == null) return true
schemeSupportCache
.computeIfAbsent(
scheme.toLowerCase(java.util.Locale.ROOT),
_ =>
try java.lang.Boolean.valueOf(NativeBase.isObjectStoreSchemeSupported(uri.toString))
catch {
case _: Throwable => java.lang.Boolean.TRUE
})
.booleanValue()
}

/**
* Tag set on a scan (`FileSourceScanExec` or `BatchScanExec`) that should be left as a plain
* Spark scan rather than converted to a Comet scan. Written by
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.rules

import java.io.File
import java.nio.file.Files
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CometTestBase, SaveMode}
import org.apache.spark.sql.comet.CometScanExec
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

import org.apache.comet.CometConf
import org.apache.comet.hadoop.fs.FakeHDFSFileSystem

/**
* Comet's native readers go through object_store, which only understands a fixed set of URL
* schemes. A custom Hadoop FileSystem scheme that object_store can't parse (here `fake://`) must
* NOT be claimed by the native scan -- it would fail at execution with "Unable to recognise URL".
* `CometScanRule` must decline it so Spark's Hadoop-FS-aware reader handles the scan.
*
* Unlike `ParquetReadFromFakeHadoopFsSuite`, this suite does NOT route the `fake` scheme through
* libhdfs (`spark.hadoop.fs.comet.libhdfs.schemes`), so it exercises the decline path. The test
* applies the rule directly to the physical plan and asserts fallback -- no query execution, so
* it doesn't depend on the native reader actually attempting (and failing on) the scheme.
*/
class CometScanSchemeFallbackSuite extends CometTestBase {

private var fakeRootDir: File = _

override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem")
conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX)
// Intentionally NOT setting CometConf.COMET_LIBHDFS_SCHEMES -- `fake` is not natively readable.
conf
}

override def beforeAll(): Unit = {
fakeRootDir = Files.createTempDirectory(s"comet_scheme_${UUID.randomUUID().toString}").toFile
super.beforeAll()
}

protected override def afterAll(): Unit = {
if (fakeRootDir != null) FileUtils.deleteDirectory(fakeRootDir)
super.afterAll()
}

test("native scan declines a filesystem scheme object_store can't read (fake://)") {
val path = s"${FakeHDFSFileSystem.PREFIX}${fakeRootDir.getAbsolutePath}/data"
spark.range(0, 10).toDF("id").write.format("parquet").mode(SaveMode.Overwrite).save(path)

// Obtain a clean Spark physical plan (Comet disabled) with the FileSourceScanExec, then apply
// CometScanRule directly. No execution -- we only check whether the rule claims the scan.
val sparkPlan: SparkPlan = withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
spark.read.parquet(path).queryExecution.executedPlan
}

withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {
val transformed = CometScanRule(spark).apply(stripAQEPlan(sparkPlan))

val cometScans = transformed.collect { case s: CometScanExec => s }
val sparkScans = transformed.collect { case s: FileSourceScanExec => s }
assert(
cometScans.isEmpty,
s"`fake://` is not object_store-readable; the native scan must fall back to Spark, " +
s"but Comet claimed it:\n$transformed")
assert(
sparkScans.size == 1,
s"expected the scan to remain a Spark FileSourceScanExec:\n$transformed")
}
}
}