diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index b575a127199cb..b55d51330a724 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1816,6 +1816,36 @@ See the [configuration page](configuration.html) for information on Spark config 4.2.0 + + spark.kubernetes.executor.pvc.resizeInterval + 0min + + Interval between executor PVC resize operations. To disable, set 0 (default). + Takes effect only when org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin + is registered via spark.plugins. + + 4.2.0 + + + spark.kubernetes.executor.pvc.resizeThreshold + 0.5 + + The PVC usage ratio (used / capacity) above which the driver triggers a resize. + Takes effect only when org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin + is registered via spark.plugins. + + 4.2.0 + + + spark.kubernetes.executor.pvc.resizeFactor + 1.0 + + The factor to grow PVC storage by, relative to the current request. + Takes effect only when org.apache.spark.scheduler.cluster.k8s.ExecutorPVCResizePlugin + is registered via spark.plugins. + + 4.2.0 + #### Pod template properties diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 6f1130853c5a1..b50b611f6e162 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -262,6 +262,30 @@ private[spark] object Config extends Logging { .checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]") .createWithDefault(0.1) + val PVC_RESIZE_INTERVAL = + ConfigBuilder("spark.kubernetes.executor.pvc.resizeInterval") + .doc("Interval between executor PVC resize operations. 
To disable, set 0 (default)") + .version("4.2.0") + .timeConf(TimeUnit.MINUTES) + .checkValue(_ >= 0, "Interval should be non-negative") + .createWithDefault(0) + + val PVC_RESIZE_THRESHOLD = + ConfigBuilder("spark.kubernetes.executor.pvc.resizeThreshold") + .doc("The PVC usage ratio (used / capacity) above which the driver triggers a resize.") + .version("4.2.0") + .doubleConf + .checkValue(v => 0 < v && v < 1, "The threshold should be in (0, 1)") + .createWithDefault(0.5) + + val PVC_RESIZE_FACTOR = + ConfigBuilder("spark.kubernetes.executor.pvc.resizeFactor") + .doc("The factor to grow PVC storage by, relative to the current request.") + .version("4.2.0") + .doubleConf + .checkValue(v => 0 < v && v <= 1, "The factor should be in (0, 1]") + .createWithDefault(1.0) + val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor" val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePlugin.scala new file mode 100644 index 0000000000000..b2e8a87bc8829 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePlugin.scala @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimBuilder, Pod, Quantity}
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.base.PatchContext
+import io.fabric8.kubernetes.client.dsl.base.PatchType
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CURRENT_DISK_SIZE, ORIGINAL_DISK_SIZE, PVC_METADATA_NAME}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark plugin to monitor executor PVC disk usage and grow the PVC storage request
+ * when the usage exceeds a configurable threshold.
+ *
+ * Executors measure their own local-directory usage (via DiskBlockManager) and report
+ * the maximum filesystem usage ratio to the driver through the plugin RPC channel.
+ * When the ratio exceeds the threshold, the driver patches every `spark-local-dir-*`
+ * PVC mounted by the reporting executor's pod to grow its
+ * `spec.resources.requests.storage`. The underlying StorageClass must have
+ * `allowVolumeExpansion: true`.
+ */ +class ExecutorPVCResizePlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = new ExecutorPVCResizeDriverPlugin() + + override def executorPlugin(): ExecutorPlugin = new ExecutorPVCResizeExecutorPlugin() +} + +/** + * Message sent from each executor to the driver with the maximum filesystem usage + * ratio (used / total) across the executor's SPARK_LOCAL_DIRS. The driver applies + * this ratio to every PVC mounted by the reporting executor's pod. + */ +private[k8s] case class PVCDiskUsageReport( + executorId: String, + ratio: Double) + +class ExecutorPVCResizeDriverPlugin extends DriverPlugin with Logging { + private var sparkContext: SparkContext = _ + private var namespace: String = _ + private var threshold: Double = _ + private var factor: Double = _ + + private val latestReports = new ConcurrentHashMap[String, PVCDiskUsageReport]() + private val failedPvcs = ConcurrentHashMap.newKeySet[String]() + private val requestedSizes = new ConcurrentHashMap[String, Long]() + + private val periodicService: ScheduledExecutorService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-plugin") + + override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = { + val interval = sc.conf.get(PVC_RESIZE_INTERVAL) + if (interval <= 0) { + logInfo("PVCResizePlugin disabled (interval <= 0).") + return Map.empty[String, String].asJava + } + threshold = sc.conf.get(PVC_RESIZE_THRESHOLD) + factor = sc.conf.get(PVC_RESIZE_FACTOR) + namespace = sc.conf.get(KUBERNETES_NAMESPACE) + sparkContext = sc + + periodicService.scheduleAtFixedRate(() => if (!sparkContext.isStopped) { + try { + checkAndResizePVCs() + } catch { + case e: Throwable => logError("Error in PVC resize thread", e) + } + }, interval, interval, TimeUnit.MINUTES) + logInfo("ExecutorPVCResizeDriverPlugin is scheduled") + + // Propagate the interval to executors so they report at the same cadence. 
+ Map(PVC_RESIZE_INTERVAL.key -> interval.toString).asJava + } + + override def receive(message: Any): AnyRef = message match { + case r: PVCDiskUsageReport => + latestReports.put(r.executorId, r) + null + case _ => + null + } + + override def shutdown(): Unit = { + periodicService.shutdown() + } + + private[k8s] def checkAndResizePVCs(): Unit = { + logInfo(s"Latest PVC usage reports: $latestReports") + val appId = sparkContext.applicationId + + sparkContext.schedulerBackend match { + case b: KubernetesClusterSchedulerBackend => + val client = b.kubernetesClient + val pods = client.pods() + .inNamespace(namespace) + .withLabel(SPARK_APP_ID_LABEL, appId) + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .list() + .getItems.asScala + + val podByExecId = pods.flatMap { p => + Option(p.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)).map(_ -> p) + }.toMap + + latestReports.values().asScala.foreach { report => + podByExecId.get(report.executorId).foreach { pod => + pvcsOf(pod).foreach { pvcName => + if (!failedPvcs.contains(pvcName)) { + tryResize(client, pvcName, report.ratio, report.executorId) + } + } + } + } + case _ => + logWarning("Skipping PVC resize: schedulerBackend is not " + + "KubernetesClusterSchedulerBackend.") + } + } + + private[k8s] def pvcsOf(pod: Pod): Set[String] = { + val volNameToPvc = pod.getSpec.getVolumes.asScala + .filter(_.getPersistentVolumeClaim != null) + .filter(_.getName.startsWith("spark-local-dir-")) + .map(v => v.getName -> v.getPersistentVolumeClaim.getClaimName) + .toMap + pod.getSpec.getContainers.asScala + .find(_.getName == DEFAULT_EXECUTOR_CONTAINER_NAME) + .orElse(pod.getSpec.getContainers.asScala.headOption) + .toSeq + .flatMap(_.getVolumeMounts.asScala) + .flatMap(m => volNameToPvc.get(m.getName)) + .toSet + } + + private def tryResize( + client: KubernetesClient, pvcName: String, ratio: Double, execId: String): Unit = { + logInfo(s"Try to resize executor $execId PVC $pvcName with ratio $ratio " + + s"(threshold 
$threshold).") + if (ratio <= threshold) return + try { + val pvc = client.persistentVolumeClaims() + .inNamespace(namespace) + .withName(pvcName) + .get() + if (pvc == null) return + val current = Quantity.getAmountInBytes( + pvc.getSpec.getResources.getRequests.get("storage")).longValue() + val capacity = Option(pvc.getStatus) + .flatMap(s => Option(s.getCapacity)) + .flatMap(c => Option(c.get("storage"))) + .map(q => Quantity.getAmountInBytes(q).longValue()) + .getOrElse(current) + if (current > capacity) { + logInfo(s"PVC $pvcName resize is in progress or failed " + + s"(spec=$current, status=$capacity); skip.") + return + } + val newSize = (current * (1.0 + factor)).toLong + if (requestedSizes.get(pvcName) == newSize) return + logInfo(log"Increase PVC ${MDC(PVC_METADATA_NAME, pvcName)} storage " + + log"from ${MDC(ORIGINAL_DISK_SIZE, current)} to " + + log"${MDC(CURRENT_DISK_SIZE, newSize)} as usage ratio exceeded threshold.") + val patch = new PersistentVolumeClaimBuilder() + .withNewSpec() + .withNewResources() + .addToRequests("storage", new Quantity(newSize.toString)) + .endResources() + .endSpec() + .build() + client.persistentVolumeClaims() + .inNamespace(namespace) + .withName(pvcName) + .patch(PatchContext.of(PatchType.STRATEGIC_MERGE), patch) + requestedSizes.put(pvcName, newSize) + } catch { + case e: Throwable => + failedPvcs.add(pvcName) + logInfo(log"Failed to expand PVC ${MDC(PVC_METADATA_NAME, pvcName)}; " + + log"will skip subsequent attempts.", e) + } + } +} + +class ExecutorPVCResizeExecutorPlugin extends ExecutorPlugin with Logging { + private var pluginContext: PluginContext = _ + private var periodicService: ScheduledExecutorService = _ + + override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { + val intervalStr = extraConf.get(PVC_RESIZE_INTERVAL.key) + if (intervalStr == null) { + // Driver disabled the plugin; do nothing. 
+ return + } + val interval = intervalStr.toLong + if (interval <= 0) return + + pluginContext = ctx + periodicService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("pvc-resize-reporter") + periodicService.scheduleAtFixedRate(() => { + try { + report() + } catch { + case e: Throwable => logDebug("Failed to report PVC usage", e) + } + }, interval, interval, TimeUnit.MINUTES) + } + + override def shutdown(): Unit = { + if (periodicService != null) periodicService.shutdown() + } + + private def report(): Unit = { + val env = SparkEnv.get + if (env == null) return + val dirs = env.blockManager.diskBlockManager.localDirs + if (dirs == null || dirs.isEmpty) return + val maxRatio = dirs.iterator.flatMap { d => + try { + // Skip if total is 0 (e.g. dir unmounted, statvfs failed) to avoid divide-by-zero. + val total = d.getTotalSpace + if (total > 0) Some((total - d.getUsableSpace).toDouble / total) else None + } catch { case _: Throwable => None } + }.maxOption + maxRatio.foreach { ratio => + logInfo(s"Reporting max PVC disk usage ratio for executor ${env.executorId}: $ratio") + pluginContext.send(PVCDiskUsageReport(env.executorId, ratio)) + } + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePluginSuite.scala new file mode 100644 index 0000000000000..1087acf0504d5 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPVCResizePluginSuite.scala @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.Collections + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.Resource +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{mock, never, times, verify, when} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.Fabric8Aliases._ + +class ExecutorPVCResizePluginSuite + extends SparkFunSuite with BeforeAndAfter { + + private val namespace = "test-namespace" + private val appId = "spark-test-app" + + private var kubernetesClient: KubernetesClient = _ + private var sparkContext: SparkContext = _ + private var schedulerBackend: KubernetesClusterSchedulerBackend = _ + private var podOperations: PODS = _ + private var podsWithNamespace: PODS_WITH_NAMESPACE = _ + private var labeledPods: LABELED_PODS = _ + private var podList: PodList = _ + private var pvcOperations: PERSISTENT_VOLUME_CLAIMS = _ + private var pvcsWithNamespace: PVC_WITH_NAMESPACE = _ + + before { + kubernetesClient = mock(classOf[KubernetesClient]) + sparkContext = mock(classOf[SparkContext]) + schedulerBackend = mock(classOf[KubernetesClusterSchedulerBackend]) + podOperations = mock(classOf[PODS]) 
+ podsWithNamespace = mock(classOf[PODS_WITH_NAMESPACE]) + labeledPods = mock(classOf[LABELED_PODS]) + podList = mock(classOf[PodList]) + pvcOperations = mock(classOf[PERSISTENT_VOLUME_CLAIMS]) + pvcsWithNamespace = mock(classOf[PVC_WITH_NAMESPACE]) + + when(sparkContext.applicationId).thenReturn(appId) + when(sparkContext.schedulerBackend).thenReturn(schedulerBackend) + when(schedulerBackend.kubernetesClient).thenReturn(kubernetesClient) + when(kubernetesClient.pods()).thenReturn(podOperations) + when(podOperations.inNamespace(namespace)).thenReturn(podsWithNamespace) + when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, appId)).thenReturn(labeledPods) + when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods) + when(labeledPods.list()).thenReturn(podList) + when(kubernetesClient.persistentVolumeClaims()).thenReturn(pvcOperations) + when(pvcOperations.inNamespace(namespace)).thenReturn(pvcsWithNamespace) + } + + private def createPlugin( + threshold: Double = 0.9, + factor: Double = 0.1): ExecutorPVCResizeDriverPlugin = { + val plugin = new ExecutorPVCResizeDriverPlugin() + val cls = plugin.getClass + setField(cls, plugin, "sparkContext", sparkContext) + setField(cls, plugin, "namespace", namespace) + setField(cls, plugin, "threshold", threshold) + setField(cls, plugin, "factor", factor) + plugin + } + + private def setField(cls: Class[_], obj: Any, name: String, value: Any): Unit = { + val f = cls.getDeclaredField(name) + f.setAccessible(true) + f.set(obj, value) + } + + private def createPodWithPVC( + executorId: Long, + claimName: String, + mountPath: String, + containerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME, + volumeName: String = "spark-local-dir-1"): Pod = { + new PodBuilder() + .withNewMetadata() + .withName(s"spark-executor-$executorId") + .addToLabels(SPARK_APP_ID_LABEL, appId) + .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString) + 
.endMetadata() + .withNewSpec() + .addNewVolume() + .withName(volumeName) + .withNewPersistentVolumeClaim() + .withClaimName(claimName) + .endPersistentVolumeClaim() + .endVolume() + .addNewContainer() + .withName(containerName) + .addNewVolumeMount() + .withName(volumeName) + .withMountPath(mountPath) + .endVolumeMount() + .endContainer() + .endSpec() + .build() + } + + private def createPVC( + name: String, + storageBytes: String, + statusCapacityBytes: String = null): PersistentVolumeClaim = { + val builder = new PersistentVolumeClaimBuilder() + .withNewMetadata().withName(name).endMetadata() + .withNewSpec() + .withNewResources() + .addToRequests("storage", new Quantity(storageBytes)) + .endResources() + .endSpec() + val cap = Option(statusCapacityBytes).getOrElse(storageBytes) + builder + .withNewStatus() + .addToCapacity("storage", new Quantity(cap)) + .endStatus() + .build() + } + + private def mockPvcResource( + pvcName: String, + storageBytes: String, + statusCapacityBytes: String = null): Resource[PersistentVolumeClaim] = { + val pvc = createPVC(pvcName, storageBytes, statusCapacityBytes) + val resource = mock(classOf[Resource[PersistentVolumeClaim]]) + when(pvcsWithNamespace.withName(pvcName)).thenReturn(resource) + when(resource.get()).thenReturn(pvc) + resource + } + + test("Empty pod list does not trigger any patch") { + val plugin = createPlugin() + when(podList.getItems).thenReturn(Collections.emptyList()) + plugin.receive(PVCDiskUsageReport("1", 0.1)) + + plugin.checkAndResizePVCs() + + verify(pvcsWithNamespace, never()).withName(org.mockito.ArgumentMatchers.anyString()) + } + + test("Usage below threshold does not trigger patch") { + val plugin = createPlugin() + val pod = createPodWithPVC(1, "pvc-1", "/data") + when(podList.getItems).thenReturn(Collections.singletonList(pod)) + val resource = mockPvcResource("pvc-1", "1000000000") // 1GB + plugin.receive(PVCDiskUsageReport("1", 0.5)) // 50% + + plugin.checkAndResizePVCs() + + verify(resource, 
never()).patch(any(), any(classOf[PersistentVolumeClaim])) + } + + test("Usage above threshold triggers patch with grown size") { + val plugin = createPlugin(threshold = 0.9, factor = 0.1) + val pod = createPodWithPVC(1, "pvc-1", "/data") + when(podList.getItems).thenReturn(Collections.singletonList(pod)) + val resource = mockPvcResource("pvc-1", "1000000000") // 1GB + plugin.receive(PVCDiskUsageReport("1", 0.95)) // 95% + + plugin.checkAndResizePVCs() + + val captor = ArgumentCaptor.forClass(classOf[PersistentVolumeClaim]) + verify(resource, times(1)).patch(any(), captor.capture()) + val patched = Quantity.getAmountInBytes( + captor.getValue.getSpec.getResources.getRequests.get("storage")).longValue() + // current 1GB * (1 + factor 0.1) = 1.1GB + assert(patched === 1100000000L) + } + + test("PVC with pending or failed resize is skipped") { + val plugin = createPlugin() + val pod = createPodWithPVC(1, "pvc-1", "/data") + when(podList.getItems).thenReturn(Collections.singletonList(pod)) + // spec.requests.storage > status.capacity.storage simulates VolumeResizeFailed + // or in-progress resize. 
+ val resource = mockPvcResource("pvc-1", "2000000000", + statusCapacityBytes = "1000000000") + plugin.receive(PVCDiskUsageReport("1", 0.95)) + + plugin.checkAndResizePVCs() + + verify(resource, never()).patch(any(), any(classOf[PersistentVolumeClaim])) + } + + test("Repeated reports for the same target size do not patch twice") { + val plugin = createPlugin() + val pod = createPodWithPVC(1, "pvc-1", "/data") + when(podList.getItems).thenReturn(Collections.singletonList(pod)) + val resource = mockPvcResource("pvc-1", "1000000000") + plugin.receive(PVCDiskUsageReport("1", 0.95)) + + plugin.checkAndResizePVCs() + plugin.checkAndResizePVCs() + + verify(resource, times(1)).patch(any(), any(classOf[PersistentVolumeClaim])) + } + + test("Patch failure adds PVC to blacklist") { + val plugin = createPlugin() + val pod = createPodWithPVC(1, "pvc-1", "/data") + when(podList.getItems).thenReturn(Collections.singletonList(pod)) + val resource = mockPvcResource("pvc-1", "1000000000") + when(resource.patch(any(), any(classOf[PersistentVolumeClaim]))) + .thenThrow(new RuntimeException("expansion not allowed")) + plugin.receive(PVCDiskUsageReport("1", 0.95)) + + plugin.checkAndResizePVCs() + plugin.checkAndResizePVCs() + + // Only one patch attempt despite two check rounds. 
+ verify(resource, times(1)).patch(any(), any(classOf[PersistentVolumeClaim])) + } + + test("Pod with no PVC volume triggers no patch") { + val plugin = createPlugin() + val pod = new PodBuilder() + .withNewMetadata() + .withName("spark-executor-1") + .addToLabels(SPARK_APP_ID_LABEL, appId) + .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .addToLabels(SPARK_EXECUTOR_ID_LABEL, "1") + .endMetadata() + .withNewSpec() + .addNewContainer().withName(DEFAULT_EXECUTOR_CONTAINER_NAME).endContainer() + .endSpec() + .build() + when(podList.getItems).thenReturn(Collections.singletonList(pod)) + plugin.receive(PVCDiskUsageReport("1", 0.95)) + + plugin.checkAndResizePVCs() + + verify(pvcsWithNamespace, never()).withName(org.mockito.ArgumentMatchers.anyString()) + } + + test("pvcsOf returns claim names mounted by the executor container") { + val plugin = createPlugin() + val pod = createPodWithPVC(7, "pvc-7", "/spark-local") + assert(plugin.pvcsOf(pod) === Set("pvc-7")) + } + + test("pvcsOf falls back to first container when default name absent") { + val plugin = createPlugin() + val pod = createPodWithPVC(1, "pvc-1", "/data", containerName = "custom") + assert(plugin.pvcsOf(pod) === Set("pvc-1")) + } + + test("pvcsOf filters out non spark-local-dir-* PVC volumes") { + val plugin = createPlugin() + val pod = createPodWithPVC(1, "pvc-1", "/data", volumeName = "checkpointpvc") + assert(plugin.pvcsOf(pod) === Set.empty) + } + + test("receive ignores non-report messages") { + val plugin = createPlugin() + assert(plugin.receive("unrelated") == null) + assert(plugin.receive(42) == null) + } +}