Skip to content

[SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilites for Real-time Mode#56055

Open
jerrypeng wants to merge 1 commit into
apache:masterfrom
jerrypeng:oss-concurrent-stage-scheduler
Open

[SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilites for Real-time Mode#56055
jerrypeng wants to merge 1 commit into
apache:masterfrom
jerrypeng:oss-concurrent-stage-scheduler

Conversation

@jerrypeng
Copy link
Copy Markdown
Contributor

@jerrypeng jerrypeng commented May 22, 2026

What changes were proposed in this pull request?

This PR introduces ConcurrentStageDAGScheduler, the scheduler needed to power real-time mode for Structured Streaming.

In real-time mode, a streaming query continuously produces output with end-to-end latency on the order of tens of milliseconds — far below the latency floor of traditional micro-batch
execution. To get there, the query has to abandon the "run stage N, materialize its shuffle output, then run stage N+1" model that the default DAGScheduler enforces. Instead, every
stage of the query must run at the same time, with records flowing from upstream tasks to downstream tasks through a streaming shuffle as they're produced.

ConcurrentStageDAGScheduler is the scheduling half of that design. Concretely, it:

  • Adds a new opt-in DAGScheduler implementation, ConcurrentStageDAGScheduler, selected via spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler and engaged per-job via the
    streaming.concurrent.stages.enabled=true property.
  • Walks the stage DAG on job submission, marks all stages reachable from the final stage as concurrent, and validates that the cluster has enough free slots to run them all
    simultaneously (gated by spark.scheduler.realtimeModeSlotsCheck.disabled, with a new CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class for the failure path).
  • Submits child stages while their parents are still running, and defers a child stage's task-completion events until every concurrent parent has actually finished — preserving the
    invariant that DAGScheduler only sees "all of a stage's parents are done" task completions, even though tasks are physically running concurrently.
  • Adds the smallest possible extension points to DAGScheduler to make this possible: one empty onFinalStageCreated hook, two thin protected helpers (submitConcurrentStage,
    postSchedulerEvent), two package-private accessors (isRunningStage, getStage), and relaxes submitStage and markStageAsFinished from private to protected. All are no-ops
    for the default scheduler.
  • Wires real-time-mode-aware behavior into TaskSchedulerImpl (TaskSets with concurrent stages get maxTaskFailures=1, since a streaming task failure must restart the query rather
    than silently retry against a still-running shuffle) and TaskSetManager (ExecutorLostFailure counts toward maxTaskFailures instead of being exempted, so executor loss propagates
    as a query failure rather than a silent stall).
  • Refactors DAGSchedulerSuite into an abstract DAGSchedulerSuiteBase + TestDAGScheduler trait so the new suite can reuse the existing scheduler test harness without duplicating
    it.

Why are the changes needed?

Real-time mode is the only execution model in which a Structured Streaming query can deliver sub-100ms end-to-end latency, and concurrent stage scheduling is a hard prerequisite for it.
Here's why the default scheduler can't deliver that on its own:

Sequential stage execution is the latency floor for streaming. The default DAGScheduler waits for stage N to complete — every task done, every byte written to shuffle storage,
every map output registered with the MapOutputTracker — before submitting stage N+1. For a typical streaming query with a source, a stateful operator, and a sink, that means each
micro-batch's latency is the sum of each stage's processing time plus the sum of each shuffle's serialization/deserialization cost. Even with small per-stage costs, the sum
dominates as queries get more complex, and there's no architectural way to reduce it within the existing scheduler.

Real-time mode pipes data between stages via a streaming shuffle, not a materialized one. Downstream tasks subscribe to upstream tasks' output as it's produced — there's no "stage N
is done, here are the map outputs" handoff. For that to work, all stages of the job must be running simultaneously when records start flowing. If stage N+1 isn't running yet, stage N
has no consumer for the records it produces and either drops them or blocks. So "schedule all stages concurrently" isn't an optimization for real-time mode — it's a correctness
requirement of the streaming shuffle.

Failure semantics also have to change. In batch mode, a task failure caused by an executor crash is exempted from the failure count because the executor's loss isn't the task's
fault and the framework can re-run the task elsewhere. In real-time mode that exemption is wrong: the streaming shuffle has in-flight records that can't be reconstructed, so an executor
loss must fail the query and let it restart from a checkpoint. Similarly, retrying a single task against a streaming shuffle that's already partially consumed would corrupt state — so
concurrent-stage TaskSets are capped at maxFailures=1.

The default scheduler must stay untouched for batch. Real-time mode is opt-in and additive — the cluster still needs to run batch and non-real-time streaming jobs with their
existing semantics. Hence the scheduler-type config, the per-job opt-in property, and the empty-by-default onFinalStageCreated hook on the base DAGScheduler: when nothing opts in,
nothing changes.

Does this PR introduce any user-facing change?

No user-facing behavior change for any existing workload. Without setting the new config, SparkContext builds the same DAGScheduler it always has, and the default scheduler's
behavior is unchanged.

The PR does introduce two new internal configs (both internal(), so not part of the public surface):

  • spark.scheduler.dagSchedulerType — chooses the DAGScheduler implementation. Defaults to "DAGScheduler".
  • spark.scheduler.realtimeModeSlotsCheck.disabled — skips the slot-availability check used by the concurrent scheduler. Defaults to false.

And one new error class:

  • CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT — thrown by the concurrent scheduler when a streaming job needs more concurrent slots than the cluster offers.

How was this patch tested?

Added one new test suite plus three targeted regression tests:

  1. ConcurrentStageDAGSchedulerSuite — exercises the new scheduler end-to-end through the existing DAGSchedulerSuiteBase test harness:

    • Simple two-stage concurrent job: both stages enter runningStages on submission; the child stage's task completions are buffered until the parent finishes.
    • Concurrent stages disabled in properties: scheduler falls back to default sequential behavior.
    • Complex six-stage DAG with diamond dependencies: verifies parent-tracking, deferred-event buffering, and the correct release order when parents finish out of order.
    • Speculation rejected: a job with concurrent stages and spark.speculation=true fails on submission with a clear error.

    By inheriting from DAGSchedulerSuiteBase, the suite also runs all 149 existing DAGScheduler tests against ConcurrentStageDAGScheduler — free regression coverage that the new
    scheduler behaves identically to DAGScheduler when concurrent mode is not engaged. Total: 153 tests pass.

  2. TaskSchedulerImplSuite — one new test: a TaskSet with streaming.concurrent.stages.enabled=true is submitted with maxTaskFailures=1 regardless of spark.task.maxFailures;
    a regular TaskSet still gets the cluster default. Regression-guards both branches of the new conditional.

  3. TaskSetManagerSuite — two new tests covering the new failure-counting behavior:

    • With concurrent stages enabled, an ExecutorLostFailure with exitCausedByApp=false counts toward maxTaskFailures (the query restarts rather than silently absorbing executor
      loss).
    • Without concurrent stages, the same failure does not count — regression guard for the default behavior.

Full run: core/testOnly *DAGSchedulerSuite *ConcurrentStageDAGSchedulerSuite *TaskSetManagerSuite *TaskSchedulerImplSuite484 tests, all pass.

Was this patch authored or co-authored using generative AI tooling?

Co-authored with Claude Code (Claude Opus 4.7)

@jerrypeng jerrypeng changed the title [SPARK-XXXXX][CORE] Add ConcurrentStageDAGScheduler for low-latency s… [SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilites for Real-time Mode May 22, 2026
…treaming

Ports the ConcurrentStageDAGScheduler from the Databricks runtime so that
streaming queries can opt in to a "real-time" execution mode that runs all
stages of a job concurrently rather than sequentially.

When enabled via spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler
and the per-job streaming.concurrent.stages.enabled property, the scheduler:

- Marks all ancestor stages of the final stage as concurrent on job submission
  and validates that the cluster has enough free slots
  (CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT), gated by
  spark.scheduler.realtimeModeSlotsCheck.disabled.
- Submits child stages while parents are still running, delays task completion
  events for a child whose parent is still running, and replays the delayed
  events when the parent finishes.
- Rejects speculative execution.

DAGScheduler changes (no-op for the default scheduler):
- New protected onFinalStageCreated hook, invoked from handleJobSubmitted /
  handleMapStageSubmitted right after final stage creation.
- New protected submitConcurrentStage and postSchedulerEvent helpers.
- New package-visible isRunningStage and getStage accessors.
- submitStage and markStageAsFinished relaxed from private to protected so
  subclasses can override them.

DAGSchedulerSuite refactor:
- Renames the concrete suite to abstract DAGSchedulerSuiteBase and adds an
  empty class DAGSchedulerSuite extends DAGSchedulerSuiteBase to preserve
  the existing entry point.
- Extracts a TestDAGScheduler trait carrying the scheduleShuffleMergeFinalize
  and handleTaskCompletion overrides; MyDAGScheduler mixes the trait in.
- Adds a protected createInitialScheduler hook used by init().
- Loosens submit, completeShuffleMapStageSuccessfully,
  completeNextResultStageWithSuccess, and assertDataStructuresEmpty to
  protected so subclass suites can use them.

Integration:
- SparkContext picks the scheduler implementation based on
  spark.scheduler.dagSchedulerType.
- TaskSchedulerImpl uses maxFailures=1 for concurrent-stage TaskSets so a
  failure restarts the streaming query instead of being silently retried.
- TaskSetManager counts ExecutorLostFailure toward task failures and skips
  the "executor lost is not the task's fault" exemption in concurrent mode.

Adds the supporting LogKeys (PARENT_STAGE, STREAMING_QUERY_ID) and the
CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class.

Deviations from the runtime source kept to the minimum necessary to compile
in OSS:
- Extends DAGScheduler directly (runtime extends CrossJobDepDAGScheduler,
  which gates micro-batch pipelining; not part of OSS).
- Hook is named onFinalStageCreated rather than the runtime's
  populateCrossJobDepInfo, since CrossJobDepDAGScheduler is not part of OSS.
- Micro-batch pipelining co-existence check (and its test) dropped, since
  MBP is not part of OSS.
- getStreamingBatchIdFromProperties and StreamingBatchId live in the
  companion object instead of CrossJobDepDAGScheduler.
- Slot check uses sc.schedulerBackend.defaultParallelism() in place of
  the runtime's TaskSchedulerStats helper.
- DatabricksEdgeConfigs.serverlessEnabled gating removed; the
  spark.scheduler.realtimeModeSlotsCheck.disabled config is the sole knob.
- isConcurrentStagesEnabled tolerates null Properties (OSS TaskSet allows
  null in tests).

Co-authored-by: Isaac
@jerrypeng jerrypeng force-pushed the oss-concurrent-stage-scheduler branch from 3d0058f to e2a204b Compare May 22, 2026 17:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant