Skip to content

Commit f15c40e

Browse files
committed
Test standard provider with Airflow 2.8 and 2.9
The standard provider now has a minimum Airflow version of 2.8 (since #43553), but we have not tested it against Airflow 2.8 and 2.9.
1 parent 1c38b2a commit f15c40e

9 files changed

Lines changed: 223 additions & 95 deletions

File tree

dev/breeze/src/airflow_breeze/global_constants.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,13 +574,13 @@ def get_airflow_extras():
574574
{
575575
"python-version": "3.9",
576576
"airflow-version": "2.8.4",
577-
"remove-providers": "cloudant fab edge standard",
577+
"remove-providers": "cloudant fab edge",
578578
"run-tests": "true",
579579
},
580580
{
581581
"python-version": "3.9",
582582
"airflow-version": "2.9.3",
583-
"remove-providers": "cloudant edge standard",
583+
"remove-providers": "cloudant edge",
584584
"run-tests": "true",
585585
},
586586
{

providers/src/airflow/providers/standard/operators/python.py

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,16 @@
5454
from airflow.settings import _ENABLE_AIP_44
5555
from airflow.typing_compat import Literal
5656
from airflow.utils import hashlib_wrapper
57-
from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
57+
from airflow.utils.context import context_copy_partial, context_merge
5858
from airflow.utils.file import get_unique_dag_module_name
59-
from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
60-
from airflow.utils.process_utils import execute_in_subprocess
59+
from airflow.utils.operator_helpers import KeywordParameters
60+
from airflow.utils.process_utils import execute_in_subprocess, execute_in_subprocess_with_kwargs
6161
from airflow.utils.session import create_session
6262

6363
log = logging.getLogger(__name__)
6464

6565
AIRFLOW_VERSION = Version(airflow_version)
66+
AIRFLOW_V_2_10_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.10.0")
6667
AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")
6768

6869
if TYPE_CHECKING:
@@ -187,7 +188,15 @@ def __init__(
187188
def execute(self, context: Context) -> Any:
188189
context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
189190
self.op_kwargs = self.determine_kwargs(context)
190-
self._asset_events = context_get_outlet_events(context)
191+
192+
if AIRFLOW_V_3_0_PLUS:
193+
from airflow.utils.context import context_get_outlet_events
194+
195+
self._asset_events = context_get_outlet_events(context)
196+
elif AIRFLOW_V_2_10_PLUS:
197+
from airflow.utils.context import context_get_outlet_events
198+
199+
self._dataset_events = context_get_outlet_events(context)
191200

192201
return_value = self.execute_callable()
193202
if self.show_return_value_in_logs:
@@ -206,7 +215,15 @@ def execute_callable(self) -> Any:
206215
207216
:return: the return value of the call.
208217
"""
209-
runner = ExecutionCallableRunner(self.python_callable, self._asset_events, logger=self.log)
218+
try:
219+
from airflow.utils.operator_helpers import ExecutionCallableRunner
220+
221+
asset_events = self._asset_events if AIRFLOW_V_3_0_PLUS else self._dataset_events
222+
223+
runner = ExecutionCallableRunner(self.python_callable, asset_events, logger=self.log)
224+
except ImportError:
225+
# Handle the pre-Airflow 2.10 case where ExecutionCallableRunner was not available
226+
return self.python_callable(*self.op_args, **self.op_kwargs)
210227
return runner.run(*self.op_args, **self.op_kwargs)
211228

212229

@@ -551,18 +568,25 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
551568
env_vars.update(self.env_vars)
552569

553570
try:
554-
execute_in_subprocess(
555-
cmd=[
556-
os.fspath(python_path),
557-
os.fspath(script_path),
558-
os.fspath(input_path),
559-
os.fspath(output_path),
560-
os.fspath(string_args_path),
561-
os.fspath(termination_log_path),
562-
os.fspath(airflow_context_path),
563-
],
564-
env=env_vars,
565-
)
571+
cmd: list[str] = [
572+
os.fspath(python_path),
573+
os.fspath(script_path),
574+
os.fspath(input_path),
575+
os.fspath(output_path),
576+
os.fspath(string_args_path),
577+
os.fspath(termination_log_path),
578+
os.fspath(airflow_context_path),
579+
]
580+
if AIRFLOW_V_2_10_PLUS:
581+
execute_in_subprocess(
582+
cmd=cmd,
583+
env=env_vars,
584+
)
585+
else:
586+
execute_in_subprocess_with_kwargs(
587+
cmd=cmd,
588+
env=env_vars,
589+
)
566590
except subprocess.CalledProcessError as e:
567591
if e.returncode in self.skip_on_exit_code:
568592
raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
@@ -699,8 +723,14 @@ def __init__(
699723
)
700724

701725
if use_airflow_context and (not expect_airflow and not system_site_packages):
702-
error_msg = "use_airflow_context is set to True, but expect_airflow and system_site_packages are set to False."
703-
raise AirflowException(error_msg)
726+
raise AirflowException(
727+
"The `use_airflow_context` parameter is set to True, but "
728+
"expect_airflow and system_site_packages are set to False."
729+
)
730+
if use_airflow_context and not AIRFLOW_V_3_0_PLUS:
731+
raise AirflowException(
732+
"The `use_airflow_context=True` is only supported in Airflow 3.0.0 and later."
733+
)
704734
if not requirements:
705735
self.requirements: list[str] = []
706736
elif isinstance(requirements, str):

providers/src/airflow/providers/standard/sensors/date_time.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,27 @@
1818
from __future__ import annotations
1919

2020
import datetime
21+
from dataclasses import dataclass
2122
from typing import TYPE_CHECKING, Any, NoReturn, Sequence
2223

24+
from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS
2325
from airflow.sensors.base import BaseSensorOperator
24-
from airflow.triggers.base import StartTriggerArgs
26+
27+
try:
28+
from airflow.triggers.base import StartTriggerArgs
29+
except ImportError:
30+
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
31+
@dataclass
32+
class StartTriggerArgs: # type: ignore[no-redef]
33+
"""Arguments required for start task execution from triggerer."""
34+
35+
trigger_cls: str
36+
next_method: str
37+
trigger_kwargs: dict[str, Any] | None = None
38+
next_kwargs: dict[str, Any] | None = None
39+
timeout: datetime.timedelta | None = None
40+
41+
2542
from airflow.triggers.temporal import DateTimeTrigger
2643
from airflow.utils import timezone
2744

@@ -125,7 +142,9 @@ def execute(self, context: Context) -> NoReturn:
125142
trigger=DateTimeTrigger(
126143
moment=timezone.parse(self.target_time),
127144
end_from_trigger=self.end_from_trigger,
128-
),
145+
)
146+
if AIRFLOW_V_3_0_PLUS
147+
else DateTimeTrigger(moment=timezone.parse(self.target_time)),
129148
)
130149

131150
def execute_complete(self, context: Context, event: Any = None) -> None:

providers/src/airflow/providers/standard/sensors/time.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,27 @@
1818
from __future__ import annotations
1919

2020
import datetime
21+
from dataclasses import dataclass
2122
from typing import TYPE_CHECKING, Any, NoReturn
2223

24+
from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS
2325
from airflow.sensors.base import BaseSensorOperator
24-
from airflow.triggers.base import StartTriggerArgs
26+
27+
try:
28+
from airflow.triggers.base import StartTriggerArgs
29+
except ImportError:
30+
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
31+
@dataclass
32+
class StartTriggerArgs: # type: ignore[no-redef]
33+
"""Arguments required for start task execution from triggerer."""
34+
35+
trigger_cls: str
36+
next_method: str
37+
trigger_kwargs: dict[str, Any] | None = None
38+
next_kwargs: dict[str, Any] | None = None
39+
timeout: datetime.timedelta | None = None
40+
41+
2542
from airflow.triggers.temporal import DateTimeTrigger
2643
from airflow.utils import timezone
2744

@@ -102,7 +119,9 @@ def __init__(
102119

103120
def execute(self, context: Context) -> NoReturn:
104121
self.defer(
105-
trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger),
122+
trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger)
123+
if AIRFLOW_V_3_0_PLUS
124+
else DateTimeTrigger(moment=self.target_datetime),
106125
method_name="execute_complete",
107126
)
108127

providers/src/airflow/providers/standard/sensors/time_delta.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
from airflow.configuration import conf
2525
from airflow.exceptions import AirflowSkipException
26+
from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS
2627
from airflow.sensors.base import BaseSensorOperator
2728
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
2829
from airflow.utils import timezone
@@ -81,7 +82,10 @@ def execute(self, context: Context) -> bool | NoReturn:
8182
# If the target datetime is in the past, return immediately
8283
return True
8384
try:
84-
trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
85+
if AIRFLOW_V_3_0_PLUS:
86+
trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
87+
else:
88+
trigger = DateTimeTrigger(moment=target_dttm)
8589
except (TypeError, ValueError) as e:
8690
if self.soft_fail:
8791
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
@@ -121,7 +125,9 @@ def __init__(
121125
def execute(self, context: Context) -> None:
122126
if self.deferrable:
123127
self.defer(
124-
trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True),
128+
trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True)
129+
if AIRFLOW_V_3_0_PLUS
130+
else TimeDeltaTrigger(self.time_to_wait),
125131
method_name="execute_complete",
126132
)
127133
else:

providers/tests/common/sql/operators/test_sql.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353

5454
pytestmark = [
5555
pytest.mark.db_test,
56-
pytest.mark.skipif(reason="Tests for Airflow 2.8.0+ only"),
5756
pytest.mark.skip_if_database_isolation_mode,
5857
]
5958

providers/tests/openlineage/plugins/test_utils.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,6 @@
5757
if AIRFLOW_V_3_0_PLUS:
5858
from airflow.utils.types import DagRunTriggeredByType
5959

60-
BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
61-
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
62-
if not AIRFLOW_V_2_10_PLUS:
63-
BASH_OPERATOR_PATH = "airflow.operators.bash"
64-
PYTHON_OPERATOR_PATH = "airflow.operators.python"
65-
6660

6761
class SafeStrDict(dict):
6862
def __str__(self):
@@ -276,7 +270,7 @@ def test_get_fully_qualified_class_name():
276270
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
277271

278272
result = get_fully_qualified_class_name(BashOperator(task_id="test", bash_command="exit 0;"))
279-
assert result == f"{BASH_OPERATOR_PATH}.BashOperator"
273+
assert result == "airflow.providers.standard.operators.bash.BashOperator"
280274

281275
result = get_fully_qualified_class_name(OpenLineageAdapter())
282276
assert result == "airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter"
@@ -292,8 +286,8 @@ def test_is_operator_disabled(mock_disabled_operators):
292286
assert is_operator_disabled(op) is False
293287

294288
mock_disabled_operators.return_value = {
295-
f"{BASH_OPERATOR_PATH}.BashOperator",
296-
f"{PYTHON_OPERATOR_PATH}.PythonOperator",
289+
"airflow.providers.standard.operators.bash.BashOperator",
290+
"airflow.providers.standard.operators.python.PythonOperator",
297291
}
298292
assert is_operator_disabled(op) is True
299293

providers/tests/openlineage/utils/test_utils.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,11 @@
4343
from airflow.utils.task_group import TaskGroup
4444
from airflow.utils.types import DagRunType
4545

46-
from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, BashOperator, PythonOperator
46+
from tests_common.test_utils.compat import BashOperator, PythonOperator
4747
from tests_common.test_utils.mock_operators import MockOperator
4848

4949
BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
5050
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
51-
if not AIRFLOW_V_2_10_PLUS:
52-
BASH_OPERATOR_PATH = "airflow.operators.bash"
53-
PYTHON_OPERATOR_PATH = "airflow.operators.python"
5451

5552

5653
class CustomOperatorForTest(BashOperator):

0 commit comments

Comments
 (0)