
[SPARK-56693][K8S] Support built-in K8s ExecutorPVCResizePlugin#55642

Open
dongjoon-hyun wants to merge 1 commit into apache:master from dongjoon-hyun:SPARK-56693

Conversation

dongjoon-hyun (Member) commented May 1, 2026

What changes were proposed in this pull request?

This PR aims to support a new ExecutorPVCResizePlugin that monitors executor PVC disk usage
and grows each PVC's spec.resources.requests.storage when usage exceeds a
threshold.

The executor side reports the max filesystem usage ratio across
DiskBlockManager.localDirs. The driver side patches the executor pod's PVCs to
currentSize * (1 + factor) when the reported ratio exceeds the threshold.
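The sizing rule above can be sketched as follows. This is a minimal illustration of the described policy, not the PR's actual code; `PvcResizeMath`, `newSize`, and `shouldResize` are hypothetical names:

```scala
object PvcResizeMath {
  // New desired size after one growth step: currentSize * (1 + factor).
  def newSize(currentBytes: Long, factor: Double): Long =
    (currentBytes * (1.0 + factor)).toLong

  // A resize is triggered only when the reported usage ratio exceeds the threshold.
  def shouldResize(usageRatio: Double, threshold: Double): Boolean =
    usageRatio > threshold
}
```

With the defaults (threshold 0.5, factor 1.0), a 50Gi PVC at 61% usage is doubled to 100Gi, matching the driver log shown later in this description.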

New configurations:

| Key | Default | Meaning |
| --- | --- | --- |
| spark.kubernetes.executor.pvc.resizeInterval | 0min | Resize check interval. 0 disables. |
| spark.kubernetes.executor.pvc.resizeThreshold | 0.5 | Usage ratio above which a resize is triggered. |
| spark.kubernetes.executor.pvc.resizeFactor | 1.0 | Growth factor. |

Note that public cloud PVCs have clear limitations. For example, a volume can only be increased, and only once every six hours. So, for a short Spark job (whose lifetime is less than 6 hours), ExecutorPVCResizePlugin can increase each PVC only once. For a recently increased PVC, K8s will show a warning like the following.

  Warning  VolumeResizeFailed  6s (x8 over 35s)  external-resizer ebs.csi.aws.com
  resize volume "pvc-baccb8c8-16e7-4b31-b254-dd223a3e0be3" by resizer "ebs.csi.aws.com" failed: rpc error:
  code = Internal desc = Could not resize volume "vol-0cb3afad24b360ebd": rpc error: code = Internal
  desc = Could not modify volume "vol-0cb3afad24b360ebd": volume "vol-0cb3afad24b360ebd" in OPTIMIZING state, cannot currently modify

Why are the changes needed?

Without this, PVC-backed SPARK_LOCAL_DIRS must be over-provisioned up front to avoid
mid-job disk-full failures, which wastes storage cost. ExecutorResizePlugin
already established the observe-and-patch pattern for memory; this PR extends it to
PVC storage.

Does this PR introduce any user-facing change?

No. This plugin is opt-in; the user needs to add it to spark.plugins explicitly.

SUBMIT

bin/spark-submit \
--master k8s://$K8S_MASTER \
--deploy-mode cluster \
-c spark.executor.cores=4 \
-c spark.executor.memory=4g \
-c spark.kubernetes.container.image=docker.apple.com/d_hyun/spark:20260430 \
-c spark.kubernetes.authenticate.driver.serviceAccountName=spark \
-c spark.kubernetes.driver.pod.name=pi \
-c spark.kubernetes.executor.podNamePrefix=pi \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=50Gi \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3 \
-c spark.kubernetes.driver.podTemplateFile=eks-root-pod.yml \
-c spark.kubernetes.executor.podTemplateFile=eks-root-pod.yml \
-c spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin \
-c spark.kubernetes.executor.pvc.resizeInterval=1m \
--class org.apache.spark.examples.SparkPi \
local:///opt/spark/examples/jars/spark-examples.jar 400000

DRIVER LOGS

$ kubectl logs -f pi | grep Plugin
26/05/01 01:39:38 INFO ExecutorPVCResizeDriverPlugin: ExecutorPVCResizeDriverPlugin is scheduled
26/05/01 01:39:38 INFO DriverPluginContainer: Initialized driver component for plugin org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin.
26/05/01 01:40:38 INFO ExecutorPVCResizeDriverPlugin: Latest PVC usage reports: {}
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Latest PVC usage reports: {1=PVCDiskUsageReport(1,0.6136656796630462), 2=PVCDiskUsageReport(2,3.507855787665699E-4)}
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Try to resize executor 1 PVC pi-exec-1-pvc-0 with ratio 0.6136656796630462 (threshold 0.5).
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Increase PVC pi-exec-1-pvc-0 storage from 53687091200 to 107374182400 as usage ratio exceeded threshold.
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Try to resize executor 2 PVC pi-exec-2-pvc-0 with ratio 3.507855787665699E-4 (threshold 0.5).

EXECUTOR SIZE REPORTING (60% -> 30%)

$ kubectl logs -f pi-exec-1 | grep Plugin
26/05/01 01:22:54 INFO ExecutorPVCResizeExecutorPlugin: Reporting max PVC disk usage ratio for executor 1: 0.6136656796630462
26/05/01 01:23:54 INFO ExecutorPVCResizeExecutorPlugin: Reporting max PVC disk usage ratio for executor 1: 0.30591566408202353

RESIZED PVC

$ kubectl get pvc
NAME              STATUS   VOLUME                                     CAPACITY       ACCESS MODES   STORAGECLASS   VOLUMEATTRIBUTESCLASS   AGE
pi-exec-1-pvc-0   Bound    pvc-d279a3da-ddfb-41c2-a32b-0f2bd83941c4   107374182400   RWOP           gp3            <unset>                 2m28s
pi-exec-2-pvc-0   Bound    pvc-79f092d3-4a8d-4981-946d-d745d4038fd6   50Gi           RWOP           gp3            <unset>                 2m28s

How was this patch tested?

Pass the CIs with a new ExecutorPVCResizePluginSuite.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7 (1M context)

dongjoon-hyun (Member, Author):

Could you review this PR when you have some time, @viirya ?

dongjoon-hyun (Member, Author):

Could you review this PR, @peter-toth ?

viirya previously approved these changes May 1, 2026, then dismissed the stale review May 1, 2026 05:05: "found a few issues."

.toSet
}

private def tryResize(
viirya (Member):

tryResize grows the PVC from the current spec.resources.requests.storage every time the reported ratio is above the threshold. After a successful PVC spec patch, the executor filesystem capacity may not reflect the new size immediately, so the next reporting interval can still report the old high usage ratio. With a 1-minute interval and the default growth factor, a 50Gi PVC could become 100Gi, then 200Gi, then 400Gi before the first resize is actually completed.

viirya (Member):

This is especially risky because the PR description already notes that some cloud providers only allow volume modification once every several hours. The current code only blacklists PVCs when the synchronous patch call throws, but async resizer failures or pending resize states are not detected.

I think this should track per-PVC in-flight resize state or cooldown, and check PVC status/conditions before issuing another patch. At minimum, it should avoid issuing another expansion while spec.requests.storage is already larger than the observed/status capacity.

dongjoon-hyun (Member, Author):

Technically, the case you mention doesn't happen, @viirya. The second and subsequent invocations do no harm from the Spark side, according to the K8s design policy. We can request the desired status multiple times, but the second and subsequent requests are not accepted by the K8s control plane for the next 6 hours.

In other words, the expansion is processed at most once every 6 hours. The only exception is that we can expand once at any time after creation.

viirya (Member):

Thanks, but is the 6-hour behavior guaranteed by the Kubernetes control plane? That sounds like an AWS EBS backend limitation rather than a generic Kubernetes PVC expansion policy. I have only found it mentioned in AWS EBS articles.

From the Kubernetes side, users request expansion by updating the PVC spec to a larger requested size, and the controller reconciles that desired state asynchronously. The Kubernetes docs also say failed expansion requests are continuously retried and should be monitored via PVC status/events. I don't see a generic Kubernetes rule that rejects subsequent larger PVC spec updates while a previous resize is pending.

So even if EBS rejects actual backend ModifyVolume calls within 6 hours, the Spark driver may still keep increasing the PVC desired size in Kubernetes, for example from 50Gi to 100Gi to 200Gi, before the first resize has converged. That changes the target state and can result in over-expansion once the backend is eventually able to process it, or leave the PVC stuck retrying a much larger requested size.

I think the plugin should be storage-provider-neutral and should not rely on AWS EBS cooldown behavior for safety. A simple guard such as skipping when spec.resources.requests.storage is already greater than status.capacity["storage"], or otherwise tracking an in-flight resize per PVC, would prevent repeated target inflation while still following Kubernetes' desired-state model.
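The guard suggested here could be sketched as follows. This is an illustrative sketch of the reviewer's proposal, not code from the PR; `PvcResizeGuard`, `resizePending`, and `nextTarget` are hypothetical names, and sizes are byte counts taken from the PVC's spec request and status capacity:

```scala
object PvcResizeGuard {
  // A previous expansion is still pending if the desired size in the PVC spec
  // is already ahead of the capacity the backend has actually provisioned.
  def resizePending(specRequestBytes: Long, statusCapacityBytes: Long): Boolean =
    specRequestBytes > statusCapacityBytes

  // Derive a new, larger target only when the previous resize has converged;
  // otherwise return None and keep the existing desired state untouched.
  def nextTarget(specRequestBytes: Long, statusCapacityBytes: Long, factor: Double): Option[Long] =
    if (resizePending(specRequestBytes, statusCapacityBytes)) None
    else Some((specRequestBytes * (1.0 + factor)).toLong)
}
```

This keeps re-asserting the same desired state (which is harmless under the Kubernetes model) while preventing the target from being inflated again off a stale capacity reading.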

}.maxOption
maxRatio.foreach { ratio =>
logInfo(s"Reporting max PVC disk usage ratio for executor ${env.executorId}: $ratio")
pluginContext.send(PVCDiskUsageReport(env.executorId, ratio))
viirya (Member):

The executor reports only the maximum usage ratio across all SPARK_LOCAL_DIRS, and the driver applies that same ratio to every PVC mounted by the executor container. This means that with multiple Spark local-dir PVCs, an empty PVC can be expanded just because another local dir is full. It can also resize unrelated PVCs mounted into the executor container, since pvcsOf returns all mounted PVCs rather than only the PVC backing the reported local dir.

The class comment says the driver maps each reported mount path back to the executor pod’s PVC, but the message does not contain any mount path or per-directory usage, so the implementation cannot actually do that.

I think the executor should report per-local-dir usage, including the path, and the driver should map each reported path to the corresponding volumeMount/PVC. It should also limit resizing to Spark local-dir PVCs rather than every PVC mounted in the executor container.
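A per-directory report could look like the following sketch. The message and helper shapes (`PVCDirUsage`, `PVCDiskUsageReportV2`, `dirsToResize`) are hypothetical illustrations of this suggestion; the PR's actual `PVCDiskUsageReport` carries only the executor id and a single max ratio:

```scala
// Hypothetical per-directory usage report: one entry per Spark local dir,
// so the driver can map each path back to its backing volumeMount/PVC.
case class PVCDirUsage(path: String, usageRatio: Double)
case class PVCDiskUsageReportV2(executorId: String, dirs: Seq[PVCDirUsage])

object PerDirReport {
  // Only the local dirs whose own usage exceeds the threshold would have
  // their backing PVCs resized.
  def dirsToResize(report: PVCDiskUsageReportV2, threshold: Double): Seq[String] =
    report.dirs.filter(_.usageRatio > threshold).map(_.path)
}
```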

viirya (Member) left a comment:

The current tests cover the basic above/below-threshold behavior, patch failure blacklisting, and pods without PVCs, but they do not cover the main correctness risks of this feature.

I think we probably should add tests for:

  • Multiple local-dir PVCs where only one directory is above the threshold. Only the PVC backing that directory should be resized.
  • Executor pods with additional non-local-dir PVC mounts. Those PVCs should not be resized.
  • A PVC whose spec.resources.requests.storage is already larger than status.capacity, meaning a resize is already pending. The plugin should not issue another expansion.
  • Repeated check intervals with the same high usage report before the filesystem capacity changes. This should not repeatedly double the PVC size.
  • The actual patched storage value, not just that patch() was called.


dongjoon-hyun commented May 1, 2026

Multiple local-dir PVCs where only one directory is above the threshold. Only the PVC backing that directory should be resized.

It's one of the theoretically proper alternatives, but it's not a MUST-HAVE. I chose this simple design for heavily loaded executors because all PVCs of that executor will be released (affected) if the executor dies from out-of-disk on one PVC.

Executor pods with additional non-local-dir PVC mounts. Those PVCs should not be resized.

ExecutorPVCResizePlugin is intended for the homogeneous PVC shuffle-storage case (the following) and is not designed for heterogeneous scenarios.

-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=50Gi \
-c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3 \

A PVC whose spec.resources.requests.storage is already larger than status.capacity, meaning a resize is already pending. The plugin should not issue another expansion.

As I mentioned, it sounds good, but it doesn't fit the K8s design policy, where requesters set the desired status and the controller meets the target asynchronously. There is no need for a SHOULD NOT here.

Repeated check intervals with the same high usage report before the filesystem capacity changes. This should not repeatedly double the PVC size. The actual patched storage value, not just that patch() was called.

Technically, an hourly check is assumed for long-running jobs. The only reason the interval is expressed in minutes is to support the first expansion after creation for small jobs, @viirya.


dongjoon-hyun commented May 1, 2026

I'd like to explain the background a little more, @viirya .

  1. The K8s VolumeResizeFailed warning is a typical message in the K8s world. The K8s control plane shows these kinds of messages until the user-requested desired status is reached. This is the asynchronous-behavior pattern.

  2. This PR is part of SPARK-55555 (Support heterogeneous K8s executor management), which aims to make Spark proactively more robust against pod failures by allowing resource adjustment. There is no correctness risk here.


viirya commented May 1, 2026

It's one of theoretically proper alternatives, but it's not MUST-HAVE. I chose this simple design for the heavy load executor because all PVCs (of that executor) will be released (affected) if the executor dies with Out-Of-Disk of one PVC.

Thanks, I understand the intended trade-off now: if any local-dir PVC can kill the executor when it runs out of space, resizing all local-dir PVCs for that executor is a simpler conservative policy.

My remaining concern is that the current implementation does not limit this policy to Spark local-dir PVCs. It resizes every PVC mounted by the executor container, including PVCs that are unrelated to Spark local storage. If the intended contract is “all Spark local-dir PVCs are resized together for a homogeneous local-dir setup,” then I think the code and docs should make that explicit and avoid touching non-local-dir PVCs.


viirya commented May 1, 2026

ExecutorPVCResizePlugin is supposed to be used for homogeneous PVC shuffle storage case (the following) and not designed for those heterogeneous scenario.

Thanks, that helps clarify the intended scope. If the intended contract is a homogeneous PVC shuffle/local-dir storage setup, then I think we should make that contract explicit.

Right now the implementation resizes every PVC mounted by the executor container, but the docs/configs only say “executor PVC” and do not warn that heterogeneous PVC mounts are unsupported. Users may reasonably combine Spark local-dir PVCs with other executor PVC mounts via pod templates, and those unrelated PVCs would also be resized.

Could we either document this limitation very explicitly, or preferably restrict the implementation to Spark local-dir PVC mounts only? That would preserve the simple homogeneous design while avoiding accidental resize of unrelated PVCs.


viirya commented May 1, 2026

As I mentioned, it sounds good but it's not K8s design policy where requesters set the desirable status and controller meets the target asynchronously. There is no need to be SHOULD NOT.

I agree with the Kubernetes desired-state model, but my concern is not about reasserting the same desired state. The current code computes a new desired state from the already-increased spec on each interval.

If the PVC has status.capacity=50Gi and spec.requests=100Gi, the previous resize is still pending. If the executor still reports high usage because the filesystem is still 50Gi, the next check will grow the desired state from 100Gi to 200Gi. That is not just re-requesting the same desired state; it is inflating the target based on stale capacity.

So the guard is not against Kubernetes async reconciliation itself. It is to avoid deriving another larger target while the previous target has not converged yet.
