@@ -38,7 +38,6 @@ import org.apache.spark.sql.execution.datasources.HadoopFsRelation
3838import org .apache .spark .sql .execution .datasources .parquet .ParquetUtils
3939import org .apache .spark .sql .execution .datasources .v2 .BatchScanExec
4040import org .apache .spark .sql .execution .datasources .v2 .csv .CSVScan
41- import org .apache .spark .sql .execution .datasources .v2 .parquet .ParquetScan
4241import org .apache .spark .sql .internal .SQLConf
4342import org .apache .spark .sql .types ._
4443
@@ -48,7 +47,7 @@ import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, withInfo, wi
4847import org .apache .comet .DataTypeSupport .isComplexType
4948import org .apache .comet .iceberg .{CometIcebergNativeScanMetadata , IcebergReflection }
5049import org .apache .comet .objectstore .NativeConfig
51- import org .apache .comet .parquet .{CometParquetScan , Native , SupportsComet }
50+ import org .apache .comet .parquet .{Native , SupportsComet }
5251import org .apache .comet .parquet .CometParquetUtils .{encryptionEnabled , isEncryptionConfigSupported }
5352import org .apache .comet .serde .operator .CometNativeScan
5453import org .apache .comet .shims .{CometTypeShim , ShimFileFormat , ShimSubqueryBroadcast }
@@ -61,8 +60,6 @@ case class CometScanRule(session: SparkSession)
6160 with CometTypeShim
6261 with ShimSubqueryBroadcast {
6362
64- import CometScanRule ._
65-
6663 private lazy val showTransformations = CometConf .COMET_EXPLAIN_TRANSFORMATIONS .get()
6764
6865 override def apply (plan : SparkPlan ): SparkPlan = {
@@ -176,8 +173,6 @@ case class CometScanRule(session: SparkSession)
176173 nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
177174 case SCAN_NATIVE_ICEBERG_COMPAT =>
178175 nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
179- case SCAN_NATIVE_COMET =>
180- nativeCometScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
181176 }
182177
183178 case _ =>
@@ -231,47 +226,9 @@ case class CometScanRule(session: SparkSession)
231226 Some (CometScanExec (scanExec, session, SCAN_NATIVE_ICEBERG_COMPAT ))
232227 }
233228
234- private def nativeCometScan (
235- session : SparkSession ,
236- scanExec : FileSourceScanExec ,
237- r : HadoopFsRelation ,
238- hadoopConf : Configuration ): Option [SparkPlan ] = {
239- if (! isSchemaSupported(scanExec, SCAN_NATIVE_COMET , r)) {
240- return None
241- }
242- Some (CometScanExec (scanExec, session, SCAN_NATIVE_COMET ))
243- }
244-
245229 private def transformV2Scan (scanExec : BatchScanExec ): SparkPlan = {
246230
247231 scanExec.scan match {
248- case scan : ParquetScan if COMET_NATIVE_SCAN_IMPL .get() == SCAN_NATIVE_COMET =>
249- val fallbackReasons = new ListBuffer [String ]()
250- val schemaSupported =
251- CometBatchScanExec .isSchemaSupported(scan.readDataSchema, fallbackReasons)
252- if (! schemaSupported) {
253- fallbackReasons += s " Schema ${scan.readDataSchema} is not supported "
254- }
255-
256- val partitionSchemaSupported =
257- CometBatchScanExec .isSchemaSupported(scan.readPartitionSchema, fallbackReasons)
258- if (! partitionSchemaSupported) {
259- fallbackReasons += s " Partition schema ${scan.readPartitionSchema} is not supported "
260- }
261-
262- if (scan.pushedAggregate.nonEmpty) {
263- fallbackReasons += " Comet does not support pushed aggregate"
264- }
265-
266- if (schemaSupported && partitionSchemaSupported && scan.pushedAggregate.isEmpty) {
267- val cometScan = CometParquetScan (session, scanExec.scan.asInstanceOf [ParquetScan ])
268- CometBatchScanExec (
269- scanExec.copy(scan = cometScan),
270- runtimeFilters = scanExec.runtimeFilters)
271- } else {
272- withInfos(scanExec, fallbackReasons.toSet)
273- }
274-
275232 case scan : CSVScan if COMET_CSV_V2_NATIVE_ENABLED .get() =>
276233 val fallbackReasons = new ListBuffer [String ]()
277234 val schemaSupported =
@@ -691,48 +648,6 @@ case class CometScanRule(session: SparkSession)
691648 }
692649 }
693650
694- private def selectScan (
695- scanExec : FileSourceScanExec ,
696- partitionSchema : StructType ,
697- hadoopConf : Configuration ): String = {
698-
699- val fallbackReasons = new ListBuffer [String ]()
700-
701- // native_iceberg_compat only supports local filesystem and S3
702- if (scanExec.relation.inputFiles
703- .forall(path => path.startsWith(" file://" ) || path.startsWith(" s3a://" ))) {
704-
705- val filePath = scanExec.relation.inputFiles.headOption
706- if (filePath.exists(_.startsWith(" s3a://" ))) {
707- validateObjectStoreConfig(filePath.get, hadoopConf, fallbackReasons)
708- }
709- } else {
710- fallbackReasons += s " $SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3 "
711- }
712-
713- val typeChecker = CometScanTypeChecker (SCAN_NATIVE_ICEBERG_COMPAT )
714- val schemaSupported =
715- typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
716- val partitionSchemaSupported =
717- typeChecker.isSchemaSupported(partitionSchema, fallbackReasons)
718-
719- val cometExecEnabled = COMET_EXEC_ENABLED .get()
720- if (! cometExecEnabled) {
721- fallbackReasons += s " $SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED .key}=true "
722- }
723-
724- if (cometExecEnabled && schemaSupported && partitionSchemaSupported &&
725- fallbackReasons.isEmpty) {
726- logInfo(s " Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT" )
727- SCAN_NATIVE_ICEBERG_COMPAT
728- } else {
729- logInfo(
730- s " Auto scan mode falling back to $SCAN_NATIVE_COMET due to " +
731- s " ${fallbackReasons.mkString(" , " )}" )
732- SCAN_NATIVE_COMET
733- }
734- }
735-
736651 private def isDynamicPruningFilter (e : Expression ): Boolean =
737652 e.exists(_.isInstanceOf [PlanExpression [_]])
738653
@@ -775,16 +690,12 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
775690 name : String ,
776691 fallbackReasons : ListBuffer [String ]): Boolean = {
777692 dt match {
778- case ShortType
779- if scanImpl != CometConf .SCAN_NATIVE_COMET &&
780- CometConf .COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK .get() =>
693+ case ShortType if CometConf .COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK .get() =>
781694 fallbackReasons += s " $scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
782695 s " Set ${CometConf .COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK .key}=false to allow " +
783696 " native execution if your data does not contain unsigned small integers. " +
784697 CometConf .COMPAT_GUIDE
785698 false
786- case _ : StructType | _ : ArrayType | _ : MapType if scanImpl == CometConf .SCAN_NATIVE_COMET =>
787- false
788699 case dt if isStringCollationType(dt) =>
789700 // we don't need specific support for collation in scans, but this
790701 // is a convenient place to force the whole query to fall back to Spark for now
0 commit comments