Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
118 commits
Select commit Hold shift + click to select a range
609aebb
Start converting _execute_task_callbacks to internal API. get_templat…
vincbeck Jan 12, 2023
ac48800
Move get_serialized_dag and get_task_instance methods to model classes
vincbeck Jan 16, 2023
b57ddca
Convert handle_failure() method to internal API
vincbeck Jan 18, 2023
917abfb
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jan 18, 2023
6dfebce
Remove comment
vincbeck Jan 18, 2023
1867db4
Migrate _execute_dag_callbacks to internal API
vincbeck Jan 20, 2023
6b110ea
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jan 20, 2023
c21fb81
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jan 25, 2023
d92fa83
Use cls.logger()
vincbeck Jan 25, 2023
cc52f47
Remove todo
vincbeck Feb 2, 2023
ee27659
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 2, 2023
68bfc72
Add back if callbacks
vincbeck Feb 3, 2023
839dc6c
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 6, 2023
30d6ef9
Remove comments
vincbeck Feb 6, 2023
9fb937d
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 9, 2023
81ee61b
Refactor _fetch_callback
vincbeck Feb 9, 2023
3c300a7
Fix TaskInstance.get_task_instance
vincbeck Feb 9, 2023
dc6031e
Fix unit tests
vincbeck Feb 10, 2023
12c908a
Fix unit tests
vincbeck Feb 10, 2023
ccf3a86
Add methods to rpc_endpoint
vincbeck Feb 10, 2023
791ddbe
Add flag select_columns to get_task_instance
vincbeck Feb 10, 2023
120a030
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 10, 2023
4f1ef33
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 10, 2023
e72652e
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Feb 13, 2023
2fca2b3
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Mar 9, 2023
48ef1cf
Use Pydantic-powered ORM models
vincbeck Mar 9, 2023
ee63263
Fix DagRunPydantic
vincbeck Mar 9, 2023
3f5dc7f
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 8, 2023
f64ea0c
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 8, 2023
b61f9e6
Fix imports
vincbeck May 8, 2023
7bed9bc
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 8, 2023
c8a8d20
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 12, 2023
9277e53
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck May 19, 2023
fea8db0
Convert methods in models to private functions
vincbeck May 19, 2023
1fff88d
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jun 29, 2023
6b80508
Convert methods get_previous_scheduled_dagrun and get_previous_dagrun…
vincbeck Jun 29, 2023
c9ad169
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 4, 2023
dfc01a8
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 4, 2023
6d35f2f
Fix static checks
vincbeck Jul 4, 2023
920aae6
Convert DagRun.get_task_instance to internal API
vincbeck Jul 4, 2023
9492538
Fix circular dependencies
vincbeck Jul 5, 2023
10638cc
Fix spellcheck
vincbeck Jul 5, 2023
0179ba5
Disable type aliasing in pydantic models
vincbeck Jul 5, 2023
995cab1
Fix unit test
vincbeck Jul 5, 2023
03694b4
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 6, 2023
33640a9
Add custom_operator_name attribute to taskinstance Pydantic model
vincbeck Jul 6, 2023
52dd4b0
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 11, 2023
0f042e9
Fix static checks
vincbeck Jul 11, 2023
f463837
Fix unit tests
vincbeck Jul 11, 2023
8cf1fb1
Update reason in context
vincbeck Jul 12, 2023
16c334b
Add annotations to `fetch_callback` method
vincbeck Jul 12, 2023
7192075
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 12, 2023
d29dcd2
Resolve conflicts
vincbeck Jul 12, 2023
de30beb
Improve style of `handle_callback` method
vincbeck Jul 12, 2023
ed97834
Add default encoder
vincbeck Jul 12, 2023
658be83
Address feedbacks from @uranusjr
vincbeck Jul 13, 2023
62b9ba8
Add reason to context
vincbeck Jul 13, 2023
640c819
Fix unit test
vincbeck Jul 13, 2023
87b4fac
Fix params order
vincbeck Jul 14, 2023
67f2ee6
Convert private functions to kwarg only
vincbeck Jul 14, 2023
9914a36
Fix unit test
vincbeck Jul 14, 2023
6881a51
Fix taskinstance unit tests
vincbeck Jul 14, 2023
073ce8b
Fix pydantic unit tests
vincbeck Jul 14, 2023
d32a21e
Apply suggestion by uranusjr@
vincbeck Jul 14, 2023
927344a
Address feedbacks from uranusjr@
vincbeck Jul 14, 2023
b42390b
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 17, 2023
ea1d9cd
Apply D205 style rule
vincbeck Jul 17, 2023
d87346f
Fix get_serialized_dag method
vincbeck Jul 17, 2023
d3480a7
Skipping some tests if AIP-44 is disabled
vincbeck Jul 17, 2023
638f149
Remove _get_task_instance function
vincbeck Jul 18, 2023
3249ce4
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 20, 2023
9bb04fe
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 21, 2023
67fd268
Remove get_dagrun function
vincbeck Jul 21, 2023
c6cc066
Remove get_task_instances method
vincbeck Jul 21, 2023
e782d50
Remove get_task_instance method
vincbeck Jul 21, 2023
9a5e1ab
Fix static checks
vincbeck Jul 21, 2023
bdd2370
Revert "Remove get_task_instance method"
vincbeck Jul 21, 2023
4c1d4fa
Revert "Remove get_task_instances method"
vincbeck Jul 24, 2023
7f73bc1
Revert "Remove get_dagrun function"
vincbeck Jul 24, 2023
0ae55e1
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 24, 2023
72d106e
Fix static checks
vincbeck Jul 24, 2023
10a511f
Fix tests
vincbeck Jul 24, 2023
c4d385d
Leave annotations
vincbeck Jul 25, 2023
c80105b
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Jul 25, 2023
ae79b6b
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Aug 18, 2023
46d2e25
Fix static checks
vincbeck Aug 18, 2023
8a71857
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Aug 29, 2023
4796186
Add type annotation to _stop_remaining_tasks()
vincbeck Aug 29, 2023
9ac5d30
Add type annotation to handle_callback()
vincbeck Aug 29, 2023
8ca799b
Pass `dagrun_id` to fetch_callback()
vincbeck Aug 29, 2023
e795915
Pass `dag_run_id` to fetch_task_instance()
vincbeck Aug 29, 2023
a14f941
Pass `dag_run_id` to get_previous_scheduled_dagrun()
vincbeck Aug 29, 2023
b2d5bc9
Remove `_set_duration` and introduce `set_end_date`
vincbeck Aug 29, 2023
975eba8
Fix `set_end_date`
vincbeck Aug 30, 2023
adc10fd
Add TaskInstancePydantic.update_forward_refs()
vincbeck Aug 30, 2023
7921dc5
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Aug 31, 2023
f76bc56
Replace `save_to_db()` to `finish_task()`
vincbeck Aug 31, 2023
4178b7c
Fix static checks
vincbeck Aug 31, 2023
2a3edf1
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Sep 1, 2023
8d35a1e
Fix static checks
vincbeck Sep 1, 2023
2de2375
Cleanup
vincbeck Sep 1, 2023
85743a8
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Sep 5, 2023
5882674
Fix tests
vincbeck Sep 5, 2023
10183ac
Fix tests
vincbeck Sep 5, 2023
c6fb38b
Make pydantic models ignore TCH001 rule
vincbeck Sep 5, 2023
5682a56
Use List instead of list in Pydantic models
vincbeck Sep 5, 2023
cbd4757
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Sep 6, 2023
df2ff72
Fix tests
vincbeck Sep 6, 2023
512872d
Add `refresh_from_db` after setting the duration
vincbeck Sep 6, 2023
a44ac1a
Merge branch 'main' into vincbeck/execute_callbacks
vincbeck Sep 7, 2023
dee5655
Fix import
vincbeck Sep 7, 2023
8c60f21
Revert "Add `refresh_from_db` after setting the duration"
vincbeck Sep 13, 2023
3923f2c
Revert "Cleanup"
vincbeck Sep 13, 2023
989672e
Revert "Replace `save_to_db()` to `finish_task()`"
vincbeck Sep 13, 2023
d51371e
Revert "Fix `set_end_date`"
vincbeck Sep 13, 2023
2b9014b
Revert "Remove `_set_duration` and introduce `set_end_date`"
vincbeck Sep 13, 2023
c6573c7
Add `dataset` property to `DatasetEventPydantic`
vincbeck Sep 13, 2023
0653a33
Remove assert
vincbeck Sep 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ def _initialize_map() -> dict[str, Callable]:
from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.dag_processing.processor import DagFileProcessor
from airflow.models import Trigger, Variable, XCom
from airflow.models.dag import DagModel
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
from airflow.secrets.metastore import MetastoreBackend

functions: list[Callable] = [
Expand All @@ -60,6 +63,16 @@ def _initialize_map() -> dict[str, Callable]:
Variable.set,
Variable.update,
Variable.delete,
DAG.fetch_callback,
DAG.fetch_dagrun,
DagRun.fetch_task_instances,
DagRun.get_previous_dagrun,
DagRun.get_previous_scheduled_dagrun,
DagRun.fetch_task_instance,
SerializedDagModel.get_serialized_dag,
TaskInstance.get_task_instance,
TaskInstance.fetch_handle_failure_context,
TaskInstance.save_to_db,
Trigger.from_object,
Trigger.bulk_fetch,
Trigger.clean_unused,
Expand Down
5 changes: 4 additions & 1 deletion airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ def wrapper(*args, **kwargs):
if "cls" in arguments_dict: # used by @classmethod
del arguments_dict["cls"]

args_json = json.dumps(BaseSerialization.serialize(arguments_dict, use_pydantic_models=True))
args_json = json.dumps(
BaseSerialization.serialize(arguments_dict, use_pydantic_models=True),
default=BaseSerialization.serialize,
)
method_name = f"{func.__module__}.{func.__qualname__}"
result = make_jsonrpc_request(method_name, args_json)
if result is None or result == b"":
Expand Down
3 changes: 2 additions & 1 deletion airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from sqlalchemy.orm.session import Session

from airflow.models.operator import Operator
from airflow.serialization.pydantic.dag_run import DagRunPydantic

log = logging.getLogger(__name__)

Expand All @@ -94,7 +95,7 @@ def _get_dag_run(
create_if_necessary: CreateIfNecessary,
exec_date_or_run_id: str | None = None,
session: Session,
) -> tuple[DagRun, bool]:
) -> tuple[DagRun | DagRunPydantic, bool]:
"""Try to retrieve a DAG run from a string representing either a run ID or logical date.

This checks DAG runs like this:
Expand Down
44 changes: 22 additions & 22 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from typing import TYPE_CHECKING, Iterable, Iterator

from setproctitle import setproctitle
from sqlalchemy import delete, exc, func, or_, select
from sqlalchemy import delete, func, or_, select

from airflow import settings
from airflow.api_internal.internal_api_call import internal_api_call
Expand All @@ -44,7 +44,8 @@
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.taskinstance import TaskInstance as TI
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.email import get_email_address_list, send_email
Expand Down Expand Up @@ -736,25 +737,28 @@ def execute_callbacks_without_dag(
@provide_session
def _execute_dag_callbacks(self, dagbag: DagBag, request: DagCallbackRequest, session: Session):
dag = dagbag.dags[request.dag_id]
dag_run = dag.get_dagrun(run_id=request.run_id, session=session)
dag.handle_callback(
dagrun=dag_run, success=not request.is_failure_callback, reason=request.msg, session=session
)
callbacks, context = DAG.fetch_callback(
dag=dag,
dag_run_id=request.run_id,
success=not request.is_failure_callback,
reason=request.msg,
session=session,
) or (None, None)

if callbacks and context:
DAG.execute_callback(callbacks, context, dag.dag_id)

def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRequest, session: Session):
if not request.is_failure_callback:
return

simple_ti = request.simple_task_instance
ti: TI | None = (
session.query(TI)
.filter_by(
dag_id=simple_ti.dag_id,
run_id=simple_ti.run_id,
task_id=simple_ti.task_id,
map_index=simple_ti.map_index,
)
.one_or_none()
ti = TaskInstance.get_task_instance(
dag_id=simple_ti.dag_id,
run_id=simple_ti.run_id,
task_id=simple_ti.task_id,
map_index=simple_ti.map_index,
session=session,
)
if not ti:
return
Expand All @@ -770,14 +774,10 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe
# `handle_failure` so that the state of the TI gets progressed.
#
# Since handle_failure _really_ wants a task, we do our best effort to give it one
from airflow.models.serialized_dag import SerializedDagModel
task = SerializedDagModel.get_serialized_dag(
dag_id=simple_ti.dag_id, task_id=simple_ti.task_id, session=session
)

try:
model = session.get(SerializedDagModel, simple_ti.dag_id)
if model:
task = model.dag.get_task(simple_ti.task_id)
except (exc.NoResultFound, TaskNotFound):
pass
if task:
ti.refresh_from_task(task)

Expand Down
2 changes: 1 addition & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ def __init__(
max_active_tis_per_dag = task_concurrency
self.max_active_tis_per_dag: int | None = max_active_tis_per_dag
self.max_active_tis_per_dagrun: int | None = max_active_tis_per_dagrun
self.do_xcom_push = do_xcom_push
self.do_xcom_push: bool = do_xcom_push
Comment thread
vincbeck marked this conversation as resolved.

self.doc_md = doc_md
self.doc_json = doc_json
Expand Down
96 changes: 77 additions & 19 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
TaskNotFound,
)
from airflow.jobs.job import run_job
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.abstractoperator import AbstractOperator, TaskStateChangeCallback
from airflow.models.base import Base, StringID
from airflow.models.baseoperator import BaseOperator
from airflow.models.dagcode import DagCode
Expand Down Expand Up @@ -139,6 +139,7 @@
from airflow.models.operator import Operator
from airflow.models.slamiss import SlaMiss
from airflow.serialization.pydantic.dag import DagModelPydantic
from airflow.serialization.pydantic.dag_run import DagRunPydantic
from airflow.typing_compat import Literal
from airflow.utils.task_group import TaskGroup

Expand Down Expand Up @@ -888,7 +889,7 @@ def get_next_data_interval(self, dag_model: DagModel) -> DataInterval | None:
# infer from the logical date.
return self.infer_automated_data_interval(dag_model.next_dagrun)

def get_run_data_interval(self, run: DagRun) -> DataInterval:
def get_run_data_interval(self, run: DagRun | DagRunPydantic) -> DataInterval:
"""Get the data interval of this run.

For compatibility, this method infers the data interval from the DAG's
Expand Down Expand Up @@ -1383,8 +1384,43 @@ def normalized_schedule_interval(self) -> ScheduleInterval:
_schedule_interval = self.schedule_interval
return _schedule_interval

@staticmethod
@internal_api_call
@provide_session
def handle_callback(self, dagrun, success=True, reason=None, session=NEW_SESSION):
def fetch_callback(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vincbeck @potiuk @mhenc @uranusjr just curious what your thoughts are on backward compatibility on this one. technically it (handle_callback) was part of a public class and not marked :meta private: in the docstring so.... technically it was probably public and therefore subject to backcompat.

that said, considering it as part of the public API also seems absurd. do you think we should put in our "public API" some "cover your ass" type of language that sort of expresses that .... methods which are clearly not for public use, even if not marked internal, are internal. maybe we could add some language that explains what that means. like methods not related to the dag authoring interface etc -- not sure.

but in any case, we should mark fetch_callback as private by either prefixing with underscore or adding :meta private:.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks backward compatible to me, Github makes it confusing but handle_callback is not removed, look at line 1423

Copy link
Copy Markdown
Member

@potiuk potiuk Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree it's not "disappearing" now so not an issue.

But I also would like to have a bit of philosophical rant here.

I think we are approaching backwards compatibility here as "0 / 1". And I keep on repeating it's totally wrong. Hyrum's Law is totally right here: https://www.hyrumslaw.com/ - in a sufficiently complex system ANY change is breaking. You would have to stop changing things to stop things from breaking. There is no way around it. And we cannot describe super precise rules about it upfront. We cannot really say:

  • any time we change this and that, we are technically breaking things

That would make us slow and very shortly we would lose any flexibility. I think we will never achieve 100% correctness and sets of rules that will clearly say for each change "breaking/not breaking" in an automated way. We can approach it and get closer to it by adding more and more rules and description - but we will at most asymptotically get closer to the certainty - never achieving it, and at some point in time adding more and more rules will make it more, not less, confusing and contradicting. So we should strive for a "good enough" and "pretty correct" set of rules - but also accept the fact that there will be exceptions and room for interpretation and even for arbitrary decisions that others (including some of our users) might not agree with.

As I see it (and what I think SemVer also explains), Backwards Compatibility and SemVer are NOT about following certain rules ("adding a parameter is breaking, renaming any method not marked as private is breaking"). IMHO this is about three things:

  1. What is the INTENTION we had when we created the code - were we INTENDING to make it relied on? Was it described and explained that users were supposed to rely on it? Or was their reliance on certain methods and fields accidental, and the fact that a method was there just made them "assume" they could rely on it?

  2. How likely is it that many of our users made such assumptions if it was not clearly documented and explained - or, even if they could take the impression it was, how likely is it we are breaking something serious.

  3. How difficult it is to recover for our users. If the system is failing immediately and what the user needs to do is flipping the flag to bring back the old behaviour - is it breaking or not? If the system is not failing but the change in behaviour is not persistent nor dangerous and the user might bring it back with a flip of a flag - is it breaking or not?

And yes - it means we will sometimes have to make arbitrary decisions based on gut feelings, not data nor precise rules. And yes - it means that sometimes there will be individual angry users who will tell us "but you promised backwards compatibility - bring it back NOW", and there will be cases where we disagree between ourselves - maintainers - about what is backwards compatible and what is not, and we will have to vote on it eventually. And yes - sometimes it will mean we take a wrong decision and break too many workflows of too many users, and we will have to quickly release a bugfix that reverts it.

All this. And more. And we will remain humans making sometimes flawed and imperfect decisions based on our instincts and intentions and gut feelings, not data and strict rules - rather than robots following precise rules and prescribed algorithms. I think this is why we - as maintainers - are still needed in the project: to make such decisions.

Sorry if I've gotten a bit too philosophical, but I do think we quite often try to make things crystal clear and be free of making decisions, so that we don't have to, well, make decisions.

It's needed in many cases - that's why I am also adding a lot of rules on how we approach things - for example the providers' maintenance lifecycle. But I treat it more as a communication tool to write down our intentions and, where possible, leave enough room for interpretation and decision making.

Where we can - yes, we should make clear rules. But when we can't, we should state our intentions, communicate general principles, and simply try - as best as we can - to fulfill those stated intentions (and we should attempt to communicate those intentions so that our users are aware of them).

dag: DAG,
dag_run_id: str,
success: bool = True,
reason: str | None = None,
*,
session: Session = NEW_SESSION,
) -> tuple[list[TaskStateChangeCallback], Context] | None:
"""
Fetch the appropriate callbacks depending on the value of success.

This method gets the context of a single TaskInstance part of this DagRun and returns it along
the list of callbacks.

:param dag: DAG object
:param dag_run_id: The DAG run ID
:param success: Flag to specify if failure or success callback should be called
:param reason: Completion reason
:param session: Database session
"""
callbacks = dag.on_success_callback if success else dag.on_failure_callback
if callbacks:
dagrun = DAG.fetch_dagrun(dag_id=dag.dag_id, run_id=dag_run_id, session=session)
callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
tis = dagrun.get_task_instances(session=session)
ti = tis[-1]  # get the last TaskInstance of the DagRun (tis[-1] is the last element, not the first)
Comment thread
potiuk marked this conversation as resolved.
ti.task = dag.get_task(ti.task_id)
context = ti.get_template_context(session=session)
context["reason"] = reason
return callbacks, context
return None

@provide_session
def handle_callback(self, dagrun: DagRun, success=True, reason=None, session=NEW_SESSION):
"""
Triggers on_failure_callback or on_success_callback as appropriate.

Expand All @@ -1400,21 +1436,29 @@ def handle_callback(self, dagrun, success=True, reason=None, session=NEW_SESSION
:param reason: Completion reason
:param session: Database session
"""
callbacks = self.on_success_callback if success else self.on_failure_callback
if callbacks:
callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
tis = dagrun.get_task_instances(session=session)
ti = tis[-1]  # get the last TaskInstance of the DagRun (tis[-1] is the last element, not the first)
ti.task = self.get_task(ti.task_id)
context = ti.get_template_context(session=session)
context.update({"reason": reason})
callbacks, context = DAG.fetch_callback(
dag=self, dag_run_id=dagrun.run_id, success=success, reason=reason, session=session
) or (None, None)

DAG.execute_callback(callbacks, context, self.dag_id)

@classmethod
def execute_callback(cls, callbacks: list[Callable] | None, context: Context | None, dag_id: str):
"""
Triggers the callbacks with the given context.

:param callbacks: List of callbacks to call
:param context: Context to pass to all callbacks
:param dag_id: The dag_id of the DAG to find.
"""
if callbacks and context:
for callback in callbacks:
self.log.info("Executing dag callback function: %s", callback)
cls.logger().info("Executing dag callback function: %s", callback)
try:
callback(context)
except Exception:
self.log.exception("failed to invoke dag state update callback")
Stats.incr("dag.callback_exceptions", tags={"dag_id": dagrun.dag_id})
cls.logger().exception("failed to invoke dag state update callback")
Stats.incr("dag.callback_exceptions", tags={"dag_id": dag_id})

def get_active_runs(self):
"""
Expand Down Expand Up @@ -1452,16 +1496,19 @@ def get_num_active_runs(self, external_trigger=None, only_running=True, session=

return session.scalar(query)

@staticmethod
@internal_api_call
@provide_session
def get_dagrun(
self,
def fetch_dagrun(
dag_id: str,
execution_date: datetime | None = None,
run_id: str | None = None,
session: Session = NEW_SESSION,
):
) -> DagRun | DagRunPydantic:
"""
Return the dag run for a given execution date or run_id if it exists, otherwise none.

:param dag_id: The dag_id of the DAG to find.
:param execution_date: The execution date of the DagRun to find.
:param run_id: The run_id of the DagRun to find.
:param session:
Expand All @@ -1471,11 +1518,22 @@ def get_dagrun(
raise TypeError("You must provide either the execution_date or the run_id")
query = select(DagRun)
if execution_date:
query = query.where(DagRun.dag_id == self.dag_id, DagRun.execution_date == execution_date)
query = query.where(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
if run_id:
query = query.where(DagRun.dag_id == self.dag_id, DagRun.run_id == run_id)
query = query.where(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
return session.scalar(query)

@provide_session
def get_dagrun(
self,
execution_date: datetime | None = None,
run_id: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun | DagRunPydantic:
return DAG.fetch_dagrun(
dag_id=self.dag_id, execution_date=execution_date, run_id=run_id, session=session
)
Comment thread
vincbeck marked this conversation as resolved.

@provide_session
def get_dagruns_between(self, start_date, end_date, session=NEW_SESSION):
"""
Expand Down
Loading