@@ -109,7 +109,7 @@ def test_localtaskjob_essential_attr(self, dag_maker):
109109 of LocalTaskJob can be assigned with
110110 proper values without intervention
111111 """
112- with dag_maker ("test_localtaskjob_essential_attr" ):
112+ with dag_maker ("test_localtaskjob_essential_attr" , serialized = True ):
113113 op1 = EmptyOperator (task_id = "op1" )
114114
115115 dr = dag_maker .create_dagrun ()
@@ -127,6 +127,7 @@ def test_localtaskjob_essential_attr(self, dag_maker):
127127 check_result_2 = [getattr (job1 , attr ) is not None for attr in essential_attr ]
128128 assert all (check_result_2 )
129129
130+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
130131 def test_localtaskjob_heartbeat (self , dag_maker ):
131132 session = settings .Session ()
132133 with dag_maker ("test_localtaskjob_heartbeat" ):
@@ -173,6 +174,7 @@ def test_localtaskjob_heartbeat(self, dag_maker):
173174 assert not job1 .task_runner .run_as_user
174175 job_runner .heartbeat_callback ()
175176
177+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
176178 @mock .patch ("subprocess.check_call" )
177179 @mock .patch ("airflow.jobs.local_task_job_runner.psutil" )
178180 def test_localtaskjob_heartbeat_with_run_as_user (self , psutil_mock , _ , dag_maker ):
@@ -227,6 +229,7 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker
227229 assert ti .pid != job1 .task_runner .process .pid
228230 job_runner .heartbeat_callback ()
229231
232+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
230233 @conf_vars ({("core" , "default_impersonation" ): "testuser" })
231234 @mock .patch ("subprocess.check_call" )
232235 @mock .patch ("airflow.jobs.local_task_job_runner.psutil" )
@@ -282,6 +285,7 @@ def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _,
282285 assert ti .pid != job1 .task_runner .process .pid
283286 job_runner .heartbeat_callback ()
284287
288+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
285289 def test_heartbeat_failed_fast (self ):
286290 """
287291 Test that task heartbeat will sleep when it fails fast
@@ -323,6 +327,7 @@ def test_heartbeat_failed_fast(self):
323327 delta = (time2 - time1 ).total_seconds ()
324328 assert abs (delta - job .heartrate ) < 0.8
325329
330+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
326331 @conf_vars ({("core" , "task_success_overtime" ): "1" })
327332 def test_mark_success_no_kill (self , caplog , get_test_dag , session ):
328333 """
@@ -354,6 +359,7 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session):
354359 "State of this instance has been externally set to success. Terminating instance." in caplog .text
355360 )
356361
362+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
357363 def test_localtaskjob_double_trigger (self ):
358364 dag = self .dagbag .dags .get ("test_localtaskjob_double_trigger" )
359365 task = dag .get_task ("test_localtaskjob_double_trigger_task" )
@@ -392,6 +398,7 @@ def test_localtaskjob_double_trigger(self):
392398
393399 session .close ()
394400
401+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
395402 @patch .object (StandardTaskRunner , "return_code" )
396403 @mock .patch ("airflow.jobs.scheduler_job_runner.Stats.incr" , autospec = True )
397404 def test_local_task_return_code_metric (self , mock_stats_incr , mock_return_code , create_dummy_dag ):
@@ -424,6 +431,7 @@ def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code,
424431 ]
425432 )
426433
434+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
427435 @patch .object (StandardTaskRunner , "return_code" )
428436 def test_localtaskjob_maintain_heart_rate (self , mock_return_code , caplog , create_dummy_dag ):
429437 dag , task = create_dummy_dag ("test_localtaskjob_double_trigger" )
@@ -456,6 +464,7 @@ def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create
456464 assert time_end - time_start < job1 .heartrate
457465 assert "Task exited with return code 0" in caplog .text
458466
467+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
459468 def test_mark_failure_on_failure_callback (self , caplog , get_test_dag ):
460469 """
461470 Test that ensures that mark_failure in the UI fails
@@ -488,6 +497,7 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag):
488497 "State of this instance has been externally set to failed. Terminating instance."
489498 ) in caplog .text
490499
500+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
491501 def test_dagrun_timeout_logged_in_task_logs (self , caplog , get_test_dag ):
492502 """
493503 Test that ensures that if a running task is externally skipped (due to a dagrun timeout)
@@ -520,6 +530,7 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
520530 assert ti .state == State .SKIPPED
521531 assert "DagRun timed out after " in caplog .text
522532
533+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
523534 def test_failure_callback_called_by_airflow_run_raw_process (self , monkeypatch , tmp_path , get_test_dag ):
524535 """
525536 Ensure failure callback of a task is run by the airflow run --raw process
@@ -555,6 +566,7 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t
555566 assert m , "pid expected in output."
556567 assert os .getpid () != int (m .group (1 ))
557568
569+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
558570 @conf_vars ({("core" , "task_success_overtime" ): "5" })
559571 def test_mark_success_on_success_callback (self , caplog , get_test_dag ):
560572 """
@@ -586,6 +598,7 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag):
586598 "State of this instance has been externally set to success. Terminating instance." in caplog .text
587599 )
588600
601+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
589602 def test_success_listeners_executed (self , caplog , get_test_dag ):
590603 """
591604 Test that ensures that when listeners are executed, the task is not killed before they finish
@@ -623,6 +636,7 @@ def test_success_listeners_executed(self, caplog, get_test_dag):
623636 )
624637 lm .clear ()
625638
639+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
626640 @conf_vars ({("core" , "task_success_overtime" ): "3" })
627641 def test_success_slow_listeners_executed_kill (self , caplog , get_test_dag ):
628642 """
@@ -659,6 +673,7 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag):
659673 )
660674 lm .clear ()
661675
676+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
662677 @conf_vars ({("core" , "task_success_overtime" ): "3" })
663678 def test_success_slow_task_not_killed_by_overtime_but_regular_timeout (self , caplog , get_test_dag ):
664679 """
@@ -698,6 +713,7 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl
698713 )
699714 lm .clear ()
700715
716+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
701717 @pytest .mark .parametrize ("signal_type" , [signal .SIGTERM , signal .SIGKILL ])
702718 def test_process_os_signal_calls_on_failure_callback (
703719 self , monkeypatch , tmp_path , get_test_dag , signal_type
@@ -792,6 +808,7 @@ def send_signal(ti, signal_sent, sig):
792808 lines = f .readlines ()
793809 assert len (lines ) == 0
794810
811+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
795812 @pytest .mark .parametrize (
796813 "conf, init_state, first_run_state, second_run_state, task_ids_to_run, error_message" ,
797814 [
@@ -876,6 +893,7 @@ def test_fast_follow(
876893 if scheduler_job_runner .processor_agent :
877894 scheduler_job_runner .processor_agent .end ()
878895
896+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
879897 @conf_vars ({("scheduler" , "schedule_after_task_execution" ): "True" })
880898 def test_mini_scheduler_works_with_wait_for_upstream (self , caplog , get_test_dag ):
881899 dag = get_test_dag ("test_dagrun_fast_follow" )
@@ -944,7 +962,7 @@ def task_function(ti):
944962
945963 os .kill (psutil .Process (os .getpid ()).ppid (), signal .SIGSEGV )
946964
947- with dag_maker (dag_id = "test_segmentation_fault" ):
965+ with dag_maker (dag_id = "test_segmentation_fault" , serialized = True ):
948966 task = PythonOperator (
949967 task_id = "test_sigsegv" ,
950968 python_callable = task_function ,
@@ -975,7 +993,7 @@ def test_number_of_queries_single_loop(mock_get_task_runner, dag_maker):
975993 mock_get_task_runner .return_value .return_code .side_effects = [[0 ], codes ]
976994
977995 unique_prefix = str (uuid .uuid4 ())
978- with dag_maker (dag_id = f"{ unique_prefix } _test_number_of_queries" ):
996+ with dag_maker (dag_id = f"{ unique_prefix } _test_number_of_queries" , serialized = True ):
979997 task = EmptyOperator (task_id = "test_state_succeeded1" )
980998
981999 dr = dag_maker .create_dagrun (run_id = unique_prefix , state = State .NONE )
@@ -992,6 +1010,7 @@ def test_number_of_queries_single_loop(mock_get_task_runner, dag_maker):
9921010class TestSigtermOnRunner :
9931011 """Test receive SIGTERM on Task Runner."""
9941012
1013+ @pytest .mark .skip_if_database_isolation_mode # Does not work in db isolation mode
9951014 @pytest .mark .parametrize (
9961015 "daemon" , [pytest .param (True , id = "daemon" ), pytest .param (False , id = "non-daemon" )]
9971016 )
0 commit comments