22"""
33Events Agent - Fetches local events for a destination.
44Supports fault injection for testing observability.
5+
6+ Instrumented with opensearch-genai-observability-sdk-py:
7+ - register() replaces ~20 lines of manual TracerProvider/exporter setup
8+ - observe() + enrich() replace manual span creation + set_attribute() calls
59"""
610
711import json
1519import httpx
1620from fastapi import FastAPI
1721from opentelemetry import trace , metrics
18- from opentelemetry .exporter .otlp .proto .grpc .trace_exporter import OTLPSpanExporter
1922from opentelemetry .exporter .otlp .proto .grpc .metric_exporter import OTLPMetricExporter
20- from opentelemetry .sdk .trace import TracerProvider
21- from opentelemetry .sdk .trace .export import BatchSpanProcessor
2223from opentelemetry .sdk .metrics import MeterProvider
2324from opentelemetry .sdk .metrics .export import PeriodicExportingMetricReader
2425from opentelemetry .sdk .resources import Resource
2728from opentelemetry .propagate import inject
2829from pydantic import BaseModel , Field
2930
31+ from opensearch_genai_observability_sdk_py import Op , enrich , observe , register
32+
3033
3134# MCP Server configuration
3235MCP_SERVER_URL = os .getenv ("MCP_SERVER_URL" , "http://mcp-server:8003" )
@@ -141,29 +144,25 @@ class ErrorResponse(BaseModel):
141144 agent_id : str
142145
143146
144- def setup_telemetry (service_name : str , otlp_endpoint : str ):
145- resource = Resource .create ({
146- "service.name" : service_name ,
147- "service.version" : "1.0.0" ,
148- "gen_ai.agent.id" : AGENT_ID ,
149- "gen_ai.agent.name" : AGENT_NAME ,
150- })
151- tracer_provider = TracerProvider (resource = resource )
152- tracer_provider .add_span_processor (
153- BatchSpanProcessor (OTLPSpanExporter (endpoint = otlp_endpoint , insecure = True ))
154- )
155- trace .set_tracer_provider (tracer_provider )
156- metric_reader = PeriodicExportingMetricReader (
157- OTLPMetricExporter (endpoint = otlp_endpoint , insecure = True ),
158- export_interval_millis = 10000 ,
159- )
160- meter_provider = MeterProvider (resource = resource , metric_readers = [metric_reader ])
161- metrics .set_meter_provider (meter_provider )
162- return trace .get_tracer (service_name ), metrics .get_meter (service_name )
163-
164-
147+ # --- Telemetry setup ---
165148otlp_endpoint = os .getenv ("OTEL_EXPORTER_OTLP_ENDPOINT" , "http://localhost:4317" )
166- tracer , meter = setup_telemetry ("events-agent" , otlp_endpoint )
149+
150+ # One line replaces ~20 lines of TracerProvider + exporter setup
151+ register (
152+ endpoint = f"grpc://{ otlp_endpoint .replace ('http://' , '' ).replace ('https://' , '' )} " ,
153+ service_name = "events-agent" ,
154+ service_version = "1.0.0" ,
155+ )
156+
157+ # Metrics (SDK handles tracing only)
158+ # TODO: unify Resource with register() when SDK supports metrics
159+ resource = Resource .create ({"service.name" : "events-agent" })
160+ metric_reader = PeriodicExportingMetricReader (
161+ OTLPMetricExporter (endpoint = otlp_endpoint , insecure = True ),
162+ export_interval_millis = 10000 ,
163+ )
164+ meter_provider = MeterProvider (resource = resource , metric_readers = [metric_reader ])
165+ metrics .set_meter_provider (meter_provider )
167166
168167inner_app = FastAPI (title = "Events Agent" , version = "1.0.0" )
169168
@@ -182,41 +181,40 @@ async def health():
182181@inner_app .post ("/events" )
183182async def get_events (request : EventsRequest ):
184183 model = random .choice (MODELS )
185-
184+ provider = SYSTEMS [model ]
185+
186186 # Promote gen_ai attributes to the root HTTP span so the UI can read them
187+ enrich (
188+ model = model ,
189+ provider = provider ,
190+ agent_id = AGENT_ID ,
191+ input_messages = [{"role" : "user" , "parts" : [{"type" : "text" , "content" : f"Find events in { request .destination } " }]}],
192+ )
187193 root_span = trace .get_current_span ()
188- root_span .set_attribute ("gen_ai.system" , SYSTEMS [model ])
189194 root_span .set_attribute ("gen_ai.agent.name" , AGENT_NAME )
190- root_span .set_attribute ("gen_ai.request.model" , model )
191195 root_span .set_attribute ("gen_ai.operation.name" , "invoke_agent" )
192- root_span .set_attribute ("gen_ai.input.messages" , json .dumps (
193- [{"role" : "user" , "parts" : [{"type" : "text" , "content" : f"Find events in { request .destination } " }]}]
194- ))
195-
196- with tracer .start_as_current_span (
197- "invoke_agent" ,
198- kind = SpanKind .INTERNAL ,
199- attributes = {
200- "gen_ai.operation.name" : "invoke_agent" ,
201- "gen_ai.agent.id" : AGENT_ID ,
202- "gen_ai.agent.name" : AGENT_NAME ,
203- "gen_ai.system" : SYSTEMS [model ],
204- "gen_ai.request.model" : model ,
205- "gen_ai.tool.definitions" : json .dumps (TOOL_DEFINITIONS ),
206- },
207- ) as span :
196+
197+ with observe (AGENT_NAME , op = Op .INVOKE_AGENT ) as span :
198+ enrich (
199+ model = model ,
200+ provider = provider ,
201+ agent_id = AGENT_ID ,
202+ tool_definitions = TOOL_DEFINITIONS ,
203+ )
204+
208205 destination = request .destination .lower ()
209206 date = request .date or datetime .now ().strftime ("%Y-%m-%d" )
210207 fault = request .fault
211208
212209 # Synthetic "thinking" LLM call
213- with tracer .start_as_current_span ("chat" , kind = SpanKind .INTERNAL ) as chat_span :
214- chat_span .set_attribute ("gen_ai.operation.name" , "chat" )
215- chat_span .set_attribute ("gen_ai.system" , SYSTEMS [model ])
216- chat_span .set_attribute ("gen_ai.request.model" , model )
217- chat_span .set_attribute ("gen_ai.usage.input_tokens" , random .randint (100 , 500 ))
218- chat_span .set_attribute ("gen_ai.usage.output_tokens" , random .randint (50 , 200 ))
219- chat_span .set_attribute ("gen_ai.response.finish_reasons" , ["tool_calls" ])
210+ with observe ("events-reasoning" , op = Op .CHAT ):
211+ enrich (
212+ model = model ,
213+ provider = provider ,
214+ input_tokens = random .randint (100 , 500 ),
215+ output_tokens = random .randint (50 , 200 ),
216+ finish_reason = "tool_calls" ,
217+ )
220218 time .sleep (random .uniform (0.05 , 0.15 ))
221219
222220 # Check for fault injection
@@ -267,20 +265,16 @@ async def get_events(request: EventsRequest):
267265 # Tool execution via MCP server
268266 session_id = uuid4 ().hex
269267 request_id = uuid4 ().hex [:8 ]
270- with tracer .start_as_current_span (
271- "tools/call fetch_events_api" ,
272- kind = SpanKind .CLIENT ,
273- attributes = {
274- "mcp.method.name" : "tools/call" ,
275- "mcp.session.id" : session_id ,
276- "mcp.protocol.version" : MCP_PROTOCOL_VERSION ,
277- "jsonrpc.request.id" : request_id ,
278- "gen_ai.operation.name" : "execute_tool" ,
279- "gen_ai.tool.name" : "fetch_events_api" ,
280- "network.transport" : "tcp" ,
281- "network.protocol.name" : "http" ,
282- },
283- ):
268+
269+ # MCP tool call — uses observe() for the span, with MCP-specific attributes set manually
270+ with observe ("fetch_events_api" , op = Op .EXECUTE_TOOL , kind = SpanKind .CLIENT ) as tool_span :
271+ tool_span .set_attribute ("mcp.method.name" , "tools/call" )
272+ tool_span .set_attribute ("mcp.session.id" , session_id )
273+ tool_span .set_attribute ("mcp.protocol.version" , MCP_PROTOCOL_VERSION )
274+ tool_span .set_attribute ("jsonrpc.request.id" , request_id )
275+ tool_span .set_attribute ("network.transport" , "tcp" )
276+ tool_span .set_attribute ("network.protocol.name" , "http" )
277+
284278 headers = {"mcp-session-id" : session_id }
285279 inject (headers )
286280 payload = {
@@ -293,12 +287,15 @@ async def get_events(request: EventsRequest):
293287 events = [Event (name = e ["name" ], type = e ["type" ], venue = e .get ("venue" , "TBD" ), date = e .get ("date" , date )) for e in events_list ]
294288
295289 span .set_attribute ("events.count" , len (events ))
296-
297- root_span .set_attribute ("gen_ai.output.messages" , json .dumps (
298- [{"role" : "assistant" , "parts" : [{"type" : "text" , "content" : json .dumps ([e .model_dump () for e in events ])}]}]
299- ))
300-
301- return EventsResponse (destination = request .destination , events = events , agent_id = AGENT_ID )
290+
291+ # Set output on the parent HTTP request span. This enrich() is intentionally
292+ # outside the observe() block — exiting observe() restores the parent span
293+ # context, so enrich() here targets the HTTP request span, not the agent span.
294+ enrich (
295+ output_messages = [{"role" : "assistant" , "parts" : [{"type" : "text" , "content" : json .dumps ([e .model_dump () for e in events ])}]}],
296+ )
297+
298+ return EventsResponse (destination = request .destination , events = events , agent_id = AGENT_ID )
302299
303300
304301app = OpenTelemetryMiddleware (inner_app )
0 commit comments