Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@
DropCode,
_exception_categories,
)
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
from azure.monitor.opentelemetry.exporter._connection_string_parser import (
ConnectionStringParser,
)
from azure.monitor.opentelemetry.exporter._storage import LocalFileStorage
from azure.monitor.opentelemetry.exporter._utils import (
_get_auth_policy,
_get_sha256_hash,
)
from azure.monitor.opentelemetry.exporter.export._rate_limiter import (
_TokenBucketRateLimiter,
_DEFAULT_MAX_ENVELOPES_PER_SECOND,
)
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
get_statsbeat_initial_success,
get_statsbeat_shutdown,
Expand All @@ -70,7 +76,6 @@
get_customer_stats_manager,
)


logger = logging.getLogger(__name__)

_AZURE_TEMPDIR_PREFIX = "Microsoft-AzureMonitor-"
Expand All @@ -86,6 +91,7 @@ class ExportResult(Enum):

# pylint: disable=broad-except
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-statements
# pylint: disable=C0301
class BaseExporter:
"""Azure Monitor base exporter for OpenTelemetry."""
Expand All @@ -98,6 +104,7 @@ def __init__(self, **kwargs: Any) -> None:
:keyword ManagedIdentityCredential/ClientSecretCredential credential: Token credential, such as ManagedIdentityCredential or ClientSecretCredential, used for Azure Active Directory (AAD) authentication. Defaults to None.
:keyword bool disable_offline_storage: Determines whether to disable storing failed telemetry records for retry. Defaults to `False`.
:keyword str storage_directory: Storage path in which to store retry files. Defaults to `<tempfile.gettempdir()>/opentelemetry-python-<your-instrumentation-key>`.
:keyword int max_envelopes_per_second: Maximum number of telemetry envelopes sent per second. Acts as a client-side safety cap to prevent overloading shared ingestion infrastructure during telemetry bursts. Defaults to 10000. Set to 0 to disable rate limiting.
:rtype: None
"""
parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string"))
Expand Down Expand Up @@ -139,6 +146,17 @@ def __init__(self, **kwargs: Any) -> None:
"storage_retention_period", 48 * 60 * 60
) # Retention period in seconds (default 48 hrs)
self._timeout = kwargs.get("timeout", 10.0) # networking timeout in seconds
max_eps = kwargs.get("max_envelopes_per_second", _DEFAULT_MAX_ENVELOPES_PER_SECOND)
if max_eps is not None and max_eps < 0:
raise ValueError("max_envelopes_per_second must be non-negative (0 disables rate limiting)")
# Each exporter instance gets its own rate limiter. This is intentional:
# different telemetry types (traces, logs, metrics) have different
# ingestion characteristics and burst profiles, so per-exporter caps
# provide more predictable behaviour than a shared process-wide bucket.
if max_eps and max_eps > 0:
self._rate_limiter: Optional[_TokenBucketRateLimiter] = _TokenBucketRateLimiter(max_eps)
else:
self._rate_limiter = None
Comment thread
hectorhdzg marked this conversation as resolved.
self._distro_version = kwargs.get(
_AZURE_MONITOR_DISTRO_VERSION_ARG, ""
) # If set, indicates the exporter is instantiated via Azure monitor OpenTelemetry distro. Versions corresponds to distro version.
Expand Down Expand Up @@ -168,7 +186,10 @@ def __init__(self, **kwargs: Any) -> None:
policies.append(config.http_logging_policy or HttpLoggingPolicy(**kwargs))

self.client: AzureMonitorClient = AzureMonitorClient(
host=self._endpoint, connection_timeout=self._timeout, policies=policies, **kwargs
host=self._endpoint,
connection_timeout=self._timeout,
policies=policies,
**kwargs,
)
# TODO: Uncomment configuration changes once testing is completed
# if self._configuration_manager:
Expand All @@ -195,7 +216,9 @@ def __init__(self, **kwargs: Any) -> None:
if self._should_collect_stats():
try:
# Import here to avoid circular dependencies
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import (
collect_statsbeat_metrics,
)

collect_statsbeat_metrics(self)
except Exception as e: # pylint: disable=broad-except
Expand All @@ -205,7 +228,9 @@ def __init__(self, **kwargs: Any) -> None:

# customer sdkstats initialization
if self._should_collect_customer_sdkstats():
from azure.monitor.opentelemetry.exporter.statsbeat.customer import collect_customer_sdkstats
from azure.monitor.opentelemetry.exporter.statsbeat.customer import (
collect_customer_sdkstats,
)

# Collect customer sdkstats metrics
collect_customer_sdkstats(self)
Expand Down Expand Up @@ -261,18 +286,60 @@ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result:
# pylint: disable=too-many-branches
# pylint: disable=too-many-nested-blocks
# pylint: disable=too-many-statements
def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
def _transmit(self, envelopes: List[TelemetryItem], _skip_rate_limit: bool = False) -> ExportResult:
"""
Transmit the data envelopes to the ingestion service.

Returns an ExportResult, this function should never
throw an exception.
:param envelopes: The list of telemetry items to transmit.
:type envelopes: list of ~azure.monitor.opentelemetry.exporter._generated.exporter.models.TelemetryItem
:param _skip_rate_limit: Internal flag to skip rate limiting on recursive calls (e.g. redirects).
:type _skip_rate_limit: bool
:return: The result of the export.
:rtype: ~azure.monitor.opentelemetry.exporter.export._base._ExportResult
"""
if len(envelopes) > 0:
# Client-side rate limiting: cap send rate to protect shared ingestion infrastructure.
# Stats exporters bypass rate limiting to ensure observability data is not lost.
# Skip rate limiting on recursive calls (e.g. 307/308 redirects) to avoid
# double-consuming tokens for the same batch.
if (
not _skip_rate_limit
and self._rate_limiter
and not self._is_stats_exporter()
and not self._is_customer_sdkstats_exporter()
):
granted = self._rate_limiter.try_consume(len(envelopes))
if granted == 0:
logger.warning(
"Rate limiter rejected entire batch of %d envelopes. Routing to local storage for retry.",
len(envelopes),
)
return ExportResult.FAILED_RETRYABLE
if granted < len(envelopes):
# Send what we can, route the rest to local storage.
# We mutate the list in-place so that the caller's reference
# (used later in _handle_transmit_from_storage) only sees
# the admitted envelopes, preventing double-persist of the
# overflow on a subsequent retryable failure.
overflow = envelopes[granted:]
del envelopes[granted:]
logger.info(
"Rate limiter admitted %d of %d envelopes; %d envelopes deferred to local storage.",
granted,
granted + len(overflow),
len(overflow),
)
if self.storage:
self.storage.put([x.as_dict() for x in overflow])
Comment thread
hectorhdzg marked this conversation as resolved.
else:
logger.warning(
"Rate limiter deferred %d envelopes but offline "
"storage is disabled; these envelopes are dropped.",
len(overflow),
)

result = ExportResult.SUCCESS
# Track whether or not exporter has successfully reached ingestion
# Currently only used for statsbeat exporter to detect shutdown cases
Expand Down Expand Up @@ -308,7 +375,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
logger.info(
"Data dropped due to ingestion sampling: %s %s.",
error.message,
envelopes[error.index] if error.index is not None else "",
(envelopes[error.index] if error.index is not None else ""),
)
elif _is_retryable_code(error.status_code):
resend_envelopes.append(envelopes[error.index]) # type: ignore
Expand All @@ -330,7 +397,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
"Data drop %s: %s %s.",
error.status_code,
error.message,
envelopes[error.index] if error.index is not None else "",
(envelopes[error.index] if error.index is not None else ""),
)
if self.storage and resend_envelopes:
envelopes_to_store = [x.as_dict() for x in resend_envelopes]
Expand Down Expand Up @@ -394,7 +461,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
# Change the host to the new redirected host
self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212
# Attempt to export again
result = self._transmit(envelopes)
result = self._transmit(envelopes, _skip_rate_limit=True)
else:
if not self._is_stats_exporter():
if self._should_collect_customer_sdkstats():
Expand All @@ -412,7 +479,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
# Track dropped items in customer sdkstats, non-retryable scenario
if self._should_collect_customer_sdkstats():
track_dropped_items(
envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value
envelopes,
DropCode.CLIENT_EXCEPTION,
_exception_categories.CLIENT_EXCEPTION.value,
)
logger.error(
"Error sending telemetry because of circular redirects. "
Expand Down Expand Up @@ -474,7 +543,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
# Track dropped items in customer sdkstats for general exceptions
if self._should_collect_customer_sdkstats():
track_dropped_items(
envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value
envelopes,
DropCode.CLIENT_EXCEPTION,
_exception_categories.CLIENT_EXCEPTION.value,
)

if self._should_collect_stats():
Expand Down Expand Up @@ -608,7 +679,9 @@ def _is_sampling_rejection(message: Optional[str]) -> bool:


# mypy: disable-error-code="union-attr"
def _get_authentication_credential(**kwargs: Any) -> Optional[ManagedIdentityCredential]:
def _get_authentication_credential(
**kwargs: Any,
) -> Optional[ManagedIdentityCredential]:
if "credential" in kwargs:
return kwargs.get("credential")
auth_string = os.getenv(_APPLICATIONINSIGHTS_AUTHENTICATION_STRING, "")
Expand All @@ -625,7 +698,9 @@ def _get_authentication_credential(**kwargs: Any) -> Optional[ManagedIdentityCre
return credential
except ValueError as exc:
logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug
"APPLICATIONINSIGHTS_AUTHENTICATION_STRING, %s, has invalid format: %s", auth_string, exc
"APPLICATIONINSIGHTS_AUTHENTICATION_STRING, %s, has invalid format: %s",
auth_string,
exc,
)
except Exception as e:
logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug
Expand Down Expand Up @@ -688,6 +763,8 @@ def _safe_psutil_call(func, default=""):
)
subdirectory = _get_sha256_hash(hash_input)
storage_directory = os.path.join(
shared_root, _AZURE_TEMPDIR_PREFIX + subdirectory, _TEMPDIR_PREFIX + instrumentation_key
shared_root,
_AZURE_TEMPDIR_PREFIX + subdirectory,
_TEMPDIR_PREFIX + instrumentation_key,
)
return storage_directory
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import logging
import threading
import time

logger = logging.getLogger(__name__)

# Default maximum envelopes per second across all telemetry types.
# This is a client-side safety cap to prevent self-inflicted overload
# of shared ingestion infrastructure during telemetry bursts.
_DEFAULT_MAX_ENVELOPES_PER_SECOND = 10000

# Minimum allowed value to prevent misconfiguration
_MIN_MAX_ENVELOPES_PER_SECOND = 1


class _TokenBucketRateLimiter:
"""Thread-safe token bucket rate limiter for outbound telemetry.

The bucket refills at ``max_per_second`` tokens per second and holds
at most ``max_per_second`` tokens (i.e. one second of burst capacity).

:param float max_per_second: Maximum tokens (envelopes) allowed per second.
"""

def __init__(self, max_per_second: float) -> None:
if max_per_second < _MIN_MAX_ENVELOPES_PER_SECOND:
raise ValueError(f"max_per_second must be at least {_MIN_MAX_ENVELOPES_PER_SECOND}")
self._max_per_second = float(max_per_second)
self._tokens = self._max_per_second # start full
self._last_refill = time.monotonic()
self._lock = threading.Lock()

def try_consume(self, count: int) -> int:
"""Try to consume *count* tokens from the bucket.

Returns the number of tokens actually consumed (i.e. how many
envelopes may be sent). The caller should handle the remainder
(e.g. store for retry or drop).

:param int count: Number of tokens requested.
:return: Number of tokens granted (<= *count*).
:rtype: int
"""
if count <= 0:
return 0

with self._lock:
now = time.monotonic()
elapsed = now - self._last_refill
self._last_refill = now

# Refill tokens based on elapsed time, capped at bucket capacity
self._tokens = min(
self._max_per_second,
self._tokens + elapsed * self._max_per_second,
)

granted = min(count, int(self._tokens))
self._tokens -= granted

return granted
Loading