[SPARK-56326][SS] Include streaming query and batch ids in scheduling logs#55166
[SPARK-56326][SS] Include streaming query and batch ids in scheduling logs#55166BrooksWalls wants to merge 14 commits intoapache:masterfrom
Conversation
dichlorodiphen
left a comment
There was a problem hiding this comment.
Generally looks good
10d7760 to
92d2f95
Compare
| * Mix this trait into any scheduler component that has access to task | ||
| * properties and needs streaming-aware log output. | ||
| */ | ||
| private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging { |
There was a problem hiding this comment.
Use a trait here so all logs published from TaskSetManager will include the query and batch Id when present
| */ | ||
| private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging { | ||
| // we gather the query and batch Id from the properties of a given TaskSet | ||
| protected def properties: Properties |
There was a problem hiding this comment.
Since we can't rely on thread local properties, we need to gather the query and batch Id from the taskSet's properties, this must be set by class which mixes in the trait
| * Helpers for constructing log entries enriched with structured streaming | ||
| * identifiers extracted from task properties. | ||
| */ | ||
| private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging { |
There was a problem hiding this comment.
uses a companion object here so that we can call the methods from SchedulableBuilder which can not set one Properties object at construction as it's shared across tasks
| // formatMessage truncates the queryId for readability | ||
| // so we use a blank messageWithContext to overwrite the full query Id to the context | ||
| formatMessage( | ||
| queryId, | ||
| batchId, | ||
| entry | ||
| ) + MessageWithContext("", constructStreamingContext(queryId, batchId)) |
There was a problem hiding this comment.
This is a little clunky but I wanted to truncate the query Id in the outputted log line so that its more readable as you scan through, but still have the full query id in the log context. To do that we use a blank log line with the query context hashmap so it overrides the truncated query Id.
| // MDC places the log key in the context as all lowercase, so we do the same here | ||
| queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _)) | ||
| batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _)) |
There was a problem hiding this comment.
I'm not sure if the lowercase is necessary here or not, but wanted to match the behavior of the log interpolator
| healthTracker: Option[HealthTracker] = None, | ||
| clock: Clock = new SystemClock()) extends Schedulable with Logging { | ||
| clock: Clock = new SystemClock()) | ||
| extends Schedulable with StructuredStreamingIdAwareSchedulerLogging { |
There was a problem hiding this comment.
Since we extend the StructuredStreamingIdAwareSchedulerLogging instead of logging, all logs published will include the query and batch Id when handling a streaming query TaskSet
| log"${MDC(LogKeys.POOL_NAME, poolName)}") | ||
|
|
||
| logInfo( | ||
| StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry( |
There was a problem hiding this comment.
Like I said above, since this method is called for many different TaskSets we have to use the companion object's method
|
@jiangxb1987 @Ngone51 Maybe logging might take (very) slight more time than before, hence if we think it's on critical path (I can't judge) we should be very careful. I'd love to hear from experts; I'd give this a go when I have at least one approval from CORE module experts. Thanks in advance! |
| if (isStreamingTaskSet(taskSet)) { | ||
| streamingTaskSetManager(taskSet, maxTaskFailures) |
There was a problem hiding this comment.
now only TaskSets which are for streaming queries will get the streaming log line mixin applied. This way we avoid any overhead on the non-streaming path.
| // ensure log name matches the non-streaming version | ||
| override protected def logName: String = classOf[TaskSetManager].getName |
There was a problem hiding this comment.
this is done to ensure the streaming version has the exact same logName as the non-streaming version. We could also move this to StructuredStreamingIdAwareSchedulerLogging but it would look a little different
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {
protected def properties: Properties
override protected def logName: String =
this.getClass.getSuperclass.getName.stripSuffix("$")
}
There was a problem hiding this comment.
without one of these 2 options the logname will be something like org.apache.spark.scheduler.TaskSchedulerImpl$$anon$1 just for streamingTaskSetManagers since we are doing the inline mixin
| val logAppender = new LogAppender("streaming log name check") | ||
| // TSM constructor prints some debug logs we can use | ||
| logAppender.setThreshold(Level.DEBUG) | ||
| withLogAppender(logAppender, | ||
| loggerNames = Seq(classOf[TaskSetManager].getName), | ||
| level = Some(Level.DEBUG)) { | ||
| val tsm = taskScheduler.createTaskSetManager(taskSet, 1) | ||
| assert(tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging]) | ||
| } | ||
| // when creating the streaming version we want the log name to match the | ||
| // non-streaming baseline case. By confirming our log appender contains | ||
| // logs we know the log name is correct | ||
| assert(logAppender.loggingEvents.nonEmpty, | ||
| "Expected logs under TaskSetManager logger name") |
There was a problem hiding this comment.
I don't really like the way this test is asserting the logName since it relies on some debug log lines that get published in TaskSetManager's constructor, which, if removed, would make this test fail for no reason. But I don't see a clearly better approach.
The reason we check the logs is to ensure that the code in TaskSchedulerImpl is correctly setting the log name for the streaming version so it matches the non streaming version. We have a few options
- leave what is here and accept that we rely on unrelated debugging logs
- manually call a method on TaskSetManager that publishes a log (still relies on an unrelated log)
- move the log name override to the trait like
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {
protected def properties: Properties
override protected def logName: String =
this.getClass.getSuperclass.getName.stripSuffix("$")
}
- just don't have a test which covers the production code for setting the log name, since we have a test in TaskSetManagerSuite which mirrors the code in TaskSetSchedulerImpl and confirms the log name is correct
| return entry | ||
| } | ||
| // wrap in log entry to defer until log is evaluated | ||
| new LogEntry({ |
There was a problem hiding this comment.
we wrap this in a LogEntry since Claude pointed out that we were forcing eager evaluation of the provided logEntry even if the logging level was disabled in the environment. By wrapping everything in a LogEntry, now the logic is only called when the logging level is enabled for the environment. This is important for things like debugging and trace logs
| bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg) | ||
| ).getOrElse(toMessageWithContext(msg)) | ||
| queryId.map( | ||
| qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId)}] " + msgWithBatchId |
There was a problem hiding this comment.
I ended up removing the truncation of the query Id here. The reason is that, before, when we added the hashmap to the context containing the full query Id, any log renderer which used the log context would place the full query Id into the log message anyways. Ultimately I think it's probably better to have the full query Id in the log context and subsequently the log, then to have the truncated version. Open to reverting this change back if others feel differently.
There was a problem hiding this comment.
Also the code is simpler without the truncation
2518b31 to
0b0f5f1
Compare
| test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers evaluation") { | ||
| var evaluated = false | ||
| val lazyEntry = new LogEntry({ | ||
| evaluated = true | ||
| MessageWithContext("lazy message", java.util.Collections.emptyMap()) | ||
| }) | ||
|
|
||
| val result = StructuredStreamingIdAwareSchedulerLogging | ||
| .constructStreamingLogEntry(propsWithBothIds(), lazyEntry) | ||
|
|
||
| // Work should be deferred | ||
| assert(!evaluated, | ||
| "LogEntry should not be evaluated during constructStreamingLogEntry") | ||
|
|
||
| // Accessing .message triggers evaluation | ||
| result.message | ||
| assert(evaluated, "LogEntry should be evaluated when .message is accessed") | ||
| } | ||
|
|
||
| test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers property access") { | ||
| var propertiesAccessed = false | ||
| val props = new Properties() { | ||
| override def getProperty(key: String): String = { | ||
| propertiesAccessed = true | ||
| super.getProperty(key) | ||
| } | ||
| } | ||
| props.setProperty(QUERY_ID_KEY, testQueryId) | ||
| props.setProperty(BATCH_ID_KEY, testBatchId) | ||
|
|
||
| val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}" | ||
| val result = StructuredStreamingIdAwareSchedulerLogging | ||
| .constructStreamingLogEntry(props, entry) | ||
|
|
||
| assert(!propertiesAccessed, | ||
| "Properties should not be accessed during constructStreamingLogEntry") | ||
|
|
||
| result.message | ||
| assert(propertiesAccessed, | ||
| "Properties should be accessed when .message is called") | ||
| } |
There was a problem hiding this comment.
new tests for that eager evaluation thing I mentioned above
| properties: Properties, | ||
| msg: => String): LogEntry = { |
|
|
||
| private def constructStreamingContext( | ||
| queryId: Option[String], | ||
| batchId: Option[String]): |
| java.util.HashMap[String, String] = { | ||
| val streamingContext = new java.util.HashMap[String, String]() |
There was a problem hiding this comment.
Could you add it to import list?
| private def formatMessage( | ||
| queryId: Option[String], | ||
| batchId: Option[String], | ||
| msg: => String): String = { |
| private def formatMessage( | ||
| queryId: Option[String], | ||
| batchId: Option[String], | ||
| msg: => LogEntry): MessageWithContext = { |
| private[scheduler] def constructStreamingLogEntry( | ||
| properties: Properties, | ||
| msg: => String): LogEntry = { | ||
| if (properties == null) { |
There was a problem hiding this comment.
Shall we also check QUERY_ID_KEY is non-empty?
There was a problem hiding this comment.
We could check it here. I didn't so far since it would mean checking the properties for query Id on every log call even if that log level is disabled. The overhead of the hashmap lookup is pretty small so it's likely okay but also our code does handle the case where neither queryId or batchId is set and that's currently not called if the log level is disabled.
There was a problem hiding this comment.
The only place this is currently a concern is FairSchedulableBuilder since anything that uses this through TaskSetManager has already been confirmed to have query Id set
There was a problem hiding this comment.
The only place this is currently a concern is FairSchedulableBuilder
Yes. So is there a difference of the log before and after when query Id is empty for non-streaming query?
| // will include query and batch Id in the logs | ||
| private def streamingTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { | ||
| new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock) | ||
| with StructuredStreamingIdAwareSchedulerLogging { |
There was a problem hiding this comment.
I wonder shall we actually override properties here:
override def properties: Properties = taskSet.propertiesI don't get how the current way overrides that.
There was a problem hiding this comment.
My understanding is that it relied on scala behavior with mixins where it saw that TaskSetManager already has a method that matches the signature so it defaults to that, though it's probably best to go with explicitly setting it here so we don't need the unrelated and seemingly unused method in TaskSetManager
What changes were proposed in this pull request?
This change adds the streaming query Id and batch Id to some of the scheduling logs in order to aid in debugging structured streaming queries.
The following log lines have been updated to include the query and batch Id:
All log lines in TaskSetManager. Examples:
26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Starting task 0.0 in stage 5.0 (TID 129) (...,executor driver, partition 0, PROCESS_LOCAL, 9728 bytes)26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Finished task 6.0 in stage 5.0 (TID 135) in 12 ms on ...(executor driver) (6/32)One log in SchedulableBuilder:
26/04/02 16:39:09 INFO FairSchedulableBuilder: [queryId = f5660] [batchId = 5] Added task set TaskSet_5.0 to pool defaultWhy are the changes needed?
When debugging multiple streaming queries running at the same time it can be difficult to go through the scheduling logs. By including the query and batch Id it is much easier to isolate logs to specific queries and batches.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests were added.
Also manually tested by running the spark shell and redirecting info logs to a temporary file. Then ran a basic streaming query and grepped the temp file for the desired log lines to ensure they included the query and batch id. Also confirmed a batch query ran in the shell does not include the query and batch Id in its logs.
Was this patch authored or co-authored using generative AI tooling?
yes, coauthored
Generated-by: claude