diff --git a/src/configuration.py b/src/configuration.py
index b8c1a02c3..9a253ac7a 100644
--- a/src/configuration.py
+++ b/src/configuration.py
@@ -23,6 +23,7 @@
     DatabaseConfiguration,
     ConversationHistoryConfiguration,
     QuotaHandlersConfiguration,
+    SplunkConfiguration,
 )
 
 from cache.cache import Cache
@@ -39,7 +40,7 @@ class LogicError(Exception):
     """Error in application logic."""
 
 
-class AppConfig:
+class AppConfig:  # pylint: disable=too-many-public-methods
     """Singleton class to load and store the configuration."""
 
     _instance = None
@@ -348,5 +349,19 @@ def azure_entra_id(self) -> Optional[AzureEntraIdConfiguration]:
             raise LogicError("logic error: configuration is not loaded")
         return self._configuration.azure_entra_id
 
+    @property
+    def splunk(self) -> Optional[SplunkConfiguration]:
+        """Return Splunk configuration, or None if not provided."""
+        if self._configuration is None:
+            raise LogicError("logic error: configuration is not loaded")
+        return self._configuration.splunk
+
+    @property
+    def deployment_environment(self) -> str:
+        """Return deployment environment name."""
+        if self._configuration is None:
+            raise LogicError("logic error: configuration is not loaded")
+        return self._configuration.deployment_environment
+
 
 configuration: AppConfig = AppConfig()
diff --git a/src/models/config.py b/src/models/config.py
index 9356464cb..0e7158148 100644
--- a/src/models/config.py
+++ b/src/models/config.py
@@ -612,6 +612,87 @@ def check_llama_stack_model(self) -> Self:
         return self
 
 
+class SplunkConfiguration(ConfigurationBase):
+    """Splunk HEC (HTTP Event Collector) configuration.
+
+    Splunk HEC allows sending events directly to Splunk over HTTP/HTTPS.
+    This configuration is used to send telemetry events for inference
+    requests to the corporate Splunk deployment.
+
+    Useful resources:
+
+    - [Splunk HEC Docs](https://docs.splunk.com/Documentation/SplunkCloud)
+    - [About HEC](https://docs.splunk.com/Documentation/Splunk/latest/Data)
+    """
+
+    enabled: bool = Field(
+        False,
+        title="Enabled",
+        description="Enable or disable Splunk HEC integration.",
+    )
+
+    url: Optional[str] = Field(
+        None,
+        title="HEC URL",
+        description="Splunk HEC endpoint URL.",
+    )
+
+    token_path: Optional[FilePath] = Field(
+        None,
+        title="Token path",
+        description="Path to file containing the Splunk HEC authentication token.",
+    )
+
+    index: Optional[str] = Field(
+        None,
+        title="Index",
+        description="Target Splunk index for events.",
+    )
+
+    source: str = Field(
+        "lightspeed-stack",
+        title="Source",
+        description="Event source identifier.",
+    )
+
+    timeout: PositiveInt = Field(
+        5,
+        title="Timeout",
+        description="HTTP timeout in seconds for HEC requests.",
+    )
+
+    verify_ssl: bool = Field(
+        True,
+        title="Verify SSL",
+        description="Whether to verify SSL certificates for HEC endpoint.",
+    )
+
+    @model_validator(mode="after")
+    def check_splunk_configuration(self) -> Self:
+        """Validate that required fields are set when Splunk is enabled.
+
+        Returns:
+            Self: The validated configuration instance.
+
+        Raises:
+            ValueError: If enabled is True but required fields are missing.
+        """
+        if self.enabled:
+            missing_fields = []
+            if not self.url:
+                missing_fields.append("url")
+            if not self.token_path:
+                missing_fields.append("token_path")
+            if not self.index:
+                missing_fields.append("index")
+            if missing_fields:
+                raise ValueError(
+                    f"Splunk is enabled but required fields are missing: "
+                    f"{', '.join(missing_fields)}"
+                )
+        return self
+
+
 class UserDataCollection(ConfigurationBase):
     """User data collection configuration."""
 
@@ -1659,6 +1740,19 @@ class Configuration(ConfigurationBase):
     )
     azure_entra_id: Optional[AzureEntraIdConfiguration] = None
 
+    splunk: Optional[SplunkConfiguration] = Field(
+        default=None,
+        title="Splunk configuration",
+        description="Splunk HEC configuration for sending telemetry events.",
+    )
+
+    deployment_environment: str = Field(
+        "development",
+        title="Deployment environment",
+        description="Deployment environment name (e.g., 'development', 'staging', 'production'). "
+        "Used in telemetry events.",
+    )
+
     @model_validator(mode="after")
     def validate_mcp_auth_headers(self) -> Self:
         """
diff --git a/src/observability/__init__.py b/src/observability/__init__.py
new file mode 100644
index 000000000..df8601281
--- /dev/null
+++ b/src/observability/__init__.py
@@ -0,0 +1,14 @@
+"""Observability module for telemetry and event collection.
+
+This module provides functionality for sending telemetry events to external
+systems like Splunk HEC for monitoring and analytics.
+
+The splunk module provides a format-agnostic send_splunk_event() function.
+Event formats are in the formats subpackage - see formats.rlsapi for the
+default implementation, or create your own format module.
+"""
+
+from observability.formats import InferenceEventData, build_inference_event
+from observability.splunk import send_splunk_event
+
+__all__ = ["send_splunk_event", "InferenceEventData", "build_inference_event"]
diff --git a/src/observability/formats/__init__.py b/src/observability/formats/__init__.py
new file mode 100644
index 000000000..d406d40fb
--- /dev/null
+++ b/src/observability/formats/__init__.py
@@ -0,0 +1,9 @@
+"""Event format builders for Splunk telemetry.
+
+Each submodule provides format-specific event builders. The rlsapi module
+provides the default format matching Red Hat's rlsapi v1 specification.
+"""
+
+from observability.formats.rlsapi import InferenceEventData, build_inference_event
+
+__all__ = ["InferenceEventData", "build_inference_event"]
diff --git a/src/observability/formats/rlsapi.py b/src/observability/formats/rlsapi.py
new file mode 100644
index 000000000..1bc110805
--- /dev/null
+++ b/src/observability/formats/rlsapi.py
@@ -0,0 +1,57 @@
+"""Event builders for rlsapi v1 Splunk format.
+
+This module provides event builders specific to the rlsapi v1 telemetry format.
+To implement a custom format, create a new module in this package with your own
+event builder function that returns a dict, then pass the result to send_splunk_event().
+"""
+
+from dataclasses import dataclass
+from typing import Any
+
+from configuration import configuration
+
+
+@dataclass
+class InferenceEventData:  # pylint: disable=too-many-instance-attributes
+    """Data required to build an inference telemetry event."""
+
+    question: str
+    response: str
+    inference_time: float
+    model: str
+    org_id: str
+    system_id: str
+    request_id: str
+    cla_version: str
+    system_os: str
+    system_version: str
+    system_arch: str
+
+
+def build_inference_event(data: InferenceEventData) -> dict[str, Any]:
+    """Build an inference telemetry event payload matching rlsapi format.
+
+    Args:
+        data: The inference event data.
+
+    Returns:
+        A dictionary matching the rlsapi Splunk event format.
+    """
+    return {
+        "question": data.question,
+        "refined_questions": [],
+        "context": "",
+        "response": data.response,
+        "inference_time": data.inference_time,
+        "model": data.model,
+        "deployment": configuration.deployment_environment,
+        "org_id": data.org_id,
+        "system_id": data.system_id,
+        # Token counting not yet implemented in lightspeed-stack; rlsapi uses 0 as default
+        "total_llm_tokens": 0,
+        "request_id": data.request_id,
+        "cla_version": data.cla_version,
+        "system_os": data.system_os,
+        "system_version": data.system_version,
+        "system_arch": data.system_arch,
+    }
diff --git a/src/observability/splunk.py b/src/observability/splunk.py
new file mode 100644
index 000000000..ca663127f
--- /dev/null
+++ b/src/observability/splunk.py
@@ -0,0 +1,102 @@
+"""Async Splunk HEC client for sending telemetry events."""
+
+import asyncio
+import logging
+import platform
+import time
+from typing import Any
+
+import aiohttp
+
+from configuration import configuration
+from version import __version__
+
+logger = logging.getLogger(__name__)
+
+
+def _get_hostname() -> str:
+    """Get the hostname for Splunk event metadata."""
+    return platform.node() or "unknown"
+
+
+def _read_token_from_file(token_path: str) -> str | None:
+    """Read HEC token from file path.
+
+    Args:
+        token_path: Path to the file containing the HEC token.
+
+    Returns:
+        The stripped file content, or None if the file cannot be read or decoded.
+    """
+    try:
+        with open(token_path, encoding="utf-8") as f:
+            return f.read().strip()
+    # UnicodeDecodeError is a ValueError, not an OSError; catch it as well so a
+    # corrupt token file cannot raise out of the best-effort telemetry path.
+    except (OSError, UnicodeDecodeError) as e:
+        logger.warning("Failed to read Splunk HEC token from %s: %s", token_path, e)
+        return None
+
+
+async def send_splunk_event(event: dict[str, Any], sourcetype: str) -> None:
+    """Send an event to Splunk HEC.
+
+    This function sends events asynchronously and handles failures gracefully
+    by logging warnings instead of raising exceptions. This ensures that
+    Splunk connectivity issues don't affect the main application flow.
+
+    Args:
+        event: The event payload to send.
+        sourcetype: The Splunk sourcetype (e.g., "infer_with_llm", "infer_error").
+    """
+    splunk_config = configuration.splunk
+    if splunk_config is None or not splunk_config.enabled:
+        logger.debug("Splunk integration disabled, skipping event")
+        return
+
+    if not splunk_config.url or not splunk_config.token_path or not splunk_config.index:
+        logger.warning("Splunk configuration incomplete, skipping event")
+        return
+
+    # Read token on each request to support rotation without restart
+    token = _read_token_from_file(str(splunk_config.token_path))
+    if not token:
+        return
+
+    payload = {
+        "time": int(time.time()),
+        "host": _get_hostname(),
+        "source": f"{splunk_config.source} (v{__version__})",
+        "sourcetype": sourcetype,
+        "index": splunk_config.index,
+        "event": event,
+    }
+
+    headers = {
+        "Authorization": f"Splunk {token}",
+        "Content-Type": "application/json",
+    }
+
+    timeout = aiohttp.ClientTimeout(total=splunk_config.timeout)
+    connector = aiohttp.TCPConnector(ssl=splunk_config.verify_ssl)
+
+    try:
+        async with aiohttp.ClientSession(
+            timeout=timeout, connector=connector
+        ) as session:
+            async with session.post(
+                splunk_config.url, json=payload, headers=headers
+            ) as response:
+                if response.status >= 400:
+                    body = await response.text()
+                    logger.warning(
+                        "Splunk HEC request failed with status %d: %s",
+                        response.status,
+                        body[:200],
+                    )
+    except aiohttp.ClientError as e:
+        logger.warning("Splunk HEC request failed: %s", e)
+    # aiohttp signals an expired total timeout with asyncio.TimeoutError, which is
+    # only an alias of the builtin TimeoutError on Python 3.11+.
+    except asyncio.TimeoutError:
+        logger.warning("Splunk HEC request timed out after %ds", splunk_config.timeout)
diff --git a/tests/unit/models/config/test_dump_configuration.py b/tests/unit/models/config/test_dump_configuration.py
index 86edcd488..c7e1e2ec5 100644
--- a/tests/unit/models/config/test_dump_configuration.py
+++ b/tests/unit/models/config/test_dump_configuration.py
@@ -204,6 +204,8 @@ def test_dump_configuration(tmp_path: Path) -> None:
                 "postgres": None,
             },
             "azure_entra_id": None,
+            "splunk": None,
+            "deployment_environment": "development",
         }
 
 
@@ -539,6 +541,8 @@ def test_dump_configuration_with_quota_limiters(tmp_path: Path) -> None:
                 "postgres": None,
             },
             "azure_entra_id": None,
+            "splunk": None,
+            "deployment_environment": "development",
         }
 
 
@@ -756,6 +760,8 @@ def test_dump_configuration_with_quota_limiters_different_values(
                 "postgres": None,
             },
             "azure_entra_id": None,
+            "splunk": None,
+            "deployment_environment": "development",
         }
 
 
@@ -947,6 +953,8 @@ def test_dump_configuration_byok(tmp_path: Path) -> None:
                 "postgres": None,
             },
             "azure_entra_id": None,
+            "splunk": None,
+            "deployment_environment": "development",
         }
 
 
@@ -1124,4 +1132,6 @@ def test_dump_configuration_pg_namespace(tmp_path: Path) -> None:
                 "postgres": None,
             },
             "azure_entra_id": None,
+            "splunk": None,
+            "deployment_environment": "development",
         }
diff --git a/tests/unit/models/config/test_splunk_configuration.py b/tests/unit/models/config/test_splunk_configuration.py
new file mode 100644
index 000000000..636821d27
--- /dev/null
+++ b/tests/unit/models/config/test_splunk_configuration.py
@@ -0,0 +1,107 @@
+"""Unit tests for SplunkConfiguration model."""
+
+from pathlib import Path
+
+import pytest
+
+from models.config import SplunkConfiguration
+
+
+@pytest.fixture(name="token_file")
+def token_file_fixture(tmp_path: Path) -> Path:
+    """Create a temporary token file for testing."""
+    token_file = tmp_path / "token"
+    token_file.write_text("test-token")
+    return token_file
+
+
+def test_default_values() -> None:
+    """Test default SplunkConfiguration has expected values."""
+    cfg = SplunkConfiguration()
+    assert cfg.enabled is False
+    assert cfg.url is None
+    assert cfg.token_path is None
+    assert cfg.index is None
+    assert cfg.source == "lightspeed-stack"
+    assert cfg.timeout == 5
+    assert cfg.verify_ssl is True
+
+
+def test_disabled_skips_validation() -> None:
+    """Test that disabled Splunk config doesn't require other fields."""
+    cfg = SplunkConfiguration(enabled=False)
+    assert cfg.enabled is False
+    assert cfg.url is None
+
+
+@pytest.mark.parametrize(
+    ("url", "has_token", "index", "expected_missing"),
+    [
+        (None, False, None, r"url.*token_path.*index"),
+        ("https://splunk:8088", False, None, r"token_path.*index"),
+        ("https://splunk:8088", False, "idx", r"token_path"),
+        ("https://splunk:8088", True, None, r"index"),
+        (None, True, "idx", r"url"),
+    ],
+    ids=[
+        "all_missing",
+        "url_present_only",
+        "url_and_index_present",
+        "url_and_token_present",
+        "token_and_index_present",
+    ],
+)
+def test_enabled_missing_required_fields(
+    token_file: Path,
+    url: str | None,
+    has_token: bool,
+    index: str | None,
+    expected_missing: str,
+) -> None:
+    """Test that enabled Splunk config validates required fields."""
+    with pytest.raises(ValueError, match=expected_missing):
+        SplunkConfiguration(
+            enabled=True,
+            url=url,
+            token_path=token_file if has_token else None,
+            index=index,
+        )
+
+
+def test_valid_enabled_configuration(token_file: Path) -> None:
+    """Test valid enabled Splunk configuration passes validation."""
+    cfg = SplunkConfiguration(
+        enabled=True,
+        url="https://splunk.example.com:8088",
+        token_path=token_file,
+        index="rhel_lightspeed",
+        source="my-service",
+        timeout=10,
+        verify_ssl=False,
+    )
+
+    assert cfg.enabled is True
+    assert cfg.url == "https://splunk.example.com:8088"
+    assert cfg.token_path == token_file
+    assert cfg.index == "rhel_lightspeed"
+    assert cfg.source == "my-service"
+    assert cfg.timeout == 10
+    assert cfg.verify_ssl is False
+
+
+def test_custom_source() -> None:
+    """Test custom source value is preserved."""
+    cfg = SplunkConfiguration(enabled=False, source="custom-source")
+    assert cfg.source == "custom-source"
+
+
+def test_custom_timeout() -> None:
+    """Test custom timeout value is preserved."""
+    cfg = SplunkConfiguration(enabled=False, timeout=30)
+    assert cfg.timeout == 30
+
+
+def test_verify_ssl_disabled() -> None:
+    """Test verify_ssl can be disabled."""
+    cfg = SplunkConfiguration(enabled=False, verify_ssl=False)
+    assert cfg.verify_ssl is False
diff --git a/tests/unit/observability/__init__.py b/tests/unit/observability/__init__.py
new file mode 100644
index 000000000..84b4bed69
--- /dev/null
+++ b/tests/unit/observability/__init__.py
@@ -0,0 +1 @@
+"""Unit tests for observability module."""
diff --git a/tests/unit/observability/formats/__init__.py b/tests/unit/observability/formats/__init__.py
new file mode 100644
index 000000000..32a27c3ef
--- /dev/null
+++ b/tests/unit/observability/formats/__init__.py
@@ -0,0 +1 @@
+"""Unit tests for observability event format builders."""
diff --git a/tests/unit/observability/formats/test_rlsapi.py b/tests/unit/observability/formats/test_rlsapi.py
new file mode 100644
index 000000000..e8c7fa82f
--- /dev/null
+++ b/tests/unit/observability/formats/test_rlsapi.py
@@ -0,0 +1,74 @@
+"""Unit tests for rlsapi v1 event builders."""
+
+from unittest.mock import patch
+
+import pytest
+
+from observability.formats.rlsapi import InferenceEventData, build_inference_event
+
+
+@pytest.fixture(name="sample_event_data")
+def sample_event_data_fixture() -> InferenceEventData:
+    """Create sample inference event data for testing."""
+    return InferenceEventData(
+        question="How do I configure SSH?",
+        response="To configure SSH, edit /etc/ssh/sshd_config...",
+        inference_time=2.34,
+        model="granite-3-8b-instruct",
+        org_id="12345678",
+        system_id="abc-def-123",
+        request_id="req_xyz789",
+        cla_version="CLA/0.4.0",
+        system_os="RHEL",
+        system_version="9.3",
+        system_arch="x86_64",
+    )
+
+
+def test_builds_event_with_all_fields(sample_event_data: InferenceEventData) -> None:
+    """Test event contains all required fields and placeholders."""
+    with patch("observability.formats.rlsapi.configuration") as mock_config:
+        mock_config.deployment_environment = "production"
+
+        event = build_inference_event(sample_event_data)
+
+    assert event["question"] == "How do I configure SSH?"
+    assert event["response"] == "To configure SSH, edit /etc/ssh/sshd_config..."
+    assert event["inference_time"] == 2.34
+    assert event["model"] == "granite-3-8b-instruct"
+    assert event["org_id"] == "12345678"
+    assert event["system_id"] == "abc-def-123"
+    assert event["request_id"] == "req_xyz789"
+    assert event["cla_version"] == "CLA/0.4.0"
+    assert event["system_os"] == "RHEL"
+    assert event["system_version"] == "9.3"
+    assert event["system_arch"] == "x86_64"
+    assert event["deployment"] == "production"
+    assert not event["refined_questions"]
+    assert event["context"] == ""
+    assert event["total_llm_tokens"] == 0
+
+
+def test_handles_auth_disabled_values() -> None:
+    """Test event handles auth_disabled placeholder values."""
+    data = InferenceEventData(
+        question="test",
+        response="test",
+        inference_time=1.0,
+        model="test-model",
+        org_id="auth_disabled",
+        system_id="auth_disabled",
+        request_id="req_123",
+        cla_version="test/1.0",
+        system_os="",
+        system_version="",
+        system_arch="",
+    )
+
+    with patch("observability.formats.rlsapi.configuration") as mock_config:
+        mock_config.deployment_environment = "test"
+
+        event = build_inference_event(data)
+
+    assert event["org_id"] == "auth_disabled"
+    assert event["system_id"] == "auth_disabled"
diff --git a/tests/unit/observability/test_splunk.py b/tests/unit/observability/test_splunk.py
new file mode 100644
index 000000000..f290f80e9
--- /dev/null
+++ b/tests/unit/observability/test_splunk.py
@@ -0,0 +1,162 @@
+"""Unit tests for Splunk HEC client."""
+
+from pathlib import Path
+from typing import Any
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import aiohttp
+import pytest
+
+from observability.splunk import send_splunk_event, _read_token_from_file
+
+
+@pytest.fixture(name="mock_splunk_config")
+def mock_splunk_config_fixture(tmp_path: Path) -> MagicMock:
+    """Create a mock SplunkConfiguration."""
+    token_file = tmp_path / "token"
+    token_file.write_text("test-hec-token")
+
+    config = MagicMock()
+    config.enabled = True
+    config.url = "https://splunk.example.com:8088/services/collector"
+    config.token_path = token_file
+    config.index = "test_index"
+    config.source = "test-source"
+    config.timeout = 5
+    config.verify_ssl = True
+    return config
+
+
+@pytest.fixture(name="mock_session")
+def mock_session_fixture() -> AsyncMock:
+    """Create a mock aiohttp session with successful response."""
+    mock_response = AsyncMock()
+    mock_response.status = 200
+    session = AsyncMock(spec=aiohttp.ClientSession)
+    session.post.return_value.__aenter__.return_value = mock_response
+    return session
+
+
+@pytest.mark.parametrize(
+    ("token_content", "expected"),
+    [
+        (" my-secret-token \n", "my-secret-token"),
+        ("token-no-whitespace", "token-no-whitespace"),
+    ],
+    ids=["strips_whitespace", "no_whitespace"],
+)
+def test_read_token_from_file(
+    tmp_path: Path, token_content: str, expected: str
+) -> None:
+    """Test reading and stripping token from file."""
+    token_file = tmp_path / "token"
+    token_file.write_text(token_content)
+    assert _read_token_from_file(str(token_file)) == expected
+
+
+def test_read_token_returns_none_for_missing_file(tmp_path: Path) -> None:
+    """Test returns None when file doesn't exist."""
+    assert _read_token_from_file(str(tmp_path / "nonexistent")) is None
+
+
+def test_read_token_returns_none_for_undecodable_file(tmp_path: Path) -> None:
+    """Test returns None when file content is not valid UTF-8."""
+    token_file = tmp_path / "token"
+    token_file.write_bytes(b"\xff\xfe")
+    assert _read_token_from_file(str(token_file)) is None
+
+
+def _make_config(
+    enabled: bool = True,
+    url: str | None = "https://splunk:8088",
+    token_path: Path | None = None,
+    index: str | None = "idx",
+) -> MagicMock:
+    """Helper to create mock config with specific fields."""
+    config = MagicMock()
+    config.enabled = enabled
+    config.url = url
+    config.token_path = token_path
+    config.index = index
+    return config
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("splunk_config",),
+    [
+        (None,),
+        (_make_config(enabled=False),),
+        (_make_config(url=None, index=None),),
+    ],
+    ids=["config_none", "disabled", "incomplete"],
+)
+async def test_skips_event_when_not_configured(splunk_config: Any) -> None:
+    """Test event is skipped when Splunk is not properly configured."""
+    with patch("observability.splunk.configuration") as mock_config:
+        mock_config.splunk = splunk_config
+        # Should not raise, just skip silently
+        await send_splunk_event({"test": "event"}, "test_sourcetype")
+
+
+@pytest.mark.asyncio
+async def test_sends_event_successfully(
+    mock_splunk_config: MagicMock, mock_session: AsyncMock
+) -> None:
+    """Test event is sent successfully to Splunk HEC."""
+    with (
+        patch("observability.splunk.configuration") as mock_config,
+        patch("observability.splunk.aiohttp.ClientSession") as mock_client,
+    ):
+        mock_config.splunk = mock_splunk_config
+        mock_client.return_value.__aenter__.return_value = mock_session
+
+        await send_splunk_event({"question": "test"}, "infer_with_llm")
+
+    mock_session.post.assert_called_once()
+    call_args = mock_session.post.call_args
+    assert call_args[0][0] == mock_splunk_config.url
+    assert "Authorization" in call_args[1]["headers"]
+    assert call_args[1]["json"]["sourcetype"] == "infer_with_llm"
+    assert call_args[1]["json"]["event"] == {"question": "test"}
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    ("error_setup",),
+    [
+        (
+            lambda s: setattr(
+                s.post.return_value.__aenter__.return_value, "status", 503
+            ),
+        ),
+        (
+            lambda s: setattr(
+                s.return_value.__aenter__, "side_effect", aiohttp.ClientError()
+            ),
+        ),
+    ],
+    ids=["http_error", "client_error"],
+)
+async def test_logs_warning_on_error(
+    mock_splunk_config: MagicMock, error_setup: Any
+) -> None:
+    """Test warning is logged on HTTP or client errors."""
+    mock_session = AsyncMock(spec=aiohttp.ClientSession)
+    mock_response = AsyncMock()
+    mock_response.status = 503
+    mock_response.text.return_value = "error"
+    mock_session.post.return_value.__aenter__.return_value = mock_response
+
+    with (
+        patch("observability.splunk.configuration") as mock_config,
+        patch("observability.splunk.aiohttp.ClientSession") as mock_client,
+        patch("observability.splunk.logger") as mock_logger,
+    ):
+        mock_config.splunk = mock_splunk_config
+        error_setup(mock_client)
+        mock_client.return_value.__aenter__.return_value = mock_session
+
+        await send_splunk_event({"test": "event"}, "test_sourcetype")
+
+    mock_logger.warning.assert_called()