Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
"Programming Language :: Python :: 3.13",
]
dependencies = [
"opentelemetry-api ~= 1.38.0.dev0",
"opentelemetry-api ~= 1.41.0.dev0",
"opentelemetry-instrumentation ~= 0.59b0.dev0",
"opentelemetry-semantic-conventions ~= 0.59b0.dev0",
"splunk-otel-util-genai>=0.1.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ def _instrument(self, **kwargs):
meter_provider = kwargs.get("meter_provider")
logger_provider = kwargs.get("logger_provider")

if tracer_provider is None:
from opentelemetry import trace
tracer_provider = trace.get_tracer_provider()

if meter_provider is None:
from opentelemetry import metrics
meter_provider = metrics.get_meter_provider()

if logger_provider is None:
from opentelemetry import _logs
logger_provider = _logs.get_logger_provider()

# Get the telemetry handler from util-genai
self._telemetry_handler = get_telemetry_handler(
tracer_provider=tracer_provider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,23 @@ async def traced_handle_request(wrapped, instance, args, kwargs) -> Any:
if tracestate:
carrier["tracestate"] = tracestate

baggage = getattr(request_meta, "baggage", None)
if baggage:
carrier["baggage"] = baggage

# Also try model_extra for pydantic v2 extra fields
if not carrier and hasattr(request_meta, "model_extra"):
extra = request_meta.model_extra
if extra:
for key in ["traceparent", "tracestate"]:
for key in ["traceparent", "tracestate", "baggage"]:
if key in extra:
carrier[key] = extra[key]

if carrier:
ctx = propagate.extract(carrier)
token = context.attach(ctx)
_LOGGER.debug(
f"Attached trace context in _handle_request: "
f"Attached trace context and baggage in _handle_request: "
f"carrier={carrier}"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,8 @@ def main(manual_instrumentation: bool = False) -> None:
else:
print("🔑 Using standard OPENAI_API_KEY authentication")

if manual_instrumentation:
_configure_manual_instrumentation()
#if manual_instrumentation:
_configure_manual_instrumentation()

session_id = str(uuid4())
user_request = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ python-dotenv>=1.0.0
deepeval
litellm

opentelemetry-exporter-otlp-proto-grpc>=1.38.0

# Splunk OpenTelemetry GenAI packages
splunk-otel-util-genai==0.1.9
splunk-otel-util-genai-evals==0.1.7
splunk-otel-genai-evals-deepeval==0.1.12
splunk-otel-genai-emitters-splunk==0.1.6
splunk-otel-instrumentation-langchain==0.1.7
#splunk-otel-util-genai==0.1.9
#splunk-otel-util-genai-evals==0.1.7
#splunk-otel-genai-evals-deepeval==0.1.12
#splunk-otel-genai-emitters-splunk==0.1.6
#splunk-otel-instrumentation-langchain==0.1.7
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,8 @@ def main():
sys.exit(1)

# Configure manual instrumentation if requested
if args.manual_instrumentation:
_configure_manual_instrumentation(config)
#if args.manual_instrumentation:
_configure_manual_instrumentation(config)

# Set up OpenTelemetry environment
os.environ.setdefault("OTEL_SERVICE_NAME", config.otel_service_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,72 +40,114 @@
from agents import investigation_agent # noqa: E402
from config import Config # noqa: E402

mcp = FastMCP("investigation-agent")


@mcp.tool()
async def investigate_incident(
service_id: str,
investigation_checklist: str,
scenario_id: str = None,
) -> dict:
"""Investigate an incident by querying metrics, logs, and traces.

This tool exposes the Investigation Agent as an MCP tool that can be called
by other agents or external systems.

Args:
service_id: The service identifier to investigate
investigation_checklist: JSON string with investigation steps
scenario_id: Optional scenario ID for seeded data

Returns:
Dict containing investigation results with hypotheses and evidence
"""
# Create a minimal state for the agent
config = Config.from_env()
if scenario_id:
config.scenario_id = scenario_id

state = {
"service_id": service_id,
"scenario_id": scenario_id,
"session_id": f"mcp-{asyncio.get_event_loop().time()}",
"triage_result": {
"investigation_checklist": json.loads(investigation_checklist)
if isinstance(investigation_checklist, str)
else investigation_checklist,
},
"current_agent": "investigation",
"hypotheses": [],
"confidence_score": 0.0,
"eval_metrics": {},
}

try:
# Run investigation agent
updated_state = investigation_agent(state, config)

# Extract results
investigation_result = updated_state.get("investigation_result", {})
hypotheses = updated_state.get("hypotheses", [])
confidence_score = updated_state.get("confidence_score", 0.0)

return {
"status": "success",
"service_id": service_id,
"hypotheses": hypotheses,
"investigation_result": investigation_result,
"confidence_score": confidence_score,
"evidence_count": sum(len(h.get("evidence", [])) for h in hypotheses),
}
except Exception as e:
return {
"status": "error",
"error": str(e),

def _configure_manual_instrumentation():
"""Configure manual OpenTelemetry instrumentation."""

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry import _events, _logs, metrics, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(OTLPSpanExporter())
)

metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter())
metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))

_logs.set_logger_provider(LoggerProvider())
_logs.get_logger_provider().add_log_record_processor(
BatchLogRecordProcessor(OTLPLogExporter())
)
_events.set_event_logger_provider(EventLoggerProvider())

from opentelemetry.instrumentation.langchain import LangchainInstrumentor
instrumentor = LangchainInstrumentor()
instrumentor.instrument()

from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor
instrumentor2 = FastMCPInstrumentor()
instrumentor2.instrument()

if __name__ == "__main__":
_configure_manual_instrumentation()
mcp = FastMCP("investigation-agent")


@mcp.tool()
async def investigate_incident(
service_id: str,
investigation_checklist: str,
scenario_id: str = None,
) -> dict:
"""Investigate an incident by querying metrics, logs, and traces.

This tool exposes the Investigation Agent as an MCP tool that can be called
by other agents or external systems.

Args:
service_id: The service identifier to investigate
investigation_checklist: JSON string with investigation steps
scenario_id: Optional scenario ID for seeded data

Returns:
Dict containing investigation results with hypotheses and evidence
"""
# Create a minimal state for the agent
config = Config.from_env()
if scenario_id:
config.scenario_id = scenario_id

state = {
"service_id": service_id,
"scenario_id": scenario_id,
"session_id": f"mcp-{asyncio.get_event_loop().time()}",
"triage_result": {
"investigation_checklist": json.loads(investigation_checklist)
if isinstance(investigation_checklist, str)
else investigation_checklist,
},
"current_agent": "investigation",
"hypotheses": [],
"confidence_score": 0.0,
"eval_metrics": {},
}

try:
# Run investigation agent
updated_state = investigation_agent(state, config)

# Extract results
investigation_result = updated_state.get("investigation_result", {})
hypotheses = updated_state.get("hypotheses", [])
confidence_score = updated_state.get("confidence_score", 0.0)

return {
"status": "success",
"service_id": service_id,
"hypotheses": hypotheses,
"investigation_result": investigation_result,
"confidence_score": confidence_score,
"evidence_count": sum(len(h.get("evidence", [])) for h in hypotheses),
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"service_id": service_id,
}

if __name__ == "__main__":
mcp.run(transport="stdio", show_banner=False)
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ def notifier(message: str, channel: str = "incidents") -> str:
}
return json.dumps(notification, indent=2)

def _configure_manual_instrumentation():
"""Configure manual OpenTelemetry instrumentation."""

from opentelemetry.instrumentation.fastmcp import FastMCPInstrumentor
instrumentor2 = FastMCPInstrumentor()
instrumentor2.instrument()

@tool
def investigation_agent_mcp(
Expand All @@ -383,7 +389,7 @@ def investigation_agent_mcp(
Returns:
JSON string with investigation results
"""

_configure_manual_instrumentation()
mcp_script_path = os.path.join(
os.path.dirname(__file__), "mcp_tools", "investigation_agent_mcp.py"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ classifiers = [
"Programming Language :: Python :: 3.13",
]
dependencies = [
"opentelemetry-api ~= 1.38.0.dev0",
"opentelemetry-api ~= 1.41.0.dev0",
"opentelemetry-instrumentation ~= 0.59b0.dev0",
"opentelemetry-semantic-conventions ~= 0.59b0.dev0",
"splunk-otel-util-genai>=0.1.4",
Expand Down
Loading
Loading