-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilites for Real-time Mode #56055
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2396,6 +2396,26 @@ package object config { | |
| .booleanConf | ||
| .createWithDefault(true) | ||
|
|
||
| private[spark] val STREAMING_REALTIME_MODE_SLOTS_CHECK_DISABLED = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Normally we would use _ENABLED instead of _DISABLED, to avoid double-negative.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The intent is that slot checking is enabled by default and you should disable it in extra ordinary circumstances. That is why it was named this way. If you don't feel strongly about this I would rather keep it as it is. |
||
| ConfigBuilder("spark.scheduler.realtimeModeSlotsCheck.disabled") | ||
| .internal() | ||
| .doc("For query running in real time mode, disable the check if the number of slots" + | ||
| " required by all concurrent stages is available before submit the query" ) | ||
| .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) | ||
| .version("4.2.0") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| private[spark] val DAG_SCHEDULER_TYPE = | ||
| ConfigBuilder("spark.scheduler.dagSchedulerType") | ||
| .internal() | ||
| .doc("The DAGScheduler implementation to use. Set to 'ConcurrentStageDAGScheduler' to " + | ||
| "enable real-time mode, which runs stages concurrently for low-latency streaming queries.") | ||
| .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE) | ||
| .version("4.2.0") | ||
| .stringConf | ||
| .createWithDefault("DAGScheduler") | ||
|
|
||
| private[spark] val STREAMING_ID_AWARE_SCHEDULER_LOGGING_QUERY_ID_LENGTH = | ||
| ConfigBuilder("spark.scheduler.streaming.idAwareLogging.queryIdLength") | ||
| .doc("Maximum number of characters of the streaming query ID to include " + | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,320 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.scheduler | ||
|
|
||
| import java.util.Properties | ||
|
|
||
| import scala.collection.mutable | ||
|
|
||
| import org.apache.spark.{MapOutputTrackerMaster, SparkContext, SparkEnv, SparkException, SparkRuntimeException, Success} | ||
| import org.apache.spark.internal.LogKeys | ||
| import org.apache.spark.internal.config.{SPECULATION_ENABLED, STREAMING_REALTIME_MODE_SLOTS_CHECK_DISABLED} | ||
| import org.apache.spark.resource.ResourceProfile | ||
| import org.apache.spark.storage.BlockManagerMaster | ||
| import org.apache.spark.util.Clock | ||
| import org.apache.spark.util.SystemClock | ||
|
|
||
| /** | ||
| * A [[DAGScheduler]] that runs all the stages in a job without waiting for its parents | ||
| * complete. This combined with streaming shuffle between the stages, allows for low latency | ||
| * execution of streaming queries in real-time mode. | ||
| */ | ||
| class ConcurrentStageDAGScheduler( | ||
| sc: SparkContext, | ||
| taskScheduler: TaskScheduler, | ||
| listenerBus: LiveListenerBus, | ||
| mapOutputTracker: MapOutputTrackerMaster, | ||
| blockManagerMaster: BlockManagerMaster, | ||
| env: SparkEnv, | ||
| clock: Clock = new SystemClock()) | ||
| extends DAGScheduler( | ||
| sc, taskScheduler, listenerBus, mapOutputTracker, blockManagerMaster, env, clock) { | ||
|
|
||
| import ConcurrentStageDAGScheduler._ | ||
|
|
||
| def this(sc: SparkContext, taskScheduler: TaskScheduler) = { | ||
| this( | ||
| sc, | ||
| taskScheduler, | ||
| sc.listenerBus, | ||
| sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], | ||
| sc.env.blockManager.master, | ||
| sc.env | ||
| ) | ||
| } | ||
|
|
||
| def this(sc: SparkContext) = this(sc, sc.taskScheduler) | ||
|
|
||
| // This contains all the concurrent stages that are yet to be scheduled across all the jobs. | ||
| private[spark] val concurrentStages = new mutable.HashSet[Stage] | ||
|
|
||
| private[scheduler] case class DependentStageInfo( | ||
| parents: mutable.HashSet[Stage] = mutable.HashSet.empty, | ||
| delayedTaskCompletionEvents: mutable.ListBuffer[CompletionEvent] = mutable.ListBuffer.empty) | ||
|
|
||
| // This map holds parents of concurrently scheduled stages. When tasks for such a stage complete, | ||
| // and if any of the parents are still running, we delay processing of such events until parent | ||
| // stages are complete. We save these events in this map until then. | ||
| private[spark] val dependentStageMap = new mutable.HashMap[Stage, DependentStageInfo] | ||
|
|
||
| private def totalNumCoreForStage(stage: Stage): Int = { | ||
| val numTask = stage match { | ||
| case r: ResultStage => r.partitions.length | ||
| case m: ShuffleMapStage => m.numPartitions | ||
| } | ||
| val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId) | ||
| val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, sc.conf) | ||
| taskCpus * numTask | ||
| } | ||
|
|
||
| /** | ||
| * Hook invoked after the final stage is created. Registers stages reachable from | ||
| * the final stage as concurrent so they can be submitted in parallel. | ||
| */ | ||
| override def onFinalStageCreated(finalStage: Stage, properties: Properties): Unit = { | ||
|
|
||
| val queryBatchId = getStreamingBatchIdFromProperties(properties) | ||
|
|
||
| if (queryBatchId.nonEmpty && isConcurrentStagesEnabled(properties)) { | ||
| // Speculation is not supported with concurrent stages. Check both the per-job local | ||
| // property (for jobs that override the cluster default via setLocalProperty) and the | ||
| // SparkConf (the documented way to enable speculation cluster-wide). | ||
| if (properties.getProperty(SPECULATION_ENABLED.key) == "true" || | ||
| sc.conf.get(SPECULATION_ENABLED)) { | ||
| throw new SparkException( | ||
| "Speculative execution is not supported with concurrent stages " + | ||
| s"(streaming query: $queryBatchId). Please disable ${SPECULATION_ENABLED.key} config." | ||
| ) | ||
| } | ||
|
|
||
| logInfo(log"Concurrent stages is enabled for [query ${MDC(LogKeys.STREAMING_QUERY_ID, | ||
| queryBatchId.get.queryId)} batch ${MDC(LogKeys.BATCH_ID, queryBatchId.get.batchId)}]") | ||
|
|
||
| // Mark current stage and all its ancestors as concurrent. | ||
| // Collect into a local set first so a slot-check failure below does not leak partial | ||
| // state into concurrentStages. | ||
| val visitedStages = new mutable.HashSet[Stage] | ||
| var totalCoresNeeded = 0 | ||
| def visit(stage: Stage): Unit = { | ||
| if (!visitedStages.contains(stage)) { | ||
| logInfo(log"Marking stage '${MDC(LogKeys.STAGE, stage)}' concurrent for [query ${MDC( | ||
| LogKeys.STREAMING_QUERY_ID, queryBatchId.get.queryId)} batch ${MDC( | ||
| LogKeys.BATCH_ID, queryBatchId.get.batchId)}]") | ||
| visitedStages += stage | ||
| totalCoresNeeded += totalNumCoreForStage(stage) | ||
| stage.parents.foreach(visit) | ||
| } | ||
| } | ||
| visit(finalStage) | ||
|
|
||
| if (!sc.conf.get(STREAMING_REALTIME_MODE_SLOTS_CHECK_DISABLED)) { | ||
| try { | ||
| val totalSlots = sc.schedulerBackend.defaultParallelism() | ||
| val coresInUse = runningStages.toArray.map(totalNumCoreForStage(_)).sum | ||
| if (totalSlots - coresInUse < totalCoresNeeded) { | ||
| throw new SparkRuntimeException( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When this throws, the stages added to concurrentStages above are leaked — handleJobSubmitted catches the exception and fails the job, but nothing ever clears those entries. A subsequent job whose stages share IDs (e.g. retries from the same RDDChain) would inherit them. Either clear concurrentStages of the stages just visited before throwing, or capture them in a local set and only commit to concurrentStages once the slot check passes.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed by accumulating into a local visitedStages set during the DAG walk and only committing to concurrentStages after the slot check passes
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Though the actual affect of this would likely be small since this would only occur on query failure. |
||
| errorClass = "CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT", | ||
| messageParameters = Map( | ||
| "numSlots" -> (totalSlots - coresInUse).toString, | ||
| "numTasks" -> totalCoresNeeded.toString)) | ||
| } | ||
| } catch { | ||
| case e: UnsupportedOperationException => | ||
| logWarning(log"${MDC(LogKeys.ERROR, e)}. Skipping slot check for RTM.") | ||
| } | ||
| } | ||
|
|
||
| // Slot check passed (or was disabled). Commit the visited stages. | ||
| concurrentStages ++= visitedStages | ||
| } else { | ||
| super.onFinalStageCreated(finalStage, properties) | ||
| } | ||
| } | ||
|
|
||
| override def submitStage(stage: Stage): Unit = { | ||
| super.submitStage(stage) | ||
|
|
||
| if (!waitingStages.contains(stage) && concurrentStages.contains(stage)) { | ||
| // The current stage is not registered in waitingStages, which means it has | ||
| // no parents. This case we should remove it from concurrentStages since it is already | ||
| // running. | ||
| assert(runningStages.contains(stage), "stage should be running if not in waitingStages") | ||
| logInfo(log"Removing stage ${MDC(LogKeys.STAGE, stage)} from concurrentStages") | ||
| concurrentStages -= stage | ||
| } | ||
|
|
||
| // Find the stages that should be submitted concurrently with this stage. | ||
| waitingStages.intersect(concurrentStages).foreach { stage => | ||
| logInfo(log"Submitting stage concurrently: ${MDC(LogKeys.STAGE, stage)}") | ||
| concurrentStages -= stage // Don't submit this stage concurrently for subsequent attempts. | ||
| stage.parents.foreach { parent => | ||
| if (isRunningStage(parent)) { | ||
| logInfo(log"Updating dependent map for stage ${MDC(LogKeys.STAGE, stage)} with parent ${ | ||
| MDC(LogKeys.PARENT_STAGE, parent)}") | ||
| dependentStageMap.getOrElseUpdate(stage, DependentStageInfo()).parents += parent | ||
| } | ||
| } | ||
| // Remove stage and its parents from concurrentStages | ||
| def removeFromConcurrentStages(stage: Stage): Unit = { | ||
| if (concurrentStages.contains(stage)) { | ||
| logInfo(log"Removing stage ${MDC(LogKeys.STAGE, stage)} from concurrentStages") | ||
| concurrentStages -= stage | ||
| } | ||
| stage.parents.foreach { parent => | ||
| assert(!waitingStages.contains(parent), "Parent stage should not still be waiting") | ||
| removeFromConcurrentStages(parent) | ||
| } | ||
| } | ||
| removeFromConcurrentStages(stage) | ||
| submitConcurrentStage(stage) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Submits a child stage even while its parents are still running. Distinct from | ||
| * `submitStage` in that it bypasses the missing-parent check. | ||
| */ | ||
| private def submitConcurrentStage(stage: Stage): Unit = { | ||
| assert(waitingStages.contains(stage)) | ||
| activeJobForStage(stage) match { | ||
| case Some(job) => | ||
| waitingStages -= stage | ||
| submitMissingTasks(stage, job) | ||
| case None => // Not expected. | ||
| throw new IllegalStateException(s"No active job for stage $stage") | ||
| } | ||
| } | ||
|
|
||
| // This is overridden to check if the task completion event should be delayed because a | ||
| // parent stage still has running tasks. See comment for `dependentStageMap` for more details. | ||
| override private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = { | ||
| val stageId = event.task.stageId | ||
| val taskId = event.taskInfo.taskId | ||
|
|
||
| getStage(stageId) match { | ||
| case Some(stage) if event.reason == Success && dependentStageMap.contains(stage) => | ||
| val dependentStageInfo = dependentStageMap(stage) | ||
| logInfo(log"Delaying completion event for task ${MDC(LogKeys.TASK_ID, taskId)} in stage ${ | ||
| MDC(LogKeys.STAGE, stage)}. Active parent(s): ${MDC(LogKeys.PARENT_STAGES, | ||
| dependentStageInfo.parents.mkString(", "))}") | ||
| dependentStageInfo.delayedTaskCompletionEvents += event | ||
|
|
||
| case _ => // Otherwise handle the event as usual. | ||
| super.handleTaskCompletion(event) | ||
| } | ||
| } | ||
|
|
||
| // This is overridden to handle any delayed task completion events for dependent stages. | ||
| override def markStageAsFinished( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The dependentStageMap cleanup path only fires when a stage in the map is named as a parent via markStageAsFinished(parent). If a dependent stage itself aborts mid-job (e.g. its single allowed failure under maxTaskFailures=1), its own entry — including any buffered delayedTaskCompletionEvents — is never removed from dependentStageMap. With concurrent jobs sharing a long-lived scheduler instance, that's a slow leak across queries. Consider clearing the entry for stage itself inside markStageAsFinished (especially when errorMessage.isDefined).
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will just remove the stage's own entry at the end of markStageAsFinished |
||
| stage: Stage, | ||
| errorMessage: Option[String] = None, | ||
| willRetry: Boolean = false): Unit = { | ||
|
|
||
| super.markStageAsFinished(stage, errorMessage, willRetry) | ||
|
|
||
| // If this is a parent of a stage in dependentStageMap, remove it from parents. | ||
| val dependentStages = dependentStageMap | ||
| .filter(_._2.parents.contains(stage)) | ||
| .keys | ||
|
|
||
| dependentStages.foreach { dependent => | ||
| if (errorMessage.isEmpty) { | ||
| assert( | ||
| isRunningStage(dependent), | ||
| s"Parent stages $stage's dependent stage $dependent should be running") | ||
| } | ||
| logInfo(log"Removing parent stage ${MDC(LogKeys.PARENT_STAGE, stage)} from dependent map " + | ||
| log"for stage ${MDC(LogKeys.STAGE, dependent)}") | ||
| dependentStageMap(dependent).parents -= stage | ||
| checkDependentStageTasks(dependent) | ||
| } | ||
|
|
||
| // Drop this stage's own entry from the map. On the success path | ||
| // `checkDependentStageTasks` (invoked when the stage's last parent finishes) has already | ||
| // removed the entry, so this is a no-op. On failure / cancellation / abort the entry, | ||
| // and any buffered completion events, would otherwise leak for the lifetime of the | ||
| // scheduler. | ||
| // | ||
| // `willRetry=true` paths (e.g. FetchFailed) also reach this cleanup. That is safe under | ||
| // concurrent scheduling because stage retries are not supported here: TaskSchedulerImpl | ||
| // pins `maxFailures=1` for concurrent TaskSets, and any failure restarts the streaming | ||
| // query from its checkpoint rather than retrying tasks against an in-flight streaming | ||
| // shuffle. With no retry to preserve state for, it's correct to drop the entry along | ||
| // with any buffered events. | ||
| dependentStageMap.remove(stage) | ||
| } | ||
|
|
||
| // Checks if the dependent stage's parents are all done. If all the parents are done, | ||
| // enqueues any saved task completion event (if any). | ||
| private def checkDependentStageTasks(stage: Stage): Unit = { | ||
| val dependentStageInfo = dependentStageMap.getOrElse( | ||
| stage, throw new IllegalStateException(s"Stage $stage is not in dependentStageMap") | ||
| ) | ||
|
|
||
| if (dependentStageInfo.parents.isEmpty) { | ||
| val delayedEvents = dependentStageInfo.delayedTaskCompletionEvents | ||
| logInfo(log"All the parents are done for ${MDC(LogKeys.STAGE, stage)}. Removing it from " + | ||
| log"the map. It has ${MDC(LogKeys.NUM_EVENTS, delayedEvents.size.toLong)} " + | ||
| log"task completion events") | ||
| dependentStageMap -= stage | ||
| delayedEvents.foreach { event => | ||
| logInfo(log"Posting delayed task ${MDC(LogKeys.TASK_ID, event.taskInfo.taskId)} " + | ||
| log"completion event for stage ${MDC(LogKeys.STAGE, stage)}") | ||
| eventProcessLoop.post(event) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| object ConcurrentStageDAGScheduler { | ||
|
|
||
| val CONCURRENT_STAGES_ENABLED_PROPERTY: String = "streaming.concurrent.stages.enabled" | ||
|
|
||
| def isConcurrentStagesEnabled(properties: Properties): Boolean = { | ||
| properties != null && | ||
| properties.getProperty(CONCURRENT_STAGES_ENABLED_PROPERTY) == "true" | ||
| } | ||
|
|
||
| /** | ||
| * Extracts the [[StreamingBatchId]] from the given properties if both the streaming | ||
| * query id and batch id are present. | ||
| */ | ||
| def getStreamingBatchIdFromProperties(properties: Properties): Option[StreamingBatchId] = { | ||
| if (properties == null) { | ||
| return None | ||
| } | ||
|
|
||
| val queryId = Option(properties.getProperty( | ||
| StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY)) | ||
| val batchId = Option(properties.getProperty( | ||
| StructuredStreamingIdAwareSchedulerLogging.BATCH_ID_KEY)) | ||
| if (queryId.nonEmpty && batchId.nonEmpty) { | ||
| Some(StreamingBatchId(queryId.get, batchId.get.toLong)) | ||
| } else { | ||
| None | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Case class to identify a batch in a streaming query. | ||
| * | ||
| * @param queryId - Streaming query id | ||
| * @param batchId - Batch id for a micro batch in a streaming query | ||
| */ | ||
| case class StreamingBatchId(queryId: String, batchId: Long) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QUERY_ID already exists and is what StructuredStreamingIdAwareSchedulerLogging uses to log streaming query IDs. Adding STREAMING_QUERY_ID creates a parallel key for the same concept. Suggest dropping this addition and using LogKeys.QUERY_ID at all the callsites, or update the callsites in StructuredStreamingIdAwareSchedulerLogging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A query id and streaming query id are typically not the same. query id for a batch query is simply a transient id for a batch. The streaming query id is persistent for the entirety of the streaming query execution.
I would keep it here and fix it in StructuredStreamingIdAwareSchedulerLogging