Skip to content

Commit f6f0a93

Browse files
Merge branch 'main' into trigger-openlineage-tests-on-asset-change
2 parents 858f50f + a60d105 commit f6f0a93

24 files changed

Lines changed: 851 additions & 79 deletions

File tree

airflow/api_connexion/endpoints/asset_endpoint.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ def _generate_queued_event_where_clause(
181181
return where_clause
182182

183183

184+
@mark_fastapi_migration_done
184185
@security.requires_access_asset("GET")
185186
@security.requires_access_dag("GET")
186187
@provide_session
@@ -203,6 +204,7 @@ def get_dag_asset_queued_event(
203204
return queued_event_schema.dump(queued_event)
204205

205206

207+
@mark_fastapi_migration_done
206208
@security.requires_access_asset("DELETE")
207209
@security.requires_access_dag("GET")
208210
@provide_session
@@ -303,6 +305,7 @@ def get_asset_queued_events(
303305
)
304306

305307

308+
@mark_fastapi_migration_done
306309
@security.requires_access_asset("DELETE")
307310
@action_logging
308311
@provide_session

airflow/api_fastapi/core_api/openapi/v1-generated.yaml

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,174 @@ paths:
546546
application/json:
547547
schema:
548548
$ref: '#/components/schemas/HTTPValidationError'
549+
/public/dags/{dag_id}/assets/queuedEvent/{uri}:
550+
get:
551+
tags:
552+
- Asset
553+
summary: Get Dag Asset Queued Event
554+
description: Get a queued asset event for a DAG.
555+
operationId: get_dag_asset_queued_event
556+
parameters:
557+
- name: dag_id
558+
in: path
559+
required: true
560+
schema:
561+
type: string
562+
title: Dag Id
563+
- name: uri
564+
in: path
565+
required: true
566+
schema:
567+
type: string
568+
title: Uri
569+
- name: before
570+
in: query
571+
required: false
572+
schema:
573+
anyOf:
574+
- type: string
575+
- type: 'null'
576+
title: Before
577+
responses:
578+
'200':
579+
description: Successful Response
580+
content:
581+
application/json:
582+
schema:
583+
$ref: '#/components/schemas/QueuedEventResponse'
584+
'401':
585+
content:
586+
application/json:
587+
schema:
588+
$ref: '#/components/schemas/HTTPExceptionResponse'
589+
description: Unauthorized
590+
'403':
591+
content:
592+
application/json:
593+
schema:
594+
$ref: '#/components/schemas/HTTPExceptionResponse'
595+
description: Forbidden
596+
'404':
597+
content:
598+
application/json:
599+
schema:
600+
$ref: '#/components/schemas/HTTPExceptionResponse'
601+
description: Not Found
602+
'422':
603+
description: Validation Error
604+
content:
605+
application/json:
606+
schema:
607+
$ref: '#/components/schemas/HTTPValidationError'
608+
delete:
609+
tags:
610+
- Asset
611+
summary: Delete Dag Asset Queued Event
612+
description: Delete a queued asset event for a DAG.
613+
operationId: delete_dag_asset_queued_event
614+
parameters:
615+
- name: dag_id
616+
in: path
617+
required: true
618+
schema:
619+
type: string
620+
title: Dag Id
621+
- name: uri
622+
in: path
623+
required: true
624+
schema:
625+
type: string
626+
title: Uri
627+
- name: before
628+
in: query
629+
required: false
630+
schema:
631+
anyOf:
632+
- type: string
633+
- type: 'null'
634+
title: Before
635+
responses:
636+
'204':
637+
description: Successful Response
638+
'401':
639+
content:
640+
application/json:
641+
schema:
642+
$ref: '#/components/schemas/HTTPExceptionResponse'
643+
description: Unauthorized
644+
'403':
645+
content:
646+
application/json:
647+
schema:
648+
$ref: '#/components/schemas/HTTPExceptionResponse'
649+
description: Forbidden
650+
'400':
651+
content:
652+
application/json:
653+
schema:
654+
$ref: '#/components/schemas/HTTPExceptionResponse'
655+
description: Bad Request
656+
'404':
657+
content:
658+
application/json:
659+
schema:
660+
$ref: '#/components/schemas/HTTPExceptionResponse'
661+
description: Not Found
662+
'422':
663+
description: Validation Error
664+
content:
665+
application/json:
666+
schema:
667+
$ref: '#/components/schemas/HTTPValidationError'
668+
/public/assets/queuedEvent/{uri}:
669+
delete:
670+
tags:
671+
- Asset
672+
summary: Delete Asset Queued Events
673+
description: Delete queued asset events for an asset.
674+
operationId: delete_asset_queued_events
675+
parameters:
676+
- name: uri
677+
in: path
678+
required: true
679+
schema:
680+
type: string
681+
title: Uri
682+
- name: before
683+
in: query
684+
required: false
685+
schema:
686+
anyOf:
687+
- type: string
688+
- type: 'null'
689+
title: Before
690+
responses:
691+
'204':
692+
description: Successful Response
693+
'401':
694+
content:
695+
application/json:
696+
schema:
697+
$ref: '#/components/schemas/HTTPExceptionResponse'
698+
description: Unauthorized
699+
'403':
700+
content:
701+
application/json:
702+
schema:
703+
$ref: '#/components/schemas/HTTPExceptionResponse'
704+
description: Forbidden
705+
'404':
706+
content:
707+
application/json:
708+
schema:
709+
$ref: '#/components/schemas/HTTPExceptionResponse'
710+
description: Not Found
711+
'422':
712+
description: Validation Error
713+
content:
714+
application/json:
715+
schema:
716+
$ref: '#/components/schemas/HTTPValidationError'
549717
/public/backfills/:
550718
get:
551719
tags:

airflow/api_fastapi/core_api/routes/public/assets.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,59 @@ def get_dag_asset_queued_events(
254254
)
255255

256256

257+
@assets_router.get(
258+
"/dags/{dag_id}/assets/queuedEvent/{uri:path}",
259+
responses=create_openapi_http_exception_doc(
260+
[
261+
status.HTTP_404_NOT_FOUND,
262+
]
263+
),
264+
)
265+
def get_dag_asset_queued_event(
266+
dag_id: str,
267+
uri: str,
268+
session: Annotated[Session, Depends(get_session)],
269+
before: OptionalDateTimeQuery = None,
270+
) -> QueuedEventResponse:
271+
"""Get a queued asset event for a DAG."""
272+
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, uri=uri, before=before)
273+
query = (
274+
select(AssetDagRunQueue)
275+
.join(AssetModel, AssetDagRunQueue.asset_id == AssetModel.id)
276+
.where(*where_clause)
277+
)
278+
adrq = session.scalar(query)
279+
if not adrq:
280+
raise HTTPException(
281+
status.HTTP_404_NOT_FOUND,
282+
f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found",
283+
)
284+
285+
return QueuedEventResponse(created_at=adrq.created_at, dag_id=adrq.target_dag_id, uri=uri)
286+
287+
288+
@assets_router.delete(
289+
"/assets/queuedEvent/{uri:path}",
290+
status_code=status.HTTP_204_NO_CONTENT,
291+
responses=create_openapi_http_exception_doc(
292+
[
293+
status.HTTP_404_NOT_FOUND,
294+
]
295+
),
296+
)
297+
def delete_asset_queued_events(
298+
uri: str,
299+
session: Annotated[Session, Depends(get_session)],
300+
before: OptionalDateTimeQuery = None,
301+
):
302+
"""Delete queued asset events for an asset."""
303+
where_clause = _generate_queued_event_where_clause(uri=uri, before=before)
304+
delete_stmt = delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
305+
result = session.execute(delete_stmt)
306+
if result.rowcount == 0:
307+
raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Queue event with uri: `{uri}` was not found")
308+
309+
257310
@assets_router.delete(
258311
"/dags/{dag_id}/assets/queuedEvent",
259312
status_code=status.HTTP_204_NO_CONTENT,
@@ -276,3 +329,32 @@ def delete_dag_asset_queued_events(
276329

277330
if result.rowcount == 0:
278331
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Queue event with dag_id: `{dag_id}` was not found")
332+
333+
334+
@assets_router.delete(
335+
"/dags/{dag_id}/assets/queuedEvent/{uri:path}",
336+
status_code=status.HTTP_204_NO_CONTENT,
337+
responses=create_openapi_http_exception_doc(
338+
[
339+
status.HTTP_400_BAD_REQUEST,
340+
status.HTTP_404_NOT_FOUND,
341+
]
342+
),
343+
)
344+
def delete_dag_asset_queued_event(
345+
dag_id: str,
346+
uri: str,
347+
session: Annotated[Session, Depends(get_session)],
348+
before: OptionalDateTimeQuery = None,
349+
):
350+
"""Delete a queued asset event for a DAG."""
351+
where_clause = _generate_queued_event_where_clause(dag_id=dag_id, before=before, uri=uri)
352+
delete_statement = (
353+
delete(AssetDagRunQueue).where(*where_clause).execution_options(synchronize_session="fetch")
354+
)
355+
result = session.execute(delete_statement)
356+
if result.rowcount == 0:
357+
raise HTTPException(
358+
status.HTTP_404_NOT_FOUND,
359+
detail=f"Queued event with dag_id: `{dag_id}` and asset uri: `{uri}` was not found",
360+
)

airflow/configuration.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -777,7 +777,8 @@ def mask_secrets(self):
777777

778778
for section, key in self.sensitive_config_values:
779779
try:
780-
value = self.get(section, key, suppress_warnings=True)
780+
with self.suppress_future_warnings():
781+
value = self.get(section, key, suppress_warnings=True)
781782
except AirflowConfigException:
782783
log.debug(
783784
"Could not retrieve value from section %s, for key %s. Skipping redaction of this conf.",

airflow/jobs/scheduler_job_runner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ def _log_memory_usage(self, signum: int, frame: FrameType | None) -> None:
250250
top_stats = snapshot.statistics("lineno")
251251
n = 10
252252
self.log.error(
253-
"scheduler memory usgae:\n Top %d\n %s",
253+
"scheduler memory usage:\n Top %d\n %s",
254254
n,
255255
"\n\t".join(map(str, top_stats[:n])),
256256
)
@@ -704,7 +704,7 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int:
704704

705705
queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
706706

707-
# Sort queued TIs to there respective executor
707+
# Sort queued TIs to their respective executor
708708
executor_to_queued_tis = self._executor_to_tis(queued_tis)
709709
for executor, queued_tis_per_executor in executor_to_queued_tis.items():
710710
self.log.info(

airflow/ui/openapi-gen/queries/common.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,30 @@ export const UseAssetServiceGetDagAssetQueuedEventsKeyFn = (
151151
useAssetServiceGetDagAssetQueuedEventsKey,
152152
...(queryKey ?? [{ before, dagId }]),
153153
];
154+
export type AssetServiceGetDagAssetQueuedEventDefaultResponse = Awaited<
155+
ReturnType<typeof AssetService.getDagAssetQueuedEvent>
156+
>;
157+
export type AssetServiceGetDagAssetQueuedEventQueryResult<
158+
TData = AssetServiceGetDagAssetQueuedEventDefaultResponse,
159+
TError = unknown,
160+
> = UseQueryResult<TData, TError>;
161+
export const useAssetServiceGetDagAssetQueuedEventKey =
162+
"AssetServiceGetDagAssetQueuedEvent";
163+
export const UseAssetServiceGetDagAssetQueuedEventKeyFn = (
164+
{
165+
before,
166+
dagId,
167+
uri,
168+
}: {
169+
before?: string;
170+
dagId: string;
171+
uri: string;
172+
},
173+
queryKey?: Array<unknown>,
174+
) => [
175+
useAssetServiceGetDagAssetQueuedEventKey,
176+
...(queryKey ?? [{ before, dagId, uri }]),
177+
];
154178
export type DashboardServiceHistoricalMetricsDefaultResponse = Awaited<
155179
ReturnType<typeof DashboardService.historicalMetrics>
156180
>;
@@ -1161,6 +1185,12 @@ export type VariableServicePatchVariableMutationResult = Awaited<
11611185
export type AssetServiceDeleteDagAssetQueuedEventsMutationResult = Awaited<
11621186
ReturnType<typeof AssetService.deleteDagAssetQueuedEvents>
11631187
>;
1188+
export type AssetServiceDeleteDagAssetQueuedEventMutationResult = Awaited<
1189+
ReturnType<typeof AssetService.deleteDagAssetQueuedEvent>
1190+
>;
1191+
export type AssetServiceDeleteAssetQueuedEventsMutationResult = Awaited<
1192+
ReturnType<typeof AssetService.deleteAssetQueuedEvents>
1193+
>;
11641194
export type ConnectionServiceDeleteConnectionMutationResult = Awaited<
11651195
ReturnType<typeof ConnectionService.deleteConnection>
11661196
>;

0 commit comments

Comments
 (0)