diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala index cc081202cf89a..8803d3d031e47 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala @@ -35,6 +35,20 @@ import org.apache.spark.resource.ResourceProfile */ @DeveloperApi abstract class AbstractPodsAllocator { + /* + * Optional lifecycle manager for tracking executor pod lifecycle events. + * Set via setExecutorPodsLifecycleManager for backward compatibility. + */ + protected var executorPodsLifecycleManager: Option[ExecutorPodsLifecycleManager] = None + + /* + * Set the lifecycle manager for tracking executor pod lifecycle events. + * This method is optional and may not exist in custom implementations based on older versions. + */ + def setExecutorPodsLifecycleManager(manager: ExecutorPodsLifecycleManager): Unit = { + executorPodsLifecycleManager = Some(manager) + } + /* * Set the total expected executors for an application */ diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 126d07c0926f4..340cc9c76e46f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -118,6 +118,9 @@ class ExecutorPodsAllocator( protected val numOutstandingPods = new AtomicInteger() + // Track total failed pod creation attempts across the application lifecycle + protected val totalFailedPodCreations = new AtomicInteger(0) + protected var lastSnapshot = ExecutorPodsSnapshot() protected var appId: String = _ @@ -459,32 +462,55 @@ class ExecutorPodsAllocator( .build() val resources = replacePVCsIfNeeded( podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs) - val createdExecutorPod = - kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create() - try { - addOwnerReference(createdExecutorPod, resources) - resources - .filter(_.getKind == "PersistentVolumeClaim") - .foreach { resource => - if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) { - addOwnerReference(driverPod.get, Seq(resource)) - } - val pvc = resource.asInstanceOf[PersistentVolumeClaim] - logInfo(log"Trying to create PersistentVolumeClaim " + - log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)} with " + - log"StorageClass ${MDC(LogKeys.CLASS_NAME, pvc.getSpec.getStorageClassName)}") - kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create() - PVC_COUNTER.incrementAndGet() - } - newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis()) - logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") + val optCreatedExecutorPod = try { + Some(kubernetesClient + .pods() + .inNamespace(namespace) + .resource(podWithAttachedContainer) + .create()) } catch { case NonFatal(e) => - kubernetesClient.pods() - .inNamespace(namespace) - .resource(createdExecutorPod) - .delete() - throw e + // Register failure with global tracker if lifecycle manager is available + val failureCount = registerPodCreationFailure() + logError(log"Failed to create executor pod ${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " + + log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e) + None + } + optCreatedExecutorPod.foreach { createdExecutorPod => + try { + addOwnerReference(createdExecutorPod, resources) + resources + .filter(_.getKind == "PersistentVolumeClaim") + .foreach { resource => + if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) { + addOwnerReference(driverPod.get, Seq(resource)) + } + val pvc = resource.asInstanceOf[PersistentVolumeClaim] + logInfo(log"Trying to create PersistentVolumeClaim " + + log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)} with " + + log"StorageClass ${MDC(LogKeys.CLASS_NAME, pvc.getSpec.getStorageClassName)}") + kubernetesClient + .persistentVolumeClaims() + .inNamespace(namespace) + .resource(pvc) + .create() + PVC_COUNTER.incrementAndGet() + } + newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis()) + logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") + } catch { + case NonFatal(e) => + // Register failure with global tracker if lifecycle manager is available + val failureCount = registerPodCreationFailure() + logError(log"Failed to add owner reference or create PVC for executor pod " + + log"${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " + + log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e) + kubernetesClient.pods() + .inNamespace(namespace) + .resource(createdExecutorPod) + .delete() + throw e + } } } } @@ -521,6 +547,16 @@ class ExecutorPodsAllocator( resources.filterNot(replacedResources.contains) } + /** + * Registers a pod creation failure with the lifecycle manager and increments the local counter. + * Returns the total failure count for logging purposes. + */ + protected def registerPodCreationFailure(): Int = { + val failureCount = totalFailedPodCreations.incrementAndGet() + executorPodsLifecycleManager.foreach(_.registerExecutorFailure()) + failureCount + } + protected def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = { try { val creationTime = Instant.parse(state.pod.getMetadata.getCreationTimestamp).toEpochMilli() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index c57a014dcfa67..84cfd0d72b3bb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -77,6 +77,14 @@ private[spark] class ExecutorPodsLifecycleManager( protected[spark] def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors + /** + * Register an executor failure. This increments the global executor failure count + * which is checked against spark.executor.maxNumFailures. + */ + protected[spark] def registerExecutorFailure(): Unit = { + failureTracker.registerExecutorFailure() + } + def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL) snapshotsStore.addSubscriber(eventProcessingInterval) { executorPodsSnapshot => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index d2c6789f2bb5a..782fac670fa88 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -134,7 +134,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit kubernetesClient, snapshotsStore) - val executorPodsAllocator = makeExecutorPodsAllocator(sc, kubernetesClient, snapshotsStore) + val executorPodsAllocator = makeExecutorPodsAllocator( + sc, kubernetesClient, snapshotsStore, Some(executorPodsLifecycleManager)) val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, @@ -158,8 +159,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit podsPollingEventSource) } - private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient, - snapshotsStore: ExecutorPodsSnapshotsStore) = { + private[k8s] def makeExecutorPodsAllocator( + sc: SparkContext, + kubernetesClient: KubernetesClient, + snapshotsStore: ExecutorPodsSnapshotsStore, + lifecycleManager: Option[ExecutorPodsLifecycleManager] = None) = { val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) && sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) { @@ -184,13 +188,20 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit classOf[SparkConf], classOf[org.apache.spark.SecurityManager], classOf[KubernetesExecutorBuilder], classOf[KubernetesClient], classOf[ExecutorPodsSnapshotsStore], classOf[Clock]) - cstr.newInstance( + val allocatorInstance = cstr.newInstance( sc.conf, sc.env.securityManager, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, - new SystemClock()) + new SystemClock()).asInstanceOf[AbstractPodsAllocator] + + // Set the lifecycle manager if provided + lifecycleManager.foreach { manager => + allocatorInstance.setExecutorPodsLifecycleManager(manager) + } + + allocatorInstance } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index e994ccbed9a65..fcdf248b2a22c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -112,6 +112,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _ + @Mock + private var lifecycleManager: ExecutorPodsLifecycleManager = _ + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ private var podsAllocatorUnderTest: ExecutorPodsAllocator = _ @@ -142,6 +145,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock = new ManualClock(0L) podsAllocatorUnderTest = new ExecutorPodsAllocator( conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims) @@ -202,6 +206,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { val confWithLowMaxPendingPods = conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3") podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3)) @@ -268,6 +273,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .set(KUBERNETES_MAX_PENDING_PODS_PER_RPID.key, "2") podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPodsPerRpId, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) // Request more than the max per rp for one rp @@ -321,6 +327,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { val confWithAllocationMaximum = conf.clone.set(KUBERNETES_ALLOCATION_MAXIMUM.key, "1") podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithAllocationMaximum, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))() @@ -838,6 +845,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( confWithPVC, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(podsWithNamespace @@ -936,6 +944,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( confWithPVC, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) when(podsWithNamespace @@ -1005,6 +1014,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { podsAllocatorUnderTest = new ExecutorPodsAllocator( confWithPVC, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager) podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) val startTime = Instant.now.toEpochMilli @@ -1052,4 +1062,22 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt, k8sConf.resourceProfileId.toInt), Seq.empty) } + + test("SPARK-55075: Pod creation failures are tracked by ExecutorFailureTracker") { + // Make all pod creation attempts fail + when(podResource.create()).thenThrow(new KubernetesClientException("Simulated pod" + + " creation failure")) + + // Request 3 executors + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3)) + + // Verify that pod creation was attempted 3 times (once per executor, no retries) + verify(podResource, times(3)).create() + + // Verify that registerPodCreationFailure was called 3 times (once per failed executor) + verify(lifecycleManager, times(3)).registerExecutorFailure() + + // Verify no pods were created since all attempts failed + assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0) + } }