@@ -21,13 +21,15 @@ Deadline Alerts
2121
2222.. warning ::
2323 Deadline Alerts are new in Airflow 3.1 and should be considered experimental. The feature may be
24- subject to changes in 3.2 without warning based on user feedback.
24+ subject to changes in future versions without warning based on user feedback.
2525
2626|experimental |
2727
2828Deadline Alerts allow you to set time thresholds for your Dag runs and automatically respond when those
29- thresholds are exceeded. You can set up Deadline Alerts by choosing a built-in reference point, setting
30- an interval, and defining a response using either Airflow's Notifiers or a custom callback function.
29+ thresholds are exceeded. You configure Deadline Alerts by choosing a reference point, setting an interval,
30+ and defining a callback to execute if the deadline is missed. A reference may be one of the built-in
31+ DeadlineReference options such as when the dagrun is queued or any custom method that returns a timestamp.
32+ The callback can either be one of Airflow's Notifiers or a custom callback function.
3133
3234Migrating from SLA
3335------------------
@@ -57,8 +59,7 @@ Below is an example Dag implementation. If the Dag has not finished 15 minutes a
5759.. code-block :: python
5860
5961 from datetime import datetime, timedelta
60- from airflow import DAG
61- from airflow.sdk import AsyncCallback, DeadlineAlert, DeadlineReference
62+ from airflow.sdk import AsyncCallback, DAG , DeadlineAlert, DeadlineReference
6263 from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
6364 from airflow.providers.standard.operators.empty import EmptyOperator
6465
@@ -196,18 +197,19 @@ Using Callbacks
196197---------------
197198
198199When a deadline is exceeded, the callback's callable is executed with the specified kwargs. You can use an
199- existing :doc: `Notifier </howto/notifications >` or create a custom callable. A callback must be an
200- :class: `~airflow.sdk.AsyncCallback `, with support coming soon for :class: `~airflow.sdk.SyncCallback `.
200+ existing :doc: `Notifier </howto/notifications >` or create a custom callable. A callback must be either an
201+ :class: `~airflow.sdk.AsyncCallback `, or a :class: `~airflow.sdk.SyncCallback ` (SyncCallback support added in 3.2) .
201202
202203Using Built-in Notifiers
203204^^^^^^^^^^^^^^^^^^^^^^^^
204205
205- Here's an example using the Slack Notifier if the Dag run has not finished within 30 minutes of it being queued:
206+ Here's an example using the Slack Notifier with an **asynchronous callback ** if the Dag run has not finished
207+ within 30 minutes of it being queued. The callback runs in the Triggerer:
206208
207209.. code-block :: python
208210
209211 with DAG(
210- dag_id = " slack_deadline_alert " ,
212+ dag_id = " slack_deadline_alert_async " ,
211213 deadline = DeadlineAlert(
212214 reference = DeadlineReference.DAGRUN_QUEUED_AT ,
213215 interval = timedelta(minutes = 30 ),
@@ -221,13 +223,33 @@ Here's an example using the Slack Notifier if the Dag run has not finished withi
221223 ):
222224 EmptyOperator(task_id = " example_task" )
223225
226+ Here's the same example using a **synchronous callback **. The callback runs in the executor:
227+
228+ .. code-block :: python
229+
230+ with DAG(
231+ dag_id = " slack_deadline_alert_sync" ,
232+ deadline = DeadlineAlert(
233+ reference = DeadlineReference.DAGRUN_QUEUED_AT ,
234+ interval = timedelta(minutes = 30 ),
235+ callback = SyncCallback(
236+ SlackWebhookNotifier,
237+ kwargs = {
238+ " text" : " 🚨 Dag {{ dag_run.dag_id }} missed deadline at {{ deadline.deadline_time }} . DagRun: {{ dag_run }} "
239+ },
240+ ),
241+ ),
242+ ):
243+ EmptyOperator(task_id = " example_task" )
244+
224245
225246 Creating Custom Callbacks
226247^^^^^^^^^^^^^^^^^^^^^^^^^
227248
228249You can create custom callables for more complex handling. If ``kwargs `` are specified in the ``Callback ``,
229250they are passed to the callback function. **Asynchronous callbacks ** must be defined somewhere in the
230- Triggerer's system path.
251+ Triggerer's system path. **Synchronous callbacks ** must be importable on the worker where they will be executed.
252+
231253
232254.. note ::
233255 Regarding Async Custom Deadline callbacks:
@@ -237,6 +259,11 @@ Triggerer's system path.
237259 Nested callables are not currently supported.
238260 * The Triggerer will need to be restarted when a callback is added or changed in order to reload the file.
239261
262+ .. note ::
263+ Regarding Synchronous callbacks:
264+
265+ * Sync callbacks are sent to the executor and treated just like a Dag task with top priority.
266+
240267.. note ::
241268 **Airflow ``context``: ** When a deadline is missed, Airflow automatically provides a ``context ``
242269 kwarg into the callback containing information about the Dag run and the deadline. To receive it,
@@ -245,9 +272,60 @@ Triggerer's system path.
245272 the callable accepts. The ``context `` keyword is reserved and cannot be used in the ``kwargs ``
246273 parameter of a ``Callback ``; attempting to do so will raise a ``ValueError `` at DAG parse time.
247274
275+
276+ A **custom synchronous callback ** might look like this:
277+
278+ 1. Place this method in your plugins folder (e.g. ``$AIRFLOW_HOME/plugins/deadline_callbacks.py ``):
279+
280+ .. code-block :: python
281+
282+ def custom_sync_callback (** kwargs ):
283+ """ Handle deadline violation with custom logic."""
284+ context = kwargs.get(" context" , {})
285+ print (f " Deadline exceeded for Dag { context.get(' dag_run' , {}).get(' dag_id' )} ! " )
286+ print (f " Context: { context} " )
287+ print (f " Alert type: { kwargs.get(' alert_type' )} " )
288+ # Additional custom handling here
289+
290+ 2. Place this in a Dag file:
291+
292+ .. code-block :: python
293+
294+ from datetime import timedelta
295+
296+ from deadline_callbacks import custom_sync_callback
297+
298+ from airflow.providers.standard.operators.empty import EmptyOperator
299+ from airflow.sdk import DAG , DeadlineAlert, DeadlineReference, SyncCallback
300+
301+ with DAG(
302+ dag_id = " custom_sync_deadline_alert" ,
303+ deadline = DeadlineAlert(
304+ reference = DeadlineReference.DAGRUN_QUEUED_AT ,
305+ interval = timedelta(minutes = 15 ),
306+ callback = SyncCallback(
307+ custom_sync_callback,
308+ kwargs = {" alert_type" : " time_exceeded" },
309+ ),
310+ ),
311+ ):
312+ EmptyOperator(task_id = " example_task" )
313+
314+ .. tip ::
315+ ``SyncCallback `` accepts an optional ``executor `` parameter to target a specific executor.
316+ If not specified, the default executor is used.
317+
318+ .. code-block :: python
319+
320+ SyncCallback(
321+ my_callback,
322+ kwargs = {" msg" : " deadline missed" },
323+ executor = " celery_executor" ,
324+ )
325+
248326 A **custom asynchronous callback ** might look like this:
249327
250- 1. Place this method in `` /files/ plugins/deadline_callbacks.py ``:
328+ 1. Place this method in your plugins folder (e.g. `` $AIRFLOW_HOME/ plugins/deadline_callbacks.py ``) :
251329
252330.. code-block :: python
253331
@@ -268,9 +346,8 @@ A **custom asynchronous callback** might look like this:
268346
269347 from deadline_callbacks import custom_async_callback
270348
271- from airflow import DAG
272349 from airflow.providers.standard.operators.empty import EmptyOperator
273- from airflow.sdk import AsyncCallback, DeadlineAlert, DeadlineReference
350+ from airflow.sdk import AsyncCallback, DAG , DeadlineAlert, DeadlineReference
274351
275352 with DAG(
276353 dag_id = " custom_deadline_alert" ,
@@ -302,7 +379,7 @@ A deadline's trigger time is calculated by adding the ``interval`` to the dateti
302379the ``reference ``. For ``FIXED_DATETIME `` references, negative intervals can be particularly
303380useful to trigger the callback *before * the reference time.
304381
305- For example :
382+ In the following examples, `` notify_team `` is either a SyncCallback or AsyncCallback defined elsewhere :
306383
307384.. code-block :: python
308385
@@ -386,8 +463,7 @@ Once registered [see notes below], use your custom references in Dag definitions
386463.. code-block :: python
387464
388465 from datetime import timedelta
389- from airflow import DAG
390- from airflow.sdk import AsyncCallback, DeadlineAlert, DeadlineReference
466+ from airflow.sdk import AsyncCallback, DAG , DeadlineAlert, DeadlineReference
391467
392468 with DAG(
393469 dag_id = " custom_reference_example" ,
@@ -400,6 +476,48 @@ Once registered [see notes below], use your custom references in Dag definitions
400476 # Your tasks here
401477 ...
402478
479+ Multiple Deadline Alerts
480+ ^^^^^^^^^^^^^^^^^^^^^^^^
481+
482+ A Dag can have multiple Deadline Alerts. Pass a list to the ``deadline `` parameter instead of a single
483+ ``DeadlineAlert ``. Each alert in the list is evaluated independently, and each may use any combination
484+ of reference points and callback types (sync or async).
485+
486+ .. code-block :: python
487+
488+ from datetime import timedelta
489+ from airflow.sdk import AsyncCallback, DAG , DeadlineAlert, DeadlineReference, SyncCallback
490+ from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier
491+ from airflow.providers.standard.operators.empty import EmptyOperator
492+
493+ with DAG(
494+ dag_id = " multiple_deadline_alerts" ,
495+ deadline = [
496+ # First alert: warn via Slack (async) if not done 30 min after queuing
497+ DeadlineAlert(
498+ reference = DeadlineReference.DAGRUN_QUEUED_AT ,
499+ interval = timedelta(minutes = 30 ),
500+ callback = AsyncCallback(
501+ SlackWebhookNotifier,
502+ kwargs = {" text" : " ⚠️ Dag {{ dag_run.dag_id }} is approaching its deadline." },
503+ ),
504+ ),
505+ # Second alert: escalate via custom sync callback if not done 60 min after queuing
506+ DeadlineAlert(
507+ reference = DeadlineReference.DAGRUN_QUEUED_AT ,
508+ interval = timedelta(minutes = 60 ),
509+ callback = SyncCallback(
510+ " my_plugins.escalation.escalate_to_oncall" ,
511+ kwargs = {" severity" : " high" },
512+ ),
513+ ),
514+ ],
515+ ):
516+ EmptyOperator(task_id = " example_task" )
517+
518+ This pattern is useful for creating tiered alerting strategies — for example, a warning notification
519+ followed by a more urgent escalation if the Dag is still running.
520+
403521**Important Notes: **
404522
405523* **Timezone Awareness **: Always return timezone-aware datetime objects.
0 commit comments