Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ class MicroBatchExecution(
// into every subsequent batch's query plan.
private val stateSchemaMetadatas = MutableMap[Long, StateSchemaBroadcast]()

/**
* Cached result of the first `offsetLog.getLatest()` call. Reused by both
* `logicalPlan` (to determine `enforceNamed`) and `initializeExecution` (to seed
* `latestStartedBatch`). This avoids a redundant `ListStatus` on the checkpoint's
* `offsets/` directory during stream startup. Safe to cache: between construction
* and `initializeExecution`, nothing else writes the offset log on the query thread.
*/
private lazy val initialLatestOffsetSeq: Option[(Long, OffsetSeqBase)] =
offsetLog.getLatest()

override lazy val logicalPlan: LogicalPlan = {
assert(queryExecutionThread eq Thread.currentThread,
"logicalPlan must be initialized in QueryExecutionThread " +
Expand All @@ -188,7 +198,7 @@ class MicroBatchExecution(

// Read the source evolution enforcement from the last written offset log entry. If no entries
// are found, use the session config value.
val enforceNamed = offsetLog.getLatest().flatMap { case (_, offsetSeq) =>
val enforceNamed = initialLatestOffsetSeq.flatMap { case (_, offsetSeq) =>
offsetSeq.metadataOpt.flatMap { metadata =>
OffsetSeqMetadata.readValueOpt(metadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION)
.map(_.toBoolean)
Expand Down Expand Up @@ -435,7 +445,7 @@ class MicroBatchExecution(

private def initializeExecution(
sparkSessionForStream: SparkSession): MicroBatchExecutionContext = {
var latestStartedBatch = offsetLog.getLatest()
var latestStartedBatch = initialLatestOffsetSeq
val latestCommittedBatch = commitLog.getLatest()

val lastCommittedBatchId = latestCommittedBatch match {
Expand Down