Skip to content

Commit 03a5681

Browse files
committed
AIP-81 Add Lists Jobs with filters API
1 parent 6d85a04 commit 03a5681

12 files changed

Lines changed: 897 additions & 0 deletions

File tree

airflow/api_fastapi/core_api/datamodels/job.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,10 @@ class JobResponse(BaseModel):
3636
executor_class: datetime | None
3737
hostname: str | None
3838
unixname: str | None
39+
40+
41+
class JobCollectionResponse(BaseModel):
42+
"""Job Collection Response."""
43+
44+
jobs: list[JobResponse]
45+
total_entries: int

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1746,6 +1746,142 @@ paths:
17461746
application/json:
17471747
schema:
17481748
$ref: '#/components/schemas/HTTPValidationError'
1749+
/public/jobs/:
1750+
get:
1751+
tags:
1752+
- Job
1753+
summary: Get Jobs
1754+
description: Get all jobs.
1755+
operationId: get_jobs
1756+
parameters:
1757+
- name: state
1758+
in: query
1759+
required: false
1760+
schema:
1761+
anyOf:
1762+
- type: string
1763+
- type: 'null'
1764+
title: State
1765+
- name: job_type
1766+
in: query
1767+
required: false
1768+
schema:
1769+
anyOf:
1770+
- type: string
1771+
- type: 'null'
1772+
title: Job Type
1773+
- name: hostname
1774+
in: query
1775+
required: false
1776+
schema:
1777+
anyOf:
1778+
- type: string
1779+
- type: 'null'
1780+
title: Hostname
1781+
- name: executor_class
1782+
in: query
1783+
required: false
1784+
schema:
1785+
anyOf:
1786+
- type: string
1787+
- type: 'null'
1788+
title: Executor Class
1789+
- name: is_alive
1790+
in: query
1791+
required: false
1792+
schema:
1793+
anyOf:
1794+
- type: boolean
1795+
- type: 'null'
1796+
title: Is Alive
1797+
- name: start_date_gte
1798+
in: query
1799+
required: false
1800+
schema:
1801+
anyOf:
1802+
- type: string
1803+
format: date-time
1804+
- type: 'null'
1805+
title: Start Date Gte
1806+
- name: start_date_lte
1807+
in: query
1808+
required: false
1809+
schema:
1810+
anyOf:
1811+
- type: string
1812+
format: date-time
1813+
- type: 'null'
1814+
title: Start Date Lte
1815+
- name: end_date_gte
1816+
in: query
1817+
required: false
1818+
schema:
1819+
anyOf:
1820+
- type: string
1821+
format: date-time
1822+
- type: 'null'
1823+
title: End Date Gte
1824+
- name: end_date_lte
1825+
in: query
1826+
required: false
1827+
schema:
1828+
anyOf:
1829+
- type: string
1830+
format: date-time
1831+
- type: 'null'
1832+
title: End Date Lte
1833+
- name: limit
1834+
in: query
1835+
required: false
1836+
schema:
1837+
type: integer
1838+
default: 100
1839+
title: Limit
1840+
- name: offset
1841+
in: query
1842+
required: false
1843+
schema:
1844+
type: integer
1845+
default: 0
1846+
title: Offset
1847+
- name: order_by
1848+
in: query
1849+
required: false
1850+
schema:
1851+
type: string
1852+
default: id
1853+
title: Order By
1854+
responses:
1855+
'200':
1856+
description: Successful Response
1857+
content:
1858+
application/json:
1859+
schema:
1860+
$ref: '#/components/schemas/JobCollectionResponse'
1861+
'400':
1862+
content:
1863+
application/json:
1864+
schema:
1865+
$ref: '#/components/schemas/HTTPExceptionResponse'
1866+
description: Bad Request
1867+
'401':
1868+
content:
1869+
application/json:
1870+
schema:
1871+
$ref: '#/components/schemas/HTTPExceptionResponse'
1872+
description: Unauthorized
1873+
'403':
1874+
content:
1875+
application/json:
1876+
schema:
1877+
$ref: '#/components/schemas/HTTPExceptionResponse'
1878+
description: Forbidden
1879+
'422':
1880+
description: Validation Error
1881+
content:
1882+
application/json:
1883+
schema:
1884+
$ref: '#/components/schemas/HTTPValidationError'
17491885
/public/monitor/health:
17501886
get:
17511887
tags:
@@ -4383,6 +4519,22 @@ components:
43834519
- stack_trace
43844520
title: ImportErrorResponse
43854521
description: Import Error Response.
4522+
JobCollectionResponse:
4523+
properties:
4524+
jobs:
4525+
items:
4526+
$ref: '#/components/schemas/JobResponse'
4527+
type: array
4528+
title: Jobs
4529+
total_entries:
4530+
type: integer
4531+
title: Total Entries
4532+
type: object
4533+
required:
4534+
- jobs
4535+
- total_entries
4536+
title: JobCollectionResponse
4537+
description: Job Collection Response.
43864538
JobResponse:
43874539
properties:
43884540
id:

airflow/api_fastapi/core_api/routes/public/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
2828
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
2929
from airflow.api_fastapi.core_api.routes.public.import_error import import_error_router
30+
from airflow.api_fastapi.core_api.routes.public.job import job_router
3031
from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router
3132
from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router
3233
from airflow.api_fastapi.core_api.routes.public.pools import pools_router
@@ -46,6 +47,7 @@
4647
public_router.include_router(dags_router)
4748
public_router.include_router(event_logs_router)
4849
public_router.include_router(import_error_router)
50+
public_router.include_router(job_router)
4951
public_router.include_router(monitor_router)
5052
public_router.include_router(dag_warning_router)
5153
public_router.include_router(plugins_router)
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from fastapi import Depends, status
20+
from sqlalchemy import select
21+
from sqlalchemy.orm import Session
22+
from typing_extensions import Annotated
23+
24+
from airflow.api_fastapi.common.db.common import (
25+
get_session,
26+
paginated_select,
27+
)
28+
from airflow.api_fastapi.common.parameters import (
29+
QueryLimit,
30+
QueryOffset,
31+
RangeFilter,
32+
SortParam,
33+
datetime_range_filter_factory,
34+
)
35+
from airflow.api_fastapi.common.router import AirflowRouter
36+
from airflow.api_fastapi.core_api.datamodels.job import (
37+
JobCollectionResponse,
38+
JobResponse,
39+
)
40+
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
41+
from airflow.jobs.job import Job
42+
from airflow.utils.state import JobState
43+
44+
job_router = AirflowRouter(tags=["Job"], prefix="/jobs")
45+
46+
47+
@job_router.get(
48+
"/",
49+
responses=create_openapi_http_exception_doc(
50+
[status.HTTP_400_BAD_REQUEST, status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]
51+
),
52+
)
53+
def get_jobs(
54+
start_date_range: Annotated[
55+
RangeFilter,
56+
Depends(datetime_range_filter_factory("start_date", Job)),
57+
],
58+
end_date_range: Annotated[
59+
RangeFilter,
60+
Depends(datetime_range_filter_factory("end_date", Job)),
61+
],
62+
limit: QueryLimit,
63+
offset: QueryOffset,
64+
order_by: Annotated[
65+
SortParam,
66+
Depends(
67+
SortParam(
68+
[
69+
"id",
70+
"dag_id",
71+
"state",
72+
"job_type",
73+
"start_date",
74+
"end_date",
75+
"latest_heartbeat",
76+
"executor_class",
77+
"hostname",
78+
"unixname",
79+
],
80+
Job,
81+
).dynamic_depends(default="id")
82+
),
83+
],
84+
session: Annotated[Session, Depends(get_session)],
85+
state: str | None = None,
86+
job_type: str | None = None,
87+
hostname: str | None = None,
88+
executor_class: str | None = None,
89+
is_alive: bool | None = None,
90+
) -> JobCollectionResponse:
91+
"""Get all jobs."""
92+
base_select = select(Job).where(Job.state == JobState.RUNNING).order_by(Job.latest_heartbeat.desc())
93+
# TODO: Refactor using the `FilterParam` class in commit `574b72e41cc5ed175a2bbf4356522589b836bb11`
94+
if state:
95+
base_select = base_select.where(Job.state == state)
96+
if job_type:
97+
base_select = base_select.where(Job.job_type == job_type)
98+
if hostname:
99+
base_select = base_select.where(Job.hostname == hostname)
100+
if executor_class:
101+
base_select = base_select.where(Job.executor_class == executor_class)
102+
103+
jobs_select, total_entries = paginated_select(
104+
base_select,
105+
[
106+
start_date_range,
107+
end_date_range,
108+
],
109+
order_by,
110+
limit,
111+
offset,
112+
session,
113+
)
114+
jobs = session.scalars(jobs_select).all()
115+
116+
if is_alive is not None:
117+
jobs = [job for job in jobs if job.is_alive()]
118+
119+
return JobCollectionResponse(
120+
jobs=[
121+
JobResponse.model_validate(
122+
job,
123+
from_attributes=True,
124+
)
125+
for job in jobs
126+
],
127+
total_entries=total_entries,
128+
)

airflow/ui/openapi-gen/queries/common.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
DashboardService,
1515
EventLogService,
1616
ImportErrorService,
17+
JobService,
1718
MonitorService,
1819
PluginService,
1920
PoolService,
@@ -467,6 +468,62 @@ export const UseImportErrorServiceGetImportErrorsKeyFn = (
467468
useImportErrorServiceGetImportErrorsKey,
468469
...(queryKey ?? [{ limit, offset, orderBy }]),
469470
];
471+
export type JobServiceGetJobsDefaultResponse = Awaited<
472+
ReturnType<typeof JobService.getJobs>
473+
>;
474+
export type JobServiceGetJobsQueryResult<
475+
TData = JobServiceGetJobsDefaultResponse,
476+
TError = unknown,
477+
> = UseQueryResult<TData, TError>;
478+
export const useJobServiceGetJobsKey = "JobServiceGetJobs";
479+
export const UseJobServiceGetJobsKeyFn = (
480+
{
481+
endDateGte,
482+
endDateLte,
483+
executorClass,
484+
hostname,
485+
isAlive,
486+
jobType,
487+
limit,
488+
offset,
489+
orderBy,
490+
startDateGte,
491+
startDateLte,
492+
state,
493+
}: {
494+
endDateGte?: string;
495+
endDateLte?: string;
496+
executorClass?: string;
497+
hostname?: string;
498+
isAlive?: boolean;
499+
jobType?: string;
500+
limit?: number;
501+
offset?: number;
502+
orderBy?: string;
503+
startDateGte?: string;
504+
startDateLte?: string;
505+
state?: string;
506+
} = {},
507+
queryKey?: Array<unknown>,
508+
) => [
509+
useJobServiceGetJobsKey,
510+
...(queryKey ?? [
511+
{
512+
endDateGte,
513+
endDateLte,
514+
executorClass,
515+
hostname,
516+
isAlive,
517+
jobType,
518+
limit,
519+
offset,
520+
orderBy,
521+
startDateGte,
522+
startDateLte,
523+
state,
524+
},
525+
]),
526+
];
470527
export type MonitorServiceGetHealthDefaultResponse = Awaited<
471528
ReturnType<typeof MonitorService.getHealth>
472529
>;

0 commit comments

Comments
 (0)