@@ -1120,6 +1120,8 @@ def run_ti(self, task_id, dag_run=None, dag_run_kwargs=None, **kwargs):
11201120
11211121 Returns the created TaskInstance.
11221122 """
1123+ from tests_common .test_utils .version_compat import AIRFLOW_V_3_1_PLUS
1124+
11231125 if dag_run is None :
11241126 if dag_run_kwargs is None :
11251127 dag_run_kwargs = {}
@@ -1131,7 +1133,21 @@ def run_ti(self, task_id, dag_run=None, dag_run_kwargs=None, **kwargs):
11311133 f"Task instance with task_id '{ task_id } ' not found in dag run. "
11321134 f"Available task_ids: { available_task_ids } "
11331135 )
1134- ti .refresh_from_task (self .dag .get_task (ti .task_id ))
1136+ task = self .dag .get_task (ti .task_id )
1137+
1138+ if not AIRFLOW_V_3_1_PLUS :
1139+ # Airflow <3.1 has a bug for DecoratedOperator has an unused signature for
1140+ # `DecoratedOperator._handle_output` for xcom_push
1141+ # This worked for `models.BaseOperator` since it had xcom_push method but for
1142+ # `airflow.sdk.BaseOperator`, this does not exist, so this returns an AttributeError
1143+ # Since this is an unused attribute anyway, we just monkey patch it with a lambda.
1144+ # Error otherwise:
1145+ # /usr/local/lib/python3.11/site-packages/airflow/sdk/bases/decorator.py:253: in execute
1146+ # return self._handle_output(return_value=return_value, context=context, xcom_push=self.xcom_push)
1147+ # ^^^^^^^^^^^^^^
1148+ # E AttributeError: '_PythonDecoratedOperator' object has no attribute 'xcom_push'
1149+ task .xcom_push = lambda * args , ** kwargs : None
1150+ ti .refresh_from_task (task )
11351151 ti .run (** kwargs )
11361152 return ti
11371153
0 commit comments