diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5edb24a6eb3e8..f768e675dcd00 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2373,6 +2373,22 @@ package object config { .enumConf(SchedulingMode) .createWithDefault(SchedulingMode.FIFO) + private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED = + ConfigBuilder("spark.scheduler.streaming.idAwareLogging.enabled") + .doc("When true, scheduler log messages for streaming tasks include " + + "the structured streaming query ID and batch ID.") + .version("4.2.0") + .booleanConf + .createWithDefault(true) + + private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH = + ConfigBuilder("spark.scheduler.streaming.idAwareLogging.queryIdLength") + .doc("Maximum number of characters of the streaming query ID to include " + + "in scheduler log messages. 
Set to -1 to include the full query ID.") + .version("4.2.0") + .intConf + .createWithDefault(5) + private[spark] val SCHEDULER_REVIVE_INTERVAL = ConfigBuilder("spark.scheduler.revive.interval") .version("0.8.1") diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 06b6045cccd99..dc853d15206dd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -26,10 +26,9 @@ import scala.xml.{Node, XML} import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.internal.LogKeys +import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE} +import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE, STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED, STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -61,6 +60,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext) extends SchedulableBuilder with Logging { + val streamingIdAwareLoggingEnabled: Boolean = + sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED) + val streamingQueryIdLength: Int = + sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH) val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE) val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL @@ -216,18 +219,33 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, 
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(parentPool) - logWarning(log"A job was submitted with scheduler pool " + - log"${MDC(SCHEDULER_POOL_NAME, poolName)}, which has not been " + - log"configured. This can happen when the file that pools are read from isn't set, or " + - log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. " + - log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " + - log"configuration (schedulingMode: " + - log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + - log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + - log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}") + logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( + properties, + log"A job was submitted with scheduler pool " + + log"${MDC(SCHEDULER_POOL_NAME, poolName)}, which has not been " + + log"configured. This can happen when the file that pools are read from isn't set, or " + + log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. 
" + + log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " + + log"configuration (schedulingMode: " + + log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + + log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + + log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}", + streamingIdAwareLoggingEnabled, + streamingQueryIdLength + ) + ) } parentPool.addSchedulable(manager) - logInfo(log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " + - log"${MDC(LogKeys.POOL_NAME, poolName)}") + + logInfo( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( + properties, + log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " + + log"${MDC(LogKeys.POOL_NAME, poolName)}", + streamingIdAwareLoggingEnabled, + streamingQueryIdLength + ) + ) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala new file mode 100644 index 0000000000000..ef76e139188a1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.{HashMap, Locale, Properties} + +import org.apache.spark.internal.{LogEntry, Logging, LogKeys, MDC, MessageWithContext} + +/** + * A logging trait for scheduler components where log messages should include + * structured streaming identifiers (query ID and batch ID). + * + * Streaming execution sets these identifiers via + * [[org.apache.spark.SparkContext#setLocalProperty]], which is thread-local. + * Scheduler code typically runs on a different thread (e.g. the + * task-scheduler-event-loop-worker), so `getLocalProperty` would not have + * the streaming context. This trait instead reads the identifiers from the + * task's [[java.util.Properties]], which are propagated with the + * [[org.apache.spark.scheduler.TaskSet]] across thread boundaries. + * + * Mix this trait into any scheduler component that has access to task + * properties and needs streaming-aware log output. 
+ */ +private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging { + // we gather the query and batch Id from the properties of a given TaskSet + protected def properties: Properties + protected def streamingIdAwareLoggingEnabled: Boolean + protected def streamingQueryIdLength: Int + + override protected def logInfo(msg: => String): Unit = + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + + override protected def logInfo(entry: LogEntry): Unit = { + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + } + + override protected def logInfo(msg: => String, t: Throwable): Unit = + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + + override protected def logInfo(entry: LogEntry, t: Throwable): Unit = { + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + } + + override protected def logWarning(msg: => String): Unit = + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + + override protected def logWarning(entry: LogEntry): Unit = { + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + } + + override protected def logWarning(msg: => String, t: Throwable): Unit = + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) 
+ + override protected def logWarning(entry: LogEntry, t: Throwable): Unit = { + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + } + + override protected def logDebug(msg: => String): Unit = + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + + override protected def logDebug(entry: LogEntry): Unit = { + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + } + + override protected def logDebug(msg: => String, t: Throwable): Unit = + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + + override protected def logDebug(entry: LogEntry, t: Throwable): Unit = { + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + } + + override protected def logError(msg: => String): Unit = + super.logError( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + + override protected def logError(entry: LogEntry): Unit = { + super.logError( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + } + + override protected def logError(msg: => String, t: Throwable): Unit = + super.logError( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + + override protected def logError(entry: 
LogEntry, t: Throwable): Unit = { + super.logError( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + } + + override protected def logTrace(msg: => String): Unit = + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + + override protected def logTrace(entry: LogEntry): Unit = { + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) + } + + override protected def logTrace(msg: => String, t: Throwable): Unit = + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + + override protected def logTrace(entry: LogEntry, t: Throwable): Unit = { + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) + } +} + +/** + * Helpers for constructing log entries enriched with structured streaming + * identifiers extracted from task properties. 
+ */ +private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging { + val QUERY_ID_KEY = "sql.streaming.queryId" + val BATCH_ID_KEY = "streaming.sql.batchId" + + private[scheduler] def constructStreamingLogEntry( + properties: Properties, + entry: LogEntry, + enabled: Boolean, + queryIdLength: Int): LogEntry = { + if (!enabled || properties == null) { + return entry + } + // wrap in log entry to defer until log is evaluated + new LogEntry({ + val (queryId: Option[String], batchId: Option[String]) = + getStreamingProperties(properties, queryIdLength) + + formatMessage( + queryId, + batchId, + entry + ) + }) + } + + private[scheduler] def constructStreamingLogEntry( + properties: Properties, + msg: => String, + enabled: Boolean, + queryIdLength: Int): LogEntry = { + if (!enabled || properties == null) { + return new LogEntry( + MessageWithContext(msg, java.util.Collections.emptyMap()) + ) + } + + new LogEntry({ + val (queryId: Option[String], batchId: Option[String]) = + getStreamingProperties(properties, queryIdLength) + + MessageWithContext( + formatMessage( + queryId, + batchId, + msg + ), + constructStreamingContext(queryId, batchId) + ) + }) + } + + private def constructStreamingContext( + queryId: Option[String], + batchId: Option[String]): HashMap[String, String] = { + val streamingContext = new HashMap[String, String]() + // MDC places the log key in the context as all lowercase, so we do the same here + queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _)) + batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _)) + streamingContext + } + + private def formatMessage( + queryId: Option[String], + batchId: Option[String], + msg: => String): String = { + val msgWithBatchId = batchId.map(bid => s"[batchId = $bid] $msg").getOrElse(msg) + queryId.map(qId => s"[queryId = $qId] $msgWithBatchId").getOrElse(msgWithBatchId) + } + + private def formatMessage( + queryId: 
Option[String], + batchId: Option[String], + msg: => LogEntry): MessageWithContext = { + val msgWithBatchId: MessageWithContext = batchId.map( + bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg) + ).getOrElse(toMessageWithContext(msg)) + queryId.map( + qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId)}] " + msgWithBatchId + ).getOrElse(msgWithBatchId) + } + + private def toMessageWithContext(entry: LogEntry): MessageWithContext = { + MessageWithContext(entry.message, entry.context) + } + + private def getStreamingProperties( + properties: Properties, + queryIdLength: Int): (Option[String], Option[String]) = { + val queryId = Option(properties.getProperty(QUERY_ID_KEY)).filter(_.nonEmpty).map { id => + if (queryIdLength < 0) { + id + } else { + id.take(queryIdLength) + } + } + val batchId = Option(properties.getProperty(BATCH_ID_KEY)).filter(_.nonEmpty) + (queryId, batchId) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 66f9a907158b5..618c8eb459026 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer -import java.util.TimerTask +import java.util.{Properties, TimerTask} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong @@ -288,9 +288,33 @@ private[spark] class TaskSchedulerImpl( private[scheduler] def createTaskSetManager( taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { + if (isStreamingTaskSet(taskSet)) { + streamingTaskSetManager(taskSet, maxTaskFailures) + } else { + new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock) + } + } + + // Create task set manager for streaming task sets which + // will include query and batch Id in the logs 
+ private def streamingTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock) + with StructuredStreamingIdAwareSchedulerLogging { + override protected def properties: Properties = this.taskSet.properties + override protected val streamingIdAwareLoggingEnabled: Boolean = + conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED) + override protected val streamingQueryIdLength: Int = + conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH) + // ensure log name matches the non-streaming version + override protected def logName: String = classOf[TaskSetManager].getName + } } + private def isStreamingTaskSet(taskSet: TaskSet): Boolean = + taskSet.properties != null && + taskSet.properties.getProperty( + StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY) != null + // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up // once all the tasks has been finished. 
The stage attempt could be aborted after the call of diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 30ed80dbe848d..37de8338ad905 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -367,6 +367,46 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } + test("SPARK-56326: Fair Scheduler addTaskSetManager logs include " + + "streaming query Id and batch Id") { + val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) + sc = new SparkContext(LOCAL, APP_NAME, conf) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc) + schedulableBuilder.buildPools() + + val testQueryId = "test-query-id-5678" + val testBatchId = "99" + val properties = new Properties() + properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "1") + properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY, testQueryId) + properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.BATCH_ID_KEY, testBatchId) + + val taskSetManager = createTaskSetManager(0, 1, taskScheduler) + + val logAppender = new LogAppender("pool streaming logs", maxEvents = 1000) + val loggerName = classOf[FairSchedulableBuilder].getName + + withLogAppender(logAppender, loggerNames = Seq(loggerName)) { + schedulableBuilder.addTaskSetManager(taskSetManager, properties) + } + + val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + // default queryIdLength is 5, so the query ID is truncated + val truncatedQueryId = testQueryId.take(5) + val expectedQueryPrefix = s"[queryId = $truncatedQueryId]" + val expectedBatchPrefix = s"[batchId = $testBatchId]" + val addedLogs = 
logs.filter(msg => + msg.contains("Added task set") && + msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix)) + assert(addedLogs.nonEmpty, + s"Expected 'Added task set' log to contain '$expectedQueryPrefix' " + + s"and '$expectedBatchPrefix'.\nCaptured logs:\n${logs.mkString("\n")}") + } + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) diff --git a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala new file mode 100644 index 0000000000000..52e11e4d14d6f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util.{Locale, Properties} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.{LogEntry, Logging, LogKey, LogKeys, MDC, MessageWithContext} +import org.apache.spark.scheduler.StructuredStreamingIdAwareSchedulerLogging.{BATCH_ID_KEY, QUERY_ID_KEY} + +class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { + + override def beforeAll(): Unit = { + super.beforeAll() + Logging.enableStructuredLogging() + } + + override def afterAll(): Unit = { + Logging.disableStructuredLogging() + super.afterAll() + } + + private def assertContextValue( + context: java.util.Map[String, String], + key: LogKey, + expected: String): Unit = { + assert(context.get(key.name.toLowerCase(Locale.ROOT)) === expected) + } + + private def assertContextAbsent( + context: java.util.Map[String, String], + key: LogKey): Unit = { + assert(!context.containsKey(key.name.toLowerCase(Locale.ROOT))) + } + + private val testQueryId = "abc-query-id" + private val testBatchId = "42" + + private def propsWithBothIds(): Properties = { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, testQueryId) + props.setProperty(BATCH_ID_KEY, testBatchId) + props + } + + private def propsWithQueryIdOnly(): Properties = { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, testQueryId) + props + } + + test("SPARK-56326: constructStreamingLogEntry with String - both queryId and batchId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + propsWithBothIds(), "test message", enabled = true, queryIdLength = -1) + + assertResult(s"[queryId = $testQueryId] [batchId = $testBatchId] test message")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) + } + + test("SPARK-56326: constructStreamingLogEntry with String - only queryId") { + val result = 
StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + propsWithQueryIdOnly(), "test message", enabled = true, queryIdLength = -1) + + assertResult(s"[queryId = $testQueryId] test message")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + } + + test("SPARK-56326: constructStreamingLogEntry with String - no streaming properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + new Properties(), "test message", enabled = true, queryIdLength = -1) + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with String - null properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + null, "test message", enabled = true, queryIdLength = -1) + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with String - empty queryId") { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, "") + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, "test message", enabled = true, queryIdLength = -1) + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with String - empty batchId") { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, testQueryId) + props.setProperty(BATCH_ID_KEY, "") + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, "test message", enabled = true, queryIdLength = -1) + + assertResult(s"[queryId = $testQueryId] test message")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + } + + test("SPARK-56326: constructStreamingLogEntry with 
LogEntry - both queryId and batchId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithBothIds(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true, + queryIdLength = -1) + + assertResult(s"[queryId = $testQueryId] " + + s"[batchId = $testBatchId] test message Dummy Context")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - only queryId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithQueryIdOnly(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true, + queryIdLength = -1) + + assertResult(s"[queryId = $testQueryId] test message Dummy Context")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - no streaming properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(new Properties(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true, + queryIdLength = -1) + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - null properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(null, + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true, + queryIdLength 
= -1) + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - empty queryId") { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, "") + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true, + queryIdLength = -1) + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - empty batchId") { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, testQueryId) + props.setProperty(BATCH_ID_KEY, "") + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true, + queryIdLength = -1) + + assertResult(s"[queryId = $testQueryId] test message Dummy Context")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers evaluation") { + var evaluated = false + val lazyEntry = new LogEntry({ + evaluated = true + MessageWithContext("lazy message", java.util.Collections.emptyMap()) + }) + + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + propsWithBothIds(), lazyEntry, enabled = true, queryIdLength = -1) + + // Work should be deferred + assert(!evaluated, + 
+      "LogEntry should not be evaluated during constructStreamingLogEntry")
+
+    // Accessing .message triggers evaluation
+    result.message
+    assert(evaluated, "LogEntry should be evaluated when .message is accessed")
+  }
+
+  test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers property access") {
+    var propertiesAccessed = false
+    val props = new Properties() {
+      override def getProperty(key: String): String = {
+        propertiesAccessed = true
+        super.getProperty(key)
+      }
+    }
+    props.setProperty(QUERY_ID_KEY, testQueryId)
+    props.setProperty(BATCH_ID_KEY, testBatchId)
+
+    val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}"
+    val result = StructuredStreamingIdAwareSchedulerLogging
+      .constructStreamingLogEntry(
+        props, entry, enabled = true, queryIdLength = -1)
+
+    assert(!propertiesAccessed,
+      "Properties should not be accessed during constructStreamingLogEntry")
+
+    result.message
+    assert(propertiesAccessed,
+      "Properties should be accessed when .message is called")
+  }
+
+  test("SPARK-56326: constructStreamingLogEntry with String - disabled skips enrichment") {
+    val result = StructuredStreamingIdAwareSchedulerLogging
+      .constructStreamingLogEntry(
+        propsWithBothIds(), "test message", enabled = false, queryIdLength = -1)
+
+    assertResult("test message")(result.message)
+    assert(result.context.isEmpty)
+  }
+
+  test("SPARK-56326: constructStreamingLogEntry with LogEntry - disabled skips enrichment") {
+    val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}"
+    val result = StructuredStreamingIdAwareSchedulerLogging
+      .constructStreamingLogEntry(
+        propsWithBothIds(), entry, enabled = false, queryIdLength = -1)
+
+    assertResult("test message Dummy Context")(result.message)
+    assertContextAbsent(result.context, LogKeys.QUERY_ID)
+    assertContextAbsent(result.context, LogKeys.BATCH_ID)
+    assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context")
+  }
+
+  test("SPARK-56326: constructStreamingLogEntry with String - queryIdLength truncates queryId") {
+    val result = StructuredStreamingIdAwareSchedulerLogging
+      .constructStreamingLogEntry(
+        propsWithBothIds(), "test message", enabled = true, queryIdLength = 5)
+
+    val truncatedId = testQueryId.take(5)
+    assertResult(s"[queryId = $truncatedId] [batchId = $testBatchId] test message")(
+      result.message)
+    assertContextValue(result.context, LogKeys.QUERY_ID, truncatedId)
+    assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId)
+  }
+
+  test("SPARK-56326: constructStreamingLogEntry with LogEntry - queryIdLength truncates queryId") {
+    val result = StructuredStreamingIdAwareSchedulerLogging
+      .constructStreamingLogEntry(
+        propsWithBothIds(),
+        log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}",
+        enabled = true,
+        queryIdLength = 5)
+
+    val truncatedId = testQueryId.take(5)
+    assertResult(s"[queryId = $truncatedId] " +
+      s"[batchId = $testBatchId] test message Dummy Context")(
+      result.message)
+    assertContextValue(result.context, LogKeys.QUERY_ID, truncatedId)
+    assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId)
+  }
+
+  test("SPARK-56326: constructStreamingLogEntry - " +
+    "queryIdLength greater than id length returns full id") {
+    val result = StructuredStreamingIdAwareSchedulerLogging
+      .constructStreamingLogEntry(
+        propsWithBothIds(), "test message", enabled = true, queryIdLength = 1000)
+
+    assertResult(s"[queryId = $testQueryId] [batchId = $testBatchId] test message")(
+      result.message)
+    assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9007ae4e0990a..c9c9e529405ec 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -2713,4 +2713,43 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext
     }
   }
 
+  test("SPARK-56326: Streaming TaskSet with queryId in properties " +
+    "uses StructuredStreamingIdAwareSchedulerLogging") {
+    setupScheduler()
+    val testQueryId = "test-query-id-1234"
+    val testBatchId = "42"
+    // Create a TaskSet with a non-null Properties containing the streaming metadata.
+    val properties = new Properties()
+    properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY, testQueryId)
+    properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.BATCH_ID_KEY, testBatchId)
+    val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)),
+      0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
+
+    val tsm = taskScheduler.createTaskSetManager(taskSet, 1)
+    assert(tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging])
+  }
+
+  test("SPARK-56326: Streaming TaskSet without queryId in properties " +
+    "does not use StructuredStreamingIdAwareSchedulerLogging") {
+    setupScheduler()
+    // create task with empty properties (using FakeTask will cause null properties)
+    val properties = new Properties()
+    val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)),
+      0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
+
+    val tsm = taskScheduler.createTaskSetManager(taskSet, 1)
+
+    assert(!tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging])
+  }
+
+  test("SPARK-56326: Streaming TaskSet with null properties " +
+    "does not use StructuredStreamingIdAwareSchedulerLogging") {
+    setupScheduler()
+    val taskSet = FakeTask.createTaskSet(10)
+
+    val tsm = taskScheduler.createTaskSetManager(taskSet, 1)
+
+    assert(!tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging])
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index adcb57a0187a4..09901ead9a68c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -2867,6 +2867,61 @@ class TaskSetManagerSuite
     assert(taskSetManager.taskSetExcludelistHelperOpt.get.isDryRun)
   }
 
+  test("SPARK-56326: Streaming query Id and batch Id are included in scheduling log " +
+    "messages") {
+    sc = new SparkContext("local", "test")
+    val clock = new ManualClock
+    sched = new FakeTaskScheduler(sc, clock, ("exec1", "host1"))
+    val testQueryId = "test-query-id-1234"
+    val testBatchId = "42"
+    // Create a TaskSet with a non-null Properties containing the streaming metadata.
+    val properties = new Properties()
+    properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY, testQueryId)
+    properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.BATCH_ID_KEY, testBatchId)
+    val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)),
+      0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)
+
+    val logAppender = new LogAppender("streaming scheduling logs", maxEvents = 1000)
+    val loggerName = classOf[TaskSetManager].getName
+
+    withLogAppender(logAppender, loggerNames = Seq(loggerName)) {
+      // uses TaskSchedulerImpl.streamingTaskSetManager to ensure
+      // logging and properties are initialized correctly
+      val manager = sched.createTaskSetManager(taskSet, MAX_TASK_FAILURES)
+
+      // resourceOffer triggers prepareLaunchingTask which logs "Starting ..."
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+
+      clock.advance(1)
+      // handleSuccessfulTask logs "Finished ..."
+      manager.handleSuccessfulTask(0, createTaskResult(0))
+    }
+
+    val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+
+    // default queryIdLength is 5, so the query ID is truncated
+    val truncatedQueryId = testQueryId.take(5)
+    val expectedQueryPrefix = s"[queryId = $truncatedQueryId]"
+    val expectedBatchPrefix = s"[batchId = $testBatchId]"
+
+    // Verify the "Starting" log line includes query Id and batch Id
+    val startingLogs = logs.filter(msg =>
+      msg.contains("Starting") &&
+        msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix))
+    assert(startingLogs.nonEmpty,
+      s"Expected 'Starting' log to contain '$expectedQueryPrefix' and '$expectedBatchPrefix'." +
+        s"\nCaptured logs:\n${logs.mkString("\n")}")
+
+    // Verify the "Finished" log line includes query Id and batch Id
+    val finishedLogs = logs.filter(msg =>
+      msg.contains("Finished") &&
+        msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix))
+    assert(finishedLogs.nonEmpty,
+      s"Expected 'Finished' log to contain '$expectedQueryPrefix' and '$expectedBatchPrefix'." +
+        s"\nCaptured logs:\n${logs.mkString("\n")}")
+  }
+
 }
 
 class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {
diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
index a84b0fc3b6c77..d49d0eab2fe77 100644
--- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
+++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions
@@ -319,6 +319,8 @@ spark.scheduler.numCancelledJobGroupsToTrack
 spark.scheduler.resource.profileMergeConflicts
 spark.scheduler.revive.interval
 spark.scheduler.stage.legacyAbortAfterKillTasks
+spark.scheduler.streaming.idAwareLogging.enabled
+spark.scheduler.streaming.idAwareLogging.queryIdLength
 spark.security.credentials.renewalRatio
 spark.security.credentials.retryWait
 spark.serializer