Skip to content

Commit 6cb9728

Browse files
Dwij1704dot-agi
andauthored
feat: add @track_endpoint decorator for HTTP tracing (#1148)
* feat: add @track_endpoint decorator for HTTP tracing * feat: add log_session_replay_url configuration option * docs: add documentation for @track_endpoint decorator with usage examples --------- Co-authored-by: Pratyush Shukla <ps4534@nyu.edu>
1 parent 67406e5 commit 6cb9728

File tree

11 files changed

+349
-5
lines changed

11 files changed

+349
-5
lines changed

agentops/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from typing import List, Optional, Union, Dict, Any
2727
from agentops.client import Client
2828
from agentops.sdk.core import TraceContext, tracer
29-
from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool, guardrail
29+
from agentops.sdk.decorators import trace, session, agent, task, workflow, operation, tool, guardrail, track_endpoint
3030
from agentops.enums import TraceState, SUCCESS, ERROR, UNSET
3131
from opentelemetry.trace.status import StatusCode
3232

@@ -90,6 +90,7 @@ def init(
9090
env_data_opt_out: Optional[bool] = None,
9191
log_level: Optional[Union[str, int]] = None,
9292
fail_safe: Optional[bool] = None,
93+
log_session_replay_url: Optional[bool] = None,
9394
exporter_endpoint: Optional[str] = None,
9495
**kwargs,
9596
):
@@ -117,6 +118,7 @@ def init(
117118
env_data_opt_out (bool): Whether to opt out of collecting environment data.
118119
log_level (str, int): The log level to use for the client. Defaults to 'CRITICAL'.
119120
fail_safe (bool): Whether to suppress errors and continue execution when possible.
121+
log_session_replay_url (bool): Whether to log session replay URLs to the console. Defaults to True.
120122
exporter_endpoint (str, optional): Endpoint for the exporter. If none is provided, key will
121123
be read from the AGENTOPS_EXPORTER_ENDPOINT environment variable.
122124
**kwargs: Additional configuration parameters to be passed to the client.
@@ -159,6 +161,7 @@ def init(
159161
"env_data_opt_out": env_data_opt_out,
160162
"log_level": log_level,
161163
"fail_safe": fail_safe,
164+
"log_session_replay_url": log_session_replay_url,
162165
"exporter_endpoint": exporter_endpoint,
163166
**kwargs,
164167
}
@@ -472,6 +475,7 @@ def extract_key_from_attr(attr_value: str) -> str:
472475
"operation",
473476
"tool",
474477
"guardrail",
478+
"track_endpoint",
475479
# Enums
476480
"TraceState",
477481
"SUCCESS",

agentops/config.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class ConfigDict(TypedDict):
3131
log_level: Optional[Union[str, int]]
3232
fail_safe: Optional[bool]
3333
prefetch_jwt_token: Optional[bool]
34+
log_session_replay_url: Optional[bool]
3435

3536

3637
@dataclass
@@ -115,6 +116,11 @@ class Config:
115116
metadata={"description": "Whether to prefetch JWT token during initialization"},
116117
)
117118

119+
log_session_replay_url: bool = field(
120+
default_factory=lambda: get_env_bool("AGENTOPS_LOG_SESSION_REPLAY_URL", True),
121+
metadata={"description": "Whether to log session replay URLs to the console"},
122+
)
123+
118124
exporter_endpoint: Optional[str] = field(
119125
default_factory=lambda: os.getenv("AGENTOPS_EXPORTER_ENDPOINT", "https://otlp.agentops.ai/v1/traces"),
120126
metadata={
@@ -148,6 +154,7 @@ def configure(
148154
log_level: Optional[Union[str, int]] = None,
149155
fail_safe: Optional[bool] = None,
150156
prefetch_jwt_token: Optional[bool] = None,
157+
log_session_replay_url: Optional[bool] = None,
151158
exporter: Optional[SpanExporter] = None,
152159
processor: Optional[SpanProcessor] = None,
153160
exporter_endpoint: Optional[str] = None,
@@ -213,6 +220,9 @@ def configure(
213220
if prefetch_jwt_token is not None:
214221
self.prefetch_jwt_token = prefetch_jwt_token
215222

223+
if log_session_replay_url is not None:
224+
self.log_session_replay_url = log_session_replay_url
225+
216226
if exporter is not None:
217227
self.exporter = exporter
218228

@@ -243,6 +253,7 @@ def dict(self):
243253
"log_level": self.log_level,
244254
"fail_safe": self.fail_safe,
245255
"prefetch_jwt_token": self.prefetch_jwt_token,
256+
"log_session_replay_url": self.log_session_replay_url,
246257
"exporter": self.exporter,
247258
"processor": self.processor,
248259
"exporter_endpoint": self.exporter_endpoint,

agentops/helpers/dashboard.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ def log_trace_url(span: Union[Span, ReadableSpan], title: Optional[str] = None)
3939
4040
Args:
4141
span: The span to log the URL for.
42+
title: Optional title for the trace.
4243
"""
44+
from agentops import get_client
45+
46+
try:
47+
client = get_client()
48+
if not client.config.log_session_replay_url:
49+
return
50+
except Exception:
51+
return
52+
4353
session_url = get_trace_url(span)
4454
logger.info(colored(f"\x1b[34mSession Replay for {title} trace: {session_url}\x1b[0m", "blue"))

agentops/sdk/decorators/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
tool = create_entity_decorator(SpanKind.TOOL)
1717
operation = task
1818
guardrail = create_entity_decorator(SpanKind.GUARDRAIL)
19+
track_endpoint = create_entity_decorator(SpanKind.HTTP)
1920

2021

2122
# For backward compatibility: @session decorator calls @trace decorator
@@ -46,4 +47,5 @@ def session(*args, **kwargs): # noqa: F811
4647
"operation",
4748
"tool",
4849
"guardrail",
50+
"track_endpoint",
4951
]

agentops/sdk/decorators/factory.py

Lines changed: 185 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
_process_sync_generator,
1818
_record_entity_input,
1919
_record_entity_output,
20+
_extract_request_data,
21+
_extract_response_data,
2022
)
2123

2224

2325
def create_entity_decorator(entity_kind: str) -> Callable[..., Any]:
2426
"""
2527
Factory that creates decorators for instrumenting functions and classes.
26-
Handles different entity kinds (e.g., SESSION, TASK) and function types (sync, async, generator).
28+
Handles different entity kinds (e.g., SESSION, TASK, HTTP) and function types (sync, async, generator).
2729
"""
2830

2931
def decorator(
@@ -34,9 +36,20 @@ def decorator(
3436
tags: Optional[Union[list, dict]] = None,
3537
cost=None,
3638
spec=None,
39+
capture_request: bool = True,
40+
capture_response: bool = True,
3741
) -> Callable[..., Any]:
3842
if wrapped is None:
39-
return functools.partial(decorator, name=name, version=version, tags=tags, cost=cost, spec=spec)
43+
return functools.partial(
44+
decorator,
45+
name=name,
46+
version=version,
47+
tags=tags,
48+
cost=cost,
49+
spec=spec,
50+
capture_request=capture_request,
51+
capture_response=capture_response,
52+
)
4053

4154
if inspect.isclass(wrapped):
4255
# Class decoration wraps __init__ and aenter/aexit for context management.
@@ -96,7 +109,176 @@ def wrapper(
96109
is_generator = inspect.isgeneratorfunction(wrapped_func)
97110
is_async_generator = inspect.isasyncgenfunction(wrapped_func)
98111

99-
if entity_kind == SpanKind.SESSION:
112+
# Special handling for HTTP entity kind
113+
if entity_kind == SpanKind.HTTP:
114+
if is_generator or is_async_generator:
115+
logger.warning(
116+
f"@track_endpoint on generator '{operation_name}' is not supported. Use @trace instead."
117+
)
118+
return wrapped_func(*args, **kwargs)
119+
120+
if is_async:
121+
122+
async def _wrapped_http_async() -> Any:
123+
trace_context: Optional[TraceContext] = None
124+
try:
125+
# Create main session span
126+
trace_context = tracer.start_trace(trace_name=operation_name, tags=tags)
127+
if not trace_context:
128+
logger.error(
129+
f"Failed to start trace for @track_endpoint '{operation_name}'. Executing without trace."
130+
)
131+
return await wrapped_func(*args, **kwargs)
132+
133+
# Create HTTP request span
134+
if capture_request:
135+
with _create_as_current_span(
136+
f"{operation_name}.request",
137+
SpanKind.HTTP,
138+
version=version,
139+
attributes={SpanAttributes.HTTP_METHOD: "REQUEST"}
140+
if SpanAttributes.HTTP_METHOD
141+
else None,
142+
) as request_span:
143+
try:
144+
request_data = _extract_request_data()
145+
if request_data:
146+
# Set HTTP attributes
147+
if hasattr(SpanAttributes, "HTTP_METHOD") and request_data.get("method"):
148+
request_span.set_attribute(
149+
SpanAttributes.HTTP_METHOD, request_data["method"]
150+
)
151+
if hasattr(SpanAttributes, "HTTP_URL") and request_data.get("url"):
152+
request_span.set_attribute(SpanAttributes.HTTP_URL, request_data["url"])
153+
154+
# Record the full request data
155+
_record_entity_input(request_span, (request_data,), {})
156+
except Exception as e:
157+
logger.warning(f"Failed to record HTTP request for '{operation_name}': {e}")
158+
159+
# Execute the main function
160+
result = await wrapped_func(*args, **kwargs)
161+
162+
# Create HTTP response span
163+
if capture_response:
164+
with _create_as_current_span(
165+
f"{operation_name}.response",
166+
SpanKind.HTTP,
167+
version=version,
168+
attributes={SpanAttributes.HTTP_METHOD: "RESPONSE"}
169+
if SpanAttributes.HTTP_METHOD
170+
else None,
171+
) as response_span:
172+
try:
173+
response_data = _extract_response_data(result)
174+
if response_data:
175+
# Set HTTP attributes
176+
if hasattr(SpanAttributes, "HTTP_STATUS_CODE") and response_data.get(
177+
"status_code"
178+
):
179+
response_span.set_attribute(
180+
SpanAttributes.HTTP_STATUS_CODE, response_data["status_code"]
181+
)
182+
183+
# Record the full response data
184+
_record_entity_output(response_span, response_data)
185+
except Exception as e:
186+
logger.warning(f"Failed to record HTTP response for '{operation_name}': {e}")
187+
188+
tracer.end_trace(trace_context, "Success")
189+
return result
190+
except Exception:
191+
if trace_context:
192+
tracer.end_trace(trace_context, "Indeterminate")
193+
raise
194+
finally:
195+
if trace_context and trace_context.span.is_recording():
196+
logger.warning(
197+
f"Trace for @track_endpoint '{operation_name}' not explicitly ended. Ending as 'Unknown'."
198+
)
199+
tracer.end_trace(trace_context, "Unknown")
200+
201+
return _wrapped_http_async()
202+
else: # Sync function for HTTP
203+
trace_context: Optional[TraceContext] = None
204+
try:
205+
# Create main session span
206+
trace_context = tracer.start_trace(trace_name=operation_name, tags=tags)
207+
if not trace_context:
208+
logger.error(
209+
f"Failed to start trace for @track_endpoint '{operation_name}'. Executing without trace."
210+
)
211+
return wrapped_func(*args, **kwargs)
212+
213+
# Create HTTP request span
214+
if capture_request:
215+
with _create_as_current_span(
216+
f"{operation_name}.request",
217+
SpanKind.HTTP,
218+
version=version,
219+
attributes={SpanAttributes.HTTP_METHOD: "REQUEST"}
220+
if SpanAttributes.HTTP_METHOD
221+
else None,
222+
) as request_span:
223+
try:
224+
request_data = _extract_request_data()
225+
if request_data:
226+
# Set HTTP attributes
227+
if hasattr(SpanAttributes, "HTTP_METHOD") and request_data.get("method"):
228+
request_span.set_attribute(
229+
SpanAttributes.HTTP_METHOD, request_data["method"]
230+
)
231+
if hasattr(SpanAttributes, "HTTP_URL") and request_data.get("url"):
232+
request_span.set_attribute(SpanAttributes.HTTP_URL, request_data["url"])
233+
234+
# Record the full request data
235+
_record_entity_input(request_span, (request_data,), {})
236+
except Exception as e:
237+
logger.warning(f"Failed to record HTTP request for '{operation_name}': {e}")
238+
239+
# Execute the main function
240+
result = wrapped_func(*args, **kwargs)
241+
242+
# Create HTTP response span
243+
if capture_response:
244+
with _create_as_current_span(
245+
f"{operation_name}.response",
246+
SpanKind.HTTP,
247+
version=version,
248+
attributes={SpanAttributes.HTTP_METHOD: "RESPONSE"}
249+
if SpanAttributes.HTTP_METHOD
250+
else None,
251+
) as response_span:
252+
try:
253+
response_data = _extract_response_data(result)
254+
if response_data:
255+
# Set HTTP attributes
256+
if hasattr(SpanAttributes, "HTTP_STATUS_CODE") and response_data.get(
257+
"status_code"
258+
):
259+
response_span.set_attribute(
260+
SpanAttributes.HTTP_STATUS_CODE, response_data["status_code"]
261+
)
262+
263+
# Record the full response data
264+
_record_entity_output(response_span, response_data)
265+
except Exception as e:
266+
logger.warning(f"Failed to record HTTP response for '{operation_name}': {e}")
267+
268+
tracer.end_trace(trace_context, "Success")
269+
return result
270+
except Exception:
271+
if trace_context:
272+
tracer.end_trace(trace_context, "Indeterminate")
273+
raise
274+
finally:
275+
if trace_context and trace_context.span.is_recording():
276+
logger.warning(
277+
f"Trace for @track_endpoint '{operation_name}' not explicitly ended. Ending as 'Unknown'."
278+
)
279+
tracer.end_trace(trace_context, "Unknown")
280+
281+
elif entity_kind == SpanKind.SESSION:
100282
if is_generator or is_async_generator:
101283
logger.warning(
102284
f"@agentops.trace on generator '{operation_name}' creates a single span, not a full trace."

agentops/sdk/decorators/utility.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,57 @@ def _record_entity_output(span: trace.Span, result: Any, entity_kind: str = "ent
160160
logger.debug("Operation output exceeds size limit, not recording")
161161
except Exception as err:
162162
logger.warning(f"Failed to serialize operation output: {err}")
163+
164+
165+
# Helper functions for HTTP request/response data extraction
166+
167+
168+
def _extract_request_data():
169+
"""Extract HTTP request data from the current web framework context."""
170+
request_data = {}
171+
172+
try:
173+
# Try to import Flask and get current request
174+
from flask import request
175+
176+
request_data = {
177+
"method": request.method,
178+
"url": request.url,
179+
"headers": dict(request.headers),
180+
"args": dict(request.args),
181+
"form": dict(request.form) if request.form else None,
182+
"json": request.get_json(silent=True),
183+
"data": request.get_data(as_text=True) if request.content_length else None,
184+
}
185+
except ImportError:
186+
logger.debug("Flask not available for request data extraction")
187+
except Exception as e:
188+
logger.warning(f"Failed to extract request data: {e}")
189+
190+
return request_data
191+
192+
193+
def _extract_response_data(response):
194+
"""Extract HTTP response data from response object."""
195+
response_data = {}
196+
197+
try:
198+
# Handle Flask response objects
199+
from flask import Response
200+
201+
if isinstance(response, Response):
202+
response_data = {
203+
"status_code": response.status_code,
204+
"headers": dict(response.headers),
205+
"data": response.get_data(as_text=True) if response.content_length else None,
206+
}
207+
else:
208+
# Handle cases where response is just data (will be converted to Response by Flask)
209+
response_data = {
210+
"status_code": 200, # Default status for successful responses
211+
"data": str(response) if response is not None else None,
212+
}
213+
except Exception as e:
214+
logger.warning(f"Failed to extract response data: {e}")
215+
216+
return response_data

0 commit comments

Comments
 (0)