Skip to content

Commit 0e824e3

Browse files
committed
RSPEED-2326: feat(observability): add async Splunk HEC client
Implement send_splunk_event() async function for sending events to Splunk: - Uses aiohttp for async HTTP requests - Reads HEC token from file path - Graceful degradation when config missing/disabled - Logs warnings on send failures (no exceptions raised) Add splunk and deployment_environment properties to AppConfig. Signed-off-by: Major Hayden <major@redhat.com>
1 parent 797620c commit 0e824e3

4 files changed

Lines changed: 262 additions & 1 deletion

File tree

src/configuration.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
DatabaseConfiguration,
2424
ConversationHistoryConfiguration,
2525
QuotaHandlersConfiguration,
26+
SplunkConfiguration,
2627
)
2728

2829
from cache.cache import Cache
@@ -39,7 +40,7 @@ class LogicError(Exception):
3940
"""Error in application logic."""
4041

4142

42-
class AppConfig:
43+
class AppConfig: # pylint: disable=too-many-public-methods
4344
"""Singleton class to load and store the configuration."""
4445

4546
_instance = None
@@ -348,5 +349,19 @@ def azure_entra_id(self) -> Optional[AzureEntraIdConfiguration]:
348349
raise LogicError("logic error: configuration is not loaded")
349350
return self._configuration.azure_entra_id
350351

352+
@property
353+
def splunk(self) -> Optional[SplunkConfiguration]:
354+
"""Return Splunk configuration, or None if not provided."""
355+
if self._configuration is None:
356+
raise LogicError("logic error: configuration is not loaded")
357+
return self._configuration.splunk
358+
359+
@property
360+
def deployment_environment(self) -> str:
361+
"""Return deployment environment name."""
362+
if self._configuration is None:
363+
raise LogicError("logic error: configuration is not loaded")
364+
return self._configuration.deployment_environment
365+
351366

352367
configuration: AppConfig = AppConfig()

src/observability/splunk.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
"""Async Splunk HEC client for sending telemetry events."""
2+
3+
import logging
4+
import platform
5+
import time
6+
from typing import Any
7+
8+
import aiohttp
9+
10+
from configuration import configuration
11+
from version import __version__
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
def _get_hostname() -> str:
17+
"""Get the hostname for Splunk event metadata."""
18+
return platform.node() or "unknown"
19+
20+
21+
def _read_token_from_file(token_path: str) -> str | None:
22+
"""Read HEC token from file path."""
23+
try:
24+
with open(token_path, encoding="utf-8") as f:
25+
return f.read().strip()
26+
except OSError as e:
27+
logger.warning("Failed to read Splunk HEC token from %s: %s", token_path, e)
28+
return None
29+
30+
31+
async def send_splunk_event(event: dict[str, Any], sourcetype: str) -> None:
32+
"""Send an event to Splunk HEC.
33+
34+
This function sends events asynchronously and handles failures gracefully
35+
by logging warnings instead of raising exceptions. This ensures that
36+
Splunk connectivity issues don't affect the main application flow.
37+
38+
Args:
39+
event: The event payload to send.
40+
sourcetype: The Splunk sourcetype (e.g., "infer_with_llm", "infer_error").
41+
"""
42+
splunk_config = configuration.splunk
43+
if splunk_config is None or not splunk_config.enabled:
44+
logger.debug("Splunk integration disabled, skipping event")
45+
return
46+
47+
if not splunk_config.url or not splunk_config.token_path or not splunk_config.index:
48+
logger.warning("Splunk configuration incomplete, skipping event")
49+
return
50+
51+
# Read token on each request to support rotation without restart
52+
token = _read_token_from_file(str(splunk_config.token_path))
53+
if not token:
54+
return
55+
56+
payload = {
57+
"time": int(time.time()),
58+
"host": _get_hostname(),
59+
"source": f"{splunk_config.source} (v{__version__})",
60+
"sourcetype": sourcetype,
61+
"index": splunk_config.index,
62+
"event": event,
63+
}
64+
65+
headers = {
66+
"Authorization": f"Splunk {token}",
67+
"Content-Type": "application/json",
68+
}
69+
70+
timeout = aiohttp.ClientTimeout(total=splunk_config.timeout)
71+
connector = aiohttp.TCPConnector(ssl=splunk_config.verify_ssl)
72+
73+
try:
74+
async with aiohttp.ClientSession(
75+
timeout=timeout, connector=connector
76+
) as session:
77+
async with session.post(
78+
splunk_config.url, json=payload, headers=headers
79+
) as response:
80+
if response.status >= 400:
81+
body = await response.text()
82+
logger.warning(
83+
"Splunk HEC request failed with status %d: %s",
84+
response.status,
85+
body[:200],
86+
)
87+
except aiohttp.ClientError as e:
88+
logger.warning("Splunk HEC request failed: %s", e)
89+
except TimeoutError:
90+
logger.warning("Splunk HEC request timed out after %ds", splunk_config.timeout)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Unit tests for observability module."""
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
"""Unit tests for Splunk HEC client."""
2+
3+
from pathlib import Path
4+
from typing import Any
5+
from unittest.mock import AsyncMock, MagicMock, patch
6+
7+
import aiohttp
8+
import pytest
9+
10+
from observability.splunk import send_splunk_event, _read_token_from_file
11+
12+
13+
@pytest.fixture(name="mock_splunk_config")
14+
def mock_splunk_config_fixture(tmp_path: Path) -> MagicMock:
15+
"""Create a mock SplunkConfiguration."""
16+
token_file = tmp_path / "token"
17+
token_file.write_text("test-hec-token")
18+
19+
config = MagicMock()
20+
config.enabled = True
21+
config.url = "https://splunk.example.com:8088/services/collector"
22+
config.token_path = token_file
23+
config.index = "test_index"
24+
config.source = "test-source"
25+
config.timeout = 5
26+
config.verify_ssl = True
27+
return config
28+
29+
30+
@pytest.fixture(name="mock_session")
31+
def mock_session_fixture() -> AsyncMock:
32+
"""Create a mock aiohttp session with successful response."""
33+
mock_response = AsyncMock()
34+
mock_response.status = 200
35+
session = AsyncMock(spec=aiohttp.ClientSession)
36+
session.post.return_value.__aenter__.return_value = mock_response
37+
return session
38+
39+
40+
@pytest.mark.parametrize(
41+
("token_content", "expected"),
42+
[
43+
(" my-secret-token \n", "my-secret-token"),
44+
("token-no-whitespace", "token-no-whitespace"),
45+
],
46+
ids=["strips_whitespace", "no_whitespace"],
47+
)
48+
def test_read_token_from_file(
49+
tmp_path: Path, token_content: str, expected: str
50+
) -> None:
51+
"""Test reading and stripping token from file."""
52+
token_file = tmp_path / "token"
53+
token_file.write_text(token_content)
54+
assert _read_token_from_file(str(token_file)) == expected
55+
56+
57+
def test_read_token_returns_none_for_missing_file(tmp_path: Path) -> None:
58+
"""Test returns None when file doesn't exist."""
59+
assert _read_token_from_file(str(tmp_path / "nonexistent")) is None
60+
61+
62+
def _make_config(
63+
enabled: bool = True,
64+
url: str | None = "https://splunk:8088",
65+
token_path: Path | None = None,
66+
index: str | None = "idx",
67+
) -> MagicMock:
68+
"""Helper to create mock config with specific fields."""
69+
config = MagicMock()
70+
config.enabled = enabled
71+
config.url = url
72+
config.token_path = token_path
73+
config.index = index
74+
return config
75+
76+
77+
@pytest.mark.asyncio
78+
@pytest.mark.parametrize(
79+
("splunk_config",),
80+
[
81+
(None,),
82+
(_make_config(enabled=False),),
83+
(_make_config(url=None, index=None),),
84+
],
85+
ids=["config_none", "disabled", "incomplete"],
86+
)
87+
async def test_skips_event_when_not_configured(splunk_config: Any) -> None:
88+
"""Test event is skipped when Splunk is not properly configured."""
89+
with patch("observability.splunk.configuration") as mock_config:
90+
mock_config.splunk = splunk_config
91+
# Should not raise, just skip silently
92+
await send_splunk_event({"test": "event"}, "test_sourcetype")
93+
94+
95+
@pytest.mark.asyncio
96+
async def test_sends_event_successfully(
97+
mock_splunk_config: MagicMock, mock_session: AsyncMock
98+
) -> None:
99+
"""Test event is sent successfully to Splunk HEC."""
100+
with (
101+
patch("observability.splunk.configuration") as mock_config,
102+
patch("observability.splunk.aiohttp.ClientSession") as mock_client,
103+
):
104+
mock_config.splunk = mock_splunk_config
105+
mock_client.return_value.__aenter__.return_value = mock_session
106+
107+
await send_splunk_event({"question": "test"}, "infer_with_llm")
108+
109+
mock_session.post.assert_called_once()
110+
call_args = mock_session.post.call_args
111+
assert call_args[0][0] == mock_splunk_config.url
112+
assert "Authorization" in call_args[1]["headers"]
113+
assert call_args[1]["json"]["sourcetype"] == "infer_with_llm"
114+
assert call_args[1]["json"]["event"] == {"question": "test"}
115+
116+
117+
@pytest.mark.asyncio
118+
@pytest.mark.parametrize(
119+
("error_setup",),
120+
[
121+
(
122+
lambda s: setattr(
123+
s.post.return_value.__aenter__.return_value, "status", 503
124+
),
125+
),
126+
(
127+
lambda s: setattr(
128+
s.return_value.__aenter__, "side_effect", aiohttp.ClientError()
129+
),
130+
),
131+
],
132+
ids=["http_error", "client_error"],
133+
)
134+
async def test_logs_warning_on_error(
135+
mock_splunk_config: MagicMock, error_setup: Any
136+
) -> None:
137+
"""Test warning is logged on HTTP or client errors."""
138+
mock_session = AsyncMock(spec=aiohttp.ClientSession)
139+
mock_response = AsyncMock()
140+
mock_response.status = 503
141+
mock_response.text.return_value = "error"
142+
mock_session.post.return_value.__aenter__.return_value = mock_response
143+
144+
with (
145+
patch("observability.splunk.configuration") as mock_config,
146+
patch("observability.splunk.aiohttp.ClientSession") as mock_client,
147+
patch("observability.splunk.logger") as mock_logger,
148+
):
149+
mock_config.splunk = mock_splunk_config
150+
error_setup(mock_client)
151+
mock_client.return_value.__aenter__.return_value = mock_session
152+
153+
await send_splunk_event({"test": "event"}, "test_sourcetype")
154+
155+
mock_logger.warning.assert_called()

0 commit comments

Comments
 (0)