diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 3291bc250b085..23f6e1abbe5e1 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -52,7 +52,7 @@ from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS from airflow.typing_compat import Literal -from airflow.utils import cli as cli_utils +from airflow.utils import cli as cli_utils, timezone from airflow.utils.cli import ( get_dag, get_dag_by_file_location, @@ -61,7 +61,6 @@ should_ignore_depends_on_past, suppress_logs_and_warning, ) -from airflow.utils.dates import timezone from airflow.utils.log.file_task_handler import _set_task_deferred_context_var from airflow.utils.log.logging_mixin import StreamLogWriter from airflow.utils.log.secrets_masker import RedactedIO diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 0ff60a57df56f..a45ad81d30fc3 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -17,13 +17,6 @@ # under the License. from __future__ import annotations -from datetime import datetime, timedelta -from typing import TYPE_CHECKING - -from croniter import croniter - -from airflow.utils import timezone - cron_presets: dict[str, str] = { "@hourly": "0 * * * *", "@daily": "0 0 * * *", @@ -33,99 +26,6 @@ "@yearly": "0 0 1 1 *", } -if TYPE_CHECKING: - from dateutil.relativedelta import relativedelta # for doctest - - -def round_time( - dt: datetime, - delta: str | timedelta | relativedelta, - start_date: datetime = timezone.make_aware(datetime.min), -): - """ - Return ``start_date + i * delta`` for given ``i`` where the result is closest to ``dt``. - - .. code-block:: pycon - - >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1)) - datetime.datetime(2015, 1, 1, 0, 0) - >>> round_time(datetime(2015, 1, 2), relativedelta(months=1)) - datetime.datetime(2015, 1, 1, 0, 0) - >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) - datetime.datetime(2015, 9, 16, 0, 0) - >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) - datetime.datetime(2015, 9, 15, 0, 0) - >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) - datetime.datetime(2015, 9, 14, 0, 0) - >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) - datetime.datetime(2015, 9, 14, 0, 0) - """ - if isinstance(delta, str): - # It's cron based, so it's easy - time_zone = start_date.tzinfo - start_date = timezone.make_naive(start_date, time_zone) - cron = croniter(delta, start_date) - prev = cron.get_prev(datetime) - if prev == start_date: - return timezone.make_aware(start_date, time_zone) - else: - return timezone.make_aware(prev, time_zone) - - # Ignore the microseconds of dt - dt -= timedelta(microseconds=dt.microsecond) - - # We are looking for a datetime in the form start_date + i * delta - # which is as close as possible to dt. Since delta could be a relative - # delta we don't know its exact length in seconds so we cannot rely on - # division to find i. Instead we employ a binary search algorithm, first - # finding an upper and lower limit and then dissecting the interval until - # we have found the closest match. - - # We first search an upper limit for i for which start_date + upper * delta - # exceeds dt. - upper = 1 - while start_date + upper * delta < dt: - # To speed up finding an upper limit we grow this exponentially by a - # factor of 2 - upper *= 2 - - # Since upper is the first value for which start_date + upper * delta - # exceeds dt, upper // 2 is below dt and therefore forms a lower limited - # for the i we are looking for - lower = upper // 2 - - # We now continue to intersect the interval between - # start_date + lower * delta and start_date + upper * delta - # until we find the closest value - while True: - # Invariant: start + lower * delta < dt <= start + upper * delta - # If start_date + (lower + 1)*delta exceeds dt, then either lower or - # lower+1 has to be the solution we are searching for - if start_date + (lower + 1) * delta >= dt: - # Check if start_date + (lower + 1)*delta or - # start_date + lower*delta is closer to dt and return the solution - if (start_date + (lower + 1) * delta) - dt <= dt - (start_date + lower * delta): - return start_date + (lower + 1) * delta - else: - return start_date + lower * delta - - # We intersect the interval and either replace the lower or upper - # limit with the candidate - candidate = lower + (upper - lower) // 2 - if start_date + candidate * delta >= dt: - upper = candidate - else: - lower = candidate - - # in the special case when start_date > dt the search for upper will - # immediately stop for upper == 1 which results in lower = upper // 2 = 0 - # and this function returns start_date. - - -def parse_execution_date(execution_date_str): - """Parse execution date string to datetime object.""" - return timezone.parse(execution_date_str) - def datetime_to_nano(datetime) -> int: """Convert datetime to nanoseconds.""" diff --git a/newsfragments/43533.significant.rst b/newsfragments/43533.significant.rst new file mode 100644 index 0000000000000..7b84c2bf87ff9 --- /dev/null +++ b/newsfragments/43533.significant.rst @@ -0,0 +1,8 @@ +Unused and redundant functions from ``airflow.utils.dates`` module have been removed. + +Following functions are removed: + +- ``parse_execution_date`` +- ``round_time`` +- ``scale_time_units`` +- ``infer_time_unit`` diff --git a/providers/tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py b/providers/tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py index e40db31eb2385..16e5d44f89e2f 100644 --- a/providers/tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py +++ b/providers/tests/fab/auth_manager/api_endpoints/test_xcom_endpoint.py @@ -26,7 +26,7 @@ from airflow.models.xcom import BaseXCom, XCom from airflow.operators.empty import EmptyOperator from airflow.security import permissions -from airflow.utils.dates import parse_execution_date +from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.types import DagRunType @@ -118,7 +118,7 @@ def test_should_respond_200_with_tilde_and_granular_dag_access(self): dag_id_1 = "test-dag-id-1" task_id_1 = "test-task-id-1" execution_date = "2005-04-02T00:00:00+00:00" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) dag_run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entries(dag_id_1, dag_run_id_1, execution_date_parsed, task_id_1) diff --git a/providers/tests/system/amazon/aws/example_mongo_to_s3.py b/providers/tests/system/amazon/aws/example_mongo_to_s3.py index 6fa5f612b0681..80e02510e3bad 100644 --- a/providers/tests/system/amazon/aws/example_mongo_to_s3.py +++ b/providers/tests/system/amazon/aws/example_mongo_to_s3.py @@ -20,7 +20,7 @@ from airflow.models.dag import DAG from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator from airflow.providers.amazon.aws.transfers.mongo_to_s3 import MongoToS3Operator -from airflow.utils.dates import datetime +from airflow.utils.timezone import datetime from airflow.utils.trigger_rule import TriggerRule from providers.tests.system.amazon.aws.utils import SystemTestContextBuilder diff --git a/providers/tests/system/common/sql/example_sql_column_table_check.py b/providers/tests/system/common/sql/example_sql_column_table_check.py index 4033c12faf43b..49c5d94ce9230 100644 --- a/providers/tests/system/common/sql/example_sql_column_table_check.py +++ b/providers/tests/system/common/sql/example_sql_column_table_check.py @@ -19,7 +19,7 @@ from airflow import DAG from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator, SQLTableCheckOperator -from airflow.utils.dates import datetime +from airflow.utils import timezone AIRFLOW_DB_METADATA_TABLE = "ab_role" connection_args = { @@ -36,7 +36,7 @@ "example_sql_column_table_check", description="Example DAG for SQLColumnCheckOperator and SQLTableCheckOperator.", default_args=connection_args, - start_date=datetime(2021, 1, 1), + start_date=timezone.datetime(2021, 1, 1), schedule=None, catchup=False, ) as dag: diff --git a/providers/tests/system/common/sql/example_sql_execute_query.py b/providers/tests/system/common/sql/example_sql_execute_query.py index 535798046305b..e1083080671ab 100644 --- a/providers/tests/system/common/sql/example_sql_execute_query.py +++ b/providers/tests/system/common/sql/example_sql_execute_query.py @@ -19,7 +19,7 @@ from airflow import DAG from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator -from airflow.utils.dates import datetime +from airflow.utils.timezone import datetime AIRFLOW_DB_METADATA_TABLE = "ab_role" connection_args = { diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py index 6f12f77e08d4b..a2c267428ac66 100644 --- a/tests/api_connexion/endpoints/test_xcom_endpoint.py +++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py @@ -26,7 +26,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend from airflow.operators.empty import EmptyOperator -from airflow.utils.dates import parse_execution_date +from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.timezone import utcnow from airflow.utils.types import DagRunType @@ -111,7 +111,7 @@ def test_should_respond_200_stringify(self): task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"}) response = self.client.get( @@ -137,7 +137,7 @@ def test_should_respond_200_native(self): task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"}) response = self.client.get( @@ -164,7 +164,7 @@ def test_should_respond_200_native_for_pickled(self): task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359} self._create_xcom_entry( @@ -193,7 +193,7 @@ def test_should_raise_404_for_non_existent_xcom(self): task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key) response = self.client.get( @@ -208,7 +208,7 @@ def test_should_raises_401_unauthenticated(self): task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key) response = self.client.get( @@ -222,7 +222,7 @@ def test_should_raise_403_forbidden(self): task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key) @@ -318,7 +318,7 @@ def test_should_respond_200(self): dag_id = "test-dag-id" task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entries(dag_id, run_id, execution_date_parsed, task_id) @@ -360,7 +360,7 @@ def test_should_respond_200_with_tilde_and_access_to_all_dags(self): dag_id_1 = "test-dag-id-1" task_id_1 = "test-task-id-1" execution_date = "2005-04-02T00:00:00+00:00" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entries(dag_id_1, run_id_1, execution_date_parsed, task_id_1) @@ -423,7 +423,7 @@ def test_should_respond_200_with_map_index(self): dag_id = "test-dag-id" task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entries(dag_id, dag_run_id, execution_date_parsed, task_id, mapped_ti=True) @@ -467,7 +467,7 @@ def test_should_respond_200_with_xcom_key(self): dag_id = "test-dag-id" task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entries(dag_id, dag_run_id, execution_date_parsed, task_id, mapped_ti=True) @@ -509,7 +509,7 @@ def test_should_raises_401_unauthenticated(self): dag_id = "test-dag-id" task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" - execution_date_parsed = parse_execution_date(execution_date) + execution_date_parsed = timezone.parse(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) self._create_xcom_entries(dag_id, run_id, execution_date_parsed, task_id) @@ -594,7 +594,7 @@ def setup_method(self): self.dag_id = "test-dag-id" self.task_id = "test-task-id" self.execution_date = "2005-04-02T00:00:00+00:00" - self.execution_date_parsed = parse_execution_date(self.execution_date) + self.execution_date_parsed = timezone.parse(self.execution_date) self.run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.execution_date_parsed) @pytest.mark.parametrize( diff --git a/tests/api_connexion/schemas/test_xcom_schema.py b/tests/api_connexion/schemas/test_xcom_schema.py index 8714b3430fac8..5863dd48c6186 100644 --- a/tests/api_connexion/schemas/test_xcom_schema.py +++ b/tests/api_connexion/schemas/test_xcom_schema.py @@ -28,7 +28,7 @@ xcom_schema_string, ) from airflow.models import DagRun, XCom -from airflow.utils.dates import parse_execution_date +from airflow.utils import timezone from airflow.utils.session import create_session from tests_common.test_utils.config import conf_vars @@ -90,7 +90,7 @@ def maker(dag_id, task_id, execution_date, key, map_index=-1, value=None): class TestXComCollectionItemSchema: default_time = "2016-04-02T21:00:00+00:00" - default_time_parsed = parse_execution_date(default_time) + default_time_parsed = timezone.parse(default_time) def test_serialize(self, create_xcom, session): create_xcom( @@ -133,8 +133,8 @@ def test_deserialize(self): class TestXComCollectionSchema: default_time_1 = "2016-04-02T21:00:00+00:00" default_time_2 = "2016-04-02T21:01:00+00:00" - time_1 = parse_execution_date(default_time_1) - time_2 = parse_execution_date(default_time_2) + time_1 = timezone.parse(default_time_1) + time_2 = timezone.parse(default_time_2) def test_serialize(self, create_xcom, session): create_xcom( @@ -188,7 +188,7 @@ def test_serialize(self, create_xcom, session): class TestXComSchema: default_time = "2016-04-02T21:00:00+00:00" - default_time_parsed = parse_execution_date(default_time) + default_time_parsed = timezone.parse(default_time) @conf_vars({("core", "enable_xcom_pickling"): "True"}) def test_serialize(self, create_xcom, session): diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index 63949fe9c1600..d91f6738822a1 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -41,7 +41,7 @@ from airflow.models.dagbag import DagBag from airflow.models.serialized_dag import SerializedDagModel from airflow.serialization.serialized_objects import SerializedDAG -from airflow.utils.dates import timezone as tz +from airflow.utils import timezone as tz from airflow.utils.session import create_session from airflow.www.security_appless import ApplessAirflowSecurityManager diff --git a/tests/utils/test_dates.py b/tests/utils/test_dates.py deleted file mode 100644 index 702bd730c1eb6..0000000000000 --- a/tests/utils/test_dates.py +++ /dev/null @@ -1,36 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pytest - -from airflow.utils import dates, timezone - - -class TestDates: - def test_parse_execution_date(self): - execution_date_str_wo_ms = "2017-11-02 00:00:00" - execution_date_str_w_ms = "2017-11-05 16:18:30.989729" - bad_execution_date_str = "2017-11-06TXX:00:00Z" - - assert timezone.datetime(2017, 11, 2, 0, 0, 0) == dates.parse_execution_date(execution_date_str_wo_ms) - assert timezone.datetime(2017, 11, 5, 16, 18, 30, 989729) == dates.parse_execution_date( - execution_date_str_w_ms - ) - with pytest.raises(ValueError): - dates.parse_execution_date(bad_execution_date_str)