Skip to content

Commit ea8b6d6

Browse files
BrooksWallsHeartSaVioR
authored and committed
[SPARK-56326][SS][CORE] Include streaming query and batch ids in scheduling logs
### What changes were proposed in this pull request? This change adds the streaming query Id and batch Id to some of the scheduling logs in order to aid in debugging structured streaming queries. The following log lines have been updated to include the query and batch Id: - All log lines in TaskSetManager. Examples: - `26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Starting task 0.0 in stage 5.0 (TID 129) (...,executor driver, partition 0, PROCESS_LOCAL, 9728 bytes)` - `26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Finished task 6.0 in stage 5.0 (TID 135) in 12 ms on ...(executor driver) (6/32)` - One log in SchedulableBuilder: - `26/04/02 16:39:09 INFO FairSchedulableBuilder: [queryId = f5660] [batchId = 5] Added task set TaskSet_5.0 to pool default` ### Why are the changes needed? When debugging multiple streaming queries running at the same time it can be difficult to go through the scheduling logs. By including the query and batch Id it is much easier to isolate logs to specific queries and batches. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests were added. Also manually tested by running the spark shell and redirecting info logs to a temporary file. Then ran a basic streaming query and grepped the temp file for the desired log lines to ensure they included the query and batch id. Also confirmed a batch query ran in the shell does not include the query and batch Id in its logs. ### Was this patch authored or co-authored using generative AI tooling? yes, coauthored Generated-by: claude Closes #55166 from BrooksWalls/SPARK-56326/streamingQueryIdAndBatchIdInSchedulingLogs. Authored-by: Brooks Walls <brooks.walls@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 96e0f8f commit ea8b6d6

9 files changed

Lines changed: 804 additions & 15 deletions

File tree

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2373,6 +2373,22 @@ package object config {
23732373
.enumConf(SchedulingMode)
23742374
.createWithDefault(SchedulingMode.FIFO)
23752375

2376+
private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED =
2377+
ConfigBuilder("spark.scheduler.streaming.idAwareLogging.enabled")
2378+
.doc("When true, scheduler log messages for streaming tasks include " +
2379+
"the structured streaming query ID and batch ID.")
2380+
.version("4.2.0")
2381+
.booleanConf
2382+
.createWithDefault(true)
2383+
2384+
private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH =
2385+
ConfigBuilder("spark.scheduler.streaming.idAwareLogging.queryIdLength")
2386+
.doc("Maximum number of characters of the streaming query ID to include " +
2387+
"in scheduler log messages. Set to -1 to include the full query ID.")
2388+
.version("4.2.0")
2389+
.intConf
2390+
.createWithDefault(5)
2391+
23762392
private[spark] val SCHEDULER_REVIVE_INTERVAL =
23772393
ConfigBuilder("spark.scheduler.revive.interval")
23782394
.version("0.8.1")

core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ import scala.xml.{Node, XML}
2626
import org.apache.hadoop.fs.Path
2727

2828
import org.apache.spark.SparkContext
29-
import org.apache.spark.internal.Logging
30-
import org.apache.spark.internal.LogKeys
29+
import org.apache.spark.internal.{Logging, LogKeys}
3130
import org.apache.spark.internal.LogKeys._
32-
import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE}
31+
import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE, STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED, STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH}
3332
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
3433
import org.apache.spark.util.Utils
3534

@@ -61,6 +60,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
6160
private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
6261
extends SchedulableBuilder with Logging {
6362

63+
val streamingIdAwareLoggingEnabled: Boolean =
64+
sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_ENABLED)
65+
val streamingQueryIdLength: Int =
66+
sc.conf.get(STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH)
6467
val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE)
6568
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
6669
val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
@@ -216,18 +219,33 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext
216219
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
217220
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
218221
rootPool.addSchedulable(parentPool)
219-
logWarning(log"A job was submitted with scheduler pool " +
220-
log"${MDC(SCHEDULER_POOL_NAME, poolName)}, which has not been " +
221-
log"configured. This can happen when the file that pools are read from isn't set, or " +
222-
log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. " +
223-
log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " +
224-
log"configuration (schedulingMode: " +
225-
log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " +
226-
log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " +
227-
log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}")
222+
logWarning(
223+
StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(
224+
properties,
225+
log"A job was submitted with scheduler pool " +
226+
log"${MDC(SCHEDULER_POOL_NAME, poolName)}, which has not been " +
227+
log"configured. This can happen when the file that pools are read from isn't set, or " +
228+
log"when that file doesn't contain ${MDC(POOL_NAME, poolName)}. " +
229+
log"Created ${MDC(CREATED_POOL_NAME, poolName)} with default " +
230+
log"configuration (schedulingMode: " +
231+
log"${MDC(LogKeys.SCHEDULING_MODE, DEFAULT_SCHEDULING_MODE)}, " +
232+
log"minShare: ${MDC(MIN_SHARE, DEFAULT_MINIMUM_SHARE)}, " +
233+
log"weight: ${MDC(WEIGHT, DEFAULT_WEIGHT)}",
234+
streamingIdAwareLoggingEnabled,
235+
streamingQueryIdLength
236+
)
237+
)
228238
}
229239
parentPool.addSchedulable(manager)
230-
logInfo(log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " +
231-
log"${MDC(LogKeys.POOL_NAME, poolName)}")
240+
241+
logInfo(
242+
StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(
243+
properties,
244+
log"Added task set ${MDC(LogKeys.TASK_SET_MANAGER, manager.name)} tasks to pool " +
245+
log"${MDC(LogKeys.POOL_NAME, poolName)}",
246+
streamingIdAwareLoggingEnabled,
247+
streamingQueryIdLength
248+
)
249+
)
232250
}
233251
}
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler
19+
20+
import java.util.{HashMap, Locale, Properties}
21+
22+
import org.apache.spark.internal.{LogEntry, Logging, LogKeys, MessageWithContext}
23+
24+
/**
 * A logging trait for scheduler components whose log messages should carry
 * structured streaming identifiers (query ID and batch ID).
 *
 * Streaming execution sets these identifiers via
 * [[org.apache.spark.SparkContext#setLocalProperty]], which is thread-local.
 * Scheduler code typically runs on a different thread (e.g. the
 * task-scheduler-event-loop-worker), so `getLocalProperty` would not see the
 * streaming context. This trait therefore reads the identifiers from the
 * task's [[java.util.Properties]], which travel with the
 * [[org.apache.spark.scheduler.TaskSet]] across thread boundaries.
 *
 * Mix this trait into any scheduler component that has access to task
 * properties and needs streaming-aware log output.
 */
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {
  /** Task properties of the current TaskSet; source of the query/batch IDs. */
  protected def properties: Properties

  /** When false, log calls pass through to [[Logging]] unmodified. */
  protected def streamingIdAwareLoggingEnabled: Boolean

  /** Max characters of the query ID to emit; -1 means the full ID. */
  protected def streamingQueryIdLength: Int

  // Both helpers funnel every log call through the companion object so the
  // query/batch IDs (when present in `properties`) are prepended lazily, only
  // when the entry is actually rendered by the underlying logger.
  private def enrich(msg: => String): LogEntry =
    StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(
      properties, msg, streamingIdAwareLoggingEnabled, streamingQueryIdLength)

  private def enrich(entry: LogEntry): LogEntry =
    StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(
      properties, entry, streamingIdAwareLoggingEnabled, streamingQueryIdLength)

  override protected def logInfo(msg: => String): Unit = super.logInfo(enrich(msg))

  override protected def logInfo(entry: LogEntry): Unit = super.logInfo(enrich(entry))

  override protected def logInfo(msg: => String, t: Throwable): Unit =
    super.logInfo(enrich(msg), t)

  override protected def logInfo(entry: LogEntry, t: Throwable): Unit =
    super.logInfo(enrich(entry), t)

  override protected def logWarning(msg: => String): Unit = super.logWarning(enrich(msg))

  override protected def logWarning(entry: LogEntry): Unit = super.logWarning(enrich(entry))

  override protected def logWarning(msg: => String, t: Throwable): Unit =
    super.logWarning(enrich(msg), t)

  override protected def logWarning(entry: LogEntry, t: Throwable): Unit =
    super.logWarning(enrich(entry), t)

  override protected def logDebug(msg: => String): Unit = super.logDebug(enrich(msg))

  override protected def logDebug(entry: LogEntry): Unit = super.logDebug(enrich(entry))

  override protected def logDebug(msg: => String, t: Throwable): Unit =
    super.logDebug(enrich(msg), t)

  override protected def logDebug(entry: LogEntry, t: Throwable): Unit =
    super.logDebug(enrich(entry), t)

  override protected def logError(msg: => String): Unit = super.logError(enrich(msg))

  override protected def logError(entry: LogEntry): Unit = super.logError(enrich(entry))

  override protected def logError(msg: => String, t: Throwable): Unit =
    super.logError(enrich(msg), t)

  override protected def logError(entry: LogEntry, t: Throwable): Unit =
    super.logError(enrich(entry), t)

  override protected def logTrace(msg: => String): Unit = super.logTrace(enrich(msg))

  override protected def logTrace(entry: LogEntry): Unit = super.logTrace(enrich(entry))

  override protected def logTrace(msg: => String, t: Throwable): Unit =
    super.logTrace(enrich(msg), t)

  override protected def logTrace(entry: LogEntry, t: Throwable): Unit =
    super.logTrace(enrich(entry), t)
}
175+
176+
/**
 * Helpers for constructing log entries enriched with structured streaming
 * identifiers extracted from task properties.
 */
private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging {
  // Property keys read from the TaskSet's properties. The two prefixes differ
  // intentionally; presumably they mirror the historical keys set by the
  // streaming engine — confirm against the streaming execution code before
  // "normalizing" them.
  val QUERY_ID_KEY = "sql.streaming.queryId"
  val BATCH_ID_KEY = "streaming.sql.batchId"

  /**
   * Wraps a structured [[LogEntry]] so that, when rendered, it is prefixed
   * with `[queryId = ...] [batchId = ...]` extracted from `properties`.
   * Returns `entry` untouched when the feature is disabled or no properties
   * are available.
   */
  private[scheduler] def constructStreamingLogEntry(
      properties: Properties,
      entry: LogEntry,
      enabled: Boolean,
      queryIdLength: Int): LogEntry = {
    if (!enabled || properties == null) {
      entry
    } else {
      // Wrap in a LogEntry to defer ID lookup and formatting until the log
      // line is actually evaluated (the level may be disabled).
      new LogEntry({
        val (queryId, batchId) = getStreamingProperties(properties, queryIdLength)
        formatMessage(queryId, batchId, entry)
      })
    }
  }

  /**
   * Same as the [[LogEntry]] overload but for a plain string message. The
   * message is wrapped with an empty MDC context when the feature is off.
   */
  private[scheduler] def constructStreamingLogEntry(
      properties: Properties,
      msg: => String,
      enabled: Boolean,
      queryIdLength: Int): LogEntry = {
    if (!enabled || properties == null) {
      new LogEntry(
        MessageWithContext(msg, java.util.Collections.emptyMap())
      )
    } else {
      new LogEntry({
        val (queryId, batchId) = getStreamingProperties(properties, queryIdLength)
        MessageWithContext(
          formatMessage(queryId, batchId, msg),
          constructStreamingContext(queryId, batchId)
        )
      })
    }
  }

  /** Builds the MDC context map carrying whichever IDs are present. */
  private def constructStreamingContext(
      queryId: Option[String],
      batchId: Option[String]): HashMap[String, String] = {
    val streamingContext = new HashMap[String, String]()
    // MDC places the log key in the context as all lowercase, so we do the same here
    queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _))
    batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _))
    streamingContext
  }

  /** Prefixes a plain message with the `[queryId = ...] [batchId = ...]` tags. */
  private def formatMessage(
      queryId: Option[String],
      batchId: Option[String],
      msg: => String): String = {
    val msgWithBatchId = batchId.map(bid => s"[batchId = $bid] $msg").getOrElse(msg)
    queryId.map(qId => s"[queryId = $qId] $msgWithBatchId").getOrElse(msgWithBatchId)
  }

  /** Prefixes a structured entry, threading the IDs through MDC keys. */
  private def formatMessage(
      queryId: Option[String],
      batchId: Option[String],
      msg: => LogEntry): MessageWithContext = {
    val msgWithBatchId: MessageWithContext = batchId.map(
      bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg)
    ).getOrElse(toMessageWithContext(msg))
    queryId.map(
      qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId)}] " + msgWithBatchId
    ).getOrElse(msgWithBatchId)
  }

  private def toMessageWithContext(entry: LogEntry): MessageWithContext = {
    MessageWithContext(entry.message, entry.context)
  }

  /**
   * Extracts the (possibly truncated) query ID and the batch ID from the
   * task properties. Absent or empty values yield `None`.
   */
  private def getStreamingProperties(
      properties: Properties,
      queryIdLength: Int): (Option[String], Option[String]) = {
    val queryId = Option(properties.getProperty(QUERY_ID_KEY))
      .filter(_.nonEmpty)
      .map { id =>
        // Any negative length (the config documents -1) means "full ID".
        // Previously `take(-2)` would silently produce an empty string.
        if (queryIdLength < 0) id else id.take(queryIdLength)
      }
      // A zero length would otherwise render as "[queryId = ] "; drop it.
      .filter(_.nonEmpty)
    val batchId = Option(properties.getProperty(BATCH_ID_KEY)).filter(_.nonEmpty)
    (queryId, batchId)
  }
}

0 commit comments

Comments
 (0)