Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/example_dags/example_dynamic_task_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(dag_id="example_dynamic_task_mapping", start_date=datetime(2022, 3, 4)) as dag:
with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

@task
def add_one(x: int):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def execute(self, context):

with DAG(
dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
schedule=None,
start_date=datetime(2022, 3, 4),
catchup=False,
):
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_setup_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

with DAG(
dag_id="example_setup_teardown",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_setup_teardown_taskflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

with DAG(
dag_id="example_setup_teardown_taskflow",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_short_circuit_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.utils.trigger_rule import TriggerRule


@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"])
@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"])
def example_short_circuit_decorator():
# [START howto_operator_short_circuit]
@task.short_circuit()
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

with DAG(
dag_id="example_short_circuit_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
2 changes: 2 additions & 0 deletions airflow/example_dags/example_skip_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING

import pendulum
Expand Down Expand Up @@ -63,6 +64,7 @@ def create_test_pipeline(suffix, trigger_rule):

with DAG(
dag_id="example_skip_dag",
schedule=datetime.timedelta(days=1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# [START howto_task_group]
with DAG(
dag_id="example_task_group",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_task_group_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def task_group_function(value: int) -> None:
# Executing Tasks and TaskGroups
with DAG(
dag_id="example_task_group_decorator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
8 changes: 7 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,12 @@ def __init__(
self.timetable = DatasetTriggeredTimetable(DatasetAll(*schedule))
self.schedule_interval = self.timetable.summary
elif isinstance(schedule, ArgNotSet):
warnings.warn(
"Creating a DAG with an implicit schedule is deprecated, and will stop working "
"in a future release. Set `schedule=datetime.timedelta(days=1)` explicitly.",
RemovedInAirflow3Warning,
stacklevel=2,
)
self.timetable = create_timetable(schedule, self.timezone)
self.schedule_interval = DEFAULT_SCHEDULE_INTERVAL
else:
Expand Down Expand Up @@ -3647,7 +3653,7 @@ def get_serialized_fields(cls):
"auto_register",
"fail_stop",
}
cls.__serialized_fields = frozenset(vars(DAG(dag_id="test"))) - exclusion_list
cls.__serialized_fields = frozenset(vars(DAG(dag_id="test", schedule=None))) - exclusion_list
return cls.__serialized_fields

def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeInfoType:
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1663,7 +1663,7 @@ def serialize_dag(cls, dag: DAG) -> dict:
@classmethod
def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
"""Deserializes a DAG from a JSON object."""
dag = SerializedDAG(dag_id=encoded_dag["_dag_id"])
dag = SerializedDAG(dag_id=encoded_dag["_dag_id"], schedule=None)

for k, v in encoded_dag.items():
if k == "_downstream_task_ids":
Expand Down
2 changes: 1 addition & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@


def create_context(task) -> Context:
dag = DAG(dag_id="dag")
dag = DAG(dag_id="dag", schedule=None)
execution_date = timezone.datetime(
2016, 1, 1, 1, 0, 0, tzinfo=timezone.parse_timezone("Europe/Amsterdam")
)
Expand Down
37 changes: 19 additions & 18 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,18 @@ def configured_app(minimal_app_for_api):

with DAG(
DAG_ID,
schedule=None,
start_date=datetime(2020, 6, 15),
doc_md="details",
params={"foo": 1},
tags=["example"],
) as dag:
EmptyOperator(task_id=TASK_ID)

with DAG(DAG2_ID, start_date=datetime(2020, 6, 15)) as dag2: # no doc_md
with DAG(DAG2_ID, schedule=None, start_date=datetime(2020, 6, 15)) as dag2: # no doc_md
EmptyOperator(task_id=TASK_ID)

with DAG(DAG3_ID) as dag3: # DAG start_date set to None
with DAG(DAG3_ID, schedule=None) as dag3: # DAG start_date set to None
EmptyOperator(task_id=TASK_ID, start_date=datetime(2019, 6, 12))

dag_bag = DagBag(os.devnull, include_examples=False)
Expand Down Expand Up @@ -988,10 +989,10 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer):
)
def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1", tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None, tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None, tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", schedule=None, tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand All @@ -1016,10 +1017,10 @@ def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
)
def test_filter_dags_by_dag_id_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1")
dag2 = DAG(dag_id="TEST_DAG_2")
dag3 = DAG(dag_id="SAMPLE_DAG_1")
dag4 = DAG(dag_id="SAMPLE_DAG_2")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None)
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None)
dag3 = DAG(dag_id="SAMPLE_DAG_1", schedule=None)
dag4 = DAG(dag_id="SAMPLE_DAG_2", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down Expand Up @@ -1938,10 +1939,10 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer, session):
)
def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1", tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None, tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None, tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", schedule=None, tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down Expand Up @@ -1971,10 +1972,10 @@ def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
)
def test_filter_dags_by_dag_id_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1")
dag2 = DAG(dag_id="TEST_DAG_2")
dag3 = DAG(dag_id="SAMPLE_DAG_1")
dag4 = DAG(dag_id="SAMPLE_DAG_2")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None)
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None)
dag3 = DAG(dag_id="SAMPLE_DAG_1", schedule=None)
dag4 = DAG(dag_id="SAMPLE_DAG_2", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def teardown_method(self) -> None:
clear_db_xcom()

def _create_dag(self):
with DAG(dag_id="TEST_DAG_ID", default_args={"start_date": self.default_time}) as dag:
with DAG(dag_id="TEST_DAG_ID", schedule=None, default_args={"start_date": self.default_time}) as dag:
CustomOperator(task_id="TEST_SINGLE_LINK", bash_command="TEST_LINK_VALUE")
CustomOperator(
task_id="TEST_MULTIPLE_LINK", bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"]
Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu

# Recreate DAG without tasks
dagbag = self.app.dag_bag
dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
dag = DAG(self.DAG_ID, schedule=None, start_date=timezone.parse(self.default_time))
del dagbag.dags[self.DAG_ID]
dagbag.bag_dag(dag=dag, root_dag=dag)

Expand Down
4 changes: 2 additions & 2 deletions tests/api_connexion/endpoints/test_task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ class TestTaskEndpoint:

@pytest.fixture(scope="class")
def setup_dag(self, configured_app):
with DAG(self.dag_id, start_date=self.task1_start_date, doc_md="details") as dag:
with DAG(self.dag_id, schedule=None, start_date=self.task1_start_date, doc_md="details") as dag:
task1 = EmptyOperator(task_id=self.task_id, params={"foo": "bar"})
task2 = EmptyOperator(task_id=self.task_id2, start_date=self.task2_start_date)

with DAG(self.mapped_dag_id, start_date=self.task1_start_date) as mapped_dag:
with DAG(self.mapped_dag_id, schedule=None, start_date=self.task1_start_date) as mapped_dag:
EmptyOperator(task_id=self.task_id3)
# Use the private _expand() method to avoid the empty kwargs check.
# We don't care about how the operator runs here, only its presence.
Expand Down
3 changes: 2 additions & 1 deletion tests/api_connexion/schemas/test_dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from datetime import datetime, timedelta

import pendulum
import pytest
Expand Down Expand Up @@ -158,6 +158,7 @@ def test_serialize_test_dag_collection_schema(url_safe_serializer):
def test_serialize_test_dag_detail_schema(url_safe_serializer):
dag = DAG(
dag_id="test_dag",
schedule=timedelta(days=1),
start_date=datetime(2020, 6, 19),
doc_md="docs",
orientation="LR",
Expand Down
6 changes: 5 additions & 1 deletion tests/api_experimental/common/test_delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ def setup_dag_models(self, for_sub_dag=False):

task = EmptyOperator(
task_id="dummy",
dag=DAG(dag_id=self.key, default_args={"start_date": timezone.datetime(2022, 1, 1)}),
dag=DAG(
dag_id=self.key,
schedule=None,
default_args={"start_date": timezone.datetime(2022, 1, 1)},
),
owner="airflow",
)

Expand Down
16 changes: 12 additions & 4 deletions tests/api_experimental/common/test_trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_trigger_dag_dag_not_found(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_dag_run_exist(self, dag_bag_mock, dag_run_mock):
dag_id = "dag_run_exist"
dag = DAG(dag_id)
dag = DAG(dag_id, schedule=None)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_run_mock.find_duplicate.return_value = DagRun()
Expand Down Expand Up @@ -90,7 +90,11 @@ def test_trigger_dag_include_nested_subdags(self, dag_bag_mock, dag_run_mock, da
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_too_early_start_date(self, dag_bag_mock):
dag_id = "trigger_dag_with_too_early_start_date"
dag = DAG(dag_id, default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)})
dag = DAG(
dag_id=dag_id,
schedule=None,
default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)},
)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag

Expand All @@ -100,7 +104,11 @@ def test_trigger_dag_with_too_early_start_date(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_valid_start_date(self, dag_bag_mock):
dag_id = "trigger_dag_with_valid_start_date"
dag = DAG(dag_id, default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)})
dag = DAG(
dag_id=dag_id,
schedule=None,
default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)},
)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_bag_mock.dags_hash = {}
Expand All @@ -120,7 +128,7 @@ def test_trigger_dag_with_valid_start_date(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_conf(self, dag_bag_mock, conf, expected_conf):
dag_id = "trigger_dag_with_conf"
dag = DAG(dag_id)
dag = DAG(dag_id, schedule=None)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag

Expand Down
5 changes: 4 additions & 1 deletion tests/callbacks/test_callback_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def test_from_json(self, input, request_class):
if input is None:
ti = TaskInstance(
task=BashOperator(
task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
task_id="test",
bash_command="true",
start_date=datetime.now(),
dag=DAG(dag_id="id", schedule=None),
),
run_id="fake_run",
state=State.RUNNING,
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ def dag_maker(request):
if serialized_marker:
(want_serialized,) = serialized_marker.args or (True,)

from airflow.utils.helpers import NOTSET
from airflow.utils.log.logging_mixin import LoggingMixin

class DagFactory(LoggingMixin):
Expand Down Expand Up @@ -923,6 +924,7 @@ def create_dagrun_after(self, dagrun, **kwargs):
def __call__(
self,
dag_id="test_dag",
schedule=NOTSET,
serialized=want_serialized,
fileloc=None,
processor_subdir=None,
Expand Down Expand Up @@ -951,6 +953,12 @@ def __call__(
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
self.start_date = DEFAULT_DATE
self.kwargs["start_date"] = self.start_date
# Set schedule argument to explicitly set value, or a default if no
# other scheduling arguments are set.
if schedule is not NOTSET:
self.kwargs["schedule"] = schedule
elif "timetable" not in self.kwargs and "schedule_interval" not in self.kwargs:
self.kwargs["schedule"] = timedelta(days=1)
self.dag = DAG(dag_id, **self.kwargs)
self.dag.fileloc = fileloc or request.module.__file__
self.want_serialized = serialized
Expand Down
13 changes: 12 additions & 1 deletion tests/dag_processing/test_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.compat import ParseImportError
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_callbacks, clear_db_dags, clear_db_runs, clear_db_serialized_dags
from tests.test_utils.db import (
clear_db_callbacks,
clear_db_dags,
clear_db_import_errors,
clear_db_runs,
clear_db_serialized_dags,
)

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -148,7 +154,12 @@ def run_processor_manager_one_loop(self, manager, parent_pipe):
return results
raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")

@pytest.fixture
def clear_parse_import_errors(self):
clear_db_import_errors()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.usefixtures("clear_parse_import_errors")
@conf_vars({("core", "load_examples"): "False"})
def test_remove_file_clears_import_error(self, tmp_path):
path_to_parse = tmp_path / "temp_dag.py"
Expand Down
4 changes: 3 additions & 1 deletion tests/dags/test_cli_triggered_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def success(ti=None, *args, **kwargs):
# DAG tests that tasks ignore all dependencies

dag1 = DAG(
dag_id="test_run_ignores_all_dependencies", default_args=dict(depends_on_past=True, **default_args)
dag_id="test_run_ignores_all_dependencies",
schedule=None,
default_args={"depends_on_past": True, **default_args},
)
dag1_task1 = PythonOperator(task_id="test_run_dependency_task", python_callable=fail, dag=dag1)
dag1_task2 = PythonOperator(task_id="test_run_dependent_task", python_callable=success, dag=dag1)
Expand Down
2 changes: 1 addition & 1 deletion tests/dags/test_dagrun_fast_follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


dag_id = "test_dagrun_fast_follow"
dag = DAG(dag_id=dag_id, default_args=args)
dag = DAG(dag_id=dag_id, schedule=None, default_args=args)

# A -> B -> C
task_a = PythonOperator(task_id="A", dag=dag, python_callable=lambda: True)
Expand Down
Loading