5454from airflow .settings import _ENABLE_AIP_44
5555from airflow .typing_compat import Literal
5656from airflow .utils import hashlib_wrapper
57- from airflow .utils .context import context_copy_partial , context_get_outlet_events , context_merge
57+ from airflow .utils .context import context_copy_partial , context_merge
5858from airflow .utils .file import get_unique_dag_module_name
59- from airflow .utils .operator_helpers import ExecutionCallableRunner , KeywordParameters
60- from airflow .utils .process_utils import execute_in_subprocess
59+ from airflow .utils .operator_helpers import KeywordParameters
60+ from airflow .utils .process_utils import execute_in_subprocess , execute_in_subprocess_with_kwargs
6161from airflow .utils .session import create_session
6262
6363log = logging .getLogger (__name__ )
6464
6565AIRFLOW_VERSION = Version (airflow_version )
66+ AIRFLOW_V_2_10_PLUS = Version (AIRFLOW_VERSION .base_version ) >= Version ("2.10.0" )
6667AIRFLOW_V_3_0_PLUS = Version (AIRFLOW_VERSION .base_version ) >= Version ("3.0.0" )
6768
6869if TYPE_CHECKING :
@@ -187,7 +188,15 @@ def __init__(
187188 def execute (self , context : Context ) -> Any :
188189 context_merge (context , self .op_kwargs , templates_dict = self .templates_dict )
189190 self .op_kwargs = self .determine_kwargs (context )
190- self ._asset_events = context_get_outlet_events (context )
191+
192+ if AIRFLOW_V_3_0_PLUS :
193+ from airflow .utils .context import context_get_outlet_events
194+
195+ self ._asset_events = context_get_outlet_events (context )
196+ elif AIRFLOW_V_2_10_PLUS :
197+ from airflow .utils .context import context_get_outlet_events
198+
199+ self ._dataset_events = context_get_outlet_events (context )
191200
192201 return_value = self .execute_callable ()
193202 if self .show_return_value_in_logs :
@@ -206,7 +215,15 @@ def execute_callable(self) -> Any:
206215
207216 :return: the return value of the call.
208217 """
209- runner = ExecutionCallableRunner (self .python_callable , self ._asset_events , logger = self .log )
218+ try :
219+ from airflow .utils .operator_helpers import ExecutionCallableRunner
220+
221+ asset_events = self ._asset_events if AIRFLOW_V_3_0_PLUS else self ._dataset_events
222+
223+ runner = ExecutionCallableRunner (self .python_callable , asset_events , logger = self .log )
224+ except ImportError :
225+ # Handle Pre Airflow 3.10 case where ExecutionCallableRunner was not available
226+ return self .python_callable (* self .op_args , ** self .op_kwargs )
210227 return runner .run (* self .op_args , ** self .op_kwargs )
211228
212229
@@ -551,18 +568,25 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
551568 env_vars .update (self .env_vars )
552569
553570 try :
554- execute_in_subprocess (
555- cmd = [
556- os .fspath (python_path ),
557- os .fspath (script_path ),
558- os .fspath (input_path ),
559- os .fspath (output_path ),
560- os .fspath (string_args_path ),
561- os .fspath (termination_log_path ),
562- os .fspath (airflow_context_path ),
563- ],
564- env = env_vars ,
565- )
571+ cmd : list [str ] = [
572+ os .fspath (python_path ),
573+ os .fspath (script_path ),
574+ os .fspath (input_path ),
575+ os .fspath (output_path ),
576+ os .fspath (string_args_path ),
577+ os .fspath (termination_log_path ),
578+ os .fspath (airflow_context_path ),
579+ ]
580+ if AIRFLOW_V_2_10_PLUS :
581+ execute_in_subprocess (
582+ cmd = cmd ,
583+ env = env_vars ,
584+ )
585+ else :
586+ execute_in_subprocess_with_kwargs (
587+ cmd = cmd ,
588+ env = env_vars ,
589+ )
566590 except subprocess .CalledProcessError as e :
567591 if e .returncode in self .skip_on_exit_code :
568592 raise AirflowSkipException (f"Process exited with code { e .returncode } . Skipping." )
0 commit comments