[SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilities for Real-time Mode #56055
Open
jerrypeng wants to merge 1 commit into
…treaming

Ports the ConcurrentStageDAGScheduler from the Databricks runtime so that streaming queries can opt in to a "real-time" execution mode that runs all stages of a job concurrently rather than sequentially. When enabled via spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler and the per-job streaming.concurrent.stages.enabled property, the scheduler:
- Marks all ancestor stages of the final stage as concurrent on job submission and validates that the cluster has enough free slots (CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT), gated by spark.scheduler.realtimeModeSlotsCheck.disabled.
- Submits child stages while parents are still running, delays task completion events for a child whose parent is still running, and replays the delayed events when the parent finishes.
- Rejects speculative execution.

DAGScheduler changes (no-op for the default scheduler):
- New protected onFinalStageCreated hook, invoked from handleJobSubmitted / handleMapStageSubmitted right after final stage creation.
- New protected submitConcurrentStage and postSchedulerEvent helpers.
- New package-visible isRunningStage and getStage accessors.
- submitStage and markStageAsFinished relaxed from private to protected so subclasses can override them.

DAGSchedulerSuite refactor:
- Renames the concrete suite to abstract DAGSchedulerSuiteBase and adds an empty class DAGSchedulerSuite extends DAGSchedulerSuiteBase to preserve the existing entry point.
- Extracts a TestDAGScheduler trait carrying the scheduleShuffleMergeFinalize and handleTaskCompletion overrides; MyDAGScheduler mixes the trait in.
- Adds a protected createInitialScheduler hook used by init().
- Loosens submit, completeShuffleMapStageSuccessfully, completeNextResultStageWithSuccess, and assertDataStructuresEmpty to protected so subclass suites can use them.

Integration:
- SparkContext picks the scheduler implementation based on spark.scheduler.dagSchedulerType.
- TaskSchedulerImpl uses maxFailures=1 for concurrent-stage TaskSets so a failure restarts the streaming query instead of being silently retried.
- TaskSetManager counts ExecutorLostFailure toward task failures and skips the "executor lost is not the task's fault" exemption in concurrent mode.

Adds the supporting LogKeys (PARENT_STAGE, STREAMING_QUERY_ID) and the CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class.

Deviations from the runtime source kept to the minimum necessary to compile in OSS:
- Extends DAGScheduler directly (runtime extends CrossJobDepDAGScheduler, which gates micro-batch pipelining; not part of OSS).
- Hook is named onFinalStageCreated rather than the runtime's populateCrossJobDepInfo, since CrossJobDepDAGScheduler is not part of OSS.
- Micro-batch pipelining co-existence check (and its test) dropped, since MBP is not part of OSS.
- getStreamingBatchIdFromProperties and StreamingBatchId live in the companion object instead of CrossJobDepDAGScheduler.
- Slot check uses sc.schedulerBackend.defaultParallelism() in place of the runtime's TaskSchedulerStats helper.
- DatabricksEdgeConfigs.serverlessEnabled gating removed; the spark.scheduler.realtimeModeSlotsCheck.disabled config is the sole knob.
- isConcurrentStagesEnabled tolerates null Properties (OSS TaskSet allows null in tests).

Co-authored-by: Isaac
3d0058f to
e2a204b
Compare
What changes were proposed in this pull request?
This PR introduces ConcurrentStageDAGScheduler, the scheduler needed to power real-time mode for Structured Streaming.
In real-time mode, a streaming query continuously produces output with end-to-end latency on the order of tens of milliseconds, far below the latency floor of traditional micro-batch execution. To get there, the query has to abandon the "run stage N, materialize its shuffle output, then run stage N+1" model that the default DAGScheduler enforces. Instead, every stage of the query must run at the same time, with records flowing from upstream tasks to downstream tasks through a streaming shuffle as they're produced.
ConcurrentStageDAGScheduler is the scheduling half of that design. Concretely, this PR:
- Adds a new DAGScheduler implementation, ConcurrentStageDAGScheduler, selected via spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler and engaged per-job via the streaming.concurrent.stages.enabled=true property.
- Validates on job submission that the cluster has enough free slots to run all stages simultaneously (gated by spark.scheduler.realtimeModeSlotsCheck.disabled, with a new CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class for the failure path).
- Delays task completion events for a child stage whose parent is still running and replays them when the parent finishes, preserving the invariant that DAGScheduler only sees "all of a stage's parents are done" task completions, even though tasks are physically running concurrently.
- Makes small changes to DAGScheduler to make this possible: adds one empty onFinalStageCreated hook, two thin protected helpers (submitConcurrentStage, postSchedulerEvent), and two package-private accessors (isRunningStage, getStage), and relaxes submitStage and markStageAsFinished from private to protected. All are no-ops for the default scheduler.
- Adjusts TaskSchedulerImpl (TaskSets with concurrent stages get maxTaskFailures=1, since a streaming task failure must restart the query rather than silently retry against a still-running shuffle) and TaskSetManager (ExecutorLostFailure counts toward maxTaskFailures instead of being exempted, so executor loss propagates as a query failure rather than a silent stall).
- Refactors DAGSchedulerSuite into an abstract DAGSchedulerSuiteBase + TestDAGScheduler trait so the new suite can reuse the existing scheduler test harness without duplicating it.
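The delay-and-replay behavior described in the list above can be pictured with a small toy model. This is only an illustrative sketch in Python, not the scheduler's actual Scala code: the class and method names (ConcurrentStageModel, on_task_completion, on_stage_finished) are hypothetical, and it assumes each stage has at most one parent.

```python
from collections import defaultdict


class ConcurrentStageModel:
    """Toy model: hold back a child stage's task completions until its parent finishes.

    All names here are hypothetical; this is not the Spark implementation.
    """

    def __init__(self, parent_of):
        # parent_of maps a stage id to its single parent stage id (or None).
        self.parent_of = parent_of
        # In concurrent mode every stage is running from the moment of submission.
        self.running = set(parent_of)
        # parent stage id -> buffered (stage, task) completion events
        self.delayed = defaultdict(list)
        # Events in the order the base scheduler would get to see them.
        self.delivered = []

    def on_task_completion(self, stage, task):
        parent = self.parent_of[stage]
        if parent is not None and parent in self.running:
            # Parent still running: delay the event instead of delivering it.
            self.delayed[parent].append((stage, task))
        else:
            self.delivered.append((stage, task))

    def on_stage_finished(self, stage):
        self.running.discard(stage)
        # Replay the events that were waiting on this stage, in arrival order.
        for child_stage, task in self.delayed.pop(stage, []):
            self.on_task_completion(child_stage, task)
```

In this model, a completion reported by stage 1 while its parent stage 0 is still running stays buffered and only reaches `delivered` after `on_stage_finished(0)` is called, so the consumer of `delivered` never observes a child completion ahead of its parent finishing.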
Why are the changes needed?
Real-time mode is the only execution model in which a Structured Streaming query can deliver sub-100ms end-to-end latency, and concurrent stage scheduling is a hard prerequisite for it.
Here's why the default scheduler can't deliver that on its own:
Sequential stage execution is the latency floor for streaming. The default DAGScheduler waits for stage N to complete (every task done, every byte written to shuffle storage, every map output registered with the MapOutputTracker) before submitting stage N+1. For a typical streaming query with a source, a stateful operator, and a sink, that means each micro-batch's latency is the sum of every stage's processing time plus every shuffle's serialization/deserialization cost. Even with small per-stage costs, the sum dominates as queries get more complex, and there's no architectural way to reduce it within the existing scheduler.
Real-time mode pipes data between stages via a streaming shuffle, not a materialized one. Downstream tasks subscribe to upstream tasks' output as it's produced — there's no "stage N
is done, here are the map outputs" handoff. For that to work, all stages of the job must be running simultaneously when records start flowing. If stage N+1 isn't running yet, stage N
has no consumer for the records it produces and either drops them or blocks. So "schedule all stages concurrently" isn't an optimization for real-time mode — it's a correctness
requirement of the streaming shuffle.
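Because every stage has to be running at once, the cluster needs enough free slots for all of their tasks before the job starts. A minimal sketch of such a check follows, in Python for illustration only. It assumes one slot per task and that the requirement is simply the total task count across all stages; the function name, exception class, and parameters are hypothetical stand-ins, with the exception playing the role of the CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class and check_disabled playing the role of spark.scheduler.realtimeModeSlotsCheck.disabled.

```python
class InsufficientSlotsError(RuntimeError):
    """Hypothetical stand-in for the CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class."""


def check_slots(tasks_per_stage, available_slots, check_disabled=False):
    """Return the slots needed to run all stages at once, or raise if the cluster is too small.

    Assumption: one slot per task, so the requirement is the total task count.
    """
    required = sum(tasks_per_stage)
    if not check_disabled and required > available_slots:
        raise InsufficientSlotsError(
            f"job needs {required} concurrent slots but only {available_slots} are available"
        )
    return required
```

Under these assumptions, a job with stages of 4, 4, and 2 tasks passes on a cluster with 10 slots and is rejected on one with 8, unless the check is disabled.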
Failure semantics also have to change. In batch mode, a task failure caused by an executor crash is exempted from the failure count because the executor's loss isn't the task's fault and the framework can re-run the task elsewhere. In real-time mode that exemption is wrong: the streaming shuffle has in-flight records that can't be reconstructed, so an executor loss must fail the query and let it restart from a checkpoint. Similarly, retrying a single task against a streaming shuffle that's already partially consumed would corrupt state, so concurrent-stage TaskSets are capped at maxFailures=1.
The default scheduler must stay untouched for batch. Real-time mode is opt-in and additive: the cluster still needs to run batch and non-real-time streaming jobs with their existing semantics. Hence the scheduler-type config, the per-job opt-in property, and the empty-by-default onFinalStageCreated hook on the base DAGScheduler: when nothing opts in, nothing changes.
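The failure rules described above, together with the opt-in behavior, can be summarized in a few lines. The sketch below is a Python illustration rather than the Scala change itself: the helper names are hypothetical, job properties are modeled as a plain dict (or None), and matching the property value against the exact string "true" is an assumption about how it is parsed.

```python
CONCURRENT_STAGES_KEY = "streaming.concurrent.stages.enabled"


def is_concurrent_stages_enabled(properties):
    # Tolerates missing properties (None), like the null-tolerant check in the PR.
    # Assumption: the property is enabled only by the exact string "true".
    return properties is not None and properties.get(CONCURRENT_STAGES_KEY) == "true"


def effective_max_task_failures(properties, cluster_default):
    # Concurrent-stage TaskSets get a single attempt; everything else keeps the default.
    return 1 if is_concurrent_stages_enabled(properties) else cluster_default


def counts_toward_failures(is_executor_lost, exit_caused_by_app, concurrent):
    if concurrent:
        # Real-time mode: every failure counts, including executor loss.
        return True
    # Default behavior: executor loss not caused by the app is exempted.
    return not (is_executor_lost and not exit_caused_by_app)
```

With a cluster default of 4, a TaskSet whose properties enable concurrent stages gets 1 while one without properties keeps 4, so a job that never opts in sees no change.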
Does this PR introduce any user-facing change?
No user-facing behavior change for any existing workload. Without setting the new config, SparkContext builds the same DAGScheduler it always has, and the default scheduler's behavior is unchanged.
The PR does introduce two new internal configs (both internal(), so not part of the public surface):
- spark.scheduler.dagSchedulerType: chooses the DAGScheduler implementation. Defaults to "DAGScheduler".
- spark.scheduler.realtimeModeSlotsCheck.disabled: skips the slot-availability check used by the concurrent scheduler. Defaults to false.
And one new error class:
- CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT: thrown by the concurrent scheduler when a streaming job needs more concurrent slots than the cluster offers.
How was this patch tested?
Added one new test suite plus three targeted regression tests:
- ConcurrentStageDAGSchedulerSuite: exercises the new scheduler end-to-end through the existing DAGSchedulerSuiteBase test harness:
  - Concurrent stages are added to runningStages on submission; the child stage's task completions are buffered until the parent finishes.
  - A job with spark.speculation=true fails on submission with a clear error.
  By inheriting from DAGSchedulerSuiteBase, the suite also runs all 149 existing DAGScheduler tests against ConcurrentStageDAGScheduler, which gives free regression coverage that the new scheduler behaves identically to DAGScheduler when concurrent mode is not engaged. Total: 153 tests pass.
- TaskSchedulerImplSuite: one new test. A TaskSet with streaming.concurrent.stages.enabled=true is submitted with maxTaskFailures=1 regardless of spark.task.maxFailures; a regular TaskSet still gets the cluster default. Regression-guards both branches of the new conditional.
- TaskSetManagerSuite: two new tests covering the new failure-counting behavior:
  - ExecutorLostFailure with exitCausedByApp=false counts toward maxTaskFailures (the query restarts rather than silently absorbing executor loss).
Full run:
core/testOnly *DAGSchedulerSuite *ConcurrentStageDAGSchedulerSuite *TaskSetManagerSuite *TaskSchedulerImplSuite → 484 tests, all pass.
Was this patch authored or co-authored using generative AI tooling?
Co-authored with Claude Code (Claude Opus 4.7)