Skip to content

Commit f33007c

Browse files
committed
Add unit tests covering streaming query and batch Id logs
1 parent cf20903 commit f33007c

3 files changed

Lines changed: 156 additions & 0 deletions

File tree

core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,43 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
367367
}
368368
}
369369

370+
test("Fair Scheduler addTaskSetManager logs include streaming query Id and batch Id") {
  // Point the scheduler allocation file setting at the fairscheduler.xml test resource
  // before the context is created, then build the pools from it.
  val allocationFile = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
  val sparkConf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, allocationFile)
  sc = new SparkContext(LOCAL, APP_NAME, sparkConf)
  val scheduler = new TaskSchedulerImpl(sc)

  val builder = new FairSchedulableBuilder(new Pool("", FAIR, 0, 0), sc)
  builder.buildPools()

  // Properties handed to addTaskSetManager: the builder's FAIR_SCHEDULER_PROPERTIES entry
  // plus the streaming query Id and batch Id that should show up in the log line.
  val queryId = "test-query-id-5678"
  val batchId = "99"
  val props = new Properties()
  Seq(
    builder.FAIR_SCHEDULER_PROPERTIES -> "1",
    "sql.streaming.queryId" -> queryId,
    "streaming.sql.batchId" -> batchId
  ).foreach { case (key, value) => props.setProperty(key, value) }

  val tsm = createTaskSetManager(0, 1, scheduler)

  // Capture only what the FairSchedulableBuilder logger emits while the task set is added.
  val appender = new LogAppender("pool streaming logs", maxEvents = 1000)
  withLogAppender(appender, loggerNames = Seq(classOf[FairSchedulableBuilder].getName)) {
    builder.addTaskSetManager(tsm, props)
  }

  val messages = appender.loggingEvents.map(_.getMessage.getFormattedMessage)
  // The query Id is expected in truncated form (its first five characters).
  val queryTag = s"[queryId = ${queryId.take(5)}]"
  val batchTag = s"[batchId = $batchId]"
  val taggedAddedLogs = messages.filter { line =>
    line.contains("Added task set") && line.contains(queryTag) && line.contains(batchTag)
  }
  assert(taggedAddedLogs.nonEmpty,
    s"Expected 'Added task set' log to contain '$queryTag' " +
      s"and '$batchTag'.\nCaptured logs:\n${messages.mkString("\n")}")
}
406+
370407
private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
371408
expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
372409
val selectedPool = rootPool.getSchedulableByName(poolName)
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.scheduler
19+
20+
import java.util.Properties
21+
22+
import org.apache.spark.SparkFunSuite
23+
import org.apache.spark.scheduler.SchedulableBuilder.{BATCH_ID_KEY, QUERY_ID_KEY}
24+
25+
/**
 * Unit tests for `SchedulableBuilder.schedulingLogStreamingContext`, checking the message it
 * renders for different combinations of the query Id and batch Id properties.
 */
class SchedulableBuilderSuite extends SparkFunSuite {

  /**
   * Builds a fresh `Properties` holding the given entries and returns the message of the
   * streaming log context computed from it.
   */
  private def renderedContext(entries: (String, String)*): String = {
    val props = new Properties()
    entries.foreach { case (key, value) => props.setProperty(key, value) }
    SchedulableBuilder.schedulingLogStreamingContext(props).message
  }

  test("schedulingLogStreamingContext - both queryId and batchId present") {
    // The query Id appears truncated ("12345") and the batch Id appears as provided.
    assertResult("[queryId = 12345] [batchId = 23] ")(
      renderedContext(QUERY_ID_KEY -> "12345-test-query-id", BATCH_ID_KEY -> "23"))
  }

  test("schedulingLogStreamingContext - only queryId and no batchId present") {
    // Only the truncated query Id is rendered when no batch Id is set.
    assertResult("[queryId = 12345] ")(
      renderedContext(QUERY_ID_KEY -> "12345-test-query-id"))
  }

  test("schedulingLogStreamingContext - no queryId or batchId present") {
    // Empty properties yield an empty context string.
    assertResult("")(renderedContext())
  }

  test("schedulingLogStreamingContext - handles null properties") {
    // A null Properties reference is accepted and also yields an empty context string.
    assertResult("")(SchedulableBuilder.schedulingLogStreamingContext(null).message)
  }
}

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2759,6 +2759,57 @@ class TaskSetManagerSuite
27592759
assert(taskSetManager.taskSetExcludelistHelperOpt.get.isDryRun)
27602760
}
27612761

2762+
test("SPARK-56326: Streaming query Id and batch Id are included in scheduling log messages") {
  sc = new SparkContext("local", "test")
  sched = new FakeTaskScheduler(sc)

  // The streaming metadata is attached to the TaskSet through a non-null Properties object.
  val queryId = "test-query-id-1234"
  val batchId = "42"
  val props = new Properties()
  props.setProperty("sql.streaming.queryId", queryId)
  props.setProperty("streaming.sql.batchId", batchId)
  val singleTask = Array(new FakeTask(0, 0, Nil))
  val taskSet = new TaskSet(
    singleTask, 0, 0, 0, props, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, None)

  val appender = new LogAppender("streaming scheduling logs", maxEvents = 1000)
  val manualClock = new ManualClock

  // Capture only what the TaskSetManager logger emits while one task is launched and finished.
  withLogAppender(appender, loggerNames = Seq(classOf[TaskSetManager].getName)) {
    val tsm = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = manualClock)

    // resourceOffer goes through prepareLaunchingTask, which logs "Starting ..."
    val offered = tsm.resourceOffer("exec1", "host1", NO_PREF)._1
    assert(offered.isDefined)

    manualClock.advance(1)
    // handleSuccessfulTask logs "Finished ..."
    tsm.handleSuccessfulTask(0, createTaskResult(0))
  }

  val messages = appender.loggingEvents.map(_.getMessage.getFormattedMessage)
  // The query Id is expected in truncated form (its first five characters).
  val queryTag = s"[queryId = ${queryId.take(5)}]"
  val batchTag = s"[batchId = $batchId]"

  // Asserts that at least one captured line mentions `keyword` together with both tags.
  def assertTaggedLog(keyword: String): Unit = {
    val matching = messages.filter { line =>
      line.contains(keyword) && line.contains(queryTag) && line.contains(batchTag)
    }
    assert(matching.nonEmpty,
      s"Expected '$keyword' log to contain '$queryTag' and '$batchTag'." +
        s"\nCaptured logs:\n${messages.mkString("\n")}")
  }

  // Both the launch line and the completion line must carry the query Id and batch Id.
  assertTaggedLog("Starting")
  assertTaggedLog("Finished")
}
2812+
27622813
}
27632814

27642815
class FakeLongTasks(stageId: Int, partitionId: Int) extends FakeTask(stageId, partitionId) {

0 commit comments

Comments
 (0)