Skip to content

Commit 52bf90b

Browse files
committed
KubernetesExecutor: retry pod creation on Kubernetes API 500 errors
Treat 500 InternalServerError responses from the Kubernetes API as transient when creating worker pods and requeue the task according to `task_publish_max_retries` (or indefinitely when -1). This avoids failing tasks due to transient kube-apiserver errors and logs retry attempts with details.
1 parent b2f7873 commit 52bf90b

2 files changed

Lines changed: 24 additions & 0 deletions

File tree

providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ def sync(self) -> None:
385385
if (
386386
(str(e.status) == "403" and "exceeded quota" in message)
387387
or (str(e.status) == "409" and "object has been modified" in message)
388+
or str(e.status) == "500"
388389
) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries):
389390
self.log.warning(
390391
"[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",

providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,26 @@ def setup_method(self) -> None:
407407
State.FAILED,
408408
id="429 Too Many Requests (empty body)",
409409
),
410+
pytest.param(
411+
HTTPResponse(
412+
body='{"message": "Internal error occurred: failed calling webhook \\"mutation.azure-workload-identity.io\\": failed to call webhook: Post \\"https://azure-wi-webhook-webhook-service.kube-system.svc:443/mutate-v1-pod?timeout=10s\\""}',
413+
status=500,
414+
),
415+
1,
416+
True,
417+
State.SUCCESS,
418+
id="500 Internal Server Error (webhook failure)",
419+
),
420+
pytest.param(
421+
HTTPResponse(
422+
body='{"message": "Internal error occurred: failed calling webhook"}',
423+
status=500,
424+
),
425+
1,
426+
True,
427+
State.FAILED,
428+
id="500 Internal Server Error (webhook failure) (retry failed)",
429+
),
410430
],
411431
)
412432
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
@@ -439,6 +459,9 @@ def test_run_next_exception_requeue(
439459
- your requested namespace doesn't exists
440460
- 422 Unprocessable Entity will returns in scenarios like
441461
- your request parameters are valid but unsupported e.g. limits lower than requests.
462+
- 500 Internal Server Error will returns in scenarios like
463+
- failed calling webhook - typically transient API server or webhook service issues
464+
- should be retried if task_publish_max_retries > 0
442465
443466
"""
444467
template_file = data_file("pods/generator_base_with_secrets.yaml").as_posix()

0 commit comments

Comments
 (0)