30 changes: 30 additions & 0 deletions docs/running-on-kubernetes.md
@@ -1816,6 +1816,36 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>4.2.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.pvc.resizeInterval</code></td>
<td><code>0min</code></td>
<td>
Interval between executor PVC resize operations. Set to 0 (the default) to disable.
Takes effect only when <code>org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin</code>
is registered via <code>spark.plugins</code>.
</td>
<td>4.2.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.pvc.resizeThreshold</code></td>
<td><code>0.5</code></td>
<td>
The PVC usage ratio (used / capacity) above which the driver triggers a resize.
Takes effect only when <code>org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin</code>
is registered via <code>spark.plugins</code>.
</td>
<td>4.2.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.pvc.resizeFactor</code></td>
<td><code>1.0</code></td>
<td>
The factor to grow PVC storage by, relative to the current request.
Takes effect only when <code>org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin</code>
is registered via <code>spark.plugins</code>.
</td>
<td>4.2.0</td>
</tr>
</table>
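For illustration, a minimal sketch of enabling the plugin together with these settings on a `SparkConf`; the specific values (`30min`, `0.7`, `0.5`) are arbitrary examples, not recommendations:

```scala
import org.apache.spark.SparkConf

// Hypothetical example values; only the config keys and the plugin class come from this PR.
val conf = new SparkConf()
  .set("spark.plugins", "org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin")
  .set("spark.kubernetes.executor.pvc.resizeInterval", "30min")  // 0 (default) disables resizing
  .set("spark.kubernetes.executor.pvc.resizeThreshold", "0.7")   // resize when usage exceeds 70%
  .set("spark.kubernetes.executor.pvc.resizeFactor", "0.5")      // grow the storage request by 50%
```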

#### Pod template properties
@@ -262,6 +262,30 @@ private[spark] object Config extends Logging {
.checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]")
.createWithDefault(0.1)

val PVC_RESIZE_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.pvc.resizeInterval")
.doc("Interval between executor PVC resize operations. To disable, set 0 (default)")
.version("4.2.0")
.timeConf(TimeUnit.MINUTES)
.checkValue(_ >= 0, "Interval should be non-negative")
.createWithDefault(0)

val PVC_RESIZE_THRESHOLD =
ConfigBuilder("spark.kubernetes.executor.pvc.resizeThreshold")
.doc("The PVC usage ratio (used / capacity) above which the driver triggers a resize.")
.version("4.2.0")
.doubleConf
.checkValue(v => 0 < v && v < 1, "The threshold should be in (0, 1)")
.createWithDefault(0.5)

val PVC_RESIZE_FACTOR =
ConfigBuilder("spark.kubernetes.executor.pvc.resizeFactor")
.doc("The factor to grow PVC storage by, relative to the current request.")
.version("4.2.0")
.doubleConf
.checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]")
.createWithDefault(1.0)

val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
@@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.k8s

import java.util.{Map => JMap}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}

import scala.jdk.CollectionConverters._

import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimBuilder, Pod, Quantity}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.base.PatchContext
import io.fabric8.kubernetes.client.dsl.base.PatchType

import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{CURRENT_DISK_SIZE, ORIGINAL_DISK_SIZE, PVC_METADATA_NAME}
import org.apache.spark.util.ThreadUtils

/**
* Spark plugin to monitor executor PVC disk usage and grow the PVC storage request
* when the usage exceeds a configurable threshold.
*
* Executors measure their own local-directory usage (via DiskBlockManager) and report
* the maximum filesystem usage ratio to the driver through the plugin RPC channel.
* When the ratio exceeds the threshold, the driver patches every `spark-local-dir-*`
* PVC mounted by the reporting executor's pod to grow its
* `spec.resources.requests.storage`. The underlying StorageClass must have
* `allowVolumeExpansion: true`.
*/
class ExecutorPVCResizePlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = new ExecutorPVCResizeDriverPlugin()

override def executorPlugin(): ExecutorPlugin = new ExecutorPVCResizeExecutorPlugin()
}

/**
* Message sent from each executor to the driver with the maximum filesystem usage
* ratio (used / total) across the executor's SPARK_LOCAL_DIRS. The driver applies
* this ratio to every PVC mounted by the reporting executor's pod.
*/
private[k8s] case class PVCDiskUsageReport(
executorId: String,
ratio: Double)

class ExecutorPVCResizeDriverPlugin extends DriverPlugin with Logging {
private var sparkContext: SparkContext = _
private var namespace: String = _
private var threshold: Double = _
private var factor: Double = _

private val latestReports = new ConcurrentHashMap[String, PVCDiskUsageReport]()
private val failedPvcs = ConcurrentHashMap.newKeySet[String]()
private val requestedSizes = new ConcurrentHashMap[String, Long]()

private val periodicService: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-plugin")

override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
val interval = sc.conf.get(PVC_RESIZE_INTERVAL)
if (interval <= 0) {
logInfo("PVCResizePlugin disabled (interval <= 0).")
return Map.empty[String, String].asJava
}
threshold = sc.conf.get(PVC_RESIZE_THRESHOLD)
factor = sc.conf.get(PVC_RESIZE_FACTOR)
namespace = sc.conf.get(KUBERNETES_NAMESPACE)
sparkContext = sc

periodicService.scheduleAtFixedRate(() => if (!sparkContext.isStopped) {
try {
checkAndResizePVCs()
} catch {
case e: Throwable => logError("Error in PVC resize thread", e)
}
}, interval, interval, TimeUnit.MINUTES)
logInfo("ExecutorPVCResizeDriverPlugin is scheduled")

// Propagate the interval to executors so they report at the same cadence.
Map(PVC_RESIZE_INTERVAL.key -> interval.toString).asJava
}

override def receive(message: Any): AnyRef = message match {
case r: PVCDiskUsageReport =>
latestReports.put(r.executorId, r)
null
case _ =>
null
}

override def shutdown(): Unit = {
periodicService.shutdown()
}

private[k8s] def checkAndResizePVCs(): Unit = {
logInfo(s"Latest PVC usage reports: $latestReports")
val appId = sparkContext.applicationId

sparkContext.schedulerBackend match {
case b: KubernetesClusterSchedulerBackend =>
val client = b.kubernetesClient
val pods = client.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, appId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.list()
.getItems.asScala

val podByExecId = pods.flatMap { p =>
Option(p.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)).map(_ -> p)
}.toMap

latestReports.values().asScala.foreach { report =>
podByExecId.get(report.executorId).foreach { pod =>
pvcsOf(pod).foreach { pvcName =>
if (!failedPvcs.contains(pvcName)) {
tryResize(client, pvcName, report.ratio, report.executorId)
}
}
}
}
case _ =>
logWarning("Skipping PVC resize: schedulerBackend is not " +
"KubernetesClusterSchedulerBackend.")
}
}

/**
 * Returns the claim names of the `spark-local-dir-*` PVC volumes that are mounted by the
 * executor container of the given pod, falling back to the pod's first container when no
 * container named DEFAULT_EXECUTOR_CONTAINER_NAME exists.
 */
private[k8s] def pvcsOf(pod: Pod): Set[String] = {
val volNameToPvc = pod.getSpec.getVolumes.asScala
.filter(_.getPersistentVolumeClaim != null)
.filter(_.getName.startsWith("spark-local-dir-"))
.map(v => v.getName -> v.getPersistentVolumeClaim.getClaimName)
.toMap
pod.getSpec.getContainers.asScala
.find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME)
.orElse(pod.getSpec.getContainers.asScala.headOption)
.toSeq
.flatMap(_.getVolumeMounts.asScala)
.flatMap(m => volNameToPvc.get(m.getName))
.toSet
}

private def tryResize(

Member:
tryResize grows the PVC from the current spec.resources.requests.storage every time the reported ratio is above the threshold. After a successful PVC spec patch, the executor filesystem capacity may not reflect the new size immediately, so the next reporting interval can still report the old high usage ratio. With a 1-minute interval and the default growth factor, a 50Gi PVC could become 100Gi, then 200Gi, then 400Gi before the first resize is actually completed.

Member:
This is especially risky because the PR description already notes that some cloud providers only allow volume modification once every several hours. The current code only blacklists PVCs when the synchronous patch call throws, but async resizer failures or pending resize states are not detected.

I think this should track per-PVC in-flight resize state or cooldown, and check PVC status/conditions before issuing another patch. At minimum, it should avoid issuing another expansion while spec.requests.storage is already larger than the observed/status capacity.
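A rough sketch of the kind of guard being suggested, reusing the fabric8 calls already present in this PR; the `resizeInFlight` helper name is hypothetical:

```scala
import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, Quantity}

// Hypothetical helper: treat a PVC whose requested size already exceeds the capacity
// observed in status as having an expansion in flight, and skip further patches to it.
def resizeInFlight(pvc: PersistentVolumeClaim): Boolean = {
  val requested = Quantity.getAmountInBytes(
    pvc.getSpec.getResources.getRequests.get("storage")).longValue()
  Option(pvc.getStatus)
    .flatMap(s => Option(s.getCapacity))
    .flatMap(c => Option(c.get("storage")))
    .map(q => Quantity.getAmountInBytes(q).longValue())
    .exists(requested > _)
}
```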

Member Author:
Technically, the case you mention doesn't happen, @viirya . It's because the second and subsequent invocation will take no harm from Spark side according to the K8s design policy. We can request the desirable status multiple times, but the second and subsequent requests are not accepted by K8s control plane for next 6 hours.

In other words, the expansion will be processed at every 6 hours. The only exception is we can expand once at any time after creation.

Member:
Thanks, is 6-hour behavior guaranteed by a Kubernetes control-plane? That sounds like an AWS EBS backend limitation rather than a generic Kubernetes PVC expansion policy? I only found that it is mentioned in AWS EBS article like this.

From the Kubernetes side, users request expansion by updating the PVC spec to a larger requested size, and the controller reconciles that desired state asynchronously. The Kubernetes docs also say failed expansion requests are continuously retried and should be monitored via PVC status/events. I don't see a generic Kubernetes rule that rejects subsequent larger PVC spec updates while a previous resize is pending.

So even if EBS rejects actual backend ModifyVolume calls within 6 hours, the Spark driver may still keep increasing the PVC desired size in Kubernetes, for example from 50Gi to 100Gi to 200Gi, before the first resize has converged. That changes the target state and can result in over-expansion once the backend is eventually able to process it, or leave the PVC stuck retrying a much larger requested size.

I think the plugin should be storage-provider-neutral and should not rely on AWS EBS cooldown behavior for safety. A simple guard such as skipping when spec.resources.requests.storage is already greater than status.capacity["storage"], or otherwise tracking an in-flight resize per PVC, would prevent repeated target inflation while still following Kubernetes' desired-state model.

client: KubernetesClient, pvcName: String, ratio: Double, execId: String): Unit = {
logInfo(s"Try to resize executor $execId PVC $pvcName with ratio $ratio " +
s"(threshold $threshold).")
if (ratio <= threshold) return
try {
val pvc = client.persistentVolumeClaims()
.inNamespace(namespace)
.withName(pvcName)
.get()
if (pvc == null) return
val current = Quantity.getAmountInBytes(
pvc.getSpec.getResources.getRequests.get("storage")).longValue()
val capacity = Option(pvc.getStatus)
.flatMap(s => Option(s.getCapacity))
.flatMap(c => Option(c.get("storage")))
.map(q => Quantity.getAmountInBytes(q).longValue())
.getOrElse(current)
if (current > capacity) {
logInfo(s"PVC $pvcName resize is in progress or failed " +
s"(spec=$current, status=$capacity); skip.")
return
}
val newSize = (current * (1.0 + factor)).toLong
if (requestedSizes.get(pvcName) == newSize) return
logInfo(log"Increase PVC ${MDC(PVC_METADATA_NAME, pvcName)} storage " +
log"from ${MDC(ORIGINAL_DISK_SIZE, current)} to " +
log"${MDC(CURRENT_DISK_SIZE, newSize)} as usage ratio exceeded threshold.")
val patch = new PersistentVolumeClaimBuilder()
.withNewSpec()
.withNewResources()
.addToRequests("storage", new Quantity(newSize.toString))
.endResources()
.endSpec()
.build()
client.persistentVolumeClaims()
.inNamespace(namespace)
.withName(pvcName)
.patch(PatchContext.of(PatchType.STRATEGIC_MERGE), patch)
requestedSizes.put(pvcName, newSize)
} catch {
case e: Throwable =>
failedPvcs.add(pvcName)
logInfo(log"Failed to expand PVC ${MDC(PVC_METADATA_NAME, pvcName)}; " +
log"will skip subsequent attempts.", e)
}
}
}

class ExecutorPVCResizeExecutorPlugin extends ExecutorPlugin with Logging {
private var pluginContext: PluginContext = _
private var periodicService: ScheduledExecutorService = _

override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
val intervalStr = extraConf.get(PVC_RESIZE_INTERVAL.key)
if (intervalStr == null) {
// Driver disabled the plugin; do nothing.
return
}
val interval = intervalStr.toLong
if (interval <= 0) return

pluginContext = ctx
periodicService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-reporter")
periodicService.scheduleAtFixedRate(() => {
try {
report()
} catch {
case e: Throwable => logDebug("Failed to report PVC usage", e)
}
}, interval, interval, TimeUnit.MINUTES)
}

override def shutdown(): Unit = {
if (periodicService != null) periodicService.shutdown()
}

private def report(): Unit = {
val env = SparkEnv.get
if (env == null) return
val dirs = env.blockManager.diskBlockManager.localDirs
if (dirs == null || dirs.isEmpty) return
val maxRatio = dirs.iterator.flatMap { d =>
try {
// Skip if total is 0 (e.g. dir unmounted, statvfs failed) to avoid divide-by-zero.
val total = d.getTotalSpace
if (total > 0) Some((total - d.getUsableSpace).toDouble / total) else None
} catch { case _: Throwable => None }
}.maxOption
maxRatio.foreach { ratio =>
logInfo(s"Reporting max PVC disk usage ratio for executor ${env.executorId}: $ratio")
pluginContext.send(PVCDiskUsageReport(env.executorId, ratio))

Member:
The executor reports only the maximum usage ratio across all SPARK_LOCAL_DIRS, and the driver applies that same ratio to every PVC mounted by the executor container. This means that with multiple Spark local-dir PVCs, an empty PVC can be expanded just because another local dir is full. It can also resize unrelated PVCs mounted into the executor container, since pvcsOf returns all mounted PVCs rather than only the PVC backing the reported local dir.

The class comment says the driver maps each reported mount path back to the executor pod’s PVC, but the message does not contain any mount path or per-directory usage, so the implementation cannot actually do that.

I think the executor should report per-local-dir usage, including the path, and the driver should map each reported path to the corresponding volumeMount/PVC. It should also limit resizing to Spark local-dir PVCs rather than every PVC mounted in the executor container.
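A hedged sketch of the per-directory report shape being suggested here; the `PVCDirUsage` and `PVCDirUsageReport` names are hypothetical, not part of this PR:

```scala
// Hypothetical per-directory message: one entry per SPARK_LOCAL_DIR, carrying the
// mount path so the driver can map it to the specific volumeMount/PVC behind it.
private[k8s] case class PVCDirUsage(path: String, ratio: Double)

private[k8s] case class PVCDirUsageReport(
    executorId: String,
    dirUsages: Seq[PVCDirUsage])
```

The driver side could then match each reported path against the executor container's volumeMount paths and resize only the `spark-local-dir-*` PVC backing that directory.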

}
}
}