From 83b440313dde3691f7bc3d405a40df24079b7345 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Tue, 13 Jan 2026 17:34:25 -0800 Subject: [PATCH 1/4] [SPARK-55075][K8S] Track executor pod creation errors with ExecutorFailureTracker --- .../cluster/k8s/AbstractPodsAllocator.scala | 14 ++++ .../cluster/k8s/ExecutorPodsAllocator.scala | 72 ++++++++++++------- .../k8s/ExecutorPodsLifecycleManager.scala | 8 +++ .../k8s/KubernetesClusterManager.scala | 27 +++++-- .../k8s/ExecutorPodsAllocatorSuite.scala | 28 ++++++++ .../k8s/KubernetesClusterManagerSuite.scala | 6 +- 6 files changed, 124 insertions(+), 31 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala index cc081202cf89a..721f8f0483a1d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala @@ -35,6 +35,20 @@ import org.apache.spark.resource.ResourceProfile */ @DeveloperApi abstract class AbstractPodsAllocator { + /* + * Optional lifecycle manager for tracking executor pod lifecycle events. + * Set via setExecutorPodsLifecycleManager for backward compatibility. + */ + protected var executorPodsLifecycleManager: ExecutorPodsLifecycleManager = _ + + /* + * Set the lifecycle manager for tracking executor pod lifecycle events. + * This method is optional and may not exist in custom implementations based on older versions. 
+ */ + def setExecutorPodsLifecycleManager(manager: ExecutorPodsLifecycleManager): Unit = { + executorPodsLifecycleManager = manager + } + /* * Set the total expected executors for an application */ diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 126d07c0926f4..bc2e0b99b4523 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -118,6 +118,9 @@ class ExecutorPodsAllocator( protected val numOutstandingPods = new AtomicInteger() + // Track total failed pod creation attempts across the application lifecycle + protected val totalFailedPodCreations = new AtomicInteger(0) + protected var lastSnapshot = ExecutorPodsSnapshot() protected var appId: String = _ @@ -459,32 +462,53 @@ class ExecutorPodsAllocator( .build() val resources = replacePVCsIfNeeded( podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs) - val createdExecutorPod = - kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create() - try { - addOwnerReference(createdExecutorPod, resources) - resources - .filter(_.getKind == "PersistentVolumeClaim") - .foreach { resource => - if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) { - addOwnerReference(driverPod.get, Seq(resource)) - } - val pvc = resource.asInstanceOf[PersistentVolumeClaim] - logInfo(log"Trying to create PersistentVolumeClaim " + - log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)} with " + - log"StorageClass ${MDC(LogKeys.CLASS_NAME, pvc.getSpec.getStorageClassName)}") - kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create() - 
PVC_COUNTER.incrementAndGet() - } - newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis()) - logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") + val optCreatedExecutorPod = try { + Some(kubernetesClient + .pods() + .inNamespace(namespace) + .resource(podWithAttachedContainer) + .create()) } catch { case NonFatal(e) => - kubernetesClient.pods() - .inNamespace(namespace) - .resource(createdExecutorPod) - .delete() - throw e + // Register failure with global tracker if lifecycle manager is available + val failureCount = totalFailedPodCreations.incrementAndGet() + if (executorPodsLifecycleManager != null) { + executorPodsLifecycleManager.registerPodCreationFailure() + } + logError(log"Failed to create executor pod ${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " + + log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e) + None + } + optCreatedExecutorPod.foreach { createdExecutorPod => + try { + addOwnerReference(createdExecutorPod, resources) + resources + .filter(_.getKind == "PersistentVolumeClaim") + .foreach { resource => + if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) { + addOwnerReference(driverPod.get, Seq(resource)) + } + val pvc = resource.asInstanceOf[PersistentVolumeClaim] + logInfo(log"Trying to create PersistentVolumeClaim " + + log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)} with " + + log"StorageClass ${MDC(LogKeys.CLASS_NAME, pvc.getSpec.getStorageClassName)}") + kubernetesClient + .persistentVolumeClaims() + .inNamespace(namespace) + .resource(pvc) + .create() + PVC_COUNTER.incrementAndGet() + } + newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis()) + logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") + } catch { + case NonFatal(e) => + kubernetesClient.pods() + .inNamespace(namespace) + .resource(createdExecutorPod) + .delete() + throw e + } } } } diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index c57a014dcfa67..e3bc2cf41f672 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -77,6 +77,14 @@ private[spark] class ExecutorPodsLifecycleManager( protected[spark] def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors + /** + * Register a pod creation failure. This increments the global executor failure count + * which is checked against spark.executor.maxNumFailures. + */ + protected[spark] def registerPodCreationFailure(): Unit = { + failureTracker.registerExecutorFailure() + } + def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL) snapshotsStore.addSubscriber(eventProcessingInterval) { executorPodsSnapshot => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index d2c6789f2bb5a..ea68a2e9e806c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -134,7 +134,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit kubernetesClient, snapshotsStore) - val executorPodsAllocator = makeExecutorPodsAllocator(sc, kubernetesClient, snapshotsStore) + val 
executorPodsAllocator = makeExecutorPodsAllocator( + sc, kubernetesClient, snapshotsStore, executorPodsLifecycleManager) val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, @@ -158,8 +159,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit podsPollingEventSource) } - private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient, - snapshotsStore: ExecutorPodsSnapshotsStore) = { + private[k8s] def makeExecutorPodsAllocator( + sc: SparkContext, + kubernetesClient: KubernetesClient, + snapshotsStore: ExecutorPodsSnapshotsStore, + lifecycleManager: ExecutorPodsLifecycleManager) = { val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) && sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) { @@ -184,13 +188,28 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit classOf[SparkConf], classOf[org.apache.spark.SecurityManager], classOf[KubernetesExecutorBuilder], classOf[KubernetesClient], classOf[ExecutorPodsSnapshotsStore], classOf[Clock]) - cstr.newInstance( + val allocatorInstance = cstr.newInstance( sc.conf, sc.env.securityManager, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock()) + + // Try to set the lifecycle manager using reflection for backward compatibility + // with custom allocators that may not have this method + try { + val setLifecycleManagerMethod = cls.getMethod( + "setExecutorPodsLifecycleManager", + classOf[ExecutorPodsLifecycleManager]) + setLifecycleManagerMethod.invoke(allocatorInstance, lifecycleManager) + } catch { + case _: NoSuchMethodException => + logInfo("Allocator does not support setExecutorPodsLifecycleManager method. 
" + + "Pod creation failures will not be tracked.") + } + + allocatorInstance } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index e994ccbed9a65..61b8bd8fb0cca 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -112,6 +112,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _ + @Mock + private var lifecycleManager: ExecutorPodsLifecycleManager = _ + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ private var podsAllocatorUnderTest: ExecutorPodsAllocator = _ @@ -142,6 +145,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock = new ManualClock(0L) podsAllocatorUnderTest = new ExecutorPodsAllocator( conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims) @@ -202,6 +206,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { val confWithLowMaxPendingPods = conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3") podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr, executorBuilder, kubernetesClient, snapshotsStore, 
waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3)) @@ -268,6 +273,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .set(KUBERNETES_MAX_PENDING_PODS_PER_RPID.key, "2") podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPodsPerRpId, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) // Request more than the max per rp for one rp @@ -321,6 +327,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { val confWithAllocationMaximum = conf.clone.set(KUBERNETES_ALLOCATION_MAXIMUM.key, "1") podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithAllocationMaximum, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))() @@ -838,6 +845,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( confWithPVC, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(podsWithNamespace @@ -936,6 +944,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( confWithPVC, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + 
podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(podsWithNamespace @@ -1005,6 +1014,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( confWithPVC, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) val startTime = Instant.now.toEpochMilli @@ -1052,4 +1062,22 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt, k8sConf.resourceProfileId.toInt), Seq.empty) } + +test("Pod creation failures are tracked by ExecutorFailureTracker") { + // Make all pod creation attempts fail + when(podResource.create()).thenThrow(new KubernetesClientException("Simulated pod" + + " creation failure")) + + // Request 3 executors + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3)) + + // Verify that pod creation was attempted 3 times (once per executor, no retries) + verify(podResource, times(3)).create() + + // Verify that registerPodCreationFailure was called 3 times (once per failed executor) + verify(lifecycleManager, times(3)).registerPodCreationFailure() + + // Verify no pods were created since all attempts failed + assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala index 78e0942cfb82c..6a2403a339f3b 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala @@ -65,7 +65,7 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { validConfigs.foreach { c => val manager = new KubernetesClusterManager() sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, c) - manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null, null) sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) } } @@ -96,7 +96,7 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { sparkConf.set("spark.shuffle.service.enabled", "true") val e = intercept[SparkException] { - manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null, null) } assert(e.getMessage.contains(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)) } @@ -108,7 +108,7 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 1) sparkConf.set("spark.shuffle.service.enabled", "true") - manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null, null) } private def resetDynamicAllocatorConfig(): Unit = { From d220f39cc88aaf9d522cc9100360189d4d13f070 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Thu, 19 Feb 2026 11:07:24 +0530 Subject: [PATCH 2/4] address review comments --- .../cluster/k8s/ExecutorPodsAllocator.scala | 8 +++++++ .../k8s/KubernetesClusterManager.scala | 24 ++++++++++--------- .../k8s/ExecutorPodsAllocatorSuite.scala | 2 +- .../k8s/KubernetesClusterManagerSuite.scala | 6 ++--- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index bc2e0b99b4523..9a5261408e6ba 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -503,6 +503,14 @@ class ExecutorPodsAllocator( logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") } catch { case NonFatal(e) => + // Register failure with global tracker if lifecycle manager is available + val failureCount = totalFailedPodCreations.incrementAndGet() + if (executorPodsLifecycleManager != null) { + executorPodsLifecycleManager.registerPodCreationFailure() + } + logError(log"Failed to add owner reference or create PVC for executor pod " + + log"${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. 
" + + log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e) kubernetesClient.pods() .inNamespace(namespace) .resource(createdExecutorPod) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index ea68a2e9e806c..35093a1fc26ee 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -135,7 +135,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit snapshotsStore) val executorPodsAllocator = makeExecutorPodsAllocator( - sc, kubernetesClient, snapshotsStore, executorPodsLifecycleManager) + sc, kubernetesClient, snapshotsStore, Some(executorPodsLifecycleManager)) val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, @@ -163,7 +163,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sc: SparkContext, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, - lifecycleManager: ExecutorPodsLifecycleManager) = { + lifecycleManager: Option[ExecutorPodsLifecycleManager] = None) = { val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) && sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) { @@ -198,15 +198,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit // Try to set the lifecycle manager using reflection for backward compatibility // with custom allocators that may not have this method - try { - val setLifecycleManagerMethod = cls.getMethod( - "setExecutorPodsLifecycleManager", - classOf[ExecutorPodsLifecycleManager]) - 
setLifecycleManagerMethod.invoke(allocatorInstance, lifecycleManager) - } catch { - case _: NoSuchMethodException => - logInfo("Allocator does not support setExecutorPodsLifecycleManager method. " + - "Pod creation failures will not be tracked.") + lifecycleManager.foreach { manager => + try { + val setLifecycleManagerMethod = cls.getMethod( + "setExecutorPodsLifecycleManager", + classOf[ExecutorPodsLifecycleManager]) + setLifecycleManagerMethod.invoke(allocatorInstance, manager) + } catch { + case _: NoSuchMethodException => + logInfo("Allocator does not support setExecutorPodsLifecycleManager method. " + + "Pod creation failures will not be tracked.") + } } allocatorInstance diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 61b8bd8fb0cca..7c1185d8d3096 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -1063,7 +1063,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { k8sConf.resourceProfileId.toInt), Seq.empty) } -test("Pod creation failures are tracked by ExecutorFailureTracker") { + test("SPARK-55075: Pod creation failures are tracked by ExecutorFailureTracker") { // Make all pod creation attempts fail when(podResource.create()).thenThrow(new KubernetesClientException("Simulated pod" + " creation failure")) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala index 6a2403a339f3b..78e0942cfb82c 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala @@ -65,7 +65,7 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { validConfigs.foreach { c => val manager = new KubernetesClusterManager() sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, c) - manager.makeExecutorPodsAllocator(sc, kubernetesClient, null, null) + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) } } @@ -96,7 +96,7 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { sparkConf.set("spark.shuffle.service.enabled", "true") val e = intercept[SparkException] { - manager.makeExecutorPodsAllocator(sc, kubernetesClient, null, null) + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) } assert(e.getMessage.contains(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)) } @@ -108,7 +108,7 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 1) sparkConf.set("spark.shuffle.service.enabled", "true") - manager.makeExecutorPodsAllocator(sc, kubernetesClient, null, null) + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) } private def resetDynamicAllocatorConfig(): Unit = { From 99b2112b259012b13b0c9b70a5f6a116517ba119 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Sat, 21 Feb 2026 18:20:59 -0800 Subject: [PATCH 3/4] Address more review comments --- .../cluster/k8s/AbstractPodsAllocator.scala | 4 ++-- .../cluster/k8s/ExecutorPodsAllocator.scala | 20 +++++++++++-------- .../k8s/ExecutorPodsLifecycleManager.scala | 4 ++-- .../k8s/KubernetesClusterManager.scala | 16 +++------------ .../k8s/ExecutorPodsAllocatorSuite.scala | 8 ++++---- 5 files changed, 23 insertions(+), 29 deletions(-) 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala index 721f8f0483a1d..8803d3d031e47 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala @@ -39,14 +39,14 @@ abstract class AbstractPodsAllocator { * Optional lifecycle manager for tracking executor pod lifecycle events. * Set via setExecutorPodsLifecycleManager for backward compatibility. */ - protected var executorPodsLifecycleManager: ExecutorPodsLifecycleManager = _ + protected var executorPodsLifecycleManager: Option[ExecutorPodsLifecycleManager] = None /* * Set the lifecycle manager for tracking executor pod lifecycle events. * This method is optional and may not exist in custom implementations based on older versions. 
*/ def setExecutorPodsLifecycleManager(manager: ExecutorPodsLifecycleManager): Unit = { - executorPodsLifecycleManager = manager + executorPodsLifecycleManager = Some(manager) } /* diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 9a5261408e6ba..340cc9c76e46f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -471,10 +471,7 @@ class ExecutorPodsAllocator( } catch { case NonFatal(e) => // Register failure with global tracker if lifecycle manager is available - val failureCount = totalFailedPodCreations.incrementAndGet() - if (executorPodsLifecycleManager != null) { - executorPodsLifecycleManager.registerPodCreationFailure() - } + val failureCount = registerPodCreationFailure() logError(log"Failed to create executor pod ${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " + log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e) None @@ -504,10 +501,7 @@ class ExecutorPodsAllocator( } catch { case NonFatal(e) => // Register failure with global tracker if lifecycle manager is available - val failureCount = totalFailedPodCreations.incrementAndGet() - if (executorPodsLifecycleManager != null) { - executorPodsLifecycleManager.registerPodCreationFailure() - } + val failureCount = registerPodCreationFailure() logError(log"Failed to add owner reference or create PVC for executor pod " + log"${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. 
" + log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e) @@ -553,6 +547,16 @@ class ExecutorPodsAllocator( resources.filterNot(replacedResources.contains) } + /** + * Registers a pod creation failure with the lifecycle manager and increments the local counter. + * Returns the total failure count for logging purposes. + */ + protected def registerPodCreationFailure(): Int = { + val failureCount = totalFailedPodCreations.incrementAndGet() + executorPodsLifecycleManager.foreach(_.registerExecutorFailure()) + failureCount + } + protected def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = { try { val creationTime = Instant.parse(state.pod.getMetadata.getCreationTimestamp).toEpochMilli() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index e3bc2cf41f672..84cfd0d72b3bb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -78,10 +78,10 @@ private[spark] class ExecutorPodsLifecycleManager( protected[spark] def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors /** - * Register a pod creation failure. This increments the global executor failure count + * Register an executor failure. This increments the global executor failure count * which is checked against spark.executor.maxNumFailures. 
*/ - protected[spark] def registerPodCreationFailure(): Unit = { + protected[spark] def registerExecutorFailure(): Unit = { failureTracker.registerExecutorFailure() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 35093a1fc26ee..782fac670fa88 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -194,21 +194,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, - new SystemClock()) + new SystemClock()).asInstanceOf[AbstractPodsAllocator] - // Try to set the lifecycle manager using reflection for backward compatibility - // with custom allocators that may not have this method + // Set the lifecycle manager if provided lifecycleManager.foreach { manager => - try { - val setLifecycleManagerMethod = cls.getMethod( - "setExecutorPodsLifecycleManager", - classOf[ExecutorPodsLifecycleManager]) - setLifecycleManagerMethod.invoke(allocatorInstance, manager) - } catch { - case _: NoSuchMethodException => - logInfo("Allocator does not support setExecutorPodsLifecycleManager method. 
" + - "Pod creation failures will not be tracked.") - } + allocatorInstance.setExecutorPodsLifecycleManager(manager) } allocatorInstance diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 7c1185d8d3096..b8316b0964f46 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -1024,9 +1024,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0) when(pvcResource.create()).thenThrow(new KubernetesClientException("PVC fails to create")) - intercept[KubernetesClientException] { - podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) - } +// intercept[KubernetesClientException] { +// podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) +// } assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0) assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0) } @@ -1075,7 +1075,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { verify(podResource, times(3)).create() // Verify that registerPodCreationFailure was called 3 times (once per failed executor) - verify(lifecycleManager, times(3)).registerPodCreationFailure() + verify(lifecycleManager, times(3)).registerExecutorFailure() // Verify no pods were created since all attempts failed assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0) From c95de5c5c4b7f7e7e78cf6849b31ba336db944bb Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Mon, 23 Feb 2026 09:27:10 -0800 Subject: [PATCH 4/4] Undo commented out 
code --- .../scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index b8316b0964f46..fcdf248b2a22c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -1024,9 +1024,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0) when(pvcResource.create()).thenThrow(new KubernetesClientException("PVC fails to create")) -// intercept[KubernetesClientException] { -// podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) -// } + intercept[KubernetesClientException] { + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + } assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0) assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0) }