Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
bd804fa
Implement concurrent.futures instrumentation for OpenTelemetry contex…
fenilfaldu May 29, 2025
cfb3619
ruff checks :)
fenilfaldu May 29, 2025
ea8401f
ruff check again :(
fenilfaldu May 29, 2025
d17f14e
damn ruff +_+
fenilfaldu May 29, 2025
4c2faa2
heck ruff again
fenilfaldu May 29, 2025
6d9db2e
Merge branch 'main' into add-concurrency-instrumentaion
Dwij1704 Jun 2, 2025
e280ec9
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu Jun 2, 2025
0f09692
constants update
fenilfaldu Jun 2, 2025
397760c
Merge branch 'main' into add-concurrency-instrumentaion
Dwij1704 Jun 6, 2025
9e0a7b5
Merge branch 'main' of https://github.com/AgentOps-AI/agentops into a…
fenilfaldu Jun 6, 2025
cd53e9d
refactor the instrumentation with utlity instrumentation
fenilfaldu Jun 7, 2025
1ffa1ea
Merge branch 'main' of https://github.com/AgentOps-AI/agentops into a…
fenilfaldu Jun 7, 2025
0f9749f
added types to the function/methods defs
fenilfaldu Jun 7, 2025
1e84738
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu Jun 8, 2025
6cc58d1
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu Jun 10, 2025
a6a302b
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu Jun 10, 2025
2921987
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu Jun 11, 2025
722f150
Merge branch 'main' of https://github.com/AgentOps-AI/agentops into a…
fenilfaldu Jun 12, 2025
a460ad5
Merge branch 'main' into add-concurrency-instrumentaion
dot-agi Jun 12, 2025
f43df8a
Merge branch 'main' of https://github.com/AgentOps-AI/agentops into a…
fenilfaldu Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions agentops/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@
def _should_instrument_package(package_name: str) -> bool:
"""
Determine if a package should be instrumented based on current state.
Handles special cases for agentic libraries and providers.
Handles special cases for agentic libraries, providers, and utility instrumentors.
"""
global _has_agentic_library

# If this is an agentic library, uninstrument all providers first
if package_name in AGENTIC_LIBRARIES:
_uninstrument_providers()
Expand All @@ -79,6 +80,10 @@
if package_name in PROVIDERS and _has_agentic_library:
return False

# Utility instrumentors are always enabled regardless of agentic library state
if package_name in UTILITY_INSTRUMENTORS:
return not _is_package_instrumented(package_name)

# Skip if already instrumented
if _is_package_instrumented(package_name):
return False
Expand All @@ -93,7 +98,12 @@
return

# Get the appropriate configuration for the package
config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES[package_name]
config = (
PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS.get(package_name)
)
if not config:
return

Check warning on line 105 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L105

Added line #L105 was not covered by tests

loader = InstrumentorLoader(**config)

if loader.should_activate:
Expand Down Expand Up @@ -188,6 +198,22 @@
"min_version": "0.1.0",
"package_name": "google-genai", # Actual pip package name
},
# "mem0": {
# "module_name": "agentops.instrumentation.mem0",
# "class_name": "Mem0Instrumentor",
# "min_version": "0.1.10",
# "package_name": "mem0ai", # Actual pip package name
# },
}

# Configuration for utility instrumentors
UTILITY_INSTRUMENTORS: dict[str, InstrumentorConfig] = {
Comment thread
fenilfaldu marked this conversation as resolved.
Outdated
"concurrent.futures": {
"module_name": "agentops.instrumentation.concurrent_futures",
"class_name": "ConcurrentFuturesInstrumentor",
"min_version": "3.7.0", # Python 3.7+ (concurrent.futures is stdlib)
"package_name": "python", # Special case for stdlib modules
},
}

# Configuration for supported agentic libraries
Expand All @@ -211,7 +237,7 @@
}

# Combine all target packages for monitoring
TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys())
TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) | set(UTILITY_INSTRUMENTORS.keys())

# Create a single instance of the manager
# _manager = InstrumentationManager() # Removed
Expand All @@ -238,6 +264,13 @@
def should_activate(self) -> bool:
"""Check if the package is available and meets version requirements."""
try:
# Special case for stdlib modules (like concurrent.futures)
if self.package_name == "python":
import sys

python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
return Version(python_version) >= parse(self.min_version)

# Use explicit package_name if provided, otherwise derive from module_name
if self.package_name:
provider_name = self.package_name
Expand Down
10 changes: 10 additions & 0 deletions agentops/instrumentation/concurrent_futures/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""
Instrumentation for concurrent.futures module.

This module provides automatic instrumentation for ThreadPoolExecutor to ensure
proper OpenTelemetry context propagation across thread boundaries.
"""

from .instrumentation import ConcurrentFuturesInstrumentor

__all__ = ["ConcurrentFuturesInstrumentor"]
157 changes: 157 additions & 0 deletions agentops/instrumentation/concurrent_futures/instrumentation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""
OpenTelemetry Instrumentation for concurrent.futures module.

This instrumentation automatically patches ThreadPoolExecutor to ensure proper
context propagation across thread boundaries, preventing "NEW TRACE DETECTED" issues.
"""

import contextvars
import functools
from typing import Collection
from concurrent.futures import ThreadPoolExecutor

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor

from agentops.logging import logger

# Store original methods to restore during uninstrumentation
_original_init = None
_original_submit = None


def _context_propagating_init(original_init):
"""Wrap ThreadPoolExecutor.__init__ to set up context-aware initializer."""

@functools.wraps(original_init)
def wrapped_init(self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()):
# Capture the current context when the executor is created
main_context = contextvars.copy_context()

def context_aware_initializer():
"""Initializer that sets up the captured context in each worker thread."""
logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread")

# Set the main context variables in this thread
for var, value in main_context.items():
try:
var.set(value)
except Exception as e:
logger.debug(f"[ConcurrentFuturesInstrumentor] Could not set context var {var}: {e}")

Check warning on line 39 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L38-L39

Added lines #L38 - L39 were not covered by tests

# Run user's initializer if provided
if initializer and callable(initializer):
try:
if initargs:
initializer(*initargs)

Check warning on line 45 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L43-L45

Added lines #L43 - L45 were not covered by tests
else:
initializer()
except Exception as e:
logger.error(f"[ConcurrentFuturesInstrumentor] Error in user initializer: {e}")
raise

Check warning on line 50 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L47-L50

Added lines #L47 - L50 were not covered by tests

logger.debug("[ConcurrentFuturesInstrumentor] Worker thread context setup complete")

# Create executor with context-aware initializer
prefix = f"AgentOps-{thread_name_prefix}" if thread_name_prefix else "AgentOps-Thread"

# Call original init with our context-aware initializer
original_init(
self,
max_workers=max_workers,
thread_name_prefix=prefix,
initializer=context_aware_initializer,
initargs=(), # We handle initargs in our wrapper
)

logger.debug("[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation")

return wrapped_init


def _context_propagating_submit(original_submit):
"""Wrap ThreadPoolExecutor.submit to ensure context propagation."""

@functools.wraps(original_submit)
def wrapped_submit(self, func, *args, **kwargs):
# Log the submission
func_name = getattr(func, "__name__", str(func))
logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}")

# The context propagation is handled by the initializer, so we can submit normally
# But we can add additional logging or monitoring here if needed
return original_submit(self, func, *args, **kwargs)

return wrapped_submit


class ConcurrentFuturesInstrumentor(BaseInstrumentor):
"""
Instrumentor for concurrent.futures module.

This instrumentor patches ThreadPoolExecutor to automatically propagate
OpenTelemetry context to worker threads, ensuring all LLM calls and other
instrumented operations maintain proper trace context.
"""

def instrumentation_dependencies(self) -> Collection[str]:
"""Return a list of instrumentation dependencies."""
return []

def _instrument(self, **kwargs):
"""Instrument the concurrent.futures module."""
global _original_init, _original_submit

logger.debug("[ConcurrentFuturesInstrumentor] Starting instrumentation")

# Store original methods
_original_init = ThreadPoolExecutor.__init__
_original_submit = ThreadPoolExecutor.submit

# Patch ThreadPoolExecutor methods
ThreadPoolExecutor.__init__ = _context_propagating_init(_original_init)
ThreadPoolExecutor.submit = _context_propagating_submit(_original_submit)

logger.info("[ConcurrentFuturesInstrumentor] Successfully instrumented concurrent.futures.ThreadPoolExecutor")

def _uninstrument(self, **kwargs):
"""Uninstrument the concurrent.futures module."""
global _original_init, _original_submit

logger.debug("[ConcurrentFuturesInstrumentor] Starting uninstrumentation")

Check warning on line 120 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L120

Added line #L120 was not covered by tests

# Restore original methods
if _original_init:
ThreadPoolExecutor.__init__ = _original_init
_original_init = None

Check warning on line 125 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L123-L125

Added lines #L123 - L125 were not covered by tests

if _original_submit:
ThreadPoolExecutor.submit = _original_submit
_original_submit = None

Check warning on line 129 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L127-L129

Added lines #L127 - L129 were not covered by tests

logger.info("[ConcurrentFuturesInstrumentor] Successfully uninstrumented concurrent.futures.ThreadPoolExecutor")

Check warning on line 131 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L131

Added line #L131 was not covered by tests

@staticmethod
def instrument_module_directly():
"""
Directly instrument the module without using the standard instrumentor interface.

This can be called manually if automatic instrumentation is not desired.
"""
instrumentor = ConcurrentFuturesInstrumentor()
if not instrumentor.is_instrumented_by_opentelemetry:
instrumentor.instrument()
return True
return False

Check warning on line 144 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L140-L144

Added lines #L140 - L144 were not covered by tests

@staticmethod
def uninstrument_module_directly():
"""
Directly uninstrument the module.

This can be called manually to remove instrumentation.
"""
instrumentor = ConcurrentFuturesInstrumentor()
if instrumentor.is_instrumented_by_opentelemetry:
instrumentor.uninstrument()
return True
return False

Check warning on line 157 in agentops/instrumentation/concurrent_futures/instrumentation.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/concurrent_futures/instrumentation.py#L153-L157

Added lines #L153 - L157 were not covered by tests
1 change: 1 addition & 0 deletions agentops/sdk/decorators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
tool = create_entity_decorator(SpanKind.TOOL)
operation = task


# For backward compatibility: @session decorator calls @trace decorator
@functools.wraps(trace)
def session(*args, **kwargs):
Expand Down
Loading
Loading