Skip to content

Commit a1fbdb3

Browse files
authored
AIP-84 Migrate Trigger Dag Run endpoint to FastAPI (#43875)
* init * wip * remove logical_date * fix trigger dag_run * tests WIP * working tests * remove logical_date from post body * remove logical_date from tests * fix * include return type * fix conf * feedback * fix tests * Update tests/api_fastapi/core_api/routes/public/test_dag_run.py * feedback
1 parent 9ee501d commit a1fbdb3

10 files changed

Lines changed: 704 additions & 11 deletions

File tree

airflow/api_connexion/endpoints/dag_run_endpoint.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
305305
return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_runs, total_entries=total_entries))
306306

307307

308+
@mark_fastapi_migration_done
308309
@security.requires_access_dag("POST", DagAccessEntity.RUN)
309310
@action_logging
310311
@provide_session

airflow/api_fastapi/core_api/datamodels/dag_run.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
from datetime import datetime
2121
from enum import Enum
2222

23-
from pydantic import AwareDatetime, Field, NonNegativeInt
23+
from pydantic import AwareDatetime, Field, NonNegativeInt, computed_field, model_validator
2424

2525
from airflow.api_fastapi.core_api.base import BaseModel
26+
from airflow.models import DagRun
27+
from airflow.utils import timezone
2628
from airflow.utils.state import DagRunState
2729
from airflow.utils.types import DagRunTriggeredByType, DagRunType
2830

@@ -75,6 +77,37 @@ class DAGRunCollectionResponse(BaseModel):
7577
total_entries: int
7678

7779

80+
class TriggerDAGRunPostBody(BaseModel):
81+
"""Trigger DAG Run Serializer for POST body."""
82+
83+
dag_run_id: str | None = None
84+
data_interval_start: AwareDatetime | None = None
85+
data_interval_end: AwareDatetime | None = None
86+
87+
conf: dict = Field(default_factory=dict)
88+
note: str | None = None
89+
90+
@model_validator(mode="after")
91+
def check_data_intervals(cls, values):
92+
if (values.data_interval_start is None) != (values.data_interval_end is None):
93+
raise ValueError(
94+
"Either both data_interval_start and data_interval_end must be provided or both must be None"
95+
)
96+
return values
97+
98+
@model_validator(mode="after")
99+
def validate_dag_run_id(self):
100+
if not self.dag_run_id:
101+
self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date)
102+
return self
103+
104+
# Mypy issue https://github.com/python/mypy/issues/1362
105+
@computed_field # type: ignore[misc]
106+
@property
107+
def logical_date(self) -> datetime:
108+
return timezone.utcnow()
109+
110+
78111
class DAGRunsBatchBody(BaseModel):
79112
"""List DAG Runs body for batch endpoint."""
80113

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,6 +1828,67 @@ paths:
18281828
application/json:
18291829
schema:
18301830
$ref: '#/components/schemas/HTTPValidationError'
1831+
post:
1832+
tags:
1833+
- DagRun
1834+
summary: Trigger Dag Run
1835+
description: Trigger a DAG.
1836+
operationId: trigger_dag_run
1837+
parameters:
1838+
- name: dag_id
1839+
in: path
1840+
required: true
1841+
schema:
1842+
title: Dag Id
1843+
requestBody:
1844+
required: true
1845+
content:
1846+
application/json:
1847+
schema:
1848+
$ref: '#/components/schemas/TriggerDAGRunPostBody'
1849+
responses:
1850+
'200':
1851+
description: Successful Response
1852+
content:
1853+
application/json:
1854+
schema:
1855+
$ref: '#/components/schemas/DAGRunResponse'
1856+
'401':
1857+
content:
1858+
application/json:
1859+
schema:
1860+
$ref: '#/components/schemas/HTTPExceptionResponse'
1861+
description: Unauthorized
1862+
'403':
1863+
content:
1864+
application/json:
1865+
schema:
1866+
$ref: '#/components/schemas/HTTPExceptionResponse'
1867+
description: Forbidden
1868+
'400':
1869+
content:
1870+
application/json:
1871+
schema:
1872+
$ref: '#/components/schemas/HTTPExceptionResponse'
1873+
description: Bad Request
1874+
'404':
1875+
content:
1876+
application/json:
1877+
schema:
1878+
$ref: '#/components/schemas/HTTPExceptionResponse'
1879+
description: Not Found
1880+
'409':
1881+
content:
1882+
application/json:
1883+
schema:
1884+
$ref: '#/components/schemas/HTTPExceptionResponse'
1885+
description: Conflict
1886+
'422':
1887+
description: Validation Error
1888+
content:
1889+
application/json:
1890+
schema:
1891+
$ref: '#/components/schemas/HTTPValidationError'
18311892
/public/dags/{dag_id}/dagRuns/list:
18321893
post:
18331894
tags:
@@ -8672,6 +8733,36 @@ components:
86728733
- microseconds
86738734
title: TimeDelta
86748735
description: TimeDelta can be used to interact with datetime.timedelta objects.
8736+
TriggerDAGRunPostBody:
8737+
properties:
8738+
dag_run_id:
8739+
anyOf:
8740+
- type: string
8741+
- type: 'null'
8742+
title: Dag Run Id
8743+
data_interval_start:
8744+
anyOf:
8745+
- type: string
8746+
format: date-time
8747+
- type: 'null'
8748+
title: Data Interval Start
8749+
data_interval_end:
8750+
anyOf:
8751+
- type: string
8752+
format: date-time
8753+
- type: 'null'
8754+
title: Data Interval End
8755+
conf:
8756+
type: object
8757+
title: Conf
8758+
note:
8759+
anyOf:
8760+
- type: string
8761+
- type: 'null'
8762+
title: Note
8763+
type: object
8764+
title: TriggerDAGRunPostBody
8765+
description: Trigger DAG Run Serializer for POST body.
86758766
TriggerResponse:
86768767
properties:
86778768
id:

airflow/api_fastapi/core_api/routes/public/dag_run.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from typing import Annotated, Literal, cast
2121

22+
import pendulum
2223
from fastapi import Depends, HTTPException, Query, Request, status
2324
from sqlalchemy import select
2425
from sqlalchemy.orm import Session
@@ -50,13 +51,19 @@
5051
DAGRunPatchStates,
5152
DAGRunResponse,
5253
DAGRunsBatchBody,
54+
TriggerDAGRunPostBody,
5355
)
5456
from airflow.api_fastapi.core_api.datamodels.task_instances import (
5557
TaskInstanceCollectionResponse,
5658
TaskInstanceResponse,
5759
)
5860
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
59-
from airflow.models import DAG, DagRun
61+
from airflow.exceptions import ParamValidationError
62+
from airflow.models import DAG, DagModel, DagRun
63+
from airflow.models.dag_version import DagVersion
64+
from airflow.timetables.base import DataInterval
65+
from airflow.utils.state import DagRunState
66+
from airflow.utils.types import DagRunTriggeredByType, DagRunType
6067

6168
dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")
6269

@@ -303,6 +310,67 @@ def get_dag_runs(
303310
)
304311

305312

313+
@dag_run_router.post(
314+
"",
315+
responses=create_openapi_http_exception_doc(
316+
[
317+
status.HTTP_400_BAD_REQUEST,
318+
status.HTTP_404_NOT_FOUND,
319+
status.HTTP_409_CONFLICT,
320+
]
321+
),
322+
)
323+
def trigger_dag_run(
324+
dag_id, body: TriggerDAGRunPostBody, request: Request, session: Annotated[Session, Depends(get_session)]
325+
) -> DAGRunResponse:
326+
"""Trigger a DAG."""
327+
dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1))
328+
if not dm:
329+
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found")
330+
331+
if dm.has_import_errors:
332+
raise HTTPException(
333+
status.HTTP_400_BAD_REQUEST,
334+
f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
335+
)
336+
337+
run_id = body.dag_run_id
338+
logical_date = pendulum.instance(body.logical_date)
339+
340+
try:
341+
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
342+
343+
if body.data_interval_start and body.data_interval_end:
344+
data_interval = DataInterval(
345+
start=pendulum.instance(body.data_interval_start),
346+
end=pendulum.instance(body.data_interval_end),
347+
)
348+
else:
349+
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
350+
dag_version = DagVersion.get_latest_version(dag.dag_id)
351+
dag_run = dag.create_dagrun(
352+
run_type=DagRunType.MANUAL,
353+
run_id=run_id,
354+
logical_date=logical_date,
355+
data_interval=data_interval,
356+
state=DagRunState.QUEUED,
357+
conf=body.conf,
358+
external_trigger=True,
359+
dag_version=dag_version,
360+
session=session,
361+
triggered_by=DagRunTriggeredByType.REST_API,
362+
)
363+
dag_run_note = body.note
364+
if dag_run_note:
365+
current_user_id = None # refer to https://github.com/apache/airflow/issues/43534
366+
dag_run.note = (dag_run_note, current_user_id)
367+
return dag_run
368+
except ValueError as e:
369+
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))
370+
except ParamValidationError as e:
371+
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))
372+
373+
306374
@dag_run_router.post("/list", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]))
307375
def get_list_dag_runs_batch(
308376
dag_id: Literal["~"], body: DAGRunsBatchBody, session: Annotated[Session, Depends(get_session)]

airflow/ui/openapi-gen/queries/common.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,6 +1573,9 @@ export type ConnectionServiceTestConnectionMutationResult = Awaited<
15731573
export type DagRunServiceClearDagRunMutationResult = Awaited<
15741574
ReturnType<typeof DagRunService.clearDagRun>
15751575
>;
1576+
export type DagRunServiceTriggerDagRunMutationResult = Awaited<
1577+
ReturnType<typeof DagRunService.triggerDagRun>
1578+
>;
15761579
export type DagRunServiceGetListDagRunsBatchMutationResult = Awaited<
15771580
ReturnType<typeof DagRunService.getListDagRunsBatch>
15781581
>;

airflow/ui/openapi-gen/queries/queries.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import {
4848
PoolPostBody,
4949
PoolPostBulkBody,
5050
TaskInstancesBatchBody,
51+
TriggerDAGRunPostBody,
5152
VariableBody,
5253
} from "../requests/types.gen";
5354
import * as Common from "./common";
@@ -2726,6 +2727,49 @@ export const useDagRunServiceClearDagRun = <
27262727
}) as unknown as Promise<TData>,
27272728
...options,
27282729
});
2730+
/**
2731+
* Trigger Dag Run
2732+
* Trigger a DAG.
2733+
* @param data The data for the request.
2734+
* @param data.dagId
2735+
* @param data.requestBody
2736+
* @returns DAGRunResponse Successful Response
2737+
* @throws ApiError
2738+
*/
2739+
export const useDagRunServiceTriggerDagRun = <
2740+
TData = Common.DagRunServiceTriggerDagRunMutationResult,
2741+
TError = unknown,
2742+
TContext = unknown,
2743+
>(
2744+
options?: Omit<
2745+
UseMutationOptions<
2746+
TData,
2747+
TError,
2748+
{
2749+
dagId: unknown;
2750+
requestBody: TriggerDAGRunPostBody;
2751+
},
2752+
TContext
2753+
>,
2754+
"mutationFn"
2755+
>,
2756+
) =>
2757+
useMutation<
2758+
TData,
2759+
TError,
2760+
{
2761+
dagId: unknown;
2762+
requestBody: TriggerDAGRunPostBody;
2763+
},
2764+
TContext
2765+
>({
2766+
mutationFn: ({ dagId, requestBody }) =>
2767+
DagRunService.triggerDagRun({
2768+
dagId,
2769+
requestBody,
2770+
}) as unknown as Promise<TData>,
2771+
...options,
2772+
});
27292773
/**
27302774
* Get List Dag Runs Batch
27312775
* Get a list of DAG Runs.

airflow/ui/openapi-gen/requests/schemas.gen.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4852,6 +4852,64 @@ export const $TimeDelta = {
48524852
"TimeDelta can be used to interact with datetime.timedelta objects.",
48534853
} as const;
48544854

4855+
export const $TriggerDAGRunPostBody = {
4856+
properties: {
4857+
dag_run_id: {
4858+
anyOf: [
4859+
{
4860+
type: "string",
4861+
},
4862+
{
4863+
type: "null",
4864+
},
4865+
],
4866+
title: "Dag Run Id",
4867+
},
4868+
data_interval_start: {
4869+
anyOf: [
4870+
{
4871+
type: "string",
4872+
format: "date-time",
4873+
},
4874+
{
4875+
type: "null",
4876+
},
4877+
],
4878+
title: "Data Interval Start",
4879+
},
4880+
data_interval_end: {
4881+
anyOf: [
4882+
{
4883+
type: "string",
4884+
format: "date-time",
4885+
},
4886+
{
4887+
type: "null",
4888+
},
4889+
],
4890+
title: "Data Interval End",
4891+
},
4892+
conf: {
4893+
type: "object",
4894+
title: "Conf",
4895+
},
4896+
note: {
4897+
anyOf: [
4898+
{
4899+
type: "string",
4900+
},
4901+
{
4902+
type: "null",
4903+
},
4904+
],
4905+
title: "Note",
4906+
},
4907+
},
4908+
type: "object",
4909+
title: "TriggerDAGRunPostBody",
4910+
description: "Trigger DAG Run Serializer for POST body.",
4911+
} as const;
4912+
48554913
export const $TriggerResponse = {
48564914
properties: {
48574915
id: {

0 commit comments

Comments
 (0)