Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ import org.apache.spark.resource.ResourceProfile
*/
@DeveloperApi
abstract class AbstractPodsAllocator {
/*
* Optional lifecycle manager for tracking executor pod lifecycle events.
* Set via setExecutorPodsLifecycleManager for backward compatibility.
*/
protected var executorPodsLifecycleManager: Option[ExecutorPodsLifecycleManager] = None

/*
* Set the lifecycle manager for tracking executor pod lifecycle events.
* This method is optional and may not exist in custom implementations based on older versions.
*/
def setExecutorPodsLifecycleManager(manager: ExecutorPodsLifecycleManager): Unit = {
executorPodsLifecycleManager = Some(manager)
}

/*
* Set the total expected executors for an application
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class ExecutorPodsAllocator(

protected val numOutstandingPods = new AtomicInteger()

// Track total failed pod creation attempts across the application lifecycle
protected val totalFailedPodCreations = new AtomicInteger(0)

protected var lastSnapshot = ExecutorPodsSnapshot()

protected var appId: String = _
Expand Down Expand Up @@ -459,32 +462,55 @@ class ExecutorPodsAllocator(
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
val createdExecutorPod =
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
addOwnerReference(driverPod.get, Seq(resource))
}
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(log"Trying to create PersistentVolumeClaim " +
log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)} with " +
log"StorageClass ${MDC(LogKeys.CLASS_NAME, pvc.getSpec.getStorageClassName)}")
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
PVC_COUNTER.incrementAndGet()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
val optCreatedExecutorPod = try {
Some(kubernetesClient
.pods()
.inNamespace(namespace)
.resource(podWithAttachedContainer)
.create())
} catch {
case NonFatal(e) =>
kubernetesClient.pods()
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
throw e
// Register failure with global tracker if lifecycle manager is available
val failureCount = registerPodCreationFailure()
logError(log"Failed to create executor pod ${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 473-479 seems to repeated twice here and 506-513. Could you make a method to remove the code duplication?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

None
}
optCreatedExecutorPod.foreach { createdExecutorPod =>
try {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why an error during pod creation handled differently from an error coming during adding the owner reference and creating PVC?

In both case the nonfatal error ends in deleting the pod so why the 2nd case does not tracked as a failure?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Errors from owner reference and PVC creation are handled differently it appears (since they throw an exception). I've added them to the tracking by the lifecycle manager but also retained the current behaviour of throwing an exception for these cases.
Would you prefer we stop throwing an exception and have everything handled by the lifecycle manager?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I have missed that line. Could you please run a manual test and check what happens with the exception which thrown here (by throwing an exception here directly and running one of the integration test)?
I am afraid it can go way up to CoarseGrainedSchedulerBackend.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the exception being thrown is fine. It. is propagated up through requestNewExecutors() → onNewSnapshots(). The exception is caught at ExecutorPodsSnapshotStoreImpl.processSnapshotsInternal and logged as a warning.
The test test("SPARK-41410: An exception during PVC creation should not increase PVC counter") is testing explicitly for an exception to be thrown in this case. I modified the code to throw an Exception and the test passed (as long as the exception is a KubernetesClientException).

addOwnerReference(createdExecutorPod, resources)
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
addOwnerReference(driverPod.get, Seq(resource))
}
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(log"Trying to create PersistentVolumeClaim " +
log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)} with " +
log"StorageClass ${MDC(LogKeys.CLASS_NAME, pvc.getSpec.getStorageClassName)}")
kubernetesClient
.persistentVolumeClaims()
.inNamespace(namespace)
.resource(pvc)
.create()
PVC_COUNTER.incrementAndGet()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
// Register failure with global tracker if lifecycle manager is available
val failureCount = registerPodCreationFailure()
logError(log"Failed to add owner reference or create PVC for executor pod " +
log"${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
kubernetesClient.pods()
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
throw e
}
}
}
}
Expand Down Expand Up @@ -521,6 +547,16 @@ class ExecutorPodsAllocator(
resources.filterNot(replacedResources.contains)
}

/**
* Registers a pod creation failure with the lifecycle manager and increments the local counter.
* Returns the total failure count for logging purposes.
*/
protected def registerPodCreationFailure(): Int = {
val failureCount = totalFailedPodCreations.incrementAndGet()
executorPodsLifecycleManager.foreach(_.registerExecutorFailure())
failureCount
}

protected def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime: Long): Boolean = {
try {
val creationTime = Instant.parse(state.pod.getMetadata.getCreationTimestamp).toEpochMilli()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ private[spark] class ExecutorPodsLifecycleManager(

protected[spark] def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors

/**
* Register an executor failure. This increments the global executor failure count
* which is checked against spark.executor.maxNumFailures.
*/
protected[spark] def registerExecutorFailure(): Unit = {
failureTracker.registerExecutorFailure()
}

def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
snapshotsStore.addSubscriber(eventProcessingInterval) { executorPodsSnapshot =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
kubernetesClient,
snapshotsStore)

val executorPodsAllocator = makeExecutorPodsAllocator(sc, kubernetesClient, snapshotsStore)
val executorPodsAllocator = makeExecutorPodsAllocator(
sc, kubernetesClient, snapshotsStore, Some(executorPodsLifecycleManager))

val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
Expand All @@ -158,8 +159,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
podsPollingEventSource)
}

private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore) = {
private[k8s] def makeExecutorPodsAllocator(
sc: SparkContext,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
lifecycleManager: Option[ExecutorPodsLifecycleManager] = None) = {
val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) &&
sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) {
Expand All @@ -184,13 +188,20 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
classOf[SparkConf], classOf[org.apache.spark.SecurityManager],
classOf[KubernetesExecutorBuilder], classOf[KubernetesClient],
classOf[ExecutorPodsSnapshotsStore], classOf[Clock])
cstr.newInstance(
val allocatorInstance = cstr.newInstance(
sc.conf,
sc.env.securityManager,
new KubernetesExecutorBuilder(),
kubernetesClient,
snapshotsStore,
new SystemClock())
new SystemClock()).asInstanceOf[AbstractPodsAllocator]

// Set the lifecycle manager if provided
lifecycleManager.foreach { manager =>
allocatorInstance.setExecutorPodsLifecycleManager(manager)
}

allocatorInstance
}

override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var schedulerBackend: KubernetesClusterSchedulerBackend = _

@Mock
private var lifecycleManager: ExecutorPodsLifecycleManager = _

private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _

private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
Expand Down Expand Up @@ -142,6 +145,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
waitForExecutorPodsClock = new ManualClock(0L)
podsAllocatorUnderTest = new ExecutorPodsAllocator(
conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
Expand Down Expand Up @@ -202,6 +206,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
val confWithLowMaxPendingPods = conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3")
podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr,
executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
Expand Down Expand Up @@ -268,6 +273,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
.set(KUBERNETES_MAX_PENDING_PODS_PER_RPID.key, "2")
podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithLowMaxPendingPodsPerRpId, secMgr,
executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

// Request more than the max per rp for one rp
Expand Down Expand Up @@ -321,6 +327,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
val confWithAllocationMaximum = conf.clone.set(KUBERNETES_ALLOCATION_MAXIMUM.key, "1")
podsAllocatorUnderTest = new ExecutorPodsAllocator(confWithAllocationMaximum, secMgr,
executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()
Expand Down Expand Up @@ -838,6 +845,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, executorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

when(podsWithNamespace
Expand Down Expand Up @@ -936,6 +944,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, executorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

when(podsWithNamespace
Expand Down Expand Up @@ -1005,6 +1014,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, executorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

val startTime = Instant.now.toEpochMilli
Expand Down Expand Up @@ -1052,4 +1062,22 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
k8sConf.resourceProfileId.toInt), Seq.empty)
}

test("SPARK-55075: Pod creation failures are tracked by ExecutorFailureTracker") {
// Make all pod creation attempts fail
when(podResource.create()).thenThrow(new KubernetesClientException("Simulated pod" +
" creation failure"))

// Request 3 executors
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3))

// Verify that pod creation was attempted 3 times (once per executor, no retries)
verify(podResource, times(3)).create()

// Verify that registerPodCreationFailure was called 3 times (once per failed executor)
verify(lifecycleManager, times(3)).registerExecutorFailure()

// Verify no pods were created since all attempts failed
assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
}
}