|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, |
| 13 | + * software distributed under the License is distributed on an |
| 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + * KIND, either express or implied. See the License for the |
| 16 | + * specific language governing permissions and limitations |
| 17 | + * under the License. |
| 18 | + */ |
| 19 | + |
| 20 | +package org.apache.spark.sql.comet |
| 21 | + |
| 22 | +import scala.collection.mutable.HashMap |
| 23 | +import scala.concurrent.duration.NANOSECONDS |
| 24 | + |
| 25 | +import org.apache.hadoop.fs.Path |
| 26 | +import org.apache.spark.SparkContext |
| 27 | +import org.apache.spark.sql.catalyst.expressions._ |
| 28 | +import org.apache.spark.sql.comet.shims.ShimCometScanExec |
| 29 | +import org.apache.spark.sql.errors.QueryExecutionErrors |
| 30 | +import org.apache.spark.sql.execution.{FileSourceScanExec, LeafExecNode, SQLExecution} |
| 31 | +import org.apache.spark.sql.execution.datasources._ |
| 32 | +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} |
| 33 | + |
| 34 | +/** |
| 35 | + * Computes [[FilePartition]]s from a [[FileSourceScanExec]] without going through `inputRDD` / |
| 36 | + * `buildReaderWithPartitionValues`. |
| 37 | + * |
| 38 | + * `CometNativeScanExec` only needs the post-DPP `Seq[FilePartition]` for native execution; it |
| 39 | + * never invokes the JVM Parquet reader. Going through `originalPlan.inputRDD` would build a |
| 40 | + * Parquet reader closure (captured by `FileScanRDD` but never invoked here), broadcast a |
| 41 | + * `SerializableConfiguration` per scan that lives until session shutdown, and walk |
| 42 | + * `FileSourceScanLike.pushedDownFilters` -- which calls `ScalarSubquery.toLiteral` on |
| 43 | + * `dataFilters` and requires the subqueries to have been resolved through Spark's standard |
| 44 | + * `prepare -> waitForSubqueries` lifecycle. Because `originalPlan` is `@transient`, the wrapper's |
| 45 | + * prepare walk does not reach it, so its scalar-subquery instances stay unresolved and the lazy |
| 46 | + * val throws "Subquery has not finished". |
| 47 | + * |
| 48 | + * Logic mirrors Spark's `FileSourceScanLike` in |
| 49 | + * `org.apache.spark.sql.execution.DataSourceScanExec` -- specifically `selectedPartitions`, |
| 50 | + * `dynamicallySelectedPartitions`, `setFilesNumAndSizeMetric`, `sendDriverMetrics`, plus |
| 51 | + * `createBucketedReadRDD` / `createReadRDD` reduced to their `FilePartition`-producing subset. |
| 52 | + * Code ported second-hand from `CometScanExec` on `apache/main`, which ports the same methods out |
| 53 | + * of Spark for the legacy V1 scan path. When `CometScanExec` is deleted, this helper becomes the |
| 54 | + * single home for the partition-computation port. |
| 55 | + * |
| 56 | + * A single implementation works for Spark 3.x and 4.x. Spark 4.x's `selectedPartitions` returns |
| 57 | + * `ScanFileListing` (with `filterAndPruneFiles`, `toPartitionArray`, `filePartitionIterator`) |
| 58 | + * while 3.x returns `Seq[PartitionDirectory]`. This helper operates on `Seq[PartitionDirectory]` |
| 59 | + * directly via `relation.location.listFiles`, which exists on both versions. Per-Spark-version |
| 60 | + * `splitFiles` / `getPartitionedFile` signature differences (Spark 3.5.5 changed signatures via |
| 61 | + * reflection) are handled by `ShimCometScanExec`. While Comet still supports Spark 3.x, this is |
| 62 | + * the right shape; once 3.x is dropped, this helper can adopt Spark 4.x's `ScanFileListing` API |
| 63 | + * directly. If Spark drifts on partition-computation logic, the canonical reference is |
| 64 | + * `FileSourceScanLike` in Spark's `DataSourceScanExec.scala`. |
| 65 | + * |
| 66 | + * One known intentional divergence from Spark 4.x: `dynamicallySelectedPartitions` calls |
| 67 | + * `selectedPartitions.filterAndPruneFiles(boundPredicate, dynamicDataFilters)`, which also |
| 68 | + * applies dynamic data filters for file-level pruning. This helper applies only dynamic partition |
| 69 | + * filters, matching `CometScanExec`. For native scans this has zero practical gap. |
| 70 | + * `dynamicDataFilters` is `dataFilters.filter(isDynamicPruningFilter)`, where |
| 71 | + * `isDynamicPruningFilter` matches anything containing a `PlanExpression`. By class: |
| 72 | + * `ScalarSubquery` (`org.apache.spark.sql.execution.ScalarSubquery`, aliased |
| 73 | + * `ExecScalarSubquery`) is handled separately -- `CometNativeScanExec.serializedPartitionData` |
| 74 | + * filters `dataFilters` for it, calls `updateResult()`, transforms each to a `Literal`, and |
| 75 | + * pushes via `commonBuilder.addDataFilters(proto)`, after which DataFusion does row-group / page |
| 76 | + * / row pruning natively. `InSubqueryExec` in `dataFilters` (i.e. `WHERE non_partition_col IN |
| 77 | + * (subquery)`) is rewritten to semi-joins / hash joins by `RewritePredicateSubquery` during |
| 78 | + * logical optimization and never reaches physical `dataFilters`; Spark's `PartitionPruning` rule |
| 79 | + * only inserts `InSubqueryExec` on partition columns. `DynamicPruningExpression` on data columns |
| 80 | + * is not generated by base Spark. `Exists`, `ListQuery`, `LateralSubquery` are rewritten to joins |
| 81 | + * before physical planning. In practice the only `PlanExpression` subclass that lands in |
| 82 | + * `dataFilters` is `ScalarSubquery`, and we already handle it. Risk surface: a future Spark |
| 83 | + * version could land a new `PlanExpression` subclass in `dataFilters` that the scalar-subquery |
| 84 | + * resolution block silently skips. If diagnostic gaps appear here, add an |
| 85 | + * unknown-`PlanExpression` log in `CometNativeScanExec.serializedPartitionData` as a tripwire. |
| 86 | + * |
| 87 | + * `getFilePartitions` writes directly into the `SQLMetric` instances that Spark constructed on |
| 88 | + * `wrapped.driverMetrics` and posts driver-side updates through the SQL listener bus. |
| 89 | + * `CometNativeScanExec.metrics` exposes those same instances by reference (via |
| 90 | + * `originalPlan.metrics.filterKeys(driverMetricKeys)`), so `numFiles`, `filesSize`, |
| 91 | + * `metadataTime`, `pruningTime`, and `numPartitions` show actual values once |
| 92 | + * `serializedPartitionData` has run. Plan-info dumps that read metrics before execution see |
| 93 | + * zeros; the runtime listener-bus update is what populates the SQL UI. |
| 94 | + * |
| 95 | + * DPP `InSubqueryExec` values in `wrapped.partitionFilters` must be resolved before |
| 96 | + * `getFilePartitions()` runs; `dynamicallySelectedPartitions` evaluates bound predicates against |
| 97 | + * partition values and a still-empty subquery would throw. |
| 98 | + */ |
private[comet] case class CometFilePartitionHelper(wrapped: FileSourceScanExec)
    extends LeafExecNode
    with ShimCometScanExec {

  // Thin accessors over the wrapped scan. They are `def`s, so each use re-reads `wrapped`.
  private def relation = wrapped.relation
  private def requiredSchema = wrapped.requiredSchema
  private def partitionFilters: Seq[Expression] = wrapped.partitionFilters
  private def dataFilters: Seq[Expression] = wrapped.dataFilters
  private def bucketedScan = wrapped.bucketedScan
  private def optionalBucketSet = wrapped.optionalBucketSet
  private def optionalNumCoalescedBuckets = wrapped.optionalNumCoalescedBuckets

  // Staging area for driver-side metric values, keyed by metric name. The values are copied
  // into the SQLMetric instances of `wrapped` by `sendDriverMetrics`.
  private val driverMetrics: HashMap[String, Long] = HashMap.empty

  /** Returns true when `e` contains a [[PlanExpression]] anywhere in its expression tree. */
  private def isDynamicPruningFilter(e: Expression): Boolean =
    e.find {
      case _: PlanExpression[_] => true
      case _ => false
    }.nonEmpty

  /**
   * Partition directories listed from the relation's file index using only the partition
   * filters that are NOT dynamic-pruning filters (the data filters are passed through as-is).
   *
   * Side effects on first evaluation: records the file count/size metrics and stores
   * `metadataTime` (listing time plus the file index's `metadataOpsTimeNs`, in milliseconds).
   */
  @transient private lazy val selectedPartitions: Array[PartitionDirectory] = {
    val fileIndex = relation.location
    val priorMetadataNs = fileIndex.metadataOpsTimeNs.getOrElse(0L)
    val listingStartNs = System.nanoTime()
    val staticFilters = partitionFilters.filterNot(isDynamicPruningFilter)
    val listed = fileIndex.listFiles(staticFilters, dataFilters)
    setFilesNumAndSizeMetric(listed, static = true)
    val listingNs = System.nanoTime() - listingStartNs
    driverMetrics.update("metadataTime", NANOSECONDS.toMillis(listingNs + priorMetadataNs))
    listed.toArray
  }

  /**
   * `selectedPartitions` further narrowed by the dynamic-pruning partition filters, if any.
   *
   * The dynamic filters are AND-ed together, each attribute reference is rebound by name to
   * its ordinal in the partition schema, and the resulting predicate is evaluated against every
   * partition's values. When there are no dynamic filters, `selectedPartitions` is returned
   * unchanged and no `pruningTime` is recorded.
   */
  @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
    val dynamicFilters = partitionFilters.filter(isDynamicPruningFilter)
    if (dynamicFilters.isEmpty) {
      selectedPartitions
    } else {
      val pruneStartNs = System.nanoTime()
      val partitionSchema = relation.partitionSchema
      val rebound = dynamicFilters.reduce(And).transform { case attr: AttributeReference =>
        // Looked up by name only; an attribute missing from the partition schema yields -1
        // here and fails on the schema lookup below.
        val ordinal = partitionSchema.indexWhere(_.name == attr.name)
        BoundReference(ordinal, partitionSchema(ordinal).dataType, nullable = true)
      }
      val keepPartition = Predicate.create(rebound, Nil)
      val survivors = selectedPartitions.filter(dir => keepPartition.eval(dir.values))
      setFilesNumAndSizeMetric(survivors, static = false)
      val pruneNs = System.nanoTime() - pruneStartNs
      driverMetrics.update("pruningTime", NANOSECONDS.toMillis(pruneNs))
      survivors
    }
  }

  /**
   * Builds the [[FilePartition]]s for the wrapped scan (bucketed or bin-packed, depending on
   * `wrapped.bucketedScan`) and then publishes the collected driver metrics.
   *
   * @return
   *   the file partitions computed from the dynamically selected partition directories
   */
  def getFilePartitions(): Seq[FilePartition] = {
    val prunedDirs = dynamicallySelectedPartitions
    val planned =
      if (!bucketedScan) createFilePartitionsForNonBucketedScan(prunedDirs)
      else createFilePartitionsForBucketedScan(prunedDirs)
    sendDriverMetrics()
    planned
  }

  /**
   * Groups every file by the bucket id parsed from its file name and emits one
   * [[FilePartition]] per bucket (or per coalesced bucket when a coalesced count is set).
   * Buckets with no files still get an (empty) partition.
   *
   * Throws `QueryExecutionErrors.invalidBucketFile` for a file whose name has no bucket id.
   */
  private def createFilePartitionsForBucketedScan(
      partitions: Array[PartitionDirectory]): Seq[FilePartition] = {
    // NOTE(review): `.get` assumes the relation has a bucket spec whenever this path is taken.
    val bucketSpec = relation.bucketSpec.get
    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")

    val partitionedFiles = for {
      dir <- partitions
      file <- dir.files
    } yield getPartitionedFile(file, dir)

    val filesByBucket = partitionedFiles.groupBy { pf =>
      val location = pf.filePath.toString()
      BucketingUtils
        .getBucketId(new Path(location).getName)
        .getOrElse(throw QueryExecutionErrors.invalidBucketFile(location))
    }

    // Drop buckets that are not in the optional bucket set.
    val retained = optionalBucketSet
      .map(wanted => filesByBucket.filter { case (bucketId, _) => wanted.get(bucketId) })
      .getOrElse(filesByBucket)

    optionalNumCoalescedBuckets match {
      case Some(numCoalescedBuckets) =>
        logInfo(s"Coalescing to $numCoalescedBuckets buckets")
        // Original bucket ids that agree modulo the coalesced count share one partition.
        val merged = retained.groupBy { case (bucketId, _) => bucketId % numCoalescedBuckets }
        Seq.tabulate(numCoalescedBuckets) { slot =>
          val slotFiles = merged.get(slot) match {
            case Some(group) => group.values.flatten.toArray
            case None => Array.empty[PartitionedFile]
          }
          FilePartition(slot, slotFiles)
        }
      case None =>
        Seq.tabulate(bucketSpec.numBuckets) { slot =>
          FilePartition(slot, retained.getOrElse(slot, Array.empty))
        }
    }
  }

  /**
   * Splits every eligible file into chunks of at most `maxSplitBytes`, orders the chunks by
   * descending length, and delegates the bin packing to `FilePartition.getFilePartitions`.
   *
   * A file is skipped only when a bucket set is present, bucketing is enabled in the session
   * conf, and the file name carries a bucket id that is not in the set.
   */
  private def createFilePartitionsForNonBucketedScan(
      partitions: Array[PartitionDirectory]): Seq[FilePartition] = {
    val sparkSession = relation.sparkSession
    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
    val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, partitions)
    logInfo(
      s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
        s"open cost is considered as scanning $openCostInBytes bytes.")

    val bucketingEnabled = sparkSession.sessionState.conf.bucketingEnabled
    // Only consult the bucket set when bucketing is enabled; otherwise every file passes.
    val activeBucketSet = optionalBucketSet.filter(_ => bucketingEnabled)
    def shouldProcess(path: Path): Boolean =
      activeBucketSet.forall { wanted =>
        BucketingUtils.getBucketId(path.getName).forall(id => wanted.get(id))
      }

    val unsortedSplits = partitions.flatMap { dir =>
      dir.files.flatMap { file =>
        // Resolve the path once per file and reuse it below.
        val filePath = file.getPath
        if (!shouldProcess(filePath)) {
          Seq.empty
        } else {
          val formatSplitable =
            relation.fileFormat.isSplitable(sparkSession, relation.options, filePath)
          splitFiles(
            sparkSession = sparkSession,
            file = file,
            filePath = filePath,
            isSplitable = formatSplitable && !isNeededForSchema(requiredSchema),
            maxSplitBytes = maxSplitBytes,
            partitionValues = dir.values)
        }
      }
    }
    val splitFilesList = unsortedSplits.sortBy(_.length)(Ordering[Long].reverse)

    FilePartition.getFilePartitions(sparkSession, splitFilesList, maxSplitBytes)
  }

  /**
   * Records the file count and total byte size of `partitions` into `driverMetrics`.
   *
   * The values go under `staticFilesNum` / `staticFilesSize` only when `static` is true and
   * at least one partition filter is a dynamic-pruning filter; in every other case they go
   * under `numFiles` / `filesSize`. `numPartitions` is recorded when the relation has a
   * non-empty partition schema.
   */
  private def setFilesNumAndSizeMetric(
      partitions: Seq[PartitionDirectory],
      static: Boolean): Unit = {
    val fileCount = partitions.foldLeft(0L)((acc, dir) => acc + dir.files.size)
    val totalBytes = partitions.foldLeft(0L)((acc, dir) => acc + dir.files.map(_.getLen).sum)

    val recordAsStatic = static && partitionFilters.exists(isDynamicPruningFilter)
    val (countKey, sizeKey) =
      if (recordAsStatic) ("staticFilesNum", "staticFilesSize")
      else ("numFiles", "filesSize")
    driverMetrics.update(countKey, fileCount)
    driverMetrics.update(sizeKey, totalBytes)

    if (relation.partitionSchema.nonEmpty) {
      driverMetrics.update("numPartitions", partitions.length.toLong)
    }
  }

  /**
   * Copies every staged value in `driverMetrics` into the same-named SQLMetric of
   * `wrapped.metrics` (names with no matching metric are skipped), then hands the touched
   * metrics to `SQLMetrics.postDriverMetricUpdates` under the current execution id.
   */
  private def sendDriverMetrics(): Unit = {
    val refreshed = for {
      entry <- driverMetrics.toSeq
      metric <- wrapped.metrics.get(entry._1)
    } yield {
      metric.set(entry._2)
      metric
    }
    if (refreshed.nonEmpty) {
      val sc: SparkContext = relation.sparkSession.sparkContext
      val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
      SQLMetrics.postDriverMetricUpdates(sc, executionId, refreshed)
    }
  }

  // Plan-node plumbing: output and metrics are forwarded from the wrapped scan.
  override def output: Seq[Attribute] = wrapped.output

  override def metrics: Map[String, SQLMetric] = wrapped.metrics

  /** This node only computes file partitions; executing it is always an error. */
  override protected def doExecute()
      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] =
    throw new UnsupportedOperationException("CometFilePartitionHelper is not executable")
}
0 commit comments