Skip to content

Commit 25e9220

Browse files
[Monitor OpenTelemetry Exporter] Add client-side token bucket rate limiter for telemetry export (#46999)
* Add client-side token bucket rate limiter for telemetry export

  Addresses resilience finding: the Azure Monitor OpenTelemetry exporter had no client-side send-rate cap, allowing telemetry bursts to overload shared ingestion infrastructure.

  Changes:
  - Add _TokenBucketRateLimiter in export/_rate_limiter.py with configurable max_envelopes_per_second (default 10,000/sec, 1s burst capacity)
  - Integrate rate limiting into BaseExporter._transmit() so all exporter types (traces, logs, metrics) are protected
  - Excess envelopes are routed to local storage for retry, not dropped
  - Stats/internal exporters bypass rate limiting to preserve observability
  - Rate limiting can be disabled via max_envelopes_per_second=0
  - Add 19 unit and integration tests in tests/test_rate_limiter.py

* Address PR review: fix overflow double-persist, validate negative rate limit values
  - Mutate envelopes list in-place (del envelopes[granted:]) so callers' _handle_transmit_from_storage sees only admitted envelopes, preventing duplicate storage of overflow on retryable failures
  - Log a warning when overflow is deferred but storage is disabled
  - Reject negative max_envelopes_per_second with ValueError instead of silently disabling rate limiting (only 0 disables, per documentation)
  - Add clarifying comment that per-exporter rate limiting is intentional
  - Add tests: negative value rejection, in-place mutation, no-storage overflow

* Fix pylint warnings: implicit-str-concat and too-many-statements

* Fix redirect double rate-limit: skip rate limiting on recursive _transmit calls

  Address JacksonWeber review: when _transmit() hits a 307/308 redirect and calls itself recursively, the rate-limiting logic at the top would consume tokens a second time for the same batch. Add _skip_rate_limit parameter that is set to True on recursive calls to prevent this.

* Fix black formatting issues in rate limiter and base exporter

* Retrigger CI checks

* Retrigger CI checks (attempt 2)

---------

Co-authored-by: Jackson Weber <47067795+JacksonWeber@users.noreply.github.com>
1 parent 9825fdc commit 25e9220

3 files changed

Lines changed: 440 additions & 14 deletions

File tree

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py

Lines changed: 91 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,18 @@
4444
DropCode,
4545
_exception_categories,
4646
)
47-
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
47+
from azure.monitor.opentelemetry.exporter._connection_string_parser import (
48+
ConnectionStringParser,
49+
)
4850
from azure.monitor.opentelemetry.exporter._storage import LocalFileStorage
4951
from azure.monitor.opentelemetry.exporter._utils import (
5052
_get_auth_policy,
5153
_get_sha256_hash,
5254
)
55+
from azure.monitor.opentelemetry.exporter.export._rate_limiter import (
56+
_TokenBucketRateLimiter,
57+
_DEFAULT_MAX_ENVELOPES_PER_SECOND,
58+
)
5359
from azure.monitor.opentelemetry.exporter.statsbeat._state import (
5460
get_statsbeat_initial_success,
5561
get_statsbeat_shutdown,
@@ -70,7 +76,6 @@
7076
get_customer_stats_manager,
7177
)
7278

73-
7479
logger = logging.getLogger(__name__)
7580

7681
_AZURE_TEMPDIR_PREFIX = "Microsoft-AzureMonitor-"
@@ -86,6 +91,7 @@ class ExportResult(Enum):
8691

8792
# pylint: disable=broad-except
8893
# pylint: disable=too-many-instance-attributes
94+
# pylint: disable=too-many-statements
8995
# pylint: disable=C0301
9096
class BaseExporter:
9197
"""Azure Monitor base exporter for OpenTelemetry."""
@@ -98,6 +104,7 @@ def __init__(self, **kwargs: Any) -> None:
98104
:keyword ManagedIdentityCredential/ClientSecretCredential credential: Token credential, such as ManagedIdentityCredential or ClientSecretCredential, used for Azure Active Directory (AAD) authentication. Defaults to None.
99105
:keyword bool disable_offline_storage: Determines whether to disable storing failed telemetry records for retry. Defaults to `False`.
100106
:keyword str storage_directory: Storage path in which to store retry files. Defaults to `<tempfile.gettempdir()>/opentelemetry-python-<your-instrumentation-key>`.
107+
:keyword int max_envelopes_per_second: Maximum number of telemetry envelopes sent per second. Acts as a client-side safety cap to prevent overloading shared ingestion infrastructure during telemetry bursts. Defaults to 10000. Set to 0 to disable rate limiting.
101108
:rtype: None
102109
"""
103110
parsed_connection_string = ConnectionStringParser(kwargs.get("connection_string"))
@@ -139,6 +146,17 @@ def __init__(self, **kwargs: Any) -> None:
139146
"storage_retention_period", 48 * 60 * 60
140147
) # Retention period in seconds (default 48 hrs)
141148
self._timeout = kwargs.get("timeout", 10.0) # networking timeout in seconds
149+
max_eps = kwargs.get("max_envelopes_per_second", _DEFAULT_MAX_ENVELOPES_PER_SECOND)
150+
if max_eps is not None and max_eps < 0:
151+
raise ValueError("max_envelopes_per_second must be non-negative (0 disables rate limiting)")
152+
# Each exporter instance gets its own rate limiter. This is intentional:
153+
# different telemetry types (traces, logs, metrics) have different
154+
# ingestion characteristics and burst profiles, so per-exporter caps
155+
# provide more predictable behaviour than a shared process-wide bucket.
156+
if max_eps and max_eps > 0:
157+
self._rate_limiter: Optional[_TokenBucketRateLimiter] = _TokenBucketRateLimiter(max_eps)
158+
else:
159+
self._rate_limiter = None
142160
self._distro_version = kwargs.get(
143161
_AZURE_MONITOR_DISTRO_VERSION_ARG, ""
144162
) # If set, indicates the exporter is instantiated via Azure monitor OpenTelemetry distro. Versions corresponds to distro version.
@@ -168,7 +186,10 @@ def __init__(self, **kwargs: Any) -> None:
168186
policies.append(config.http_logging_policy or HttpLoggingPolicy(**kwargs))
169187

170188
self.client: AzureMonitorClient = AzureMonitorClient(
171-
host=self._endpoint, connection_timeout=self._timeout, policies=policies, **kwargs
189+
host=self._endpoint,
190+
connection_timeout=self._timeout,
191+
policies=policies,
192+
**kwargs,
172193
)
173194
# TODO: Uncomment configuration changes once testing is completed
174195
# if self._configuration_manager:
@@ -195,7 +216,9 @@ def __init__(self, **kwargs: Any) -> None:
195216
if self._should_collect_stats():
196217
try:
197218
# Import here to avoid circular dependencies
198-
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import collect_statsbeat_metrics
219+
from azure.monitor.opentelemetry.exporter.statsbeat._statsbeat import (
220+
collect_statsbeat_metrics,
221+
)
199222

200223
collect_statsbeat_metrics(self)
201224
except Exception as e: # pylint: disable=broad-except
@@ -205,7 +228,9 @@ def __init__(self, **kwargs: Any) -> None:
205228

206229
# customer sdkstats initialization
207230
if self._should_collect_customer_sdkstats():
208-
from azure.monitor.opentelemetry.exporter.statsbeat.customer import collect_customer_sdkstats
231+
from azure.monitor.opentelemetry.exporter.statsbeat.customer import (
232+
collect_customer_sdkstats,
233+
)
209234

210235
# Collect customer sdkstats metrics
211236
collect_customer_sdkstats(self)
@@ -261,18 +286,60 @@ def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result:
261286
# pylint: disable=too-many-branches
262287
# pylint: disable=too-many-nested-blocks
263288
# pylint: disable=too-many-statements
264-
def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
289+
def _transmit(self, envelopes: List[TelemetryItem], _skip_rate_limit: bool = False) -> ExportResult:
265290
"""
266291
Transmit the data envelopes to the ingestion service.
267292
268293
Returns an ExportResult, this function should never
269294
throw an exception.
270295
:param envelopes: The list of telemetry items to transmit.
271296
:type envelopes: list of ~azure.monitor.opentelemetry.exporter._generated.exporter.models.TelemetryItem
297+
:param _skip_rate_limit: Internal flag to skip rate limiting on recursive calls (e.g. redirects).
298+
:type _skip_rate_limit: bool
272299
:return: The result of the export.
273300
:rtype: ~azure.monitor.opentelemetry.exporter.export._base._ExportResult
274301
"""
275302
if len(envelopes) > 0:
303+
# Client-side rate limiting: cap send rate to protect shared ingestion infrastructure.
304+
# Stats exporters bypass rate limiting to ensure observability data is not lost.
305+
# Skip rate limiting on recursive calls (e.g. 307/308 redirects) to avoid
306+
# double-consuming tokens for the same batch.
307+
if (
308+
not _skip_rate_limit
309+
and self._rate_limiter
310+
and not self._is_stats_exporter()
311+
and not self._is_customer_sdkstats_exporter()
312+
):
313+
granted = self._rate_limiter.try_consume(len(envelopes))
314+
if granted == 0:
315+
logger.warning(
316+
"Rate limiter rejected entire batch of %d envelopes. Routing to local storage for retry.",
317+
len(envelopes),
318+
)
319+
return ExportResult.FAILED_RETRYABLE
320+
if granted < len(envelopes):
321+
# Send what we can, route the rest to local storage.
322+
# We mutate the list in-place so that the caller's reference
323+
# (used later in _handle_transmit_from_storage) only sees
324+
# the admitted envelopes, preventing double-persist of the
325+
# overflow on a subsequent retryable failure.
326+
overflow = envelopes[granted:]
327+
del envelopes[granted:]
328+
logger.info(
329+
"Rate limiter admitted %d of %d envelopes; %d envelopes deferred to local storage.",
330+
granted,
331+
granted + len(overflow),
332+
len(overflow),
333+
)
334+
if self.storage:
335+
self.storage.put([x.as_dict() for x in overflow])
336+
else:
337+
logger.warning(
338+
"Rate limiter deferred %d envelopes but offline "
339+
"storage is disabled; these envelopes are dropped.",
340+
len(overflow),
341+
)
342+
276343
result = ExportResult.SUCCESS
277344
# Track whether or not exporter has successfully reached ingestion
278345
# Currently only used for statsbeat exporter to detect shutdown cases
@@ -308,7 +375,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
308375
logger.info(
309376
"Data dropped due to ingestion sampling: %s %s.",
310377
error.message,
311-
envelopes[error.index] if error.index is not None else "",
378+
(envelopes[error.index] if error.index is not None else ""),
312379
)
313380
elif _is_retryable_code(error.status_code):
314381
resend_envelopes.append(envelopes[error.index]) # type: ignore
@@ -330,7 +397,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
330397
"Data drop %s: %s %s.",
331398
error.status_code,
332399
error.message,
333-
envelopes[error.index] if error.index is not None else "",
400+
(envelopes[error.index] if error.index is not None else ""),
334401
)
335402
if self.storage and resend_envelopes:
336403
envelopes_to_store = [x.as_dict() for x in resend_envelopes]
@@ -394,7 +461,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
394461
# Change the host to the new redirected host
395462
self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212
396463
# Attempt to export again
397-
result = self._transmit(envelopes)
464+
result = self._transmit(envelopes, _skip_rate_limit=True)
398465
else:
399466
if not self._is_stats_exporter():
400467
if self._should_collect_customer_sdkstats():
@@ -412,7 +479,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
412479
# Track dropped items in customer sdkstats, non-retryable scenario
413480
if self._should_collect_customer_sdkstats():
414481
track_dropped_items(
415-
envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value
482+
envelopes,
483+
DropCode.CLIENT_EXCEPTION,
484+
_exception_categories.CLIENT_EXCEPTION.value,
416485
)
417486
logger.error(
418487
"Error sending telemetry because of circular redirects. "
@@ -474,7 +543,9 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
474543
# Track dropped items in customer sdkstats for general exceptions
475544
if self._should_collect_customer_sdkstats():
476545
track_dropped_items(
477-
envelopes, DropCode.CLIENT_EXCEPTION, _exception_categories.CLIENT_EXCEPTION.value
546+
envelopes,
547+
DropCode.CLIENT_EXCEPTION,
548+
_exception_categories.CLIENT_EXCEPTION.value,
478549
)
479550

480551
if self._should_collect_stats():
@@ -608,7 +679,9 @@ def _is_sampling_rejection(message: Optional[str]) -> bool:
608679

609680

610681
# mypy: disable-error-code="union-attr"
611-
def _get_authentication_credential(**kwargs: Any) -> Optional[ManagedIdentityCredential]:
682+
def _get_authentication_credential(
683+
**kwargs: Any,
684+
) -> Optional[ManagedIdentityCredential]:
612685
if "credential" in kwargs:
613686
return kwargs.get("credential")
614687
auth_string = os.getenv(_APPLICATIONINSIGHTS_AUTHENTICATION_STRING, "")
@@ -625,7 +698,9 @@ def _get_authentication_credential(**kwargs: Any) -> Optional[ManagedIdentityCre
625698
return credential
626699
except ValueError as exc:
627700
logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug
628-
"APPLICATIONINSIGHTS_AUTHENTICATION_STRING, %s, has invalid format: %s", auth_string, exc
701+
"APPLICATIONINSIGHTS_AUTHENTICATION_STRING, %s, has invalid format: %s",
702+
auth_string,
703+
exc,
629704
)
630705
except Exception as e:
631706
logger.error( # pylint: disable=do-not-log-exceptions-if-not-debug
@@ -688,6 +763,8 @@ def _safe_psutil_call(func, default=""):
688763
)
689764
subdirectory = _get_sha256_hash(hash_input)
690765
storage_directory = os.path.join(
691-
shared_root, _AZURE_TEMPDIR_PREFIX + subdirectory, _TEMPDIR_PREFIX + instrumentation_key
766+
shared_root,
767+
_AZURE_TEMPDIR_PREFIX + subdirectory,
768+
_TEMPDIR_PREFIX + instrumentation_key,
692769
)
693770
return storage_directory
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
import logging
5+
import threading
6+
import time
7+
8+
logger = logging.getLogger(__name__)
9+
10+
# Default maximum envelopes per second for a single rate limiter instance
# (each exporter creates its own limiter, so this is not a process-wide cap).
# This is a client-side safety cap to prevent self-inflicted overload
# of shared ingestion infrastructure during telemetry bursts.
_DEFAULT_MAX_ENVELOPES_PER_SECOND = 10000

# Minimum allowed value to prevent misconfiguration
_MIN_MAX_ENVELOPES_PER_SECOND = 1


class _TokenBucketRateLimiter:
    """Thread-safe token bucket rate limiter for outbound telemetry.

    The bucket refills at ``max_per_second`` tokens per second and holds
    at most ``max_per_second`` tokens (i.e. one second of burst capacity).
    The bucket starts full, and refill is computed lazily from the
    monotonic clock on each call to :meth:`try_consume`.

    :param float max_per_second: Maximum tokens (envelopes) allowed per second.
        Must be a finite number of at least ``_MIN_MAX_ENVELOPES_PER_SECOND``.
    :raises ValueError: If ``max_per_second`` is below the minimum, NaN or infinite.
    """

    def __init__(self, max_per_second: float) -> None:
        if max_per_second < _MIN_MAX_ENVELOPES_PER_SECOND:
            raise ValueError(f"max_per_second must be at least {_MIN_MAX_ENVELOPES_PER_SECOND}")
        rate = float(max_per_second)
        # NaN compares False against everything, so it slips past the minimum
        # check above; +inf passes it as well. Either value would make
        # ``int(self._tokens)`` raise inside try_consume(), i.e. at export
        # time. Reject them here so misconfiguration fails fast instead.
        if not rate < float("inf"):
            raise ValueError("max_per_second must be a finite number")
        self._max_per_second = rate
        self._tokens = self._max_per_second  # start full
        self._last_refill = time.monotonic()
        self._lock = threading.Lock()

    def try_consume(self, count: int) -> int:
        """Try to consume *count* tokens from the bucket.

        Returns the number of tokens actually consumed (i.e. how many
        envelopes may be sent). The caller should handle the remainder
        (e.g. store for retry or drop). Never blocks waiting for tokens.

        :param int count: Number of tokens requested. Non-positive requests
            are granted nothing and leave the bucket untouched.
        :return: Number of tokens granted (<= *count*).
        :rtype: int
        """
        if count <= 0:
            return 0

        with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._last_refill = now

            # Refill tokens based on elapsed time, capped at bucket capacity
            self._tokens = min(
                self._max_per_second,
                self._tokens + elapsed * self._max_per_second,
            )

            # Only whole tokens can be granted; the fractional remainder
            # stays in the bucket and counts towards the next refill.
            granted = min(count, int(self._tokens))
            self._tokens -= granted

        return granted

0 commit comments

Comments
 (0)