Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources

import java.io.{Closeable, InputStream}
import java.io.{Closeable, File, FileNotFoundException, FileOutputStream, InputStream, IOException}
import java.util.Locale
import java.util.regex.Pattern
import java.util.zip.GZIPInputStream
Expand All @@ -29,13 +29,18 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
import org.apache.commons.io.ByteOrderMark
import org.apache.commons.io.input.{BOMInputStream, CloseShieldInputStream}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Text
import org.apache.hadoop.util.LineReader

import org.apache.spark.TaskContext
import org.apache.spark.util.HadoopFSUtils
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.{Logging, LogKeys}
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{HadoopFSUtils, Utils}

/**
* Streaming reader for a single archive file. The archive is opened once and decompressed/unpacked
Expand Down Expand Up @@ -179,13 +184,28 @@ abstract class ArchiveReader(path: Path) {
}
entries
}

/**
* Materializes each kept entry to a file under `localDir` as `(entryName, localFile)`, lazily one
* at a time -- the [[readEntries]] counterpart for random-access formats (Parquet/ORC footers).
*/
final def localizeEntries(
conf: Configuration,
localDir: File,
entryFilter: String => Boolean = _ => true): Iterator[(String, File)] =
readEntries(conf) { (name, in) =>
if (entryFilter(name)) {
Iterator.single((name, ArchiveReader.copyEntryToLocalFile(in, localDir, name)))
} else {
Iterator.empty
}
}
}

object ArchiveReader {
object ArchiveReader extends Logging {

/**
* Whether `path` names an archive this reader can stream. Dispatched purely on the file
* extension -- `.tar`, `.tar.gz`, `.tgz`, or `.zip` -- since the bytes are not inspected here.
* Whether `path` names an archive this reader can stream.
*/
def isArchivePath(path: Path): Boolean = {
val name = path.getName.toLowerCase(Locale.ROOT)
Expand Down Expand Up @@ -256,6 +276,188 @@ object ArchiveReader {
}
}
}

/**
* Copies one entry's bytes to a unique file under `localDir`.
* The entry stream is left open (the reader owns it); the output file is closed.
*/
private def copyEntryToLocalFile(in: InputStream, localDir: File, entryName: String): File = {
val rawBasename = entryName.substring(entryName.lastIndexOf('/') + 1)
val basename = rawBasename.replaceAll("[^A-Za-z0-9._-]", "_")
val local = File.createTempFile("archive-entry-", "-" + basename, localDir)
val out = new FileOutputStream(local)
try Utils.copyStream(in, out) finally out.close()
local
}

/**
* Reads an archive of random-access files -- the [[readEntries]] counterpart for formats needing
* a complete file (Parquet/ORC, Excel). The per-entry [[PartitionedFile]] keeps the
* archive's path, so `input_file_name()`/`_metadata` report it.
*/
def readLocalizedEntries(
file: PartitionedFile,
conf: Configuration,
entryFilter: String => Boolean,
tempPrefix: String)(
readOne: PartitionedFile => Iterator[InternalRow]): Iterator[InternalRow] = {
val tempDir = Utils.createTempDir(Utils.getLocalDir(SparkEnv.get.conf), tempPrefix)
val entries = ArchiveReader(file.toPath).localizeEntries(conf, tempDir, entryFilter)

// Element type is `Object`, not `InternalRow`: a batch scan yields `ColumnarBatch`, so
// per-element casts would fail. The whole iterator is cast back to `Iterator[InternalRow]`
// at the end, matching how the plain reader's columnar output is typed.
val rows = new Iterator[Object] with Closeable {
private var current: Iterator[Object] = Iterator.empty
private var currentFile: File = _
private var done = false

private def releaseCurrent(): Unit = {
current match {
case c: Closeable => try c.close() catch { case NonFatal(_) => }
case _ =>
}
current = Iterator.empty
if (currentFile != null) {
currentFile.delete()
currentFile = null
}
}

// Advance on `hasNext`, not in `next`, so a reader reusing a mutable batch is not probed for
// the next entry before the current batch is consumed.
private def advance(): Unit = {
while (!done && !current.hasNext) {
releaseCurrent()
if (entries.hasNext) {
val (_, entryFile) = entries.next()
currentFile = entryFile
current = readOne(file.copy(
filePath = SparkPath.fromUri(entryFile.toURI),
start = 0L,
length = entryFile.length(),
fileSize = entryFile.length(),
modificationTime = file.modificationTime)).asInstanceOf[Iterator[Object]]
} else {
done = true
}
}
}

override def hasNext: Boolean = {
advance()
!done && current.hasNext
}

override def next(): Object = {
if (!hasNext) throw new NoSuchElementException
current.next()
}

override def close(): Unit = {
done = true
releaseCurrent()
entries match {
case c: Closeable => try c.close() catch { case NonFatal(_) => }
case _ =>
}
}
}

// Delete only the temp dir here; FileScanRDD closes `rows`. Closing the per-entry reader from
// this listener (it runs before FileScanRDD's) would free the vectorized reader's off-heap
// vectors while downstream operators still read them.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit] { _ =>
Utils.deleteRecursively(tempDir)
})
rows.asInstanceOf[Iterator[InternalRow]]
}

/**
* Localizes one archive's kept entries and applies `use`. The entries iterator is closed before
* returning (the driver has no TaskContext), so `use` must consume what it needs first.
*/
private[datasources] def withLocalizedArchive[T](
archive: FileStatus,
conf: Configuration,
tempDir: File,
entryFilter: String => Boolean,
ignoreMissingFiles: Boolean,
ignoreCorruptFiles: Boolean,
onSkip: => T)(
use: Iterator[(String, File)] => T): T = {
var entries: Iterator[(String, File)] = null
try {
entries = ArchiveReader(archive.getPath).localizeEntries(conf, tempDir, entryFilter)
use(entries)
} catch {
case e: Exception if ignoreMissingFiles &&
ExceptionUtils.getThrowables(e).exists(_.isInstanceOf[FileNotFoundException]) =>
logWarning(log"Skipping missing archive during inference: " +
log"${MDC(LogKeys.PATH, archive.getPath.toString)}", e)
onSkip
case NonFatal(e) =>
Utils.getRootCause(e) match {
// A missing archive is a FileNotFoundException; govern it by
// ignoreMissingFiles (handled above), not by ignoreCorruptFiles -- matching FileScanRDD,
// which rethrows missing-file errors regardless of ignoreCorruptFiles.
case _: FileNotFoundException => throw e
case _: RuntimeException | _: IOException if ignoreCorruptFiles =>
logWarning(log"Skipping corrupt archive during inference: " +
log"${MDC(LogKeys.PATH, archive.getPath.toString)}", e)
onSkip
case _ => throw e
}
} finally {
entries match {
case c: Closeable => c.close()
case _ =>
}
}
}

/**
* Driver-side schema inference for random-access archive formats: `looseInfer` seeds from loose
* files; each archive's entries fold in one at a time -- `inferOne` reads one unpacked entry,
* `mergeSchemas` combines it -- deleting each before the next. `mergeSchema` off samples one.
*/
def inferArchiveSchema(
files: Seq[FileStatus],
conf: Configuration,
tempPrefix: String,
entryFilter: String => Boolean,
ignoreMissingFiles: Boolean,
ignoreCorruptFiles: Boolean,
mergeSchema: Boolean,
looseInfer: Seq[FileStatus] => Option[StructType],
inferOne: File => Option[StructType],
mergeSchemas: (StructType, StructType) => StructType): Option[StructType] = {
val (archives, nonArchives) = files.partition(f => isArchivePath(f.getPath))
val tempDir = Utils.createTempDir(namePrefix = tempPrefix)
def foldArchive(archive: FileStatus, seed: Option[StructType], limit: Int): Option[StructType] =
withLocalizedArchive(
archive, conf, tempDir, entryFilter, ignoreMissingFiles, ignoreCorruptFiles,
onSkip = seed) { entries =>
var merged = seed
entries.take(limit).foreach { case (_, file) =>
try inferOne(file).foreach(s => merged = Some(merged.fold(s)(mergeSchemas(_, s))))
finally file.delete()
}
merged
}
try {
val looseSchema = if (nonArchives.nonEmpty) looseInfer(nonArchives) else None
if (mergeSchema) {
archives.foldLeft(looseSchema)((acc, a) => foldArchive(a, acc, Int.MaxValue))
} else if (looseSchema.isDefined) {
looseSchema
} else {
archives.iterator.map(foldArchive(_, None, 1)).find(_.isDefined).flatten
}
} finally {
Utils.deleteRecursively(tempDir)
}
}
}

/**
Expand Down
Loading