diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py index 449e101f7f565..87215f5531a71 100644 --- a/task-sdk/src/airflow/sdk/definitions/dag.py +++ b/task-sdk/src/airflow/sdk/definitions/dag.py @@ -54,7 +54,7 @@ from airflow.sdk.bases.operator import BaseOperator from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator from airflow.sdk.definitions._internal.node import validate_key -from airflow.sdk.definitions._internal.types import NOTSET +from airflow.sdk.definitions._internal.types import NOTSET, ArgNotSet from airflow.sdk.definitions.asset import AssetAll, BaseAsset from airflow.sdk.definitions.context import Context from airflow.sdk.definitions.param import DagParam, ParamsDict @@ -1014,7 +1014,7 @@ def _validate_owner_links(self, _, owner_links): def test( self, run_after: datetime | None = None, - logical_date: datetime | None = None, + logical_date: datetime | None | ArgNotSet = NOTSET, run_conf: dict[str, Any] | None = None, conn_file_path: str | None = None, variable_file_path: str | None = None, @@ -1082,6 +1082,10 @@ def add_logger_if_needed(ti: TaskInstance): with exit_stack: self.validate() + + # Allow users to explicitly pass None. If it isn't set, we default to current time. + logical_date = logical_date if not isinstance(logical_date, ArgNotSet) else timezone.utcnow() + log.debug("Clearing existing task instances for logical date %s", logical_date) # TODO: Replace with calling client.dag_run.clear in Execution API at some point SchedulerDAG.clear_dags(