Merged
7 changes: 6 additions & 1 deletion model-engine/model_engine_server/api/dependencies.py
@@ -56,6 +56,7 @@
    ABSFilesystemGateway,
    ABSLLMArtifactGateway,
    CeleryTaskQueueGateway,
    DatadogMonitoringMetricsGateway,
    FakeMonitoringMetricsGateway,
    LiveAsyncModelEndpointInferenceGateway,
    LiveBatchJobOrchestrationGateway,
@@ -159,7 +160,11 @@ class ExternalInterfaces:


def get_default_monitoring_metrics_gateway() -> MonitoringMetricsGateway:
-    monitoring_metrics_gateway = FakeMonitoringMetricsGateway()
+    # dd_trace_enabled is a good enough proxy for determining if we should use Datadog
+    if hmi_config.dd_trace_enabled:
+        monitoring_metrics_gateway: MonitoringMetricsGateway = DatadogMonitoringMetricsGateway()
+    else:
+        monitoring_metrics_gateway = FakeMonitoringMetricsGateway()
    return monitoring_metrics_gateway


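A quick way to see what the factory above resolves to under each setting of the flag — a minimal sketch, assuming hmi_config is read at call time as the function body suggests, and that infra_config() is available so the Datadog gateway can build its env tag (the patch target here is illustrative):

from unittest.mock import patch

from model_engine_server.api.dependencies import get_default_monitoring_metrics_gateway
from model_engine_server.infra.gateways import (
    DatadogMonitoringMetricsGateway,
    FakeMonitoringMetricsGateway,
)

with patch("model_engine_server.api.dependencies.hmi_config") as cfg:
    cfg.dd_trace_enabled = True  # Datadog tracing on -> Datadog metrics too
    assert isinstance(get_default_monitoring_metrics_gateway(), DatadogMonitoringMetricsGateway)
    cfg.dd_trace_enabled = False  # otherwise fall back to the no-op fake
    assert isinstance(get_default_monitoring_metrics_gateway(), FakeMonitoringMetricsGateway)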
4 changes: 4 additions & 0 deletions model-engine/model_engine_server/api/llms_v1.py
@@ -391,8 +391,11 @@ async def create_completion_stream_task(

async def event_generator():
    try:
        time_to_first_token = None
        with timer() as use_case_timer:
            async for message in response:
                if time_to_first_token is None and message.output is not None:
                    time_to_first_token = use_case_timer.lap()
                yield {"data": message.json()}
        background_tasks.add_task(
            external_interfaces.monitoring_metrics_gateway.emit_token_count_metrics,
@@ -402,6 +405,7 @@ async def event_generator():
                if message.output
                else None,
                total_duration=use_case_timer.duration,
                time_to_first_token=time_to_first_token,
            ),
            metric_metadata,
        )
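Stripped of the FastAPI plumbing, the measurement pattern above is just: start a timer when the stream opens, take a single lap() at the first chunk that carries output, and read duration once the stream ends. A self-contained sketch of the same idea (the fake stream and its chunk shape are made up for illustration):

import asyncio

from model_engine_server.core.utils.timer import timer

async def fake_stream():
    for text in ("", "Hello", " world"):  # the first chunk carries no output yet
        await asyncio.sleep(0.05)
        yield text

async def main():
    time_to_first_token = None
    with timer() as t:
        async for chunk in fake_stream():
            if time_to_first_token is None and chunk:
                time_to_first_token = t.lap()  # ~0.10s: two chunks in
    print(time_to_first_token, t.duration)  # total duration is ~0.15s

asyncio.run(main())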
16 changes: 16 additions & 0 deletions model-engine/model_engine_server/common/dtos/llms.py
@@ -321,6 +321,8 @@ class TokenUsage(BaseModel):
    total_duration: Optional[float] = None
    """Includes time spent waiting for the model to be ready."""

    time_to_first_token: Optional[float] = None  # Only for streaming requests

    @property
    def num_total_tokens(self) -> int:
        return (self.num_prompt_tokens or 0) + (self.num_completion_tokens or 0)
@@ -333,6 +335,20 @@ def total_tokens_per_second(self) -> float:
            else 0.0
        )

    @property
    def inter_token_latency(self) -> Optional[float]:  # Only for streaming requests
        # Note: we calculate a single inter-token latency over the entire request;
        # calculating the latency between each pair of tokens seems too heavyweight,
        # although we could do that if we wanted to.
        if (
            self.time_to_first_token is None
            or self.num_completion_tokens is None
            or self.total_duration is None
        ):
            return None
        if self.num_completion_tokens < 2:
            return None
        return (self.total_duration - self.time_to_first_token) / (self.num_completion_tokens - 1)


class CreateFineTuneRequest(BaseModel):
    model: str
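To make the arithmetic concrete, here are the same numbers as the streaming fixture in the tests below (assuming the elided branch of total_tokens_per_second divides num_total_tokens by total_duration, as the surrounding lines suggest):

from model_engine_server.common.dtos.llms import TokenUsage

usage = TokenUsage(
    num_prompt_tokens=100,
    num_completion_tokens=200,
    total_duration=30,
    time_to_first_token=5,
)
print(usage.num_total_tokens)         # 100 + 200 = 300
print(usage.total_tokens_per_second)  # 300 / 30 = 10.0
print(usage.inter_token_latency)      # (30 - 5) / (200 - 1) = 25 / 199 ≈ 0.1256 s/token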
15 changes: 14 additions & 1 deletion model-engine/model_engine_server/core/utils/timer.py
@@ -33,7 +33,7 @@ class timer:  # pylint: disable=invalid-name
    >>> f()
    """

-    __slots__ = ("logger", "name", "_duration", "start")
+    __slots__ = ("logger", "name", "_duration", "start", "start_lap")

    def __init__(self, logger: Optional[Logger] = None, name: str = "") -> None:
        self.logger = logger
@@ -42,6 +42,7 @@ def __init__(self, logger: Optional[Logger] = None, name: str = "") -> None:
        # for start, -1 is the uninitialized value
        # it is set at the context-block entering method: __enter__
        self.start: float = -1.0
        self.start_lap: float = -1.0

    def __enter__(self) -> "timer":
        """Records start time: context-block entering function."""
@@ -62,6 +63,18 @@ def __exit__(self, *args) -> None:
        )
        self._maybe_log_end_time()

    def lap(self) -> float:
        """Records a "lap" time.

        If start happens at t_0 and lap() is called at t_1 and then t_2, the
        returned values are t_1 - t_0 and t_2 - t_1, respectively. Note that
        each call does add a little timing overhead of its own.
        """
        current_time = time.monotonic()
        if self.start_lap == -1.0:
            duration = current_time - self.start
        else:
            duration = current_time - self.start_lap
        self.start_lap = current_time
        return duration

    def _maybe_log_end_time(self) -> None:
        if self.logger is not None:
            caller_namespace = "<unknown_caller_namespace>"
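Concretely, the lap semantics are what the new test_timer further down exercises: the first lap() measures from __enter__, each subsequent lap() measures from the previous one, and duration still covers the whole block. The sleeps below are stand-ins for real work:

import time

from model_engine_server.core.utils.timer import timer

with timer() as t:
    time.sleep(0.10)   # stand-in for work before the first checkpoint
    first = t.lap()    # ~0.10, measured from __enter__
    time.sleep(0.01)   # stand-in for the remaining work
    second = t.lap()   # ~0.01, measured from the previous lap()
print(first, second, t.duration)  # duration is ~0.11 for the whole block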
2 changes: 2 additions & 0 deletions model-engine/model_engine_server/infra/gateways/__init__.py
@@ -6,6 +6,7 @@
from .batch_job_orchestration_gateway import BatchJobOrchestrationGateway
from .batch_job_progress_gateway import BatchJobProgressGateway
from .celery_task_queue_gateway import CeleryTaskQueueGateway
from .datadog_monitoring_metrics_gateway import DatadogMonitoringMetricsGateway
from .fake_model_primitive_gateway import FakeModelPrimitiveGateway
from .fake_monitoring_metrics_gateway import FakeMonitoringMetricsGateway
from .live_async_model_endpoint_inference_gateway import LiveAsyncModelEndpointInferenceGateway
@@ -31,6 +32,7 @@
"BatchJobOrchestrationGateway",
"BatchJobProgressGateway",
"CeleryTaskQueueGateway",
"DatadogMonitoringMetricsGateway",
"FakeModelPrimitiveGateway",
"FakeMonitoringMetricsGateway",
"LiveAsyncModelEndpointInferenceGateway",
87 changes: 87 additions & 0 deletions model-engine/model_engine_server/infra/gateways/datadog_monitoring_metrics_gateway.py
@@ -0,0 +1,87 @@
from typing import List, Optional

from datadog import statsd
from model_engine_server.common.dtos.llms import TokenUsage
from model_engine_server.core.config import infra_config
from model_engine_server.domain.gateways.monitoring_metrics_gateway import (
    MetricMetadata,
    MonitoringMetricsGateway,
)


def get_model_tags(model_name: Optional[str]) -> List[str]:
    """
    Returns a tag for the base model name; anything after the first "."
    (the fine-tune suffix) is dropped.
    """
    tags = []
    if model_name:
        parts = model_name.split(".")
        tags.append(f"model_name:{parts[0]}")
    return tags


class DatadogMonitoringMetricsGateway(MonitoringMetricsGateway):
    def __init__(self, prefix: str = "model_engine"):
        self.prefix = prefix
        self.tags = [f"env:{infra_config().env}"]

    def emit_attempted_build_metric(self):
        statsd.increment("scale_launch.service_builder.attempt", tags=self.tags)

    def emit_successful_build_metric(self):
        statsd.increment("scale_launch.service_builder.success", tags=self.tags)

    def emit_build_time_metric(self, duration_seconds: float):
        statsd.distribution(
            "scale_launch.service_builder.endpoint_build_time", duration_seconds, tags=self.tags
        )

    def emit_image_build_cache_hit_metric(self, image_type: str):
        statsd.increment(
            f"scale_launch.service_builder.{image_type}_image_cache_hit", tags=self.tags
        )

    def emit_image_build_cache_miss_metric(self, image_type: str):
        statsd.increment(
            f"scale_launch.service_builder.{image_type}_image_cache_miss", tags=self.tags
        )

    def emit_docker_failed_build_metric(self):
        statsd.increment("scale_launch.service_builder.docker_failed", tags=self.tags)

    def emit_database_cache_hit_metric(self):
        statsd.increment("scale_launch.database_cache.hit", tags=self.tags)

    def emit_database_cache_miss_metric(self):
        statsd.increment("scale_launch.database_cache.miss", tags=self.tags)

    def _format_call_tags(self, metadata: MetricMetadata) -> List[str]:
        # Copy the base tags so per-request model tags don't accumulate on self.tags
        tags = self.tags.copy()
        tags.extend(get_model_tags(metadata.model_name))
        return tags

    def emit_route_call_metric(self, route: str, metadata: MetricMetadata):
        statsd.increment(f"{self.prefix}.{route}.call", tags=self._format_call_tags(metadata))

    def emit_token_count_metrics(self, token_usage: TokenUsage, metadata: MetricMetadata):
        tags = self._format_call_tags(metadata)

        token_count_metric = f"{self.prefix}.token_count"
        statsd.increment(
            f"{token_count_metric}.prompt", (token_usage.num_prompt_tokens or 0), tags=tags
        )
        statsd.increment(
            f"{token_count_metric}.completion", (token_usage.num_completion_tokens or 0), tags=tags
        )
        statsd.increment(f"{token_count_metric}.total", token_usage.num_total_tokens, tags=tags)

        total_tokens_per_second = f"{self.prefix}.total_tokens_per_second"
        statsd.histogram(total_tokens_per_second, token_usage.total_tokens_per_second, tags=tags)

        time_to_first_token = f"{self.prefix}.time_to_first_token"
        if token_usage.time_to_first_token is not None:
            statsd.distribution(time_to_first_token, token_usage.time_to_first_token, tags=tags)

        inter_token_latency = f"{self.prefix}.inter_token_latency"
        if token_usage.inter_token_latency is not None:
            statsd.distribution(inter_token_latency, token_usage.inter_token_latency, tags=tags)
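For example, the tag helper keeps only the part before the first ".", so fine-tuned variants report under their base model name (the model ids below are made up for illustration):

from model_engine_server.infra.gateways.datadog_monitoring_metrics_gateway import get_model_tags

print(get_model_tags("llama-2-7b"))              # ['model_name:llama-2-7b']
print(get_model_tags("llama-2-7b.my-finetune"))  # ['model_name:llama-2-7b'] -- suffix dropped
print(get_model_tags(None))                      # []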
15 changes: 15 additions & 0 deletions model-engine/tests/unit/core/utils/test_timer.py
@@ -0,0 +1,15 @@
import time

from model_engine_server.core.utils.timer import timer


def test_timer():
    with timer() as t:
        time.sleep(0.1)
        lap_time = t.lap()
        time.sleep(0.01)
        new_lap_time = t.lap()

    # Thresholds sit slightly below the nominal sleep times to allow for clock granularity
    assert new_lap_time >= 0.009
    assert lap_time >= 0.09
    assert t.duration >= 0.1
106 changes: 106 additions & 0 deletions model-engine/tests/unit/infra/gateways/test_datadog_monitoring_metrics_gateway.py
@@ -0,0 +1,106 @@
from unittest.mock import Mock

import pytest
from datadog import statsd
from model_engine_server.common.dtos.llms import TokenUsage
from model_engine_server.core.auth.authentication_repository import User
from model_engine_server.domain.gateways.monitoring_metrics_gateway import MetricMetadata
from model_engine_server.infra.gateways import DatadogMonitoringMetricsGateway


@pytest.fixture(autouse=True)
def mock_statsd():
    # See https://github.com/DataDog/datadogpy/issues/183 for how datadogpy mocks statsd
    statsd.socket = Mock()
    # Also mock the statsd methods the gateway uses (or is likely to use)
    statsd.gauge = Mock()
    statsd.increment = Mock()
    statsd.decrement = Mock()
    statsd.histogram = Mock()
    statsd.distribution = Mock()


@pytest.fixture
def sync_token_count():
    return TokenUsage(
        num_prompt_tokens=100,
        num_completion_tokens=200,
        total_duration=30,
    )


@pytest.fixture
def streaming_token_count():
    return TokenUsage(
        num_prompt_tokens=100,
        num_completion_tokens=200,
        total_duration=30,
        time_to_first_token=5,
    )


@pytest.fixture
def datadog_monitoring_metrics_gateway():
    gateway = DatadogMonitoringMetricsGateway(prefix="model_engine_unit_test")
    return gateway


def test_datadog_monitoring_metrics_gateway_build_metrics(datadog_monitoring_metrics_gateway):
    datadog_monitoring_metrics_gateway.emit_attempted_build_metric()
    statsd.increment.assert_called_once()
    statsd.increment.reset_mock()
    datadog_monitoring_metrics_gateway.emit_successful_build_metric()
    statsd.increment.assert_called_once()
    statsd.increment.reset_mock()
    datadog_monitoring_metrics_gateway.emit_build_time_metric(300)
    statsd.distribution.assert_called_once()
    statsd.distribution.reset_mock()
    datadog_monitoring_metrics_gateway.emit_image_build_cache_hit_metric("test_image")
    statsd.increment.assert_called_once()
    statsd.increment.reset_mock()
    datadog_monitoring_metrics_gateway.emit_image_build_cache_miss_metric("test_image_2")
    statsd.increment.assert_called_once()
    statsd.increment.reset_mock()
    datadog_monitoring_metrics_gateway.emit_docker_failed_build_metric()
    statsd.increment.assert_called_once()
    statsd.increment.reset_mock()


def test_datadog_monitoring_metrics_gateway_db_metrics(datadog_monitoring_metrics_gateway):
    datadog_monitoring_metrics_gateway.emit_database_cache_hit_metric()
    statsd.increment.assert_called_once()
    statsd.increment.reset_mock()
    datadog_monitoring_metrics_gateway.emit_database_cache_miss_metric()
    statsd.increment.assert_called_once()
    statsd.increment.reset_mock()


def test_datadog_monitoring_metrics_gateway_route_call_metrics(datadog_monitoring_metrics_gateway):
    metadata = MetricMetadata(
        user=User(user_id="test_user", team_id="test_team", email="test_email"),
        model_name="test_model",
    )
    datadog_monitoring_metrics_gateway.emit_route_call_metric("test_route", metadata)
    statsd.increment.assert_called_once()
    statsd.increment.reset_mock()


def test_datadog_monitoring_metrics_gateway_token_count_metrics(
    datadog_monitoring_metrics_gateway, sync_token_count, streaming_token_count
):
    metadata = MetricMetadata(
        user=User(user_id="test_user", team_id="test_team", email="test_email"),
        model_name="test_model",
    )
    datadog_monitoring_metrics_gateway.emit_token_count_metrics(sync_token_count, metadata)
    statsd.increment.assert_called()
    statsd.increment.reset_mock()
    statsd.histogram.assert_called()
    statsd.histogram.reset_mock()
    datadog_monitoring_metrics_gateway.emit_token_count_metrics(streaming_token_count, metadata)
    statsd.increment.assert_called()
    statsd.increment.reset_mock()
    statsd.histogram.assert_called()
    statsd.histogram.reset_mock()
    statsd.distribution.assert_called()
    statsd.distribution.reset_mock()
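If tighter assertions were ever wanted, the mocks also record their call arguments, so a test could pin down the exact tags. A sketch of a hypothetical extra test (not part of this PR), which relies on _format_call_tags copying the base tags rather than mutating them, per the fix above:

def test_route_call_metric_tags(datadog_monitoring_metrics_gateway):
    metadata = MetricMetadata(
        user=User(user_id="test_user", team_id="test_team", email="test_email"),
        model_name="test_model",
    )
    datadog_monitoring_metrics_gateway.emit_route_call_metric("test_route", metadata)
    _, kwargs = statsd.increment.call_args
    assert kwargs["tags"] == datadog_monitoring_metrics_gateway.tags + ["model_name:test_model"]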