diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 601515e57dc82..1cc36fb9e99aa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -80,7 +80,11 @@ abstract class EventLogFileWriter( protected var writer: Option[PrintWriter] = None protected def requireLogBaseDirAsDirectory(): Unit = { - if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) { + val basePath = new Path(logBaseDir) + if (!fileSystem.exists(basePath)) { + FileSystem.mkdirs(fileSystem, basePath, EventLogFileWriter.LOG_FOLDER_PERMISSIONS) + } + if (!fileSystem.getFileStatus(basePath).isDirectory) { throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.") } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d166e61bfb82c..22c8033a1a302 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -349,12 +349,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } catch { case f: FileNotFoundException => - var msg = s"Log directory specified does not exist: $dir" - if (dir == DEFAULT_LOG_DIR) { - msg += " Did you configure the correct one through spark.history.fs.logDirectory?" + if (FileSystem.mkdirs(dirFs, path, EventLogFileWriter.LOG_FOLDER_PERMISSIONS)) { + logInfo(log"Created missing history log directory ${MDC(HISTORY_DIR, dir)}.") + true + } else { + var msg = s"Log directory specified does not exist: $dir" + if (dir == DEFAULT_LOG_DIR) { + msg += " Did you configure the correct one through spark.history.fs.logDirectory?" + } + logWarning(msg) + false } - logWarning(msg) - false } } require(validDirs.nonEmpty, diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index 00a92c503be4e..8f8606bcf4ad6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -100,6 +100,25 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon } } + test("create missing spark.eventLog.dir automatically") { + val appId = getUniqueApplicationId + val attemptId = None + val missingDir = new File(testDir, "missing-event-log-dir") + val missingDirPath = new Path(missingDir.getAbsolutePath) + assert(!missingDir.exists()) + + val conf = getLoggingConf(missingDirPath, None) + val writer = createWriter(appId, attemptId, missingDirPath.toUri, conf, + SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + writer.writeEvent("dummy", flushLogger = true) + writer.stop() + + assert(missingDir.isDirectory) + } + + test("Use the default value of spark.eventLog.compression.codec") { val conf = new SparkConf conf.set(EVENT_LOG_COMPRESS, true) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 829010179bda4..0265d62189760 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -2266,6 +2266,29 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P } } + test("create missing spark.history.fs.logDirectory automatically") { + val missingDir = Utils.createTempDir(namePrefix = "missingHistoryDir") + val missingDirPath = missingDir.getAbsolutePath + Utils.deleteRecursively(missingDir) + assert(!new File(missingDirPath).exists()) + + val conf = createTestConf().set(HISTORY_LOG_DIR, missingDirPath) + val provider = new FsHistoryProvider(conf) + try { + updateAndCheck(provider) { list => + list.size should be(0) + } + new File(missingDirPath).isDirectory should be(true) + } finally { + provider.stop() + val recreatedDir = new File(missingDirPath) + if (recreatedDir.exists()) { + Utils.deleteRecursively(recreatedDir) + } + } + } + + test("SPARK-55864: directory temporarily inaccessible then recovers") { val dir2 = Utils.createTempDir(namePrefix = "logDir2") try {