-
Notifications
You must be signed in to change notification settings - Fork 585
Implement concurrent.futures instrumentation for OpenTelemetry contex… #1018
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 5 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
bd804fa
Implement concurrent.futures instrumentation for OpenTelemetry contex…
fenilfaldu cfb3619
ruff checks :)
fenilfaldu ea8401f
ruff check again :(
fenilfaldu d17f14e
damn ruff +_+
fenilfaldu 4c2faa2
heck ruff again
fenilfaldu 6d9db2e
Merge branch 'main' into add-concurrency-instrumentaion
Dwij1704 e280ec9
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu 0f09692
constants update
fenilfaldu 397760c
Merge branch 'main' into add-concurrency-instrumentaion
Dwij1704 9e0a7b5
Merge branch 'main' of https://github.com/AgentOps-AI/agentops into a…
fenilfaldu cd53e9d
refactor the instrumentation with utlity instrumentation
fenilfaldu 1ffa1ea
Merge branch 'main' of https://github.com/AgentOps-AI/agentops into a…
fenilfaldu 0f9749f
added types to the function/methods defs
fenilfaldu 1e84738
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu 6cc58d1
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu a6a302b
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu 2921987
Merge branch 'main' into add-concurrency-instrumentaion
fenilfaldu 722f150
Merge branch 'main' of https://github.com/AgentOps-AI/agentops into a…
fenilfaldu a460ad5
Merge branch 'main' into add-concurrency-instrumentaion
dot-agi f43df8a
Merge branch 'main' of https://github.com/AgentOps-AI/agentops into a…
fenilfaldu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| """ | ||
| Instrumentation for concurrent.futures module. | ||
|
|
||
| This module provides automatic instrumentation for ThreadPoolExecutor to ensure | ||
| proper OpenTelemetry context propagation across thread boundaries. | ||
| """ | ||
|
|
||
| from .instrumentation import ConcurrentFuturesInstrumentor | ||
|
|
||
| __all__ = ["ConcurrentFuturesInstrumentor"] |
157 changes: 157 additions & 0 deletions
157
agentops/instrumentation/concurrent_futures/instrumentation.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,157 @@ | ||
| """ | ||
| OpenTelemetry Instrumentation for concurrent.futures module. | ||
|
|
||
| This instrumentation automatically patches ThreadPoolExecutor to ensure proper | ||
| context propagation across thread boundaries, preventing "NEW TRACE DETECTED" issues. | ||
| """ | ||
|
|
||
| import contextvars | ||
| import functools | ||
| from typing import Collection | ||
| from concurrent.futures import ThreadPoolExecutor | ||
|
|
||
| from opentelemetry.instrumentation.instrumentor import BaseInstrumentor | ||
|
|
||
| from agentops.logging import logger | ||
|
|
||
| # Store original methods to restore during uninstrumentation | ||
| _original_init = None | ||
| _original_submit = None | ||
|
|
||
|
|
||
| def _context_propagating_init(original_init): | ||
| """Wrap ThreadPoolExecutor.__init__ to set up context-aware initializer.""" | ||
|
|
||
| @functools.wraps(original_init) | ||
| def wrapped_init(self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()): | ||
| # Capture the current context when the executor is created | ||
| main_context = contextvars.copy_context() | ||
|
|
||
| def context_aware_initializer(): | ||
| """Initializer that sets up the captured context in each worker thread.""" | ||
| logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread") | ||
|
|
||
| # Set the main context variables in this thread | ||
| for var, value in main_context.items(): | ||
| try: | ||
| var.set(value) | ||
| except Exception as e: | ||
| logger.debug(f"[ConcurrentFuturesInstrumentor] Could not set context var {var}: {e}") | ||
|
|
||
| # Run user's initializer if provided | ||
| if initializer and callable(initializer): | ||
| try: | ||
| if initargs: | ||
| initializer(*initargs) | ||
| else: | ||
| initializer() | ||
| except Exception as e: | ||
| logger.error(f"[ConcurrentFuturesInstrumentor] Error in user initializer: {e}") | ||
| raise | ||
|
|
||
| logger.debug("[ConcurrentFuturesInstrumentor] Worker thread context setup complete") | ||
|
|
||
| # Create executor with context-aware initializer | ||
| prefix = f"AgentOps-{thread_name_prefix}" if thread_name_prefix else "AgentOps-Thread" | ||
|
|
||
| # Call original init with our context-aware initializer | ||
| original_init( | ||
| self, | ||
| max_workers=max_workers, | ||
| thread_name_prefix=prefix, | ||
| initializer=context_aware_initializer, | ||
| initargs=(), # We handle initargs in our wrapper | ||
| ) | ||
|
|
||
| logger.debug("[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation") | ||
|
|
||
| return wrapped_init | ||
|
|
||
|
|
||
| def _context_propagating_submit(original_submit): | ||
| """Wrap ThreadPoolExecutor.submit to ensure context propagation.""" | ||
|
|
||
| @functools.wraps(original_submit) | ||
| def wrapped_submit(self, func, *args, **kwargs): | ||
| # Log the submission | ||
| func_name = getattr(func, "__name__", str(func)) | ||
| logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}") | ||
|
|
||
| # The context propagation is handled by the initializer, so we can submit normally | ||
| # But we can add additional logging or monitoring here if needed | ||
| return original_submit(self, func, *args, **kwargs) | ||
|
|
||
| return wrapped_submit | ||
|
|
||
|
|
||
| class ConcurrentFuturesInstrumentor(BaseInstrumentor): | ||
| """ | ||
| Instrumentor for concurrent.futures module. | ||
|
|
||
| This instrumentor patches ThreadPoolExecutor to automatically propagate | ||
| OpenTelemetry context to worker threads, ensuring all LLM calls and other | ||
| instrumented operations maintain proper trace context. | ||
| """ | ||
|
|
||
| def instrumentation_dependencies(self) -> Collection[str]: | ||
| """Return a list of instrumentation dependencies.""" | ||
| return [] | ||
|
|
||
| def _instrument(self, **kwargs): | ||
| """Instrument the concurrent.futures module.""" | ||
| global _original_init, _original_submit | ||
|
|
||
| logger.debug("[ConcurrentFuturesInstrumentor] Starting instrumentation") | ||
|
|
||
| # Store original methods | ||
| _original_init = ThreadPoolExecutor.__init__ | ||
| _original_submit = ThreadPoolExecutor.submit | ||
|
|
||
| # Patch ThreadPoolExecutor methods | ||
| ThreadPoolExecutor.__init__ = _context_propagating_init(_original_init) | ||
| ThreadPoolExecutor.submit = _context_propagating_submit(_original_submit) | ||
|
|
||
| logger.info("[ConcurrentFuturesInstrumentor] Successfully instrumented concurrent.futures.ThreadPoolExecutor") | ||
|
|
||
| def _uninstrument(self, **kwargs): | ||
| """Uninstrument the concurrent.futures module.""" | ||
| global _original_init, _original_submit | ||
|
|
||
| logger.debug("[ConcurrentFuturesInstrumentor] Starting uninstrumentation") | ||
|
|
||
| # Restore original methods | ||
| if _original_init: | ||
| ThreadPoolExecutor.__init__ = _original_init | ||
| _original_init = None | ||
|
|
||
| if _original_submit: | ||
| ThreadPoolExecutor.submit = _original_submit | ||
| _original_submit = None | ||
|
|
||
| logger.info("[ConcurrentFuturesInstrumentor] Successfully uninstrumented concurrent.futures.ThreadPoolExecutor") | ||
|
|
||
| @staticmethod | ||
| def instrument_module_directly(): | ||
| """ | ||
| Directly instrument the module without using the standard instrumentor interface. | ||
|
|
||
| This can be called manually if automatic instrumentation is not desired. | ||
| """ | ||
| instrumentor = ConcurrentFuturesInstrumentor() | ||
| if not instrumentor.is_instrumented_by_opentelemetry: | ||
| instrumentor.instrument() | ||
| return True | ||
| return False | ||
|
|
||
| @staticmethod | ||
| def uninstrument_module_directly(): | ||
| """ | ||
| Directly uninstrument the module. | ||
|
|
||
| This can be called manually to remove instrumentation. | ||
| """ | ||
| instrumentor = ConcurrentFuturesInstrumentor() | ||
| if instrumentor.is_instrumented_by_opentelemetry: | ||
| instrumentor.uninstrument() | ||
| return True | ||
| return False | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.