Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ labelPRBasedOnFilePath:

area:Scheduler:
- airflow/jobs/**/*
- airflow/task/task_runner/**/*
- airflow/task/standard_task_runner.py
- airflow/dag_processing/**/*
- docs/apache-airflow/administration-and-deployment/scheduler.rst
- tests/jobs/**/*
Expand Down
9 changes: 0 additions & 9 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,6 @@ core:
type: string
example: ~
default: "50"
task_runner:
description: |
The class to use for running task instances in a subprocess.
Choices include StandardTaskRunner, CgroupTaskRunner or the full import path to the class
when using a custom task runner.
version_added: ~
type: string
example: ~
default: "StandardTaskRunner"
default_impersonation:
description: |
If set, tasks without a ``run_as_user`` argument will be run with this user
Expand Down
4 changes: 2 additions & 2 deletions airflow/jobs/local_task_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ def __init__(
self._overtime = 0.0

def _execute(self) -> int | None:
from airflow.task.task_runner import get_task_runner
from airflow.task.standard_task_runner import StandardTaskRunner

self.task_runner = get_task_runner(self)
self.task_runner = StandardTaskRunner(self)

# Print a marker post execution for internals of post task processing
self.log.info("::group::Pre task execution logs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,90 @@
import logging
import os
import signal
import subprocess
import threading
import time
from typing import TYPE_CHECKING

import psutil
from setproctitle import setproctitle

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.models.taskinstance import TaskReturnCode
from airflow.settings import CAN_FORK
from airflow.stats import Stats
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.configuration import tmp_configuration_copy
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.platform import IS_WINDOWS, getuser
from airflow.utils.process_utils import reap_process_group, set_new_process_group

if TYPE_CHECKING:
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner


class StandardTaskRunner(BaseTaskRunner):
"""Standard runner for all tasks."""
class StandardTaskRunner(LoggingMixin):
"""
Runs Airflow task instances via CLI.

Invoke the `airflow tasks run` command with raw mode enabled in a subprocess.

:param job_runner: The LocalTaskJobRunner associated with the task runner
"""

def __init__(self, job_runner: LocalTaskJobRunner):
super().__init__(job_runner=job_runner)
self.job_runner = job_runner
super().__init__(job_runner.task_instance)
self._task_instance = job_runner.task_instance

popen_prepend = []
if self._task_instance.run_as_user:
self.run_as_user: str | None = self._task_instance.run_as_user
else:
try:
self.run_as_user = conf.get("core", "default_impersonation")
except AirflowConfigException:
self.run_as_user = None

# Add sudo commands to change user if we need to. Needed to handle SubDagOperator
# case using a SequentialExecutor.
self.log.debug("Planning to run as the %s user", self.run_as_user)
if self.run_as_user and (self.run_as_user != getuser()):
# We want to include any environment variables now, as we won't
# want to have to specify them in the sudo call - they would show
# up in `ps` that way! And run commands now, as the other user
# might not be able to run the cmds to get credentials
cfg_path = tmp_configuration_copy(chmod=0o600, include_env=True, include_cmds=True)

# Give ownership of file to user; only they can read and write
subprocess.check_call(["sudo", "chown", self.run_as_user, cfg_path], close_fds=True)

# propagate PYTHONPATH environment variable
pythonpath_value = os.environ.get("PYTHONPATH", "")
popen_prepend = ["sudo", "-E", "-H", "-u", self.run_as_user]

if pythonpath_value:
popen_prepend.append(f"PYTHONPATH={pythonpath_value}")

else:
# Always provide a copy of the configuration file settings. Since
# we are running as the same user, and can pass through environment
# variables then we don't need to include those in the config copy
# - the runner can read/execute those values as it needs
cfg_path = tmp_configuration_copy(chmod=0o600, include_env=False, include_cmds=False)

self._cfg_path = cfg_path
self._command = popen_prepend + self._task_instance.command_as_list(
raw=True,
pickle_id=self.job_runner.pickle_id,
mark_success=self.job_runner.mark_success,
job_id=self.job_runner.job.id,
pool=self.job_runner.pool,
cfg_path=cfg_path,
)
self.process = None
self._rc = None
if TYPE_CHECKING:
assert self._task_instance.task
Expand Down Expand Up @@ -216,3 +277,70 @@ def _read_task_utilization(self):
except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError):
self.log.info("Process not found (most likely exited), stop collecting metrics")
return

def _read_task_logs(self, stream):
while True:
line = stream.readline()
if isinstance(line, bytes):
line = line.decode("utf-8")
if not line:
break
self.log.info(
"Job %s: Subtask %s %s",
self._task_instance.job_id,
self._task_instance.task_id,
line.rstrip("\n"),
)

def run_command(self, run_with=None) -> subprocess.Popen:
"""
Run the task command.

:param run_with: list of tokens to run the task command with e.g. ``['bash', '-c']``
:return: the process that was run
"""
run_with = run_with or []
full_cmd = run_with + self._command

self.log.info("Running on host: %s", get_hostname())
self.log.info("Running: %s", full_cmd)
with _airflow_parsing_context_manager(
dag_id=self._task_instance.dag_id,
task_id=self._task_instance.task_id,
):
if IS_WINDOWS:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
)
else:
proc = subprocess.Popen(
full_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
close_fds=True,
env=os.environ.copy(),
preexec_fn=os.setsid,
)

# Start daemon thread to read subprocess logging output
log_reader = threading.Thread(
target=self._read_task_logs,
args=(proc.stdout,),
)
log_reader.daemon = True
log_reader.start()
return proc

def on_finish(self) -> None:
"""Execute when this is done running."""
if self._cfg_path and os.path.isfile(self._cfg_path):
if self.run_as_user:
subprocess.call(["sudo", "rm", self._cfg_path], close_fds=True)
else:
os.remove(self._cfg_path)
66 changes: 0 additions & 66 deletions airflow/task/task_runner/__init__.py

This file was deleted.

Loading