Skip to content

Commit 61e5f40

Browse files
guan404mingjason810496
authored andcommitted
Add producing/consuming task dependencies to AssetGraph (apache#58059)
* Add data lineage graph to AssetGraph * Update airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py Co-authored-by: Jason(Zhe-You) Liu <68415893+jason810496@users.noreply.github.com> * Refactor to use deque * Apply review comments * Add task icon * Extract get_scheduling_dependencies --------- Co-authored-by: Jason(Zhe-You) Liu <68415893+jason810496@users.noreply.github.com>
1 parent 8468c2d commit 61e5f40

21 files changed

Lines changed: 459 additions & 82 deletions

File tree

airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,16 @@ paths:
388388
- type: string
389389
- type: 'null'
390390
title: Node Id
391+
- name: dependency_type
392+
in: query
393+
required: false
394+
schema:
395+
enum:
396+
- scheduling
397+
- data
398+
type: string
399+
default: scheduling
400+
title: Dependency Type
391401
responses:
392402
'200':
393403
description: Successful Response

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dependencies.py

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
from __future__ import annotations
1919

20+
from typing import Literal
21+
2022
from fastapi import Depends, status
2123
from fastapi.exceptions import HTTPException
2224

@@ -26,8 +28,11 @@
2628
from airflow.api_fastapi.core_api.datamodels.ui.common import BaseGraphResponse
2729
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
2830
from airflow.api_fastapi.core_api.security import requires_access_dag
29-
from airflow.api_fastapi.core_api.services.ui.dependencies import extract_single_connected_component
30-
from airflow.models.serialized_dag import SerializedDagModel
31+
from airflow.api_fastapi.core_api.services.ui.dependencies import (
32+
extract_single_connected_component,
33+
get_data_dependencies,
34+
get_scheduling_dependencies,
35+
)
3136

3237
dependencies_router = AirflowRouter(tags=["Dependencies"])
3338

@@ -41,44 +46,27 @@
4146
),
4247
dependencies=[Depends(requires_access_dag("GET", DagAccessEntity.DEPENDENCIES))],
4348
)
44-
def get_dependencies(session: SessionDep, node_id: str | None = None) -> BaseGraphResponse:
49+
def get_dependencies(
50+
session: SessionDep,
51+
node_id: str | None = None,
52+
dependency_type: Literal["scheduling", "data"] = "scheduling",
53+
) -> BaseGraphResponse:
4554
"""Dependencies graph."""
46-
nodes_dict: dict[str, dict] = {}
47-
edge_tuples: set[tuple[str, str]] = set()
48-
49-
for dag, dependencies in sorted(SerializedDagModel.get_dag_dependencies().items()):
50-
dag_node_id = f"dag:{dag}"
51-
if dag_node_id not in nodes_dict:
52-
for dep in dependencies:
53-
# Add nodes
54-
nodes_dict[dag_node_id] = {"id": dag_node_id, "label": dag, "type": "dag"}
55-
if dep.node_id not in nodes_dict:
56-
nodes_dict[dep.node_id] = {
57-
"id": dep.node_id,
58-
"label": dep.label,
59-
"type": dep.dependency_type,
60-
}
55+
if dependency_type == "data":
56+
if node_id is None or not node_id.startswith("asset:"):
57+
raise HTTPException(
58+
status.HTTP_400_BAD_REQUEST, "Data dependencies require an asset node_id (e.g., 'asset:123')"
59+
)
6160

62-
# Add edges
63-
# not start dep
64-
if dep.source != dep.dependency_type:
65-
source = dep.source if ":" in dep.source else f"dag:{dep.source}"
66-
target = dep.node_id
67-
edge_tuples.add((source, target))
68-
69-
# not end dep
70-
if dep.target != dep.dependency_type:
71-
source = dep.node_id
72-
target = dep.target if ":" in dep.target else f"dag:{dep.target}"
73-
edge_tuples.add((source, target))
61+
try:
62+
asset_id = int(node_id.replace("asset:", ""))
63+
except ValueError:
64+
raise HTTPException(status.HTTP_400_BAD_REQUEST, f"Invalid asset node_id: {node_id}")
7465

75-
nodes = list(nodes_dict.values())
76-
edges = [{"source_id": source, "target_id": target} for source, target in sorted(edge_tuples)]
66+
data = get_data_dependencies(asset_id, session)
67+
return BaseGraphResponse(**data)
7768

78-
data = {
79-
"nodes": nodes,
80-
"edges": edges,
81-
}
69+
data = get_scheduling_dependencies()
8270

8371
if node_id is not None:
8472
try:

airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py

Lines changed: 151 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@
1717

1818
from __future__ import annotations
1919

20-
from collections import defaultdict
20+
from collections import defaultdict, deque
21+
from typing import TYPE_CHECKING
22+
23+
from airflow.models.asset import AssetModel
24+
25+
if TYPE_CHECKING:
26+
from sqlalchemy.orm import Session
2127

2228

2329
def _dfs_connected_components(
@@ -76,3 +82,147 @@ def extract_single_connected_component(
7682
]
7783

7884
return {"nodes": nodes, "edges": edges}
85+
86+
87+
def get_scheduling_dependencies() -> dict[str, list[dict]]:
88+
"""Get scheduling dependencies between DAGs."""
89+
from airflow.models.serialized_dag import SerializedDagModel
90+
91+
nodes_dict: dict[str, dict] = {}
92+
edge_tuples: set[tuple[str, str]] = set()
93+
94+
for dag, dependencies in sorted(SerializedDagModel.get_dag_dependencies().items()):
95+
dag_node_id = f"dag:{dag}"
96+
if dag_node_id not in nodes_dict:
97+
for dep in dependencies:
98+
# Add nodes
99+
nodes_dict[dag_node_id] = {"id": dag_node_id, "label": dag, "type": "dag"}
100+
if dep.node_id not in nodes_dict:
101+
nodes_dict[dep.node_id] = {
102+
"id": dep.node_id,
103+
"label": dep.label,
104+
"type": dep.dependency_type,
105+
}
106+
107+
# Add edges
108+
# not start dep
109+
if dep.source != dep.dependency_type:
110+
source = dep.source if ":" in dep.source else f"dag:{dep.source}"
111+
target = dep.node_id
112+
edge_tuples.add((source, target))
113+
114+
# not end dep
115+
if dep.target != dep.dependency_type:
116+
source = dep.node_id
117+
target = dep.target if ":" in dep.target else f"dag:{dep.target}"
118+
edge_tuples.add((source, target))
119+
120+
return {
121+
"nodes": list(nodes_dict.values()),
122+
"edges": [{"source_id": source, "target_id": target} for source, target in sorted(edge_tuples)],
123+
}
124+
125+
126+
def get_data_dependencies(asset_id: int, session: Session) -> dict[str, list[dict]]:
127+
"""Get full task dependencies for an asset."""
128+
from sqlalchemy import select
129+
from sqlalchemy.orm import selectinload
130+
131+
from airflow.models.asset import TaskInletAssetReference, TaskOutletAssetReference
132+
133+
SEPARATOR = "__SEPARATOR__"
134+
135+
nodes_dict: dict[str, dict] = {}
136+
edge_set: set[tuple[str, str]] = set()
137+
138+
# BFS to trace full dependencies
139+
assets_to_process: deque[int] = deque([asset_id])
140+
processed_assets: set[int] = set()
141+
processed_tasks: set[tuple[str, str]] = set() # (dag_id, task_id)
142+
143+
while assets_to_process:
144+
current_asset_id = assets_to_process.popleft()
145+
if current_asset_id in processed_assets:
146+
continue
147+
processed_assets.add(current_asset_id)
148+
149+
# Eagerload producing_tasks and consuming_tasks to avoid lazy queries
150+
asset = session.scalar(
151+
select(AssetModel)
152+
.where(AssetModel.id == current_asset_id)
153+
.options(
154+
selectinload(AssetModel.producing_tasks),
155+
selectinload(AssetModel.consuming_tasks),
156+
)
157+
)
158+
if not asset:
159+
continue
160+
161+
asset_node_id = f"asset:{current_asset_id}"
162+
163+
# Add asset node
164+
if asset_node_id not in nodes_dict:
165+
nodes_dict[asset_node_id] = {"id": asset_node_id, "label": asset.name, "type": "asset"}
166+
167+
# Process producing tasks (tasks that output this asset)
168+
for ref in asset.producing_tasks:
169+
task_key = (ref.dag_id, ref.task_id)
170+
task_node_id = f"task:{ref.dag_id}{SEPARATOR}{ref.task_id}"
171+
172+
# Add task node with dag_id.task_id label for disambiguation
173+
if task_node_id not in nodes_dict:
174+
nodes_dict[task_node_id] = {
175+
"id": task_node_id,
176+
"label": f"{ref.dag_id}.{ref.task_id}",
177+
"type": "task",
178+
}
179+
180+
# Add edge: task → asset
181+
edge_set.add((task_node_id, asset_node_id))
182+
183+
# Find other assets this task consumes (inlets) to trace upstream
184+
if task_key not in processed_tasks:
185+
processed_tasks.add(task_key)
186+
inlet_refs = session.scalars(
187+
select(TaskInletAssetReference).where(
188+
TaskInletAssetReference.dag_id == ref.dag_id,
189+
TaskInletAssetReference.task_id == ref.task_id,
190+
)
191+
).all()
192+
for inlet_ref in inlet_refs:
193+
if inlet_ref.asset_id not in processed_assets:
194+
assets_to_process.append(inlet_ref.asset_id)
195+
196+
# Process consuming tasks (tasks that input this asset)
197+
for ref in asset.consuming_tasks:
198+
task_key = (ref.dag_id, ref.task_id)
199+
task_node_id = f"task:{ref.dag_id}{SEPARATOR}{ref.task_id}"
200+
201+
# Add task node with dag_id.task_id label for disambiguation
202+
if task_node_id not in nodes_dict:
203+
nodes_dict[task_node_id] = {
204+
"id": task_node_id,
205+
"label": f"{ref.dag_id}.{ref.task_id}",
206+
"type": "task",
207+
}
208+
209+
# Add edge: asset → task
210+
edge_set.add((asset_node_id, task_node_id))
211+
212+
# Find other assets this task produces (outlets) to trace downstream
213+
if task_key not in processed_tasks:
214+
processed_tasks.add(task_key)
215+
outlet_refs = session.scalars(
216+
select(TaskOutletAssetReference).where(
217+
TaskOutletAssetReference.dag_id == ref.dag_id,
218+
TaskOutletAssetReference.task_id == ref.task_id,
219+
)
220+
).all()
221+
for outlet_ref in outlet_refs:
222+
if outlet_ref.asset_id not in processed_assets:
223+
assets_to_process.append(outlet_ref.asset_id)
224+
225+
return {
226+
"nodes": list(nodes_dict.values()),
227+
"edges": [{"source_id": source, "target_id": target} for source, target in edge_set],
228+
}

airflow-core/src/airflow/ui/openapi-gen/queries/common.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -791,9 +791,10 @@ export const UseAuthLinksServiceGetCurrentUserInfoKeyFn = (queryKey?: Array<unkn
791791
export type DependenciesServiceGetDependenciesDefaultResponse = Awaited<ReturnType<typeof DependenciesService.getDependencies>>;
792792
export type DependenciesServiceGetDependenciesQueryResult<TData = DependenciesServiceGetDependenciesDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
793793
export const useDependenciesServiceGetDependenciesKey = "DependenciesServiceGetDependencies";
794-
export const UseDependenciesServiceGetDependenciesKeyFn = ({ nodeId }: {
794+
export const UseDependenciesServiceGetDependenciesKeyFn = ({ dependencyType, nodeId }: {
795+
dependencyType?: "scheduling" | "data";
795796
nodeId?: string;
796-
} = {}, queryKey?: Array<unknown>) => [useDependenciesServiceGetDependenciesKey, ...(queryKey ?? [{ nodeId }])];
797+
} = {}, queryKey?: Array<unknown>) => [useDependenciesServiceGetDependenciesKey, ...(queryKey ?? [{ dependencyType, nodeId }])];
797798
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<ReturnType<typeof DashboardService.historicalMetrics>>;
798799
export type DashboardServiceHistoricalMetricsQueryResult<TData = DashboardServiceHistoricalMetricsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
799800
export const useDashboardServiceHistoricalMetricsKey = "DashboardServiceHistoricalMetrics";

airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,12 +1501,14 @@ export const ensureUseAuthLinksServiceGetCurrentUserInfoData = (queryClient: Que
15011501
* Dependencies graph.
15021502
* @param data The data for the request.
15031503
* @param data.nodeId
1504+
* @param data.dependencyType
15041505
* @returns BaseGraphResponse Successful Response
15051506
* @throws ApiError
15061507
*/
1507-
export const ensureUseDependenciesServiceGetDependenciesData = (queryClient: QueryClient, { nodeId }: {
1508+
export const ensureUseDependenciesServiceGetDependenciesData = (queryClient: QueryClient, { dependencyType, nodeId }: {
1509+
dependencyType?: "scheduling" | "data";
15081510
nodeId?: string;
1509-
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ nodeId }), queryFn: () => DependenciesService.getDependencies({ nodeId }) });
1511+
} = {}) => queryClient.ensureQueryData({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ dependencyType, nodeId }), queryFn: () => DependenciesService.getDependencies({ dependencyType, nodeId }) });
15101512
/**
15111513
* Historical Metrics
15121514
* Return cluster activity historical metrics.

airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,12 +1501,14 @@ export const prefetchUseAuthLinksServiceGetCurrentUserInfo = (queryClient: Query
15011501
* Dependencies graph.
15021502
* @param data The data for the request.
15031503
* @param data.nodeId
1504+
* @param data.dependencyType
15041505
* @returns BaseGraphResponse Successful Response
15051506
* @throws ApiError
15061507
*/
1507-
export const prefetchUseDependenciesServiceGetDependencies = (queryClient: QueryClient, { nodeId }: {
1508+
export const prefetchUseDependenciesServiceGetDependencies = (queryClient: QueryClient, { dependencyType, nodeId }: {
1509+
dependencyType?: "scheduling" | "data";
15081510
nodeId?: string;
1509-
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ nodeId }), queryFn: () => DependenciesService.getDependencies({ nodeId }) });
1511+
} = {}) => queryClient.prefetchQuery({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ dependencyType, nodeId }), queryFn: () => DependenciesService.getDependencies({ dependencyType, nodeId }) });
15101512
/**
15111513
* Historical Metrics
15121514
* Return cluster activity historical metrics.

airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,12 +1501,14 @@ export const useAuthLinksServiceGetCurrentUserInfo = <TData = Common.AuthLinksSe
15011501
* Dependencies graph.
15021502
* @param data The data for the request.
15031503
* @param data.nodeId
1504+
* @param data.dependencyType
15041505
* @returns BaseGraphResponse Successful Response
15051506
* @throws ApiError
15061507
*/
1507-
export const useDependenciesServiceGetDependencies = <TData = Common.DependenciesServiceGetDependenciesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ nodeId }: {
1508+
export const useDependenciesServiceGetDependencies = <TData = Common.DependenciesServiceGetDependenciesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dependencyType, nodeId }: {
1509+
dependencyType?: "scheduling" | "data";
15081510
nodeId?: string;
1509-
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ nodeId }, queryKey), queryFn: () => DependenciesService.getDependencies({ nodeId }) as TData, ...options });
1511+
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ dependencyType, nodeId }, queryKey), queryFn: () => DependenciesService.getDependencies({ dependencyType, nodeId }) as TData, ...options });
15101512
/**
15111513
* Historical Metrics
15121514
* Return cluster activity historical metrics.

airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1501,12 +1501,14 @@ export const useAuthLinksServiceGetCurrentUserInfoSuspense = <TData = Common.Aut
15011501
* Dependencies graph.
15021502
* @param data The data for the request.
15031503
* @param data.nodeId
1504+
* @param data.dependencyType
15041505
* @returns BaseGraphResponse Successful Response
15051506
* @throws ApiError
15061507
*/
1507-
export const useDependenciesServiceGetDependenciesSuspense = <TData = Common.DependenciesServiceGetDependenciesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ nodeId }: {
1508+
export const useDependenciesServiceGetDependenciesSuspense = <TData = Common.DependenciesServiceGetDependenciesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dependencyType, nodeId }: {
1509+
dependencyType?: "scheduling" | "data";
15081510
nodeId?: string;
1509-
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ nodeId }, queryKey), queryFn: () => DependenciesService.getDependencies({ nodeId }) as TData, ...options });
1511+
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseDependenciesServiceGetDependenciesKeyFn({ dependencyType, nodeId }, queryKey), queryFn: () => DependenciesService.getDependencies({ dependencyType, nodeId }) as TData, ...options });
15101512
/**
15111513
* Historical Metrics
15121514
* Return cluster activity historical metrics.

airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3852,6 +3852,7 @@ export class DependenciesService {
38523852
* Dependencies graph.
38533853
* @param data The data for the request.
38543854
* @param data.nodeId
3855+
* @param data.dependencyType
38553856
* @returns BaseGraphResponse Successful Response
38563857
* @throws ApiError
38573858
*/
@@ -3860,7 +3861,8 @@ export class DependenciesService {
38603861
method: 'GET',
38613862
url: '/ui/dependencies',
38623863
query: {
3863-
node_id: data.nodeId
3864+
node_id: data.nodeId,
3865+
dependency_type: data.dependencyType
38643866
},
38653867
errors: {
38663868
404: 'Not Found',

airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3425,6 +3425,7 @@ export type GetAuthMenusResponse = MenuItemCollectionResponse;
34253425
export type GetCurrentUserInfoResponse = AuthenticatedMeResponse;
34263426

34273427
export type GetDependenciesData = {
3428+
dependencyType?: 'scheduling' | 'data';
34283429
nodeId?: string | null;
34293430
};
34303431

0 commit comments

Comments
 (0)