@@ -26,7 +26,6 @@ import org.apache.gluten.sql.shims.SparkShimLoader
2626import org .apache .gluten .substrait .plan .PlanNode
2727import org .apache .gluten .substrait .rel .{LocalFilesBuilder , LocalFilesNode , SplitInfo }
2828import org .apache .gluten .substrait .rel .LocalFilesNode .ReadFileFormat
29- import org .apache .gluten .utils .DeltaDeletionVectorRegistry
3029import org .apache .gluten .vectorized ._
3130
3231import org .apache .spark .{Partition , SparkConf , TaskContext }
@@ -41,17 +40,14 @@ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
4140import org .apache .spark .sql .vectorized .ColumnarBatch
4241import org .apache .spark .util .SparkDirectoryUtil
4342
44- import org .apache .hadoop .fs .Path
45-
4643import java .lang .{Long => JLong }
4744import java .nio .ByteBuffer
4845import java .nio .charset .StandardCharsets
4946import java .time .ZoneOffset
50- import java .util .{ ArrayList => JArrayList , HashMap => JHashMap , UUID }
47+ import java .util .UUID
5148
5249import scala .collection .JavaConverters ._
5350import scala .collection .mutable
54- import scala .util .Try
5551
5652class VeloxIteratorApi extends IteratorApi with Logging {
5753 private val deltaMetadataUtilsClassName =
@@ -102,8 +98,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
10298 .map(
10399 f => SparkShimLoader .getSparkShims.generateMetadataColumns(f, metadataColumnNames).asJava)
104100 val (otherMetadataColumns, deletionVectorPayloads) =
105- normalizeRegisteredDeltaSplitMetadata(partitionFiles, properties)
106- .orElse(normalizeDeltaSplitMetadata(partitionSchema.fields.length, partitionFiles))
101+ normalizeDeltaSplitMetadata(partitionSchema.fields.length, partitionFiles)
107102 .getOrElse {
108103 (
109104 partitionFiles.map {
@@ -247,73 +242,6 @@ class VeloxIteratorApi extends IteratorApi with Logging {
247242 }
248243 }
249244
250- private def normalizeRegisteredDeltaSplitMetadata (
251- partitionFiles : Seq [PartitionedFile ],
252- properties : Map [String , String ])
253- : Option [(Seq [java.util.Map [String , Object ]], Array [Array [Byte ]])] = {
254- properties
255- .get(DeltaDeletionVectorRegistry .RegistryIdProperty )
256- .flatMap(DeltaDeletionVectorRegistry .get)
257- .flatMap {
258- registeredEntries =>
259- val normalizedMetadataColumns = new JArrayList [java.util.Map [String , Object ]]()
260- val deletionVectorPayloads = mutable.ArrayBuffer .empty[Array [Byte ]]
261- var matchedDeletionVectors = 0
262- partitionFiles.foreach {
263- file =>
264- val metadata = new JHashMap [String , Object ]()
265- val baseMetadata =
266- SparkShimLoader .getSparkShims.getOtherConstantMetadataColumnValues(file)
267- if (baseMetadata != null ) {
268- metadata.putAll(baseMetadata)
269- }
270- lookupRegisteredDeltaDeletionVector(file, registeredEntries).foreach {
271- entry =>
272- metadata.put(" delta_dv_cardinality" , Long .box(entry.cardinality))
273- metadata.put(" row_index_filter_type" , entry.filterType)
274- metadata.put(" delta_dv_payload_index" , Int .box(deletionVectorPayloads.length))
275- deletionVectorPayloads += entry.payload
276- matchedDeletionVectors += 1
277- }
278- normalizedMetadataColumns.add(metadata)
279- }
280- if (matchedDeletionVectors == 0 ) {
281- None
282- } else {
283- Some ((normalizedMetadataColumns.asScala.toSeq, deletionVectorPayloads.toArray))
284- }
285- }
286- }
287-
288- private def lookupRegisteredDeltaDeletionVector (
289- file : PartitionedFile ,
290- registeredEntries : Map [String , DeltaDeletionVectorRegistry .Entry ])
291- : Option [DeltaDeletionVectorRegistry .Entry ] = {
292- deltaDeletionVectorPathCandidates(file).iterator
293- .map(registeredEntries.get)
294- .collectFirst { case Some (entry) => entry }
295- }
296-
297- private def deltaDeletionVectorPathCandidates (file : PartitionedFile ): Seq [String ] = {
298- val rawPath = unescapePathName(file.filePath.toString)
299- val path = new Path (rawPath)
300- val pathUri = partitionedFilePathUri(file)
301- Seq (
302- pathUri.map(_.toASCIIString),
303- pathUri.map(_.getPath),
304- Some (rawPath),
305- Some (path.toUri.toASCIIString),
306- Some (path.toUri.getPath),
307- Some (rawPath.stripPrefix(" /" ))
308- ).flatten
309- .map(DeltaDeletionVectorRegistry .normalizePathKey(_))
310- .filter(_.nonEmpty)
311- .distinct
312- }
313-
314- private def partitionedFilePathUri (file : PartitionedFile ): Option [java.net.URI ] =
315- Try (file.getClass.getMethod(" pathUri" ).invoke(file).asInstanceOf [java.net.URI ]).toOption
316-
317245 /** Generate Iterator[ColumnarBatch] for first stage. */
318246 override def genFirstStageIterator (
319247 inputPartition : BaseGlutenPartition ,
0 commit comments