From 8c90debd4fada91eb2f63c5ffd2faa8ca62b4d10 Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Sun, 7 Jun 2026 23:00:02 -0700 Subject: [PATCH 1/9] perf(agentex): drop redundant owner-edge grant on task creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _get_or_create_task created a task and then called grant_with_retry(task), but create_task already registers the task as owner via register_resource(task, parent=agent). On both backends those write the same owner grant: - SGP: register_resource delegates to grant(permission="owner"); grant_with_retry also POSTs permission="owner" — byte-identical upsert to egp-api-backend /private/v5/agentex/permissions. - Spark: register_resource confers the OWNER role (and the parent edge); grant_with_retry maps create -> OWNER (_RESOURCE_ROLE_MAP), the same role with no parent — fully subsumed by register_resource. So every new task issued two identical authorization writes. Dropping the duplicate halves new-task authz write volume and removes a retrying round-trip from the message/send critical path. Also removes the now-dead grant_with_retry method and its sole-use imports (asyncio, random, AuthenticationServiceUnavailableError, AgentexResource). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../domain/use_cases/agents_acp_use_case.py | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/agentex/src/domain/use_cases/agents_acp_use_case.py b/agentex/src/domain/use_cases/agents_acp_use_case.py index 98fe0bab..457c784c 100644 --- a/agentex/src/domain/use_cases/agents_acp_use_case.py +++ b/agentex/src/domain/use_cases/agents_acp_use_case.py @@ -1,18 +1,10 @@ -import asyncio import json -import random from collections.abc import AsyncIterator, Callable from typing import Annotated, Any from fastapi import Depends -from src.adapters.authentication.exceptions import ( - AuthenticationServiceUnavailableError, -) from src.adapters.crud_store.exceptions import ItemDoesNotExist -from src.api.schemas.authorization_types import ( - AgentexResource, -) from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus from src.domain.entities.agents_rpc import ( ACP_TYPE_TO_ALLOWED_RPC_METHODS, @@ -236,30 +228,6 @@ async def _execute_with_error_handling( await self.task_service.fail_task(task, str(e)) raise e - async def grant_with_retry(self, task: TaskEntity, attempts: int = 0) -> None: - """Grant authorization for a task with retry""" - try: - await self.authorization_service.grant( - resource=AgentexResource.task(task.id), - ) - except AuthenticationServiceUnavailableError as e: - if attempts < 3: - delay = 0.2 * (2**attempts) + random.uniform(0, 0.1) - logger.error( - f"Authentication service unavailable: {e}. Retrying in {delay}s..." - ) - await asyncio.sleep(delay) - return await self.grant_with_retry(task, attempts + 1) - else: - logger.error( - f"Authentication service unavailable: {e}. Max retries reached." - ) - raise e from e - except Exception as e: - logger.error(f"Error granting authorization for task {task.id}: {e}") - await self.task_service.fail_task(task, str(e)) - raise e from e - async def _get_or_create_task( self, *, @@ -318,7 +286,6 @@ async def _get_or_create_task( task_metadata=task_metadata, ) logger.info(f"[agent_id={agent.id}] Created task {task.id}") - await self.grant_with_retry(task) return task async def _resolve_acp_url( From 1454f9227de534aa31563786c71d77c1582abe2e Mon Sep 17 00:00:00 2001 From: James Cardenas Date: Mon, 8 Jun 2026 16:28:07 -0700 Subject: [PATCH 2/9] fix(otel): Enable Custom OTel metrics to coexist with auto-instrumentation (#287) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fixes custom app metrics (`auth_cache.*`, `db.client.*`) coexisting with OTel Operator Python auto-instrumentation instead of replacing its global `MeterProvider`. - When a real SDK `MeterProvider` is already installed (operator), `init_otel_metrics()` attaches custom instruments to it and does not call `set_meter_provider()` or shut it down on exit. - In standalone mode (local dev, no operator), the module still creates its own provider when an OTLP endpoint is configured, with protocol-aware export (`grpc` vs `http/protobuf`) via `OTEL_EXPORTER_OTLP_METRICS_PROTOCOL` / `OTEL_EXPORTER_OTLP_PROTOCOL`. - Relates to: https://linear.app/scale-epd/issue/SGPINF-1718/execute-rocket-load-test-and-generate-report-for-10x-traffic The previous implementation always called `metrics.set_meter_provider()` with a gRPC exporter, which replaced the operator's HTTP/protobuf provider and dropped auto-instrumentation HTTP metrics (`http_server_request_duration_seconds`, etc.). `get_meter()` also returned `None` under operator injection because it only checked a module-local provider. - Detect an existing real `MeterProvider` via `_global_meter_provider()` and use it in shared mode. - Track `_meter_provider` only when this module creates the provider (for shutdown). - `get_meter()` works when either we created a provider or a global one is already installed. - `shutdown_otel_metrics()` only shuts down a provider this module created.

Greptile Summary

This PR fixes the conflict between custom app metrics (`auth_cache.*`, `db.client.*`) and the OTel Operator's Python auto-instrumentation by detecting an already-installed SDK `MeterProvider` and attaching to it rather than replacing it. In standalone mode the module still creates its own provider with protocol-aware export (`grpc` or `http/protobuf`), now with proper exception handling that prevents `PeriodicExportingMetricReader` thread leaks and ensures a `NoOpMeterProvider` is installed on shutdown to prevent a stale dead provider from being mistaken for a live operator provider on re-init. - **Shared mode**: `_global_meter_provider()` detects the operator's provider; `init_otel_metrics()` attaches to it with no `set_meter_provider()` call, and `shutdown_otel_metrics()` leaves it running. - **Standalone mode**: `set_meter_provider()` is wrapped in `try/except` that shuts down the newly-created provider on failure, and the `finally` block installs `NoOpMeterProvider()` to clear the global slot so re-init works correctly. - **Tests**: 11 new unit tests cover both modes, protocol selection, failure/retry paths, and cache-instrument attribute preservation, with private-SDK state injection wrapped in `pytest.skip` on `AttributeError`.

Confidence Score: 5/5

Safe to merge — the coexistence logic is correct, previously identified flag-ordering and thread-leak bugs are fixed, and the shutdown path now clears the global OTel slot to prevent stale-provider misdetection on re-init. The state-machine transitions across shared/standalone/disabled modes are all covered by tests, the try/finally exception handling in both init_otel_metrics and shutdown_otel_metrics is correct, and the invariant that _meter_provider is only set when this module owns the global slot is maintained throughout. No files require special attention. The one test isolation gap (cache_metrics state not restored in teardown) is latent and has no impact on the current test suite.

Important Files Changed

| Filename | Overview | |----------|----------| | agentex/src/utils/otel_metrics.py | Core fix: detects existing SDK MeterProvider via _global_meter_provider(), attaches in shared mode without calling set_meter_provider(), adds try/except around set_meter_provider() to shut down leaked PeriodicExportingMetricReader threads, and installs NoOpMeterProvider in finally-block to clear the global slot on standalone shutdown. Previously flagged ordering bugs are corrected. | | agentex/tests/unit/utils/test_otel_metrics.py | New test file with 11 unit tests covering shared mode coexistence, standalone mode lifecycle, protocol selection, retry-after-failure, and cache instrument attribute preservation. The test_custom_metrics_preserve_instrument_attributes_in_shared_mode test mutates cache_metrics module state at setup but has no corresponding teardown, leaving _instruments_initialized=True and counters bound to the test provider after the test completes. | Fix All in Cursor Fix All in Claude Code Fix All in Codex
Prompt To Fix All With AI `````markdown Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes. --- agentex/tests/unit/utils/test_otel_metrics.py:192-197 The test resets `cache_metrics` module state before the test body but leaves `_instruments_initialized=True` and the counters bound to the test-local `operator_provider` after the test completes. The autouse fixture restores the OTel global slot but does not touch `cache_metrics`. Any future test that calls `record_cache_access`/`record_cache_eviction` after this test will find `_instruments_initialized=True`, skip re-initialization, and silently emit to the stale instruments. Restoring the three fields in teardown keeps isolation complete. `````
Reviews (8): Last reviewed commit: ["Merge branch 'main' into jamesc-fix-otel..."](https://github.com/scaleapi/scale-agentex/commit/51a270e04239a734f9f98d015477a8624c444c29) | [Re-trigger Greptile](https://app.greptile.com/api/retrigger?id=36313424) --- agentex/src/utils/otel_metrics.py | 185 +++++++----- agentex/tests/unit/utils/test_otel_metrics.py | 272 ++++++++++++++++++ 2 files changed, 390 insertions(+), 67 deletions(-) create mode 100644 agentex/tests/unit/utils/test_otel_metrics.py diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index 4f4cb452..e676fbe9 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -1,12 +1,17 @@ """ OpenTelemetry metrics configuration for Agentex. -This module sets up the OTel MeterProvider with OTLP export for metrics. -Metrics are exported to an OTLP-compatible endpoint (e.g., OTel Collector, -Datadog Agent, or directly to Grafana Cloud/Mimir). +When auto-instrumentation (e.g. OTel Operator) has already installed a global +MeterProvider, custom app metrics attach to it instead of replacing it. +Otherwise this module creates its own provider with OTLP export when an endpoint +is configured. Environment Variables: - OTEL_EXPORTER_OTLP_ENDPOINT: OTLP endpoint URL (default: http://localhost:4317) + OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: Metrics OTLP endpoint (falls back to + OTEL_EXPORTER_OTLP_ENDPOINT) + OTEL_EXPORTER_OTLP_METRICS_PROTOCOL: Metrics export protocol (falls back to + OTEL_EXPORTER_OTLP_PROTOCOL; default: grpc) + OTEL_EXPORTER_OTLP_ENDPOINT: General OTLP endpoint URL OTEL_EXPORTER_OTLP_HEADERS: Optional headers for authentication OTEL_SERVICE_NAME: Service name for metrics (default: agentex) OTEL_METRICS_EXPORT_INTERVAL_MS: Export interval in ms (default: 30000) @@ -18,7 +23,13 @@ from typing import TYPE_CHECKING from opentelemetry import metrics -from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.metrics import NoOpMeterProvider +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter as OTLPGrpcMetricExporter, +) +from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + OTLPMetricExporter as OTLPHttpMetricExporter, +) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource @@ -27,19 +38,57 @@ if TYPE_CHECKING: from opentelemetry.metrics import Meter + from opentelemetry.sdk.metrics.export import MetricExporter logger = make_logger(__name__) # Global state -_meter_provider: MeterProvider | None = None +_meter_provider: MeterProvider | None = None # Set only when this module creates the provider _initialized: bool = False # Default configuration -DEFAULT_OTLP_ENDPOINT = "http://localhost:4317" DEFAULT_SERVICE_NAME = "agentex" DEFAULT_EXPORT_INTERVAL_MS = 30000 # 30 seconds +def _global_meter_provider() -> MeterProvider | None: + """Return the global MeterProvider if installed, else None (proxy is ignored).""" + provider = metrics.get_meter_provider() + return provider if isinstance(provider, MeterProvider) else None + + +def _metrics_endpoint(explicit: str | None = None) -> str | None: + if explicit: + return explicit + return os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") or os.environ.get( + "OTEL_EXPORTER_OTLP_ENDPOINT" + ) + + +def _metrics_protocol() -> str: + return ( + ( + os.environ.get("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL") + or os.environ.get("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc") + ) + .strip() + .lower() + ) + + +def _create_metric_exporter(endpoint: str, protocol: str) -> MetricExporter: + if protocol in {"http/protobuf", "http"}: + return OTLPHttpMetricExporter(endpoint=endpoint) + + if protocol != "grpc": + logger.warning("Unknown OTEL metrics protocol %r; using grpc", protocol) + + return OTLPGrpcMetricExporter( + endpoint=endpoint, + insecure=endpoint.startswith("http://"), + ) + + def init_otel_metrics( service_name: str | None = None, service_version: str | None = None, @@ -48,83 +97,80 @@ def init_otel_metrics( export_interval_ms: int | None = None, ) -> MeterProvider | None: """ - Initialize OpenTelemetry metrics with OTLP exporter. + Initialize OpenTelemetry metrics for custom app instruments. - This should be called once at application startup. Subsequent calls - will return the existing MeterProvider. + Call once at application startup. Subsequent calls return the active provider + without re-initializing. - NOTE: Only initializes if OTEL_EXPORTER_OTLP_ENDPOINT is configured. - Returns None if OTel is not configured. + If auto-instrumentation already installed a MeterProvider, custom metrics + attach to it. Otherwise, initializes only when an OTLP endpoint is configured. Args: service_name: Service name for resource attributes service_version: Service version for resource attributes environment: Deployment environment (e.g., "development", "production") - otlp_endpoint: OTLP gRPC endpoint URL + otlp_endpoint: OTLP endpoint URL export_interval_ms: Metric export interval in milliseconds Returns: - The configured MeterProvider, or None if not configured + The active MeterProvider, or None when metrics are disabled """ global _meter_provider, _initialized if _initialized: - return _meter_provider + return _meter_provider or _global_meter_provider() - # Check if OTLP endpoint is configured - otlp_endpoint = otlp_endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") - if not otlp_endpoint: - logger.info( - "OpenTelemetry metrics disabled: OTEL_EXPORTER_OTLP_ENDPOINT not configured" - ) + if existing := _global_meter_provider(): + _initialized = True + logger.info("OpenTelemetry metrics using existing MeterProvider") + return existing + + endpoint = _metrics_endpoint(otlp_endpoint) + if not endpoint: _initialized = True + logger.info("OpenTelemetry metrics disabled: no OTLP endpoint configured") return None - # Resolve configuration from environment or defaults - service_name = ( - service_name or os.environ.get("OTEL_SERVICE_NAME") or DEFAULT_SERVICE_NAME + protocol = _metrics_protocol() + resolved_service_name = service_name or os.environ.get( + "OTEL_SERVICE_NAME", DEFAULT_SERVICE_NAME ) - service_version = service_version or os.environ.get("SERVICE_VERSION", "0.1.0") - environment = environment or os.environ.get("ENVIRONMENT", "development") - export_interval_ms = export_interval_ms or int( - os.environ.get("OTEL_METRICS_EXPORT_INTERVAL_MS", DEFAULT_EXPORT_INTERVAL_MS) + resolved_export_interval_ms = ( + export_interval_ms + if export_interval_ms is not None + else int( + os.environ.get( + "OTEL_METRICS_EXPORT_INTERVAL_MS", DEFAULT_EXPORT_INTERVAL_MS + ) + ) ) - - # Create resource with service information resource = Resource.create( { - SERVICE_NAME: service_name, - SERVICE_VERSION: service_version, - "deployment.environment": environment, + SERVICE_NAME: resolved_service_name, + SERVICE_VERSION: service_version + or os.environ.get("SERVICE_VERSION", "0.1.0"), + "deployment.environment": environment + or os.environ.get("ENVIRONMENT", "development"), } ) - - # Create OTLP exporter - # The exporter will use OTEL_EXPORTER_OTLP_HEADERS env var for auth if set - exporter = OTLPMetricExporter( - endpoint=otlp_endpoint, - insecure=otlp_endpoint.startswith("http://"), # Use insecure for non-TLS - ) - - # Create periodic reader that exports at the specified interval reader = PeriodicExportingMetricReader( - exporter=exporter, - export_interval_millis=export_interval_ms, - ) - - # Create and set the meter provider - _meter_provider = MeterProvider( - resource=resource, - metric_readers=[reader], + exporter=_create_metric_exporter(endpoint, protocol), + export_interval_millis=resolved_export_interval_ms, ) - metrics.set_meter_provider(_meter_provider) + provider = MeterProvider(resource=resource, metric_readers=[reader]) + try: + metrics.set_meter_provider(provider) + except Exception: + provider.shutdown() + raise + _meter_provider = provider _initialized = True logger.info( - f"OpenTelemetry metrics initialized: endpoint={otlp_endpoint}, " - f"service={service_name}, interval={export_interval_ms}ms" + f"OpenTelemetry metrics initialized: endpoint={endpoint}, " + f"protocol={protocol}, service={resolved_service_name}, " + f"interval={resolved_export_interval_ms}ms" ) - return _meter_provider @@ -139,16 +185,10 @@ def get_meter(name: str, version: str = "0.1.0") -> Meter | None: Returns: An OpenTelemetry Meter instance, or None if OTel is not configured """ - global _initialized, _meter_provider - if not _initialized: - # Auto-initialize with defaults if not already initialized init_otel_metrics() - - # Return None if OTel is not configured - if _meter_provider is None: + if _meter_provider is None and _global_meter_provider() is None: return None - return metrics.get_meter(name, version) @@ -156,17 +196,28 @@ def shutdown_otel_metrics() -> None: """ Shutdown the meter provider, flushing any remaining metrics. - Should be called during application shutdown. + Should be called during application shutdown. Only shuts down a provider + this module created; a provider installed by auto-instrumentation is left + running. """ global _meter_provider, _initialized - if _meter_provider is not None: - _meter_provider.shutdown() - logger.info("OpenTelemetry metrics shut down") + try: + if _meter_provider is not None: + _meter_provider.shutdown() + logger.info("OpenTelemetry metrics shut down") + except Exception: + logger.exception("OpenTelemetry metrics shutdown failed") + finally: + if _meter_provider is not None: + try: + metrics.set_meter_provider(NoOpMeterProvider()) + except Exception: + logger.exception("Failed to reset global MeterProvider after shutdown") _meter_provider = None _initialized = False def is_otel_configured() -> bool: - """Check if an OTLP endpoint is configured via environment.""" - return bool(os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")) + """Check if metrics export is configured via environment.""" + return bool(_metrics_endpoint()) diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py new file mode 100644 index 00000000..22b0c974 --- /dev/null +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -0,0 +1,272 @@ +"""Unit tests for OpenTelemetry metrics initialization and coexistence.""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest +from opentelemetry import metrics +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter as OTLPGrpcMetricExporter, +) +from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + OTLPMetricExporter as OTLPHttpMetricExporter, +) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.resources import Resource +from src.utils import otel_metrics +from src.utils import cache_metrics + + +def _set_global_meter_provider(provider: object | None = None) -> None: + """Test-only access to the global MeterProvider slot. + + Skips with a clear message if OTel SDK internals move. Pass ``None`` to + install the no-op proxy (unset state). + """ + try: + if provider is None: + provider = metrics._internal._ProxyMeterProvider() + metrics._internal._METER_PROVIDER = provider + except AttributeError as exc: + pytest.skip(f"OpenTelemetry SDK internals changed: {exc}") + + +@pytest.fixture(autouse=True) +def reset_otel_metrics_state(): + """Reset module and global OTel state between tests.""" + saved_provider = metrics.get_meter_provider() + otel_metrics.shutdown_otel_metrics() + _set_global_meter_provider() + + yield + + otel_metrics.shutdown_otel_metrics() + _set_global_meter_provider(saved_provider) + + +def _set_operator_provider() -> MeterProvider: + provider = MeterProvider(resource=Resource.create({})) + _set_global_meter_provider(provider) + return provider + + +@pytest.mark.unit +def test_init_coexists_with_existing_meter_provider(): + operator_provider = _set_operator_provider() + + result = otel_metrics.init_otel_metrics() + + assert result is operator_provider + assert metrics.get_meter_provider() is operator_provider + assert otel_metrics._initialized is True + assert otel_metrics._meter_provider is None + + meter = otel_metrics.get_meter("agentex.test") + assert meter is not None + + +@pytest.mark.unit +def test_init_is_idempotent_in_shared_mode(): + operator_provider = _set_operator_provider() + + assert otel_metrics.init_otel_metrics() is operator_provider + assert otel_metrics.init_otel_metrics() is operator_provider + + +@pytest.mark.unit +def test_init_does_not_shutdown_operator_provider(): + operator_provider = _set_operator_provider() + otel_metrics.init_otel_metrics() + + otel_metrics.shutdown_otel_metrics() + + assert metrics.get_meter_provider() is operator_provider + assert otel_metrics._initialized is False + assert otel_metrics._meter_provider is None + assert otel_metrics.get_meter("agentex.test") is not None + + +@pytest.mark.unit +def test_init_after_shutdown_in_shared_mode(): + operator_provider = _set_operator_provider() + + otel_metrics.init_otel_metrics() + otel_metrics.shutdown_otel_metrics() + + assert otel_metrics.init_otel_metrics() is operator_provider + + +@pytest.mark.unit +def test_init_creates_meter_provider_when_none_configured(monkeypatch): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + + result = otel_metrics.init_otel_metrics() + + assert isinstance(result, MeterProvider) + assert otel_metrics._meter_provider is result + assert otel_metrics._initialized is True + assert otel_metrics.get_meter("agentex.test") is not None + + +@pytest.mark.unit +def test_init_retries_after_provider_creation_failure(monkeypatch): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + + with ( + patch.object( + metrics, "set_meter_provider", side_effect=RuntimeError("blocked") + ), + patch.object(MeterProvider, "shutdown") as mock_shutdown, + ): + with pytest.raises(RuntimeError): + otel_metrics.init_otel_metrics() + assert otel_metrics._initialized is False + mock_shutdown.assert_called_once() + + result = otel_metrics.init_otel_metrics() + + assert isinstance(result, MeterProvider) + assert otel_metrics._initialized is True + + +@pytest.mark.unit +def test_init_disabled_without_endpoint(monkeypatch): + monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False) + monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False) + + result = otel_metrics.init_otel_metrics() + + assert result is None + assert otel_metrics.get_meter("agentex.test") is None + + +@pytest.mark.unit +def test_shutdown_resets_state_when_disabled(monkeypatch): + monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False) + monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False) + + otel_metrics.init_otel_metrics() + otel_metrics.shutdown_otel_metrics() + + assert otel_metrics._initialized is False + assert otel_metrics._meter_provider is None + + +@pytest.mark.unit +@pytest.mark.parametrize( + ("protocol_env", "expected_exporter"), + [ + ("grpc", OTLPGrpcMetricExporter), + ("http/protobuf", OTLPHttpMetricExporter), + ], +) +def test_protocol_selection_from_env( + monkeypatch, protocol_env: str, expected_exporter: type +): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + monkeypatch.setenv("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", protocol_env) + + with patch.object(otel_metrics, "_create_metric_exporter") as mock_create: + mock_create.return_value = expected_exporter(endpoint="http://localhost:4318") + otel_metrics.init_otel_metrics() + + mock_create.assert_called_once_with("http://localhost:4318", protocol_env) + + +@pytest.mark.unit +def test_metrics_endpoint_takes_precedence_over_general_endpoint(monkeypatch): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://general:4317") + monkeypatch.setenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", "http://metrics:4318") + + with patch.object(otel_metrics, "_create_metric_exporter") as mock_create: + mock_create.return_value = OTLPGrpcMetricExporter( + endpoint="http://metrics:4318" + ) + otel_metrics.init_otel_metrics() + + mock_create.assert_called_once_with("http://metrics:4318", "grpc") + + +@pytest.mark.unit +def test_custom_metrics_preserve_instrument_attributes_in_shared_mode(): + """Instrument names and point attributes must not change when attaching to operator provider.""" + cache_metrics._instruments_initialized = False + cache_metrics._access_counter = None + cache_metrics._eviction_counter = None + + reader = InMemoryMetricReader() + operator_provider = MeterProvider( + resource=Resource.create({"service.name": "operator-svc"}), + metric_readers=[reader], + ) + _set_global_meter_provider(operator_provider) + + otel_metrics.init_otel_metrics() + cache_metrics.record_cache_access("auth_gateway", "hit") + cache_metrics.record_cache_eviction("agent_api_key") + + data = reader.get_metrics_data() + assert data is not None + points = [ + ( + metric.name, + scope.scope.name, + dict(data_point.attributes), + ) + for resource_metrics in data.resource_metrics + for scope in resource_metrics.scope_metrics + for metric in scope.metrics + for data_point in metric.data.data_points + ] + assert ( + "auth_cache.access", + "agentex.auth_cache", + {"cache": "auth_gateway", "result": "hit"}, + ) in points + assert ( + "auth_cache.eviction", + "agentex.auth_cache", + {"cache": "agent_api_key"}, + ) in points + + +@pytest.mark.unit +def test_init_after_shutdown_in_standalone_mode(monkeypatch): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + + first = otel_metrics.init_otel_metrics() + assert first is not None + otel_metrics.shutdown_otel_metrics() + + second = otel_metrics.init_otel_metrics() + assert second is not None + assert second is not first + assert otel_metrics.get_meter("agentex.test") is not None + + +@pytest.mark.unit +def test_shutdown_resets_state_when_provider_shutdown_raises(monkeypatch): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + provider = otel_metrics.init_otel_metrics() + assert provider is not None + + with patch.object(provider, "shutdown", side_effect=RuntimeError("export failed")): + otel_metrics.shutdown_otel_metrics() + + assert otel_metrics._initialized is False + assert otel_metrics._meter_provider is None + assert otel_metrics.init_otel_metrics() is not None + + +@pytest.mark.unit +def test_shutdown_only_own_provider(monkeypatch): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + provider = otel_metrics.init_otel_metrics() + assert provider is not None + + otel_metrics.shutdown_otel_metrics() + + assert otel_metrics._initialized is False + assert otel_metrics._meter_provider is None From 9ef2834deb8cdccf150fd480a0d8a2d7b10d0c13 Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Mon, 8 Jun 2026 18:04:13 -0700 Subject: [PATCH 3/9] revert: restore grant_with_retry on task creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reverts the grant_with_retry removal (8c90deb). With register_resource gated off per-account by fgac-tasks-dual-write (agentex-auth #358) and that flag unregistered in sgp-infra-staging, nothing writes the per-task owner edge — a regression from the pre-FGAC behavior where grant wrote it unconditionally on every task. register_resource alone is not sufficient while dual-write is off, so this restores the ungated grant so task ownership is recorded regardless of the dual-write flag state, until the FGAC dual-write rollout and the per-task-edge dependency question are resolved. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../domain/use_cases/agents_acp_use_case.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/agentex/src/domain/use_cases/agents_acp_use_case.py b/agentex/src/domain/use_cases/agents_acp_use_case.py index 457c784c..98fe0bab 100644 --- a/agentex/src/domain/use_cases/agents_acp_use_case.py +++ b/agentex/src/domain/use_cases/agents_acp_use_case.py @@ -1,10 +1,18 @@ +import asyncio import json +import random from collections.abc import AsyncIterator, Callable from typing import Annotated, Any from fastapi import Depends +from src.adapters.authentication.exceptions import ( + AuthenticationServiceUnavailableError, +) from src.adapters.crud_store.exceptions import ItemDoesNotExist +from src.api.schemas.authorization_types import ( + AgentexResource, +) from src.domain.entities.agents import ACPType, AgentEntity, AgentStatus from src.domain.entities.agents_rpc import ( ACP_TYPE_TO_ALLOWED_RPC_METHODS, @@ -228,6 +236,30 @@ async def _execute_with_error_handling( await self.task_service.fail_task(task, str(e)) raise e + async def grant_with_retry(self, task: TaskEntity, attempts: int = 0) -> None: + """Grant authorization for a task with retry""" + try: + await self.authorization_service.grant( + resource=AgentexResource.task(task.id), + ) + except AuthenticationServiceUnavailableError as e: + if attempts < 3: + delay = 0.2 * (2**attempts) + random.uniform(0, 0.1) + logger.error( + f"Authentication service unavailable: {e}. Retrying in {delay}s..." + ) + await asyncio.sleep(delay) + return await self.grant_with_retry(task, attempts + 1) + else: + logger.error( + f"Authentication service unavailable: {e}. Max retries reached." + ) + raise e from e + except Exception as e: + logger.error(f"Error granting authorization for task {task.id}: {e}") + await self.task_service.fail_task(task, str(e)) + raise e from e + async def _get_or_create_task( self, *, @@ -286,6 +318,7 @@ async def _get_or_create_task( task_metadata=task_metadata, ) logger.info(f"[agent_id={agent.id}] Created task {task.id}") + await self.grant_with_retry(task) return task async def _resolve_acp_url( From 9dccc8bb44e74352a49e09c26a199634d5928c6a Mon Sep 17 00:00:00 2001 From: James Cardenas Date: Tue, 9 Jun 2026 09:37:30 -0700 Subject: [PATCH 4/9] Push metrics logging and auto-instrument fix --- agentex/src/utils/otel_metrics.py | 109 ++++++++++++++---- agentex/tests/unit/utils/test_otel_metrics.py | 92 ++++++++++++++- 2 files changed, 178 insertions(+), 23 deletions(-) diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index e676fbe9..a50c8bdf 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -1,10 +1,15 @@ """ OpenTelemetry metrics configuration for Agentex. -When auto-instrumentation (e.g. OTel Operator) has already installed a global -MeterProvider, custom app metrics attach to it instead of replacing it. -Otherwise this module creates its own provider with OTLP export when an endpoint -is configured. +The Python OTel SDK exposes a single global MeterProvider (set once). This module +uses two deterministic paths: + +1. **Coexistence** — a real SDK MeterProvider is already global (e.g. from OTel + Operator auto-instrumentation). Custom app metrics attach via get_meter(); + set_meter_provider() is never called. +2. **Standalone** — global is still the SDK proxy and an OTLP endpoint is + configured. This module creates the first MeterProvider. Auto-instrumentation + and custom metrics then share that same global slot. Environment Variables: OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: Metrics OTLP endpoint (falls back to @@ -23,7 +28,6 @@ from typing import TYPE_CHECKING from opentelemetry import metrics -from opentelemetry.metrics import NoOpMeterProvider from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( OTLPMetricExporter as OTLPGrpcMetricExporter, ) @@ -43,7 +47,7 @@ logger = make_logger(__name__) # Global state -_meter_provider: MeterProvider | None = None # Set only when this module creates the provider +_meter_provider: MeterProvider | None = None # Set only when this module creates the provider _initialized: bool = False # Default configuration @@ -57,6 +61,35 @@ def _global_meter_provider() -> MeterProvider | None: return provider if isinstance(provider, MeterProvider) else None +def _describe_global_provider() -> tuple[str, bool]: + provider = metrics.get_meter_provider() + return type(provider).__name__, isinstance(provider, MeterProvider) + + +def _log_provider_state( + message: str, + *, + app_provider: MeterProvider | None = None, + mode: str | None = None, +) -> None: + """Emit a single structured INFO log for operator/app coexistence debugging.""" + global_type, global_is_sdk = _describe_global_provider() + global_provider = _global_meter_provider() + app_owns_global = ( + app_provider is not None + and global_provider is not None + and global_provider is app_provider + ) + parts = [ + message, + f"mode={mode or 'unknown'}", + f"global_type={global_type}", + f"global_is_sdk_meter_provider={global_is_sdk}", + f"app_owns_global={app_owns_global}", + ] + logger.info(", ".join(parts)) + + def _metrics_endpoint(explicit: str | None = None) -> str | None: if explicit: return explicit @@ -76,9 +109,21 @@ def _metrics_protocol() -> str: ) +def _http_metrics_export_url(endpoint: str) -> str: + """Return an OTLP HTTP metrics URL including the /v1/metrics path. + + OTLPHttpMetricExporter only appends that path when it resolves the endpoint + from environment variables, not when an explicit endpoint argument is passed. + """ + normalized = endpoint.rstrip("/") + if normalized.endswith("/v1/metrics"): + return normalized + return f"{normalized}/v1/metrics" + + def _create_metric_exporter(endpoint: str, protocol: str) -> MetricExporter: if protocol in {"http/protobuf", "http"}: - return OTLPHttpMetricExporter(endpoint=endpoint) + return OTLPHttpMetricExporter(endpoint=_http_metrics_export_url(endpoint)) if protocol != "grpc": logger.warning("Unknown OTEL metrics protocol %r; using grpc", protocol) @@ -102,8 +147,9 @@ def init_otel_metrics( Call once at application startup. Subsequent calls return the active provider without re-initializing. - If auto-instrumentation already installed a MeterProvider, custom metrics - attach to it. Otherwise, initializes only when an OTLP endpoint is configured. + If a real SDK MeterProvider is already global, custom metrics attach to it + and set_meter_provider() is never called. Otherwise, when an OTLP endpoint + is configured, this module installs the first global provider. Args: service_name: Service name for resource attributes @@ -120,15 +166,23 @@ def init_otel_metrics( if _initialized: return _meter_provider or _global_meter_provider() + _log_provider_state("OpenTelemetry metrics init starting", mode="starting") + if existing := _global_meter_provider(): _initialized = True - logger.info("OpenTelemetry metrics using existing MeterProvider") + _log_provider_state( + "OpenTelemetry metrics using existing MeterProvider", + mode="coexistence", + ) return existing endpoint = _metrics_endpoint(otlp_endpoint) if not endpoint: _initialized = True - logger.info("OpenTelemetry metrics disabled: no OTLP endpoint configured") + _log_provider_state( + "OpenTelemetry metrics disabled: no OTLP endpoint configured", + mode="disabled", + ) return None protocol = _metrics_protocol() @@ -164,14 +218,30 @@ def init_otel_metrics( except Exception: provider.shutdown() raise - _meter_provider = provider + + global_provider = _global_meter_provider() + if global_provider is provider: + _meter_provider = provider + _initialized = True + _log_provider_state( + "OpenTelemetry metrics standalone init installed global MeterProvider: " + f"endpoint={endpoint}, protocol={protocol}, service={resolved_service_name}, " + f"interval={resolved_export_interval_ms}ms", + app_provider=provider, + mode="standalone", + ) + return _meter_provider + + # set_meter_provider() was rejected; shut down the orphan to avoid background export noise. + provider.shutdown() _initialized = True - logger.info( - f"OpenTelemetry metrics initialized: endpoint={endpoint}, " - f"protocol={protocol}, service={resolved_service_name}, " - f"interval={resolved_export_interval_ms}ms" + _log_provider_state( + "OpenTelemetry metrics standalone set_meter_provider rejected; " + "using existing global MeterProvider", + app_provider=provider, + mode="standalone_rejected", ) - return _meter_provider + return global_provider def get_meter(name: str, version: str = "0.1.0") -> Meter | None: @@ -209,11 +279,6 @@ def shutdown_otel_metrics() -> None: except Exception: logger.exception("OpenTelemetry metrics shutdown failed") finally: - if _meter_provider is not None: - try: - metrics.set_meter_provider(NoOpMeterProvider()) - except Exception: - logger.exception("Failed to reset global MeterProvider after shutdown") _meter_provider = None _initialized = False diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py index 22b0c974..7e64a485 100644 --- a/agentex/tests/unit/utils/test_otel_metrics.py +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -26,9 +26,12 @@ def _set_global_meter_provider(provider: object | None = None) -> None: install the no-op proxy (unset state). """ try: + from opentelemetry.util._once import Once + if provider is None: provider = metrics._internal._ProxyMeterProvider() metrics._internal._METER_PROVIDER = provider + metrics._internal._METER_PROVIDER_SET_ONCE = Once() except AttributeError as exc: pytest.skip(f"OpenTelemetry SDK internals changed: {exc}") @@ -52,6 +55,14 @@ def _set_operator_provider() -> MeterProvider: return provider +def _enable_auto_instrumentation_env(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OTEL_METRICS_EXPORTER", "otlp") + monkeypatch.setenv( + "PYTHONPATH", + "/otel-auto-instrumentation-python/opentelemetry/instrumentation/auto_instrumentation", + ) + + @pytest.mark.unit def test_init_coexists_with_existing_meter_provider(): operator_provider = _set_operator_provider() @@ -98,6 +109,21 @@ def test_init_after_shutdown_in_shared_mode(): assert otel_metrics.init_otel_metrics() is operator_provider +@pytest.mark.unit +def test_init_creates_standalone_when_operator_env_but_proxy_global(monkeypatch): + """Operator injection env must not block first-setter standalone on proxy.""" + _enable_auto_instrumentation_env(monkeypatch) + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4318") + monkeypatch.setenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf") + + result = otel_metrics.init_otel_metrics() + + assert isinstance(result, MeterProvider) + assert otel_metrics._meter_provider is result + assert metrics.get_meter_provider() is result + assert otel_metrics.get_meter("agentex.test") is not None + + @pytest.mark.unit def test_init_creates_meter_provider_when_none_configured(monkeypatch): monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") @@ -110,6 +136,45 @@ def test_init_creates_meter_provider_when_none_configured(monkeypatch): assert otel_metrics.get_meter("agentex.test") is not None +@pytest.mark.unit +def test_init_coexists_without_set_meter_provider_when_operator_present( + monkeypatch, +): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + operator_provider = _set_operator_provider() + + with patch.object(metrics, "set_meter_provider") as mock_set: + result = otel_metrics.init_otel_metrics() + + mock_set.assert_not_called() + assert result is operator_provider + assert metrics.get_meter_provider() is operator_provider + assert otel_metrics._meter_provider is None + assert otel_metrics.get_meter("agentex.test") is not None + + +@pytest.mark.unit +def test_standalone_shuts_down_orphan_when_set_meter_provider_rejected( + monkeypatch, +): + monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") + preexisting = MeterProvider(resource=Resource.create({})) + real_set = metrics.set_meter_provider + + def racing_set(provider: MeterProvider) -> None: + if not isinstance(metrics.get_meter_provider(), MeterProvider): + real_set(preexisting) + real_set(provider) + + with patch.object(metrics, "set_meter_provider", side_effect=racing_set): + result = otel_metrics.init_otel_metrics() + + assert result is preexisting + assert metrics.get_meter_provider() is preexisting + assert otel_metrics._meter_provider is None + assert otel_metrics.get_meter("agentex.test") is not None + + @pytest.mark.unit def test_init_retries_after_provider_creation_failure(monkeypatch): monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") @@ -232,6 +297,31 @@ def test_custom_metrics_preserve_instrument_attributes_in_shared_mode(): ) in points +@pytest.mark.unit +@pytest.mark.parametrize( + ("input_endpoint", "expected_url"), + [ + ("http://collector:4318", "http://collector:4318/v1/metrics"), + ( + "http://collector:4318/v1/metrics", + "http://collector:4318/v1/metrics", + ), + ], +) +def test_http_metrics_export_url(input_endpoint: str, expected_url: str): + assert otel_metrics._http_metrics_export_url(input_endpoint) == expected_url + + +@pytest.mark.unit +def test_create_http_metric_exporter_uses_v1_metrics_path(): + exporter = otel_metrics._create_metric_exporter( + "http://collector:4318", "http/protobuf" + ) + + assert isinstance(exporter, OTLPHttpMetricExporter) + assert exporter._endpoint == "http://collector:4318/v1/metrics" + + @pytest.mark.unit def test_init_after_shutdown_in_standalone_mode(monkeypatch): monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") @@ -242,7 +332,7 @@ def test_init_after_shutdown_in_standalone_mode(monkeypatch): second = otel_metrics.init_otel_metrics() assert second is not None - assert second is not first + assert second is first assert otel_metrics.get_meter("agentex.test") is not None From 6f5ad12b2a615923baa39351a0e323a34df98d5d Mon Sep 17 00:00:00 2001 From: James Cardenas Date: Tue, 9 Jun 2026 14:21:57 -0700 Subject: [PATCH 5/9] Add opt-in in-process FastAPI OTel HTTP metrics instrumentation --- agentex/pyproject.toml | 1 + agentex/src/api/app.py | 7 +- agentex/src/utils/otel_metrics.py | 50 +++++++++- agentex/tests/unit/utils/test_otel_metrics.py | 91 ++++++++++++++++++- uv.lock | 67 ++++++++++++++ 5 files changed, 212 insertions(+), 4 deletions(-) diff --git a/agentex/pyproject.toml b/agentex/pyproject.toml index 40858d3e..1aced26d 100644 --- a/agentex/pyproject.toml +++ b/agentex/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "opentelemetry-api>=1.28.0", "opentelemetry-sdk>=1.28.0", "opentelemetry-exporter-otlp>=1.28.0", + "opentelemetry-instrumentation-fastapi>=0.49b0", "pyyaml>=6.0,<7", ] diff --git a/agentex/src/api/app.py b/agentex/src/api/app.py index 35d4c7f4..b2e72348 100644 --- a/agentex/src/api/app.py +++ b/agentex/src/api/app.py @@ -38,7 +38,11 @@ from src.config.environment_variables import EnvVarKeys from src.domain.exceptions import GenericException from src.utils.logging import make_logger -from src.utils.otel_metrics import init_otel_metrics, shutdown_otel_metrics +from src.utils.otel_metrics import ( + init_otel_metrics, + instrument_fastapi_http_metrics, + shutdown_otel_metrics, +) logger = make_logger(__name__) @@ -75,6 +79,7 @@ def __init__( async def lifespan(_: FastAPI): # Initialize OpenTelemetry metrics first (before dependencies register instruments) init_otel_metrics() + instrument_fastapi_http_metrics(fastapi_app) await dependencies.startup_global_dependencies() configure_statsd() diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index a50c8bdf..ac9cf0e2 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -25,7 +25,7 @@ from __future__ import annotations import os -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from opentelemetry import metrics from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( @@ -286,3 +286,51 @@ def shutdown_otel_metrics() -> None: def is_otel_configured() -> bool: """Check if metrics export is configured via environment.""" return bool(_metrics_endpoint()) + + +def _http_metrics_enabled() -> bool: + """Return whether in-process FastAPI HTTP metrics should be installed.""" + flag = os.environ.get("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "false").strip().lower() + return flag not in {"0", "false", "no", "off"} + + +def instrument_fastapi_http_metrics(app: Any) -> bool: + """ + Install in-process FastAPI HTTP server metrics (http.server.request.duration). + + Call after init_otel_metrics() so custom and HTTP metrics share the active + global MeterProvider (operator or standalone). + + Safe with OTel Operator auto-instrumentation: FastAPIInstrumentor is + idempotent and no-ops when ``_is_instrumented_by_opentelemetry`` is already set. + Set OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi on the pod so the operator + does not also hook FastAPI. Beyla/eBPF metrics are independent and may still + appear under different label sets when attach succeeds. + + Returns: + True when instrumentation was applied, False when skipped or disabled. + """ + if not _http_metrics_enabled(): + logger.info("FastAPI HTTP metrics disabled via AGENTEX_OTEL_HTTP_METRICS_ENABLED") + return False + + if getattr(app, "_is_instrumented_by_opentelemetry", False): + logger.info( + "FastAPI already instrumented by OpenTelemetry; skipping in-process HTTP metrics" + ) + return False + + if not _initialized: + init_otel_metrics() + + if _global_meter_provider() is None and not is_otel_configured(): + logger.info( + "FastAPI HTTP metrics skipped: no MeterProvider and no OTLP endpoint configured" + ) + return False + + from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + + FastAPIInstrumentor.instrument_app(app) + logger.info("FastAPI in-process HTTP metrics instrumentation enabled") + return True diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py index 7e64a485..772f58e5 100644 --- a/agentex/tests/unit/utils/test_otel_metrics.py +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -15,8 +15,7 @@ from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.resources import Resource -from src.utils import otel_metrics -from src.utils import cache_metrics +from src.utils import cache_metrics, otel_metrics def _set_global_meter_provider(provider: object | None = None) -> None: @@ -360,3 +359,91 @@ def test_shutdown_only_own_provider(monkeypatch): assert otel_metrics._initialized is False assert otel_metrics._meter_provider is None + + +@pytest.mark.unit +def test_instrument_fastapi_skips_when_disabled_by_default(monkeypatch): + from fastapi import FastAPI + + monkeypatch.delenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", raising=False) + + app = FastAPI() + assert otel_metrics.instrument_fastapi_http_metrics(app) is False + + +@pytest.mark.unit +def test_instrument_fastapi_skips_when_already_instrumented(monkeypatch): + from fastapi import FastAPI + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + + app = FastAPI() + app._is_instrumented_by_opentelemetry = True # noqa: SLF001 + + assert otel_metrics.instrument_fastapi_http_metrics(app) is False + + +@pytest.mark.unit +def test_instrument_fastapi_skips_without_otel_config(monkeypatch): + from fastapi import FastAPI + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + monkeypatch.delenv("OTEL_EXPORTER_OTLP_ENDPOINT", raising=False) + monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False) + + app = FastAPI() + assert otel_metrics.instrument_fastapi_http_metrics(app) is False + + +@pytest.mark.unit +def test_instrument_fastapi_applies_with_existing_provider(monkeypatch): + from fastapi import FastAPI + from fastapi.testclient import TestClient + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + monkeypatch.setenv("OTEL_SEMCONV_STABILITY_OPT_IN", "http") + + reader = InMemoryMetricReader() + provider = MeterProvider( + resource=Resource.create({"service.name": "agentex"}), + metric_readers=[reader], + ) + _set_global_meter_provider(provider) + otel_metrics.init_otel_metrics() + + app = FastAPI() + + @app.get("/probe") + def probe() -> dict[str, str]: + return {"ok": "true"} + + assert otel_metrics.instrument_fastapi_http_metrics(app) is True + assert getattr(app, "_is_instrumented_by_opentelemetry", False) is True + + with TestClient(app) as client: + response = client.get("/probe") + assert response.status_code == 200 + + data = reader.get_metrics_data() + assert data is not None + metric_names = { + metric.name + for resource_metrics in data.resource_metrics + for scope in resource_metrics.scope_metrics + for metric in scope.metrics + } + assert "http.server.request.duration" in metric_names + + +@pytest.mark.unit +def test_instrument_fastapi_is_idempotent(monkeypatch): + from fastapi import FastAPI + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + + _set_operator_provider() + otel_metrics.init_otel_metrics() + app = FastAPI() + + assert otel_metrics.instrument_fastapi_http_metrics(app) is True + assert otel_metrics.instrument_fastapi_http_metrics(app) is False diff --git a/uv.lock b/uv.lock index 82187257..248ad678 100644 --- a/uv.lock +++ b/uv.lock @@ -76,6 +76,7 @@ dependencies = [ { name = "litellm", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "opentelemetry-exporter-otlp", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-instrumentation-fastapi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "opentelemetry-sdk", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "psycopg2-binary", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "pymongo", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -136,6 +137,7 @@ requires-dist = [ { name = "litellm", specifier = ">=1.83.7,<2" }, { name = "opentelemetry-api", specifier = ">=1.28.0" }, { name = "opentelemetry-exporter-otlp", specifier = ">=1.28.0" }, + { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.49b0" }, { name = "opentelemetry-sdk", specifier = ">=1.28.0" }, { name = "psycopg2-binary", specifier = ">=2.9.9,<3" }, { name = "pymongo", specifier = ">=4.13.0,<5" }, @@ -372,6 +374,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/29/5ecc3a15d5a33e31b26c11426c45c501e439cb865d0bff96315d86443b78/appnope-0.1.4-py2.py3-none-any.whl", hash = "sha256:502575ee11cd7a28c0205f379b525beefebab9d161b7c964670864014ed7213c", size = 4321, upload-time = "2024-02-06T09:43:09.663Z" }, ] +[[package]] +name = "asgiref" +version = "3.11.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/63/40/f03da1264ae8f7cfdbf9146542e5e7e8100a4c66ab48e791df9a03d3f6c0/asgiref-3.11.1.tar.gz", hash = "sha256:5f184dc43b7e763efe848065441eac62229c9f7b0475f41f80e207a114eda4ce", size = 38550, upload-time = "2026-02-03T13:30:14.33Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/0a/a72d10ed65068e115044937873362e6e32fab1b7dce0046aeb224682c989/asgiref-3.11.1-py3-none-any.whl", hash = "sha256:e8667a091e69529631969fd45dc268fa79b99c92c5fcdda727757e52146ec133", size = 24345, upload-time = "2026-02-03T13:30:13.039Z" }, +] + [[package]] name = "asttokens" version = "3.0.0" @@ -1853,6 +1864,53 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/95/f1/b27d3e2e003cd9a3592c43d099d2ed8d0a947c15281bf8463a256db0b46c/opentelemetry_exporter_otlp_proto_http-1.39.1-py3-none-any.whl", hash = "sha256:d9f5207183dd752a412c4cd564ca8875ececba13be6e9c6c370ffb752fd59985", size = 19641, upload-time = "2025-12-11T13:32:22.248Z" }, ] +[[package]] +name = "opentelemetry-instrumentation" +version = "0.60b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-semantic-conventions", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "packaging", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "wrapt", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/41/0f/7e6b713ac117c1f5e4e3300748af699b9902a2e5e34c9cf443dde25a01fa/opentelemetry_instrumentation-0.60b1.tar.gz", hash = "sha256:57ddc7974c6eb35865af0426d1a17132b88b2ed8586897fee187fd5b8944bd6a", size = 31706, upload-time = "2025-12-11T13:36:42.515Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/77/d2/6788e83c5c86a2690101681aeef27eeb2a6bf22df52d3f263a22cee20915/opentelemetry_instrumentation-0.60b1-py3-none-any.whl", hash = "sha256:04480db952b48fb1ed0073f822f0ee26012b7be7c3eac1a3793122737c78632d", size = 33096, upload-time = "2025-12-11T13:35:33.067Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-asgi" +version = "0.60b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "asgiref", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-instrumentation", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-semantic-conventions", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-util-http", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/77/db/851fa88db7441da82d50bd80f2de5ee55213782e25dc858e04d0c9961d60/opentelemetry_instrumentation_asgi-0.60b1.tar.gz", hash = "sha256:16bfbe595cd24cda309a957456d0fc2523f41bc7b076d1f2d7e98a1ad9876d6f", size = 26107, upload-time = "2025-12-11T13:36:47.015Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/76/76/1fb94367cef64420d2171157a6b9509582873bd09a6afe08a78a8d1f59d9/opentelemetry_instrumentation_asgi-0.60b1-py3-none-any.whl", hash = "sha256:d48def2dbed10294c99cfcf41ebbd0c414d390a11773a41f472d20000fcddc25", size = 16933, upload-time = "2025-12-11T13:35:40.462Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-fastapi" +version = "0.60b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-instrumentation", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-instrumentation-asgi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-semantic-conventions", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "opentelemetry-util-http", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9c/e7/e7e5e50218cf488377209d85666b182fa2d4928bf52389411ceeee1b2b60/opentelemetry_instrumentation_fastapi-0.60b1.tar.gz", hash = "sha256:de608955f7ff8eecf35d056578346a5365015fd7d8623df9b1f08d1c74769c01", size = 24958, upload-time = "2025-12-11T13:36:59.35Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7d/cc/6e808328ba54662e50babdcab21138eae4250bc0fddf67d55526a615a2ca/opentelemetry_instrumentation_fastapi-0.60b1-py3-none-any.whl", hash = "sha256:af94b7a239ad1085fc3a820ecf069f67f579d7faf4c085aaa7bd9b64eafc8eaf", size = 13478, upload-time = "2025-12-11T13:36:00.811Z" }, +] + [[package]] name = "opentelemetry-proto" version = "1.39.1" @@ -1892,6 +1950,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/7a/5e/5958555e09635d09b75de3c4f8b9cae7335ca545d77392ffe7331534c402/opentelemetry_semantic_conventions-0.60b1-py3-none-any.whl", hash = "sha256:9fa8c8b0c110da289809292b0591220d3a7b53c1526a23021e977d68597893fb", size = 219982, upload-time = "2025-12-11T13:32:36.955Z" }, ] +[[package]] +name = "opentelemetry-util-http" +version = "0.60b1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/50/fc/c47bb04a1d8a941a4061307e1eddfa331ed4d0ab13d8a9781e6db256940a/opentelemetry_util_http-0.60b1.tar.gz", hash = "sha256:0d97152ca8c8a41ced7172d29d3622a219317f74ae6bb3027cfbdcf22c3cc0d6", size = 11053, upload-time = "2025-12-11T13:37:25.115Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/16/5c/d3f1733665f7cd582ef0842fb1d2ed0bc1fba10875160593342d22bba375/opentelemetry_util_http-0.60b1-py3-none-any.whl", hash = "sha256:66381ba28550c91bee14dcba8979ace443444af1ed609226634596b4b0faf199", size = 8947, upload-time = "2025-12-11T13:36:37.151Z" }, +] + [[package]] name = "orjson" version = "3.11.7" From b59b92edf9c15ecd909ce1ccf4cf4f42e5643f05 Mon Sep 17 00:00:00 2001 From: James Cardenas Date: Tue, 9 Jun 2026 15:13:34 -0700 Subject: [PATCH 6/9] Fix FastAPI OTel HTTP metrics by instrumenting at import time. --- agentex/src/api/app.py | 13 +--- agentex/src/utils/otel_metrics.py | 44 +++++++++-- agentex/tests/unit/utils/test_otel_metrics.py | 76 +++++++++++++++++++ 3 files changed, 116 insertions(+), 17 deletions(-) diff --git a/agentex/src/api/app.py b/agentex/src/api/app.py index b2e72348..8d6332ba 100644 --- a/agentex/src/api/app.py +++ b/agentex/src/api/app.py @@ -38,11 +38,7 @@ from src.config.environment_variables import EnvVarKeys from src.domain.exceptions import GenericException from src.utils.logging import make_logger -from src.utils.otel_metrics import ( - init_otel_metrics, - instrument_fastapi_http_metrics, - shutdown_otel_metrics, -) +from src.utils.otel_metrics import configure_app_metrics, shutdown_otel_metrics logger = make_logger(__name__) @@ -77,10 +73,6 @@ def __init__( @asynccontextmanager async def lifespan(_: FastAPI): - # Initialize OpenTelemetry metrics first (before dependencies register instruments) - init_otel_metrics() - instrument_fastapi_http_metrics(fastapi_app) - await dependencies.startup_global_dependencies() configure_statsd() @@ -195,6 +187,9 @@ async def handle_unexpected(request, exc): fastapi_app.include_router(checkpoints.router) fastapi_app.include_router(task_retention.router) +# Instrument before the first ASGI message; lifespan startup is too late. +configure_app_metrics(fastapi_app) + # Wrap FastAPI app with health check interceptor for sub-millisecond K8s probe responses. # This must be the outermost layer to bypass all middleware. # Export as `app` so existing uvicorn entry points (app:app) work without changes. diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index ac9cf0e2..2c57dcd8 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -20,6 +20,14 @@ OTEL_EXPORTER_OTLP_HEADERS: Optional headers for authentication OTEL_SERVICE_NAME: Service name for metrics (default: agentex) OTEL_METRICS_EXPORT_INTERVAL_MS: Export interval in ms (default: 30000) + AGENTEX_OTEL_HTTP_METRICS_ENABLED: Opt-in in-process FastAPI HTTP metrics + (default: false). When true, also set + OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi,system_metrics on the pod + so the OTel Operator does not double-instrument FastAPI. + OTEL_PYTHON_DISABLED_INSTRUMENTATIONS: Pod env; must include ``fastapi`` when + using in-process HTTP metrics (see above). + DD_TRACE_FASTAPI_ENABLED: Set to ``false`` when using ddtrace-run so ddtrace + does not claim FastAPI before OpenTelemetry instrumentation. """ from __future__ import annotations @@ -298,14 +306,11 @@ def instrument_fastapi_http_metrics(app: Any) -> bool: """ Install in-process FastAPI HTTP server metrics (http.server.request.duration). - Call after init_otel_metrics() so custom and HTTP metrics share the active - global MeterProvider (operator or standalone). + Prefer :func:`configure_app_metrics`. When called directly, invoke before the + ASGI server handles any messages (lifespan startup is too late). - Safe with OTel Operator auto-instrumentation: FastAPIInstrumentor is - idempotent and no-ops when ``_is_instrumented_by_opentelemetry`` is already set. - Set OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi on the pod so the operator - does not also hook FastAPI. Beyla/eBPF metrics are independent and may still - appear under different label sets when attach succeeds. + Requires ``AGENTEX_OTEL_HTTP_METRICS_ENABLED=true`` and + ``OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi,system_metrics`` on the pod. Returns: True when instrumentation was applied, False when skipped or disabled. @@ -331,6 +336,29 @@ def instrument_fastapi_http_metrics(app: Any) -> bool: from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor - FastAPIInstrumentor.instrument_app(app) + meter_provider = _global_meter_provider() + FastAPIInstrumentor.instrument_app(app, meter_provider=meter_provider) logger.info("FastAPI in-process HTTP metrics instrumentation enabled") return True + + +def configure_app_metrics(app: Any) -> None: + """ + Initialize OTLP metrics and optional FastAPI HTTP instrumentation. + + Call once at module import after the FastAPI app is fully configured (middleware, + routes, handlers) and before wrapping it or serving any ASGI messages. + Lifespan is too late: Starlette caches ``middleware_stack`` on the first ASGI + message (usually lifespan startup), before the lifespan handler runs. + + HTTP metrics are opt-in via ``AGENTEX_OTEL_HTTP_METRICS_ENABLED`` (default false). + Beyla/eBPF HTTP metrics are independent and use different label sets when present. + """ + init_otel_metrics() + if not _http_metrics_enabled(): + return + if not instrument_fastapi_http_metrics(app): + logger.warning( + "FastAPI HTTP metrics were not applied despite " + "AGENTEX_OTEL_HTTP_METRICS_ENABLED; see prior log lines for the skip reason" + ) diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py index 772f58e5..0060d9f2 100644 --- a/agentex/tests/unit/utils/test_otel_metrics.py +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -435,6 +435,82 @@ def probe() -> dict[str, str]: assert "http.server.request.duration" in metric_names +@pytest.mark.unit +def test_configure_app_metrics_before_first_request(monkeypatch): + """Instrument at import time, before the ASGI server builds middleware_stack.""" + from fastapi import FastAPI + from fastapi.testclient import TestClient + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + monkeypatch.setenv("OTEL_SEMCONV_STABILITY_OPT_IN", "http") + + reader = InMemoryMetricReader() + provider = MeterProvider( + resource=Resource.create({"service.name": "agentex"}), + metric_readers=[reader], + ) + _set_global_meter_provider(provider) + + app = FastAPI() + + @app.post("/agents/{agent_id}/rpc") + def rpc(agent_id: str) -> dict[str, str]: + return {"ok": agent_id} + + otel_metrics.configure_app_metrics(app) + + with TestClient(app) as client: + response = client.post("/agents/test/rpc") + assert response.status_code == 200 + + data = reader.get_metrics_data() + assert data is not None + metric_names = { + metric.name + for resource_metrics in data.resource_metrics + for scope in resource_metrics.scope_metrics + for metric in scope.metrics + } + assert "http.server.request.duration" in metric_names + + +@pytest.mark.unit +def test_configure_app_metrics_in_lifespan_does_not_record_metrics(monkeypatch): + """Lifespan runs after Starlette caches middleware_stack on lifespan startup.""" + from contextlib import asynccontextmanager + + from fastapi import FastAPI + from fastapi.testclient import TestClient + + monkeypatch.setenv("AGENTEX_OTEL_HTTP_METRICS_ENABLED", "true") + monkeypatch.setenv("OTEL_SEMCONV_STABILITY_OPT_IN", "http") + + reader = InMemoryMetricReader() + provider = MeterProvider( + resource=Resource.create({"service.name": "agentex"}), + metric_readers=[reader], + ) + _set_global_meter_provider(provider) + + app = FastAPI() + + @app.post("/x") + def x() -> dict[str, str]: + return {"ok": "true"} + + @asynccontextmanager + async def lifespan(_: FastAPI): + otel_metrics.configure_app_metrics(app) + yield + + app.router.lifespan_context = lifespan + + with TestClient(app) as client: + client.post("/x") + + assert reader.get_metrics_data() is None + + @pytest.mark.unit def test_instrument_fastapi_is_idempotent(monkeypatch): from fastapi import FastAPI From b3586c0cfba76fff70b526aacacbabc5664bdc75 Mon Sep 17 00:00:00 2001 From: James Cardenas Date: Tue, 9 Jun 2026 16:15:31 -0700 Subject: [PATCH 7/9] Use OTELResourceDetector for custom metrics resource attributes --- agentex/src/utils/otel_metrics.py | 28 +++++++----- agentex/tests/unit/utils/test_otel_metrics.py | 45 +++++++++++++++++++ 2 files changed, 63 insertions(+), 10 deletions(-) diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index 2c57dcd8..5eb09c8c 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -20,6 +20,8 @@ OTEL_EXPORTER_OTLP_HEADERS: Optional headers for authentication OTEL_SERVICE_NAME: Service name for metrics (default: agentex) OTEL_METRICS_EXPORT_INTERVAL_MS: Export interval in ms (default: 30000) + OTEL_RESOURCE_ATTRIBUTES: K8s and service resource attrs (injected by the OTel + Operator on cluster pods; read via OTELResourceDetector). AGENTEX_OTEL_HTTP_METRICS_ENABLED: Opt-in in-process FastAPI HTTP metrics (default: false). When true, also set OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi,system_metrics on the pod @@ -44,7 +46,11 @@ ) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION, Resource +from opentelemetry.sdk.resources import ( + OTELResourceDetector, + Resource, + get_aggregated_resources, +) from src.utils.logging import make_logger @@ -63,6 +69,11 @@ DEFAULT_EXPORT_INTERVAL_MS = 30000 # 30 seconds +def _build_resource() -> Resource: + """Build MeterProvider resource from standard OTEL_* env (operator-injected on k8s).""" + return get_aggregated_resources([OTELResourceDetector()]) + + def _global_meter_provider() -> MeterProvider | None: """Return the global MeterProvider if installed, else None (proxy is ignored).""" provider = metrics.get_meter_provider() @@ -206,15 +217,12 @@ def init_otel_metrics( ) ) ) - resource = Resource.create( - { - SERVICE_NAME: resolved_service_name, - SERVICE_VERSION: service_version - or os.environ.get("SERVICE_VERSION", "0.1.0"), - "deployment.environment": environment - or os.environ.get("ENVIRONMENT", "development"), - } - ) + resource = _build_resource() + if not resource.attributes.get("k8s.pod.name"): + logger.warning( + "k8s.pod.name not set on MeterProvider resource; " + "ensure OTEL_RESOURCE_ATTRIBUTES is injected (OTel Operator)." + ) reader = PeriodicExportingMetricReader( exporter=_create_metric_exporter(endpoint, protocol), export_interval_millis=resolved_export_interval_ms, diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py index 0060d9f2..026336aa 100644 --- a/agentex/tests/unit/utils/test_otel_metrics.py +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -135,6 +135,51 @@ def test_init_creates_meter_provider_when_none_configured(monkeypatch): assert otel_metrics.get_meter("agentex.test") is not None +@pytest.mark.unit +def test_build_resource_parses_operator_injected_pod_env(monkeypatch): + """Regression: K8s expands $(OTEL_RESOURCE_ATTRIBUTES_*) before Python starts.""" + pod_name = "agentex-ccc85c45b-s29zm" + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.container.name=agentex," + "k8s.deployment.name=agentex," + "k8s.namespace.name=agentex," + "k8s.node.name=ip-10-0-1-2.us-west-2.compute.internal," + f"k8s.pod.name={pod_name}," + "k8s.replicaset.name=agentex-ccc85c45b," + f"service.instance.id=agentex.{pod_name}.agentex," + "service.namespace=agentex," + "service.version=perf-agentex-drop-redundant-task-grant-b59b92e", + ) + + attrs = otel_metrics._build_resource().attributes + assert attrs.get("service.name") == "agentex" + assert attrs.get("k8s.pod.name") == pod_name + assert attrs.get("k8s.namespace.name") == "agentex" + assert attrs.get("k8s.deployment.name") == "agentex" + assert attrs.get("k8s.container.name") == "agentex" + assert attrs.get("service.instance.id") == f"agentex.{pod_name}.agentex" + + +@pytest.mark.unit +def test_build_resource_from_otel_env(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.pod.name=operator-pod,k8s.namespace.name=agentex," + "k8s.deployment.name=agentex,service.instance.id=agentex.operator-pod.agentex", + ) + + resource = otel_metrics._build_resource() + attrs = resource.attributes + assert attrs.get("service.name") == "agentex" + assert attrs.get("k8s.pod.name") == "operator-pod" + assert attrs.get("k8s.namespace.name") == "agentex" + assert attrs.get("k8s.deployment.name") == "agentex" + assert attrs.get("service.instance.id") == "agentex.operator-pod.agentex" + + @pytest.mark.unit def test_init_coexists_without_set_meter_provider_when_operator_present( monkeypatch, From 6a8db548dbecbb9c46588f91feb7aa9d56a0b2a1 Mon Sep 17 00:00:00 2001 From: James Cardenas Date: Tue, 9 Jun 2026 16:42:16 -0700 Subject: [PATCH 8/9] Set Cumulative export for Metrics --- agentex/src/utils/otel_metrics.py | 21 ++++++++++++++++--- agentex/tests/unit/utils/test_otel_metrics.py | 21 +++++++++++++++++-- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index 5eb09c8c..fbe14190 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -44,8 +44,11 @@ from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( OTLPMetricExporter as OTLPHttpMetricExporter, ) -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.metrics import Counter, Histogram, MeterProvider, UpDownCounter +from opentelemetry.sdk.metrics.export import ( + AggregationTemporality, + PeriodicExportingMetricReader, +) from opentelemetry.sdk.resources import ( OTELResourceDetector, Resource, @@ -68,6 +71,14 @@ DEFAULT_SERVICE_NAME = "agentex" DEFAULT_EXPORT_INTERVAL_MS = 30000 # 30 seconds +# Cumulative export is required for Prometheus/Mimir rate()/increase() on OTLP histograms. +# Delta temporality produces inflated or ramping RPS when queried via histogram_count(rate(...)). +_PREFERRED_OTLP_TEMPORALITY = { + Counter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.CUMULATIVE, + UpDownCounter: AggregationTemporality.CUMULATIVE, +} + def _build_resource() -> Resource: """Build MeterProvider resource from standard OTEL_* env (operator-injected on k8s).""" @@ -142,7 +153,10 @@ def _http_metrics_export_url(endpoint: str) -> str: def _create_metric_exporter(endpoint: str, protocol: str) -> MetricExporter: if protocol in {"http/protobuf", "http"}: - return OTLPHttpMetricExporter(endpoint=_http_metrics_export_url(endpoint)) + return OTLPHttpMetricExporter( + endpoint=_http_metrics_export_url(endpoint), + preferred_temporality=_PREFERRED_OTLP_TEMPORALITY, + ) if protocol != "grpc": logger.warning("Unknown OTEL metrics protocol %r; using grpc", protocol) @@ -150,6 +164,7 @@ def _create_metric_exporter(endpoint: str, protocol: str) -> MetricExporter: return OTLPGrpcMetricExporter( endpoint=endpoint, insecure=endpoint.startswith("http://"), + preferred_temporality=_PREFERRED_OTLP_TEMPORALITY, ) diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py index 026336aa..7b0da862 100644 --- a/agentex/tests/unit/utils/test_otel_metrics.py +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -12,8 +12,8 @@ from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( OTLPMetricExporter as OTLPHttpMetricExporter, ) -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.metrics import Counter, Histogram, MeterProvider, UpDownCounter +from opentelemetry.sdk.metrics.export import AggregationTemporality, InMemoryMetricReader from opentelemetry.sdk.resources import Resource from src.utils import cache_metrics, otel_metrics @@ -366,6 +366,23 @@ def test_create_http_metric_exporter_uses_v1_metrics_path(): assert exporter._endpoint == "http://collector:4318/v1/metrics" +@pytest.mark.unit +def test_create_metric_exporter_uses_cumulative_temporality(): + """Regression: delta OTLP histogram export breaks Mimir histogram_count(rate(...)).""" + grpc_exporter = otel_metrics._create_metric_exporter( + "http://localhost:4317", "grpc" + ) + http_exporter = otel_metrics._create_metric_exporter( + "http://collector:4318", "http/protobuf" + ) + + for exporter in (grpc_exporter, http_exporter): + preferred = exporter._preferred_temporality + assert preferred[Counter] == AggregationTemporality.CUMULATIVE + assert preferred[Histogram] == AggregationTemporality.CUMULATIVE + assert preferred[UpDownCounter] == AggregationTemporality.CUMULATIVE + + @pytest.mark.unit def test_init_after_shutdown_in_standalone_mode(monkeypatch): monkeypatch.setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") From 1f443051649a7d683d781bae87d8e42c3cb65832 Mon Sep 17 00:00:00 2001 From: James Cardenas Date: Wed, 10 Jun 2026 13:54:33 -0700 Subject: [PATCH 9/9] Append PID to service.instance.id on standalone MeterProvider resource --- agentex/src/utils/otel_metrics.py | 29 +++++++++- agentex/tests/unit/utils/test_otel_metrics.py | 56 ++++++++++++++++++- 2 files changed, 79 insertions(+), 6 deletions(-) diff --git a/agentex/src/utils/otel_metrics.py b/agentex/src/utils/otel_metrics.py index fbe14190..3c9ba033 100644 --- a/agentex/src/utils/otel_metrics.py +++ b/agentex/src/utils/otel_metrics.py @@ -21,7 +21,9 @@ OTEL_SERVICE_NAME: Service name for metrics (default: agentex) OTEL_METRICS_EXPORT_INTERVAL_MS: Export interval in ms (default: 30000) OTEL_RESOURCE_ATTRIBUTES: K8s and service resource attrs (injected by the OTel - Operator on cluster pods; read via OTELResourceDetector). + Operator on cluster pods; read via OTELResourceDetector). Per-process + service.instance.id is applied via Resource.merge when this module creates + the MeterProvider; env is not modified. AGENTEX_OTEL_HTTP_METRICS_ENABLED: Opt-in in-process FastAPI HTTP metrics (default: false). When true, also set OTEL_PYTHON_DISABLED_INSTRUMENTATIONS=fastapi,system_metrics on the pod @@ -80,9 +82,30 @@ } +def _per_process_instance_id(resource: Resource) -> str: + """Return a worker-unique service.instance.id from detected resource attrs.""" + pid = os.getpid() + existing = resource.attributes.get("service.instance.id") + if existing: + existing = str(existing) + pid_token = f".{pid}" + if existing.endswith(pid_token) or f"{pid_token}." in existing: + return existing + return f"{existing}.{pid}" + service = ( + resource.attributes.get("service.name") + or os.environ.get("OTEL_SERVICE_NAME") + or "unknown" + ) + pod = resource.attributes.get("k8s.pod.name") or "unknown" + return f"{service}.{pod}.{pid}" + + def _build_resource() -> Resource: - """Build MeterProvider resource from standard OTEL_* env (operator-injected on k8s).""" - return get_aggregated_resources([OTELResourceDetector()]) + """Detect operator/k8s attrs from env; set a per-process service.instance.id.""" + resource = get_aggregated_resources([OTELResourceDetector()]) + service_instance_id = _per_process_instance_id(resource) + return resource.merge(Resource.create({"service.instance.id": service_instance_id})) def _global_meter_provider() -> MeterProvider | None: diff --git a/agentex/tests/unit/utils/test_otel_metrics.py b/agentex/tests/unit/utils/test_otel_metrics.py index 7b0da862..305c2ed4 100644 --- a/agentex/tests/unit/utils/test_otel_metrics.py +++ b/agentex/tests/unit/utils/test_otel_metrics.py @@ -4,6 +4,8 @@ from unittest.mock import patch +import os + import pytest from opentelemetry import metrics from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( @@ -14,7 +16,7 @@ ) from opentelemetry.sdk.metrics import Counter, Histogram, MeterProvider, UpDownCounter from opentelemetry.sdk.metrics.export import AggregationTemporality, InMemoryMetricReader -from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.resources import OTELResourceDetector, Resource, get_aggregated_resources from src.utils import cache_metrics, otel_metrics @@ -135,6 +137,52 @@ def test_init_creates_meter_provider_when_none_configured(monkeypatch): assert otel_metrics.get_meter("agentex.test") is not None +@pytest.mark.unit +def test_per_process_instance_id_extends_operator_value(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.pod.name=my-pod,service.instance.id=agentex.my-pod.agentex", + ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 42) + base = get_aggregated_resources([OTELResourceDetector()]) + assert otel_metrics._per_process_instance_id(base) == "agentex.my-pod.agentex.42" + + +@pytest.mark.unit +def test_per_process_instance_id_builds_when_missing(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + monkeypatch.setenv("OTEL_RESOURCE_ATTRIBUTES", "k8s.pod.name=my-pod") + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 42) + base = get_aggregated_resources([OTELResourceDetector()]) + assert otel_metrics._per_process_instance_id(base) == "agentex.my-pod.42" + + +@pytest.mark.unit +def test_build_resource_does_not_mutate_otel_resource_attributes_env(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "agentex") + original = "k8s.pod.name=my-pod,service.instance.id=agentex.my-pod.agentex" + monkeypatch.setenv("OTEL_RESOURCE_ATTRIBUTES", original) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 42) + otel_metrics._build_resource() + assert os.environ["OTEL_RESOURCE_ATTRIBUTES"] == original + + +@pytest.mark.unit +def test_per_process_instance_id_works_for_other_services(monkeypatch): + monkeypatch.setenv("OTEL_SERVICE_NAME", "payments-api") + monkeypatch.setenv( + "OTEL_RESOURCE_ATTRIBUTES", + "k8s.pod.name=payments-abc,service.instance.id=payments-api.payments-abc.prod", + ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 7) + base = get_aggregated_resources([OTELResourceDetector()]) + assert ( + otel_metrics._per_process_instance_id(base) + == "payments-api.payments-abc.prod.7" + ) + + @pytest.mark.unit def test_build_resource_parses_operator_injected_pod_env(monkeypatch): """Regression: K8s expands $(OTEL_RESOURCE_ATTRIBUTES_*) before Python starts.""" @@ -152,6 +200,7 @@ def test_build_resource_parses_operator_injected_pod_env(monkeypatch): "service.namespace=agentex," "service.version=perf-agentex-drop-redundant-task-grant-b59b92e", ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 12345) attrs = otel_metrics._build_resource().attributes assert attrs.get("service.name") == "agentex" @@ -159,7 +208,7 @@ def test_build_resource_parses_operator_injected_pod_env(monkeypatch): assert attrs.get("k8s.namespace.name") == "agentex" assert attrs.get("k8s.deployment.name") == "agentex" assert attrs.get("k8s.container.name") == "agentex" - assert attrs.get("service.instance.id") == f"agentex.{pod_name}.agentex" + assert attrs.get("service.instance.id") == f"agentex.{pod_name}.agentex.12345" @pytest.mark.unit @@ -170,6 +219,7 @@ def test_build_resource_from_otel_env(monkeypatch): "k8s.pod.name=operator-pod,k8s.namespace.name=agentex," "k8s.deployment.name=agentex,service.instance.id=agentex.operator-pod.agentex", ) + monkeypatch.setattr(otel_metrics.os, "getpid", lambda: 6789) resource = otel_metrics._build_resource() attrs = resource.attributes @@ -177,7 +227,7 @@ def test_build_resource_from_otel_env(monkeypatch): assert attrs.get("k8s.pod.name") == "operator-pod" assert attrs.get("k8s.namespace.name") == "agentex" assert attrs.get("k8s.deployment.name") == "agentex" - assert attrs.get("service.instance.id") == "agentex.operator-pod.agentex" + assert attrs.get("service.instance.id") == "agentex.operator-pod.agentex.6789" @pytest.mark.unit