Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
DatabaseConfiguration,
ConversationHistoryConfiguration,
QuotaHandlersConfiguration,
SplunkConfiguration,
)

from cache.cache import Cache
Expand All @@ -39,7 +40,7 @@ class LogicError(Exception):
"""Error in application logic."""


class AppConfig:
class AppConfig: # pylint: disable=too-many-public-methods
"""Singleton class to load and store the configuration."""

_instance = None
Expand Down Expand Up @@ -348,5 +349,19 @@ def azure_entra_id(self) -> Optional[AzureEntraIdConfiguration]:
raise LogicError("logic error: configuration is not loaded")
return self._configuration.azure_entra_id

@property
def splunk(self) -> Optional[SplunkConfiguration]:
"""Return Splunk configuration, or None if not provided."""
if self._configuration is None:
raise LogicError("logic error: configuration is not loaded")
return self._configuration.splunk

@property
def deployment_environment(self) -> str:
"""Return deployment environment name."""
if self._configuration is None:
raise LogicError("logic error: configuration is not loaded")
return self._configuration.deployment_environment


configuration: AppConfig = AppConfig()
94 changes: 94 additions & 0 deletions src/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,87 @@ def check_llama_stack_model(self) -> Self:
return self


class SplunkConfiguration(ConfigurationBase):
"""Splunk HEC (HTTP Event Collector) configuration.

Splunk HEC allows sending events directly to Splunk over HTTP/HTTPS.
This configuration is used to send telemetry events for inference
requests to the corporate Splunk deployment.

Useful resources:

- [Splunk HEC Docs](https://docs.splunk.com/Documentation/SplunkCloud)
- [About HEC](https://docs.splunk.com/Documentation/Splunk/latest/Data)
"""

enabled: bool = Field(
False,
title="Enabled",
description="Enable or disable Splunk HEC integration.",
)

url: Optional[str] = Field(
None,
title="HEC URL",
description="Splunk HEC endpoint URL.",
)

token_path: Optional[FilePath] = Field(
None,
title="Token path",
description="Path to file containing the Splunk HEC authentication token.",
)

index: Optional[str] = Field(
None,
title="Index",
description="Target Splunk index for events.",
)

source: str = Field(
"lightspeed-stack",
title="Source",
description="Event source identifier.",
)

timeout: PositiveInt = Field(
5,
title="Timeout",
description="HTTP timeout in seconds for HEC requests.",
)

verify_ssl: bool = Field(
True,
title="Verify SSL",
description="Whether to verify SSL certificates for HEC endpoint.",
)

@model_validator(mode="after")
def check_splunk_configuration(self) -> Self:
"""Validate that required fields are set when Splunk is enabled.

Returns:
Self: The validated configuration instance.

Raises:
ValueError: If enabled is True but required fields are missing.
"""
if self.enabled:
missing_fields = []
if not self.url:
missing_fields.append("url")
if not self.token_path:
missing_fields.append("token_path")
if not self.index:
missing_fields.append("index")
if missing_fields:
raise ValueError(
f"Splunk is enabled but required fields are missing: "
f"{', '.join(missing_fields)}"
)
return self


class UserDataCollection(ConfigurationBase):
"""User data collection configuration."""

Expand Down Expand Up @@ -1659,6 +1740,19 @@ class Configuration(ConfigurationBase):
)
azure_entra_id: Optional[AzureEntraIdConfiguration] = None

splunk: Optional[SplunkConfiguration] = Field(
default=None,
title="Splunk configuration",
description="Splunk HEC configuration for sending telemetry events.",
)

deployment_environment: str = Field(
"development",
title="Deployment environment",
description="Deployment environment name (e.g., 'development', 'staging', 'production'). "
"Used in telemetry events.",
)

@model_validator(mode="after")
def validate_mcp_auth_headers(self) -> Self:
"""
Expand Down
14 changes: 14 additions & 0 deletions src/observability/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Observability module for telemetry and event collection.

This module provides functionality for sending telemetry events to external
systems like Splunk HEC for monitoring and analytics.

The splunk module provides a format-agnostic send_splunk_event() function.
Event formats are in the formats subpackage - see formats.rlsapi for the
default implementation, or create your own format module.
"""

from observability.formats import InferenceEventData, build_inference_event
from observability.splunk import send_splunk_event

__all__ = ["send_splunk_event", "InferenceEventData", "build_inference_event"]
9 changes: 9 additions & 0 deletions src/observability/formats/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Event format builders for Splunk telemetry.

Each submodule provides format-specific event builders. The rlsapi module
provides the default format matching Red Hat's rlsapi v1 specification.
"""

from observability.formats.rlsapi import InferenceEventData, build_inference_event

__all__ = ["InferenceEventData", "build_inference_event"]
57 changes: 57 additions & 0 deletions src/observability/formats/rlsapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Event builders for rlsapi v1 Splunk format.

This module provides event builders specific to the rlsapi v1 telemetry format.
To implement a custom format, create a new module in this package with your own
event builder function that returns a dict, then pass the result to send_splunk_event().
"""

from dataclasses import dataclass
from typing import Any

from configuration import configuration


@dataclass
class InferenceEventData: # pylint: disable=too-many-instance-attributes
"""Data required to build an inference telemetry event."""

question: str
response: str
inference_time: float
model: str
org_id: str
system_id: str
request_id: str
cla_version: str
system_os: str
system_version: str
system_arch: str


def build_inference_event(data: InferenceEventData) -> dict[str, Any]:
"""Build an inference telemetry event payload matching rlsapi format.

Args:
data: The inference event data.

Returns:
A dictionary matching the rlsapi Splunk event format.
"""
return {
"question": data.question,
"refined_questions": [],
"context": "",
"response": data.response,
"inference_time": data.inference_time,
"model": data.model,
"deployment": configuration.deployment_environment,
"org_id": data.org_id,
"system_id": data.system_id,
# Token counting not yet implemented in lightspeed-stack; rlsapi uses 0 as default
"total_llm_tokens": 0,
"request_id": data.request_id,
"cla_version": data.cla_version,
"system_os": data.system_os,
"system_version": data.system_version,
"system_arch": data.system_arch,
}
90 changes: 90 additions & 0 deletions src/observability/splunk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Async Splunk HEC client for sending telemetry events."""

import logging
import platform
import time
from typing import Any

import aiohttp

from configuration import configuration
from version import __version__

logger = logging.getLogger(__name__)


def _get_hostname() -> str:
"""Get the hostname for Splunk event metadata."""
return platform.node() or "unknown"


def _read_token_from_file(token_path: str) -> str | None:
"""Read HEC token from file path."""
try:
with open(token_path, encoding="utf-8") as f:
return f.read().strip()
except OSError as e:
logger.warning("Failed to read Splunk HEC token from %s: %s", token_path, e)
return None


async def send_splunk_event(event: dict[str, Any], sourcetype: str) -> None:
"""Send an event to Splunk HEC.

This function sends events asynchronously and handles failures gracefully
by logging warnings instead of raising exceptions. This ensures that
Splunk connectivity issues don't affect the main application flow.

Args:
event: The event payload to send.
sourcetype: The Splunk sourcetype (e.g., "infer_with_llm", "infer_error").
"""
splunk_config = configuration.splunk
if splunk_config is None or not splunk_config.enabled:
logger.debug("Splunk integration disabled, skipping event")
return

if not splunk_config.url or not splunk_config.token_path or not splunk_config.index:
logger.warning("Splunk configuration incomplete, skipping event")
return

# Read token on each request to support rotation without restart
token = _read_token_from_file(str(splunk_config.token_path))
if not token:
return

payload = {
"time": int(time.time()),
"host": _get_hostname(),
"source": f"{splunk_config.source} (v{__version__})",
"sourcetype": sourcetype,
"index": splunk_config.index,
"event": event,
}

headers = {
"Authorization": f"Splunk {token}",
"Content-Type": "application/json",
}

timeout = aiohttp.ClientTimeout(total=splunk_config.timeout)
connector = aiohttp.TCPConnector(ssl=splunk_config.verify_ssl)

try:
async with aiohttp.ClientSession(
timeout=timeout, connector=connector
) as session:
async with session.post(
splunk_config.url, json=payload, headers=headers
) as response:
if response.status >= 400:
body = await response.text()
logger.warning(
"Splunk HEC request failed with status %d: %s",
response.status,
body[:200],
)
except aiohttp.ClientError as e:
logger.warning("Splunk HEC request failed: %s", e)
except TimeoutError:
logger.warning("Splunk HEC request timed out after %ds", splunk_config.timeout)
10 changes: 10 additions & 0 deletions tests/unit/models/config/test_dump_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ def test_dump_configuration(tmp_path: Path) -> None:
"postgres": None,
},
"azure_entra_id": None,
"splunk": None,
"deployment_environment": "development",
}


Expand Down Expand Up @@ -539,6 +541,8 @@ def test_dump_configuration_with_quota_limiters(tmp_path: Path) -> None:
"postgres": None,
},
"azure_entra_id": None,
"splunk": None,
"deployment_environment": "development",
}


Expand Down Expand Up @@ -756,6 +760,8 @@ def test_dump_configuration_with_quota_limiters_different_values(
"postgres": None,
},
"azure_entra_id": None,
"splunk": None,
"deployment_environment": "development",
}


Expand Down Expand Up @@ -947,6 +953,8 @@ def test_dump_configuration_byok(tmp_path: Path) -> None:
"postgres": None,
},
"azure_entra_id": None,
"splunk": None,
"deployment_environment": "development",
}


Expand Down Expand Up @@ -1124,4 +1132,6 @@ def test_dump_configuration_pg_namespace(tmp_path: Path) -> None:
"postgres": None,
},
"azure_entra_id": None,
"splunk": None,
"deployment_environment": "development",
}
Loading
Loading