Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1064,24 +1064,6 @@ metrics:
example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
or \"^scheduler,^executor,heartbeat|timeout\""
default: ""
# TODO: Remove 'timer_unit_consistency' in Airflow 3.0
timer_unit_consistency:
description: |
Controls the consistency of timer units across all metrics loggers
(e.g., Statsd, Datadog, OpenTelemetry)
for timing and duration-based metrics. When enabled, all timers will publish
metrics in milliseconds for consistency and alignment with Airflow's default
metrics behavior in version 3.0+.

.. warning::

It will be the default behavior from Airflow 3.0. If disabled, timers may publish
in seconds for backwards compatibility, though it is recommended to enable this
setting to ensure metric uniformity and forward-compat with Airflow 3.
version_added: 2.11.0
type: string
example: ~
default: "False"
statsd_on:
description: |
Enables sending metrics to StatsD.
Expand Down
15 changes: 1 addition & 14 deletions airflow/metrics/datadog_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

import datetime
import logging
import warnings
from typing import TYPE_CHECKING

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
PatternAllowListValidator,
Expand All @@ -42,14 +40,6 @@

log = logging.getLogger(__name__)

timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)


class SafeDogStatsdLogger:
"""DogStatsd Logger."""
Expand Down Expand Up @@ -144,10 +134,7 @@ def timing(
tags_list = []
if self.metrics_validator.test(stat):
if isinstance(dt, datetime.timedelta):
if timer_unit_consistency:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
dt = dt.total_seconds() * 1000.0
return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
return None

Expand Down
14 changes: 1 addition & 13 deletions airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
OTEL_NAME_MAX_LENGTH,
Expand Down Expand Up @@ -73,14 +72,6 @@
# Delimiter is placed between the universal metric prefix and the unique metric name.
DEFAULT_METRIC_NAME_DELIMITER = "."

timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)


def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
"""Assembles the prefix, delimiter, and name and returns it as a string."""
Expand Down Expand Up @@ -284,10 +275,7 @@ def timing(
"""OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed."""
if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
if timer_unit_consistency:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
dt = dt.total_seconds() * 1000.0
self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags)

def timer(
Expand Down
16 changes: 1 addition & 15 deletions airflow/metrics/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,12 @@

import datetime
import time
import warnings
from typing import Union

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.typing_compat import Protocol

DeltaType = Union[int, float, datetime.timedelta]

timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)


class TimerProtocol(Protocol):
"""Type protocol for StatsLogger.timer."""
Expand Down Expand Up @@ -127,9 +116,6 @@ def start(self) -> Timer:
def stop(self, send: bool = True) -> None:
"""Stop the timer, and optionally send it to stats backend."""
if self._start_time is not None:
if timer_unit_consistency:
self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds.
else:
self.duration = time.perf_counter() - self._start_time
self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds.
if send and self.real_timer:
self.real_timer.stop()
20 changes: 2 additions & 18 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import operator
import os
import signal
import warnings
from collections import defaultdict
from contextlib import nullcontext
from datetime import timedelta
Expand Down Expand Up @@ -85,7 +84,6 @@
AirflowSkipException,
AirflowTaskTerminated,
AirflowTaskTimeout,
RemovedInAirflow3Warning,
TaskDeferralError,
TaskDeferred,
UnmappableXComLengthPushed,
Expand Down Expand Up @@ -176,14 +174,6 @@

PAST_DEPENDS_MET = "past_depends_met"

timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)


class TaskReturnCode(Enum):
"""
Expand Down Expand Up @@ -2827,10 +2817,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
if timer_unit_consistency:
timing = timezone.utcnow() - self.queued_dttm
else:
timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
timing = timezone.utcnow() - self.queued_dttm
elif new_state == TaskInstanceState.QUEUED:
metric_name = "scheduled_duration"
if self.start_date is None:
Expand All @@ -2843,10 +2830,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
if timer_unit_consistency:
timing = timezone.utcnow() - self.start_date
else:
timing = (timezone.utcnow() - self.start_date).total_seconds()
timing = timezone.utcnow() - self.start_date
else:
raise NotImplementedError("no metric emission setup for state %s", new_state)

Expand Down
11 changes: 0 additions & 11 deletions newsfragments/39908.significant.rst

This file was deleted.

8 changes: 8 additions & 0 deletions newsfragments/43975.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Timer and timing metrics are now standardized to milliseconds

In Airflow 3.0, the ``timer_unit_consistency`` setting in the ``metrics`` section is removed as it is now the default behaviour.
This is done to standardize all timer and timing metrics to milliseconds across all metric loggers.

Airflow 2.11 introduced the ``timer_unit_consistency`` setting in the ``metrics`` section of the configuration file. The
default value was ``False`` which meant that the timer and timing metrics were logged in seconds. This was done to maintain
backwards compatibility with the previous versions of Airflow.
56 changes: 10 additions & 46 deletions tests/core/test_otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from opentelemetry.metrics import MeterProvider

from airflow.exceptions import InvalidStatsNameException
from airflow.metrics import otel_logger, protocols
from airflow.metrics.otel_logger import (
OTEL_NAME_MAX_LENGTH,
UP_DOWN_COUNTERS,
Expand Down Expand Up @@ -235,21 +234,15 @@ def test_gauge_value_is_correct(self, name):

assert self.map[full_name(name)].value == 1

@pytest.mark.parametrize(
"timer_unit_consistency",
[True, False],
)
def test_timing_new_metric(self, timer_unit_consistency, name):
def test_timing_new_metric(self, name):
import datetime

otel_logger.timer_unit_consistency = timer_unit_consistency

self.stats.timing(name, dt=datetime.timedelta(seconds=123))

self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)
expected_value = 123000.0 if timer_unit_consistency else 123
expected_value = 123000.0
assert self.map[full_name(name)].value == expected_value

def test_timing_new_metric_with_tags(self, name):
Expand All @@ -276,81 +269,52 @@ def test_timing_existing_metric(self, name):
# time.perf_count() is called once to get the starting timestamp and again
# to get the end timestamp. timer() should return the difference as a float.

@pytest.mark.parametrize(
"timer_unit_consistency",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_with_name_returns_float_and_stores_value(self, mock_time, timer_unit_consistency, name):
protocols.timer_unit_consistency = timer_unit_consistency
def test_timer_with_name_returns_float_and_stores_value(self, mock_time, name):
with self.stats.timer(name) as timer:
pass

assert isinstance(timer.duration, float)
expected_duration = 3140.0 if timer_unit_consistency else 3.14
expected_duration = 3140.0
assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
name=full_name(name), callbacks=ANY
)

@pytest.mark.parametrize(
"timer_unit_consistency",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_no_name_returns_float_but_does_not_store_value(
self, mock_time, timer_unit_consistency, name
):
protocols.timer_unit_consistency = timer_unit_consistency
def test_timer_no_name_returns_float_but_does_not_store_value(self, mock_time, name):
with self.stats.timer() as timer:
pass

assert isinstance(timer.duration, float)
expected_duration = 3140.0 if timer_unit_consistency else 3.14
expected_duration = 3140.0
assert timer.duration == expected_duration
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_not_called()

@pytest.mark.parametrize(
"timer_unit_consistency",
[
True,
False,
],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_false(self, mock_time, timer_unit_consistency, name):
protocols.timer_unit_consistency = timer_unit_consistency

def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
timer = self.stats.timer(name)
timer.start()
# Perform some task
timer.stop(send=False)

assert isinstance(timer.duration, float)
expected_value = 3140.0 if timer_unit_consistency else 3.14
expected_value = 3140.0
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_not_called()

@pytest.mark.parametrize(
"timer_unit_consistency",
[
True,
False,
],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_true(self, mock_time, timer_unit_consistency, name):
protocols.timer_unit_consistency = timer_unit_consistency
def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
timer = self.stats.timer(name)
timer.start()
# Perform some task
timer.stop(send=True)

assert isinstance(timer.duration, float)
expected_value = 3140.0 if timer_unit_consistency else 3.14
expected_value = 3140.0
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_observable_gauge.assert_called_once_with(
Expand Down
25 changes: 4 additions & 21 deletions tests/core/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import airflow
from airflow.exceptions import AirflowConfigException, InvalidStatsNameException
from airflow.metrics import datadog_logger, protocols
from airflow.metrics.datadog_logger import SafeDogStatsdLogger
from airflow.metrics.statsd_logger import SafeStatsdLogger
from airflow.metrics.validators import (
Expand Down Expand Up @@ -221,20 +220,12 @@ def test_does_send_stats_using_dogstatsd_when_statsd_and_dogstatsd_both_on(self)
metric="empty_key", sample_rate=1, tags=[], value=1
)

@pytest.mark.parametrize(
"timer_unit_consistency",
[True, False],
)
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 100.0])
def test_timer(self, time_mock, timer_unit_consistency):
protocols.timer_unit_consistency = timer_unit_consistency

def test_timer(self, time_mock):
with self.dogstatsd.timer("empty_timer") as timer:
pass
self.dogstatsd_client.timed.assert_called_once_with("empty_timer", tags=[])
expected_duration = 100.0
if timer_unit_consistency:
expected_duration = 1000.0 * 100.0
expected_duration = 1000.0 * 100.0
assert expected_duration == timer.duration
assert time_mock.call_count == 2

Expand All @@ -243,22 +234,14 @@ def test_empty_timer(self):
pass
self.dogstatsd_client.timed.assert_not_called()

@pytest.mark.parametrize(
"timer_unit_consistency",
[True, False],
)
def test_timing(self, timer_unit_consistency):
def test_timing(self):
import datetime

datadog_logger.timer_unit_consistency = timer_unit_consistency

self.dogstatsd.timing("empty_timer", 123)
self.dogstatsd_client.timing.assert_called_once_with(metric="empty_timer", value=123, tags=[])

self.dogstatsd.timing("empty_timer", datetime.timedelta(seconds=123))
self.dogstatsd_client.timing.assert_called_with(
metric="empty_timer", value=123000.0 if timer_unit_consistency else 123.0, tags=[]
)
self.dogstatsd_client.timing.assert_called_with(metric="empty_timer", value=123000.0, tags=[])

def test_gauge(self):
self.dogstatsd.gauge("empty", 123)
Expand Down
5 changes: 0 additions & 5 deletions tests_common/_internals/forbidden_warnings.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ def pytest_itemcollected(self, item: pytest.Item):
# Add marker at the beginning of the markers list. In this case, it does not conflict with
# filterwarnings markers, which are set explicitly in the test suite.
item.add_marker(pytest.mark.filterwarnings(f"error::{fw}"), append=False)
item.add_marker(
pytest.mark.filterwarnings(
"ignore:Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.:DeprecationWarning"
)
)

@pytest.hookimpl(hookwrapper=True, trylast=True)
def pytest_sessionfinish(self, session: pytest.Session, exitstatus: int):
Expand Down