Skip to content

[SPARK-56326][SS] Include streaming query and batch ids in scheduling logs#55166

Open
BrooksWalls wants to merge 14 commits intoapache:masterfrom
BrooksWalls:SPARK-56326/streamingQueryIdAndBatchIdInSchedulingLogs
Open

[SPARK-56326][SS] Include streaming query and batch ids in scheduling logs#55166
BrooksWalls wants to merge 14 commits intoapache:masterfrom
BrooksWalls:SPARK-56326/streamingQueryIdAndBatchIdInSchedulingLogs

Conversation

@BrooksWalls
Copy link
Copy Markdown

@BrooksWalls BrooksWalls commented Apr 2, 2026

What changes were proposed in this pull request?

This change adds the streaming query Id and batch Id to some of the scheduling logs in order to aid in debugging structured streaming queries.

The following log lines have been updated to include the query and batch Id:

  • All log lines in TaskSetManager. Examples:

    • 26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Starting task 0.0 in stage 5.0 (TID 129) (...,executor driver, partition 0, PROCESS_LOCAL, 9728 bytes)

    • 26/04/02 16:34:01 INFO TaskSetManager: [queryId = 1251e] [batchId = 5] Finished task 6.0 in stage 5.0 (TID 135) in 12 ms on ...(executor driver) (6/32)

  • One log in SchedulableBuilder:

    • 26/04/02 16:39:09 INFO FairSchedulableBuilder: [queryId = f5660] [batchId = 5] Added task set TaskSet_5.0 to pool default

Why are the changes needed?

When debugging multiple streaming queries running at the same time it can be difficult to go through the scheduling logs. By including the query and batch Id it is much easier to isolate logs to specific queries and batches.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests were added.

Also manually tested by running the spark shell and redirecting info logs to a temporary file. Then ran a basic streaming query and grepped the temp file for the desired log lines to ensure they included the query and batch id. Also confirmed a batch query ran in the shell does not include the query and batch Id in its logs.

Was this patch authored or co-authored using generative AI tooling?

yes, coauthored

Generated-by: claude

Copy link
Copy Markdown
Contributor

@dichlorodiphen dichlorodiphen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good

@BrooksWalls BrooksWalls force-pushed the SPARK-56326/streamingQueryIdAndBatchIdInSchedulingLogs branch from 10d7760 to 92d2f95 Compare April 2, 2026 20:09
* Mix this trait into any scheduler component that has access to task
* properties and needs streaming-aware log output.
*/
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a trait here so all logs published from TaskSetManager will include the query and batch Id when present

*/
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {
// we gather the query and batch Id from the properties of a given TaskSet
protected def properties: Properties
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can't rely on thread local properties, we need to gather the query and batch Id from the taskSet's properties, this must be set by class which mixes in the trait

* Helpers for constructing log entries enriched with structured streaming
* identifiers extracted from task properties.
*/
private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Logging {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uses a companion object here so that we can call the methods from SchedulableBuilder which can not set one Properties object at construction as it's shared across tasks

Comment on lines +116 to +122
// formatMessage truncates the queryId for readability
// so we use a blank messageWithContext to overwrite the full query Id to the context
formatMessage(
queryId,
batchId,
entry
) + MessageWithContext("", constructStreamingContext(queryId, batchId))
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little clunky but I wanted to truncate the query Id in the outputted log line so that its more readable as you scan through, but still have the full query id in the log context. To do that we use a blank log line with the query context hashmap so it overrides the truncated query Id.

Comment on lines +153 to +155
// MDC places the log key in the context as all lowercase, so we do the same here
queryId.foreach(streamingContext.put(LogKeys.QUERY_ID.name.toLowerCase(Locale.ROOT), _))
batchId.foreach(streamingContext.put(LogKeys.BATCH_ID.name.toLowerCase(Locale.ROOT), _))
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if the lowercase is necessary here or not, but wanted to match the behavior of the log interpolator

healthTracker: Option[HealthTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging {
clock: Clock = new SystemClock())
extends Schedulable with StructuredStreamingIdAwareSchedulerLogging {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we extend the StructuredStreamingIdAwareSchedulerLogging instead of logging, all logs published will include the query and batch Id when handling a streaming query TaskSet

log"${MDC(LogKeys.POOL_NAME, poolName)}")

logInfo(
StructuredStreamingIdAwareSchedulerLogging.constructStreamingLogEntry(
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like I said above, since this method is called for many different TaskSets we have to use the companion object's method

@HyukjinKwon HyukjinKwon changed the title [SPARK-56326] Include streaming query and batch ids in scheduling logs [SPARK-56326][SS] Include streaming query and batch ids in scheduling logs Apr 5, 2026
@HeartSaVioR
Copy link
Copy Markdown
Contributor

@jiangxb1987 @Ngone51
Would love to hear your voice on the change. We are trying to correlate the scheduler log with streaming query, but that's unfortunately an inverse direction of dependency, hence we had to make CORE be aware of streaming.

Maybe logging might take (very) slight more time than before, hence if we think it's on critical path (I can't judge) we should be very careful. I'd love to hear from experts; I'd give this a go when I have at least one approval from CORE module experts.

Thanks in advance!

Comment on lines +291 to +292
if (isStreamingTaskSet(taskSet)) {
streamingTaskSetManager(taskSet, maxTaskFailures)
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now only TaskSets which are for streaming queries will get the streaming log line mixin applied. This way we avoid any overhead on the non-streaming path.

Comment on lines +303 to +304
// ensure log name matches the non-streaming version
override protected def logName: String = classOf[TaskSetManager].getName
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done to ensure the streaming version has the exact same logName as the non-streaming version. We could also move this to StructuredStreamingIdAwareSchedulerLogging but it would look a little different

private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {    
    protected def properties: Properties
    override protected def logName: String =                                               
      this.getClass.getSuperclass.getName.stripSuffix("$")
  } 

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without one of these 2 options the logname will be something like org.apache.spark.scheduler.TaskSchedulerImpl$$anon$1 just for streamingTaskSetManagers since we are doing the inline mixin

Comment on lines +2729 to +2742
val logAppender = new LogAppender("streaming log name check")
// TSM constructor prints some debug logs we can use
logAppender.setThreshold(Level.DEBUG)
withLogAppender(logAppender,
loggerNames = Seq(classOf[TaskSetManager].getName),
level = Some(Level.DEBUG)) {
val tsm = taskScheduler.createTaskSetManager(taskSet, 1)
assert(tsm.isInstanceOf[StructuredStreamingIdAwareSchedulerLogging])
}
// when creating the streaming version we want the log name to match the
// non-streaming baseline case. By confirming our log appender contains
// logs we know the log name is correct
assert(logAppender.loggingEvents.nonEmpty,
"Expected logs under TaskSetManager logger name")
Copy link
Copy Markdown
Author

@BrooksWalls BrooksWalls Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like the way this test is asserting the logName since it relies on some debug log lines that get published in TaskSetManager's constructor, which, if removed, would make this test fail for no reason. But I don't see a clearly better approach.

The reason we check the logs is to ensure that the code in TaskSchedulerImpl is correctly setting the log name for the streaming version so it matches the non streaming version. We have a few options

  1. leave what is here and accept that we rely on unrelated debugging logs
  2. manually call a method on TaskSetManager that publishes a log (still relies on an unrelated log)
  3. move the log name override to the trait like
private[scheduler] trait StructuredStreamingIdAwareSchedulerLogging extends Logging {    
    protected def properties: Properties
    override protected def logName: String =                                               
      this.getClass.getSuperclass.getName.stripSuffix("$")
  } 
  1. just don't have a test which covers the production code for setting the log name, since we have a test in TaskSetManagerSuite which mirrors the code in TaskSetSchedulerImpl and confirms the log name is correct

return entry
}
// wrap in log entry to defer until log is evaluated
new LogEntry({
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we wrap this in a LogEntry since Claude pointed out that we were forcing eager evaluation of the provided logEntry even if the logging level was disabled in the environment. By wrapping everything in a LogEntry, now the logic is only called when the logging level is enabled for the environment. This is important for things like debugging and trace logs

bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg)
).getOrElse(toMessageWithContext(msg))
queryId.map(
qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId)}] " + msgWithBatchId
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up removing the truncation of the query Id here. The reason is that, before, when we added the hashmap to the context containing the full query Id, any log renderer which used the log context would place the full query Id into the log message anyways. Ultimately I think it's probably better to have the full query Id in the log context and subsequently the log, then to have the truncated version. Open to reverting this change back if others feel differently.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the code is simpler without the truncation

@BrooksWalls BrooksWalls force-pushed the SPARK-56326/streamingQueryIdAndBatchIdInSchedulingLogs branch from 2518b31 to 0b0f5f1 Compare April 9, 2026 02:17
Comment on lines +148 to +188
test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers evaluation") {
var evaluated = false
val lazyEntry = new LogEntry({
evaluated = true
MessageWithContext("lazy message", java.util.Collections.emptyMap())
})

val result = StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(propsWithBothIds(), lazyEntry)

// Work should be deferred
assert(!evaluated,
"LogEntry should not be evaluated during constructStreamingLogEntry")

// Accessing .message triggers evaluation
result.message
assert(evaluated, "LogEntry should be evaluated when .message is accessed")
}

test("SPARK-56326: constructStreamingLogEntry with LogEntry - defers property access") {
var propertiesAccessed = false
val props = new Properties() {
override def getProperty(key: String): String = {
propertiesAccessed = true
super.getProperty(key)
}
}
props.setProperty(QUERY_ID_KEY, testQueryId)
props.setProperty(BATCH_ID_KEY, testBatchId)

val entry = log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}"
val result = StructuredStreamingIdAwareSchedulerLogging
.constructStreamingLogEntry(props, entry)

assert(!propertiesAccessed,
"Properties should not be accessed during constructStreamingLogEntry")

result.message
assert(propertiesAccessed,
"Properties should be accessed when .message is called")
}
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new tests for that eager evaluation thing I mentioned above

Comment on lines +162 to +163
properties: Properties,
msg: => String): LogEntry = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 4 indents


private def constructStreamingContext(
queryId: Option[String],
batchId: Option[String]):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 4 indents.

Comment on lines +188 to +189
java.util.HashMap[String, String] = {
val streamingContext = new java.util.HashMap[String, String]()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add it to import list?

private def formatMessage(
queryId: Option[String],
batchId: Option[String],
msg: => String): String = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indents.

private def formatMessage(
queryId: Option[String],
batchId: Option[String],
msg: => LogEntry): MessageWithContext = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indents.

private[scheduler] def constructStreamingLogEntry(
properties: Properties,
msg: => String): LogEntry = {
if (properties == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also check QUERY_ID_KEY is non-empty?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could check it here. I didn't so far since it would mean checking the properties for query Id on every log call even if that log level is disabled. The overhead of the hashmap lookup is pretty small so it's likely okay but also our code does handle the case where neither queryId or batchId is set and that's currently not called if the log level is disabled.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only place this is currently a concern is FairSchedulableBuilder since anything that uses this through TaskSetManager has already been confirmed to have query Id set

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only place this is currently a concern is FairSchedulableBuilder

Yes. So is there a difference of the log before and after when query Id is empty for non-streaming query?

// will include query and batch Id in the logs
private def streamingTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
with StructuredStreamingIdAwareSchedulerLogging {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder shall we actually override properties here:

override def properties: Properties = taskSet.properties

I don't get how the current way overrides that.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that it relied on scala behavior with mixins where it saw that TaskSetManager already has a method that matches the signature so it defaults to that, though it's probably best to go with explicitly setting it here so we don't need the unrelated and seemingly unused method in TaskSetManager

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants