Skip to content

Commit 257c31c

Browse files
committed
chore: remove dead validateObjectStoreConfig path
CometScanRule.validateObjectStoreConfig had no production caller; only NativeConfigSuite invoked it. Drop the Scala wrapper, its configValidity cache, the Native.java JNI declaration (which leaves the file empty), the backing Rust JNI impl, and the tests that exercised the wrapper. Keep the extractObjectStoreOptions tests since that utility is still used by the native scan paths.
1 parent 1cc6409 commit 257c31c

4 files changed

Lines changed: 2 additions & 183 deletions

File tree

native/core/src/parquet/mod.rs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -129,31 +129,6 @@ pub fn get_object_store_options(
129129
Ok(collected_map)
130130
}
131131

132-
/// # Safety
133-
/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
134-
#[no_mangle]
135-
pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_validateObjectStoreConfig(
136-
e: EnvUnowned,
137-
_jclass: JClass,
138-
file_path: JString,
139-
object_store_options: JObject,
140-
) {
141-
try_unwrap_or_throw(&e, |env| {
142-
let session_config = SessionConfig::new();
143-
let planner =
144-
PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)), 0);
145-
let session_ctx = planner.session_ctx();
146-
let path: String = file_path.try_to_string(env).unwrap();
147-
let object_store_config = get_object_store_options(env, object_store_options)?;
148-
let (_, _) = prepare_object_store_with_configs(
149-
session_ctx.runtime_env(),
150-
path.clone(),
151-
&object_store_config,
152-
)?;
153-
Ok(())
154-
})
155-
}
156-
157132
/// # Safety
158133
/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
159134
#[no_mangle]

spark/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,13 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
4040
import org.apache.spark.sql.internal.SQLConf
4141
import org.apache.spark.sql.types._
4242

43-
import org.apache.comet.{CometConf, CometNativeException, DataTypeSupport}
43+
import org.apache.comet.{CometConf, DataTypeSupport}
4444
import org.apache.comet.CometConf._
4545
import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isSpark35Plus, withInfo, withInfos}
4646
import org.apache.comet.DataTypeSupport.isComplexType
4747
import org.apache.comet.iceberg.{CometIcebergNativeScanMetadata, IcebergReflection}
4848
import org.apache.comet.objectstore.NativeConfig
4949
import org.apache.comet.parquet.CometParquetUtils.{encryptionEnabled, isEncryptionConfigSupported}
50-
import org.apache.comet.parquet.Native
5150
import org.apache.comet.serde.operator.{CometIcebergNativeScan, CometNativeScan}
5251
import org.apache.comet.shims.{CometTypeShim, ShimCometStreaming, ShimFileFormat, ShimSubqueryBroadcast}
5352

@@ -735,61 +734,6 @@ object CometScanRule extends Logging {
735734
val SKIP_COMET_SCAN_TAG: org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit] =
736735
org.apache.spark.sql.catalyst.trees.TreeNodeTag[Unit]("comet.skipCometScan")
737736

738-
/**
739-
* Validating object store configs can cause requests to be made to S3 APIs (such as when
740-
* resolving the region for a bucket). We use a cache to reduce the number of S3 calls.
741-
*
742-
* The key is the config map converted to a string. The value is the reason that the config is
743-
* not valid, or None if the config is valid.
744-
*/
745-
val configValidityMap = new mutable.HashMap[String, Option[String]]()
746-
747-
/**
748-
* We do not expect to see a large number of unique configs within the lifetime of a Spark
749-
* session, but we reset the cache once it reaches a fixed size to prevent it growing
750-
* indefinitely.
751-
*/
752-
val configValidityMapMaxSize = 1024
753-
754-
def validateObjectStoreConfig(
755-
filePath: String,
756-
hadoopConf: Configuration,
757-
fallbackReasons: mutable.ListBuffer[String]): Unit = {
758-
val objectStoreConfigMap =
759-
NativeConfig.extractObjectStoreOptions(hadoopConf, URI.create(filePath))
760-
761-
val cacheKey = objectStoreConfigMap
762-
.map { case (k, v) =>
763-
s"$k=$v"
764-
}
765-
.toList
766-
.sorted
767-
.mkString("\n")
768-
769-
if (configValidityMap.size >= configValidityMapMaxSize) {
770-
logWarning("Resetting S3 object store validity cache")
771-
configValidityMap.clear()
772-
}
773-
774-
configValidityMap.get(cacheKey) match {
775-
case Some(Some(reason)) =>
776-
fallbackReasons += reason
777-
case Some(None) =>
778-
// previously validated
779-
case _ =>
780-
try {
781-
val objectStoreOptions = objectStoreConfigMap.asJava
782-
Native.validateObjectStoreConfig(filePath, objectStoreOptions)
783-
} catch {
784-
case e: CometNativeException =>
785-
val reason = s"Object store config not supported: ${e.getMessage}"
786-
fallbackReasons += reason
787-
configValidityMap.put(cacheKey, Some(reason))
788-
}
789-
}
790-
791-
}
792-
793737
/**
794738
* Single-pass validation of Iceberg FileScanTasks.
795739
*

spark/src/test/scala/org/apache/comet/objectstore/NativeConfigSuite.scala

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,12 @@ package org.apache.comet.objectstore
2121

2222
import java.net.URI
2323

24-
import scala.collection.mutable
25-
26-
import org.scalatest.BeforeAndAfterEach
2724
import org.scalatest.funsuite.AnyFunSuite
2825
import org.scalatest.matchers.should.Matchers
2926

3027
import org.apache.hadoop.conf.Configuration
3128

32-
import org.apache.comet.rules.CometScanRule
33-
34-
class NativeConfigSuite extends AnyFunSuite with Matchers with BeforeAndAfterEach {
35-
36-
override protected def beforeEach(): Unit = {
37-
CometScanRule.configValidityMap.clear()
38-
}
29+
class NativeConfigSuite extends AnyFunSuite with Matchers {
3930

4031
test("extractObjectStoreOptions - multiple cloud provider configurations") {
4132
val hadoopConf = new Configuration()
@@ -79,61 +70,4 @@ class NativeConfigSuite extends AnyFunSuite with Matchers with BeforeAndAfterEac
7970
new URI("unsupported://test-bucket/test-object"))
8071
assert(unsupportedOptions.isEmpty, "Unsupported scheme should return empty options")
8172
}
82-
83-
test("validate object store config - no provider") {
84-
val hadoopConf = new Configuration()
85-
validate(hadoopConf)
86-
}
87-
88-
test("validate object store config - valid providers") {
89-
val hadoopConf = new Configuration()
90-
val provider1 = "com.amazonaws.auth.EnvironmentVariableCredentialsProvider"
91-
val provider2 = "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
92-
hadoopConf.set("fs.s3a.aws.credentials.provider", Seq(provider1, provider2).mkString(","))
93-
validate(hadoopConf)
94-
}
95-
96-
test("validate object store config - invalid provider") {
97-
val hadoopConf = new Configuration()
98-
hadoopConf.set("fs.s3a.aws.credentials.provider", "invalid")
99-
val fallbackReasons = validate(hadoopConf)
100-
val expectedError = "Unsupported credential provider: invalid"
101-
assert(fallbackReasons.exists(_.contains(expectedError)))
102-
}
103-
104-
test("validate object store config - mixed anonymous providers") {
105-
val hadoopConf = new Configuration()
106-
val provider1 = "com.amazonaws.auth.AnonymousAWSCredentials"
107-
val provider2 = "software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider"
108-
hadoopConf.set("fs.s3a.aws.credentials.provider", Seq(provider1, provider2).mkString(","))
109-
val fallbackReasons = validate(hadoopConf)
110-
val expectedError =
111-
"Anonymous credential provider cannot be mixed with other credential providers"
112-
assert(fallbackReasons.exists(_.contains(expectedError)))
113-
}
114-
115-
test("validity cache") {
116-
val hadoopConf = new Configuration()
117-
val provider1 = "com.amazonaws.auth.AnonymousAWSCredentials"
118-
val provider2 = "software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider"
119-
hadoopConf.set("fs.s3a.aws.credentials.provider", Seq(provider1, provider2).mkString(","))
120-
121-
assert(CometScanRule.configValidityMap.isEmpty)
122-
for (_ <- 0 until 5) {
123-
assert(validate(hadoopConf).nonEmpty)
124-
assert(CometScanRule.configValidityMap.size == 1)
125-
}
126-
127-
// set the same providers but in a different order
128-
hadoopConf.set("fs.s3a.aws.credentials.provider", Seq(provider2, provider1).mkString(","))
129-
assert(validate(hadoopConf).nonEmpty)
130-
assert(CometScanRule.configValidityMap.size == 2)
131-
}
132-
133-
private def validate(hadoopConf: Configuration): Set[String] = {
134-
val path = "s3a://path/to/file.parquet"
135-
val fallbackReasons = mutable.ListBuffer[String]()
136-
CometScanRule.validateObjectStoreConfig(path, hadoopConf, fallbackReasons)
137-
fallbackReasons.toSet
138-
}
13973
}

0 commit comments

Comments
 (0)