Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 19 additions & 124 deletions airflow-core/src/airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations

import contextlib
import logging
import os
import pathlib
Expand All @@ -29,7 +28,6 @@
from base64 import b64encode
from collections.abc import Callable
from configparser import ConfigParser
from copy import deepcopy
from inspect import ismodule
from io import StringIO
from re import Pattern
Expand All @@ -39,9 +37,7 @@
from typing_extensions import overload

from airflow._shared.configuration.parser import (
VALUE_NOT_FOUND_SENTINEL,
AirflowConfigParser as _SharedAirflowConfigParser,
ValueNotFound,
configure_parser_from_configuration_description,
)
from airflow._shared.module_loading import import_string
Expand Down Expand Up @@ -207,10 +203,19 @@ def __init__(
# interpolation placeholders. The _default_values config parser will interpolate them
# properly when we call get() on it.
_default_values = create_default_config_parser(configuration_description)
super().__init__(configuration_description, _default_values, *args, **kwargs)
from airflow.providers_manager import ProvidersManager

super().__init__(
configuration_description,
_default_values,
ProvidersManager,
create_default_config_parser,
_default_config_file_path("provider_config_fallback_defaults.cfg"),
*args,
**kwargs,
)
self.configuration_description = configuration_description
self._default_values = _default_values
self._provider_config_fallback_default_values = create_provider_config_fallback_defaults()
if default_config is not None:
self._update_defaults_from_string(default_config)
self._update_logging_deprecated_template_to_one_from_defaults()
Expand All @@ -228,35 +233,6 @@ def _validators(self) -> list[Callable[[], None]]:
self._upgrade_postgres_metastore_conn,
]

@property
def _lookup_sequence(self) -> list[Callable]:
"""Overring _lookup_sequence from shared base class to add provider fallbacks."""
return super()._lookup_sequence + [self._get_option_from_provider_fallbacks]

def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]:
"""Override the base method to add provider fallbacks."""
return [
("provider-fallback-defaults", self._provider_config_fallback_default_values),
("default", self._default_values),
("airflow.cfg", self),
]

def _get_option_from_provider_fallbacks(
self,
deprecated_key: str | None,
deprecated_section: str | None,
key: str,
section: str,
issue_warning: bool = True,
extra_stacklevel: int = 0,
**kwargs,
) -> str | ValueNotFound:
"""Get config option from provider fallback defaults."""
if self.get_provider_config_fallback_defaults(section, key) is not None:
# no expansion needed
return self.get_provider_config_fallback_defaults(section, key, **kwargs)
return VALUE_NOT_FOUND_SENTINEL

def _update_logging_deprecated_template_to_one_from_defaults(self):
default = self.get_default_value("logging", "log_filename_template")
if default:
Expand All @@ -267,13 +243,6 @@ def _update_logging_deprecated_template_to_one_from_defaults(self):
default,
)

def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any:
"""Get provider config fallback default values."""
# Remove team_name from kwargs as the fallback defaults ConfigParser
# does not support team-aware lookups (it's a standard ConfigParser).
kwargs.pop("team_name", None)
return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs)

# A mapping of old default values that we want to change and warn the user
# about. Mapping of section -> setting -> { old, replace }
deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = {
Expand Down Expand Up @@ -403,7 +372,7 @@ def _ensure_providers_config_loaded(self) -> None:
if not self._providers_configuration_loaded:
from airflow.providers_manager import ProvidersManager

ProvidersManager()._initialize_providers_configuration()
ProvidersManager().initialize_providers_configuration()

def _ensure_providers_config_unloaded(self) -> bool:
"""Ensure providers configurations are unloaded temporarily to load core configs. Returns True if providers get unloaded."""
Expand All @@ -416,17 +385,6 @@ def _reload_provider_configs(self) -> None:
"""Reload providers configuration."""
self.load_providers_configuration()

def restore_core_default_configuration(self) -> None:
"""
Restore default configuration for core Airflow.

It does not restore configuration for providers. If you want to restore configuration for
providers, you need to call ``load_providers_configuration`` method.
"""
self.configuration_description = retrieve_configuration_description(include_providers=False)
self._default_values = create_default_config_parser(self.configuration_description)
self._providers_configuration_loaded = False

def _upgrade_postgres_metastore_conn(self):
"""
Upgrade SQL schemas.
Expand Down Expand Up @@ -490,6 +448,11 @@ def _validate_sqlite3_version(self):
f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}"
)

def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> Any | None:
return super()._get_custom_secret_backend(
worker_mode=worker_mode if worker_mode is not None else False
)

def mask_secrets(self):
from airflow._shared.configuration.parser import _build_kwarg_env_prefix, _collect_kwarg_env_vars
from airflow._shared.secrets_masker import mask_secret as mask_secret_core
Expand Down Expand Up @@ -567,52 +530,6 @@ def providers_configuration_loaded(self) -> bool:
"""Checks if providers have been loaded."""
return self._providers_configuration_loaded

def load_providers_configuration(self):
"""
Load configuration for providers.

This should be done after initial configuration have been performed. Initializing and discovering
providers is an expensive operation and cannot be performed when we load configuration for the first
time when airflow starts, because we initialize configuration very early, during importing of the
`airflow` package and the module is not yet ready to be used when it happens and until configuration
and settings are loaded. Therefore, in order to reload provider configuration we need to additionally
load provider - specific configuration.
"""
log.debug("Loading providers configuration")
from airflow.providers_manager import ProvidersManager

self.restore_core_default_configuration()
for provider, config in ProvidersManager().already_initialized_provider_configs:
for provider_section, provider_section_content in config.items():
provider_options = provider_section_content["options"]
section_in_current_config = self.configuration_description.get(provider_section)
if not section_in_current_config:
self.configuration_description[provider_section] = deepcopy(provider_section_content)
section_in_current_config = self.configuration_description.get(provider_section)
section_in_current_config["source"] = f"default-{provider}"
for option in provider_options:
section_in_current_config["options"][option]["source"] = f"default-{provider}"
else:
section_source = section_in_current_config.get("source", "Airflow's core package").split(
"default-"
)[-1]
raise AirflowConfigException(
f"The provider {provider} is attempting to contribute "
f"configuration section {provider_section} that "
f"has already been added before. The source of it: {section_source}. "
"This is forbidden. A provider can only add new sections. It "
"cannot contribute options to existing sections or override other "
"provider's configuration.",
UserWarning,
)
self._default_values = create_default_config_parser(self.configuration_description)
# sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete
# the cached values, and it will be refreshed on next access.
with contextlib.suppress(AttributeError):
# no problem if cache is not set yet
del self.sensitive_config_values
self._providers_configuration_loaded = True

def _get_config_value_from_secret_backend(self, config_key: str) -> str | None:
"""
Override to use module-level function that reads from global conf.
Expand Down Expand Up @@ -702,30 +619,6 @@ def create_default_config_parser(configuration_description: dict[str, dict[str,
return parser


def create_provider_config_fallback_defaults() -> ConfigParser:
"""
Create fallback defaults.

This parser contains provider defaults for Airflow configuration, containing fallback default values
that might be needed when provider classes are being imported - before provider's configuration
is loaded.

Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead
to retrieving provider configuration before the defaults for the provider are loaded.

Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or
environment variables) those will be used as usual.

NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication,
at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed.

You've been warned!
"""
config_parser = ConfigParser()
config_parser.read(_default_config_file_path("provider_config_fallback_defaults.cfg"))
return config_parser


def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
airflow_config = pathlib.Path(AIRFLOW_CONFIG)
if airflow_config.is_dir():
Expand Down Expand Up @@ -754,11 +647,13 @@ def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
conf.configuration_description["core"]["options"]["fernet_key"]["default"] = (
_SecretKeys.fernet_key
)
conf._default_values.set("core", "fernet_key", _SecretKeys.fernet_key)

_SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8")
conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = (
_SecretKeys.jwt_secret_key
)
conf._default_values.set("api_auth", "jwt_secret", _SecretKeys.jwt_secret_key)
pathlib.Path(airflow_config.__fspath__()).touch()
make_group_other_inaccessible(airflow_config.__fspath__())
with open(airflow_config, "w") as file:
Expand Down
26 changes: 17 additions & 9 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ def __init__(
self.num_runs = num_runs
self.only_idle = only_idle
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time

# Note:
# We need to fetch all conf values before the `prohibit_commit` block; otherwise the Core conf may
# access the MetadataMetastoreBackend and trigger `UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS`.
# The easiest way to keep the scheduler loop side-effect free is to read those values in `__init__`.

# How many seconds do we wait for tasks to heartbeat before timeout.
self._task_instance_heartbeat_timeout_secs = conf.getint(
"scheduler", "task_instance_heartbeat_timeout"
Expand All @@ -284,6 +290,9 @@ def __init__(
key="num_stuck_in_queued_retries",
fallback=2,
)
self._scheduler_use_job_schedule = conf.getboolean("scheduler", "use_job_schedule", fallback=True)
self._parallelism = conf.getint("core", "parallelism")
self._multi_team = conf.getboolean("core", "multi_team")
Comment thread
jason810496 marked this conversation as resolved.

self.executors: list[BaseExecutor] = executors if executors else ExecutorLoader.init_executors()
self.executor: BaseExecutor = self.executors[0]
Expand Down Expand Up @@ -656,7 +665,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
self.log.info("%s tasks up for execution:\n%s", len(task_instances_to_examine), task_instance_str)

dag_id_to_team_name: dict[str, str | None] = {}
if conf.getboolean("core", "multi_team"):
if self._multi_team:
# Batch query to resolve team names for all DAG IDs to optimize performance
# Instead of individual queries in _try_to_load_executor(), resolve all team names upfront
unique_dag_ids = {ti.dag_id for ti in task_instances_to_examine}
Expand Down Expand Up @@ -1005,11 +1014,10 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:
# we need to make sure in the scheduler now that we don't schedule more than core.parallelism totally
# across all executors.
num_occupied_slots = sum([executor.slots_occupied for executor in self.executors])
parallelism = conf.getint("core", "parallelism")
if self.job.max_tis_per_query == 0:
max_tis = parallelism - num_occupied_slots
max_tis = self._parallelism - num_occupied_slots
else:
max_tis = min(self.job.max_tis_per_query, parallelism - num_occupied_slots)
max_tis = min(self.job.max_tis_per_query, self._parallelism - num_occupied_slots)
if max_tis <= 0:
self.log.debug("max_tis query size is less than or equal to zero. No query will be performed!")
return 0
Expand Down Expand Up @@ -1039,7 +1047,7 @@ def _enqueue_executor_callbacks(self, session: Session) -> None:
:param session: The database session
"""
num_occupied_slots = sum(executor.slots_occupied for executor in self.executors)
max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots
max_callbacks = self._parallelism - num_occupied_slots

if max_callbacks <= 0:
self.log.debug("No available slots for callbacks; all executors at capacity")
Expand Down Expand Up @@ -1694,7 +1702,7 @@ def _do_scheduling(self, session: Session) -> int:
"""
# Put a check in place to make sure we don't commit unexpectedly
with prohibit_commit(session) as guard:
if conf.getboolean("scheduler", "use_job_schedule", fallback=True):
if self._scheduler_use_job_schedule:
self._create_dagruns_for_dags(guard, session)

self._start_queued_dagruns(session)
Expand Down Expand Up @@ -2874,7 +2882,7 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T
def _purge_task_instances_without_heartbeats(
self, task_instances_without_heartbeats: list[TI], *, session: Session
) -> None:
if conf.getboolean("core", "multi_team"):
if self._multi_team:
unique_dag_ids = {ti.dag_id for ti in task_instances_without_heartbeats}
dag_id_to_team_name = self._get_team_names_for_dag_ids(unique_dag_ids, session)
else:
Expand Down Expand Up @@ -3122,7 +3130,7 @@ def _executor_to_workloads(
) -> dict[BaseExecutor, list[SchedulerWorkload]]:
"""Organize workloads into lists per their respective executor."""
workloads_iter: Iterable[SchedulerWorkload]
if conf.getboolean("core", "multi_team"):
if self._multi_team:
if dag_id_to_team_name is None:
if isinstance(workloads, list):
workloads_list = workloads
Expand Down Expand Up @@ -3170,7 +3178,7 @@ def _try_to_load_executor(
will query the database to resolve team name. None indicates global team.
"""
executor = None
if conf.getboolean("core", "multi_team"):
if self._multi_team:
# Use provided team_name if available, otherwise query the database
if team_name is NOTSET:
team_name = self._get_workload_team_name(workload, session)
Expand Down
23 changes: 8 additions & 15 deletions airflow-core/src/airflow/providers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,23 +610,10 @@ def initialize_providers_auth_managers_without_check(self):

@provider_info_cache("config")
def initialize_providers_configuration(self):
"""Lazy initialization of providers configuration information."""
self._initialize_providers_configuration()

def _initialize_providers_configuration(self):
"""
Initialize providers configuration information.

Should be used if we do not want to trigger caching for ``initialize_providers_configuration`` method.
In some cases we might want to make sure that the configuration is initialized, but we do not want
to cache the initialization method - for example when we just want to write configuration with
providers, but it is used in the context where no providers are loaded yet we will eventually
restore the original configuration and we want the subsequent ``initialize_providers_configuration``
method to be run in order to load the configuration for providers again.
"""
"""Lazy initialization of provider configuration metadata and merge it into ``conf``."""
self.initialize_providers_list()
self._discover_config()
# Now update conf with the new provider configuration from providers
# Imported lazily to avoid a configuration/providers_manager import cycle during startup.
from airflow.configuration import conf

conf.load_providers_configuration()
Expand Down Expand Up @@ -1491,6 +1478,12 @@ def _cleanup(self):
self._executor_without_check_set.clear()
self._queue_class_name_set.clear()
self._provider_configs.clear()

# Imported lazily to avoid a configuration/providers_manager import cycle during cleanup.
from airflow.configuration import conf

conf.invalidate_cache()

self._trigger_info_set.clear()
self._notification_info_set.clear()
self._plugins_set.clear()
Expand Down
5 changes: 5 additions & 0 deletions airflow-core/tests/integration/otel/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ def setup_class(cls):
# during scheduler handoff (see https://github.com/apache/airflow/issues/61070).
wait_for_otel_collector(otel_host, otel_port)

# The pytest plugin strips AIRFLOW__*__* env vars (including the JWT secret set
# by Breeze). Both the scheduler and api-server subprocesses must share the same
# secret; otherwise each generates its own random key and token verification fails.
os.environ["AIRFLOW__API_AUTH__JWT_SECRET"] = "test-secret-key-for-testing"
os.environ["AIRFLOW__API_AUTH__JWT_ISSUER"] = "airflow"
Comment thread
jason810496 marked this conversation as resolved.
os.environ["AIRFLOW__TRACES__OTEL_ON"] = "True"
os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf"
os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "http://breeze-otel-collector:4318/v1/traces"
Expand Down
3 changes: 3 additions & 0 deletions airflow-core/tests/unit/core/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@
"deactivate_stale_dags_interval",
"2.5.0",
)
# Invalidate cached properties that depend on deprecated_options, since they may have been
# computed during airflow initialization before the entries above were added.
conf.invalidate_cache()


@pytest.fixture(scope="module", autouse=True)
Expand Down
Loading
Loading