Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.typing_compat import Literal
from airflow.utils import cli as cli_utils
from airflow.utils import cli as cli_utils, timezone
from airflow.utils.cli import (
get_dag,
get_dag_by_file_location,
Expand All @@ -61,7 +61,6 @@
should_ignore_depends_on_past,
suppress_logs_and_warning,
)
from airflow.utils.dates import timezone
from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
from airflow.utils.log.logging_mixin import StreamLogWriter
from airflow.utils.log.secrets_masker import RedactedIO
Expand Down
100 changes: 0 additions & 100 deletions airflow/utils/dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@
# under the License.
from __future__ import annotations

from datetime import datetime, timedelta
from typing import TYPE_CHECKING

from croniter import croniter

from airflow.utils import timezone

cron_presets: dict[str, str] = {
"@hourly": "0 * * * *",
"@daily": "0 0 * * *",
Expand All @@ -33,99 +26,6 @@
"@yearly": "0 0 1 1 *",
}

if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta # for doctest


def round_time(
dt: datetime,
delta: str | timedelta | relativedelta,
start_date: datetime = timezone.make_aware(datetime.min),
):
"""
Return ``start_date + i * delta`` for given ``i`` where the result is closest to ``dt``.

.. code-block:: pycon

>>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
datetime.datetime(2015, 1, 1, 0, 0)
>>> round_time(datetime(2015, 1, 2), relativedelta(months=1))
datetime.datetime(2015, 1, 1, 0, 0)
>>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
datetime.datetime(2015, 9, 16, 0, 0)
>>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
datetime.datetime(2015, 9, 15, 0, 0)
>>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
datetime.datetime(2015, 9, 14, 0, 0)
>>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
datetime.datetime(2015, 9, 14, 0, 0)
"""
if isinstance(delta, str):
# It's cron based, so it's easy
time_zone = start_date.tzinfo
start_date = timezone.make_naive(start_date, time_zone)
cron = croniter(delta, start_date)
prev = cron.get_prev(datetime)
if prev == start_date:
return timezone.make_aware(start_date, time_zone)
else:
return timezone.make_aware(prev, time_zone)

# Ignore the microseconds of dt
dt -= timedelta(microseconds=dt.microsecond)

# We are looking for a datetime in the form start_date + i * delta
# which is as close as possible to dt. Since delta could be a relative
# delta we don't know its exact length in seconds so we cannot rely on
# division to find i. Instead we employ a binary search algorithm, first
# finding an upper and lower limit and then dissecting the interval until
# we have found the closest match.

# We first search an upper limit for i for which start_date + upper * delta
# exceeds dt.
upper = 1
while start_date + upper * delta < dt:
# To speed up finding an upper limit we grow this exponentially by a
# factor of 2
upper *= 2

# Since upper is the first value for which start_date + upper * delta
# exceeds dt, upper // 2 is below dt and therefore forms a lower limited
# for the i we are looking for
lower = upper // 2

# We now continue to intersect the interval between
# start_date + lower * delta and start_date + upper * delta
# until we find the closest value
while True:
# Invariant: start + lower * delta < dt <= start + upper * delta
# If start_date + (lower + 1)*delta exceeds dt, then either lower or
# lower+1 has to be the solution we are searching for
if start_date + (lower + 1) * delta >= dt:
# Check if start_date + (lower + 1)*delta or
# start_date + lower*delta is closer to dt and return the solution
if (start_date + (lower + 1) * delta) - dt <= dt - (start_date + lower * delta):
return start_date + (lower + 1) * delta
else:
return start_date + lower * delta

# We intersect the interval and either replace the lower or upper
# limit with the candidate
candidate = lower + (upper - lower) // 2
if start_date + candidate * delta >= dt:
upper = candidate
else:
lower = candidate

# in the special case when start_date > dt the search for upper will
# immediately stop for upper == 1 which results in lower = upper // 2 = 0
# and this function returns start_date.


def parse_execution_date(execution_date_str):
"""Parse execution date string to datetime object."""
return timezone.parse(execution_date_str)


def datetime_to_nano(datetime) -> int:
"""Convert datetime to nanoseconds."""
Expand Down
8 changes: 8 additions & 0 deletions newsfragments/43533.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Unused and redundant functions from the ``airflow.utils.dates`` module have been removed.

The following functions have been removed:

- ``parse_execution_date``
- ``round_time``
- ``scale_time_units``
- ``infer_time_unit``
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.models.xcom import BaseXCom, XCom
from airflow.operators.empty import EmptyOperator
from airflow.security import permissions
from airflow.utils.dates import parse_execution_date
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType

Expand Down Expand Up @@ -118,7 +118,7 @@ def test_should_respond_200_with_tilde_and_granular_dag_access(self):
dag_id_1 = "test-dag-id-1"
task_id_1 = "test-task-id-1"
execution_date = "2005-04-02T00:00:00+00:00"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
dag_run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entries(dag_id_1, dag_run_id_1, execution_date_parsed, task_id_1)

Expand Down
2 changes: 1 addition & 1 deletion providers/tests/system/amazon/aws/example_mongo_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.amazon.aws.transfers.mongo_to_s3 import MongoToS3Operator
from airflow.utils.dates import datetime
from airflow.utils.timezone import datetime
from airflow.utils.trigger_rule import TriggerRule

from providers.tests.system.amazon.aws.utils import SystemTestContextBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator, SQLTableCheckOperator
from airflow.utils.dates import datetime
from airflow.utils import timezone

AIRFLOW_DB_METADATA_TABLE = "ab_role"
connection_args = {
Expand All @@ -36,7 +36,7 @@
"example_sql_column_table_check",
description="Example DAG for SQLColumnCheckOperator and SQLTableCheckOperator.",
default_args=connection_args,
start_date=datetime(2021, 1, 1),
start_date=timezone.datetime(2021, 1, 1),
schedule=None,
catchup=False,
) as dag:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.dates import datetime
from airflow.utils.timezone import datetime

AIRFLOW_DB_METADATA_TABLE = "ab_role"
connection_args = {
Expand Down
26 changes: 13 additions & 13 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.models.taskinstance import TaskInstance
from airflow.models.xcom import BaseXCom, XCom, resolve_xcom_backend
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import parse_execution_date
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.timezone import utcnow
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -111,7 +111,7 @@ def test_should_respond_200_stringify(self):
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"})
response = self.client.get(
Expand All @@ -137,7 +137,7 @@ def test_should_respond_200_native(self):
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"})
response = self.client.get(
Expand All @@ -164,7 +164,7 @@ def test_should_respond_200_native_for_pickled(self):
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
value_non_serializable_key = {("201009_NB502104_0421_AHJY23BGXG (SEQ_WF: 138898)", None): 82359}
self._create_xcom_entry(
Expand Down Expand Up @@ -193,7 +193,7 @@ def test_should_raise_404_for_non_existent_xcom(self):
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key)
response = self.client.get(
Expand All @@ -208,7 +208,7 @@ def test_should_raises_401_unauthenticated(self):
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key)
response = self.client.get(
Expand All @@ -222,7 +222,7 @@ def test_should_raise_403_forbidden(self):
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)

self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key)
Expand Down Expand Up @@ -318,7 +318,7 @@ def test_should_respond_200(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)

self._create_xcom_entries(dag_id, run_id, execution_date_parsed, task_id)
Expand Down Expand Up @@ -360,7 +360,7 @@ def test_should_respond_200_with_tilde_and_access_to_all_dags(self):
dag_id_1 = "test-dag-id-1"
task_id_1 = "test-task-id-1"
execution_date = "2005-04-02T00:00:00+00:00"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id_1 = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entries(dag_id_1, run_id_1, execution_date_parsed, task_id_1)

Expand Down Expand Up @@ -423,7 +423,7 @@ def test_should_respond_200_with_map_index(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entries(dag_id, dag_run_id, execution_date_parsed, task_id, mapped_ti=True)

Expand Down Expand Up @@ -467,7 +467,7 @@ def test_should_respond_200_with_xcom_key(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entries(dag_id, dag_run_id, execution_date_parsed, task_id, mapped_ti=True)

Expand Down Expand Up @@ -509,7 +509,7 @@ def test_should_raises_401_unauthenticated(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
execution_date_parsed = parse_execution_date(execution_date)
execution_date_parsed = timezone.parse(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entries(dag_id, run_id, execution_date_parsed, task_id)

Expand Down Expand Up @@ -594,7 +594,7 @@ def setup_method(self):
self.dag_id = "test-dag-id"
self.task_id = "test-task-id"
self.execution_date = "2005-04-02T00:00:00+00:00"
self.execution_date_parsed = parse_execution_date(self.execution_date)
self.execution_date_parsed = timezone.parse(self.execution_date)
self.run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.execution_date_parsed)

@pytest.mark.parametrize(
Expand Down
10 changes: 5 additions & 5 deletions tests/api_connexion/schemas/test_xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
xcom_schema_string,
)
from airflow.models import DagRun, XCom
from airflow.utils.dates import parse_execution_date
from airflow.utils import timezone
from airflow.utils.session import create_session

from tests_common.test_utils.config import conf_vars
Expand Down Expand Up @@ -90,7 +90,7 @@ def maker(dag_id, task_id, execution_date, key, map_index=-1, value=None):

class TestXComCollectionItemSchema:
default_time = "2016-04-02T21:00:00+00:00"
default_time_parsed = parse_execution_date(default_time)
default_time_parsed = timezone.parse(default_time)

def test_serialize(self, create_xcom, session):
create_xcom(
Expand Down Expand Up @@ -133,8 +133,8 @@ def test_deserialize(self):
class TestXComCollectionSchema:
default_time_1 = "2016-04-02T21:00:00+00:00"
default_time_2 = "2016-04-02T21:01:00+00:00"
time_1 = parse_execution_date(default_time_1)
time_2 = parse_execution_date(default_time_2)
time_1 = timezone.parse(default_time_1)
time_2 = timezone.parse(default_time_2)

def test_serialize(self, create_xcom, session):
create_xcom(
Expand Down Expand Up @@ -188,7 +188,7 @@ def test_serialize(self, create_xcom, session):

class TestXComSchema:
default_time = "2016-04-02T21:00:00+00:00"
default_time_parsed = parse_execution_date(default_time)
default_time_parsed = timezone.parse(default_time)

@conf_vars({("core", "enable_xcom_pickling"): "True"})
def test_serialize(self, create_xcom, session):
Expand Down
2 changes: 1 addition & 1 deletion tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.dates import timezone as tz
from airflow.utils import timezone as tz
from airflow.utils.session import create_session
from airflow.www.security_appless import ApplessAirflowSecurityManager

Expand Down
36 changes: 0 additions & 36 deletions tests/utils/test_dates.py

This file was deleted.