Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ def set_task_instance_state(
# Clear downstream tasks that are in failed/upstream_failed state to resume them.
# Flush the session so that the tasks marked success are reflected in the db.
session.flush()
subdag = self.partial_subset(
subset = self.partial_subset(
task_ids={task_id},
include_downstream=True,
include_upstream=False,
Expand All @@ -1273,9 +1273,9 @@ def set_task_instance_state(
}
if not future and not past: # Simple case 1: we're only dealing with exactly one run.
clear_kwargs["run_id"] = run_id
subdag.clear(**clear_kwargs)
subset.clear(**clear_kwargs)
elif future and past: # Simple case 2: we're clearing ALL runs.
subdag.clear(**clear_kwargs)
subset.clear(**clear_kwargs)
else: # Complex cases: we may have more than one run, based on a date range.
# Make 'future' and 'past' make some sense when multiple runs exist
# for the same logical date. We order runs by their id and only
Expand All @@ -1287,7 +1287,7 @@ def set_task_instance_state(
else:
clear_kwargs["end_date"] = logical_date
exclude_run_id_stmt = exclude_run_id_stmt.where(DagRun.id < dr_id)
subdag.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)
subset.clear(exclude_run_ids=frozenset(session.scalars(exclude_run_id_stmt)), **clear_kwargs)
return altered

@provide_session
Expand Down Expand Up @@ -1363,13 +1363,13 @@ def get_logical_date() -> datetime:
# Clear downstream tasks that are in failed/upstream_failed state to resume them.
# Flush the session so that the tasks marked success are reflected in the db.
session.flush()
task_subset = self.partial_subset(
subset = self.partial_subset(
task_ids=task_ids,
include_downstream=True,
include_upstream=False,
)

task_subset.clear(
subset.clear(
start_date=start_date,
end_date=end_date,
only_failed=True,
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,8 @@ def bag_dag(self, dag: DAG):
"""
Add the DAG into the bag.

:raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
:raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
:raises: AirflowDagCycleException if a cycle is detected.
:raises: AirflowDagDuplicatedIdException if this dag already exists in the bag.
"""
check_cycle(dag) # throws if a task cycle is found

Expand Down
12 changes: 1 addition & 11 deletions airflow-core/src/airflow/security/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,7 @@ class ResourceDetails(TypedDict):


def resource_name(root_dag_id: str, resource: str) -> str:
"""
Return the resource name for a DAG id.

Note that since a sub-DAG should follow the permission of its
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
for a subdag. A normal dag should pass the ``DagModel.dag_id``.
"""
"""Return the resource name for a DAG id."""
if root_dag_id in RESOURCE_DETAILS_MAP.keys():
return root_dag_id
if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
Expand All @@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str:
"""
Return the resource name for a DAG id.

Note that since a sub-DAG should follow the permission of its
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
for a subdag. A normal dag should pass the ``DagModel.dag_id``.

Note: This function is kept for backwards compatibility.
"""
if root_dag_id == RESOURCE_DAG:
Expand Down
14 changes: 7 additions & 7 deletions airflow-core/tests/unit/utils/test_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def test_sub_dag_task_group():
group234 >> group6
group234 >> task7

subdag = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False)
subset = dag.partial_subset(task_ids="task5", include_upstream=True, include_downstream=False)

expected_node_id = {
"id": None,
Expand All @@ -467,9 +467,9 @@ def test_sub_dag_task_group():
],
}

assert extract_node_id(task_group_to_dict(subdag.task_group)) == expected_node_id
assert extract_node_id(task_group_to_dict(subset.task_group)) == expected_node_id

edges = dag_edges(subdag)
edges = dag_edges(subset)
assert sorted((e["source_id"], e["target_id"]) for e in edges) == [
("group234.group34.downstream_join_id", "task5"),
("group234.group34.task3", "group234.group34.downstream_join_id"),
Expand All @@ -479,19 +479,19 @@ def test_sub_dag_task_group():
("task1", "group234.upstream_join_id"),
]

subdag_task_groups = subdag.task_group.get_task_group_dict()
assert subdag_task_groups.keys() == {None, "group234", "group234.group34"}
groups = subset.task_group.get_task_group_dict()
assert groups.keys() == {None, "group234", "group234.group34"}

included_group_ids = {"group234", "group234.group34"}
included_task_ids = {"group234.group34.task3", "group234.group34.task4", "task1", "task5"}

for task_group in subdag_task_groups.values():
for task_group in groups.values():
assert task_group.upstream_group_ids.issubset(included_group_ids)
assert task_group.downstream_group_ids.issubset(included_group_ids)
assert task_group.upstream_task_ids.issubset(included_task_ids)
assert task_group.downstream_task_ids.issubset(included_task_ids)

for task in subdag.task_group:
for task in subset.task_group:
assert task.upstream_task_ids.issubset(included_task_ids)
assert task.downstream_task_ids.issubset(included_task_ids)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,7 @@ class ResourceDetails(TypedDict):


def resource_name(root_dag_id: str, resource: str) -> str:
"""
Return the resource name for a DAG id.

Note that since a sub-DAG should follow the permission of its
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
for a subdag. A normal dag should pass the ``DagModel.dag_id``.
"""
"""Return the resource name for a DAG id."""
if root_dag_id in RESOURCE_DETAILS_MAP.keys():
return root_dag_id
if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
Expand All @@ -113,10 +107,6 @@ def resource_name_for_dag(root_dag_id: str) -> str:
"""
Return the resource name for a DAG id.

Note that since a sub-DAG should follow the permission of its
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
for a subdag. A normal dag should pass the ``DagModel.dag_id``.

Note: This function is kept for backwards compatibility.
"""
if root_dag_id == RESOURCE_DAG:
Expand Down
14 changes: 7 additions & 7 deletions task-sdk/src/airflow/sdk/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ def _deepcopy_task(t) -> Operator:
}

def filter_task_group(group, parent_group):
"""Exclude tasks not included in the subdag from the given TaskGroup."""
"""Exclude tasks not included in the partial dag from the given TaskGroup."""
# We want to deepcopy _most but not all_ attributes of the task group, so we create a shallow copy
# and then manually deep copy the instances. (memo argument to deepcopy only works for instances
# of classes, not "native" properties of an instance)
Expand Down Expand Up @@ -867,12 +867,12 @@ def filter_task_group(group, parent_group):

# Removing upstream/downstream references to tasks and TaskGroups that did not make
# the cut.
subdag_task_groups = dag.task_group.get_task_group_dict()
for group in subdag_task_groups.values():
group.upstream_group_ids.intersection_update(subdag_task_groups)
group.downstream_group_ids.intersection_update(subdag_task_groups)
group.upstream_task_ids.intersection_update(dag.task_dict)
group.downstream_task_ids.intersection_update(dag.task_dict)
groups = dag.task_group.get_task_group_dict()
for g in groups.values():
g.upstream_group_ids.intersection_update(groups)
g.downstream_group_ids.intersection_update(groups)
g.upstream_task_ids.intersection_update(dag.task_dict)
g.downstream_task_ids.intersection_update(dag.task_dict)

for t in dag.tasks:
# Removing upstream/downstream references to tasks that did not
Expand Down