Commit 8e37824
[SPARK-56693][K8S] Support built-in K8s ExecutorPVCResizePlugin
### What changes were proposed in this pull request?

This PR aims to support a new `ExecutorPVCResizePlugin` that monitors executor PVC disk usage and grows each PVC's `spec.resources.requests.storage` when usage exceeds a threshold. The executor side reports the maximum filesystem usage ratio across `DiskBlockManager.localDirs`. The driver side patches the executor pod's PVCs to `currentSize * (1 + factor)` when the reported ratio exceeds the threshold.

New configurations:

| Key | Default | Meaning |
|---|---|---|
| `spark.kubernetes.executor.pvc.resizeInterval` | `0min` | Resize check interval. `0` disables. |
| `spark.kubernetes.executor.pvc.resizeThreshold` | `0.5` | Usage ratio above which a resize is triggered. |
| `spark.kubernetes.executor.pvc.resizeFactor` | `1.0` | Growth factor. |

Note that public cloud PVCs have hard limitations. For example, you can only increase the size, and only **once every six hours**. So, for a short Spark job (whose lifetime is less than 6 hours), `ExecutorPVCResizePlugin` can increase each PVC only once. For a recently increased PVC, K8s will show a warning like the following.

```
Warning  VolumeResizeFailed  6s (x8 over 35s)  external-resizer ebs.csi.aws.com  resize volume "pvc-baccb8c8-16e7-4b31-b254-dd223a3e0be3" by resizer "ebs.csi.aws.com" failed: rpc error: code = Internal desc = Could not resize volume "vol-0cb3afad24b360ebd": rpc error: code = Internal desc = Could not modify volume "vol-0cb3afad24b360ebd": volume "vol-0cb3afad24b360ebd" in OPTIMIZING state, cannot currently modify
```

### Why are the changes needed?

Without this, PVC-backed `SPARK_LOCAL_DIRS` must be over-provisioned up front to avoid mid-job disk-full failures, which wastes storage cost. `ExecutorResizePlugin` already established the observe-and-patch pattern for memory; this extends it to PVC storage.

### Does this PR introduce _any_ user-facing change?

No. The plugin is opt-in; the user needs to add it to `spark.plugins` explicitly.
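To make the sizing rule concrete, here is an illustrative Python model of the threshold gate and the `currentSize * (1 + factor)` growth formula; the function name and shape are mine, not the plugin's Scala code.

```python
def next_pvc_size(current_bytes: int, ratio: float,
                  threshold: float = 0.5, factor: float = 1.0) -> int:
    """Return the new storage request, or the current one if no resize fires."""
    if ratio <= threshold:
        return current_bytes                     # usage below threshold: leave the PVC alone
    return int(current_bytes * (1.0 + factor))   # grow by the configured factor

GIB = 1024 ** 3
# 50Gi at ~61% usage with the defaults doubles to 100Gi (107374182400 bytes).
print(next_pvc_size(50 * GIB, 0.6136))  # 107374182400
print(next_pvc_size(50 * GIB, 0.0003))  # 53687091200 (unchanged)
```

With the default factor of `1.0`, each triggered resize doubles the request, so repeated resizes grow the PVC geometrically rather than by a fixed increment.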
**SUBMIT**

```
bin/spark-submit \
  --master k8s://$K8S_MASTER \
  --deploy-mode cluster \
  -c spark.executor.cores=4 \
  -c spark.executor.memory=4g \
  -c spark.kubernetes.container.image=docker.apple.com/d_hyun/spark:20260430 \
  -c spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  -c spark.kubernetes.driver.pod.name=pi \
  -c spark.kubernetes.executor.podNamePrefix=pi \
  -c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data \
  -c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false \
  -c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand \
  -c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=50Gi \
  -c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3 \
  -c spark.kubernetes.driver.podTemplateFile=eks-root-pod.yml \
  -c spark.kubernetes.executor.podTemplateFile=eks-root-pod.yml \
  -c spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin \
  -c spark.kubernetes.executor.pvc.resizeInterval=1m \
  --class org.apache.spark.examples.SparkPi \
  local:///opt/spark/examples/jars/spark-examples.jar 400000
```

**DRIVER LOGS**

```
$ kubectl logs -f pi | grep Plugin
26/05/01 01:39:38 INFO ExecutorPVCResizeDriverPlugin: ExecutorPVCResizeDriverPlugin is scheduled
26/05/01 01:39:38 INFO DriverPluginContainer: Initialized driver component for plugin org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin.
26/05/01 01:40:38 INFO ExecutorPVCResizeDriverPlugin: Latest PVC usage reports: {}
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Latest PVC usage reports: {1=PVCDiskUsageReport(1,0.6136656796630462), 2=PVCDiskUsageReport(2,3.507855787665699E-4)}
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Try to resize executor 1 PVC pi-exec-1-pvc-0 with ratio 0.6136656796630462 (threshold 0.5).
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Increase PVC pi-exec-1-pvc-0 storage from 53687091200 to 107374182400 as usage ratio exceeded threshold.
26/05/01 01:41:38 INFO ExecutorPVCResizeDriverPlugin: Try to resize executor 2 PVC pi-exec-2-pvc-0 with ratio 3.507855787665699E-4 (threshold 0.5).
```

**EXECUTOR SIZE REPORTING (60% -> 30%)**

```
$ kubectl logs -f pi-exec-1 | grep Plugin
26/05/01 01:22:54 INFO ExecutorPVCResizeExecutorPlugin: Reporting max PVC disk usage ratio for executor 1: 0.6136656796630462
26/05/01 01:23:54 INFO ExecutorPVCResizeExecutorPlugin: Reporting max PVC disk usage ratio for executor 1: 0.30591566408202353
```

**RESIZED PVC**

```
$ kubectl get pvc
NAME              STATUS   VOLUME                                     CAPACITY       ACCESS MODES   STORAGECLASS   VOLUMEATTRIBUTESCLASS   AGE
pi-exec-1-pvc-0   Bound    pvc-d279a3da-ddfb-41c2-a32b-0f2bd83941c4   107374182400   RWOP           gp3            <unset>                 2m28s
pi-exec-2-pvc-0   Bound    pvc-79f092d3-4a8d-4981-946d-d745d4038fd6   50Gi           RWOP           gp3            <unset>                 2m28s
```

### How was this patch tested?

Pass the CIs with a new `ExecutorPVCResizePluginSuite`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7 (1M context)

Closes #55642 from dongjoon-hyun/SPARK-56693.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
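A side note on the `kubectl get pvc` output above: the driver patches the claim with a plain byte count (`new Quantity(newSize.toString)`), so the resized PVC's capacity displays as raw bytes (`107374182400`) while the untouched one keeps its original `50Gi` notation. The arithmetic, as a quick Python check:

```python
GIB = 1024 ** 3  # one Gi (gibibyte) in bytes

original = 50 * GIB                   # the 50Gi sizeLimit from the submit command
resized = int(original * (1 + 1.0))   # default resizeFactor 1.0 doubles the request

print(original)  # 53687091200, matching the "from" value in the driver log
print(resized)   # 107374182400, i.e. 100Gi, matching the "to" value and kubectl
```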
1 parent f4b320e, commit 8e37824

4 files changed: 587 additions & 0 deletions

docs/running-on-kubernetes.md

Lines changed: 30 additions & 0 deletions

```diff
@@ -1816,6 +1816,36 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>4.2.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.pvc.resizeInterval</code></td>
+  <td><code>0min</code></td>
+  <td>
+    Interval between executor PVC resize operations. To disable, set 0 (default).
+    Takes effect only when <code>org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin</code>
+    is registered via <code>spark.plugins</code>.
+  </td>
+  <td>4.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.pvc.resizeThreshold</code></td>
+  <td><code>0.5</code></td>
+  <td>
+    The PVC usage ratio (used / capacity) above which the driver triggers a resize.
+    Takes effect only when <code>org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin</code>
+    is registered via <code>spark.plugins</code>.
+  </td>
+  <td>4.2.0</td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.pvc.resizeFactor</code></td>
+  <td><code>1.0</code></td>
+  <td>
+    The factor to grow PVC storage by, relative to the current request.
+    Takes effect only when <code>org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin</code>
+    is registered via <code>spark.plugins</code>.
+  </td>
+  <td>4.2.0</td>
+</tr>
 </table>
 
 #### Pod template properties
```

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 24 additions & 0 deletions

```diff
@@ -262,6 +262,30 @@ private[spark] object Config extends Logging {
     .checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]")
     .createWithDefault(0.1)
 
+  val PVC_RESIZE_INTERVAL =
+    ConfigBuilder("spark.kubernetes.executor.pvc.resizeInterval")
+      .doc("Interval between executor PVC resize operations. To disable, set 0 (default)")
+      .version("4.2.0")
+      .timeConf(TimeUnit.MINUTES)
+      .checkValue(_ >= 0, "Interval should be non-negative")
+      .createWithDefault(0)
+
+  val PVC_RESIZE_THRESHOLD =
+    ConfigBuilder("spark.kubernetes.executor.pvc.resizeThreshold")
+      .doc("The PVC usage ratio (used / capacity) above which the driver triggers a resize.")
+      .version("4.2.0")
+      .doubleConf
+      .checkValue(v => 0 < v && v < 1, "The threshold should be in (0, 1)")
+      .createWithDefault(0.5)
+
+  val PVC_RESIZE_FACTOR =
+    ConfigBuilder("spark.kubernetes.executor.pvc.resizeFactor")
+      .doc("The factor to grow PVC storage by, relative to the current request.")
+      .version("4.2.0")
+      .doubleConf
+      .checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]")
+      .createWithDefault(1.0)
+
   val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
   val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
   val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
```
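The three `checkValue` ranges above amount to: interval >= 0 minutes, threshold strictly inside (0, 1), and factor inside (0, 1]. An illustrative Python mirror of those constraints (mine, not Spark's `ConfigBuilder` API):

```python
def validate_pvc_resize_conf(interval_min: int, threshold: float, factor: float) -> None:
    """Mirror of the checkValue constraints on the three new configs."""
    if interval_min < 0:
        raise ValueError("Interval should be non-negative")
    if not (0 < threshold < 1):
        raise ValueError("The threshold should be in (0, 1)")
    if not (0 < factor <= 1):
        raise ValueError("The factor should be in (0, 1]")

validate_pvc_resize_conf(0, 0.5, 1.0)  # the defaults pass; interval 0 disables the plugin
```

Note the asymmetry: the threshold excludes `1` (a 100% trigger could never fire before the disk is full), while the factor includes `1` (doubling is an allowed, and default, growth step).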
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePlugin.scala

Lines changed: 255 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster.k8s

import java.util.{Map => JMap}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}

import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimBuilder, Pod, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.base.PatchContext
import io.fabric8.kubernetes.client.dsl.base.PatchType

import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CURRENT_DISK_SIZE, ORIGINAL_DISK_SIZE, PVC_METADATA_NAME}
import org.apache.spark.util.ThreadUtils

/**
 * Spark plugin to monitor executor PVC disk usage and grow the PVC storage request
 * when the usage exceeds a configurable threshold.
 *
 * Executors measure their own local-directory usage (via DiskBlockManager) and report
 * the maximum filesystem usage ratio to the driver through the plugin RPC channel.
 * When the ratio exceeds the threshold, the driver patches every `spark-local-dir-*`
 * PVC mounted by the reporting executor's pod to grow its
 * `spec.resources.requests.storage`. The underlying StorageClass must have
 * `allowVolumeExpansion: true`.
 */
class ExecutorPVCResizePlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new ExecutorPVCResizeDriverPlugin()

  override def executorPlugin(): ExecutorPlugin = new ExecutorPVCResizeExecutorPlugin()
}

/**
 * Message sent from each executor to the driver with the maximum filesystem usage
 * ratio (used / total) across the executor's SPARK_LOCAL_DIRS. The driver applies
 * this ratio to every PVC mounted by the reporting executor's pod.
 */
private[k8s] case class PVCDiskUsageReport(
    executorId: String,
    ratio: Double)

class ExecutorPVCResizeDriverPlugin extends DriverPlugin with Logging {
  private var sparkContext: SparkContext = _
  private var namespace: String = _
  private var threshold: Double = _
  private var factor: Double = _

  private val latestReports = new ConcurrentHashMap[String, PVCDiskUsageReport]()
  private val failedPvcs = ConcurrentHashMap.newKeySet[String]()
  private val requestedSizes = new ConcurrentHashMap[String, Long]()

  private val periodicService: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-plugin")

  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
    val interval = sc.conf.get(PVC_RESIZE_INTERVAL)
    if (interval <= 0) {
      logInfo("PVCResizePlugin disabled (interval <= 0).")
      return Map.empty[String, String].asJava
    }
    threshold = sc.conf.get(PVC_RESIZE_THRESHOLD)
    factor = sc.conf.get(PVC_RESIZE_FACTOR)
    namespace = sc.conf.get(KUBERNETES_NAMESPACE)
    sparkContext = sc

    periodicService.scheduleAtFixedRate(() => if (!sparkContext.isStopped) {
      try {
        checkAndResizePVCs()
      } catch {
        case e: Throwable => logError("Error in PVC resize thread", e)
      }
    }, interval, interval, TimeUnit.MINUTES)
    logInfo("ExecutorPVCResizeDriverPlugin is scheduled")

    // Propagate the interval to executors so they report at the same cadence.
    Map(PVC_RESIZE_INTERVAL.key -> interval.toString).asJava
  }

  override def receive(message: Any): AnyRef = message match {
    case r: PVCDiskUsageReport =>
      latestReports.put(r.executorId, r)
      null
    case _ =>
      null
  }

  override def shutdown(): Unit = {
    periodicService.shutdown()
  }

  private[k8s] def checkAndResizePVCs(): Unit = {
    logInfo(s"Latest PVC usage reports: $latestReports")
    val appId = sparkContext.applicationId

    sparkContext.schedulerBackend match {
      case b: KubernetesClusterSchedulerBackend =>
        val client = b.kubernetesClient
        val pods = client.pods()
          .inNamespace(namespace)
          .withLabel(SPARK_APP_ID_LABEL, appId)
          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
          .list()
          .getItems.asScala

        val podByExecId = pods.flatMap { p =>
          Option(p.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)).map(_ -> p)
        }.toMap

        latestReports.values().asScala.foreach { report =>
          podByExecId.get(report.executorId).foreach { pod =>
            pvcsOf(pod).foreach { pvcName =>
              if (!failedPvcs.contains(pvcName)) {
                tryResize(client, pvcName, report.ratio, report.executorId)
              }
            }
          }
        }
      case _ =>
        logWarning("Skipping PVC resize: schedulerBackend is not " +
          "KubernetesClusterSchedulerBackend.")
    }
  }

  private[k8s] def pvcsOf(pod: Pod): Set[String] = {
    val volNameToPvc = pod.getSpec.getVolumes.asScala
      .filter(_.getPersistentVolumeClaim != null)
      .filter(_.getName.startsWith("spark-local-dir-"))
      .map(v => v.getName -> v.getPersistentVolumeClaim.getClaimName)
      .toMap
    pod.getSpec.getContainers.asScala
      .find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME)
      .orElse(pod.getSpec.getContainers.asScala.headOption)
      .toSeq
      .flatMap(_.getVolumeMounts.asScala)
      .flatMap(m => volNameToPvc.get(m.getName))
      .toSet
  }

  private def tryResize(
      client: KubernetesClient, pvcName: String, ratio: Double, execId: String): Unit = {
    logInfo(s"Try to resize executor $execId PVC $pvcName with ratio $ratio " +
      s"(threshold $threshold).")
    if (ratio <= threshold) return
    try {
      val pvc = client.persistentVolumeClaims()
        .inNamespace(namespace)
        .withName(pvcName)
        .get()
      if (pvc == null) return
      val current = Quantity.getAmountInBytes(
        pvc.getSpec.getResources.getRequests.get("storage")).longValue()
      val capacity = Option(pvc.getStatus)
        .flatMap(s => Option(s.getCapacity))
        .flatMap(c => Option(c.get("storage")))
        .map(q => Quantity.getAmountInBytes(q).longValue())
        .getOrElse(current)
      if (current > capacity) {
        logInfo(s"PVC $pvcName resize is in progress or failed " +
          s"(spec=$current, status=$capacity); skip.")
        return
      }
      val newSize = (current * (1.0 + factor)).toLong
      if (requestedSizes.get(pvcName) == newSize) return
      logInfo(log"Increase PVC ${MDC(PVC_METADATA_NAME, pvcName)} storage " +
        log"from ${MDC(ORIGINAL_DISK_SIZE, current)} to " +
        log"${MDC(CURRENT_DISK_SIZE, newSize)} as usage ratio exceeded threshold.")
      val patch = new PersistentVolumeClaimBuilder()
        .withNewSpec()
          .withNewResources()
            .addToRequests("storage", new Quantity(newSize.toString))
          .endResources()
        .endSpec()
        .build()
      client.persistentVolumeClaims()
        .inNamespace(namespace)
        .withName(pvcName)
        .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), patch)
      requestedSizes.put(pvcName, newSize)
    } catch {
      case e: Throwable =>
        failedPvcs.add(pvcName)
        logInfo(log"Failed to expand PVC ${MDC(PVC_METADATA_NAME, pvcName)}; " +
          log"will skip subsequent attempts.", e)
    }
  }
}

class ExecutorPVCResizeExecutorPlugin extends ExecutorPlugin with Logging {
  private var pluginContext: PluginContext = _
  private var periodicService: ScheduledExecutorService = _

  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
    val intervalStr = extraConf.get(PVC_RESIZE_INTERVAL.key)
    if (intervalStr == null) {
      // Driver disabled the plugin; do nothing.
      return
    }
    val interval = intervalStr.toLong
    if (interval <= 0) return

    pluginContext = ctx
    periodicService =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-reporter")
    periodicService.scheduleAtFixedRate(() => {
      try {
        report()
      } catch {
        case e: Throwable => logDebug("Failed to report PVC usage", e)
      }
    }, interval, interval, TimeUnit.MINUTES)
  }

  override def shutdown(): Unit = {
    if (periodicService != null) periodicService.shutdown()
  }

  private def report(): Unit = {
    val env = SparkEnv.get
    if (env == null) return
    val dirs = env.blockManager.diskBlockManager.localDirs
    if (dirs == null || dirs.isEmpty) return
    val maxRatio = dirs.iterator.flatMap { d =>
      try {
        // Skip if total is 0 (e.g. dir unmounted, statvfs failed) to avoid divide-by-zero.
        val total = d.getTotalSpace
        if (total > 0) Some((total - d.getUsableSpace).toDouble / total) else None
      } catch { case _: Throwable => None }
    }.maxOption
    maxRatio.foreach { ratio =>
      logInfo(s"Reporting max PVC disk usage ratio for executor ${env.executorId}: $ratio")
      pluginContext.send(PVCDiskUsageReport(env.executorId, ratio))
    }
  }
}
```
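The `report()` method above reduces the per-directory filesystem stats to a single maximum usage ratio. A standalone Python sketch of the same reduction, using hypothetical `(total, usable)` byte pairs in place of `File.getTotalSpace`/`getUsableSpace`:

```python
def max_usage_ratio(dir_stats):
    """Max of (total - usable) / total, skipping zero-total entries as the plugin does."""
    ratios = [(total - usable) / total for total, usable in dir_stats if total > 0]
    return max(ratios, default=None)

# Two local dirs: one 60% full, one nearly empty -> the fuller one is reported.
print(max_usage_ratio([(100, 40), (100, 99)]))  # 0.6
print(max_usage_ratio([(0, 0)]))                # None (unmounted dir: no report sent)
```

Taking the maximum rather than the average is the conservative choice here: any single full local directory can fail a shuffle write, so the most-utilized volume drives the resize decision.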
