diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index b575a127199cb..b55d51330a724 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1816,6 +1816,36 @@ See the [configuration page](configuration.html) for information on Spark config
4.2.0 |
+
+ spark.kubernetes.executor.pvc.resizeInterval |
+ 0min |
+
+ Interval between executor PVC resize operations. To disable, set 0 (default).
+ Takes effect only when org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin
+ is registered via spark.plugins.
+ |
+ 4.2.0 |
+
+
+ spark.kubernetes.executor.pvc.resizeThreshold |
+ 0.5 |
+
+ The PVC usage ratio (used / capacity) above which the driver triggers a resize.
+ Takes effect only when org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin
+ is registered via spark.plugins.
+ |
+ 4.2.0 |
+
+
+ spark.kubernetes.executor.pvc.resizeFactor |
+ 1.0 |
+
+ The factor to grow PVC storage by, relative to the current request.
+ Takes effect only when org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin
+ is registered via spark.plugins.
+ |
+ 4.2.0 |
+
#### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 6f1130853c5a1..b50b611f6e162 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -262,6 +262,30 @@ private[spark] object Config extends Logging {
.checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]")
.createWithDefault(0.1)
+ val PVC_RESIZE_INTERVAL =
+ ConfigBuilder("spark.kubernetes.executor.pvc.resizeInterval")
+      .doc("Interval between executor PVC resize operations. To disable, set 0 (default).")
+ .version("4.2.0")
+ .timeConf(TimeUnit.MINUTES)
+ .checkValue(_ >= 0, "Interval should be non-negative")
+ .createWithDefault(0)
+
+ val PVC_RESIZE_THRESHOLD =
+ ConfigBuilder("spark.kubernetes.executor.pvc.resizeThreshold")
+ .doc("The PVC usage ratio (used / capacity) above which the driver triggers a resize.")
+ .version("4.2.0")
+ .doubleConf
+ .checkValue(v => 0 < v && v < 1, "The threshold should be in (0, 1)")
+ .createWithDefault(0.5)
+
+ val PVC_RESIZE_FACTOR =
+ ConfigBuilder("spark.kubernetes.executor.pvc.resizeFactor")
+ .doc("The factor to grow PVC storage by, relative to the current request.")
+ .version("4.2.0")
+ .doubleConf
+ .checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]")
+ .createWithDefault(1.0)
+
val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePlugin.scala
new file mode 100644
index 0000000000000..b2e8a87bc8829
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePlugin.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimBuilder, Pod, Quantity}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.PatchContext
+import io.fabric8.kubernetes.client.dsl.base.PatchType
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CURRENT_DISK_SIZE, ORIGINAL_DISK_SIZE, PVC_METADATA_NAME}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark plugin to monitor executor PVC disk usage and grow the PVC storage request
+ * when the usage exceeds a configurable threshold.
+ *
+ * Executors measure their own local-directory usage (via DiskBlockManager) and report
+ * the maximum filesystem usage ratio to the driver through the plugin RPC channel.
+ * When the ratio exceeds the threshold, the driver patches every `spark-local-dir-*`
+ * PVC mounted by the reporting executor's pod to grow its
+ * `spec.resources.requests.storage`. The underlying StorageClass must have
+ * `allowVolumeExpansion: true`.
+ */
+class ExecutorPVCResizePlugin extends SparkPlugin {
+ override def driverPlugin(): DriverPlugin = new ExecutorPVCResizeDriverPlugin()
+
+ override def executorPlugin(): ExecutorPlugin = new ExecutorPVCResizeExecutorPlugin()
+}
+
+/**
+ * Message sent from each executor to the driver with the maximum filesystem usage
+ * ratio (used / total) across the executor's SPARK_LOCAL_DIRS. The driver applies
+ * this ratio to every PVC mounted by the reporting executor's pod.
+ */
+private[k8s] case class PVCDiskUsageReport(
+ executorId: String,
+ ratio: Double)
+
+class ExecutorPVCResizeDriverPlugin extends DriverPlugin with Logging {
+ private var sparkContext: SparkContext = _
+ private var namespace: String = _
+ private var threshold: Double = _
+ private var factor: Double = _
+
+ private val latestReports = new ConcurrentHashMap[String, PVCDiskUsageReport]()
+ private val failedPvcs = ConcurrentHashMap.newKeySet[String]()
+ private val requestedSizes = new ConcurrentHashMap[String, Long]()
+
+ private val periodicService: ScheduledExecutorService =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-plugin")
+
+ override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+ val interval = sc.conf.get(PVC_RESIZE_INTERVAL)
+ if (interval <= 0) {
+      logInfo("ExecutorPVCResizePlugin disabled (interval <= 0).")
+ return Map.empty[String, String].asJava
+ }
+ threshold = sc.conf.get(PVC_RESIZE_THRESHOLD)
+ factor = sc.conf.get(PVC_RESIZE_FACTOR)
+ namespace = sc.conf.get(KUBERNETES_NAMESPACE)
+ sparkContext = sc
+
+ periodicService.scheduleAtFixedRate(() => if (!sparkContext.isStopped) {
+ try {
+ checkAndResizePVCs()
+ } catch {
+ case e: Throwable => logError("Error in PVC resize thread", e)
+ }
+ }, interval, interval, TimeUnit.MINUTES)
+ logInfo("ExecutorPVCResizeDriverPlugin is scheduled")
+
+ // Propagate the interval to executors so they report at the same cadence.
+ Map(PVC_RESIZE_INTERVAL.key -> interval.toString).asJava
+ }
+
+ override def receive(message: Any): AnyRef = message match {
+ case r: PVCDiskUsageReport =>
+ latestReports.put(r.executorId, r)
+ null
+ case _ =>
+ null
+ }
+
+ override def shutdown(): Unit = {
+ periodicService.shutdown()
+ }
+
+ private[k8s] def checkAndResizePVCs(): Unit = {
+ logInfo(s"Latest PVC usage reports: $latestReports")
+ val appId = sparkContext.applicationId
+
+ sparkContext.schedulerBackend match {
+ case b: KubernetesClusterSchedulerBackend =>
+ val client = b.kubernetesClient
+ val pods = client.pods()
+ .inNamespace(namespace)
+ .withLabel(SPARK_APP_ID_LABEL, appId)
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .list()
+ .getItems.asScala
+
+ val podByExecId = pods.flatMap { p =>
+ Option(p.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)).map(_ -> p)
+ }.toMap
+
+ latestReports.values().asScala.foreach { report =>
+ podByExecId.get(report.executorId).foreach { pod =>
+ pvcsOf(pod).foreach { pvcName =>
+ if (!failedPvcs.contains(pvcName)) {
+ tryResize(client, pvcName, report.ratio, report.executorId)
+ }
+ }
+ }
+ }
+ case _ =>
+ logWarning("Skipping PVC resize: schedulerBackend is not " +
+ "KubernetesClusterSchedulerBackend.")
+ }
+ }
+
+ private[k8s] def pvcsOf(pod: Pod): Set[String] = {
+ val volNameToPvc = pod.getSpec.getVolumes.asScala
+ .filter(_.getPersistentVolumeClaim != null)
+ .filter(_.getName.startsWith("spark-local-dir-"))
+ .map(v => v.getName -> v.getPersistentVolumeClaim.getClaimName)
+ .toMap
+ pod.getSpec.getContainers.asScala
+ .find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME)
+ .orElse(pod.getSpec.getContainers.asScala.headOption)
+ .toSeq
+ .flatMap(_.getVolumeMounts.asScala)
+ .flatMap(m => volNameToPvc.get(m.getName))
+ .toSet
+ }
+
+ private def tryResize(
+ client: KubernetesClient, pvcName: String, ratio: Double, execId: String): Unit = {
+ logInfo(s"Try to resize executor $execId PVC $pvcName with ratio $ratio " +
+ s"(threshold $threshold).")
+ if (ratio <= threshold) return
+ try {
+ val pvc = client.persistentVolumeClaims()
+ .inNamespace(namespace)
+ .withName(pvcName)
+ .get()
+ if (pvc == null) return
+ val current = Quantity.getAmountInBytes(
+ pvc.getSpec.getResources.getRequests.get("storage")).longValue()
+ val capacity = Option(pvc.getStatus)
+ .flatMap(s => Option(s.getCapacity))
+ .flatMap(c => Option(c.get("storage")))
+ .map(q => Quantity.getAmountInBytes(q).longValue())
+ .getOrElse(current)
+ if (current > capacity) {
+ logInfo(s"PVC $pvcName resize is in progress or failed " +
+ s"(spec=$current, status=$capacity); skip.")
+ return
+ }
+ val newSize = (current * (1.0 + factor)).toLong
+      if (requestedSizes.getOrDefault(pvcName, -1L) == newSize) return
+ logInfo(log"Increase PVC ${MDC(PVC_METADATA_NAME, pvcName)} storage " +
+ log"from ${MDC(ORIGINAL_DISK_SIZE, current)} to " +
+ log"${MDC(CURRENT_DISK_SIZE, newSize)} as usage ratio exceeded threshold.")
+ val patch = new PersistentVolumeClaimBuilder()
+ .withNewSpec()
+ .withNewResources()
+ .addToRequests("storage", new Quantity(newSize.toString))
+ .endResources()
+ .endSpec()
+ .build()
+ client.persistentVolumeClaims()
+ .inNamespace(namespace)
+ .withName(pvcName)
+ .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), patch)
+ requestedSizes.put(pvcName, newSize)
+ } catch {
+ case e: Throwable =>
+ failedPvcs.add(pvcName)
+        logWarning(log"Failed to expand PVC ${MDC(PVC_METADATA_NAME, pvcName)}; " +
+ log"will skip subsequent attempts.", e)
+ }
+ }
+}
+
+class ExecutorPVCResizeExecutorPlugin extends ExecutorPlugin with Logging {
+ private var pluginContext: PluginContext = _
+ private var periodicService: ScheduledExecutorService = _
+
+ override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
+ val intervalStr = extraConf.get(PVC_RESIZE_INTERVAL.key)
+ if (intervalStr == null) {
+ // Driver disabled the plugin; do nothing.
+ return
+ }
+ val interval = intervalStr.toLong
+ if (interval <= 0) return
+
+ pluginContext = ctx
+ periodicService =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-reporter")
+ periodicService.scheduleAtFixedRate(() => {
+ try {
+ report()
+ } catch {
+ case e: Throwable => logDebug("Failed to report PVC usage", e)
+ }
+ }, interval, interval, TimeUnit.MINUTES)
+ }
+
+ override def shutdown(): Unit = {
+ if (periodicService != null) periodicService.shutdown()
+ }
+
+ private def report(): Unit = {
+ val env = SparkEnv.get
+ if (env == null) return
+ val dirs = env.blockManager.diskBlockManager.localDirs
+ if (dirs == null || dirs.isEmpty) return
+ val maxRatio = dirs.iterator.flatMap { d =>
+ try {
+ // Skip if total is 0 (e.g. dir unmounted, statvfs failed) to avoid divide-by-zero.
+ val total = d.getTotalSpace
+ if (total > 0) Some((total - d.getUsableSpace).toDouble / total) else None
+ } catch { case _: Throwable => None }
+ }.maxOption
+ maxRatio.foreach { ratio =>
+ logInfo(s"Reporting max PVC disk usage ratio for executor ${env.executorId}: $ratio")
+ pluginContext.send(PVCDiskUsageReport(env.executorId, ratio))
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePluginSuite.scala
new file mode 100644
index 0000000000000..1087acf0504d5
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePluginSuite.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.Collections
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.Resource
+import org.mockito.ArgumentCaptor
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, never, times, verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+
+class ExecutorPVCResizePluginSuite
+ extends SparkFunSuite with BeforeAndAfter {
+
+ private val namespace = "test-namespace"
+ private val appId = "spark-test-app"
+
+ private var kubernetesClient: KubernetesClient = _
+ private var sparkContext: SparkContext = _
+ private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+ private var podOperations: PODS = _
+ private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+ private var labeledPods: LABELED_PODS = _
+ private var podList: PodList = _
+ private var pvcOperations: PERSISTENT_VOLUME_CLAIMS = _
+ private var pvcsWithNamespace: PVC_WITH_NAMESPACE = _
+
+ before {
+ kubernetesClient = mock(classOf[KubernetesClient])
+ sparkContext = mock(classOf[SparkContext])
+ schedulerBackend = mock(classOf[KubernetesClusterSchedulerBackend])
+ podOperations = mock(classOf[PODS])
+ podsWithNamespace = mock(classOf[PODS_WITH_NAMESPACE])
+ labeledPods = mock(classOf[LABELED_PODS])
+ podList = mock(classOf[PodList])
+ pvcOperations = mock(classOf[PERSISTENT_VOLUME_CLAIMS])
+ pvcsWithNamespace = mock(classOf[PVC_WITH_NAMESPACE])
+
+ when(sparkContext.applicationId).thenReturn(appId)
+ when(sparkContext.schedulerBackend).thenReturn(schedulerBackend)
+ when(schedulerBackend.kubernetesClient).thenReturn(kubernetesClient)
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.inNamespace(namespace)).thenReturn(podsWithNamespace)
+ when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, appId)).thenReturn(labeledPods)
+ when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+ when(labeledPods.list()).thenReturn(podList)
+ when(kubernetesClient.persistentVolumeClaims()).thenReturn(pvcOperations)
+ when(pvcOperations.inNamespace(namespace)).thenReturn(pvcsWithNamespace)
+ }
+
+ private def createPlugin(
+ threshold: Double = 0.9,
+ factor: Double = 0.1): ExecutorPVCResizeDriverPlugin = {
+ val plugin = new ExecutorPVCResizeDriverPlugin()
+ val cls = plugin.getClass
+ setField(cls, plugin, "sparkContext", sparkContext)
+ setField(cls, plugin, "namespace", namespace)
+ setField(cls, plugin, "threshold", threshold)
+ setField(cls, plugin, "factor", factor)
+ plugin
+ }
+
+ private def setField(cls: Class[_], obj: Any, name: String, value: Any): Unit = {
+ val f = cls.getDeclaredField(name)
+ f.setAccessible(true)
+ f.set(obj, value)
+ }
+
+ private def createPodWithPVC(
+ executorId: Long,
+ claimName: String,
+ mountPath: String,
+ containerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME,
+ volumeName: String = "spark-local-dir-1"): Pod = {
+ new PodBuilder()
+ .withNewMetadata()
+ .withName(s"spark-executor-$executorId")
+ .addToLabels(SPARK_APP_ID_LABEL, appId)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
+ .endMetadata()
+ .withNewSpec()
+ .addNewVolume()
+ .withName(volumeName)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimName)
+ .endPersistentVolumeClaim()
+ .endVolume()
+ .addNewContainer()
+ .withName(containerName)
+ .addNewVolumeMount()
+ .withName(volumeName)
+ .withMountPath(mountPath)
+ .endVolumeMount()
+ .endContainer()
+ .endSpec()
+ .build()
+ }
+
+ private def createPVC(
+ name: String,
+ storageBytes: String,
+ statusCapacityBytes: String = null): PersistentVolumeClaim = {
+ val builder = new PersistentVolumeClaimBuilder()
+ .withNewMetadata().withName(name).endMetadata()
+ .withNewSpec()
+ .withNewResources()
+ .addToRequests("storage", new Quantity(storageBytes))
+ .endResources()
+ .endSpec()
+ val cap = Option(statusCapacityBytes).getOrElse(storageBytes)
+ builder
+ .withNewStatus()
+ .addToCapacity("storage", new Quantity(cap))
+ .endStatus()
+ .build()
+ }
+
+ private def mockPvcResource(
+ pvcName: String,
+ storageBytes: String,
+ statusCapacityBytes: String = null): Resource[PersistentVolumeClaim] = {
+ val pvc = createPVC(pvcName, storageBytes, statusCapacityBytes)
+ val resource = mock(classOf[Resource[PersistentVolumeClaim]])
+ when(pvcsWithNamespace.withName(pvcName)).thenReturn(resource)
+ when(resource.get()).thenReturn(pvc)
+ resource
+ }
+
+ test("Empty pod list does not trigger any patch") {
+ val plugin = createPlugin()
+ when(podList.getItems).thenReturn(Collections.emptyList())
+ plugin.receive(PVCDiskUsageReport("1", 0.1))
+
+ plugin.checkAndResizePVCs()
+
+ verify(pvcsWithNamespace, never()).withName(org.mockito.ArgumentMatchers.anyString())
+ }
+
+ test("Usage below threshold does not trigger patch") {
+ val plugin = createPlugin()
+ val pod = createPodWithPVC(1, "pvc-1", "/data")
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+ val resource = mockPvcResource("pvc-1", "1000000000") // 1GB
+ plugin.receive(PVCDiskUsageReport("1", 0.5)) // 50%
+
+ plugin.checkAndResizePVCs()
+
+ verify(resource, never()).patch(any(), any(classOf[PersistentVolumeClaim]))
+ }
+
+ test("Usage above threshold triggers patch with grown size") {
+ val plugin = createPlugin(threshold = 0.9, factor = 0.1)
+ val pod = createPodWithPVC(1, "pvc-1", "/data")
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+ val resource = mockPvcResource("pvc-1", "1000000000") // 1GB
+ plugin.receive(PVCDiskUsageReport("1", 0.95)) // 95%
+
+ plugin.checkAndResizePVCs()
+
+ val captor = ArgumentCaptor.forClass(classOf[PersistentVolumeClaim])
+ verify(resource, times(1)).patch(any(), captor.capture())
+ val patched = Quantity.getAmountInBytes(
+ captor.getValue.getSpec.getResources.getRequests.get("storage")).longValue()
+ // current 1GB * (1 + factor 0.1) = 1.1GB
+ assert(patched === 1100000000L)
+ }
+
+ test("PVC with pending or failed resize is skipped") {
+ val plugin = createPlugin()
+ val pod = createPodWithPVC(1, "pvc-1", "/data")
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+ // spec.requests.storage > status.capacity.storage simulates VolumeResizeFailed
+ // or in-progress resize.
+ val resource = mockPvcResource("pvc-1", "2000000000",
+ statusCapacityBytes = "1000000000")
+ plugin.receive(PVCDiskUsageReport("1", 0.95))
+
+ plugin.checkAndResizePVCs()
+
+ verify(resource, never()).patch(any(), any(classOf[PersistentVolumeClaim]))
+ }
+
+ test("Repeated reports for the same target size do not patch twice") {
+ val plugin = createPlugin()
+ val pod = createPodWithPVC(1, "pvc-1", "/data")
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+ val resource = mockPvcResource("pvc-1", "1000000000")
+ plugin.receive(PVCDiskUsageReport("1", 0.95))
+
+ plugin.checkAndResizePVCs()
+ plugin.checkAndResizePVCs()
+
+ verify(resource, times(1)).patch(any(), any(classOf[PersistentVolumeClaim]))
+ }
+
+ test("Patch failure adds PVC to blacklist") {
+ val plugin = createPlugin()
+ val pod = createPodWithPVC(1, "pvc-1", "/data")
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+ val resource = mockPvcResource("pvc-1", "1000000000")
+ when(resource.patch(any(), any(classOf[PersistentVolumeClaim])))
+ .thenThrow(new RuntimeException("expansion not allowed"))
+ plugin.receive(PVCDiskUsageReport("1", 0.95))
+
+ plugin.checkAndResizePVCs()
+ plugin.checkAndResizePVCs()
+
+ // Only one patch attempt despite two check rounds.
+ verify(resource, times(1)).patch(any(), any(classOf[PersistentVolumeClaim]))
+ }
+
+ test("Pod with no PVC volume triggers no patch") {
+ val plugin = createPlugin()
+ val pod = new PodBuilder()
+ .withNewMetadata()
+ .withName("spark-executor-1")
+ .addToLabels(SPARK_APP_ID_LABEL, appId)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .addToLabels(SPARK_EXECUTOR_ID_LABEL, "1")
+ .endMetadata()
+ .withNewSpec()
+ .addNewContainer().withName(DEFAULT_EXECUTOR_CONTAINER_NAME).endContainer()
+ .endSpec()
+ .build()
+ when(podList.getItems).thenReturn(Collections.singletonList(pod))
+ plugin.receive(PVCDiskUsageReport("1", 0.95))
+
+ plugin.checkAndResizePVCs()
+
+ verify(pvcsWithNamespace, never()).withName(org.mockito.ArgumentMatchers.anyString())
+ }
+
+ test("pvcsOf returns claim names mounted by the executor container") {
+ val plugin = createPlugin()
+ val pod = createPodWithPVC(7, "pvc-7", "/spark-local")
+ assert(plugin.pvcsOf(pod) === Set("pvc-7"))
+ }
+
+ test("pvcsOf falls back to first container when default name absent") {
+ val plugin = createPlugin()
+ val pod = createPodWithPVC(1, "pvc-1", "/data", containerName = "custom")
+ assert(plugin.pvcsOf(pod) === Set("pvc-1"))
+ }
+
+ test("pvcsOf filters out non spark-local-dir-* PVC volumes") {
+ val plugin = createPlugin()
+ val pod = createPodWithPVC(1, "pvc-1", "/data", volumeName = "checkpointpvc")
+ assert(plugin.pvcsOf(pod) === Set.empty)
+ }
+
+ test("receive ignores non-report messages") {
+ val plugin = createPlugin()
+ assert(plugin.receive("unrelated") == null)
+ assert(plugin.receive(42) == null)
+ }
+}