Skip to content

Commit 1f8b642

Browse files
committed
Allow for retry when tasks are stuck in queued
1 parent 765bbd6 commit 1f8b642

5 files changed

Lines changed: 20 additions & 7 deletions

File tree

airflow/executors/base_executor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,15 @@ def queued(self, key: TaskInstanceKey, info=None) -> None:
478478
"""
479479
self.change_state(key, TaskInstanceState.QUEUED, info)
480480

481+
def retry_stuck_task(self, key: TaskInstanceKey, info=None) -> None:
482+
"""
483+
Set queued state for the event.
484+
485+
:param info: Executor information for the task instance
486+
:param key: Unique key for the task instance
487+
"""
488+
self.change_state(key, TaskInstanceState.STUCK_RESCHEDULE, info)
489+
481490
def running_state(self, key: TaskInstanceKey, info=None) -> None:
482491
"""
483492
Set running state for the event.

airflow/jobs/scheduler_job_runner.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,7 +1075,7 @@ def _run_scheduler_loop(self) -> None:
10751075

10761076
timers.call_regular_interval(
10771077
conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
1078-
self._fail_tasks_stuck_in_queued,
1078+
self._handle_tasks_stuck_in_queued,
10791079
)
10801080

10811081
timers.call_regular_interval(
@@ -1782,7 +1782,7 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques
17821782
self.log.debug("callback is empty")
17831783

17841784
@provide_session
1785-
def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
1785+
def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
17861786
"""
17871787
Mark tasks stuck in queued for longer than `task_queued_timeout` as failed.
17881788
@@ -1825,6 +1825,8 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
18251825
num_times_stuck = self._get_num_times_stuck(ti, session)
18261826
if num_times_stuck < conf.getint("core", "num_stuck_retries", fallback=3):
18271827
self._reset_task_instance(ti, session)
1828+
else:
1829+
executor.fail(ti.key)
18281830

18291831

18301832
except NotImplementedError:
@@ -1840,6 +1842,7 @@ def _get_num_times_stuck(self, ti: TaskInstance, session: Session = NEW_SESSION)
18401842
.where(Log.event == "stuck in queued")
18411843
))
18421844

1845+
@provide_session
18431846
def _reset_task_instance(self, ti: TaskInstance, session: Session = NEW_SESSION):
18441847
ti.external_executor_id = None
18451848
ti.state = State.SCHEDULED

airflow/utils/state.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class TaskInstanceState(str, Enum):
5353
SUCCESS = "success" # Task completed
5454
RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
5555
FAILED = "failed" # Task errored out
56+
STUCK_RESCHEDULE = "stuck_reschedule" # Task got stuck in queued but is up for resch
5657
UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
5758
UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
5859
UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed

providers/src/airflow/providers/celery/executors/celery_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
450450
for ti in tis:
451451
readable_tis.append(repr(ti))
452452
task_instance_key = ti.key
453-
self.fail(task_instance_key, None)
453+
self.retry_stuck_task(task_instance_key, None)
454454
celery_async_result = self.tasks.pop(task_instance_key, None)
455455
if celery_async_result:
456456
try:

tests/jobs/test_scheduler_job.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2202,7 +2202,7 @@ def test_adopt_or_reset_orphaned_tasks_multiple_executors(self, dag_maker, mock_
22022202
# Second executor called for ti3
22032203
mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
22042204

2205-
def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
2205+
def test_handle_stuck_queued_tasks(self, dag_maker, session, mock_executors):
22062206
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
22072207
op1 = EmptyOperator(task_id="op1")
22082208
op2 = EmptyOperator(task_id="op2", executor="default_exec")
@@ -2228,7 +2228,7 @@ def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
22282228
(None,): mock_executors[0],
22292229
("secondary_exec",): mock_executors[1],
22302230
}[x]
2231-
job_runner._fail_tasks_stuck_in_queued()
2231+
job_runner._handle_tasks_stuck_in_queued()
22322232

22332233
# Default executor is called for ti1 (no explicit executor override uses default) and ti2 (where we
22342234
# explicitly marked that for execution by the default executor)
@@ -2238,7 +2238,7 @@ def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
22382238
mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
22392239
mock_executors[1].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
22402240

2241-
def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
2241+
def test_handle_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
22422242
with dag_maker("test_fail_stuck_queued_tasks"):
22432243
op1 = EmptyOperator(task_id="op1")
22442244

@@ -2253,7 +2253,7 @@ def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session
22532253
job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
22542254
job_runner._task_queued_timeout = 300
22552255
with caplog.at_level(logging.DEBUG):
2256-
job_runner._fail_tasks_stuck_in_queued()
2256+
job_runner._handle_tasks_stuck_in_queued()
22572257
assert "Executor doesn't support cleanup of stuck queued tasks. Skipping." in caplog.text
22582258

22592259
@mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")

0 commit comments

Comments
 (0)