Race Condition in KubernetesJobOperator with XCom #56596

@pmcquighan-camus


Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==10.7.0

Apache Airflow version

3.0.6

Operating System

debian 12

Deployment

Official Apache Airflow Helm Chart

Deployment details

Running on GKE, Kubernetes version 1.33

What happened

Occasionally, the KubernetesJobOperator ends up creating both a Job object and a Pod object. Kubernetes' controller-manager also creates a pod for the Job, so two pods end up running for the same Airflow task: one with a prefix like job- (created by the Job from the KubernetesJobOperator), and one without. This could be harmless, except that with do_xcom_push=True on the job, the XCom sidecar container runs indefinitely until Airflow terminates it by exec-ing into the pod and sending a signal to the main process. When two pods exist, the query to find the appropriate pod to exec into can choose the wrong one, so the main job runs indefinitely (or until Kubernetes kills it via an active_deadline_seconds, or some other Airflow timeout fires) and the task ultimately fails.
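To make the ambiguity concrete, here is a toy illustration (hypothetical names and shapes, not Airflow's actual code): when two pods carry the same task labels, a lookup that takes the first match can pick the stray operator-created pod instead of the Job controller's pod.

```python
# Hypothetical illustration of the ambiguous pod lookup; not Airflow source.
from dataclasses import dataclass


@dataclass
class Pod:
    name: str
    labels: dict


def find_pod_for_task(pods, selector):
    """Return the first pod whose labels contain the selector (order is arbitrary)."""
    matches = [p for p in pods if selector.items() <= p.labels.items()]
    return matches[0] if matches else None


# Both the Job controller's pod and the stray operator-created pod carry
# the same task labels, so the selector matches both.
pods = [
    Pod("k8s-output-stray", {"task_id": "k8s_output"}),      # created directly by the operator
    Pod("job-k8s-output-abcde", {"task_id": "k8s_output"}),  # created by the Job controller
]
chosen = find_pod_for_task(pods, {"task_id": "k8s_output"})
# If `chosen` is not the pod whose base container actually ran the work, the
# exec that signals the XCom sidecar hits the wrong pod, and the other pod's
# sidecar keeps running until a timeout kills it.
```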

Side note: the operator does not sleep between retries when trying to find pods for the Job, and it checks len(pod_list) != self.parallelism instead of something like len(pod_list) >= self.parallelism. Since a Kubernetes Job can retry its Pods, there can be 3 pods for a Job with parallelism 1 (if the first 2 are Failed), and the strict equality check never matches.
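One possible shape for a more forgiving lookup (a sketch under my own assumptions, not a patch against the provider): sleep between attempts and compare only non-Failed pods against parallelism, since Job retries can leave Failed pods behind.

```python
# Sketch of a more tolerant pod-count check; names are illustrative,
# not the provider's actual API.
import time


def wait_for_job_pods(list_pods, parallelism, attempts=5, delay=2.0):
    """Poll until the number of *active* (non-Failed) pods reaches `parallelism`.

    `list_pods` is any callable returning pod dicts with a `phase` key.
    """
    for attempt in range(attempts):
        pods = list_pods()
        # Ignore Failed pods: a Job with backoff_limit retries can leave
        # several Failed pods behind even when everything is healthy.
        active = [p for p in pods if p["phase"] != "Failed"]
        # `>=` instead of `!=`: the raw pod count can legitimately exceed
        # parallelism once retries have happened.
        if len(active) >= parallelism:
            return active
        if attempt < attempts - 1:
            time.sleep(delay)
    raise TimeoutError("job pods never reached the expected parallelism")
```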

What you think should happen instead

I believe the problem is that when parallelism is None, the operator calls a method get_or_create_pod, which looks for an existing pod (by label). The Job controller in Kubernetes might not have created a matching pod yet, so the lookup returns None and a second pod gets created, one not associated with the Job.
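The suspected sequence, as a toy timeline (hypothetical names; the real logic lives in the provider's get_or_create_pod path):

```python
# Toy model of the suspected race; not the provider's actual implementation.
def get_or_create_pod(existing_pods, labels, create):
    """Reuse a labeled pod if one exists, otherwise create one."""
    matches = [p for p in existing_pods if p["labels"] == labels]
    if matches:
        return matches[0]
    return create(labels)  # <-- races with the Job controller's own pod creation


labels = {"task_id": "k8s_output"}

# t0: operator submits the Job; the controller-manager has not created a pod yet,
# so the lookup finds nothing and the operator creates a pod itself.
cluster_pods = []
stray = get_or_create_pod(cluster_pods, labels,
                          create=lambda l: {"name": "stray", "labels": l})
cluster_pods.append(stray)

# t1: the Job controller creates its own pod for the same Job.
cluster_pods.append({"name": "job-abcde", "labels": labels})

# Result: two pods for one task, both matching the task's labels.
```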

I have been able to work around this race condition by always setting the parallelism field on the Job, but this seems like unexpected behavior: the Job operator should never end up launching a pod directly, it should always launch a Job.
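For reference, the workaround amounts to pinning the field explicitly on the operator (a fragment against the repro DAG below; parallelism=1 is my assumption for a single-pod job):

```python
k8s_job = GKEStartJobOperator(
    task_id="k8s_output",
    # ... other arguments as in the repro DAG below ...
    parallelism=1,  # set explicitly so the operator never takes the get_or_create_pod path
)
```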

How to reproduce

Using a simple DAG that just writes something to the XCom file and then tries to read it: if I trigger it 10 times, typically 3-4 runs fail (i.e. the task runs indefinitely until being killed by Kubernetes).

from datetime import datetime, timedelta, timezone

from airflow import DAG
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartJobOperator

with DAG(
    dag_id="k8s-test",
    schedule=None,
    catchup=False,
    start_date=datetime(2025, 8, 1, 0, 0, 0, 0, timezone.utc),
    max_active_runs=2,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
    },
) as dag:
    k8s_job = GKEStartJobOperator(
        # Task config
        task_id="k8s_output",
        cluster_name="xx",
        location="xx",
        deferrable=True,
        poll_interval=30.0,
        backoff_limit=3,  # Number of times pod will be retried (independent of task being retried)
        do_xcom_push=True,  # Need to push the output file paths on to later stages
        base_container_name="k8s-output",
        # Don't mark this stage as complete until the job is actually done
        wait_until_job_complete=True,
        # Job config - write out a file with a string-wrapped dictionary
        arguments=[
            "python3",
            "-c",
            """from pathlib import Path
import json
Path("/airflow/xcom/return.json").parent.mkdir(parents=True, exist_ok=True)
with open("/airflow/xcom/return.json", "w") as f:
  f.write(json.dumps({"hello":"world"}))
""",
        ],
        image="python:3.12.10-alpine",
        namespace="default",
    )

    parse_xcom = GKEStartJobOperator(
        # Task config
        task_id="parse_xcom",
        cluster_name="xx",
        location="xx",
        deferrable=True,
        poll_interval=30.0,
        backoff_limit=3,
        base_container_name="parse-xcom",
        wait_until_job_complete=True,
        arguments=[
            "echo",
            "{{ ti.xcom_pull('k8s_output') }}",
        ],
        image="python:3.12.10-alpine",
        namespace="default",
        ttl_seconds_after_finished=180,
    )

    _ = parse_xcom << k8s_job

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
