From 6f4547fddbcdae454de02982e70e3719b9265b86 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 2 Mar 2026 10:53:23 +0800 Subject: [PATCH 01/17] Add lazy initialization for provider configs in ProvidersManagerTaskRuntime --- .../airflow/sdk/providers_manager_runtime.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py b/task-sdk/src/airflow/sdk/providers_manager_runtime.py index e7b1b65e2bf12..e6e73031da624 100644 --- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py +++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py @@ -152,6 +152,8 @@ def __init__(self): self._plugins_set: set[PluginInfo] = set() self._provider_schema_validator = _create_provider_info_schema_validator() self._init_airflow_core_hooks() + # _provider configs is required by respecting provider default config for sdk conf + self._provider_configs: dict[str, dict[str, Any]] = {} def _init_airflow_core_hooks(self): """Initialize the hooks dict with default hooks from Airflow core.""" @@ -218,6 +220,18 @@ def initialize_providers_taskflow_decorator(self): self.initialize_providers_list() self._discover_taskflow_decorators() + @provider_info_cache("provider_configs") + def initialize_provider_configs(self): + """Lazy initialization of provider configs.""" + self.initialize_providers_list() + self._discover_config() + + def _discover_config(self) -> None: + """Retrieve all configs defined in the providers.""" + for provider_package, provider in self._provider_dict.items(): + if provider.data.get("config"): + self._provider_configs[provider_package] = provider.data.get("config") + def _discover_hooks_from_connection_types( self, hook_class_names_registered: set[str], @@ -597,6 +611,11 @@ def plugins(self) -> list[PluginInfo]: self.initialize_providers_plugins() return sorted(self._plugins_set, key=lambda x: x.plugin_class) + @property + def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, 
Any]]]: + self.initialize_provider_configs() + return sorted(self._provider_configs.items(), key=lambda x: x[0]) + def _cleanup(self): self._initialized_cache.clear() self._provider_dict.clear() @@ -608,6 +627,7 @@ def _cleanup(self): self._asset_uri_handlers.clear() self._asset_factories.clear() self._asset_to_openlineage_converters.clear() + self._provider_configs.clear() self._initialized = False self._initialization_stack_trace = None From fa6de5d7808e149708a95eb507e60a414c2880ca Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 2 Mar 2026 11:04:13 +0800 Subject: [PATCH 02/17] Add common load_providers_configuration in shared lib --- airflow-core/src/airflow/configuration.py | 35 +---------- .../prek/check_airflow_imports_in_shared.py | 26 ++++++++ .../airflow_shared/configuration/parser.py | 60 ++++++++++++++++++- task-sdk/src/airflow/sdk/configuration.py | 15 +++++ 4 files changed, 101 insertions(+), 35 deletions(-) diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index 69d936be01d82..dac9cc155b3e0 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -import contextlib import logging import os import pathlib @@ -29,7 +28,6 @@ from base64 import b64encode from collections.abc import Callable from configparser import ConfigParser -from copy import deepcopy from inspect import ismodule from io import StringIO from re import Pattern @@ -578,40 +576,9 @@ def load_providers_configuration(self): and settings are loaded. Therefore, in order to reload provider configuration we need to additionally load provider - specific configuration. 
""" - log.debug("Loading providers configuration") from airflow.providers_manager import ProvidersManager - self.restore_core_default_configuration() - for provider, config in ProvidersManager().already_initialized_provider_configs: - for provider_section, provider_section_content in config.items(): - provider_options = provider_section_content["options"] - section_in_current_config = self.configuration_description.get(provider_section) - if not section_in_current_config: - self.configuration_description[provider_section] = deepcopy(provider_section_content) - section_in_current_config = self.configuration_description.get(provider_section) - section_in_current_config["source"] = f"default-{provider}" - for option in provider_options: - section_in_current_config["options"][option]["source"] = f"default-{provider}" - else: - section_source = section_in_current_config.get("source", "Airflow's core package").split( - "default-" - )[-1] - raise AirflowConfigException( - f"The provider {provider} is attempting to contribute " - f"configuration section {provider_section} that " - f"has already been added before. The source of it: {section_source}. " - "This is forbidden. A provider can only add new sections. It " - "cannot contribute options to existing sections or override other " - "provider's configuration.", - UserWarning, - ) - self._default_values = create_default_config_parser(self.configuration_description) - # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete - # the cached values, and it will be refreshed on next access. 
- with contextlib.suppress(AttributeError): - # no problem if cache is not set yet - del self.sensitive_config_values - self._providers_configuration_loaded = True + self._load_providers_configuration(ProvidersManager, create_default_config_parser) def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: """ diff --git a/scripts/ci/prek/check_airflow_imports_in_shared.py b/scripts/ci/prek/check_airflow_imports_in_shared.py index 63dd94e090f9c..8e12e750a6068 100755 --- a/scripts/ci/prek/check_airflow_imports_in_shared.py +++ b/scripts/ci/prek/check_airflow_imports_in_shared.py @@ -32,6 +32,26 @@ from common_prek_utils import console +def _is_type_checking_guard(node: ast.If) -> bool: + """Check if an If node is a ``TYPE_CHECKING`` guard.""" + test = node.test + if isinstance(test, ast.Name) and test.id == "TYPE_CHECKING": + return True + if isinstance(test, ast.Attribute) and test.attr == "TYPE_CHECKING": + return True + return False + + +def _collect_type_checking_node_ids(tree: ast.AST) -> set[int]: + """Return the ``id()`` of every AST node nested inside an ``if TYPE_CHECKING`` block.""" + guarded: set[int] = set() + for node in ast.walk(tree): + if isinstance(node, ast.If) and _is_type_checking_guard(node): + for child in ast.walk(node): + guarded.add(id(child)) + return guarded + + def check_file_for_prohibited_imports(file_path: Path) -> list[tuple[int, str]]: try: source = file_path.read_text(encoding="utf-8") @@ -39,9 +59,15 @@ def check_file_for_prohibited_imports(file_path: Path) -> list[tuple[int, str]]: except (OSError, UnicodeDecodeError, SyntaxError): return [] + type_checking_ids = _collect_type_checking_node_ids(tree) violations = [] for node in ast.walk(tree): + if id(node) in type_checking_ids: + console.print( + f"[blue]Skipping node on line {getattr(node, 'lineno', 'unknown')} due to TYPE_CHECKING guard[/blue]" + ) + continue # Check `from airflow.x import y` statements if isinstance(node, ast.ImportFrom): if node.module 
and node.module.startswith("airflow."): diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py b/shared/configuration/src/airflow_shared/configuration/parser.py index b06f9611a98e0..9840180bf6a27 100644 --- a/shared/configuration/src/airflow_shared/configuration/parser.py +++ b/shared/configuration/src/airflow_shared/configuration/parser.py @@ -33,10 +33,11 @@ from collections.abc import Callable, Generator, Iterable from configparser import ConfigParser, NoOptionError, NoSectionError from contextlib import contextmanager +from copy import deepcopy from enum import Enum from json.decoder import JSONDecodeError from re import Pattern -from typing import IO, Any, TypeVar, overload +from typing import IO, TYPE_CHECKING, Any, TypeVar, overload from .exceptions import AirflowConfigException @@ -78,6 +79,11 @@ def _collect_kwarg_env_vars(prefix: str) -> dict[str, str]: ENV_VAR_PREFIX = "AIRFLOW__" +if TYPE_CHECKING: + from airflow.providers_manager import ProvidersManager + from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime + + class ValueNotFound: """Object of this is raised when a configuration value cannot be found.""" @@ -1046,6 +1052,58 @@ def _resolve_deprecated_lookup( return section, key, deprecated_section, deprecated_key, warning_emitted + def _load_providers_configuration( + self, + provider_manager_type: type[ProvidersManager] | type[ProvidersManagerTaskRuntime], + create_default_config_parser_callable: Callable[[dict[str, dict[str, Any]]], ConfigParser], + ) -> None: + """ + Load configuration for providers. + + This should be done after initial configuration have been performed. 
Initializing and discovering + providers is an expensive operation and cannot be performed when we load configuration for the first + time when airflow starts, because we initialize configuration very early, during importing of the + `airflow` package and the module is not yet ready to be used when it happens and until configuration + and settings are loaded. Therefore, in order to reload provider configuration we need to additionally + load provider - specific configuration. + + :param provider_manager_type: Either ProvidersManager or ProvidersManagerTaskRuntime, depending on the context of the caller. + :param create_default_config_parser_callable: The `create_default_config_parser` function from core or SDK, depending on the context of the caller. + """ + log.debug("Loading providers configuration") + + self.restore_core_default_configuration() + for provider, config in provider_manager_type().already_initialized_provider_configs: + for provider_section, provider_section_content in config.items(): + provider_options = provider_section_content["options"] + section_in_current_config = self.configuration_description.get(provider_section) + if not section_in_current_config: + self.configuration_description[provider_section] = deepcopy(provider_section_content) + section_in_current_config = self.configuration_description.get(provider_section) + section_in_current_config["source"] = f"default-{provider}" + for option in provider_options: + section_in_current_config["options"][option]["source"] = f"default-{provider}" + else: + section_source = section_in_current_config.get("source", "Airflow's core package").split( + "default-" + )[-1] + raise AirflowConfigException( + f"The provider {provider} is attempting to contribute " + f"configuration section {provider_section} that " + f"has already been added before. The source of it: {section_source}. " + "This is forbidden. A provider can only add new sections. 
It " + "cannot contribute options to existing sections or override other " + "provider's configuration.", + UserWarning, + ) + self._default_values = create_default_config_parser_callable(self.configuration_description) + # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete + # the cached values, and it will be refreshed on next access. + with contextlib.suppress(AttributeError): + # no problem if cache is not set yet + del self.sensitive_config_values + self._providers_configuration_loaded = True + @overload # type: ignore[override] def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ... diff --git a/task-sdk/src/airflow/sdk/configuration.py b/task-sdk/src/airflow/sdk/configuration.py index 30e073d9ee323..f2a79e260a1d1 100644 --- a/task-sdk/src/airflow/sdk/configuration.py +++ b/task-sdk/src/airflow/sdk/configuration.py @@ -145,6 +145,21 @@ def __init__( if default_config is not None: self._update_defaults_from_string(default_config) + def load_providers_configuration(self): + """ + Load configuration for providers. + + This should be done after initial configuration have been performed. Initializing and discovering + providers is an expensive operation and cannot be performed when we load configuration for the first + time when airflow starts, because we initialize configuration very early, during importing of the + `airflow` package and the module is not yet ready to be used when it happens and until configuration + and settings are loaded. Therefore, in order to reload provider configuration we need to additionally + load provider - specific configuration. 
+ """ + from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime + + self._load_providers_configuration(ProvidersManagerTaskRuntime, create_default_config_parser) + def expand_all_configuration_values(self): """Expand all configuration values using SDK-specific expansion variables.""" all_vars = get_sdk_expansion_variables() From 1972c4e687141f2e85c0d33e99d1d33b95dc38df Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 2 Mar 2026 12:29:43 +0800 Subject: [PATCH 03/17] Add provider metadata config fallback lookup in core conf Fix _provider_metadata_config_fallback_default_values --- airflow-core/src/airflow/configuration.py | 56 ++++++++++++++++++----- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index dac9cc155b3e0..298899a6ca447 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -28,6 +28,7 @@ from base64 import b64encode from collections.abc import Callable from configparser import ConfigParser +from functools import cached_property from inspect import ismodule from io import StringIO from re import Pattern @@ -208,7 +209,7 @@ def __init__( super().__init__(configuration_description, _default_values, *args, **kwargs) self.configuration_description = configuration_description self._default_values = _default_values - self._provider_config_fallback_default_values = create_provider_config_fallback_defaults() + self._provider_cfg_config_fallback_default_values = create_provider_cfg_config_fallback_defaults() if default_config is not None: self._update_defaults_from_string(default_config) self._update_logging_deprecated_template_to_one_from_defaults() @@ -229,17 +230,21 @@ def _validators(self) -> list[Callable[[], None]]: @property def _lookup_sequence(self) -> list[Callable]: """Overring _lookup_sequence from shared base class to add provider fallbacks.""" - return 
super()._lookup_sequence + [self._get_option_from_provider_fallbacks] + return super()._lookup_sequence + [ + self._get_option_from_provider_cfg_config_fallbacks, + self._get_option_from_provider_metadata_config_fallbacks, + ] def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: """Override the base method to add provider fallbacks.""" return [ - ("provider-fallback-defaults", self._provider_config_fallback_default_values), + ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values), + ("provider-metadata-fallback-defaults", self._provider_metadata_config_fallback_default_values), ("default", self._default_values), ("airflow.cfg", self), ] - def _get_option_from_provider_fallbacks( + def _get_option_from_provider_cfg_config_fallbacks( self, deprecated_key: str | None, deprecated_section: str | None, @@ -250,9 +255,25 @@ def _get_option_from_provider_fallbacks( **kwargs, ) -> str | ValueNotFound: """Get config option from provider fallback defaults.""" - if self.get_provider_config_fallback_defaults(section, key) is not None: + if self.get_from_provider_cfg_config_fallback_defaults(section, key) is not None: # no expansion needed - return self.get_provider_config_fallback_defaults(section, key, **kwargs) + return self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs) + return VALUE_NOT_FOUND_SENTINEL + + def _get_option_from_provider_metadata_config_fallbacks( + self, + deprecated_key: str | None, + deprecated_section: str | None, + key: str, + section: str, + issue_warning: bool = True, + extra_stacklevel: int = 0, + **kwargs, + ) -> str | ValueNotFound: + """Get config option from provider metadata fallback defaults.""" + if self.get_from_provider_metadata_config_fallback_defaults(section, key) is not None: + # no expansion needed + return self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs) return VALUE_NOT_FOUND_SENTINEL def 
_update_logging_deprecated_template_to_one_from_defaults(self): @@ -265,12 +286,23 @@ def _update_logging_deprecated_template_to_one_from_defaults(self): default, ) - def get_provider_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: + def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: """Get provider config fallback default values.""" - # Remove team_name from kwargs as the fallback defaults ConfigParser - # does not support team-aware lookups (it's a standard ConfigParser). - kwargs.pop("team_name", None) - return self._provider_config_fallback_default_values.get(section, key, fallback=None, **kwargs) + return self._provider_cfg_config_fallback_default_values.get(section, key, fallback=None, **kwargs) + + @cached_property + def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: + """Provider metadata config fallback default values.""" + configuration_description = retrieve_configuration_description( + include_airflow=False, include_providers=True + ) + return create_default_config_parser(configuration_description) + + def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: + """Get provider metadata config fallback default values.""" + return self._provider_metadata_config_fallback_default_values.get( + section, key, fallback=None, **kwargs + ) # A mapping of old default values that we want to change and warn the user # about. Mapping of section -> setting -> { old, replace } @@ -669,7 +701,7 @@ def create_default_config_parser(configuration_description: dict[str, dict[str, return parser -def create_provider_config_fallback_defaults() -> ConfigParser: +def create_provider_cfg_config_fallback_defaults() -> ConfigParser: """ Create fallback defaults. 
From 567dc32cb16da0b0db6cf3d2c768985e6faf1e60 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 2 Mar 2026 16:55:14 +0800 Subject: [PATCH 04/17] Add provider lookup and related utils to shared --- airflow-core/src/airflow/configuration.py | 121 ++---------------- .../airflow_shared/configuration/parser.py | 116 +++++++++++++++-- task-sdk/src/airflow/sdk/configuration.py | 26 ++-- 3 files changed, 125 insertions(+), 138 deletions(-) diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index 298899a6ca447..dd865f2c7153a 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -28,7 +28,6 @@ from base64 import b64encode from collections.abc import Callable from configparser import ConfigParser -from functools import cached_property from inspect import ismodule from io import StringIO from re import Pattern @@ -38,13 +37,12 @@ from typing_extensions import overload from airflow._shared.configuration.parser import ( - VALUE_NOT_FOUND_SENTINEL, AirflowConfigParser as _SharedAirflowConfigParser, - ValueNotFound, configure_parser_from_configuration_description, ) from airflow._shared.module_loading import import_string from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning +from airflow.providers_manager import ProvidersManager from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH from airflow.task.weight_rule import WeightRule from airflow.utils import yaml @@ -206,10 +204,17 @@ def __init__( # interpolation placeholders. The _default_values config parser will interpolate them # properly when we call get() on it. 
_default_values = create_default_config_parser(configuration_description) - super().__init__(configuration_description, _default_values, *args, **kwargs) + super().__init__( + configuration_description, + _default_values, + ProvidersManager, + create_default_config_parser, + _default_config_file_path("provider_config_fallback_defaults.cfg"), + *args, + **kwargs, + ) self.configuration_description = configuration_description self._default_values = _default_values - self._provider_cfg_config_fallback_default_values = create_provider_cfg_config_fallback_defaults() if default_config is not None: self._update_defaults_from_string(default_config) self._update_logging_deprecated_template_to_one_from_defaults() @@ -227,55 +232,6 @@ def _validators(self) -> list[Callable[[], None]]: self._upgrade_postgres_metastore_conn, ] - @property - def _lookup_sequence(self) -> list[Callable]: - """Overring _lookup_sequence from shared base class to add provider fallbacks.""" - return super()._lookup_sequence + [ - self._get_option_from_provider_cfg_config_fallbacks, - self._get_option_from_provider_metadata_config_fallbacks, - ] - - def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: - """Override the base method to add provider fallbacks.""" - return [ - ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values), - ("provider-metadata-fallback-defaults", self._provider_metadata_config_fallback_default_values), - ("default", self._default_values), - ("airflow.cfg", self), - ] - - def _get_option_from_provider_cfg_config_fallbacks( - self, - deprecated_key: str | None, - deprecated_section: str | None, - key: str, - section: str, - issue_warning: bool = True, - extra_stacklevel: int = 0, - **kwargs, - ) -> str | ValueNotFound: - """Get config option from provider fallback defaults.""" - if self.get_from_provider_cfg_config_fallback_defaults(section, key) is not None: - # no expansion needed - return 
self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs) - return VALUE_NOT_FOUND_SENTINEL - - def _get_option_from_provider_metadata_config_fallbacks( - self, - deprecated_key: str | None, - deprecated_section: str | None, - key: str, - section: str, - issue_warning: bool = True, - extra_stacklevel: int = 0, - **kwargs, - ) -> str | ValueNotFound: - """Get config option from provider metadata fallback defaults.""" - if self.get_from_provider_metadata_config_fallback_defaults(section, key) is not None: - # no expansion needed - return self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs) - return VALUE_NOT_FOUND_SENTINEL - def _update_logging_deprecated_template_to_one_from_defaults(self): default = self.get_default_value("logging", "log_filename_template") if default: @@ -286,24 +242,6 @@ def _update_logging_deprecated_template_to_one_from_defaults(self): default, ) - def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: - """Get provider config fallback default values.""" - return self._provider_cfg_config_fallback_default_values.get(section, key, fallback=None, **kwargs) - - @cached_property - def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: - """Provider metadata config fallback default values.""" - configuration_description = retrieve_configuration_description( - include_airflow=False, include_providers=True - ) - return create_default_config_parser(configuration_description) - - def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: - """Get provider metadata config fallback default values.""" - return self._provider_metadata_config_fallback_default_values.get( - section, key, fallback=None, **kwargs - ) - # A mapping of old default values that we want to change and warn the user # about. 
Mapping of section -> setting -> { old, replace } deprecated_values: dict[str, dict[str, tuple[Pattern, str]]] = { @@ -597,21 +535,6 @@ def providers_configuration_loaded(self) -> bool: """Checks if providers have been loaded.""" return self._providers_configuration_loaded - def load_providers_configuration(self): - """ - Load configuration for providers. - - This should be done after initial configuration have been performed. Initializing and discovering - providers is an expensive operation and cannot be performed when we load configuration for the first - time when airflow starts, because we initialize configuration very early, during importing of the - `airflow` package and the module is not yet ready to be used when it happens and until configuration - and settings are loaded. Therefore, in order to reload provider configuration we need to additionally - load provider - specific configuration. - """ - from airflow.providers_manager import ProvidersManager - - self._load_providers_configuration(ProvidersManager, create_default_config_parser) - def _get_config_value_from_secret_backend(self, config_key: str) -> str | None: """ Override to use module-level function that reads from global conf. @@ -701,30 +624,6 @@ def create_default_config_parser(configuration_description: dict[str, dict[str, return parser -def create_provider_cfg_config_fallback_defaults() -> ConfigParser: - """ - Create fallback defaults. - - This parser contains provider defaults for Airflow configuration, containing fallback default values - that might be needed when provider classes are being imported - before provider's configuration - is loaded. - - Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead - to retrieving provider configuration before the defaults for the provider are loaded. 
- - Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or - environment variables) those will be used as usual. - - NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication, - at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed. - - You've been warned! - """ - config_parser = ConfigParser() - config_parser.read(_default_config_file_path("provider_config_fallback_defaults.cfg")) - return config_parser - - def write_default_airflow_configuration_if_needed() -> AirflowConfigParser: airflow_config = pathlib.Path(AIRFLOW_CONFIG) if airflow_config.is_dir(): diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py b/shared/configuration/src/airflow_shared/configuration/parser.py index 9840180bf6a27..1a5f9265d6b8a 100644 --- a/shared/configuration/src/airflow_shared/configuration/parser.py +++ b/shared/configuration/src/airflow_shared/configuration/parser.py @@ -35,6 +35,7 @@ from contextlib import contextmanager from copy import deepcopy from enum import Enum +from functools import cached_property from json.decoder import JSONDecodeError from re import Pattern from typing import IO, TYPE_CHECKING, Any, TypeVar, overload @@ -172,6 +173,34 @@ def configure_parser_from_configuration_description( parser.set(section, key, default_value) +def create_provider_cfg_config_fallback_defaults( + provider_config_fallback_defaults_cfg_path: str, +) -> ConfigParser: + """ + Create fallback defaults. + + This parser contains provider defaults for Airflow configuration, containing fallback default values + that might be needed when provider classes are being imported - before provider's configuration + is loaded. + + Unfortunately airflow currently performs a lot of stuff during importing and some of that might lead + to retrieving provider configuration before the defaults for the provider are loaded. 
+ + Those are only defaults, so if you have "real" values configured in your configuration (.cfg file or + environment variables) those will be used as usual. + + NOTE!! Do NOT attempt to remove those default fallbacks thinking that they are unnecessary duplication, + at least not until we fix the way how airflow imports "do stuff". This is unlikely to succeed. + + You've been warned! + + :param provider_config_fallback_defaults_cfg_path: path to the provider config fallback defaults .cfg file + """ + config_parser = ConfigParser() + config_parser.read(provider_config_fallback_defaults_cfg_path) + return config_parser + + class AirflowConfigParser(ConfigParser): """ Base configuration parser with pure parsing logic. @@ -247,8 +276,69 @@ def _lookup_sequence(self) -> list[Callable]: self._get_option_from_commands, self._get_option_from_secrets, self._get_option_from_defaults, + self._get_option_from_provider_cfg_config_fallbacks, + self._get_option_from_provider_metadata_config_fallbacks, + ] + + def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: + """Override the base method to add provider fallbacks.""" + return [ + ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values), + ("provider-metadata-fallback-defaults", self._provider_metadata_config_fallback_default_values), + ("default", self._default_values), + ("airflow.cfg", self), ] + def _get_option_from_provider_cfg_config_fallbacks( + self, + deprecated_key: str | None, + deprecated_section: str | None, + key: str, + section: str, + issue_warning: bool = True, + extra_stacklevel: int = 0, + **kwargs, + ) -> str | ValueNotFound: + """Get config option from provider fallback defaults.""" + if self.get_from_provider_cfg_config_fallback_defaults(section, key) is not None: + # no expansion needed + return self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs) + return VALUE_NOT_FOUND_SENTINEL + + def 
_get_option_from_provider_metadata_config_fallbacks( + self, + deprecated_key: str | None, + deprecated_section: str | None, + key: str, + section: str, + issue_warning: bool = True, + extra_stacklevel: int = 0, + **kwargs, + ) -> str | ValueNotFound: + """Get config option from provider metadata fallback defaults.""" + if self.get_from_provider_metadata_config_fallback_defaults(section, key) is not None: + # no expansion needed + return self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs) + return VALUE_NOT_FOUND_SENTINEL + + def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: + """Get provider config fallback default values.""" + return self._provider_cfg_config_fallback_default_values.get(section, key, fallback=None, **kwargs) + + @cached_property + def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: + """Return Provider metadata config fallback default values.""" + base_configuration_description: dict[str, dict[str, Any]] = {} + for _, config in self.provider_manager_type().provider_configs: + base_configuration_description.update(config) + return self.create_default_config_parser_callable(base_configuration_description) + + def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: + """Get provider metadata config fallback default values.""" + return self._provider_metadata_config_fallback_default_values.get( + section, key, fallback=None, **kwargs + ) + @property def _validators(self) -> list[Callable[[], None]]: """ @@ -306,6 +396,9 @@ def __init__( self, configuration_description: dict[str, dict[str, Any]], _default_values: ConfigParser, + provider_manager_type: type[ProvidersManager] | type[ProvidersManagerTaskRuntime], + create_default_config_parser_callable: Callable[[dict[str, dict[str, Any]]], ConfigParser], + provider_config_fallback_defaults_cfg_path: str, *args, **kwargs, ): @@ -314,10 +407,18 @@ def 
__init__( :param configuration_description: Description of configuration options :param _default_values: ConfigParser with default values + :param provider_manager_type: Either ProvidersManager or ProvidersManagerTaskRuntime, depending on the context of the caller. + :param create_default_config_parser_callable: The `create_default_config_parser` function from core or SDK, depending on the context of the caller. + :param provider_config_fallback_defaults_cfg_path: Path to the `provider_config_fallback_defaults.cfg` file. """ super().__init__(*args, **kwargs) self.configuration_description = configuration_description self._default_values = _default_values + self.provider_manager_type = provider_manager_type + self.create_default_config_parser_callable = create_default_config_parser_callable + self._provider_cfg_config_fallback_default_values = create_provider_cfg_config_fallback_defaults( + provider_config_fallback_defaults_cfg_path + ) self._suppress_future_warnings = False self.upgraded_values: dict[tuple[str, str], str] = {} @@ -1052,11 +1153,7 @@ def _resolve_deprecated_lookup( return section, key, deprecated_section, deprecated_key, warning_emitted - def _load_providers_configuration( - self, - provider_manager_type: type[ProvidersManager] | type[ProvidersManagerTaskRuntime], - create_default_config_parser_callable: Callable[[dict[str, dict[str, Any]]], ConfigParser], - ) -> None: + def load_providers_configuration(self) -> None: """ Load configuration for providers. @@ -1066,14 +1163,11 @@ def _load_providers_configuration( `airflow` package and the module is not yet ready to be used when it happens and until configuration and settings are loaded. Therefore, in order to reload provider configuration we need to additionally load provider - specific configuration. - - :param provider_manager_type: Either ProvidersManager or ProvidersManagerTaskRuntime, depending on the context of the caller. 
- :param create_default_config_parser_callable: The `create_default_config_parser` function from core or SDK, depending on the context of the caller. """ log.debug("Loading providers configuration") self.restore_core_default_configuration() - for provider, config in provider_manager_type().already_initialized_provider_configs: + for provider, config in self.provider_manager_type().already_initialized_provider_configs: for provider_section, provider_section_content in config.items(): provider_options = provider_section_content["options"] section_in_current_config = self.configuration_description.get(provider_section) @@ -1096,7 +1190,7 @@ def _load_providers_configuration( "provider's configuration.", UserWarning, ) - self._default_values = create_default_config_parser_callable(self.configuration_description) + self._default_values = self.create_default_config_parser_callable(self.configuration_description) # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete # the cached values, and it will be refreshed on next access. with contextlib.suppress(AttributeError): @@ -1490,7 +1584,7 @@ def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: """ Get list of config sources to use in as_dict(). - Subclasses can override to add additional sources (e.g., provider configs). + Both core and SDK need provider configs. 
""" return [ ("default", self._default_values), diff --git a/task-sdk/src/airflow/sdk/configuration.py b/task-sdk/src/airflow/sdk/configuration.py index f2a79e260a1d1..fd512ab95c219 100644 --- a/task-sdk/src/airflow/sdk/configuration.py +++ b/task-sdk/src/airflow/sdk/configuration.py @@ -32,6 +32,7 @@ configure_parser_from_configuration_description, ) from airflow.sdk.execution_time.secrets import _SERVER_DEFAULT_SECRETS_SEARCH_PATH +from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime log = logging.getLogger(__name__) @@ -129,7 +130,15 @@ def __init__( configuration_description = retrieve_configuration_description() # Create default values parser _default_values = create_default_config_parser(configuration_description) - super().__init__(configuration_description, _default_values, *args, **kwargs) + super().__init__( + configuration_description, + _default_values, + ProvidersManagerTaskRuntime, + create_default_config_parser, + _default_config_file_path("provider_config_fallback_defaults.cfg"), + *args, + **kwargs, + ) self.configuration_description = configuration_description self._default_values = _default_values self._suppress_future_warnings = False @@ -145,21 +154,6 @@ def __init__( if default_config is not None: self._update_defaults_from_string(default_config) - def load_providers_configuration(self): - """ - Load configuration for providers. - - This should be done after initial configuration have been performed. Initializing and discovering - providers is an expensive operation and cannot be performed when we load configuration for the first - time when airflow starts, because we initialize configuration very early, during importing of the - `airflow` package and the module is not yet ready to be used when it happens and until configuration - and settings are loaded. Therefore, in order to reload provider configuration we need to additionally - load provider - specific configuration. 
- """ - from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime - - self._load_providers_configuration(ProvidersManagerTaskRuntime, create_default_config_parser) - def expand_all_configuration_values(self): """Expand all configuration values using SDK-specific expansion variables.""" all_vars = get_sdk_expansion_variables() From 10fd69c2def58f47abb5a9378a19eb38bda52bdc Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 2 Mar 2026 17:04:57 +0800 Subject: [PATCH 05/17] Add missing provider_configs property in ProvidersManagerTaskRuntime --- task-sdk/src/airflow/sdk/providers_manager_runtime.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py b/task-sdk/src/airflow/sdk/providers_manager_runtime.py index e6e73031da624..fa7909943f3e2 100644 --- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py +++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py @@ -612,10 +612,14 @@ def plugins(self) -> list[PluginInfo]: return sorted(self._plugins_set, key=lambda x: x.plugin_class) @property - def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]: + def provider_configs(self) -> list[tuple[str, dict[str, Any]]]: self.initialize_provider_configs() return sorted(self._provider_configs.items(), key=lambda x: x[0]) + @property + def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]: + return sorted(self._provider_configs.items(), key=lambda x: x[0]) + def _cleanup(self): self._initialized_cache.clear() self._provider_dict.clear() From c71295e80a6b7eb277888bc47687ca51fdefd49b Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 6 Mar 2026 16:54:09 +0800 Subject: [PATCH 06/17] Address all the comments from Kxail - remove _get_config_sources_for_as_dict in child class - clarify cached_property and constructing ProvidersManagerTaskRuntime in SDK conf - _get_option_from_provider_metadata_config_fallbacks nit - 
ProvidersManager lazy import - provider.data.get("config") nit - remove print in prek check --- airflow-core/src/airflow/configuration.py | 3 ++- .../prek/check_airflow_imports_in_shared.py | 3 --- .../airflow_shared/configuration/parser.py | 23 +++++-------------- .../airflow/sdk/providers_manager_runtime.py | 4 ++-- 4 files changed, 10 insertions(+), 23 deletions(-) diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index dd865f2c7153a..d26ccef8def7d 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -42,7 +42,6 @@ ) from airflow._shared.module_loading import import_string from airflow.exceptions import AirflowConfigException, RemovedInAirflow4Warning -from airflow.providers_manager import ProvidersManager from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH from airflow.task.weight_rule import WeightRule from airflow.utils import yaml @@ -204,6 +203,8 @@ def __init__( # interpolation placeholders. The _default_values config parser will interpolate them # properly when we call get() on it. 
_default_values = create_default_config_parser(configuration_description) + from airflow.providers_manager import ProvidersManager + super().__init__( configuration_description, _default_values, diff --git a/scripts/ci/prek/check_airflow_imports_in_shared.py b/scripts/ci/prek/check_airflow_imports_in_shared.py index 8e12e750a6068..ece603ec11a1a 100755 --- a/scripts/ci/prek/check_airflow_imports_in_shared.py +++ b/scripts/ci/prek/check_airflow_imports_in_shared.py @@ -64,9 +64,6 @@ def check_file_for_prohibited_imports(file_path: Path) -> list[tuple[int, str]]: for node in ast.walk(tree): if id(node) in type_checking_ids: - console.print( - f"[blue]Skipping node on line {getattr(node, 'lineno', 'unknown')} due to TYPE_CHECKING guard[/blue]" - ) continue # Check `from airflow.x import y` statements if isinstance(node, ast.ImportFrom): diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py b/shared/configuration/src/airflow_shared/configuration/parser.py index 1a5f9265d6b8a..fd66aaf9f4eca 100644 --- a/shared/configuration/src/airflow_shared/configuration/parser.py +++ b/shared/configuration/src/airflow_shared/configuration/parser.py @@ -300,9 +300,9 @@ def _get_option_from_provider_cfg_config_fallbacks( **kwargs, ) -> str | ValueNotFound: """Get config option from provider fallback defaults.""" - if self.get_from_provider_cfg_config_fallback_defaults(section, key) is not None: - # no expansion needed - return self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs) + value = self.get_from_provider_cfg_config_fallback_defaults(section, key, **kwargs) + if value is not None: + return value return VALUE_NOT_FOUND_SENTINEL def _get_option_from_provider_metadata_config_fallbacks( @@ -316,9 +316,9 @@ def _get_option_from_provider_metadata_config_fallbacks( **kwargs, ) -> str | ValueNotFound: """Get config option from provider metadata fallback defaults.""" - if self.get_from_provider_metadata_config_fallback_defaults(section, key) is 
not None: - # no expansion needed - return self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs) + value = self.get_from_provider_metadata_config_fallback_defaults(section, key, **kwargs) + if value is not None: + return value return VALUE_NOT_FOUND_SENTINEL def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: @@ -1580,17 +1580,6 @@ def optionxform(self, optionstr: str) -> str: """ return optionstr - def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: - """ - Get list of config sources to use in as_dict(). - - Both core and SDK need provider configs. - """ - return [ - ("default", self._default_values), - ("airflow.cfg", self), - ] - def as_dict( self, display_source: bool = False, diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py b/task-sdk/src/airflow/sdk/providers_manager_runtime.py index fa7909943f3e2..218d72bf47d01 100644 --- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py +++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py @@ -229,8 +229,8 @@ def initialize_provider_configs(self): def _discover_config(self) -> None: """Retrieve all configs defined in the providers.""" for provider_package, provider in self._provider_dict.items(): - if provider.data.get("config"): - self._provider_configs[provider_package] = provider.data.get("config") + if config := provider.data.get("config"): + self._provider_configs[provider_package] = config def _discover_hooks_from_connection_types( self, From 3c2befe59eb407e7aa2f24dd99554002786e250a Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 6 Mar 2026 21:52:26 +0800 Subject: [PATCH 07/17] Fix unit tests --- .../airflow_shared/configuration/parser.py | 10 ++++-- .../tests/configuration/test_parser.py | 35 +++++++++++++++++-- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py 
b/shared/configuration/src/airflow_shared/configuration/parser.py index fd66aaf9f4eca..8ccf9282f71c2 100644 --- a/shared/configuration/src/airflow_shared/configuration/parser.py +++ b/shared/configuration/src/airflow_shared/configuration/parser.py @@ -323,7 +323,11 @@ def _get_option_from_provider_metadata_config_fallbacks( def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: """Get provider config fallback default values.""" - return self._provider_cfg_config_fallback_default_values.get(section, key, fallback=None, **kwargs) + raw = kwargs.get("raw", False) + vars_ = kwargs.get("vars") + return self._provider_cfg_config_fallback_default_values.get( + section, key, fallback=None, raw=raw, vars=vars_ + ) @cached_property def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: @@ -335,8 +339,10 @@ def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: """Get provider metadata config fallback default values.""" + raw = kwargs.get("raw", False) + vars_ = kwargs.get("vars") return self._provider_metadata_config_fallback_default_values.get( - section, key, fallback=None, **kwargs + section, key, fallback=None, raw=raw, vars=vars_ ) @property diff --git a/shared/configuration/tests/configuration/test_parser.py b/shared/configuration/tests/configuration/test_parser.py index a018a931ed5f9..dd5bcfcf9b867 100644 --- a/shared/configuration/tests/configuration/test_parser.py +++ b/shared/configuration/tests/configuration/test_parser.py @@ -37,6 +37,22 @@ ) +class _NoOpProvidersManager: + """Stub providers manager for tests — no providers, no side effects.""" + + @property + def provider_configs(self): + return [] + + @property + def already_initialized_provider_configs(self): + return [] + + +def _create_empty_config_parser(desc: dict) -> ConfigParser: + return ConfigParser() + + class 
AirflowConfigParser(_SharedAirflowConfigParser): """Test parser that extends shared parser for testing.""" @@ -53,7 +69,15 @@ def __init__(self, default_config: str | None = None, *args, **kwargs): _default_values.add_section("test") _default_values.set("test", "key1", "default_value") _default_values.set("test", "key2", "123") - super().__init__(configuration_description, _default_values, *args, **kwargs) + super().__init__( + configuration_description, + _default_values, + _NoOpProvidersManager, + _create_empty_config_parser, + "", + *args, + **kwargs, + ) self.configuration_description = configuration_description self._default_values = _default_values self._suppress_future_warnings = False @@ -861,7 +885,14 @@ def __init__(self): configure_parser_from_configuration_description( _default_values, configuration_description, {} ) - _SharedAirflowConfigParser.__init__(self, configuration_description, _default_values) + _SharedAirflowConfigParser.__init__( + self, + configuration_description, + _default_values, + _NoOpProvidersManager, + _create_empty_config_parser, + "", + ) test_conf = TestConfigParser() deprecated_conf_list = [ From cb2ba4ea1b87be2b75fdb2f3b484599097fa48ec Mon Sep 17 00:00:00 2001 From: "Jason(Zhe-You) Liu" <68415893+jason810496@users.noreply.github.com> Date: Mon, 9 Mar 2026 21:08:48 +0800 Subject: [PATCH 08/17] Fix TestDeprecatedConf tests failing when run in isolation The 6 tests in TestDeprecatedConf that use cmd/secret config sources failed when running only TestDeprecatedConf (not the full file) because sensitive_config_values -- a cached_property on the config parser -- was computed during airflow initialization before the test module added its deprecated_options entries. The stale cache meant _include_commands and _include_secrets never processed ('core', 'sql_alchemy_conn'). Invalidate both sensitive_config_values and inversed_deprecated_options caches right after modifying deprecated_options at module level. 
Co-authored-by: Cursor Agent Co-authored-by: Jason(Zhe-You) Liu --- airflow-core/tests/unit/core/test_configuration.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airflow-core/tests/unit/core/test_configuration.py b/airflow-core/tests/unit/core/test_configuration.py index be0f191f8c9b7..6cc4ed971cccb 100644 --- a/airflow-core/tests/unit/core/test_configuration.py +++ b/airflow-core/tests/unit/core/test_configuration.py @@ -64,6 +64,10 @@ "deactivate_stale_dags_interval", "2.5.0", ) +# Invalidate cached properties that depend on deprecated_options, since they may have been +# computed during airflow initialization before the entries above were added. +for attr in ("sensitive_config_values", "inversed_deprecated_options"): + conf.__dict__.pop(attr, None) @pytest.fixture(scope="module", autouse=True) From 7f042791b2ff63e364d10f82a7eb61f0b96cb45b Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Tue, 10 Mar 2026 15:03:37 +0800 Subject: [PATCH 09/17] Respect worker_mode of _get_custom_secret_backend for conf Fix _get_custom_secret_backend --- airflow-core/src/airflow/configuration.py | 5 +++++ task-sdk/src/airflow/sdk/configuration.py | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index d26ccef8def7d..0888afba75173 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -459,6 +459,11 @@ def _validate_sqlite3_version(self): f"See {get_docs_url('howto/set-up-database.html#setting-up-a-sqlite-database')}" ) + def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> Any | None: + return super()._get_custom_secret_backend( + worker_mode=worker_mode if worker_mode is not None else False + ) + def mask_secrets(self): from airflow._shared.configuration.parser import _build_kwarg_env_prefix, _collect_kwarg_env_vars from airflow._shared.secrets_masker import mask_secret as mask_secret_core diff --git 
a/task-sdk/src/airflow/sdk/configuration.py b/task-sdk/src/airflow/sdk/configuration.py index fd512ab95c219..84938fab59dc9 100644 --- a/task-sdk/src/airflow/sdk/configuration.py +++ b/task-sdk/src/airflow/sdk/configuration.py @@ -154,6 +154,11 @@ def __init__( if default_config is not None: self._update_defaults_from_string(default_config) + def _get_custom_secret_backend(self, worker_mode: bool | None = None) -> Any | None: + return super()._get_custom_secret_backend( + worker_mode=worker_mode if worker_mode is not None else True + ) + def expand_all_configuration_values(self): """Expand all configuration values using SDK-specific expansion variables.""" all_vars = get_sdk_expansion_variables() From 44458c458c40bc066dadb0b182999c967618a847 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Tue, 10 Mar 2026 23:33:29 +0800 Subject: [PATCH 10/17] Fetch all the conf in constructor of SchedulerJobRunner to avoid hitting prohibit_commit block --- .../src/airflow/jobs/scheduler_job_runner.py | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index c64bd166f1b3a..eaebc909f1eac 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -271,6 +271,11 @@ def __init__( self.num_runs = num_runs self.only_idle = only_idle self._scheduler_idle_sleep_time = scheduler_idle_sleep_time + + # Note: + # We need to fetch ALL the conf before the `prohibit_commit` block, otherwise we will encounter `UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS` since the Core conf might access the MetadataMetastoreBackend + # so the most easiest way is to fetch all the conf in the init method and store them as attributes of the SchedulerJobRunner instance. + # How many seconds do we wait for tasks to heartbeat before timeout. 
self._task_instance_heartbeat_timeout_secs = conf.getint( "scheduler", "task_instance_heartbeat_timeout" @@ -284,6 +289,9 @@ def __init__( key="num_stuck_in_queued_retries", fallback=2, ) + self._scheduler_use_job_schedule = conf.getboolean("scheduler", "use_job_schedule", fallback=True) + self._parallelism = conf.getint("core", "parallelism") + self._multi_team = conf.getboolean("core", "multi_team") self.executors: list[BaseExecutor] = executors if executors else ExecutorLoader.init_executors() self.executor: BaseExecutor = self.executors[0] @@ -656,7 +664,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) - self.log.info("%s tasks up for execution:\n%s", len(task_instances_to_examine), task_instance_str) dag_id_to_team_name: dict[str, str | None] = {} - if conf.getboolean("core", "multi_team"): + if self._multi_team: # Batch query to resolve team names for all DAG IDs to optimize performance # Instead of individual queries in _try_to_load_executor(), resolve all team names upfront unique_dag_ids = {ti.dag_id for ti in task_instances_to_examine} @@ -1005,11 +1013,10 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int: # we need to make sure in the scheduler now that we don't schedule more than core.parallelism totally # across all executors. num_occupied_slots = sum([executor.slots_occupied for executor in self.executors]) - parallelism = conf.getint("core", "parallelism") if self.job.max_tis_per_query == 0: - max_tis = parallelism - num_occupied_slots + max_tis = self._parallelism - num_occupied_slots else: - max_tis = min(self.job.max_tis_per_query, parallelism - num_occupied_slots) + max_tis = min(self.job.max_tis_per_query, self._parallelism - num_occupied_slots) if max_tis <= 0: self.log.debug("max_tis query size is less than or equal to zero. 
No query will be performed!") return 0 @@ -1039,7 +1046,7 @@ def _enqueue_executor_callbacks(self, session: Session) -> None: :param session: The database session """ num_occupied_slots = sum(executor.slots_occupied for executor in self.executors) - max_callbacks = conf.getint("core", "parallelism") - num_occupied_slots + max_callbacks = self._parallelism - num_occupied_slots if max_callbacks <= 0: self.log.debug("No available slots for callbacks; all executors at capacity") @@ -1694,7 +1701,7 @@ def _do_scheduling(self, session: Session) -> int: """ # Put a check in place to make sure we don't commit unexpectedly with prohibit_commit(session) as guard: - if conf.getboolean("scheduler", "use_job_schedule", fallback=True): + if self._scheduler_use_job_schedule: self._create_dagruns_for_dags(guard, session) self._start_queued_dagruns(session) @@ -2874,7 +2881,7 @@ def _find_task_instances_without_heartbeats(self, *, session: Session) -> list[T def _purge_task_instances_without_heartbeats( self, task_instances_without_heartbeats: list[TI], *, session: Session ) -> None: - if conf.getboolean("core", "multi_team"): + if self._multi_team: unique_dag_ids = {ti.dag_id for ti in task_instances_without_heartbeats} dag_id_to_team_name = self._get_team_names_for_dag_ids(unique_dag_ids, session) else: @@ -3122,7 +3129,7 @@ def _executor_to_workloads( ) -> dict[BaseExecutor, list[SchedulerWorkload]]: """Organize workloads into lists per their respective executor.""" workloads_iter: Iterable[SchedulerWorkload] - if conf.getboolean("core", "multi_team"): + if self._multi_team: if dag_id_to_team_name is None: if isinstance(workloads, list): workloads_list = workloads @@ -3170,7 +3177,7 @@ def _try_to_load_executor( will query the database to resolve team name. None indicates global team. 
""" executor = None - if conf.getboolean("core", "multi_team"): + if self._multi_team: # Use provided team_name if available, otherwise query the database if team_name is NOTSET: team_name = self._get_workload_team_name(workload, session) From 08499a0cc8ebc3b7d0eb02edba118c89817306c6 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Wed, 11 Mar 2026 15:10:34 +0800 Subject: [PATCH 11/17] Fix test_execute_task_instances_unlimited_multiple_executors - We need the conf_vars block at constcutor level as we will pre-fetch all the conf at SchedulerJobRunner constructor --- airflow-core/tests/unit/jobs/test_scheduler_job.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 997ebd24bcf0d..f9ae60a9c3583 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -2779,7 +2779,10 @@ def test_execute_task_instances_unlimited_multiple_executors( task2 = EmptyOperator(task_id=task_id_2, executor=task2_exec) scheduler_job = Job() - self.job_runner = SchedulerJobRunner(job=scheduler_job) + with conf_vars({("core", "parallelism"): "40"}): + # 40 dag runs * 2 tasks each = 80. Two executors have capacity for 61 concurrent jobs, but they + # together respect core.parallelism and will not run more in aggregate then that allows. + self.job_runner = SchedulerJobRunner(job=scheduler_job) def _create_dagruns(): dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING) @@ -2804,10 +2807,7 @@ def _create_dagruns(): executor.slots_available = 31 total_enqueued = 0 - with conf_vars({("core", "parallelism"): "40"}): - # 40 dag runs * 2 tasks each = 80. Two executors have capacity for 61 concurrent jobs, but they - # together respect core.parallelism and will not run more in aggregate then that allows. 
- total_enqueued += self.job_runner._critical_section_enqueue_task_instances(session) + total_enqueued += self.job_runner._critical_section_enqueue_task_instances(session) if task1_exec != task2_exec: # Two executors will execute up to core parallelism From 945590ca027da8e3f470a696a24aed42d6cab8d1 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 12 Mar 2026 09:44:20 +0800 Subject: [PATCH 12/17] Fix otel integration test --- airflow-core/tests/integration/otel/test_otel.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index c9bd4ad8b789d..0d40156c45ed0 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -209,6 +209,11 @@ def setup_class(cls): # during scheduler handoff (see https://github.com/apache/airflow/issues/61070). wait_for_otel_collector(otel_host, otel_port) + # The pytest plugin strips AIRFLOW__*__* env vars (including the JWT secret set + # by Breeze). Both the scheduler and api-server subprocesses must share the same + # secret; otherwise each generates its own random key and token verification fails. 
+ os.environ["AIRFLOW__API_AUTH__JWT_SECRET"] = "test-secret-key-for-testing" + os.environ["AIRFLOW__API_AUTH__JWT_ISSUER"] = "airflow" os.environ["AIRFLOW__TRACES__OTEL_ON"] = "True" os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf" os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = "http://breeze-otel-collector:4318/v1/traces" From f8a4d41be353122823c5c40d5d964f32ec3ea643 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 12 Mar 2026 15:42:19 +0800 Subject: [PATCH 13/17] Resolve partial of Amogh comments - Add invalidate_cache method at shared Parser - Unified initialize_providers_configuration for Core & SDK provider_manager - call conf.initialize_providers_configuration for both provider_manager - Replace manual clear cache with invalidate_cache method in test_configuration --- airflow-core/src/airflow/configuration.py | 2 +- airflow-core/src/airflow/providers_manager.py | 12 +++++---- .../tests/unit/core/test_configuration.py | 3 +-- .../airflow_shared/configuration/parser.py | 27 +++++++++++++------ .../airflow/sdk/providers_manager_runtime.py | 20 +++++++++++++- 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index 0888afba75173..fe6553f7e69f0 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -372,7 +372,7 @@ def _ensure_providers_config_loaded(self) -> None: if not self._providers_configuration_loaded: from airflow.providers_manager import ProvidersManager - ProvidersManager()._initialize_providers_configuration() + ProvidersManager().initialize_providers_configuration() def _ensure_providers_config_unloaded(self) -> bool: """Ensure providers configurations are unloaded temporarily to load core configs. 
Returns True if providers get unloaded.""" diff --git a/airflow-core/src/airflow/providers_manager.py b/airflow-core/src/airflow/providers_manager.py index a28f1bd9274fd..b0340c498fcac 100644 --- a/airflow-core/src/airflow/providers_manager.py +++ b/airflow-core/src/airflow/providers_manager.py @@ -610,12 +610,8 @@ def initialize_providers_auth_managers_without_check(self): @provider_info_cache("config") def initialize_providers_configuration(self): - """Lazy initialization of providers configuration information.""" - self._initialize_providers_configuration() - - def _initialize_providers_configuration(self): """ - Initialize providers configuration information. + Lazy initialization of providers configuration information. Should be used if we do not want to trigger caching for ``initialize_providers_configuration`` method. In some cases we might want to make sure that the configuration is initialized, but we do not want @@ -1491,6 +1487,12 @@ def _cleanup(self): self._executor_without_check_set.clear() self._queue_class_name_set.clear() self._provider_configs.clear() + + # clear provider config cache in conf as well + from airflow.configuration import conf + + conf.invalidate_cache() + self._trigger_info_set.clear() self._notification_info_set.clear() self._plugins_set.clear() diff --git a/airflow-core/tests/unit/core/test_configuration.py b/airflow-core/tests/unit/core/test_configuration.py index 6cc4ed971cccb..9ee2f73753397 100644 --- a/airflow-core/tests/unit/core/test_configuration.py +++ b/airflow-core/tests/unit/core/test_configuration.py @@ -66,8 +66,7 @@ ) # Invalidate cached properties that depend on deprecated_options, since they may have been # computed during airflow initialization before the entries above were added. 
-for attr in ("sensitive_config_values", "inversed_deprecated_options"): - conf.__dict__.pop(attr, None) +conf.invalidate_cache() @pytest.fixture(scope="module", autouse=True) diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py b/shared/configuration/src/airflow_shared/configuration/parser.py index 8ccf9282f71c2..c4cc4df65e9bf 100644 --- a/shared/configuration/src/airflow_shared/configuration/parser.py +++ b/shared/configuration/src/airflow_shared/configuration/parser.py @@ -35,7 +35,6 @@ from contextlib import contextmanager from copy import deepcopy from enum import Enum -from functools import cached_property from json.decoder import JSONDecodeError from re import Pattern from typing import IO, TYPE_CHECKING, Any, TypeVar, overload @@ -177,7 +176,7 @@ def create_provider_cfg_config_fallback_defaults( provider_config_fallback_defaults_cfg_path: str, ) -> ConfigParser: """ - Create fallback defaults. + Create fallback defaults for configuration. This parser contains provider defaults for Airflow configuration, containing fallback default values that might be needed when provider classes are being imported - before provider's configuration @@ -329,7 +328,7 @@ def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, section, key, fallback=None, raw=raw, vars=vars_ ) - @cached_property + @functools.cached_property def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: """Return Provider metadata config fallback default values.""" base_configuration_description: dict[str, dict[str, Any]] = {} @@ -428,6 +427,20 @@ def __init__( self._suppress_future_warnings = False self.upgraded_values: dict[tuple[str, str], str] = {} + def invalidate_cache(self) -> None: + """ + Clear all ``functools.cached_property`` entries on this instance. + + Call this after mutating class-level attributes (e.g. ``deprecated_options``) + so that derived cached properties are recomputed on next access. 
+ """ + for attr_name in ( + name + for name in dir(type(self)) + if isinstance(getattr(type(self), name, None), functools.cached_property) + ): + self.__dict__.pop(attr_name, None) + @functools.cached_property def inversed_deprecated_options(self): """Build inverse mapping from old options to new options.""" @@ -1197,11 +1210,9 @@ def load_providers_configuration(self) -> None: UserWarning, ) self._default_values = self.create_default_config_parser_callable(self.configuration_description) - # sensitive_config_values needs to be refreshed here. This is a cached_property, so we can delete - # the cached values, and it will be refreshed on next access. - with contextlib.suppress(AttributeError): - # no problem if cache is not set yet - del self.sensitive_config_values + # Cached properties derived from configuration_description (e.g. sensitive_config_values) need + # to be recomputed now that provider config has been merged in. + self.invalidate_cache() self._providers_configuration_loaded = True @overload # type: ignore[override] diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py b/task-sdk/src/airflow/sdk/providers_manager_runtime.py index 218d72bf47d01..c528c60ac1c58 100644 --- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py +++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py @@ -222,9 +222,22 @@ def initialize_providers_taskflow_decorator(self): @provider_info_cache("provider_configs") def initialize_provider_configs(self): - """Lazy initialization of provider configs.""" + """ + Lazy initialization of providers configuration information. + + Should be used if we do not want to trigger caching for ``initialize_providers_configuration`` method. 
+ In some cases we might want to make sure that the configuration is initialized, but we do not want + to cache the initialization method - for example when we just want to write configuration with + providers, but it is used in the context where no providers are loaded yet we will eventually + restore the original configuration and we want the subsequent ``initialize_providers_configuration`` + method to be run in order to load the configuration for providers again. + """ self.initialize_providers_list() self._discover_config() + # Now update conf with the new provider configuration from providers + from airflow.sdk.configuration import conf + + conf.load_providers_configuration() def _discover_config(self) -> None: """Retrieve all configs defined in the providers.""" @@ -633,5 +646,10 @@ def _cleanup(self): self._asset_to_openlineage_converters.clear() self._provider_configs.clear() + # clear provider config cache in conf as well + from airflow.sdk.configuration import conf + + conf.invalidate_cache() + self._initialized = False self._initialization_stack_trace = None From 4bafd9640a137424918cfc3997a26034150031d1 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 20 Mar 2026 16:40:57 +0800 Subject: [PATCH 14/17] Resolve Kaxil's second review comments --- airflow-core/src/airflow/configuration.py | 11 ------ .../src/airflow/jobs/scheduler_job_runner.py | 5 +-- airflow-core/src/airflow/providers_manager.py | 11 +----- .../airflow_shared/configuration/parser.py | 21 +++++++---- .../airflow/sdk/providers_manager_runtime.py | 13 ++----- .../test_providers_manager_runtime.py | 35 +++++++++++++++++++ 6 files changed, 56 insertions(+), 40 deletions(-) diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index fe6553f7e69f0..0863c46f52d2d 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -385,17 +385,6 @@ def _reload_provider_configs(self) -> None: """Reload providers 
configuration.""" self.load_providers_configuration() - def restore_core_default_configuration(self) -> None: - """ - Restore default configuration for core Airflow. - - It does not restore configuration for providers. If you want to restore configuration for - providers, you need to call ``load_providers_configuration`` method. - """ - self.configuration_description = retrieve_configuration_description(include_providers=False) - self._default_values = create_default_config_parser(self.configuration_description) - self._providers_configuration_loaded = False - def _upgrade_postgres_metastore_conn(self): """ Upgrade SQL schemas. diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index eaebc909f1eac..bda95bbf4b06a 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -273,8 +273,9 @@ def __init__( self._scheduler_idle_sleep_time = scheduler_idle_sleep_time # Note: - # We need to fetch ALL the conf before the `prohibit_commit` block, otherwise we will encounter `UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS` since the Core conf might access the MetadataMetastoreBackend - # so the most easiest way is to fetch all the conf in the init method and store them as attributes of the SchedulerJobRunner instance. + # We need to fetch all conf values before the `prohibit_commit` block; otherwise the Core conf may + # access the MetadataMetastoreBackend and trigger `UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS`. + # The easiest way to keep the scheduler loop side-effect free is to read those values in `__init__`. # How many seconds do we wait for tasks to heartbeat before timeout. 
self._task_instance_heartbeat_timeout_secs = conf.getint( diff --git a/airflow-core/src/airflow/providers_manager.py b/airflow-core/src/airflow/providers_manager.py index b0340c498fcac..7e3d5128082d2 100644 --- a/airflow-core/src/airflow/providers_manager.py +++ b/airflow-core/src/airflow/providers_manager.py @@ -610,16 +610,7 @@ def initialize_providers_auth_managers_without_check(self): @provider_info_cache("config") def initialize_providers_configuration(self): - """ - Lazy initialization of providers configuration information. - - Should be used if we do not want to trigger caching for ``initialize_providers_configuration`` method. - In some cases we might want to make sure that the configuration is initialized, but we do not want - to cache the initialization method - for example when we just want to write configuration with - providers, but it is used in the context where no providers are loaded yet we will eventually - restore the original configuration and we want the subsequent ``initialize_providers_configuration`` - method to be run in order to load the configuration for providers again. 
- """ + """Lazy initialization of provider configuration metadata and merge it into ``conf``.""" self.initialize_providers_list() self._discover_config() # Now update conf with the new provider configuration from providers diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py b/shared/configuration/src/airflow_shared/configuration/parser.py index c4cc4df65e9bf..65779adc0eab1 100644 --- a/shared/configuration/src/airflow_shared/configuration/parser.py +++ b/shared/configuration/src/airflow_shared/configuration/parser.py @@ -332,9 +332,9 @@ def get_from_provider_cfg_config_fallback_defaults(self, section: str, key: str, def _provider_metadata_config_fallback_default_values(self) -> ConfigParser: """Return Provider metadata config fallback default values.""" base_configuration_description: dict[str, dict[str, Any]] = {} - for _, config in self.provider_manager_type().provider_configs: + for _, config in self._provider_manager_type().provider_configs: base_configuration_description.update(config) - return self.create_default_config_parser_callable(base_configuration_description) + return self._create_default_config_parser_callable(base_configuration_description) def get_from_provider_metadata_config_fallback_defaults(self, section: str, key: str, **kwargs) -> Any: """Get provider metadata config fallback default values.""" @@ -418,14 +418,16 @@ def __init__( """ super().__init__(*args, **kwargs) self.configuration_description = configuration_description + self._base_configuration_description = deepcopy(configuration_description) self._default_values = _default_values - self.provider_manager_type = provider_manager_type - self.create_default_config_parser_callable = create_default_config_parser_callable + self._provider_manager_type = provider_manager_type + self._create_default_config_parser_callable = create_default_config_parser_callable self._provider_cfg_config_fallback_default_values = create_provider_cfg_config_fallback_defaults( 
provider_config_fallback_defaults_cfg_path ) self._suppress_future_warnings = False self.upgraded_values: dict[tuple[str, str], str] = {} + self._providers_configuration_loaded = False def invalidate_cache(self) -> None: """ @@ -1186,7 +1188,7 @@ def load_providers_configuration(self) -> None: log.debug("Loading providers configuration") self.restore_core_default_configuration() - for provider, config in self.provider_manager_type().already_initialized_provider_configs: + for provider, config in self._provider_manager_type().already_initialized_provider_configs: for provider_section, provider_section_content in config.items(): provider_options = provider_section_content["options"] section_in_current_config = self.configuration_description.get(provider_section) @@ -1209,12 +1211,19 @@ def load_providers_configuration(self) -> None: "provider's configuration.", UserWarning, ) - self._default_values = self.create_default_config_parser_callable(self.configuration_description) + self._default_values = self._create_default_config_parser_callable(self.configuration_description) # Cached properties derived from configuration_description (e.g. sensitive_config_values) need # to be recomputed now that provider config has been merged in. self.invalidate_cache() self._providers_configuration_loaded = True + def restore_core_default_configuration(self) -> None: + """Restore the parser state before provider-contributed sections were loaded.""" + self.configuration_description = deepcopy(self._base_configuration_description) + self._default_values = self._create_default_config_parser_callable(self.configuration_description) + self.invalidate_cache() + self._providers_configuration_loaded = False + @overload # type: ignore[override] def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str: ... 
diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py b/task-sdk/src/airflow/sdk/providers_manager_runtime.py index c528c60ac1c58..2434adc291f22 100644 --- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py +++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py @@ -152,7 +152,7 @@ def __init__(self): self._plugins_set: set[PluginInfo] = set() self._provider_schema_validator = _create_provider_info_schema_validator() self._init_airflow_core_hooks() - # _provider configs is required by respecting provider default config for sdk conf + # Populated by initialize_provider_configs(); holds provider-contributed config sections. self._provider_configs: dict[str, dict[str, Any]] = {} def _init_airflow_core_hooks(self): @@ -222,16 +222,7 @@ def initialize_providers_taskflow_decorator(self): @provider_info_cache("provider_configs") def initialize_provider_configs(self): - """ - Lazy initialization of providers configuration information. - - Should be used if we do not want to trigger caching for ``initialize_providers_configuration`` method. - In some cases we might want to make sure that the configuration is initialized, but we do not want - to cache the initialization method - for example when we just want to write configuration with - providers, but it is used in the context where no providers are loaded yet we will eventually - restore the original configuration and we want the subsequent ``initialize_providers_configuration`` - method to be run in order to load the configuration for providers again. 
- """ + """Lazy initialization of provider configuration metadata and merge it into SDK ``conf``.""" self.initialize_providers_list() self._discover_config() # Now update conf with the new provider configuration from providers diff --git a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py index da6600a6fda5d..aee5a115363f5 100644 --- a/task-sdk/tests/task_sdk/test_providers_manager_runtime.py +++ b/task-sdk/tests/task_sdk/test_providers_manager_runtime.py @@ -236,3 +236,38 @@ def test_optional_feature_debug(self, mock_importlib_import_string): assert self._caplog.messages == [ "Optional provider feature disabled when importing 'HookClass' from 'test_package' package" ] + + def test_initialize_provider_configs_can_reload_sdk_conf(self): + from airflow.sdk.configuration import conf + + providers_manager = ProvidersManagerTaskRuntime() + provider_config = { + "test_sdk_provider": { + "description": "Provider config used in runtime tests.", + "options": { + "test_option": { + "default": "provider-default", + } + }, + } + } + + def initialize_provider_configs() -> None: + providers_manager._provider_dict["apache-airflow-providers-test-sdk"] = ProviderInfo( + version="0.0.1", + data={"config": provider_config}, + ) + with patch.object(providers_manager, "initialize_providers_list"): + providers_manager.initialize_provider_configs() + + conf.restore_core_default_configuration() + try: + initialize_provider_configs() + assert conf.get("test_sdk_provider", "test_option") == "provider-default" + + providers_manager._cleanup() + + initialize_provider_configs() + assert conf.get("test_sdk_provider", "test_option") == "provider-default" + finally: + conf.restore_core_default_configuration() From 7a83b3dfcaafc8fc336ca27042e1cb55a0cba064 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 23 Mar 2026 10:33:21 +0800 Subject: [PATCH 15/17] Fix test_write_default_config_contains_generated_secrets error --- 
airflow-core/src/airflow/configuration.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py index 0863c46f52d2d..1426199c7e07e 100644 --- a/airflow-core/src/airflow/configuration.py +++ b/airflow-core/src/airflow/configuration.py @@ -647,11 +647,13 @@ def write_default_airflow_configuration_if_needed() -> AirflowConfigParser: conf.configuration_description["core"]["options"]["fernet_key"]["default"] = ( _SecretKeys.fernet_key ) + conf._default_values.set("core", "fernet_key", _SecretKeys.fernet_key) _SecretKeys.jwt_secret_key = b64encode(os.urandom(16)).decode("utf-8") conf.configuration_description["api_auth"]["options"]["jwt_secret"]["default"] = ( _SecretKeys.jwt_secret_key ) + conf._default_values.set("api_auth", "jwt_secret", _SecretKeys.jwt_secret_key) pathlib.Path(airflow_config.__fspath__()).touch() make_group_other_inaccessible(airflow_config.__fspath__()) with open(airflow_config, "w") as file: From 7693bea1b1c68b52f573c605eede7acc12e93dd3 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 23 Mar 2026 14:46:35 +0800 Subject: [PATCH 16/17] Address Copilot's comments --- .../airflow_shared/configuration/parser.py | 25 +++++++++---- .../tests/configuration/test_parser.py | 36 +++++++++++++++++++ task-sdk/src/airflow/sdk/configuration.py | 3 +- 3 files changed, 56 insertions(+), 8 deletions(-) diff --git a/shared/configuration/src/airflow_shared/configuration/parser.py b/shared/configuration/src/airflow_shared/configuration/parser.py index 65779adc0eab1..55f24c529e08d 100644 --- a/shared/configuration/src/airflow_shared/configuration/parser.py +++ b/shared/configuration/src/airflow_shared/configuration/parser.py @@ -280,13 +280,24 @@ def _lookup_sequence(self) -> list[Callable]: ] def _get_config_sources_for_as_dict(self) -> list[tuple[str, ConfigParser]]: - """Override the base method to add provider fallbacks.""" - return [ - ("provider-cfg-fallback-defaults", 
self._provider_cfg_config_fallback_default_values), - ("provider-metadata-fallback-defaults", self._provider_metadata_config_fallback_default_values), + """Override the base method to add provider fallbacks when providers are loaded.""" + sources: list[tuple[str, ConfigParser]] = [ ("default", self._default_values), ("airflow.cfg", self), ] + if self._providers_configuration_loaded: + sources.insert( + 0, + ( + "provider-metadata-fallback-defaults", + self._provider_metadata_config_fallback_default_values, + ), + ) + sources.insert( + 0, + ("provider-cfg-fallback-defaults", self._provider_cfg_config_fallback_default_values), + ) + return sources def _get_option_from_provider_cfg_config_fallbacks( self, @@ -1908,10 +1919,10 @@ def write( # type: ignore[override] :param extra_spacing: Add extra spacing before examples and after variables :param only_defaults: Only include default values when writing the config, not the actual values """ - sources_dict = {} - if include_sources: - sources_dict = self.as_dict(display_source=True) with self.make_sure_configuration_loaded(with_providers=include_providers): + sources_dict = {} + if include_sources: + sources_dict = self.as_dict(display_source=True) for section_to_write in self.get_sections_including_defaults(): section_config_description = self.configuration_description.get(section_to_write, {}) if section_to_write != section and section is not None: diff --git a/shared/configuration/tests/configuration/test_parser.py b/shared/configuration/tests/configuration/test_parser.py index dd5bcfcf9b867..27434526b71ad 100644 --- a/shared/configuration/tests/configuration/test_parser.py +++ b/shared/configuration/tests/configuration/test_parser.py @@ -26,6 +26,7 @@ import textwrap from configparser import ConfigParser from enum import Enum +from io import StringIO from unittest.mock import patch import pytest @@ -95,6 +96,22 @@ def _update_defaults_from_string(self, config_string: str): for key, value in parser.items(section): 
self._default_values.set(section, key, value) + def _ensure_providers_config_loaded(self) -> None: + """Load provider configuration for tests when requested.""" + if not self._providers_configuration_loaded: + self.load_providers_configuration() + + def _ensure_providers_config_unloaded(self) -> bool: + """Unload provider configuration for tests when requested.""" + if self._providers_configuration_loaded: + self.restore_core_default_configuration() + return True + return False + + def _reload_provider_configs(self) -> None: + """Reload provider configuration for tests after temporary unloads.""" + self.load_providers_configuration() + class TestAirflowConfigParser: """Test the shared AirflowConfigParser parser methods.""" @@ -806,6 +823,25 @@ def test_get_mandatory_list_value(self): with pytest.raises(ValueError, match=r"The value test/missing_key should be set!"): test_conf.get_mandatory_list_value("test", "missing_key", fallback=None) + def test_as_dict_only_materializes_provider_sources_after_loading_providers(self): + test_conf = AirflowConfigParser() + + test_conf.as_dict(display_source=True) + assert "_provider_metadata_config_fallback_default_values" not in test_conf.__dict__ + + test_conf.load_providers_configuration() + test_conf.as_dict(display_source=True) + assert "_provider_metadata_config_fallback_default_values" in test_conf.__dict__ + + def test_write_materializes_provider_sources_in_requested_context(self): + test_conf = AirflowConfigParser() + + test_conf.write(StringIO(), include_sources=True, include_providers=False) + assert "_provider_metadata_config_fallback_default_values" not in test_conf.__dict__ + + test_conf.write(StringIO(), include_sources=True, include_providers=True) + assert "_provider_metadata_config_fallback_default_values" in test_conf.__dict__ + def test_set_case_insensitive(self): # both get and set should be case insensitive test_conf = AirflowConfigParser() diff --git a/task-sdk/src/airflow/sdk/configuration.py 
b/task-sdk/src/airflow/sdk/configuration.py index 84938fab59dc9..46702c9e20563 100644 --- a/task-sdk/src/airflow/sdk/configuration.py +++ b/task-sdk/src/airflow/sdk/configuration.py @@ -32,7 +32,6 @@ configure_parser_from_configuration_description, ) from airflow.sdk.execution_time.secrets import _SERVER_DEFAULT_SECRETS_SEARCH_PATH -from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime log = logging.getLogger(__name__) @@ -126,6 +125,8 @@ def __init__( *args, **kwargs, ): + from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime + # Read Core's config.yml (Phase 1: shared config.yml) configuration_description = retrieve_configuration_description() # Create default values parser From 6b83699ce986e4d64ebde7a4406097ea5af4cb00 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Mon, 23 Mar 2026 15:23:02 +0800 Subject: [PATCH 17/17] Address second phase of Copilot's comments --- airflow-core/src/airflow/providers_manager.py | 4 +- .../tests/configuration/test_parser.py | 55 ++++++++++++++++++- task-sdk/src/airflow/sdk/configuration.py | 2 + .../airflow/sdk/providers_manager_runtime.py | 4 +- 4 files changed, 58 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/providers_manager.py b/airflow-core/src/airflow/providers_manager.py index 7e3d5128082d2..02f5a6957c936 100644 --- a/airflow-core/src/airflow/providers_manager.py +++ b/airflow-core/src/airflow/providers_manager.py @@ -613,7 +613,7 @@ def initialize_providers_configuration(self): """Lazy initialization of provider configuration metadata and merge it into ``conf``.""" self.initialize_providers_list() self._discover_config() - # Now update conf with the new provider configuration from providers + # Imported lazily to avoid a configuration/providers_manager import cycle during startup.
from airflow.configuration import conf conf.load_providers_configuration() @@ -1479,7 +1479,7 @@ def _cleanup(self): self._queue_class_name_set.clear() self._provider_configs.clear() - # clear provider config cache in conf as well + # Imported lazily to avoid a configuration/providers_manager import cycle during cleanup. from airflow.configuration import conf conf.invalidate_cache() diff --git a/shared/configuration/tests/configuration/test_parser.py b/shared/configuration/tests/configuration/test_parser.py index 27434526b71ad..45b8ee1fd57ee 100644 --- a/shared/configuration/tests/configuration/test_parser.py +++ b/shared/configuration/tests/configuration/test_parser.py @@ -54,10 +54,23 @@ def _create_empty_config_parser(desc: dict) -> ConfigParser: return ConfigParser() +def _create_default_config_parser(desc: dict) -> ConfigParser: + parser = ConfigParser() + configure_parser_from_configuration_description(parser, desc, {}) + return parser + + class AirflowConfigParser(_SharedAirflowConfigParser): """Test parser that extends shared parser for testing.""" - def __init__(self, default_config: str | None = None, *args, **kwargs): + def __init__( + self, + default_config: str | None = None, + provider_manager_type=_NoOpProvidersManager, + create_default_config_parser_callable=_create_empty_config_parser, + *args, + **kwargs, + ): configuration_description = { "test": { "options": { @@ -73,8 +86,8 @@ def __init__(self, default_config: str | None = None, *args, **kwargs): super().__init__( configuration_description, _default_values, - _NoOpProvidersManager, - _create_empty_config_parser, + provider_manager_type, + create_default_config_parser_callable, "", *args, **kwargs, @@ -842,6 +855,42 @@ def test_write_materializes_provider_sources_in_requested_context(self): test_conf.write(StringIO(), include_sources=True, include_providers=True) assert "_provider_metadata_config_fallback_default_values" in test_conf.__dict__ + def 
test_get_uses_provider_metadata_fallback_before_loading_providers(self): + provider_configs = [ + ( + "apache-airflow-providers-test", + { + "test_provider": { + "options": { + "test_option": { + "default": "provider-default", + } + } + } + }, + ) + ] + + class ProvidersManagerWithConfig: + @property + def provider_configs(self): + return provider_configs + + @property + def already_initialized_provider_configs(self): + return [] + + test_conf = AirflowConfigParser( + provider_manager_type=ProvidersManagerWithConfig, + create_default_config_parser_callable=_create_default_config_parser, + ) + + assert test_conf._providers_configuration_loaded is False + assert test_conf.configuration_description.get("test_provider") is None + assert test_conf.get("test_provider", "test_option") == "provider-default" + assert test_conf._providers_configuration_loaded is False + assert test_conf.configuration_description.get("test_provider") is None + def test_set_case_insensitive(self): # both get and set should be case insensitive test_conf = AirflowConfigParser() diff --git a/task-sdk/src/airflow/sdk/configuration.py b/task-sdk/src/airflow/sdk/configuration.py index 46702c9e20563..64bda4b3a56eb 100644 --- a/task-sdk/src/airflow/sdk/configuration.py +++ b/task-sdk/src/airflow/sdk/configuration.py @@ -125,6 +125,8 @@ def __init__( *args, **kwargs, ): + # Imported lazily to preserve the module-level lazy ``conf`` initialization and avoid a + # configuration/providers_manager_runtime import cycle. 
from airflow.sdk.providers_manager_runtime import ProvidersManagerTaskRuntime # Read Core's config.yml (Phase 1: shared config.yml) diff --git a/task-sdk/src/airflow/sdk/providers_manager_runtime.py b/task-sdk/src/airflow/sdk/providers_manager_runtime.py index 2434adc291f22..4d764596814ed 100644 --- a/task-sdk/src/airflow/sdk/providers_manager_runtime.py +++ b/task-sdk/src/airflow/sdk/providers_manager_runtime.py @@ -225,7 +225,7 @@ def initialize_provider_configs(self): """Lazy initialization of provider configuration metadata and merge it into SDK ``conf``.""" self.initialize_providers_list() self._discover_config() - # Now update conf with the new provider configuration from providers + # Imported lazily to preserve SDK conf lazy initialization and avoid a configuration/runtime cycle. from airflow.sdk.configuration import conf conf.load_providers_configuration() @@ -637,7 +637,7 @@ def _cleanup(self): self._asset_to_openlineage_converters.clear() self._provider_configs.clear() - # clear provider config cache in conf as well + # Imported lazily to preserve SDK conf lazy initialization and avoid a configuration/runtime cycle. from airflow.sdk.configuration import conf conf.invalidate_cache()