@@ -1082,6 +1082,7 @@ def create_task_instance(dag_maker, create_dummy_dag):
10821082
10831083 Uses ``create_dummy_dag`` to create the dag structure.
10841084 """
1085+ from airflow .operators .empty import EmptyOperator
10851086
10861087 def maker (
10871088 execution_date = None ,
@@ -1091,14 +1092,46 @@ def maker(
10911092 run_type = None ,
10921093 data_interval = None ,
10931094 external_executor_id = None ,
1095+ dag_id = "dag" ,
1096+ task_id = "op1" ,
1097+ task_display_name = None ,
1098+ max_active_tis_per_dag = 16 ,
1099+ max_active_tis_per_dagrun = None ,
1100+ pool = "default_pool" ,
1101+ executor_config = None ,
1102+ trigger_rule = "all_done" ,
1103+ on_success_callback = None ,
1104+ on_execute_callback = None ,
1105+ on_failure_callback = None ,
1106+ on_retry_callback = None ,
1107+ email = None ,
10941108 map_index = - 1 ,
10951109 ** kwargs ,
10961110 ) -> TaskInstance :
10971111 if execution_date is None :
10981112 from airflow .utils import timezone
10991113
11001114 execution_date = timezone .utcnow ()
1101- _ , task = create_dummy_dag (with_dagrun_type = None , ** kwargs )
1115+ with dag_maker (dag_id , ** kwargs ):
1116+ op_kwargs = {}
1117+ from tests .test_utils .compat import AIRFLOW_V_2_9_PLUS
1118+
1119+ if AIRFLOW_V_2_9_PLUS :
1120+ op_kwargs ["task_display_name" ] = task_display_name
1121+ task = EmptyOperator (
1122+ task_id = task_id ,
1123+ max_active_tis_per_dag = max_active_tis_per_dag ,
1124+ max_active_tis_per_dagrun = max_active_tis_per_dagrun ,
1125+ executor_config = executor_config or {},
1126+ on_success_callback = on_success_callback ,
1127+ on_execute_callback = on_execute_callback ,
1128+ on_failure_callback = on_failure_callback ,
1129+ on_retry_callback = on_retry_callback ,
1130+ email = email ,
1131+ pool = pool ,
1132+ trigger_rule = trigger_rule ,
1133+ ** op_kwargs ,
1134+ )
11021135
11031136 dagrun_kwargs = {"execution_date" : execution_date , "state" : dagrun_state }
11041137 if run_id is not None :
0 commit comments