Skip to content

Commit a362101

Browse files
Correctly treat requeues on reschedule sensors as resetting after each reschedule (#51410)
1 parent 448a184 commit a362101

2 files changed

Lines changed: 120 additions & 6 deletions

File tree

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from itertools import groupby
3333
from typing import TYPE_CHECKING, Any
3434

35-
from sqlalchemy import and_, delete, exists, func, or_, select, text, tuple_, update
35+
from sqlalchemy import and_, delete, desc, exists, func, or_, select, text, tuple_, update
3636
from sqlalchemy.exc import OperationalError
3737
from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, selectinload
3838
from sqlalchemy.sql import expression
@@ -2028,19 +2028,34 @@ def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NE
20282028
20292029
We can then use this information to determine whether to reschedule a task or fail it.
20302030
"""
2031-
return (
2032-
session.query(Log)
2031+
last_running_time = session.scalar(
2032+
select(Log.dttm)
20332033
.where(
2034-
Log.task_id == ti.task_id,
20352034
Log.dag_id == ti.dag_id,
2035+
Log.task_id == ti.task_id,
20362036
Log.run_id == ti.run_id,
20372037
Log.map_index == ti.map_index,
20382038
Log.try_number == ti.try_number,
2039-
Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
2039+
Log.event == "running",
20402040
)
2041-
.count()
2041+
.order_by(desc(Log.dttm))
2042+
.limit(1)
2043+
)
2044+
2045+
query = session.query(Log).where(
2046+
Log.task_id == ti.task_id,
2047+
Log.dag_id == ti.dag_id,
2048+
Log.run_id == ti.run_id,
2049+
Log.map_index == ti.map_index,
2050+
Log.try_number == ti.try_number,
2051+
Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
20422052
)
20432053

2054+
if last_running_time:
2055+
query = query.where(Log.dttm > last_running_time)
2056+
2057+
return query.count()
2058+
20442059
previous_ti_running_metrics: dict[tuple[str, str, str], int] = {}
20452060

20462061
@provide_session

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2090,6 +2090,105 @@ def _queue_tasks(tis):
20902090
states = [x.state for x in dr.get_task_instances(session=session)]
20912091
assert states == ["failed", "failed"]
20922092

2093+
@conf_vars({("scheduler", "num_stuck_in_queued_retries"): "2"})
2094+
def test_handle_stuck_queued_tasks_reschedule_sensors(self, dag_maker, session, mock_executors):
2095+
"""Reschedule sensors go in and out of running repeatedly using the same try_number
2096+
Make sure that they get three attempts per reschedule, not 3 attempts per try_number"""
2097+
with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
2098+
EmptyOperator(task_id="op1")
2099+
EmptyOperator(task_id="op2", executor="default_exec")
2100+
2101+
def _queue_tasks(tis):
2102+
for ti in tis:
2103+
ti.state = "queued"
2104+
ti.queued_dttm = timezone.utcnow()
2105+
session.commit()
2106+
2107+
def _add_running_event(tis):
2108+
for ti in tis:
2109+
updated_entry = Log(
2110+
dttm=timezone.utcnow(),
2111+
dag_id=ti.dag_id,
2112+
task_id=ti.task_id,
2113+
map_index=ti.map_index,
2114+
event="running",
2115+
run_id=ti.run_id,
2116+
try_number=ti.try_number,
2117+
)
2118+
session.add(updated_entry)
2119+
2120+
run_id = str(uuid4())
2121+
dr = dag_maker.create_dagrun(run_id=run_id)
2122+
2123+
tis = dr.get_task_instances(session=session)
2124+
_queue_tasks(tis=tis)
2125+
scheduler_job = Job()
2126+
scheduler = SchedulerJobRunner(job=scheduler_job, num_runs=0)
2127+
# job_runner._reschedule_stuck_task = MagicMock()
2128+
scheduler._task_queued_timeout = -300 # always in violation of timeout
2129+
2130+
with _loader_mock(mock_executors):
2131+
scheduler._handle_tasks_stuck_in_queued()
2132+
# If the task gets stuck in queued once, we reset it to scheduled
2133+
tis = dr.get_task_instances(session=session)
2134+
assert [x.state for x in tis] == ["scheduled", "scheduled"]
2135+
assert [x.queued_dttm for x in tis] == [None, None]
2136+
2137+
_queue_tasks(tis=tis)
2138+
log_events = [
2139+
x.event for x in session.scalars(select(Log).where(Log.run_id == run_id).order_by(Log.id)).all()
2140+
]
2141+
assert log_events == [
2142+
"stuck in queued reschedule",
2143+
"stuck in queued reschedule",
2144+
]
2145+
2146+
with _loader_mock(mock_executors):
2147+
scheduler._handle_tasks_stuck_in_queued()
2148+
2149+
log_events = [
2150+
x.event for x in session.scalars(select(Log).where(Log.run_id == run_id).order_by(Log.id)).all()
2151+
]
2152+
assert log_events == [
2153+
"stuck in queued reschedule",
2154+
"stuck in queued reschedule",
2155+
"stuck in queued reschedule",
2156+
"stuck in queued reschedule",
2157+
]
2158+
mock_executors[0].fail.assert_not_called()
2159+
tis = dr.get_task_instances(session=session)
2160+
assert [x.state for x in tis] == ["scheduled", "scheduled"]
2161+
2162+
_add_running_event(tis) # This should "reset" the count of stuck queued
2163+
2164+
for _ in range(3): # Should be able to be stuck 3 more times before failing
2165+
_queue_tasks(tis=tis)
2166+
with _loader_mock(mock_executors):
2167+
scheduler._handle_tasks_stuck_in_queued()
2168+
tis = dr.get_task_instances(session=session)
2169+
2170+
log_events = [
2171+
x.event for x in session.scalars(select(Log).where(Log.run_id == run_id).order_by(Log.id)).all()
2172+
]
2173+
assert log_events == [
2174+
"stuck in queued reschedule",
2175+
"stuck in queued reschedule",
2176+
"stuck in queued reschedule",
2177+
"stuck in queued reschedule",
2178+
"running",
2179+
"running",
2180+
"stuck in queued reschedule",
2181+
"stuck in queued reschedule",
2182+
"stuck in queued reschedule",
2183+
"stuck in queued reschedule",
2184+
"stuck in queued tries exceeded",
2185+
"stuck in queued tries exceeded",
2186+
]
2187+
2188+
mock_executors[0].fail.assert_not_called() # just demoing that we don't fail with executor method
2189+
states = [x.state for x in dr.get_task_instances(session=session)]
2190+
assert states == ["failed", "failed"]
2191+
20932192
def test_revoke_task_not_imp_tolerated(self, dag_maker, session, caplog):
20942193
"""Test that if executor no implement revoke_task then we don't blow up."""
20952194
with dag_maker("test_fail_stuck_queued_tasks"):

0 commit comments

Comments
 (0)