Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
20aac73bfb0e08226c687304b997ed93d9cf796d3b761d14e45d732ef4f6c01e
db00d57fce32830b69f2c1481b231e65e67e197b4a96a5fa1c870cd555eac3bd
1,073 changes: 529 additions & 544 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``959e216a3abb`` (head) | ``0e9519b56710`` | ``3.0.0`` | Rename ``is_active`` to ``is_stale`` column in ``dag`` |
| ``29ce7909c52b`` (head) | ``959e216a3abb`` | ``3.0.0`` | Change TI table to have unique UUID id/pk per attempt. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``959e216a3abb`` | ``0e9519b56710`` | ``3.0.0`` | Rename ``is_active`` to ``is_stale`` column in ``dag`` |
| | | | table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``0e9519b56710`` | ``ec62e120484d`` | ``3.0.0`` | Rename run_type from 'dataset_triggered' to |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def ti_run(
session.query(
func.count(TaskReschedule.id) # or any other primary key column
)
.filter(TaskReschedule.ti_id == ti_id_str, TaskReschedule.try_number == ti.try_number)
.filter(TaskReschedule.ti_id == ti_id_str)
.scalar()
or 0
)
Expand Down Expand Up @@ -309,13 +309,9 @@ def ti_update_state(
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
query = query.values(state=updated_state)
elif isinstance(ti_patch_payload, TIRetryStatePayload):
from airflow.models.taskinstance import uuid7
from airflow.models.taskinstancehistory import TaskInstanceHistory

ti = session.get(TI, ti_id_str)
TaskInstanceHistory.record_ti(ti, session=session)
ti.try_id = uuid7()
updated_state = ti_patch_payload.state
ti.prepare_db_for_next_try(session)
query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
query = query.values(state=updated_state)
elif isinstance(ti_patch_payload, TISuccessStatePayload):
Expand Down Expand Up @@ -393,7 +389,6 @@ def ti_update_state(
session.add(
TaskReschedule(
task_instance.id,
task_instance.try_number,
actual_start_date,
ti_patch_payload.end_date,
ti_patch_payload.reschedule_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

from __future__ import annotations

from typing import Annotated
from uuid import UUID

from fastapi import Query, status
from fastapi import status
from sqlalchemy import select

from airflow.api_fastapi.common.db.common import SessionDep
Expand All @@ -37,18 +36,12 @@


@router.get("/{task_instance_id}/start_date")
def get_start_date(
task_instance_id: UUID, session: SessionDep, try_number: Annotated[int, Query()] = 1
) -> UtcDateTime | None:
def get_start_date(task_instance_id: UUID, session: SessionDep) -> UtcDateTime | None:
"""Get the first reschedule date if found, None if no records exist."""
start_date = session.scalar(
select(TaskReschedule)
.where(
TaskReschedule.ti_id == str(task_instance_id),
TaskReschedule.try_number >= try_number,
)
select(TaskReschedule.start_date)
.where(TaskReschedule.ti_id == str(task_instance_id))
.order_by(TaskReschedule.id.asc())
.with_only_columns(TaskReschedule.start_date)
.limit(1)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Change TI table to have unique UUID id/pk per attempt.

Revision ID: 29ce7909c52b
Revises: 959e216a3abb
Create Date: 2025-04-09 10:09:53.130924

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy_utils import UUIDType

# revision identifiers, used by Alembic.
revision = "29ce7909c52b"  # this migration's ID
down_revision = "959e216a3abb"  # migration this one applies on top of
branch_labels = None
depends_on = None
airflow_version = "3.0.0"  # Airflow version this migration ships with (per the module docstring)


def _get_uuid_type(dialect_name: str) -> sa.types.TypeEngine:
    """Return the column type used for UUID columns on the given dialect.

    SQLite stores UUIDs as 36-character strings; every other dialect uses
    the portable ``UUIDType``.
    """
    return sa.String(36) if dialect_name == "sqlite" else UUIDType(binary=False)


def upgrade():
    """Apply Change TI table to have unique UUID id/pk per attempt."""
    conn = op.get_bind()
    dialect_name = conn.dialect.name
    # Each try now gets its own row (and thus its own ``id`` PK), so the
    # separate per-try ``try_id`` column and its unique constraint go away.
    with op.batch_alter_table("task_instance", schema=None) as batch_op:
        batch_op.drop_constraint("task_instance_try_id_uq", type_="unique")
        batch_op.drop_column("try_id")

    # Re-key the history table: drop the old ``task_instance_id`` column and
    # rename the per-try ``try_id`` into its place, so history rows are keyed
    # by the (now per-attempt) TI UUID. An index on (dag_id, run_id) is added;
    # presumably to support dag-run-scoped history lookups — TODO confirm
    # against the query paths that read this table.
    with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
        batch_op.create_index("idx_tih_dag_run", ["dag_id", "run_id"], unique=False)
        batch_op.drop_column("task_instance_id")
        batch_op.alter_column(
            "try_id",
            new_column_name="task_instance_id",
            existing_type=_get_uuid_type(dialect_name),
            existing_nullable=False,
        )

    # Recreate the note FK with ON UPDATE CASCADE added (the downgrade path
    # recreates it with ON DELETE CASCADE only).
    with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
        batch_op.drop_constraint("task_instance_note_ti_fkey", type_="foreignkey")
        batch_op.create_foreign_key(
            "task_instance_note_ti_fkey",
            "task_instance",
            ["ti_id"],
            ["id"],
            onupdate="CASCADE",
            ondelete="CASCADE",
        )

    # We decided to not migrate/correct the data for task_reschedule as we decided the time to do it wasn't
    # worth it, as 90%+ of the data in the table is not needed -- this table only has use when a Sensor task,
    # with reschedule mode, is in the `running` or `up_for_reschedule` states.
    #
    # Going forward, Airflow will delete rows from this table when the TI is recorded in the history table
    # (i.e. when it's cleared or retired) so the only case in which this data will be accessed and give an
    # incorrect figure is when all of these hold true
    #
    # - a Sensor in reschedule mode
    # - in running or up_for_reschedule states
    # - On a try_number > 1
    # - with a timeout set
    #
    # If all of these are true, then the total runtime will be mistakenly calculated as the start of the first
    # try to now, rather than the start of the current try. But this is such an unlikely set of circumstances
    # that it's not worth the time cost of migrating it.
    with op.batch_alter_table("task_reschedule", schema=None) as batch_op:
        batch_op.drop_column("try_number")

def downgrade():
    """Unapply Change TI table to have unique UUID id/pk per attempt."""
    conn = op.get_bind()
    dialect_name = conn.dialect.name
    # Restore the per-try counter dropped by upgrade().
    # NOTE(review): ``default=1`` is a client-side (Python) default only; adding
    # a NOT NULL column to a table that already has rows may require a
    # ``server_default`` on some backends — confirm on Postgres/MySQL.
    with op.batch_alter_table("task_reschedule", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column("try_number", sa.INTEGER(), autoincrement=False, nullable=False, default=1)
        )

    # Recreate the note FK without the ON UPDATE CASCADE that upgrade() added.
    with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
        batch_op.drop_constraint("task_instance_note_ti_fkey", type_="foreignkey")
        batch_op.create_foreign_key(
            "task_instance_note_ti_fkey", "task_instance", ["ti_id"], ["id"], ondelete="CASCADE"
        )

    # Reverse the history re-key: ``task_instance_id`` back to ``try_id``, and
    # drop the (dag_id, run_id) index added on upgrade.
    with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
        batch_op.alter_column(
            "task_instance_id",
            new_column_name="try_id",
            existing_type=_get_uuid_type(dialect_name),
            existing_nullable=False,
        )
        batch_op.drop_index("idx_tih_dag_run")
    # This has to be in a separate batch, else on sqlite it throws `sqlalchemy.exc.CircularDependencyError`
    # (and on non sqlite batching isn't "a thing", it issues alter tables fine)
    with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
        # NOTE(review): this uses ``UUIDType(binary=False)`` directly while every
        # other column here goes through ``_get_uuid_type(dialect_name)`` (which
        # returns String(36) on sqlite) — confirm whether the difference is
        # intentional.
        batch_op.add_column(
            sa.Column("task_instance_id", UUIDType(binary=False), autoincrement=False, nullable=False)
        )

    with op.batch_alter_table("task_instance", schema=None) as batch_op:
        batch_op.add_column(sa.Column("try_id", _get_uuid_type(dialect_name), nullable=False))
        batch_op.create_unique_constraint("task_instance_try_id_uq", ["try_id"])
29 changes: 10 additions & 19 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
not_,
or_,
text,
tuple_,
update,
)
from sqlalchemy.dialects import postgresql
Expand Down Expand Up @@ -1831,7 +1830,7 @@ def schedule_tis(
"""
# Get list of TI IDs that do not need to executed, these are
# tasks using EmptyOperator and without on_execute_callback / on_success_callback
dummy_ti_ids = []
empty_ti_ids = []
schedulable_ti_ids = []
for ti in schedulable_tis:
if TYPE_CHECKING:
Expand All @@ -1842,7 +1841,7 @@ def schedule_tis(
and not ti.task.on_success_callback
and not ti.task.outlets
):
dummy_ti_ids.append((ti.task_id, ti.map_index))
empty_ti_ids.append(ti.id)
# check "start_trigger_args" to see whether the operator supports start execution from triggerer
# if so, we'll then check "start_from_trigger" to see whether this feature is turned on and defer
# this task.
Expand All @@ -1857,24 +1856,20 @@ def schedule_tis(
ti.try_number += 1
ti.defer_task(exception=None, session=session)
else:
schedulable_ti_ids.append((ti.task_id, ti.map_index))
schedulable_ti_ids.append(ti.id)
else:
schedulable_ti_ids.append((ti.task_id, ti.map_index))
schedulable_ti_ids.append(ti.id)

count = 0

if schedulable_ti_ids:
schedulable_ti_ids_chunks = chunks(
schedulable_ti_ids, max_tis_per_query or len(schedulable_ti_ids)
)
for schedulable_ti_ids_chunk in schedulable_ti_ids_chunks:
for id_chunk in schedulable_ti_ids_chunks:
count += session.execute(
update(TI)
.where(
TI.dag_id == self.dag_id,
TI.run_id == self.run_id,
tuple_(TI.task_id, TI.map_index).in_(schedulable_ti_ids_chunk),
)
.where(TI.id.in_(id_chunk))
.values(
state=TaskInstanceState.SCHEDULED,
scheduled_dttm=timezone.utcnow(),
Expand All @@ -1890,16 +1885,12 @@ def schedule_tis(
).rowcount

# Tasks using EmptyOperator should not be executed, mark them as success
if dummy_ti_ids:
dummy_ti_ids_chunks = chunks(dummy_ti_ids, max_tis_per_query or len(dummy_ti_ids))
for dummy_ti_ids_chunk in dummy_ti_ids_chunks:
if empty_ti_ids:
dummy_ti_ids_chunks = chunks(empty_ti_ids, max_tis_per_query or len(empty_ti_ids))
for id_chunk in dummy_ti_ids_chunks:
count += session.execute(
update(TI)
.where(
TI.dag_id == self.dag_id,
TI.run_id == self.run_id,
tuple_(TI.task_id, TI.map_index).in_(dummy_ti_ids_chunk),
)
.where(TI.id.in_(id_chunk))
.values(
state=TaskInstanceState.SUCCESS,
start_date=timezone.utcnow(),
Expand Down
Loading