Skip to content

Commit 116d130

Browse files
authored
Simplify the handle stuck in queued interface (#43647)
1 parent ffd1a8e commit 116d130

6 files changed

Lines changed: 92 additions & 83 deletions

File tree

airflow/executors/base_executor.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import sys
2323
from collections import defaultdict, deque
2424
from dataclasses import dataclass, field
25-
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple
25+
from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Sequence, Tuple
2626

2727
import pendulum
2828

@@ -540,7 +540,9 @@ def terminate(self):
540540
"""Get called when the daemon receives a SIGTERM."""
541541
raise NotImplementedError
542542

543-
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover
543+
def cleanup_stuck_queued_tasks(
544+
self, tis: list[TaskInstance]
545+
) -> Iterable[TaskInstance]: # pragma: no cover
544546
"""
545547
Handle remnants of tasks that were failed because they were stuck in queued.
546548

airflow/jobs/scheduler_job_runner.py

Lines changed: 71 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@
9898
DR = DagRun
9999
DM = DagModel
100100

101-
RESCHEDULE_STUCK_IN_QUEUED_EVENT = "rescheduling stuck in queued"
101+
STUCK_IN_QUEUED_EVENT = "stuck in queued"
102+
""":meta private:"""
102103

103104

104105
class ConcurrencyMap:
@@ -1790,7 +1791,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
17901791
17911792
As a compromise between always failing a stuck task and always rescheduling a stuck task (which could
17921793
lead to tasks being stuck in queued forever without informing the user), we have created the config
1793-
`[core] num_stuck_reschedules`. With this new configuration, an airflow admin can decide how
1794+
``[scheduler] num_stuck_in_queued_retries``. With this new configuration, an airflow admin can decide how
17941795
sensitive they would like their airflow to be WRT failing stuck tasks.
17951796
"""
17961797
self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method")
@@ -1803,65 +1804,73 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
18031804
)
18041805
).all()
18051806

1806-
num_allowed_retries = conf.getint("core", "num_stuck_reschedules")
1807+
num_allowed_retries = conf.getint("scheduler", "num_stuck_in_queued_retries")
18071808
for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
1808-
try:
1809-
cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
1810-
for ti in stuck_tis:
1811-
if repr(ti) in cleaned_up_task_instances:
1812-
num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
1813-
if num_times_stuck < num_allowed_retries:
1814-
self.log.warning(
1815-
"Task %s was stuck in queued and will be requeued, once it has hit %s attempts"
1816-
" the task will be marked as failed. After that, if the task instance has "
1817-
"available retries, it will be retried.", ti.key, num_allowed_retries
1818-
)
1819-
session.add(
1820-
Log(
1821-
event=RESCHEDULE_STUCK_IN_QUEUED_EVENT,
1822-
task_instance=ti.key,
1823-
extra=(
1824-
f"Task was stuck in queued and will be requeued, once it has hit {num_allowed_retries} attempts"
1825-
"Task will be marked as failed. After that, if the task instance has "
1826-
"available retries, it will be retried."
1827-
),
1828-
)
1829-
)
1809+
if not hasattr(executor, "cleanup_stuck_queued_tasks"):
1810+
continue
18301811

1831-
executor.change_state(ti.key, State.SCHEDULED)
1832-
session.execute(
1833-
update(TI)
1834-
.where(TI.filter_for_tis([ti]))
1835-
.values(
1836-
# TODO[ha]: should we use func.now()? How does that work with DB timezone
1837-
# on mysql when it's not UTC?
1838-
state=TaskInstanceState.SCHEDULED,
1839-
queued_dttm=None,
1840-
# queued_by_job_id=None,
1841-
)
1842-
.execution_options(synchronize_session=False)
1843-
)
1844-
else:
1845-
self.log.warning(
1846-
"Marking task instance %s stuck in queued as failed. "
1847-
"If the task instance has available retries, it will be retried.",
1848-
ti,
1849-
)
1850-
session.add(
1851-
Log(
1852-
event="failing stuck in queued",
1853-
task_instance=ti.key,
1854-
extra=(
1855-
"Task will be marked as failed. If the task instance has "
1856-
"available retries, it will be retried."
1857-
),
1858-
)
1859-
)
1860-
executor.fail(ti.key)
1812+
for ti in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
1813+
if not isinstance(ti, TaskInstance):
1814+
# this is for backcompat. the pre-2.10.4 version of the interface
1815+
# expected a string return val.
1816+
self.log.warning(
1817+
"Marking task instance %s stuck in queued as failed. "
1818+
"If the task instance has available retries, it will be retried.",
1819+
ti,
1820+
)
1821+
continue
18611822

1823+
session.add(
1824+
Log(
1825+
event=STUCK_IN_QUEUED_EVENT,
1826+
task_instance=ti.key,
1827+
extra=(
1828+
"Task was in queued state for longer "
1829+
f"than {self._task_queued_timeout} seconds."
1830+
),
1831+
)
1832+
)
1833+
self.log.warning("Task stuck in queued and may be requeued task_id=%s", ti.key)
1834+
1835+
num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
1836+
if num_times_stuck < num_allowed_retries:
1837+
session.add(
1838+
Log(
1839+
event=STUCK_IN_QUEUED_EVENT,
1840+
task_instance=ti.key,
1841+
extra=(
1842+
f"Task was stuck in queued and will be requeued, once it has hit {num_allowed_retries} attempts. "
1843+
"Task will be marked as failed. After that, if the task instance has "
1844+
"available retries, it will be retried."
1845+
),
1846+
)
1847+
)
18621848

1863-
except NotImplementedError:
1864-
self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
1849+
executor.change_state(ti.key, State.SCHEDULED)
1850+
session.execute(
1851+
update(TI)
1852+
.where(TI.filter_for_tis([ti]))
1853+
.values(
1854+
state=TaskInstanceState.SCHEDULED,
1855+
queued_dttm=None,
1856+
)
1857+
.execution_options(synchronize_session=False)
1858+
)
1859+
else:
1860+
self.log.warning(
1861+
"Task requeue attempts exceeded max; marking failed. task_instance=%s", ti
1862+
)
1863+
session.add(
1864+
Log(
1865+
event="stuck in queued tries exceeded",
1866+
task_instance=ti.key,
1867+
extra=(
1868+
f"Task was requeued more than {num_allowed_retries} times "
1869+
"and will be failed."
1870+
),
1871+
)
1872+
)
1873+
executor.fail(ti.key)
18651874

18661875
@provide_session
18671876
def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
@@ -1871,14 +1880,16 @@ def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NE
18711880
We can then use this information to determine whether to reschedule a task or fail it.
18721881
"""
18731882
return (
1874-
session.query(Log).where(
1883+
session.query(Log)
1884+
.where(
18751885
Log.task_id == ti.task_id,
18761886
Log.dag_id == ti.dag_id,
18771887
Log.run_id == ti.run_id,
18781888
Log.map_index == ti.map_index,
18791889
Log.try_number == ti.try_number,
1880-
Log.event == RESCHEDULE_STUCK_IN_QUEUED_EVENT,
1881-
).count()
1890+
Log.event == STUCK_IN_QUEUED_EVENT,
1891+
)
1892+
.count()
18821893
)
18831894

18841895
@provide_session

providers/src/airflow/providers/celery/executors/celery_executor.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from collections import Counter
3333
from concurrent.futures import ProcessPoolExecutor
3434
from multiprocessing import cpu_count
35-
from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple
35+
from typing import TYPE_CHECKING, Any, Generator, Optional, Sequence, Tuple
3636

3737
from celery import states as celery_states
3838
from packaging.version import Version
@@ -433,7 +433,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
433433

434434
return not_adopted_tis
435435

436-
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
436+
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> Generator[TaskInstance, None, None]:
437437
"""
438438
Handle remnants of tasks that were failed because they were stuck in queued.
439439
@@ -442,13 +442,11 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
442442
if it doesn't.
443443
444444
:param tis: List of Task Instances to clean up
445-
:return: List of readable task instances for a warning message
446445
"""
447-
readable_tis = []
448446
from airflow.providers.celery.executors.celery_executor_utils import app
449447

450448
for ti in tis:
451-
readable_tis.append(repr(ti))
449+
yield ti
452450
task_instance_key = ti.key
453451
if Version(airflow_version) < Version("2.10.4"):
454452
self.fail(task_instance_key)
@@ -458,7 +456,6 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
458456
app.control.revoke(celery_async_result.task_id)
459457
except Exception as ex:
460458
self.log.error("Error revoking task instance %s from celery: %s", task_instance_key, ex)
461-
return readable_tis
462459

463460
@staticmethod
464461
def get_cli_commands() -> list[GroupCommand]:

providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from __future__ import annotations
1919

2020
from functools import cached_property
21-
from typing import TYPE_CHECKING, Sequence
21+
from typing import TYPE_CHECKING, Generator, Sequence
2222

2323
from airflow.configuration import conf
2424
from airflow.executors.base_executor import BaseExecutor
@@ -246,13 +246,11 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
246246
*self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis),
247247
]
248248

249-
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
249+
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> Generator[TaskInstance, None, None]:
250250
celery_tis = [ti for ti in tis if ti.queue != self.kubernetes_queue]
251251
kubernetes_tis = [ti for ti in tis if ti.queue == self.kubernetes_queue]
252-
return [
253-
*self.celery_executor.cleanup_stuck_queued_tasks(celery_tis),
254-
*self.kubernetes_executor.cleanup_stuck_queued_tasks(kubernetes_tis),
255-
]
252+
yield from self.celery_executor.cleanup_stuck_queued_tasks(celery_tis)
253+
yield from self.kubernetes_executor.cleanup_stuck_queued_tasks(kubernetes_tis)
256254

257255
def end(self) -> None:
258256
"""End celery and kubernetes executor."""

providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from contextlib import suppress
3434
from datetime import datetime
3535
from queue import Empty, Queue
36-
from typing import TYPE_CHECKING, Any, Sequence
36+
from typing import TYPE_CHECKING, Any, Generator, Sequence
3737

3838
from kubernetes.dynamic import DynamicClient
3939
from packaging.version import Version
@@ -607,7 +607,7 @@ def _iter_tis_to_flush():
607607
tis_to_flush.extend(_iter_tis_to_flush())
608608
return tis_to_flush
609609

610-
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
610+
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> Generator[TaskInstance, None, None]:
611611
"""
612612
Handle remnants of tasks that were failed because they were stuck in queued.
613613
@@ -621,9 +621,6 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
621621
if TYPE_CHECKING:
622622
assert self.kube_client
623623
assert self.kube_scheduler
624-
readable_tis: list[str] = []
625-
if not tis:
626-
return readable_tis
627624
pod_combined_search_str_to_pod_map = self.get_pod_combined_search_str_to_pod_map()
628625
for ti in tis:
629626
# Build the pod selector
@@ -637,13 +634,17 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
637634
if not pod:
638635
self.log.warning("Cannot find pod for ti %s", ti)
639636
continue
640-
readable_tis.append(repr(ti))
641-
if Version(airflow_version) >= Version("2.10.4"):
641+
if Version(airflow_version) < Version("2.10.4"):
642643
self.kube_scheduler.patch_pod_delete_stuck(
643644
pod_name=pod.metadata.name, namespace=pod.metadata.namespace
644645
)
646+
yield ti
645647
self.kube_scheduler.delete_pod(pod_name=pod.metadata.name, namespace=pod.metadata.namespace)
646-
return readable_tis
647648

648649
def adopt_launched_task(
649650
self,

providers/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
# under the License.
1818
from __future__ import annotations
1919

20-
from typing import TYPE_CHECKING, Sequence
20+
from typing import TYPE_CHECKING, Generator, Sequence
2121

2222
from airflow.configuration import conf
2323
from airflow.executors.base_executor import BaseExecutor
@@ -230,11 +230,11 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
230230
*self.kubernetes_executor.try_adopt_task_instances(kubernetes_tis),
231231
]
232232

233-
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
233+
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> Generator[TaskInstance, None, None]:
234234
# LocalExecutor doesn't have a cleanup_stuck_queued_tasks method, so we
235235
# will only run KubernetesExecutor's
236236
kubernetes_tis = [ti for ti in tis if ti.queue == self.KUBERNETES_QUEUE]
237-
return self.kubernetes_executor.cleanup_stuck_queued_tasks(kubernetes_tis)
237+
yield from self.kubernetes_executor.cleanup_stuck_queued_tasks(kubernetes_tis)
238238

239239
def end(self) -> None:
240240
"""End local and kubernetes executor."""

0 commit comments

Comments
 (0)