Skip to content

Commit d59f6c9

Browse files
authored
fix: decline native V1 scans on object_store-unsupported filesystem schemes (#4525)
1 parent 2e19011 commit d59f6c9

7 files changed

Lines changed: 294 additions & 1 deletion

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ jobs:
346346
org.apache.spark.CometPluginsNonOverrideSuite
347347
org.apache.spark.CometPluginsUnifiedModeOverrideSuite
348348
org.apache.comet.rules.CometScanRuleSuite
349+
org.apache.comet.rules.CometScanSchemeFallbackSuite
349350
org.apache.comet.rules.CometExecRuleSuite
350351
org.apache.spark.sql.CometTPCDSQuerySuite
351352
org.apache.spark.sql.CometTPCDSQueryTestSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ jobs:
162162
org.apache.spark.CometPluginsNonOverrideSuite
163163
org.apache.spark.CometPluginsUnifiedModeOverrideSuite
164164
org.apache.comet.rules.CometScanRuleSuite
165+
org.apache.comet.rules.CometScanSchemeFallbackSuite
165166
org.apache.comet.rules.CometExecRuleSuite
166167
org.apache.spark.sql.CometTPCDSQuerySuite
167168
org.apache.spark.sql.CometTPCDSQueryTestSuite

native/core/src/lib.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,31 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_isFeatureEnabled(
156156
})
157157
}
158158

159+
/// JNI method: does object_store recognize this URL's scheme?
160+
///
161+
/// This is the source of truth for the JVM planner's "can Comet's native reader handle this
162+
/// filesystem?" check. Comet's `prepare_object_store_with_configs` dispatches non-hdfs/non-s3
163+
/// schemes to object_store's `parse_url`, which is driven by `ObjectStoreScheme::parse`; an
164+
/// unrecognized scheme (e.g. a custom Hadoop FileSystem) fails there at execution time. By
165+
/// answering from `ObjectStoreScheme::parse` here, the planner can decline early without
166+
/// hardcoding -- and drifting from -- the object_store-supported scheme set. (hdfs / libhdfs
167+
/// schemes are handled separately on the JVM side via the user's libhdfs scheme config.)
168+
#[no_mangle]
169+
pub extern "system" fn Java_org_apache_comet_NativeBase_isObjectStoreSchemeSupported(
170+
env: EnvUnowned,
171+
_: JClass,
172+
url: JString,
173+
) -> jni::sys::jboolean {
174+
try_unwrap_or_throw(&env, |env| {
175+
let url_str: String = url.try_to_string(env)?;
176+
let supported = url::Url::parse(&url_str)
177+
.ok()
178+
.map(|u| object_store::ObjectStoreScheme::parse(&u).is_ok())
179+
.unwrap_or(false);
180+
Ok(supported)
181+
})
182+
}
183+
159184
// Creates a default log4rs config, which logs to console with log level.
160185
fn default_logger_config(log_level: &str) -> CometResult<Config> {
161186
let console_append = ConsoleAppender::builder()

spark/src/main/java/org/apache/comet/NativeBase.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,4 +300,15 @@ private static String resourceName() {
300300
* @return true if the feature is enabled, false otherwise
301301
*/
302302
public static native boolean isFeatureEnabled(String featureName);
303+
304+
/**
305+
* Check whether Comet's native object_store layer recognizes the given URL's scheme (i.e. the
306+
* scan would be natively readable rather than failing at execution with "Unable to recognise
307+
* URL"). This is the authoritative answer from object_store's own scheme parser, so the JVM
308+
* planner never has to hardcode (and drift from) the set of supported schemes.
309+
*
310+
* @param url a fully-qualified URL whose scheme should be checked (e.g. "s3://bucket/path")
311+
* @return true if object_store can construct a store for this scheme, false otherwise
312+
*/
313+
public static native boolean isObjectStoreSchemeSupported(String url);
303314
}

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
package org.apache.comet.rules
2121

22+
import java.lang.{Boolean => JBoolean}
2223
import java.net.URI
24+
import java.util.Locale
25+
import java.util.concurrent.ConcurrentHashMap
2326

2427
import scala.collection.mutable
2528
import scala.collection.mutable.ListBuffer
@@ -40,7 +43,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
4043
import org.apache.spark.sql.internal.SQLConf
4144
import org.apache.spark.sql.types._
4245

43-
import org.apache.comet.{CometConf, DataTypeSupport}
46+
import org.apache.comet.{CometConf, DataTypeSupport, NativeBase}
4447
import org.apache.comet.CometConf._
4548
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark35Plus, withFallbackReason, withFallbackReasons}
4649
import org.apache.comet.DataTypeSupport.isComplexType
@@ -201,6 +204,47 @@ case class CometScanRule(session: SparkSession)
201204
s"Native Parquet scan requires ${COMET_EXEC_ENABLED.key} to be enabled")
202205
return None
203206
}
207+
// Comet's native readers go through object_store, which only understands a fixed set of URL
208+
// schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs.<scheme>.impl) would
209+
// surface at execution time as `Generic URL error: Unable to recognise URL "..."`. Decline here
210+
// so Spark's reader -- which goes through the Hadoop FS API and can resolve custom schemes --
211+
// handles the scan. Whether object_store recognizes a scheme is answered by the native layer
212+
// itself (`NativeBase.isObjectStoreSchemeSupported`) rather than a hardcoded list, so the
213+
// planner can't drift from object_store's actual support.
214+
//
215+
// EXCEPT schemes the user routes through libhdfs via `spark.hadoop.fs.comet.libhdfs.schemes`
216+
// (e.g. `hdfs`, or a test `fake`): those ARE natively readable through the libhdfs object_store
217+
// bridge, so they must NOT be declined here (regression guarded by
218+
// ParquetReadFromFakeHadoopFsSuite).
219+
//
220+
// The default mirrors the native side: when the config is unset, `is_hdfs_scheme`
221+
// (native/core/src/parquet/parquet_support.rs) treats `hdfs` as natively readable, and
222+
// `create_hdfs_object_store` is in the default build (`default = ["hdfs-opendal"]`). If we
223+
// defaulted to an empty set here, a plain `hdfs://` V1 scan would be declined and fall back to
224+
// Spark even though native can read it -- a silent regression for HDFS users in the default
225+
// configuration. So default to `Set("hdfs")` to stay in lockstep with the native default.
226+
val libhdfsSchemes: Set[String] = COMET_LIBHDFS_SCHEMES.get() match {
227+
case Some(s) =>
228+
s.split(",").map(_.trim.toLowerCase(Locale.ROOT)).filter(_.nonEmpty).toSet
229+
case None => Set("hdfs")
230+
}
231+
val unsupportedFsSchemes = r.location.rootPaths
232+
.map(_.toUri)
233+
.filter { uri =>
234+
val sch = uri.getScheme
235+
sch != null && {
236+
val sl = sch.toLowerCase(Locale.ROOT)
237+
!libhdfsSchemes.contains(sl) && !CometScanRule.isNativelyReadableScheme(uri)
238+
}
239+
}
240+
.map(_.getScheme.toLowerCase(Locale.ROOT))
241+
.toSet
242+
if (unsupportedFsSchemes.nonEmpty) {
243+
withFallbackReason(
244+
scanExec,
245+
s"Unsupported filesystem schemes: ${unsupportedFsSchemes.mkString(", ")}")
246+
return None
247+
}
204248
// Disabling the vectorized reader opts into parquet-mr's permissive behavior
205249
// (silent overflow / null-on-narrowing). Comet has no parquet-mr-equivalent
206250
// backend, so by default fall back to Spark. Users can opt in to letting Comet
@@ -728,6 +772,33 @@ case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim {
728772

729773
object CometScanRule extends Logging {
730774

775+
// Per-scheme memo of `NativeBase.isObjectStoreSchemeSupported`. The answer depends only on the
776+
// URL scheme, so we cache by scheme and never re-cross the JNI boundary for a repeated scheme.
777+
private val schemeSupportCache =
778+
new ConcurrentHashMap[String, JBoolean]()
779+
780+
/**
781+
* True when Comet's native object_store layer recognizes this URI's scheme (so the scan is
782+
* natively readable). Delegates to the native layer -- the source of truth -- instead of a
783+
* hardcoded scheme list. On any failure to consult native (e.g. the library isn't loaded on
784+
* this JVM, or predates this method) we assume the scheme IS supported: the scheme gate is an
785+
* early-fallback optimization, and a build without a working native library can't run Comet's
786+
* native scan anyway, so declining here would only over-restrict.
787+
*/
788+
private[rules] def isNativelyReadableScheme(uri: URI): Boolean = {
789+
val scheme = uri.getScheme
790+
if (scheme == null) return true
791+
schemeSupportCache
792+
.computeIfAbsent(
793+
scheme.toLowerCase(Locale.ROOT),
794+
_ =>
795+
try JBoolean.valueOf(NativeBase.isObjectStoreSchemeSupported(uri.toString))
796+
catch {
797+
case _: Throwable => JBoolean.TRUE
798+
})
799+
.booleanValue()
800+
}
801+
731802
/**
732803
* Tag set on a scan (`FileSourceScanExec` or `BatchScanExec`) that should be left as a plain
733804
* Spark scan rather than converted to a Comet scan. Written by
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.hadoop.fs;
21+
22+
import java.net.URI;
23+
24+
import org.apache.hadoop.fs.RawLocalFileSystem;
25+
26+
/**
27+
* A local-disk-backed FileSystem that reports the {@code hdfs} scheme, so a test can write/read an
28+
* {@code hdfs://} path without a live HDFS cluster. Used to assert that {@code CometScanRule} still
29+
* claims an {@code hdfs://} scan when {@code spark.hadoop.fs.comet.libhdfs.schemes} is unset --
30+
* i.e. the JVM scheme gate's default stays in lockstep with the native {@code is_hdfs_scheme}
31+
* default.
32+
*/
33+
public class FakeHdfsSchemeFileSystem extends RawLocalFileSystem {
34+
35+
public static final String PREFIX = "hdfs://fake-namenode";
36+
37+
public FakeHdfsSchemeFileSystem() {
38+
// Avoid `URI scheme is not "file"` error on
39+
// RawLocalFileSystem$DeprecatedRawLocalFileStatus.getOwner
40+
RawLocalFileSystem.useStatIfAvailable();
41+
}
42+
43+
@Override
44+
public String getScheme() {
45+
return "hdfs";
46+
}
47+
48+
@Override
49+
public URI getUri() {
50+
return URI.create(PREFIX);
51+
}
52+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.rules
21+
22+
import java.io.File
23+
import java.nio.file.Files
24+
import java.util.UUID
25+
26+
import org.apache.commons.io.FileUtils
27+
import org.apache.spark.SparkConf
28+
import org.apache.spark.sql.{CometTestBase, SaveMode}
29+
import org.apache.spark.sql.comet.CometScanExec
30+
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
31+
32+
import org.apache.comet.CometConf
33+
import org.apache.comet.hadoop.fs.{FakeHDFSFileSystem, FakeHdfsSchemeFileSystem}
34+
35+
/**
36+
* Comet's native readers go through object_store, which only understands a fixed set of URL
37+
* schemes. A custom Hadoop FileSystem scheme that object_store can't parse (here `fake://`) must
38+
* NOT be claimed by the native scan -- it would fail at execution with "Unable to recognise URL".
39+
* `CometScanRule` must decline it so Spark's Hadoop-FS-aware reader handles the scan.
40+
*
41+
* Unlike `ParquetReadFromFakeHadoopFsSuite`, this suite does NOT route the `fake` scheme through
42+
* libhdfs (`spark.hadoop.fs.comet.libhdfs.schemes`), so it exercises the decline path. The test
43+
* applies the rule directly to the physical plan and asserts fallback -- no query execution, so
44+
* it doesn't depend on the native reader actually attempting (and failing on) the scheme.
45+
*/
46+
class CometScanSchemeFallbackSuite extends CometTestBase {
47+
48+
private var fakeRootDir: File = _
49+
50+
override protected def sparkConf: SparkConf = {
51+
val conf = super.sparkConf
52+
conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem")
53+
// Back the `hdfs` scheme with a local FS so we can exercise an `hdfs://` path without a live
54+
// cluster. `hdfs` is natively readable by default, so this scan must be CLAIMED, not declined.
55+
conf.set("spark.hadoop.fs.hdfs.impl", "org.apache.comet.hadoop.fs.FakeHdfsSchemeFileSystem")
56+
conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX)
57+
// Intentionally NOT setting CometConf.COMET_LIBHDFS_SCHEMES -- `fake` is not natively readable,
58+
// and `hdfs` must still be claimed by default (mirrors the native `is_hdfs_scheme` default).
59+
conf
60+
}
61+
62+
override def beforeAll(): Unit = {
63+
fakeRootDir = Files.createTempDirectory(s"comet_scheme_${UUID.randomUUID().toString}").toFile
64+
super.beforeAll()
65+
}
66+
67+
protected override def afterAll(): Unit = {
68+
if (fakeRootDir != null) FileUtils.deleteDirectory(fakeRootDir)
69+
super.afterAll()
70+
}
71+
72+
test("native scan declines a filesystem scheme object_store can't read (fake://)") {
73+
val path = s"${FakeHDFSFileSystem.PREFIX}${fakeRootDir.getAbsolutePath}/data"
74+
spark.range(0, 10).toDF("id").write.format("parquet").mode(SaveMode.Overwrite).save(path)
75+
76+
// Obtain a clean Spark physical plan (Comet disabled) with the FileSourceScanExec, then apply
77+
// CometScanRule directly. No execution -- we only check whether the rule claims the scan.
78+
// Capture via a var inside the block: `SQLTestUtils.withSQLConf` returns Unit on Spark 3.5
79+
// but a value on Spark 4.x, so we can't return the plan out of it cross-version.
80+
var sparkPlan: SparkPlan = null
81+
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
82+
sparkPlan = spark.read.parquet(path).queryExecution.executedPlan
83+
}
84+
85+
withSQLConf(
86+
CometConf.COMET_ENABLED.key -> "true",
87+
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
88+
CometConf.COMET_EXEC_ENABLED.key -> "true") {
89+
val transformed = CometScanRule(spark).apply(stripAQEPlan(sparkPlan))
90+
91+
val cometScans = transformed.collect { case s: CometScanExec => s }
92+
val sparkScans = transformed.collect { case s: FileSourceScanExec => s }
93+
assert(
94+
cometScans.isEmpty,
95+
"`fake://` is not object_store-readable; the native scan must fall back to Spark, " +
96+
s"but Comet claimed it:\n$transformed")
97+
assert(
98+
sparkScans.size == 1,
99+
s"expected the scan to remain a Spark FileSourceScanExec:\n$transformed")
100+
}
101+
}
102+
103+
test("native scan claims hdfs:// when libhdfs.schemes is unset (native-default lockstep)") {
104+
// Native's `is_hdfs_scheme` treats `hdfs` as readable when `fs.comet.libhdfs.schemes` is unset,
105+
// and `create_hdfs_object_store` is in the default build. The JVM gate must agree: with the
106+
// config unset, an `hdfs://` scan must be CLAIMED by Comet, not declined to Spark. Guards the
107+
// `case None => Set("hdfs")` default in CometScanRule against the silent-fallback regression
108+
// Andy flagged in #4525.
109+
val path = s"${FakeHdfsSchemeFileSystem.PREFIX}${fakeRootDir.getAbsolutePath}/hdfs-data"
110+
spark.range(0, 10).toDF("id").write.format("parquet").mode(SaveMode.Overwrite).save(path)
111+
112+
var sparkPlan: SparkPlan = null
113+
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
114+
sparkPlan = spark.read.parquet(path).queryExecution.executedPlan
115+
}
116+
117+
withSQLConf(
118+
CometConf.COMET_ENABLED.key -> "true",
119+
CometConf.COMET_NATIVE_SCAN_ENABLED.key -> "true",
120+
CometConf.COMET_EXEC_ENABLED.key -> "true") {
121+
val transformed = CometScanRule(spark).apply(stripAQEPlan(sparkPlan))
122+
123+
val cometScans = transformed.collect { case s: CometScanExec => s }
124+
val sparkScans = transformed.collect { case s: FileSourceScanExec => s }
125+
assert(
126+
cometScans.size == 1,
127+
"`hdfs://` is natively readable by default; Comet must claim the scan, " +
128+
s"but it fell back to Spark:\n$transformed")
129+
assert(sparkScans.isEmpty, s"expected no leftover Spark FileSourceScanExec:\n$transformed")
130+
}
131+
}
132+
}

0 commit comments

Comments
 (0)