Skip to content

Commit 8411dd1

Browse files
jason810496Cloud Composer Team
authored andcommitted
AIP-84 Refactor SortParm (#44345)
* Refactor get_connections * Allow Column type for `to_replace` parameter * Refactor get_dags * Refactor get_import_errors * Refactor SortParam, get_dag_runs * Fix default ordering when directly using SortParam - related: apache/airflow#44393 * Fix get_list_dag_runs_batch GitOrigin-RevId: 5719d195c4a4e9e3c4908d548586950a0f137d2c
1 parent 815cf3a commit 8411dd1

5 files changed

Lines changed: 22 additions & 22 deletions

File tree

airflow/api_fastapi/common/parameters.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,11 @@
4040

4141
from airflow.api_connexion.endpoints.task_instance_endpoint import _convert_ti_states
4242
from airflow.jobs.job import Job
43-
from airflow.models import Base, Connection
43+
from airflow.models import Base
4444
from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
4545
from airflow.models.dag import DagModel, DagTag
4646
from airflow.models.dagrun import DagRun
4747
from airflow.models.dagwarning import DagWarning, DagWarningType
48-
from airflow.models.errors import ParseImportError
4948
from airflow.models.taskinstance import TaskInstance
5049
from airflow.typing_compat import Self
5150
from airflow.utils import timezone
@@ -218,16 +217,8 @@ def depends(self, dag_display_name_pattern: str | None = None) -> _DagDisplayNam
218217
class SortParam(BaseParam[str]):
219218
"""Order result by the attribute."""
220219

221-
attr_mapping = {
222-
"last_run_state": DagRun.state,
223-
"last_run_start_date": DagRun.start_date,
224-
"connection_id": Connection.conn_id,
225-
"import_error_id": ParseImportError.id,
226-
"dag_run_id": DagRun.run_id,
227-
}
228-
229220
def __init__(
230-
self, allowed_attrs: list[str], model: Base, to_replace: dict[str, str] | None = None
221+
self, allowed_attrs: list[str], model: Base, to_replace: dict[str, str | Column] | None = None
231222
) -> None:
232223
super().__init__()
233224
self.allowed_attrs = allowed_attrs
@@ -242,19 +233,22 @@ def to_orm(self, select: Select) -> Select:
242233
self.value = self.get_primary_key_string()
243234

244235
lstriped_orderby = self.value.lstrip("-")
236+
column: Column | None = None
245237
if self.to_replace:
246-
lstriped_orderby = self.to_replace.get(lstriped_orderby, lstriped_orderby)
238+
replacement = self.to_replace.get(lstriped_orderby, lstriped_orderby)
239+
if isinstance(replacement, str):
240+
lstriped_orderby = replacement
241+
else:
242+
column = replacement
247243

248-
if self.allowed_attrs and lstriped_orderby not in self.allowed_attrs:
244+
if (self.allowed_attrs and lstriped_orderby not in self.allowed_attrs) and column is None:
249245
raise HTTPException(
250246
400,
251247
f"Ordering with '{lstriped_orderby}' is disallowed or "
252248
f"the attribute does not exist on the model",
253249
)
254-
255-
column: Column = self.attr_mapping.get(lstriped_orderby, None) or getattr(
256-
self.model, lstriped_orderby
257-
)
250+
if column is None:
251+
column = getattr(self.model, lstriped_orderby)
258252

259253
# MySQL does not support `nullslast`, and True/False ordering depends on the
260254
# database implementation.

airflow/api_fastapi/core_api/routes/public/connections.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ def get_connections(
9292
SortParam,
9393
Depends(
9494
SortParam(
95-
["connection_id", "conn_type", "description", "host", "port", "id"], Connection
95+
["conn_id", "conn_type", "description", "host", "port", "id"],
96+
Connection,
97+
{"connection_id": "conn_id"},
9698
).dynamic_depends()
9799
),
98100
],

airflow/api_fastapi/core_api/routes/public/dag_run.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,15 +266,16 @@ def get_dag_runs(
266266
"id",
267267
"state",
268268
"dag_id",
269+
"run_id",
269270
"logical_date",
270-
"dag_run_id",
271271
"start_date",
272272
"end_date",
273273
"updated_at",
274274
"external_trigger",
275275
"conf",
276276
],
277277
DagRun,
278+
{"dag_run_id": "run_id"},
278279
).dynamic_depends(default="id")
279280
),
280281
],
@@ -401,14 +402,15 @@ def get_list_dag_runs_batch(
401402
"state",
402403
"dag_id",
403404
"logical_date",
404-
"dag_run_id",
405+
"run_id",
405406
"start_date",
406407
"end_date",
407408
"updated_at",
408409
"external_trigger",
409410
"conf",
410411
],
411412
DagRun,
413+
{"dag_run_id": "run_id"},
412414
).set_value(body.order_by)
413415

414416
base_query = select(DagRun)

airflow/api_fastapi/core_api/routes/public/dags.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
5555
from airflow.exceptions import AirflowException, DagNotFound
5656
from airflow.models import DAG, DagModel, DagTag
57+
from airflow.models.dagrun import DagRun
5758

5859
dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")
5960

@@ -73,8 +74,9 @@ def get_dags(
7374
SortParam,
7475
Depends(
7576
SortParam(
76-
["dag_id", "dag_display_name", "next_dagrun", "last_run_state", "last_run_start_date"],
77+
["dag_id", "dag_display_name", "next_dagrun", "state", "start_date"],
7778
DagModel,
79+
{"last_run_state": DagRun.state, "last_run_start_date": DagRun.start_date},
7880
).dynamic_depends()
7981
),
8082
],

airflow/api_fastapi/core_api/routes/public/import_error.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ def get_import_errors(
7373
SortParam(
7474
[
7575
"id",
76-
"import_error_id",
7776
"timestamp",
7877
"filename",
7978
"stacktrace",
8079
],
8180
ParseImportError,
81+
{"import_error_id": "id"},
8282
).dynamic_depends()
8383
),
8484
],

0 commit comments

Comments
 (0)