From 9ed6cd620d8722d419d14524b5c2f58f0571618c Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Wed, 1 Apr 2026 20:59:59 +0000 Subject: [PATCH 01/18] Add streaming query Id and batch Id to scheduling logs --- .../spark/scheduler/SchedulableBuilder.scala | 39 ++++++++++++++++++- .../spark/scheduler/TaskSetManager.scala | 6 ++- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 06b6045cccd99..a66bb34af2123 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -29,6 +29,7 @@ import org.apache.spark.SparkContext import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys import org.apache.spark.internal.LogKeys._ +import org.apache.spark.internal.MessageWithContext import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -58,6 +59,40 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) } } +private[spark] object SchedulableBuilder extends Logging { + val QUERY_ID_KEY = "sql.streaming.queryId" + val BATCH_ID_KEY = "streaming.sql.batchId" + + /** + * Helper method used to generate a logging prefix containing the query Id and batch Id + * when they are set. These properties are only set for streaming queries and are used to + * aid in debugging multiple streaming queries running at the same time. 
+ * + * @param properties the task properties to check for query Id and batch Id + * @return a log prefix containing the query Id and batch Id when both are present and + * non-null; otherwise, an empty prefix + */ + def schedulingLogStreamingContext(properties: Properties): MessageWithContext = { + if (properties == null) { + return log"" + } + + val queryId = Option(properties.getProperty(QUERY_ID_KEY)) + val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + + queryId match { + case Some(qId) => + val prefix = log"[queryId = ${MDC(LogKeys.QUERY_ID, qId.take(5))}] " + batchId.fold(prefix)(bId => + prefix + log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + ) + case _ => + // Query id not set; not a streaming query so nothing to add + log"" + } + } +} + private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext) extends SchedulableBuilder with Logging { @@ -227,7 +262,9 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}") } parentPool.addSchedulable(manager) - logInfo(log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " + + + logInfo(SchedulableBuilder.schedulingLogStreamingContext(properties) + + log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} to pool " + log"${MDC(LogKeys.POOL_NAME, poolName)}") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5c077a7a3bbb8..b8d05e870c07b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -571,7 +571,8 @@ private[spark] class TaskSetManager( // a good proxy to task serialization time. 
// val timeTaken = clock.getTime() - startTime val tName = taskName(taskId) - logInfo(log"Starting ${MDC(TASK_NAME, tName)} (${MDC(HOST, host)}," + + logInfo(SchedulableBuilder.schedulingLogStreamingContext(taskSet.properties) + + log"Starting ${MDC(TASK_NAME, tName)} (${MDC(HOST, host)}," + log"executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}, " + log"partition ${MDC(PARTITION_ID, task.partitionId)}, " + log"${MDC(TASK_LOCALITY, taskLocality)}, " + @@ -865,7 +866,8 @@ private[spark] class TaskSetManager( } if (!successful(index)) { tasksSuccessful += 1 - logInfo(log"Finished ${MDC(TASK_NAME, taskName(info.taskId))} in " + + logInfo(SchedulableBuilder.schedulingLogStreamingContext(taskSet.properties) + + log"Finished ${MDC(TASK_NAME, taskName(info.taskId))} in " + log"${MDC(DURATION, info.duration)} ms on ${MDC(HOST, info.host)} " + log"(executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}) " + log"(${MDC(NUM_SUCCESSFUL_TASKS, tasksSuccessful)}/${MDC(NUM_TASKS, numTasks)})") From 4a53e4bcb5fc0677c5862bab6a25b2867a26327f Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Thu, 2 Apr 2026 00:13:14 +0000 Subject: [PATCH 02/18] Add unit tests covering streaming query and batch Id logs --- .../apache/spark/scheduler/PoolSuite.scala | 37 ++++++++++ .../scheduler/SchedulableBuilderSuite.scala | 68 +++++++++++++++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 51 ++++++++++++++ 3 files changed, 156 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/SchedulableBuilderSuite.scala diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 30ed80dbe848d..02f97668be696 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -367,6 +367,43 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } + test("Fair Scheduler addTaskSetManager logs include 
streaming query Id and batch Id") { + val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) + sc = new SparkContext(LOCAL, APP_NAME, conf) + val taskScheduler = new TaskSchedulerImpl(sc) + + val rootPool = new Pool("", FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc) + schedulableBuilder.buildPools() + + val testQueryId = "test-query-id-5678" + val testBatchId = "99" + val properties = new Properties() + properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "1") + properties.setProperty("sql.streaming.queryId", testQueryId) + properties.setProperty("streaming.sql.batchId", testBatchId) + + val taskSetManager = createTaskSetManager(0, 1, taskScheduler) + + val logAppender = new LogAppender("pool streaming logs", maxEvents = 1000) + val loggerName = classOf[FairSchedulableBuilder].getName + + withLogAppender(logAppender, loggerNames = Seq(loggerName)) { + schedulableBuilder.addTaskSetManager(taskSetManager, properties) + } + + val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]" + val expectedBatchPrefix = s"[batchId = $testBatchId]" + val addedLogs = logs.filter(msg => + msg.contains("Added task set") && + msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix)) + assert(addedLogs.nonEmpty, + s"Expected 'Added task set' log to contain '$expectedQueryPrefix' " + + s"and '$expectedBatchPrefix'.\nCaptured logs:\n${logs.mkString("\n")}") + } + private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int, expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = { val selectedPool = rootPool.getSchedulableByName(poolName) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulableBuilderSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulableBuilderSuite.scala new 
file mode 100644 index 0000000000000..ef6c040347452 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulableBuilderSuite.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.Properties + +import org.apache.spark.SparkFunSuite +import org.apache.spark.scheduler.SchedulableBuilder.{BATCH_ID_KEY, QUERY_ID_KEY} + +class SchedulableBuilderSuite extends SparkFunSuite { + + test("schedulingLogStreamingContext - both queryId and batchId present") { + val props = new Properties() + val queryId = "12345-test-query-id" + val batchId = "23" + + props.setProperty(QUERY_ID_KEY, queryId) + props.setProperty(BATCH_ID_KEY, batchId) + + val logContext = SchedulableBuilder.schedulingLogStreamingContext(props) + + // log context should include the truncated query Id and the provided batch Id + assertResult("[queryId = 12345] [batchId = 23] ")(logContext.message) + } + + test("schedulingLogStreamingContext - only queryId and no batchId present") { + val props = new Properties() + val queryId = "12345-test-query-id" + + props.setProperty(QUERY_ID_KEY, queryId) + + val logContext = SchedulableBuilder.schedulingLogStreamingContext(props) + + 
// log context should include the truncated query Id but no batch Id + assertResult("[queryId = 12345] ")(logContext.message) + } + + test("schedulingLogStreamingContext - no queryId or batchId present") { + val props = new Properties() + + val logContext = SchedulableBuilder.schedulingLogStreamingContext(props) + + // log context should be an empty string + assertResult("")(logContext.message) + } + + test("schedulingLogStreamingContext - handles null properties") { + val logContext = SchedulableBuilder.schedulingLogStreamingContext(null) + + // log context should be an empty string + assertResult("")(logContext.message) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index adcb57a0187a4..3b66945d89a90 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -2867,6 +2867,57 @@ class TaskSetManagerSuite assert(taskSetManager.taskSetExcludelistHelperOpt.get.isDryRun) } + test("SPARK-56326: Streaming query Id and batch Id are included in scheduling log " + + "messages") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc) + val testQueryId = "test-query-id-1234" + val testBatchId = "42" + // Create a TaskSet with a non-null Properties containing the streaming metadata. 
+ val properties = new Properties() + properties.setProperty("sql.streaming.queryId", testQueryId) + properties.setProperty("streaming.sql.batchId", testBatchId) + val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)), + 0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) + + val clock = new ManualClock + val logAppender = new LogAppender("streaming scheduling logs", maxEvents = 1000) + val loggerName = classOf[TaskSetManager].getName + + withLogAppender(logAppender, loggerNames = Seq(loggerName)) { + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + + // resourceOffer triggers prepareLaunchingTask which logs "Starting ..." + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 + assert(taskOption.isDefined) + + clock.advance(1) + // handleSuccessfulTask logs "Finished ..." + manager.handleSuccessfulTask(0, createTaskResult(0)) + } + + val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + + val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]" + val expectedBatchPrefix = s"[batchId = $testBatchId]" + + // Verify the "Starting" log line includes query Id and batch Id + val startingLogs = logs.filter(msg => + msg.contains("Starting") && + msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix)) + assert(startingLogs.nonEmpty, + s"Expected 'Starting' log to contain '$expectedQueryPrefix' and '$expectedBatchPrefix'." + + s"\nCaptured logs:\n${logs.mkString("\n")}") + + // Verify the "Finished" log line includes query Id and batch Id + val finishedLogs = logs.filter(msg => + msg.contains("Finished") && + msg.contains(expectedQueryPrefix) && msg.contains(expectedBatchPrefix)) + assert(finishedLogs.nonEmpty, + s"Expected 'Finished' log to contain '$expectedQueryPrefix' and '$expectedBatchPrefix'." 
+ + s"\nCaptured logs:\n${logs.mkString("\n")}") + } + } class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) { From 92d2f959eba1a78537b060fb1b868fd634d8579a Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Thu, 2 Apr 2026 00:20:23 +0000 Subject: [PATCH 03/18] add jira ID to test name in PoolSuite --- core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 02f97668be696..873ec4023c9cd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -367,7 +367,8 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } } - test("Fair Scheduler addTaskSetManager logs include streaming query Id and batch Id") { + test("SPARK-56326: Fair Scheduler addTaskSetManager logs include " + + "streaming query Id and batch Id") { val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) sc = new SparkContext(LOCAL, APP_NAME, conf) From 484521503ed66efd7670194e0e971af1662e555e Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Fri, 3 Apr 2026 20:28:35 +0000 Subject: [PATCH 04/18] switch from SchedulableBuilder companion object to IdAware logger --- .../spark/scheduler/SchedulableBuilder.scala | 48 +---- ...uredStreamingIdAwareSchedulerLogging.scala | 182 ++++++++++++++++++ .../spark/scheduler/TaskSetManager.scala | 15 +- .../scheduler/SchedulableBuilderSuite.scala | 68 ------- ...treamingIdAwareSchedulerLoggingSuite.scala | 150 +++++++++++++++ 5 files changed, 348 insertions(+), 115 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala delete mode 100644 
core/src/test/scala/org/apache/spark/scheduler/SchedulableBuilderSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index a66bb34af2123..acc649085a0f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -26,10 +26,8 @@ import scala.xml.{Node, XML} import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging -import org.apache.spark.internal.LogKeys +import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.MessageWithContext import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -59,40 +57,6 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) } } -private[spark] object SchedulableBuilder extends Logging { - val QUERY_ID_KEY = "sql.streaming.queryId" - val BATCH_ID_KEY = "streaming.sql.batchId" - - /** - * Helper method used to generate a logging prefix containing the query Id and batch Id - * when they are set. These properties are only set for streaming queries and are used to - * aid in debugging multiple streaming queries running at the same time. 
- * - * @param properties the task properties to check for query Id and batch Id - * @return a log prefix containing the query Id and batch Id when both are present and - * non-null; otherwise, an empty prefix - */ - def schedulingLogStreamingContext(properties: Properties): MessageWithContext = { - if (properties == null) { - return log"" - } - - val queryId = Option(properties.getProperty(QUERY_ID_KEY)) - val batchId = Option(properties.getProperty(BATCH_ID_KEY)) - - queryId match { - case Some(qId) => - val prefix = log"[queryId = ${MDC(LogKeys.QUERY_ID, qId.take(5))}] " - batchId.fold(prefix)(bId => - prefix + log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " - ) - case _ => - // Query id not set; not a streaming query so nothing to add - log"" - } - } -} - private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext) extends SchedulableBuilder with Logging { @@ -263,8 +227,12 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext } parentPool.addSchedulable(manager) - logInfo(SchedulableBuilder.schedulingLogStreamingContext(properties) + - log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} to pool " + - log"${MDC(LogKeys.POOL_NAME, poolName)}") + logInfo( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( + properties, + log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} to pool " + + log"${MDC(LogKeys.POOL_NAME, poolName)}" + ) + ) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala new file mode 100644 index 0000000000000..38df3afe43344 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.{Locale, Properties} + +import org.apache.spark.internal.{LogEntry, Logging, LogKeys, MessageWithContext} + +/** + * A logging trait for scheduler components where log messages should include + * structured streaming identifiers (query ID and batch ID). + * + * Streaming execution sets these identifiers via + * [[org.apache.spark.SparkContext#setLocalProperty]], which is thread-local. + * Scheduler code typically runs on a different thread (e.g. the + * task-scheduler-event-loop-worker), so `getLocalProperty` would not have + * the streaming context. This trait instead reads the identifiers from the + * task's [[java.util.Properties]], which are propagated with the + * [[org.apache.spark.scheduler.TaskSet]] across thread boundaries. + * + * Mix this trait into any scheduler component that has access to task + * properties and needs streaming-aware log output. 
+ */ +private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging { + // we gather the query and batch Id from the properties of a given TaskSet + protected def properties: Properties + + override protected def logInfo(msg: => String): Unit = + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logInfo(entry: LogEntry): Unit = { + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } + + override protected def logWarning(msg: => String): Unit = + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logWarning(entry: LogEntry): Unit = { + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } + + override protected def logWarning(msg: => String, t: Throwable): Unit = + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + + override protected def logWarning(entry: LogEntry, t: Throwable): Unit = { + super.logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + } + + override protected def logDebug(msg: => String): Unit = + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logDebug(entry: LogEntry): Unit = { + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } + + override protected def logError(msg: => String): Unit = + super.logError( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logError(entry: LogEntry): Unit = { + super.logError( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } + + override 
protected def logTrace(msg: => String): Unit = + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + + override protected def logTrace(entry: LogEntry): Unit = { + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + } +} + +/** + * Helpers for constructing log entries enriched with structured streaming + * identifiers extracted from task properties. + */ +private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging { + val QUERY_ID_KEY = "sql.streaming.queryId" + val BATCH_ID_KEY = "streaming.sql.batchId" + + private[scheduler] def constructStreamingLogEntry( + properties: Properties, + entry: LogEntry): LogEntry = { + if (properties == null) { + return entry + } + + val queryId = Option(properties.getProperty(QUERY_ID_KEY)) + val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + + // formatMessage truncates the queryId for readability + // so we use a blank messageWithContext to overwrite the full query Id to the context + formatMessage( + queryId, + batchId, + entry + ) + MessageWithContext("", constructStreamingContext(queryId, batchId)) + } + + private[scheduler] def constructStreamingLogEntry( + properties: Properties, + msg: => String): LogEntry = { + if (properties == null) { + return new LogEntry( + MessageWithContext(msg, java.util.Collections.emptyMap()) + ) + } + + val queryId = Option(properties.getProperty(QUERY_ID_KEY)) + val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + new LogEntry({ + MessageWithContext( + formatMessage( + queryId, + batchId, + msg + ), + constructStreamingContext(queryId, batchId) + ) + }) + } + + private def constructStreamingContext( + queryId: Option[String], + batchId: Option[String]): + java.util.HashMap[String, String] = { + val streamingContext = new java.util.HashMap[String, String]() + // MDC places the log key in the context as all lowercase, so we do the same here + 
queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _)) + batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _)) + streamingContext + } + + private def formatMessage( + queryId: Option[String], + batchId: Option[String], + msg: => String): String = { + val msgWithBatchId = batchId.map(bid => s"[batchId = $bid] $msg").getOrElse(msg) + queryId.map(qId => s"[queryId = ${qId.take(5)}] $msgWithBatchId").getOrElse(msgWithBatchId) + } + + private def formatMessage( + queryId: Option[String], + batchId: Option[String], + msg: => LogEntry): MessageWithContext = { + val msgWithBatchId: MessageWithContext = batchId.map( + bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg) + ).getOrElse(toMessageWithContext(msg)) + queryId.map( + qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId.take(5))}] " + msgWithBatchId + ).getOrElse(msgWithBatchId) + } + + private def toMessageWithContext(entry: LogEntry): MessageWithContext = { + MessageWithContext(entry.message, entry.context) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b8d05e870c07b..1f1d67d020812 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import java.io.NotSerializableException import java.nio.ByteBuffer +import java.util.Properties import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} @@ -30,7 +31,7 @@ import org.apache.spark.InternalAccumulator import org.apache.spark.InternalAccumulator.{input, shuffleRead} import org.apache.spark.TaskState.TaskState import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.{config, Logging, LogKeys} +import 
org.apache.spark.internal.{config, LogKeys} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ import org.apache.spark.scheduler.SchedulingMode._ @@ -58,10 +59,12 @@ private[spark] class TaskSetManager( val taskSet: TaskSet, val maxTaskFailures: Int, healthTracker: Option[HealthTracker] = None, - clock: Clock = new SystemClock()) extends Schedulable with Logging { + clock: Clock = new SystemClock()) + extends Schedulable with StructuredStreamingIdAwareSchedulerLogging { - private val conf = sched.sc.conf + override def properties: Properties = taskSet.properties + private val conf = sched.sc.conf val maxResultSize = conf.get(config.MAX_RESULT_SIZE) // Serializer for closures and tasks. @@ -571,8 +574,7 @@ private[spark] class TaskSetManager( // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime val tName = taskName(taskId) - logInfo(SchedulableBuilder.schedulingLogStreamingContext(taskSet.properties) + - log"Starting ${MDC(TASK_NAME, tName)} (${MDC(HOST, host)}," + + logInfo(log"Starting ${MDC(TASK_NAME, tName)} (${MDC(HOST, host)}," + log"executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}, " + log"partition ${MDC(PARTITION_ID, task.partitionId)}, " + log"${MDC(TASK_LOCALITY, taskLocality)}, " + @@ -866,8 +868,7 @@ private[spark] class TaskSetManager( } if (!successful(index)) { tasksSuccessful += 1 - logInfo(SchedulableBuilder.schedulingLogStreamingContext(taskSet.properties) + - log"Finished ${MDC(TASK_NAME, taskName(info.taskId))} in " + + logInfo(log"Finished ${MDC(TASK_NAME, taskName(info.taskId))} in " + log"${MDC(DURATION, info.duration)} ms on ${MDC(HOST, info.host)} " + log"(executor ${MDC(LogKeys.EXECUTOR_ID, info.executorId)}) " + log"(${MDC(NUM_SUCCESSFUL_TASKS, tasksSuccessful)}/${MDC(NUM_TASKS, numTasks)})") diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulableBuilderSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulableBuilderSuite.scala 
deleted file mode 100644 index ef6c040347452..0000000000000 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulableBuilderSuite.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -import java.util.Properties - -import org.apache.spark.SparkFunSuite -import org.apache.spark.scheduler.SchedulableBuilder.{BATCH_ID_KEY, QUERY_ID_KEY} - -class SchedulableBuilderSuite extends SparkFunSuite { - - test("schedulingLogStreamingContext - both queryId and batchId present") { - val props = new Properties() - val queryId = "12345-test-query-id" - val batchId = "23" - - props.setProperty(QUERY_ID_KEY, queryId) - props.setProperty(BATCH_ID_KEY, batchId) - - val logContext = SchedulableBuilder.schedulingLogStreamingContext(props) - - // log context should include the truncated query Id and the provided batch Id - assertResult("[queryId = 12345] [batchId = 23] ")(logContext.message) - } - - test("schedulingLogStreamingContext - only queryId and no batchId present") { - val props = new Properties() - val queryId = "12345-test-query-id" - - props.setProperty(QUERY_ID_KEY, queryId) - - val logContext = 
SchedulableBuilder.schedulingLogStreamingContext(props) - - // log context should include the truncated query Id but no batch Id - assertResult("[queryId = 12345] ")(logContext.message) - } - - test("schedulingLogStreamingContext - no queryId or batchId present") { - val props = new Properties() - - val logContext = SchedulableBuilder.schedulingLogStreamingContext(props) - - // log context should be an empty string - assertResult("")(logContext.message) - } - - test("schedulingLogStreamingContext - handles null properties") { - val logContext = SchedulableBuilder.schedulingLogStreamingContext(null) - - // log context should be an empty string - assertResult("")(logContext.message) - } -} diff --git a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala new file mode 100644 index 0000000000000..d314ce29aef5e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util.{Locale, Properties} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.internal.{Logging, LogKey, LogKeys} +import org.apache.spark.scheduler.StructuredStreamingIdAwareSchedulerLogging.{ + BATCH_ID_KEY, + QUERY_ID_KEY +} + +class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { + + override def beforeAll(): Unit = { + super.beforeAll() + Logging.enableStructuredLogging() + } + + override def afterAll(): Unit = { + Logging.disableStructuredLogging() + super.afterAll() + } + + private def assertContextValue( + context: java.util.Map[String, String], + key: LogKey, + expected: String): Unit = { + assert(context.get(key.name.toLowerCase(Locale.ROOT)) === expected) + } + + private def assertContextAbsent( + context: java.util.Map[String, String], + key: LogKey): Unit = { + assert(!context.containsKey(key.name.toLowerCase(Locale.ROOT))) + } + + private val testQueryId = "abc-query-id" + private val testBatchId = "42" + + private def propsWithBothIds(): Properties = { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, testQueryId) + props.setProperty(BATCH_ID_KEY, testBatchId) + props + } + + private def propsWithQueryIdOnly(): Properties = { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, testQueryId) + props + } + + test("SPARK-56326: constructStreamingLogEntry with String - both queryId and batchId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithBothIds(), "test message") + + assertResult(s"[queryId = ${testQueryId.take(5)}] [batchId = $testBatchId] test message")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) + } + + test("SPARK-56326: constructStreamingLogEntry with String - only queryId") { + val result = StructuredStreamingIdAwareSchedulerLogging + 
.constructStreamingLogEntry(propsWithQueryIdOnly(), "test message") + + assertResult(s"[queryId = ${testQueryId.take(5)}] test message")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + } + + test("SPARK-56326: constructStreamingLogEntry with String - no streaming properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(new Properties(), "test message") + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with String - null properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(null, "test message") + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - both queryId and batchId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithBothIds(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult(s"[queryId = ${testQueryId.take(5)}] " + + s"[batchId = $testBatchId] test message Dummy Context")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - only queryId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithQueryIdOnly(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult(s"[queryId = ${testQueryId.take(5)}] test message Dummy Context")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, 
LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - no streaming properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(new Properties(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - null properties") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(null, + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } +} From 53099c989bcb632d361a40f868b8a35f58a1ad8f Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Wed, 8 Apr 2026 22:29:19 +0000 Subject: [PATCH 05/18] Move mixin to just streaming tasksets' TaskSetManagers --- .../spark/scheduler/TaskSchedulerImpl.scala | 19 +++++++ .../spark/scheduler/TaskSetManager.scala | 10 ++-- .../apache/spark/scheduler/PoolSuite.scala | 4 +- .../scheduler/TaskSchedulerImplSuite.scala | 52 +++++++++++++++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 9 +++- 5 files changed, 85 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 66f9a907158b5..734be9a53bf1d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -288,9 +288,28 @@ private[spark] class TaskSchedulerImpl( 
private[scheduler] def createTaskSetManager( taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { + if (isStreamingTaskSet(taskSet)) { + streamingTaskSetManager(taskSet, maxTaskFailures) + } else { + new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock) + } + } + + // Create task set manager for streaming tasks sets which + // will include query and batch Id in the logs + private def streamingTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int) = { new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock) + with StructuredStreamingIdAwareSchedulerLogging { + // ensure log name matches the non-streaming version + override protected def logName: String = classOf[TaskSetManager].getName + } } + private def isStreamingTaskSet(taskSet: TaskSet): Boolean = + taskSet.properties != null && + taskSet.properties.getProperty( + StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY) != null + // Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't // be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up // once all the tasks has been finished. 
The stage attempt could be aborted after the call of diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 1f1d67d020812..bd923c05cea06 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -31,7 +31,7 @@ import org.apache.spark.InternalAccumulator import org.apache.spark.InternalAccumulator.{input, shuffleRead} import org.apache.spark.TaskState.TaskState import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.{config, LogKeys} +import org.apache.spark.internal.{config, Logging, LogKeys} import org.apache.spark.internal.LogKeys._ import org.apache.spark.internal.config._ import org.apache.spark.scheduler.SchedulingMode._ @@ -59,10 +59,7 @@ private[spark] class TaskSetManager( val taskSet: TaskSet, val maxTaskFailures: Int, healthTracker: Option[HealthTracker] = None, - clock: Clock = new SystemClock()) - extends Schedulable with StructuredStreamingIdAwareSchedulerLogging { - - override def properties: Properties = taskSet.properties + clock: Clock = new SystemClock()) extends Schedulable with Logging { private val conf = sched.sc.conf val maxResultSize = conf.get(config.MAX_RESULT_SIZE) @@ -79,6 +76,9 @@ private[spark] class TaskSetManager( .map { case (t, idx) => t.partitionId -> idx }.toMap val numTasks = tasks.length val copiesRunning = new Array[Int](numTasks) + // used in the case of a streaming task set + // where we need the properties for StructuredStreamingIdAwareSchedulerLogging + protected def properties: Properties = taskSet.properties val speculationEnabled = conf.get(SPECULATION_ENABLED) private val efficientTaskProcessMultiplier = diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 873ec4023c9cd..cc0c97a89a60a 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -382,8 +382,8 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val testBatchId = "99" val properties = new Properties() properties.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "1") - properties.setProperty("sql.streaming.queryId", testQueryId) - properties.setProperty("streaming.sql.batchId", testBatchId) + properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY, testQueryId) + properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.BATCH_ID_KEY, testBatchId) val taskSetManager = createTaskSetManager(0, 1, taskScheduler) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 9007ae4e0990a..3d12ef0eefe41 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -28,6 +28,7 @@ import scala.concurrent.duration._ import scala.language.implicitConversions import scala.language.reflectiveCalls +import org.apache.logging.log4j.Level import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq} import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when} import org.scalatest.concurrent.Eventually @@ -2713,4 +2714,55 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext } } + test("SPARK-56326: Streaming TaskSet with queryId in properties " + + "uses StructuredStreamingIdAwareSchedulerLogging") { + setupScheduler() + val testQueryId = "test-query-id-1234" + val testBatchId = "42" + // Create a TaskSet with a non-null Properties containing the streaming metadata. 
+ val properties = new Properties() + properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY, testQueryId) + properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.BATCH_ID_KEY, testBatchId) + val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)), + 0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) + + val logAppender = new LogAppender("streaming log name check") + // TSM constructor prints some debug logs we can use + logAppender.setThreshold(Level.DEBUG) + withLogAppender(logAppender, + loggerNames = Seq(classOf[TaskSetManager].getName), + level = Some(Level.DEBUG)) { + val tsm = taskScheduler.createTaskSetManager(taskSet, 1) + assert(tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging]) + } + // when creating the streaming version we want the log name to match the + // non-streaming baseline case. By confirming our log appender contains + // logs we know the log name is correct + assert(logAppender.loggingEvents.nonEmpty, + "Expected logs under TaskSetManager logger name") + } + + test("SPARK-56326: Streaming TaskSet without queryId in properties " + + "does not use StructuredStreamingIdAwareSchedulerLogging") { + setupScheduler() + // create task with empty properties (using FakeTask will cause null properties) + val properties = new Properties() + val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)), + 0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) + + val tsm = taskScheduler.createTaskSetManager(taskSet, 1) + + assert(!tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging]) + } + + test("SPARK-56326: Streaming TaskSet with null properties " + + "does not use StructuredStreamingIdAwareSchedulerLogging") { + setupScheduler() + val taskSet = FakeTask.createTaskSet(10) + + val tsm = taskScheduler.createTaskSetManager(taskSet, 1) + + assert(!tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging]) + } + } diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 3b66945d89a90..c6ffbb574e5e5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -2875,8 +2875,8 @@ class TaskSetManagerSuite val testBatchId = "42" // Create a TaskSet with a non-null Properties containing the streaming metadata. val properties = new Properties() - properties.setProperty("sql.streaming.queryId", testQueryId) - properties.setProperty("streaming.sql.batchId", testBatchId) + properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY, testQueryId) + properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.BATCH_ID_KEY, testBatchId) val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)), 0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) @@ -2885,7 +2885,12 @@ class TaskSetManagerSuite val loggerName = classOf[TaskSetManager].getName withLogAppender(logAppender, loggerNames = Seq(loggerName)) { + // mirrors TaskSchedulerImpl.streamingTaskSetManager val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + with StructuredStreamingIdAwareSchedulerLogging { + // ensure log name matches the non-streaming version + override protected def logName: String = classOf[TaskSetManager].getName + } // resourceOffer triggers prepareLaunchingTask which logs "Starting ..." 
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 From 280d11b759d7d5ee2aedf0c34278ab00c75a271c Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Wed, 8 Apr 2026 22:42:45 +0000 Subject: [PATCH 06/18] clean up white space diff --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index bd923c05cea06..823599ff0a093 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -62,6 +62,7 @@ private[spark] class TaskSetManager( clock: Clock = new SystemClock()) extends Schedulable with Logging { private val conf = sched.sc.conf + val maxResultSize = conf.get(config.MAX_RESULT_SIZE) // Serializer for closures and tasks. From f1497f66d653cf4e70350d48d59aa2a0cab95aac Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Wed, 8 Apr 2026 23:57:40 +0000 Subject: [PATCH 07/18] Override all Throwable-accepting log methods in streaming trait --- ...uredStreamingIdAwareSchedulerLogging.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala index 38df3afe43344..8ef30b135d552 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -49,6 +49,15 @@ private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logg StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) } + override protected def logInfo(msg: => String, t: Throwable): Unit = + super.logInfo( + 
StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + + override protected def logInfo(entry: LogEntry, t: Throwable): Unit = { + super.logInfo( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + } + override protected def logWarning(msg: => String): Unit = super.logWarning( StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) @@ -76,6 +85,15 @@ private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logg StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) } + override protected def logDebug(msg: => String, t: Throwable): Unit = + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + + override protected def logDebug(entry: LogEntry, t: Throwable): Unit = { + super.logDebug( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + } + override protected def logError(msg: => String): Unit = super.logError( StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) @@ -85,6 +103,15 @@ private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logg StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) } + override protected def logError(msg: => String, t: Throwable): Unit = + super.logError( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + + override protected def logError(entry: LogEntry, t: Throwable): Unit = { + super.logError( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + } + override protected def logTrace(msg: => String): Unit = super.logTrace( StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) @@ -93,6 +120,15 @@ private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging 
extends Logg super.logTrace( StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) } + + override protected def logTrace(msg: => String, t: Throwable): Unit = + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + + override protected def logTrace(entry: LogEntry, t: Throwable): Unit = { + super.logTrace( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + } } /** From f5a191574bab4f282377e94e3f5fc75cf3368893 Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Wed, 8 Apr 2026 23:58:33 +0000 Subject: [PATCH 08/18] Enrich FairSchedulableBuilder logWarning with streaming IDs --- .../spark/scheduler/SchedulableBuilder.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index acc649085a0f5..2fdd85b7d785e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -215,15 +215,20 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) rootPool.addSchedulable(parentPool) - logWarning(log"A job was submitted with scheduler pool " + - log"${MDC(SCHEDULER_POOL_NAME, poolName)}, which has not been " + - log"configured. This can happen when the file that pools are read from isn't set, or " + - log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. 
" + - log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " + - log"configuration (schedulingMode: " + - log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + - log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + - log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}") + logWarning( + StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( + properties, + log"A job was submitted with scheduler pool " + + log"${MDC(SCHEDULER_POOL_NAME, poolName)}, which has not been " + + log"configured. This can happen when the file that pools are read from isn't set, or " + + log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. " + + log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " + + log"configuration (schedulingMode: " + + log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + + log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + + log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}" + ) + ) } parentPool.addSchedulable(manager) From 413c21c1bd4e1bcb32ebb5e6a5b7503b29034346 Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Wed, 8 Apr 2026 23:59:42 +0000 Subject: [PATCH 09/18] Restore 'tasks' in 'Added task set ... 
tasks to pool' log message --- .../scala/org/apache/spark/scheduler/SchedulableBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 2fdd85b7d785e..4d700c41726a1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -235,7 +235,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext logInfo( StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( properties, - log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} to pool " + + log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " + log"${MDC(LogKeys.POOL_NAME, poolName)}" ) ) From fedfcdc3185b28b651917d7daf8d17485aa1b62d Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Thu, 9 Apr 2026 00:00:34 +0000 Subject: [PATCH 10/18] Add explicit return type to streamingTaskSetManager --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 734be9a53bf1d..8872355a503fc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -297,7 +297,7 @@ private[spark] class TaskSchedulerImpl( // Create task set manager for streaming tasks sets which // will include query and batch Id in the logs - private def streamingTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int) = { + private def streamingTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, 
clock) with StructuredStreamingIdAwareSchedulerLogging { // ensure log name matches the non-streaming version From dc6bd591d23dd264542b29e9fcf0b1282b064f93 Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Thu, 9 Apr 2026 02:05:51 +0000 Subject: [PATCH 11/18] wrap streaming message in logEntry to avoid eager execution on disabled log lines --- ...uredStreamingIdAwareSchedulerLogging.scala | 29 ++++++----- ...treamingIdAwareSchedulerLoggingSuite.scala | 49 +++++++++++++++++-- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala index 8ef30b135d552..132ac313d5687 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -145,17 +145,19 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log if (properties == null) { return entry } - - val queryId = Option(properties.getProperty(QUERY_ID_KEY)) - val batchId = Option(properties.getProperty(BATCH_ID_KEY)) - - // formatMessage truncates the queryId for readability - // so we use a blank messageWithContext to overwrite the full query Id to the context - formatMessage( - queryId, - batchId, - entry - ) + MessageWithContext("", constructStreamingContext(queryId, batchId)) + // wrap in log entry to defer until log is evaluated + new LogEntry({ + val queryId = Option(properties.getProperty(QUERY_ID_KEY)) + val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + + // formatMessage truncates the queryId for readability + // so we use a blank messageWithContext to overwrite the full query Id to the context + formatMessage( + queryId, + batchId, + entry + ) + MessageWithContext("", constructStreamingContext(queryId, batchId)) + }) } 
private[scheduler] def constructStreamingLogEntry( @@ -167,9 +169,10 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log ) } - val queryId = Option(properties.getProperty(QUERY_ID_KEY)) - val batchId = Option(properties.getProperty(BATCH_ID_KEY)) new LogEntry({ + val queryId = Option(properties.getProperty(QUERY_ID_KEY)) + val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + MessageWithContext( formatMessage( queryId, diff --git a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala index d314ce29aef5e..1b8eeeaab4451 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala @@ -20,11 +20,8 @@ package org.apache.spark.scheduler import java.util.{Locale, Properties} import org.apache.spark.SparkFunSuite -import org.apache.spark.internal.{Logging, LogKey, LogKeys} -import org.apache.spark.scheduler.StructuredStreamingIdAwareSchedulerLogging.{ - BATCH_ID_KEY, - QUERY_ID_KEY -} +import org.apache.spark.internal.{LogEntry, Logging, LogKey, LogKeys, MessageWithContext} +import org.apache.spark.scheduler.StructuredStreamingIdAwareSchedulerLogging.{BATCH_ID_KEY, QUERY_ID_KEY} class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { @@ -147,4 +144,46 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { assertContextAbsent(result.context, LogKeys.BATCH_ID) assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers evaluation") { + var evaluated = false + val lazyEntry = new LogEntry({ + evaluated = true + MessageWithContext("lazy message", java.util.Collections.emptyMap()) + }) + + val result = 
StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithBothIds(), lazyEntry) + + // Work should be deferred + assert(!evaluated, + "LogEntry should not be evaluated during constructStreamingLogEntry") + + // Accessing .message triggers evaluation + result.message + assert(evaluated, "LogEntry should be evaluated when .message is accessed") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers property access") { + var propertiesAccessed = false + val props = new Properties() { + override def getProperty(key: String): String = { + propertiesAccessed = true + super.getProperty(key) + } + } + props.setProperty(QUERY_ID_KEY, testQueryId) + props.setProperty(BATCH_ID_KEY, testBatchId) + + val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}" + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, entry) + + assert(!propertiesAccessed, + "Properties should not be accessed during constructStreamingLogEntry") + + result.message + assert(propertiesAccessed, + "Properties should be accessed when .message is called") + } } From 0b0f5f1cc901a9dad0863ae38463d5ce35f0d657 Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Thu, 9 Apr 2026 02:11:15 +0000 Subject: [PATCH 12/18] remove query Id truncation --- .../StructuredStreamingIdAwareSchedulerLogging.scala | 8 +++----- .../test/scala/org/apache/spark/scheduler/PoolSuite.scala | 2 +- .../StructuredStreamingIdAwareSchedulerLoggingSuite.scala | 8 ++++---- .../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 2 +- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala index 132ac313d5687..327a8836a7a21 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -150,13 +150,11 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log val queryId = Option(properties.getProperty(QUERY_ID_KEY)) val batchId = Option(properties.getProperty(BATCH_ID_KEY)) - // formatMessage truncates the queryId for readability - // so we use a blank messageWithContext to overwrite the full query Id to the context formatMessage( queryId, batchId, entry - ) + MessageWithContext("", constructStreamingContext(queryId, batchId)) + ) }) } @@ -200,7 +198,7 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log batchId: Option[String], msg: => String): String = { val msgWithBatchId = batchId.map(bid => s"[batchId = $bid] $msg").getOrElse(msg) - queryId.map(qId => s"[queryId = ${qId.take(5)}] $msgWithBatchId").getOrElse(msgWithBatchId) + queryId.map(qId => s"[queryId = $qId] $msgWithBatchId").getOrElse(msgWithBatchId) } private def formatMessage( @@ -211,7 +209,7 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg) ).getOrElse(toMessageWithContext(msg)) queryId.map( - qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId.take(5))}] " + msgWithBatchId + qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId)}] " + msgWithBatchId ).getOrElse(msgWithBatchId) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index cc0c97a89a60a..297998308092a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -395,7 +395,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) - val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]" + 
val expectedQueryPrefix = s"[queryId = ${testQueryId}]" val expectedBatchPrefix = s"[batchId = $testBatchId]" val addedLogs = logs.filter(msg => msg.contains("Added task set") && diff --git a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala index 1b8eeeaab4451..f4f7872f8e960 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala @@ -68,7 +68,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(propsWithBothIds(), "test message") - assertResult(s"[queryId = ${testQueryId.take(5)}] [batchId = $testBatchId] test message")( + assertResult(s"[queryId = $testQueryId] [batchId = $testBatchId] test message")( result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) @@ -78,7 +78,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(propsWithQueryIdOnly(), "test message") - assertResult(s"[queryId = ${testQueryId.take(5)}] test message")(result.message) + assertResult(s"[queryId = $testQueryId] test message")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) assertContextAbsent(result.context, LogKeys.BATCH_ID) } @@ -104,7 +104,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { .constructStreamingLogEntry(propsWithBothIds(), log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") - assertResult(s"[queryId = ${testQueryId.take(5)}] " + + assertResult(s"[queryId = $testQueryId] " + 
s"[batchId = $testBatchId] test message Dummy Context")( result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) @@ -117,7 +117,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { .constructStreamingLogEntry(propsWithQueryIdOnly(), log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") - assertResult(s"[queryId = ${testQueryId.take(5)}] test message Dummy Context")(result.message) + assertResult(s"[queryId = $testQueryId] test message Dummy Context")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) assertContextAbsent(result.context, LogKeys.BATCH_ID) assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index c6ffbb574e5e5..48b6d4581fe3d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -2903,7 +2903,7 @@ class TaskSetManagerSuite val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) - val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]" + val expectedQueryPrefix = s"[queryId = ${testQueryId}]" val expectedBatchPrefix = s"[batchId = $testBatchId]" // Verify the "Starting" log line includes query Id and batch Id From 395112427ebf116aedec12c3739c6d701c8eae5f Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Thu, 9 Apr 2026 17:00:28 +0000 Subject: [PATCH 13/18] address nits in StructuredStreamingIdAwareSchedulerLogging --- ...uredStreamingIdAwareSchedulerLogging.scala | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala index 
327a8836a7a21..de68b7295e48b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.util.{Locale, Properties} +import java.util.{HashMap, Locale, Properties} import org.apache.spark.internal.{LogEntry, Logging, LogKeys, MessageWithContext} @@ -140,8 +140,8 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log val BATCH_ID_KEY = "streaming.sql.batchId" private[scheduler] def constructStreamingLogEntry( - properties: Properties, - entry: LogEntry): LogEntry = { + properties: Properties, + entry: LogEntry): LogEntry = { if (properties == null) { return entry } @@ -159,8 +159,8 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log } private[scheduler] def constructStreamingLogEntry( - properties: Properties, - msg: => String): LogEntry = { + properties: Properties, + msg: => String): LogEntry = { if (properties == null) { return new LogEntry( MessageWithContext(msg, java.util.Collections.emptyMap()) @@ -183,10 +183,9 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log } private def constructStreamingContext( - queryId: Option[String], - batchId: Option[String]): - java.util.HashMap[String, String] = { - val streamingContext = new java.util.HashMap[String, String]() + queryId: Option[String], + batchId: Option[String]): HashMap[String, String] = { + val streamingContext = new HashMap[String, String]() // MDC places the log key in the context as all lowercase, so we do the same here queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _)) batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _)) @@ -194,17 +193,17 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log } 
private def formatMessage( - queryId: Option[String], - batchId: Option[String], - msg: => String): String = { + queryId: Option[String], + batchId: Option[String], + msg: => String): String = { val msgWithBatchId = batchId.map(bid => s"[batchId = $bid] $msg").getOrElse(msg) queryId.map(qId => s"[queryId = $qId] $msgWithBatchId").getOrElse(msgWithBatchId) } private def formatMessage( - queryId: Option[String], - batchId: Option[String], - msg: => LogEntry): MessageWithContext = { + queryId: Option[String], + batchId: Option[String], + msg: => LogEntry): MessageWithContext = { val msgWithBatchId: MessageWithContext = batchId.map( bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg) ).getOrElse(toMessageWithContext(msg)) From 8272bc4d3f5236959e1df05c1053019bdbfceeb5 Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Thu, 9 Apr 2026 17:02:37 +0000 Subject: [PATCH 14/18] override properties in TaskSchedulerImpl, update test to use TaskSchedulerImpl to construct TSM --- .../spark/scheduler/TaskSchedulerImpl.scala | 3 ++- .../apache/spark/scheduler/TaskSetManager.scala | 4 ---- .../scheduler/TaskSchedulerImplSuite.scala | 17 ++--------------- .../spark/scheduler/TaskSetManagerSuite.scala | 15 ++++++--------- 4 files changed, 10 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 8872355a503fc..04044e1e7cff8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer -import java.util.TimerTask +import java.util.{Properties, TimerTask} import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong @@ -300,6 +300,7 @@ private[spark] class TaskSchedulerImpl( private def 
streamingTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock) with StructuredStreamingIdAwareSchedulerLogging { + override protected def properties: Properties = this.taskSet.properties // ensure log name matches the non-streaming version override protected def logName: String = classOf[TaskSetManager].getName } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 823599ff0a093..5c077a7a3bbb8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -19,7 +19,6 @@ package org.apache.spark.scheduler import java.io.NotSerializableException import java.nio.ByteBuffer -import java.util.Properties import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} @@ -77,9 +76,6 @@ private[spark] class TaskSetManager( .map { case (t, idx) => t.partitionId -> idx }.toMap val numTasks = tasks.length val copiesRunning = new Array[Int](numTasks) - // used in the case of a streaming task set - // where we need the properties for StructuredStreamingIdAwareSchedulerLogging - protected def properties: Properties = taskSet.properties val speculationEnabled = conf.get(SPECULATION_ENABLED) private val efficientTaskProcessMultiplier = diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 3d12ef0eefe41..c9c9e529405ec 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -28,7 +28,6 @@ import scala.concurrent.duration._ import scala.language.implicitConversions import 
scala.language.reflectiveCalls -import org.apache.logging.log4j.Level import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq} import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when} import org.scalatest.concurrent.Eventually @@ -2726,20 +2725,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)), 0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) - val logAppender = new LogAppender("streaming log name check") - // TSM constructor prints some debug logs we can use - logAppender.setThreshold(Level.DEBUG) - withLogAppender(logAppender, - loggerNames = Seq(classOf[TaskSetManager].getName), - level = Some(Level.DEBUG)) { - val tsm = taskScheduler.createTaskSetManager(taskSet, 1) - assert(tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging]) - } - // when creating the streaming version we want the log name to match the - // non-streaming baseline case. 
By confirming our log appender contains - // logs we know the log name is correct - assert(logAppender.loggingEvents.nonEmpty, - "Expected logs under TaskSetManager logger name") + val tsm = taskScheduler.createTaskSetManager(taskSet, 1) + assert(tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging]) } test("SPARK-56326: Streaming TaskSet without queryId in properties " + diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 48b6d4581fe3d..ad910568368dd 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -2870,7 +2870,8 @@ class TaskSetManagerSuite test("SPARK-56326: Streaming query Id and batch Id are included in scheduling log " + "messages") { sc = new SparkContext("local", "test") - sched = new FakeTaskScheduler(sc) + val clock = new ManualClock + sched = new FakeTaskScheduler(sc, clock, ("exec1", "host1")) val testQueryId = "test-query-id-1234" val testBatchId = "42" // Create a TaskSet with a non-null Properties containing the streaming metadata. 
@@ -2878,19 +2879,15 @@ class TaskSetManagerSuite properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY, testQueryId) properties.setProperty(StructuredStreamingIdAwareSchedulerLogging.BATCH_ID_KEY, testBatchId) val taskSet = new TaskSet(Array(new FakeTask(0, 0, Nil)), - 0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) + 0, 0, 0, properties, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None) - val clock = new ManualClock val logAppender = new LogAppender("streaming scheduling logs", maxEvents = 1000) val loggerName = classOf[TaskSetManager].getName withLogAppender(logAppender, loggerNames = Seq(loggerName)) { - // mirrors TaskSchedulerImpl.streamingTaskSetManager - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - with StructuredStreamingIdAwareSchedulerLogging { - // ensure log name matches the non-streaming version - override protected def logName: String = classOf[TaskSetManager].getName - } + // uses TaskSchedulerImpl.streamingTaskSetManager to ensure + // logging and properties are initialized correctly + val manager = sched.createTaskSetManager(taskSet, MAX_TASK_FAILURES) // resourceOffer triggers prepareLaunchingTask which logs "Starting ..." 
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 From 6f40341a7e2c685c6a50ac9a352db8ff0a0c6332 Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Fri, 10 Apr 2026 16:35:44 +0000 Subject: [PATCH 15/18] handle empty strings in streaming properties --- ...uredStreamingIdAwareSchedulerLogging.scala | 12 +++-- ...treamingIdAwareSchedulerLoggingSuite.scala | 50 +++++++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala index de68b7295e48b..2392cd6e7311c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -147,8 +147,7 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log } // wrap in log entry to defer until log is evaluated new LogEntry({ - val queryId = Option(properties.getProperty(QUERY_ID_KEY)) - val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + val (queryId: Option[String], batchId: Option[String]) = getStreamingProperties(properties) formatMessage( queryId, @@ -168,8 +167,7 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log } new LogEntry({ - val queryId = Option(properties.getProperty(QUERY_ID_KEY)) - val batchId = Option(properties.getProperty(BATCH_ID_KEY)) + val (queryId: Option[String], batchId: Option[String]) = getStreamingProperties(properties) MessageWithContext( formatMessage( @@ -215,4 +213,10 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log private def toMessageWithContext(entry: LogEntry): MessageWithContext = { MessageWithContext(entry.message, entry.context) } + + private def getStreamingProperties(properties: Properties): (Option[String], 
Option[String]) = { + val queryId = Option(properties.getProperty(QUERY_ID_KEY)).filter(_.nonEmpty) + val batchId = Option(properties.getProperty(BATCH_ID_KEY)).filter(_.nonEmpty) + (queryId, batchId) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala index f4f7872f8e960..3bf0bdd41948c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala @@ -99,6 +99,28 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { assert(result.context.isEmpty) } + test("SPARK-56326: constructStreamingLogEntry with String - empty queryId") { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, "") + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, "test message") + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with String - empty batchId") { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, testQueryId) + props.setProperty(BATCH_ID_KEY, "") + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, "test message") + + assertResult(s"[queryId = $testQueryId] test message")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + } + test("SPARK-56326: constructStreamingLogEntry with LogEntry - both queryId and batchId") { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(propsWithBothIds(), @@ -145,6 +167,34 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { assertContextValue(result.context, 
LogKeys.MESSAGE, "Dummy Context") } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - empty queryId") { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, "") + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - empty batchId") { + val props = new Properties() + props.setProperty(QUERY_ID_KEY, testQueryId) + props.setProperty(BATCH_ID_KEY, "") + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(props, + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + + assertResult(s"[queryId = $testQueryId] test message Dummy Context")(result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } + test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers evaluation") { var evaluated = false val lazyEntry = new LogEntry({ From c54701f046637b3bb6ae8d8cb8d6d12c95f1131c Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Wed, 15 Apr 2026 00:43:11 +0000 Subject: [PATCH 16/18] introduce conf for disabling structured streaming logging on the scheduling path --- .../spark/internal/config/package.scala | 8 +++ .../spark/scheduler/SchedulableBuilder.scala | 10 ++- ...uredStreamingIdAwareSchedulerLogging.scala | 71 ++++++++++++------- .../spark/scheduler/TaskSchedulerImpl.scala | 2 + ...treamingIdAwareSchedulerLoggingSuite.scala | 53 ++++++++++---- 5 files changed, 103 insertions(+), 41 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 5edb24a6eb3e8..2f2d614f1a993 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2373,6 +2373,14 @@ package object config { .enumConf(SchedulingMode) .createWithDefault(SchedulingMode.FIFO) + private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED = + ConfigBuilder("spark.scheduler.streaming.idAwareLogging.enabled") + .doc("When true, scheduler log messages for streaming tasks include " + + "the structured streaming query ID and batch ID.") + .version("4.2.0") + .booleanConf + .createWithDefault(true) + private[spark] val SCHEDULER_REVIVE_INTERVAL = ConfigBuilder("spark.scheduler.revive.interval") .version("0.8.1") diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 4d700c41726a1..88770e16dbb12 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE} +import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE, STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -60,6 +60,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext) extends SchedulableBuilder with Logging { + val streamingIdAwareLoggingEnabled: Boolean = + 
sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED) val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE) val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL @@ -226,7 +228,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext log"configuration (schedulingMode: " + log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + - log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}" + log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}", + streamingIdAwareLoggingEnabled ) ) } @@ -236,7 +239,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( properties, log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " + - log"${MDC(LogKeys.POOL_NAME, poolName)}" + log"${MDC(LogKeys.POOL_NAME, poolName)}", + streamingIdAwareLoggingEnabled ) ) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala index 2392cd6e7311c..8f9cd2221e95e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -39,95 +39,116 @@ import org.apache.spark.internal.{LogEntry, Logging, LogKeys, MessageWithContext private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging { // we gather the query and batch Id from the properties of a given TaskSet protected def properties: Properties + protected def streamingIdAwareLoggingEnabled: Boolean override protected def logInfo(msg: => String): Unit = super.logInfo( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + 
StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) override protected def logInfo(entry: LogEntry): Unit = { super.logInfo( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) } override protected def logInfo(msg: => String, t: Throwable): Unit = super.logInfo( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) override protected def logInfo(entry: LogEntry, t: Throwable): Unit = { super.logInfo( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) } override protected def logWarning(msg: => String): Unit = super.logWarning( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) override protected def logWarning(entry: LogEntry): Unit = { super.logWarning( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) } override protected def logWarning(msg: => String, t: Throwable): Unit = super.logWarning( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) override protected def logWarning(entry: LogEntry, t: 
Throwable): Unit = { super.logWarning( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) } override protected def logDebug(msg: => String): Unit = super.logDebug( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) override protected def logDebug(entry: LogEntry): Unit = { super.logDebug( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) } override protected def logDebug(msg: => String, t: Throwable): Unit = super.logDebug( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) override protected def logDebug(entry: LogEntry, t: Throwable): Unit = { super.logDebug( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) } override protected def logError(msg: => String): Unit = super.logError( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) override protected def logError(entry: LogEntry): Unit = { super.logError( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + StructuredStreamingIdAwareSchedulerLogging + 
.constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) } override protected def logError(msg: => String, t: Throwable): Unit = super.logError( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) override protected def logError(entry: LogEntry, t: Throwable): Unit = { super.logError( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) } override protected def logTrace(msg: => String): Unit = super.logTrace( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg)) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) override protected def logTrace(entry: LogEntry): Unit = { super.logTrace( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry)) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) } override protected def logTrace(msg: => String, t: Throwable): Unit = super.logTrace( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, msg), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) override protected def logTrace(entry: LogEntry, t: Throwable): Unit = { super.logTrace( - StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(properties, entry), t) + StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) } } @@ -141,8 +162,9 @@ private[scheduler] object 
StructuredStreamingIdAwareSchedulerLogging extends Log private[scheduler] def constructStreamingLogEntry( properties: Properties, - entry: LogEntry): LogEntry = { - if (properties == null) { + entry: LogEntry, + enabled: Boolean): LogEntry = { + if (!enabled || properties == null) { return entry } // wrap in log entry to defer until log is evaluated @@ -159,8 +181,9 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log private[scheduler] def constructStreamingLogEntry( properties: Properties, - msg: => String): LogEntry = { - if (properties == null) { + msg: => String, + enabled: Boolean): LogEntry = { + if (!enabled || properties == null) { return new LogEntry( MessageWithContext(msg, java.util.Collections.emptyMap()) ) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 04044e1e7cff8..cf86497990888 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -301,6 +301,8 @@ private[spark] class TaskSchedulerImpl( new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock) with StructuredStreamingIdAwareSchedulerLogging { override protected def properties: Properties = this.taskSet.properties + override protected val streamingIdAwareLoggingEnabled: Boolean = + conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED) // ensure log name matches the non-streaming version override protected def logName: String = classOf[TaskSetManager].getName } diff --git a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala index 3bf0bdd41948c..b59f140500aca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala @@ -66,7 +66,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - both queryId and batchId") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(propsWithBothIds(), "test message") + .constructStreamingLogEntry(propsWithBothIds(), "test message", enabled = true) assertResult(s"[queryId = $testQueryId] [batchId = $testBatchId] test message")( result.message) @@ -76,7 +76,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - only queryId") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(propsWithQueryIdOnly(), "test message") + .constructStreamingLogEntry(propsWithQueryIdOnly(), "test message", enabled = true) assertResult(s"[queryId = $testQueryId] test message")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) @@ -85,7 +85,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - no streaming properties") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(new Properties(), "test message") + .constructStreamingLogEntry(new Properties(), "test message", enabled = true) assertResult("test message")(result.message) assert(result.context.isEmpty) @@ -93,7 +93,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - null properties") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(null, "test message") + .constructStreamingLogEntry(null, "test message", enabled = true) assertResult("test message")(result.message) assert(result.context.isEmpty) 
@@ -103,7 +103,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val props = new Properties() props.setProperty(QUERY_ID_KEY, "") val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(props, "test message") + .constructStreamingLogEntry(props, "test message", enabled = true) assertResult("test message")(result.message) assert(result.context.isEmpty) @@ -114,7 +114,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { props.setProperty(QUERY_ID_KEY, testQueryId) props.setProperty(BATCH_ID_KEY, "") val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(props, "test message") + .constructStreamingLogEntry(props, "test message", enabled = true) assertResult(s"[queryId = $testQueryId] test message")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) @@ -124,7 +124,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with LogEntry - both queryId and batchId") { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(propsWithBothIds(), - log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true) assertResult(s"[queryId = $testQueryId] " + s"[batchId = $testBatchId] test message Dummy Context")( @@ -137,7 +138,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with LogEntry - only queryId") { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(propsWithQueryIdOnly(), - log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true) assertResult(s"[queryId = $testQueryId] test message Dummy Context")(result.message) assertContextValue(result.context, 
LogKeys.QUERY_ID, testQueryId) @@ -148,7 +150,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with LogEntry - no streaming properties") { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(new Properties(), - log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true) assertResult("test message Dummy Context")(result.message) assertContextAbsent(result.context, LogKeys.QUERY_ID) @@ -159,7 +162,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with LogEntry - null properties") { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(null, - log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true) assertResult("test message Dummy Context")(result.message) assertContextAbsent(result.context, LogKeys.QUERY_ID) @@ -173,7 +177,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { props.setProperty(QUERY_ID_KEY, "") val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(props, - log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true) assertResult("test message Dummy Context")(result.message) assertContextAbsent(result.context, LogKeys.QUERY_ID) @@ -187,7 +192,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { props.setProperty(BATCH_ID_KEY, "") val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(props, - log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}") + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true) assertResult(s"[queryId = $testQueryId] test 
message Dummy Context")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) @@ -203,7 +209,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { }) val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(propsWithBothIds(), lazyEntry) + .constructStreamingLogEntry(propsWithBothIds(), lazyEntry, enabled = true) // Work should be deferred assert(!evaluated, @@ -227,7 +233,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}" val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(props, entry) + .constructStreamingLogEntry(props, entry, enabled = true) assert(!propertiesAccessed, "Properties should not be accessed during constructStreamingLogEntry") @@ -236,4 +242,23 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { assert(propertiesAccessed, "Properties should be accessed when .message is called") } + + test("SPARK-56326: constructStreamingLogEntry with String - disabled skips enrichment") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithBothIds(), "test message", enabled = false) + + assertResult("test message")(result.message) + assert(result.context.isEmpty) + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - disabled skips enrichment") { + val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}" + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry(propsWithBothIds(), entry, enabled = false) + + assertResult("test message Dummy Context")(result.message) + assertContextAbsent(result.context, LogKeys.QUERY_ID) + assertContextAbsent(result.context, LogKeys.BATCH_ID) + assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") + } } From 5607babf5c66eecf9db7ae5381fecb732da23d23 Mon Sep 17 00:00:00 
2001 From: Brooks Walls Date: Wed, 15 Apr 2026 17:41:30 +0000 Subject: [PATCH 17/18] add SS logging conf to configs-without-binding-policy-exceptions --- .../configs-without-binding-policy-exceptions | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index a84b0fc3b6c77..c2b0fa46cf956 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -319,6 +319,7 @@ spark.scheduler.numCancelledJobGroupsToTrack spark.scheduler.resource.profileMergeConflicts spark.scheduler.revive.interval spark.scheduler.stage.legacyAbortAfterKillTasks +spark.scheduler.streaming.idAwareLogging.enabled spark.security.credentials.renewalRatio spark.security.credentials.retryWait spark.serializer From 47368d4c20d7a0da703e0304bddd5794d79ba1e0 Mon Sep 17 00:00:00 2001 From: Brooks Walls Date: Wed, 15 Apr 2026 18:20:46 +0000 Subject: [PATCH 18/18] introduce config for controlling truncation of queryId in structured streaming scheduling logs --- .../spark/internal/config/package.scala | 8 ++ .../spark/scheduler/SchedulableBuilder.scala | 10 ++- ...uredStreamingIdAwareSchedulerLogging.scala | 85 +++++++++++++------ .../spark/scheduler/TaskSchedulerImpl.scala | 2 + .../apache/spark/scheduler/PoolSuite.scala | 4 +- ...treamingIdAwareSchedulerLoggingSuite.scala | 85 +++++++++++++++---- .../spark/scheduler/TaskSetManagerSuite.scala | 4 +- .../configs-without-binding-policy-exceptions | 1 + 8 files changed, 152 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 2f2d614f1a993..f768e675dcd00 100644 --- 
a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -2381,6 +2381,14 @@ package object config { .booleanConf .createWithDefault(true) + private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH = + ConfigBuilder("spark.scheduler.streaming.idAwareLogging.queryIdLength") + .doc("Maximum number of characters of the streaming query ID to include " + + "in scheduler log messages. Set to -1 to include the full query ID.") + .version("4.2.0") + .intConf + .createWithDefault(5) + private[spark] val SCHEDULER_REVIVE_INTERVAL = ConfigBuilder("spark.scheduler.revive.interval") .version("0.8.1") diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 88770e16dbb12..dc853d15206dd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.internal.LogKeys._ -import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE, STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED} +import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE, STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED, STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -62,6 +62,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext val streamingIdAwareLoggingEnabled: Boolean = sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED) + val streamingQueryIdLength: Int = + sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH) val schedulerAllocFile = 
sc.conf.get(SCHEDULER_ALLOCATION_FILE) val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL @@ -229,7 +231,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " + log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " + log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}", - streamingIdAwareLoggingEnabled + streamingIdAwareLoggingEnabled, + streamingQueryIdLength ) ) } @@ -240,7 +243,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext properties, log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " + log"${MDC(LogKeys.POOL_NAME, poolName)}", - streamingIdAwareLoggingEnabled + streamingIdAwareLoggingEnabled, + streamingQueryIdLength ) ) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala index 8f9cd2221e95e..ef76e139188a1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala @@ -40,115 +40,136 @@ private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logg // we gather the query and batch Id from the properties of a given TaskSet protected def properties: Properties protected def streamingIdAwareLoggingEnabled: Boolean + protected def streamingQueryIdLength: Int override protected def logInfo(msg: => String): Unit = super.logInfo( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) override protected def logInfo(entry: LogEntry): Unit = { super.logInfo( 
StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) } override protected def logInfo(msg: => String, t: Throwable): Unit = super.logInfo( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) override protected def logInfo(entry: LogEntry, t: Throwable): Unit = { super.logInfo( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) } override protected def logWarning(msg: => String): Unit = super.logWarning( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) override protected def logWarning(entry: LogEntry): Unit = { super.logWarning( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) } override protected def logWarning(msg: => String, t: Throwable): Unit = super.logWarning( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) override protected def logWarning(entry: LogEntry, t: Throwable): Unit = { super.logWarning( StructuredStreamingIdAwareSchedulerLogging - 
.constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) } override protected def logDebug(msg: => String): Unit = super.logDebug( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) override protected def logDebug(entry: LogEntry): Unit = { super.logDebug( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) } override protected def logDebug(msg: => String, t: Throwable): Unit = super.logDebug( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) override protected def logDebug(entry: LogEntry, t: Throwable): Unit = { super.logDebug( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) } override protected def logError(msg: => String): Unit = super.logError( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) override protected def logError(entry: LogEntry): Unit = { super.logError( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + 
properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) } override protected def logError(msg: => String, t: Throwable): Unit = super.logError( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) override protected def logError(entry: LogEntry, t: Throwable): Unit = { super.logError( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) } override protected def logTrace(msg: => String): Unit = super.logTrace( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) override protected def logTrace(entry: LogEntry): Unit = { super.logTrace( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled)) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)) } override protected def logTrace(msg: => String, t: Throwable): Unit = super.logTrace( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, msg, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) override protected def logTrace(entry: LogEntry, t: Throwable): Unit = { super.logTrace( StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(properties, entry, streamingIdAwareLoggingEnabled), t) + .constructStreamingLogEntry( + properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength), t) } } 
@@ -163,13 +184,15 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log private[scheduler] def constructStreamingLogEntry( properties: Properties, entry: LogEntry, - enabled: Boolean): LogEntry = { + enabled: Boolean, + queryIdLength: Int): LogEntry = { if (!enabled || properties == null) { return entry } // wrap in log entry to defer until log is evaluated new LogEntry({ - val (queryId: Option[String], batchId: Option[String]) = getStreamingProperties(properties) + val (queryId: Option[String], batchId: Option[String]) = + getStreamingProperties(properties, queryIdLength) formatMessage( queryId, @@ -182,7 +205,8 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log private[scheduler] def constructStreamingLogEntry( properties: Properties, msg: => String, - enabled: Boolean): LogEntry = { + enabled: Boolean, + queryIdLength: Int): LogEntry = { if (!enabled || properties == null) { return new LogEntry( MessageWithContext(msg, java.util.Collections.emptyMap()) @@ -190,7 +214,8 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log } new LogEntry({ - val (queryId: Option[String], batchId: Option[String]) = getStreamingProperties(properties) + val (queryId: Option[String], batchId: Option[String]) = + getStreamingProperties(properties, queryIdLength) MessageWithContext( formatMessage( @@ -237,8 +262,16 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log MessageWithContext(entry.message, entry.context) } - private def getStreamingProperties(properties: Properties): (Option[String], Option[String]) = { - val queryId = Option(properties.getProperty(QUERY_ID_KEY)).filter(_.nonEmpty) + private def getStreamingProperties( + properties: Properties, + queryIdLength: Int): (Option[String], Option[String]) = { + val queryId = Option(properties.getProperty(QUERY_ID_KEY)).filter(_.nonEmpty).map { id => + if (queryIdLength == -1) { + id + } else { + 
id.take(queryIdLength) + } + } val batchId = Option(properties.getProperty(BATCH_ID_KEY)).filter(_.nonEmpty) (queryId, batchId) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index cf86497990888..618c8eb459026 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -303,6 +303,8 @@ private[spark] class TaskSchedulerImpl( override protected def properties: Properties = this.taskSet.properties override protected val streamingIdAwareLoggingEnabled: Boolean = conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED) + override protected val streamingQueryIdLength: Int = + conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH) // ensure log name matches the non-streaming version override protected def logName: String = classOf[TaskSetManager].getName } diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 297998308092a..37de8338ad905 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -395,7 +395,9 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { } val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) - val expectedQueryPrefix = s"[queryId = ${testQueryId}]" + // default queryIdLength is 5, so the query ID is truncated + val truncatedQueryId = testQueryId.take(5) + val expectedQueryPrefix = s"[queryId = $truncatedQueryId]" val expectedBatchPrefix = s"[batchId = $testBatchId]" val addedLogs = logs.filter(msg => msg.contains("Added task set") && diff --git a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala index b59f140500aca..52e11e4d14d6f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala @@ -66,7 +66,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - both queryId and batchId") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(propsWithBothIds(), "test message", enabled = true) + .constructStreamingLogEntry( + propsWithBothIds(), "test message", enabled = true, queryIdLength = -1) assertResult(s"[queryId = $testQueryId] [batchId = $testBatchId] test message")( result.message) @@ -76,7 +77,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - only queryId") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(propsWithQueryIdOnly(), "test message", enabled = true) + .constructStreamingLogEntry( + propsWithQueryIdOnly(), "test message", enabled = true, queryIdLength = -1) assertResult(s"[queryId = $testQueryId] test message")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) @@ -85,7 +87,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - no streaming properties") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(new Properties(), "test message", enabled = true) + .constructStreamingLogEntry( + new Properties(), "test message", enabled = true, queryIdLength = -1) assertResult("test message")(result.message) assert(result.context.isEmpty) @@ -93,7 +96,8 @@ class 
StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - null properties") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(null, "test message", enabled = true) + .constructStreamingLogEntry( + null, "test message", enabled = true, queryIdLength = -1) assertResult("test message")(result.message) assert(result.context.isEmpty) @@ -103,7 +107,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val props = new Properties() props.setProperty(QUERY_ID_KEY, "") val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(props, "test message", enabled = true) + .constructStreamingLogEntry(props, "test message", enabled = true, queryIdLength = -1) assertResult("test message")(result.message) assert(result.context.isEmpty) @@ -114,7 +118,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { props.setProperty(QUERY_ID_KEY, testQueryId) props.setProperty(BATCH_ID_KEY, "") val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(props, "test message", enabled = true) + .constructStreamingLogEntry(props, "test message", enabled = true, queryIdLength = -1) assertResult(s"[queryId = $testQueryId] test message")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) @@ -125,7 +129,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(propsWithBothIds(), log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", - enabled = true) + enabled = true, + queryIdLength = -1) assertResult(s"[queryId = $testQueryId] " + s"[batchId = $testBatchId] test message Dummy Context")( @@ -139,7 +144,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val result = 
StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(propsWithQueryIdOnly(), log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", - enabled = true) + enabled = true, + queryIdLength = -1) assertResult(s"[queryId = $testQueryId] test message Dummy Context")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) @@ -151,7 +157,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(new Properties(), log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", - enabled = true) + enabled = true, + queryIdLength = -1) assertResult("test message Dummy Context")(result.message) assertContextAbsent(result.context, LogKeys.QUERY_ID) @@ -163,7 +170,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(null, log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", - enabled = true) + enabled = true, + queryIdLength = -1) assertResult("test message Dummy Context")(result.message) assertContextAbsent(result.context, LogKeys.QUERY_ID) @@ -178,7 +186,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(props, log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", - enabled = true) + enabled = true, + queryIdLength = -1) assertResult("test message Dummy Context")(result.message) assertContextAbsent(result.context, LogKeys.QUERY_ID) @@ -193,7 +202,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val result = StructuredStreamingIdAwareSchedulerLogging .constructStreamingLogEntry(props, log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", - enabled = true) + enabled = true, + queryIdLength = -1) assertResult(s"[queryId = $testQueryId] test message Dummy 
Context")(result.message) assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) @@ -209,7 +219,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { }) val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(propsWithBothIds(), lazyEntry, enabled = true) + .constructStreamingLogEntry( + propsWithBothIds(), lazyEntry, enabled = true, queryIdLength = -1) // Work should be deferred assert(!evaluated, @@ -233,7 +244,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}" val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(props, entry, enabled = true) + .constructStreamingLogEntry( + props, entry, enabled = true, queryIdLength = -1) assert(!propertiesAccessed, "Properties should not be accessed during constructStreamingLogEntry") @@ -245,7 +257,8 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with String - disabled skips enrichment") { val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(propsWithBothIds(), "test message", enabled = false) + .constructStreamingLogEntry( + propsWithBothIds(), "test message", enabled = false, queryIdLength = -1) assertResult("test message")(result.message) assert(result.context.isEmpty) @@ -254,11 +267,51 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite { test("SPARK-56326: constructStreamingLogEntry with LogEntry - disabled skips enrichment") { val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}" val result = StructuredStreamingIdAwareSchedulerLogging - .constructStreamingLogEntry(propsWithBothIds(), entry, enabled = false) + .constructStreamingLogEntry( + propsWithBothIds(), entry, enabled = false, queryIdLength = -1) assertResult("test message Dummy Context")(result.message) 
assertContextAbsent(result.context, LogKeys.QUERY_ID) assertContextAbsent(result.context, LogKeys.BATCH_ID) assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context") } + + test("SPARK-56326: constructStreamingLogEntry with String - queryIdLength truncates queryId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + propsWithBothIds(), "test message", enabled = true, queryIdLength = 5) + + val truncatedId = testQueryId.take(5) + assertResult(s"[queryId = $truncatedId] [batchId = $testBatchId] test message")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, truncatedId) + assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) + } + + test("SPARK-56326: constructStreamingLogEntry with LogEntry - queryIdLength truncates queryId") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + propsWithBothIds(), + log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}", + enabled = true, + queryIdLength = 5) + + val truncatedId = testQueryId.take(5) + assertResult(s"[queryId = $truncatedId] " + + s"[batchId = $testBatchId] test message Dummy Context")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, truncatedId) + assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId) + } + + test("SPARK-56326: constructStreamingLogEntry - " + + "queryIdLength greater than id length returns full id") { + val result = StructuredStreamingIdAwareSchedulerLogging + .constructStreamingLogEntry( + propsWithBothIds(), "test message", enabled = true, queryIdLength = 1000) + + assertResult(s"[queryId = $testQueryId] [batchId = $testBatchId] test message")( + result.message) + assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ad910568368dd..09901ead9a68c 
100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -2900,7 +2900,9 @@ class TaskSetManagerSuite val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage) - val expectedQueryPrefix = s"[queryId = ${testQueryId}]" + // default queryIdLength is 5, so the query ID is truncated + val truncatedQueryId = testQueryId.take(5) + val expectedQueryPrefix = s"[queryId = $truncatedQueryId]" val expectedBatchPrefix = s"[batchId = $testBatchId]" // Verify the "Starting" log line includes query Id and batch Id diff --git a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions index c2b0fa46cf956..d49d0eab2fe77 100644 --- a/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions +++ b/sql/hive/src/test/resources/conf/binding-policy-exceptions/configs-without-binding-policy-exceptions @@ -320,6 +320,7 @@ spark.scheduler.resource.profileMergeConflicts spark.scheduler.revive.interval spark.scheduler.stage.legacyAbortAfterKillTasks spark.scheduler.streaming.idAwareLogging.enabled +spark.scheduler.streaming.idAwareLogging.queryIdLength spark.security.credentials.renewalRatio spark.security.credentials.retryWait spark.serializer