Skip to content

Commit 0b0f5f1

Browse files
committed
remove query Id truncation
1 parent dc6bd59 commit 0b0f5f1

4 files changed

Lines changed: 9 additions & 11 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLogging.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,11 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log
150150
val queryId = Option(properties.getProperty(QUERY_ID_KEY))
151151
val batchId = Option(properties.getProperty(BATCH_ID_KEY))
152152

153-
// formatMessage truncates the queryId for readability
154-
// so we use a blank messageWithContext to overwrite the full query Id to the context
155153
formatMessage(
156154
queryId,
157155
batchId,
158156
entry
159-
) + MessageWithContext("", constructStreamingContext(queryId, batchId))
157+
)
160158
})
161159
}
162160

@@ -200,7 +198,7 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log
200198
batchId: Option[String],
201199
msg: => String): String = {
202200
val msgWithBatchId = batchId.map(bid => s"[batchId = $bid] $msg").getOrElse(msg)
203-
queryId.map(qId => s"[queryId = ${qId.take(5)}] $msgWithBatchId").getOrElse(msgWithBatchId)
201+
queryId.map(qId => s"[queryId = $qId] $msgWithBatchId").getOrElse(msgWithBatchId)
204202
}
205203

206204
private def formatMessage(
@@ -211,7 +209,7 @@ private[scheduler] object StructuredStreamingIdAwareSchedulerLogging extends Log
211209
bId => log"[batchId = ${MDC(LogKeys.BATCH_ID, bId)}] " + toMessageWithContext(msg)
212210
).getOrElse(toMessageWithContext(msg))
213211
queryId.map(
214-
qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId.take(5))}] " + msgWithBatchId
212+
qId => log"[queryId = ${MDC(LogKeys.QUERY_ID, qId)}] " + msgWithBatchId
215213
).getOrElse(msgWithBatchId)
216214
}
217215

core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
395395
}
396396

397397
val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
398-
val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]"
398+
val expectedQueryPrefix = s"[queryId = ${testQueryId}]"
399399
val expectedBatchPrefix = s"[batchId = $testBatchId]"
400400
val addedLogs = logs.filter(msg =>
401401
msg.contains("Added task set") &&

core/src/test/scala/org/apache/spark/scheduler/StructuredStreamingIdAwareSchedulerLoggingSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite {
6868
val result = StructuredStreamingIdAwareSchedulerLogging
6969
.constructStreamingLogEntry(propsWithBothIds(), "test message")
7070

71-
assertResult(s"[queryId = ${testQueryId.take(5)}] [batchId = $testBatchId] test message")(
71+
assertResult(s"[queryId = $testQueryId] [batchId = $testBatchId] test message")(
7272
result.message)
7373
assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId)
7474
assertContextValue(result.context, LogKeys.BATCH_ID, testBatchId)
@@ -78,7 +78,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite {
7878
val result = StructuredStreamingIdAwareSchedulerLogging
7979
.constructStreamingLogEntry(propsWithQueryIdOnly(), "test message")
8080

81-
assertResult(s"[queryId = ${testQueryId.take(5)}] test message")(result.message)
81+
assertResult(s"[queryId = $testQueryId] test message")(result.message)
8282
assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId)
8383
assertContextAbsent(result.context, LogKeys.BATCH_ID)
8484
}
@@ -104,7 +104,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite {
104104
.constructStreamingLogEntry(propsWithBothIds(),
105105
log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}")
106106

107-
assertResult(s"[queryId = ${testQueryId.take(5)}] " +
107+
assertResult(s"[queryId = $testQueryId] " +
108108
s"[batchId = $testBatchId] test message Dummy Context")(
109109
result.message)
110110
assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId)
@@ -117,7 +117,7 @@ class StructuredStreamingIdAwareSchedulerLoggingSuite extends SparkFunSuite {
117117
.constructStreamingLogEntry(propsWithQueryIdOnly(),
118118
log"test message ${MDC(LogKeys.MESSAGE, "Dummy Context")}")
119119

120-
assertResult(s"[queryId = ${testQueryId.take(5)}] test message Dummy Context")(result.message)
120+
assertResult(s"[queryId = $testQueryId] test message Dummy Context")(result.message)
121121
assertContextValue(result.context, LogKeys.QUERY_ID, testQueryId)
122122
assertContextAbsent(result.context, LogKeys.BATCH_ID)
123123
assertContextValue(result.context, LogKeys.MESSAGE, "Dummy Context")

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2903,7 +2903,7 @@ class TaskSetManagerSuite
29032903

29042904
val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
29052905

2906-
val expectedQueryPrefix = s"[queryId = ${testQueryId.take(5)}]"
2906+
val expectedQueryPrefix = s"[queryId = ${testQueryId}]"
29072907
val expectedBatchPrefix = s"[batchId = $testBatchId]"
29082908

29092909
// Verify the "Starting" log line includes query Id and batch Id

0 commit comments

Comments
 (0)