diff --git a/model-engine/model_engine_server/api/dependencies.py b/model-engine/model_engine_server/api/dependencies.py
index 713938d11..eb2ee2275 100644
--- a/model-engine/model_engine_server/api/dependencies.py
+++ b/model-engine/model_engine_server/api/dependencies.py
@@ -56,6 +56,7 @@
     ABSFilesystemGateway,
     ABSLLMArtifactGateway,
     CeleryTaskQueueGateway,
+    DatadogMonitoringMetricsGateway,
     FakeMonitoringMetricsGateway,
     LiveAsyncModelEndpointInferenceGateway,
     LiveBatchJobOrchestrationGateway,
@@ -159,7 +160,11 @@ class ExternalInterfaces:
 
 
 def get_default_monitoring_metrics_gateway() -> MonitoringMetricsGateway:
-    monitoring_metrics_gateway = FakeMonitoringMetricsGateway()
+    # dd_trace_enabled is a good enough proxy for determining if we should use Datadog
+    if hmi_config.dd_trace_enabled:
+        monitoring_metrics_gateway: MonitoringMetricsGateway = DatadogMonitoringMetricsGateway()
+    else:
+        monitoring_metrics_gateway = FakeMonitoringMetricsGateway()
     return monitoring_metrics_gateway
 
 
diff --git a/model-engine/model_engine_server/api/llms_v1.py b/model-engine/model_engine_server/api/llms_v1.py
index 9660f0d07..614cc6bb5 100644
--- a/model-engine/model_engine_server/api/llms_v1.py
+++ b/model-engine/model_engine_server/api/llms_v1.py
@@ -391,8 +391,11 @@ async def create_completion_stream_task(
 
     async def event_generator():
         try:
+            time_to_first_token = None
             with timer() as use_case_timer:
                 async for message in response:
+                    if time_to_first_token is None and message.output is not None:
+                        time_to_first_token = use_case_timer.lap()
                     yield {"data": message.json()}
             background_tasks.add_task(
                 external_interfaces.monitoring_metrics_gateway.emit_token_count_metrics,
@@ -402,6 +405,7 @@
                     if message.output
                     else None,
                     total_duration=use_case_timer.duration,
+                    time_to_first_token=time_to_first_token,
                 ),
                 metric_metadata,
             )
diff --git a/model-engine/model_engine_server/common/dtos/llms.py b/model-engine/model_engine_server/common/dtos/llms.py
index 9fb8ed1d8..6f63e7120 100644
--- a/model-engine/model_engine_server/common/dtos/llms.py
+++ b/model-engine/model_engine_server/common/dtos/llms.py
@@ -321,6 +321,8 @@ class TokenUsage(BaseModel):
     total_duration: Optional[float] = None
     """Includes time spent waiting for the model to be ready."""
 
+    time_to_first_token: Optional[float] = None  # Only for streaming requests
+
     @property
     def num_total_tokens(self) -> int:
         return (self.num_prompt_tokens or 0) + (self.num_completion_tokens or 0)
@@ -333,6 +335,20 @@ def total_tokens_per_second(self) -> float:
             else 0.0
         )
 
+    @property
+    def inter_token_latency(self) -> Optional[float]:  # Only for streaming requests
+        # Note: we calculate a single average inter-token latency for the entire request.
+        # Tracking per-token latencies would be heavier-weight; we can add that later if needed.
+        if (
+            self.time_to_first_token is None
+            or self.num_completion_tokens is None
+            or self.total_duration is None
+        ):
+            return None
+        if self.num_completion_tokens < 2:
+            return None
+        return (self.total_duration - self.time_to_first_token) / (self.num_completion_tokens - 1)
+
 
 class CreateFineTuneRequest(BaseModel):
     model: str
diff --git a/model-engine/model_engine_server/core/utils/timer.py b/model-engine/model_engine_server/core/utils/timer.py
index 53a6f8fe7..5a2bd1beb 100644
--- a/model-engine/model_engine_server/core/utils/timer.py
+++ b/model-engine/model_engine_server/core/utils/timer.py
@@ -33,7 +33,7 @@ class timer:  # pylint: disable=invalid-name
     >>> f()
     """
 
-    __slots__ = ("logger", "name", "_duration", "start")
+    __slots__ = ("logger", "name", "_duration", "start", "start_lap")
 
     def __init__(self, logger: Optional[Logger] = None, name: str = "") -> None:
         self.logger = logger
@@ -42,6 +42,7 @@ def __init__(self, logger: Optional[Logger] = None, name: str = "") -> None:
         # for start, -1 is the uninitialized value
         # it is set at the context-block entering method: __enter__
         self.start: float = -1.0
+        self.start_lap: float = -1.0
 
     def __enter__(self) -> "timer":
         """Records start time: context-block entering function."""
@@ -62,6 +63,18 @@ def __exit__(self, *args) -> None:
         )
         self._maybe_log_end_time()
 
+    def lap(self) -> float:
+        # Records a "lap time": if the timer starts at t_0 and lap() is then called
+        # at t_1 and t_2, the calls return t_1 - t_0 and t_2 - t_1, respectively.
+        # Note that each call adds a small amount of overhead.
+        current_time = time.monotonic()
+        if self.start_lap == -1:
+            duration = current_time - self.start
+        else:
+            duration = current_time - self.start_lap
+        self.start_lap = current_time
+        return duration
+
     def _maybe_log_end_time(self) -> None:
         if self.logger is not None:
             caller_namespace = ""
diff --git a/model-engine/model_engine_server/infra/gateways/__init__.py b/model-engine/model_engine_server/infra/gateways/__init__.py
index b36fb6419..5a0d7a906 100644
--- a/model-engine/model_engine_server/infra/gateways/__init__.py
+++ b/model-engine/model_engine_server/infra/gateways/__init__.py
@@ -6,6 +6,7 @@
 from .batch_job_orchestration_gateway import BatchJobOrchestrationGateway
 from .batch_job_progress_gateway import BatchJobProgressGateway
 from .celery_task_queue_gateway import CeleryTaskQueueGateway
+from .datadog_monitoring_metrics_gateway import DatadogMonitoringMetricsGateway
 from .fake_model_primitive_gateway import FakeModelPrimitiveGateway
 from .fake_monitoring_metrics_gateway import FakeMonitoringMetricsGateway
 from .live_async_model_endpoint_inference_gateway import LiveAsyncModelEndpointInferenceGateway
@@ -31,6 +32,7 @@
     "BatchJobOrchestrationGateway",
     "BatchJobProgressGateway",
     "CeleryTaskQueueGateway",
+    "DatadogMonitoringMetricsGateway",
     "FakeModelPrimitiveGateway",
     "FakeMonitoringMetricsGateway",
     "LiveAsyncModelEndpointInferenceGateway",
diff --git a/model-engine/model_engine_server/infra/gateways/datadog_monitoring_metrics_gateway.py b/model-engine/model_engine_server/infra/gateways/datadog_monitoring_metrics_gateway.py
new file mode 100644
index 000000000..8732615d1
--- /dev/null
+++ b/model-engine/model_engine_server/infra/gateways/datadog_monitoring_metrics_gateway.py
@@ -0,0 +1,87 @@
+from typing import List, Optional
+
+from datadog import statsd
+from model_engine_server.common.dtos.llms import TokenUsage
+from model_engine_server.core.config import infra_config
+from model_engine_server.domain.gateways.monitoring_metrics_gateway import (
+    MetricMetadata,
+    MonitoringMetricsGateway,
+)
+
+
+def get_model_tags(model_name: Optional[str]) -> List[str]:
+    """
+    Returns a tag for the base model name (any fine-tune suffix after the first "." is dropped)
+    """
+    tags = []
+    if model_name:
+        parts = model_name.split(".")
+        tags.extend([f"model_name:{parts[0]}"])
+    return tags
+
+
+class DatadogMonitoringMetricsGateway(MonitoringMetricsGateway):
+    def __init__(self, prefix: str = "model_engine"):
+        self.prefix = prefix
+        self.tags = [f"env:{infra_config().env}"]
+
+    def emit_attempted_build_metric(self):
+        statsd.increment("scale_launch.service_builder.attempt", tags=self.tags)
+
+    def emit_successful_build_metric(self):
+        statsd.increment("scale_launch.service_builder.success", tags=self.tags)
+
+    def emit_build_time_metric(self, duration_seconds: float):
+        statsd.distribution(
+            "scale_launch.service_builder.endpoint_build_time", duration_seconds, tags=self.tags
+        )
+
+    def emit_image_build_cache_hit_metric(self, image_type: str):
+        statsd.increment(
+            f"scale_launch.service_builder.{image_type}_image_cache_hit", tags=self.tags
+        )
+
+    def emit_image_build_cache_miss_metric(self, image_type: str):
+        statsd.increment(
+            f"scale_launch.service_builder.{image_type}_image_cache_miss", tags=self.tags
+        )
+
+    def emit_docker_failed_build_metric(self):
+        statsd.increment("scale_launch.service_builder.docker_failed", tags=self.tags)
+
+    def emit_database_cache_hit_metric(self):
+        statsd.increment("scale_launch.database_cache.hit", tags=self.tags)
+
+    def emit_database_cache_miss_metric(self):
+        statsd.increment("scale_launch.database_cache.miss", tags=self.tags)
+
+    def _format_call_tags(self, metadata: MetricMetadata) -> List[str]:
+        tags = self.tags.copy()  # copy so we don't mutate the shared base tags
+        tags.extend(get_model_tags(metadata.model_name))
+        return tags
+
+    def emit_route_call_metric(self, route: str, metadata: MetricMetadata):
+        statsd.increment(f"{self.prefix}.{route}.call", tags=self._format_call_tags(metadata))
+
+    def emit_token_count_metrics(self, token_usage: TokenUsage, metadata: MetricMetadata):
+        tags = self._format_call_tags(metadata)
+
+        token_count_metric = f"{self.prefix}.token_count"
+        statsd.increment(
+            f"{token_count_metric}.prompt", (token_usage.num_prompt_tokens or 0), tags=tags
+        )
+        statsd.increment(
+            f"{token_count_metric}.completion", (token_usage.num_completion_tokens or 0), tags=tags
+        )
+        statsd.increment(f"{token_count_metric}.total", token_usage.num_total_tokens, tags=tags)
+
+        total_tokens_per_second = f"{self.prefix}.total_tokens_per_second"
+        statsd.histogram(total_tokens_per_second, token_usage.total_tokens_per_second, tags=tags)
+
+        time_to_first_token = f"{self.prefix}.time_to_first_token"
+        if token_usage.time_to_first_token is not None:
+            statsd.distribution(time_to_first_token, token_usage.time_to_first_token, tags=tags)
+
+        inter_token_latency = f"{self.prefix}.inter_token_latency"
+        if token_usage.inter_token_latency is not None:
+            statsd.distribution(inter_token_latency, token_usage.inter_token_latency, tags=tags)
diff --git a/model-engine/tests/unit/core/utils/test_timer.py b/model-engine/tests/unit/core/utils/test_timer.py
new file mode 100644
index 000000000..f5d3b2d1f
--- /dev/null
+++ b/model-engine/tests/unit/core/utils/test_timer.py
@@ -0,0 +1,15 @@
+import time
+
+from model_engine_server.core.utils.timer import timer
+
+
+def test_timer():
+    with timer() as t:
+        time.sleep(0.1)
+        lap_time = t.lap()
+        time.sleep(0.01)
+        new_lap_time = t.lap()
+
+    assert new_lap_time >= 0.009
+    assert lap_time >= 0.09
+    assert t.duration >= 0.1
diff --git a/model-engine/tests/unit/infra/gateways/test_datadog_monitoring_metrics_gateway.py b/model-engine/tests/unit/infra/gateways/test_datadog_monitoring_metrics_gateway.py
new file mode 100644
index 000000000..e3e295a62
--- /dev/null
+++ b/model-engine/tests/unit/infra/gateways/test_datadog_monitoring_metrics_gateway.py
@@ -0,0 +1,106 @@
+from unittest.mock import Mock
+
+import pytest
+from datadog import statsd
+from model_engine_server.common.dtos.llms import TokenUsage
+from model_engine_server.core.auth.authentication_repository import User
+from model_engine_server.domain.gateways.monitoring_metrics_gateway import MetricMetadata
+from model_engine_server.infra.gateways import DatadogMonitoringMetricsGateway
+
+
+@pytest.fixture(autouse=True)
+def mock_statsd():
+    # https://github.com/DataDog/datadogpy/issues/183 for how dd mocks statsd
+    statsd.socket = Mock()
+    # also mock the methods we use or may use; there might be more
+    statsd.gauge = Mock()
+    statsd.increment = Mock()
+    statsd.decrement = Mock()
+    statsd.histogram = Mock()
+    statsd.distribution = Mock()
+
+
+@pytest.fixture
+def sync_token_count():
+    return TokenUsage(
+        num_prompt_tokens=100,
+        num_completion_tokens=200,
+        total_duration=30,
+    )
+
+
+@pytest.fixture
+def streaming_token_count():
+    return TokenUsage(
+        num_prompt_tokens=100,
+        num_completion_tokens=200,
+        total_duration=30,
+        time_to_first_token=5,
+    )
+
+
+@pytest.fixture
+def datadog_monitoring_metrics_gateway():
+    gateway = DatadogMonitoringMetricsGateway(prefix="model_engine_unit_test")
+    return gateway
+
+
+def test_datadog_monitoring_metrics_gateway_build_metrics(datadog_monitoring_metrics_gateway):
+    datadog_monitoring_metrics_gateway.emit_attempted_build_metric()
+    statsd.increment.assert_called_once()
+    statsd.increment.reset_mock()
+    datadog_monitoring_metrics_gateway.emit_successful_build_metric()
+    statsd.increment.assert_called_once()
+    statsd.increment.reset_mock()
+    datadog_monitoring_metrics_gateway.emit_build_time_metric(300)
+    statsd.distribution.assert_called_once()
+    statsd.distribution.reset_mock()
+    datadog_monitoring_metrics_gateway.emit_image_build_cache_hit_metric("test_image")
+    statsd.increment.assert_called_once()
+    statsd.increment.reset_mock()
+    datadog_monitoring_metrics_gateway.emit_image_build_cache_miss_metric("test_image_2")
+    statsd.increment.assert_called_once()
+    statsd.increment.reset_mock()
+    datadog_monitoring_metrics_gateway.emit_docker_failed_build_metric()
+    statsd.increment.assert_called_once()
+    statsd.increment.reset_mock()
+
+
+def test_datadog_monitoring_metrics_gateway_db_metrics(datadog_monitoring_metrics_gateway):
+    datadog_monitoring_metrics_gateway.emit_database_cache_hit_metric()
+    statsd.increment.assert_called_once()
+    statsd.increment.reset_mock()
+    datadog_monitoring_metrics_gateway.emit_database_cache_miss_metric()
+    statsd.increment.assert_called_once()
+    statsd.increment.reset_mock()
+
+
+def test_datadog_monitoring_metrics_gateway_route_call_metrics(datadog_monitoring_metrics_gateway):
+    metadata = MetricMetadata(
+        user=User(user_id="test_user", team_id="test_team", email="test_email"),
+        model_name="test_model",
+    )
+    datadog_monitoring_metrics_gateway.emit_route_call_metric("test_route", metadata)
+    statsd.increment.assert_called_once()
+    statsd.increment.reset_mock()
+
+
+def test_datadog_monitoring_metrics_gateway_token_count_metrics(
+    datadog_monitoring_metrics_gateway, sync_token_count, streaming_token_count
+):
+    metadata = MetricMetadata(
+        user=User(user_id="test_user", team_id="test_team", email="test_email"),
+        model_name="test_model",
+    )
+    datadog_monitoring_metrics_gateway.emit_token_count_metrics(sync_token_count, metadata)
+    statsd.increment.assert_called()
+    statsd.increment.reset_mock()
+    statsd.histogram.assert_called()
+    statsd.histogram.reset_mock()
+    datadog_monitoring_metrics_gateway.emit_token_count_metrics(streaming_token_count, metadata)
+    statsd.increment.assert_called()
+    statsd.increment.reset_mock()
+    statsd.histogram.assert_called()
+    statsd.histogram.reset_mock()
+    statsd.distribution.assert_called()
+    statsd.distribution.reset_mock()
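For illustration only (not part of the diff): a minimal sketch of how the new TokenUsage fields combine in the inter_token_latency property added above. The numbers are hypothetical; a streaming request with 200 completion tokens, 30s total duration, and 5s to first token averages (30 - 5) / (200 - 1) ≈ 0.126s between successive tokens after the first.

from model_engine_server.common.dtos.llms import TokenUsage

# Hypothetical streaming request; values chosen only to illustrate the math.
usage = TokenUsage(
    num_prompt_tokens=100,
    num_completion_tokens=200,
    total_duration=30,
    time_to_first_token=5,
)
print(usage.inter_token_latency)  # (30 - 5) / (200 - 1) ≈ 0.1256 seconds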