Skip to content

Commit d06ec27

Browse files
Mohammad Linjawi
authored and committed
[VL][Delta] Simplify DV scan payload handoff
1 parent d142569 commit d06ec27

3 files changed

Lines changed: 2 additions & 430 deletions

File tree

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala

Lines changed: 2 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader
2626
import org.apache.gluten.substrait.plan.PlanNode
2727
import org.apache.gluten.substrait.rel.{LocalFilesBuilder, LocalFilesNode, SplitInfo}
2828
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
29-
import org.apache.gluten.utils.DeltaDeletionVectorRegistry
3029
import org.apache.gluten.vectorized._
3130

3231
import org.apache.spark.{Partition, SparkConf, TaskContext}
@@ -41,17 +40,14 @@ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
4140
import org.apache.spark.sql.vectorized.ColumnarBatch
4241
import org.apache.spark.util.SparkDirectoryUtil
4342

44-
import org.apache.hadoop.fs.Path
45-
4643
import java.lang.{Long => JLong}
4744
import java.nio.ByteBuffer
4845
import java.nio.charset.StandardCharsets
4946
import java.time.ZoneOffset
50-
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, UUID}
47+
import java.util.UUID
5148

5249
import scala.collection.JavaConverters._
5350
import scala.collection.mutable
54-
import scala.util.Try
5551

5652
class VeloxIteratorApi extends IteratorApi with Logging {
5753
private val deltaMetadataUtilsClassName =
@@ -102,8 +98,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
10298
.map(
10399
f => SparkShimLoader.getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
104100
val (otherMetadataColumns, deletionVectorPayloads) =
105-
normalizeRegisteredDeltaSplitMetadata(partitionFiles, properties)
106-
.orElse(normalizeDeltaSplitMetadata(partitionSchema.fields.length, partitionFiles))
101+
normalizeDeltaSplitMetadata(partitionSchema.fields.length, partitionFiles)
107102
.getOrElse {
108103
(
109104
partitionFiles.map {
@@ -247,73 +242,6 @@ class VeloxIteratorApi extends IteratorApi with Logging {
247242
}
248243
}
249244

250-
private def normalizeRegisteredDeltaSplitMetadata(
251-
partitionFiles: Seq[PartitionedFile],
252-
properties: Map[String, String])
253-
: Option[(Seq[java.util.Map[String, Object]], Array[Array[Byte]])] = {
254-
properties
255-
.get(DeltaDeletionVectorRegistry.RegistryIdProperty)
256-
.flatMap(DeltaDeletionVectorRegistry.get)
257-
.flatMap {
258-
registeredEntries =>
259-
val normalizedMetadataColumns = new JArrayList[java.util.Map[String, Object]]()
260-
val deletionVectorPayloads = mutable.ArrayBuffer.empty[Array[Byte]]
261-
var matchedDeletionVectors = 0
262-
partitionFiles.foreach {
263-
file =>
264-
val metadata = new JHashMap[String, Object]()
265-
val baseMetadata =
266-
SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)
267-
if (baseMetadata != null) {
268-
metadata.putAll(baseMetadata)
269-
}
270-
lookupRegisteredDeltaDeletionVector(file, registeredEntries).foreach {
271-
entry =>
272-
metadata.put("delta_dv_cardinality", Long.box(entry.cardinality))
273-
metadata.put("row_index_filter_type", entry.filterType)
274-
metadata.put("delta_dv_payload_index", Int.box(deletionVectorPayloads.length))
275-
deletionVectorPayloads += entry.payload
276-
matchedDeletionVectors += 1
277-
}
278-
normalizedMetadataColumns.add(metadata)
279-
}
280-
if (matchedDeletionVectors == 0) {
281-
None
282-
} else {
283-
Some((normalizedMetadataColumns.asScala.toSeq, deletionVectorPayloads.toArray))
284-
}
285-
}
286-
}
287-
288-
private def lookupRegisteredDeltaDeletionVector(
289-
file: PartitionedFile,
290-
registeredEntries: Map[String, DeltaDeletionVectorRegistry.Entry])
291-
: Option[DeltaDeletionVectorRegistry.Entry] = {
292-
deltaDeletionVectorPathCandidates(file).iterator
293-
.map(registeredEntries.get)
294-
.collectFirst { case Some(entry) => entry }
295-
}
296-
297-
private def deltaDeletionVectorPathCandidates(file: PartitionedFile): Seq[String] = {
298-
val rawPath = unescapePathName(file.filePath.toString)
299-
val path = new Path(rawPath)
300-
val pathUri = partitionedFilePathUri(file)
301-
Seq(
302-
pathUri.map(_.toASCIIString),
303-
pathUri.map(_.getPath),
304-
Some(rawPath),
305-
Some(path.toUri.toASCIIString),
306-
Some(path.toUri.getPath),
307-
Some(rawPath.stripPrefix("/"))
308-
).flatten
309-
.map(DeltaDeletionVectorRegistry.normalizePathKey(_))
310-
.filter(_.nonEmpty)
311-
.distinct
312-
}
313-
314-
private def partitionedFilePathUri(file: PartitionedFile): Option[java.net.URI] =
315-
Try(file.getClass.getMethod("pathUri").invoke(file).asInstanceOf[java.net.URI]).toOption
316-
317245
/** Generate Iterator[ColumnarBatch] for first stage. */
318246
override def genFirstStageIterator(
319247
inputPartition: BaseGlutenPartition,

0 commit comments

Comments
 (0)