1717
1818from __future__ import annotations
1919
20- import re
21- from datetime import datetime , timedelta
22- from pathlib import Path
23- from typing import TYPE_CHECKING , Any
24-
25- from flask import Blueprint , redirect , request , url_for
26- from flask_appbuilder import BaseView , expose
27- from markupsafe import Markup
28- from sqlalchemy import select
29-
3020from airflow .configuration import conf
3121from airflow .exceptions import AirflowConfigException
32- from airflow .models .taskinstance import TaskInstanceState
3322from airflow .plugins_manager import AirflowPlugin
3423from airflow .providers .edge3 .version_compat import AIRFLOW_V_3_0_PLUS
35- from airflow .utils .state import State
3624
3725if AIRFLOW_V_3_0_PLUS :
38- from airflow .api_fastapi .auth .managers .models .resource_details import AccessView
39- from airflow .providers .fab .www .auth import has_access_view
40-
41- else :
42- from airflow .auth .managers .models .resource_details import AccessView # type: ignore[no-redef]
43- from airflow .www .auth import has_access_view # type: ignore[no-redef]
44- from airflow .utils .session import NEW_SESSION , provide_session
45- from airflow .utils .yaml import safe_load
46-
47- if TYPE_CHECKING :
48- from sqlalchemy .orm import Session
49-
50-
51- def _get_airflow_2_api_endpoint () -> Blueprint :
52- from airflow .www .constants import SWAGGER_BUNDLE , SWAGGER_ENABLED
53- from airflow .www .extensions .init_views import _CustomErrorRequestBodyValidator , _LazyResolver
54-
55- folder = Path (__file__ ).parents [1 ].resolve () # this is airflow/providers/edge3/
56- with folder .joinpath ("openapi" , "edge_worker_api_v1.yaml" ).open () as f :
57- specification = safe_load (f )
58- from connexion import FlaskApi
59-
60- bp = FlaskApi (
61- specification = specification ,
62- resolver = _LazyResolver (),
63- base_path = "/edge_worker/v1" ,
64- strict_validation = True ,
65- options = {"swagger_ui" : SWAGGER_ENABLED , "swagger_path" : SWAGGER_BUNDLE .__fspath__ ()},
66- validate_responses = True ,
67- validator_map = {"body" : _CustomErrorRequestBodyValidator },
68- ).blueprint
69- # Need to exempt CSRF to make API usable
70- from airflow .www .app import csrf
71-
72- csrf .exempt (bp )
73- return bp
74-
75-
76- def _get_api_endpoint () -> dict [str , Any ]:
7726 from airflow .providers .edge3 .worker_api .app import create_edge_worker_api_app
7827
79- return {
80- "app" : create_edge_worker_api_app (),
81- "url_prefix" : "/edge_worker/v1" ,
82- "name" : "Airflow Edge Worker API" ,
83- }
84-
85-
86- def _state_token (state ):
87- """Return a formatted string with HTML for a given State."""
88- color = State .color (state )
89- fg_color = State .color_fg (state )
90- return Markup (
91- """
92- <span class="label" style="color:{fg_color}; background-color:{color};"
93- title="Current State: {state}">{state}</span>
94- """
95- ).format (color = color , state = state , fg_color = fg_color )
96-
97-
98- def modify_maintenance_comment_on_update (maintenance_comment : str | None , username : str ) -> str :
99- if maintenance_comment :
100- if re .search (
101- r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:.*" , maintenance_comment
102- ):
103- return re .sub (
104- r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:" ,
105- f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { username } updated maintenance mode\n Comment:" ,
106- maintenance_comment ,
107- )
108- if re .search (r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:.*" , maintenance_comment ):
109- return re .sub (
110- r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:" ,
111- f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { username } updated maintenance mode\n Comment:" ,
112- maintenance_comment ,
113- )
114- return f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { username } updated maintenance mode\n Comment: { maintenance_comment } "
115- return f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { username } updated maintenance mode\n Comment:"
116-
117-
118- # registers airflow/providers/edge3/plugins/templates as a Jinja template folder
119- template_bp = Blueprint (
120- "template_blueprint" ,
121- __name__ ,
122- template_folder = "templates" ,
123- )
124-
125-
126- class EdgeWorkerJobs (BaseView ):
127- """Simple view to show Edge Worker jobs."""
128-
129- default_view = "jobs"
130-
131- @expose ("/jobs" )
132- @has_access_view (AccessView .JOBS )
133- @provide_session
134- def jobs (self , session : Session = NEW_SESSION ):
135- from airflow .providers .edge3 .models .edge_job import EdgeJobModel
136-
137- jobs = session .scalars (select (EdgeJobModel ).order_by (EdgeJobModel .queued_dttm )).all ()
138- html_states = {
139- str (state ): _state_token (str (state )) for state in TaskInstanceState .__members__ .values ()
28+ def _get_api_endpoint () -> dict [str , Any ]:
29+ return {
30+ "app" : create_edge_worker_api_app (),
31+ "url_prefix" : "/edge_worker/v1" ,
32+ "name" : "Airflow Edge Worker API" ,
14033 }
141- return self .render_template ("edge_worker_jobs.html" , jobs = jobs , html_states = html_states )
142-
143-
144- class EdgeWorkerHosts (BaseView ):
145- """Simple view to show Edge Worker status."""
146-
147- default_view = "status"
148-
149- @expose ("/status" )
150- @has_access_view (AccessView .JOBS )
151- @provide_session
152- def status (self , session : Session = NEW_SESSION ):
153- from airflow .providers .edge3 .models .edge_worker import EdgeWorkerModel
154-
155- hosts = session .scalars (select (EdgeWorkerModel ).order_by (EdgeWorkerModel .worker_name )).all ()
156- five_min_ago = datetime .now () - timedelta (minutes = 5 )
157- return self .render_template ("edge_worker_hosts.html" , hosts = hosts , five_min_ago = five_min_ago )
158-
159- @expose ("/status/maintenance/<string:worker_name>/on" , methods = ["POST" ])
160- @has_access_view (AccessView .JOBS )
161- def worker_to_maintenance (self , worker_name : str ):
162- from flask_login import current_user
163-
164- from airflow .providers .edge3 .models .edge_worker import request_maintenance
165-
166- maintenance_comment = request .form .get ("maintenance_comment" )
167- maintenance_comment = f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { current_user .username } put node into maintenance mode\n Comment: { maintenance_comment } "
168- request_maintenance (worker_name , maintenance_comment )
169- return redirect (url_for ("EdgeWorkerHosts.status" ))
17034
171- @expose ("/status/maintenance/<string:worker_name>/off" , methods = ["POST" ])
172- @has_access_view (AccessView .JOBS )
173- def remove_worker_from_maintenance (self , worker_name : str ):
174- from airflow .providers .edge3 .models .edge_worker import exit_maintenance
175-
176- exit_maintenance (worker_name )
177- return redirect (url_for ("EdgeWorkerHosts.status" ))
178-
179- @expose ("/status/maintenance/<string:worker_name>/remove" , methods = ["POST" ])
180- @has_access_view (AccessView .JOBS )
181- def remove_worker (self , worker_name : str ):
182- from airflow .providers .edge3 .models .edge_worker import remove_worker
183-
184- remove_worker (worker_name )
185- return redirect (url_for ("EdgeWorkerHosts.status" ))
186-
187- @expose ("/status/maintenance/<string:worker_name>/change_comment" , methods = ["POST" ])
188- @has_access_view (AccessView .JOBS )
189- def change_maintenance_comment (self , worker_name : str ):
190- from flask_login import current_user
191-
192- from airflow .providers .edge3 .models .edge_worker import change_maintenance_comment
193-
194- maintenance_comment = request .form .get ("maintenance_comment" )
195- maintenance_comment = modify_maintenance_comment_on_update (maintenance_comment , current_user .username )
196- change_maintenance_comment (worker_name , maintenance_comment )
197- return redirect (url_for ("EdgeWorkerHosts.status" ))
35+ else :
36+ # This is for back-compatability with Airflow 2.x and we only make this
37+ # to prevents dependencies and breaking imports in Airflow 3.x
38+ import re
39+ from datetime import datetime , timedelta
40+ from pathlib import Path
41+ from typing import TYPE_CHECKING , Any
42+
43+ from flask import Blueprint , redirect , request , url_for
44+ from flask_appbuilder import BaseView , expose
45+ from markupsafe import Markup
46+ from sqlalchemy import select
47+
48+ from airflow .auth .managers .models .resource_details import AccessView
49+ from airflow .models .taskinstance import TaskInstanceState
50+ from airflow .utils .session import NEW_SESSION , provide_session
51+ from airflow .utils .state import State
52+ from airflow .utils .yaml import safe_load
53+ from airflow .www .auth import has_access_view
54+
55+ if TYPE_CHECKING :
56+ from sqlalchemy .orm import Session
57+
58+ def _get_airflow_2_api_endpoint () -> Blueprint :
59+ from airflow .www .app import csrf
60+ from airflow .www .constants import SWAGGER_BUNDLE , SWAGGER_ENABLED
61+ from airflow .www .extensions .init_views import _CustomErrorRequestBodyValidator , _LazyResolver
62+
63+ folder = Path (__file__ ).parents [1 ].resolve () # this is airflow/providers/edge3/
64+ with folder .joinpath ("openapi" , "edge_worker_api_v1.yaml" ).open () as f :
65+ specification = safe_load (f )
66+ from connexion import FlaskApi
67+
68+ bp = FlaskApi (
69+ specification = specification ,
70+ resolver = _LazyResolver (),
71+ base_path = "/edge_worker/v1" ,
72+ strict_validation = True ,
73+ options = {"swagger_ui" : SWAGGER_ENABLED , "swagger_path" : SWAGGER_BUNDLE .__fspath__ ()},
74+ validate_responses = True ,
75+ validator_map = {"body" : _CustomErrorRequestBodyValidator },
76+ ).blueprint
77+ # Need to exempt CSRF to make API usable
78+ csrf .exempt (bp )
79+ return bp
80+
81+ def _state_token (state ):
82+ """Return a formatted string with HTML for a given State."""
83+ color = State .color (state )
84+ fg_color = State .color_fg (state )
85+ return Markup (
86+ """
87+ <span class="label" style="color:{fg_color}; background-color:{color};"
88+ title="Current State: {state}">{state}</span>
89+ """
90+ ).format (color = color , state = state , fg_color = fg_color )
91+
92+ def modify_maintenance_comment_on_update (maintenance_comment : str | None , username : str ) -> str :
93+ if maintenance_comment :
94+ if re .search (
95+ r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:.*" , maintenance_comment
96+ ):
97+ return re .sub (
98+ r"^\[[-\d:\s]+\] - .+ put node into maintenance mode\r?\nComment:" ,
99+ f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { username } updated maintenance mode\n Comment:" ,
100+ maintenance_comment ,
101+ )
102+ if re .search (r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:.*" , maintenance_comment ):
103+ return re .sub (
104+ r"^\[[-\d:\s]+\] - .+ updated maintenance mode\r?\nComment:" ,
105+ f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { username } updated maintenance mode\n Comment:" ,
106+ maintenance_comment ,
107+ )
108+ return f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { username } updated maintenance mode\n Comment: { maintenance_comment } "
109+ return (
110+ f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { username } updated maintenance mode\n Comment:"
111+ )
112+
113+ # registers airflow/providers/edge3/plugins/templates as a Jinja template folder
114+ template_bp = Blueprint (
115+ "template_blueprint" ,
116+ __name__ ,
117+ template_folder = "templates" ,
118+ )
119+
120+ class EdgeWorkerJobs (BaseView ):
121+ """Simple view to show Edge Worker jobs."""
122+
123+ default_view = "jobs"
124+
125+ @expose ("/jobs" )
126+ @has_access_view (AccessView .JOBS )
127+ @provide_session
128+ def jobs (self , session : Session = NEW_SESSION ):
129+ from airflow .providers .edge3 .models .edge_job import EdgeJobModel
130+
131+ jobs = session .scalars (select (EdgeJobModel ).order_by (EdgeJobModel .queued_dttm )).all ()
132+ html_states = {
133+ str (state ): _state_token (str (state )) for state in TaskInstanceState .__members__ .values ()
134+ }
135+ return self .render_template ("edge_worker_jobs.html" , jobs = jobs , html_states = html_states )
136+
137+ class EdgeWorkerHosts (BaseView ):
138+ """Simple view to show Edge Worker status."""
139+
140+ default_view = "status"
141+
142+ @expose ("/status" )
143+ @has_access_view (AccessView .JOBS )
144+ @provide_session
145+ def status (self , session : Session = NEW_SESSION ):
146+ from airflow .providers .edge3 .models .edge_worker import EdgeWorkerModel
147+
148+ hosts = session .scalars (select (EdgeWorkerModel ).order_by (EdgeWorkerModel .worker_name )).all ()
149+ five_min_ago = datetime .now () - timedelta (minutes = 5 )
150+ return self .render_template ("edge_worker_hosts.html" , hosts = hosts , five_min_ago = five_min_ago )
151+
152+ @expose ("/status/maintenance/<string:worker_name>/on" , methods = ["POST" ])
153+ @has_access_view (AccessView .JOBS )
154+ def worker_to_maintenance (self , worker_name : str ):
155+ from flask_login import current_user
156+
157+ from airflow .providers .edge3 .models .edge_worker import request_maintenance
158+
159+ maintenance_comment = request .form .get ("maintenance_comment" )
160+ maintenance_comment = f"[{ datetime .now ().strftime ('%Y-%m-%d %H:%M' )} ] - { current_user .username } put node into maintenance mode\n Comment: { maintenance_comment } "
161+ request_maintenance (worker_name , maintenance_comment )
162+ return redirect (url_for ("EdgeWorkerHosts.status" ))
163+
164+ @expose ("/status/maintenance/<string:worker_name>/off" , methods = ["POST" ])
165+ @has_access_view (AccessView .JOBS )
166+ def remove_worker_from_maintenance (self , worker_name : str ):
167+ from airflow .providers .edge3 .models .edge_worker import exit_maintenance
168+
169+ exit_maintenance (worker_name )
170+ return redirect (url_for ("EdgeWorkerHosts.status" ))
171+
172+ @expose ("/status/maintenance/<string:worker_name>/remove" , methods = ["POST" ])
173+ @has_access_view (AccessView .JOBS )
174+ def remove_worker (self , worker_name : str ):
175+ from airflow .providers .edge3 .models .edge_worker import remove_worker
176+
177+ remove_worker (worker_name )
178+ return redirect (url_for ("EdgeWorkerHosts.status" ))
179+
180+ @expose ("/status/maintenance/<string:worker_name>/change_comment" , methods = ["POST" ])
181+ @has_access_view (AccessView .JOBS )
182+ def change_maintenance_comment (self , worker_name : str ):
183+ from flask_login import current_user
184+
185+ from airflow .providers .edge3 .models .edge_worker import change_maintenance_comment
186+
187+ maintenance_comment = request .form .get ("maintenance_comment" )
188+ maintenance_comment = modify_maintenance_comment_on_update (
189+ maintenance_comment , current_user .username
190+ )
191+ change_maintenance_comment (worker_name , maintenance_comment )
192+ return redirect (url_for ("EdgeWorkerHosts.status" ))
198193
199194
200195# Check if EdgeExecutor is actually loaded
@@ -209,21 +204,19 @@ class EdgeExecutorPlugin(AirflowPlugin):
209204
210205 name = "edge_executor"
211206 if EDGE_EXECUTOR_ACTIVE :
212- appbuilder_views = [
213- {
214- "name" : "Edge Worker Jobs" ,
215- "category" : "Admin" ,
216- "view" : EdgeWorkerJobs (),
217- },
218- {
219- "name" : "Edge Worker Hosts" ,
220- "category" : "Admin" ,
221- "view" : EdgeWorkerHosts (),
222- },
223- ]
224-
225207 if AIRFLOW_V_3_0_PLUS :
226208 fastapi_apps = [_get_api_endpoint ()]
227- flask_blueprints = [template_bp ]
228209 else :
210+ appbuilder_views = [
211+ {
212+ "name" : "Edge Worker Jobs" ,
213+ "category" : "Admin" ,
214+ "view" : EdgeWorkerJobs (),
215+ },
216+ {
217+ "name" : "Edge Worker Hosts" ,
218+ "category" : "Admin" ,
219+ "view" : EdgeWorkerHosts (),
220+ },
221+ ]
229222 flask_blueprints = [_get_airflow_2_api_endpoint (), template_bp ]
0 commit comments