Skip to content

Commit 4a70184

Browse files
committed
Fix retry amplification storm in Azure Monitor OpenTelemetry exporter
During sustained 429 throttling, failed telemetry accumulates as blob files in local storage. On recovery, _transmit_from_storage() drained all blobs in a tight loop, creating a burst of requests that could immediately re-trigger throttling.

Changes:
- Cap storage drain to 10 blobs per invocation (_MAX_STORAGE_DRAIN_BATCH) to spread retry load across export cycles.
- Stop draining immediately when a retryable failure occurs, since the service is still under pressure.
- Add tests for both the drain cap and the early-termination behavior.
1 parent 41ea3dd commit 4a70184

2 files changed

Lines changed: 68 additions & 0 deletions

File tree

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,10 +210,18 @@ def __init__(self, **kwargs: Any) -> None:
210210
# Collect customer sdkstats metrics
211211
collect_customer_sdkstats(self)
212212

213+
# Maximum number of blobs to drain from storage per invocation.
214+
# Prevents a retry storm when many blobs have accumulated during
215+
# sustained throttling (e.g. 429).
216+
_MAX_STORAGE_DRAIN_BATCH = 10
217+
213218
def _transmit_from_storage(self) -> None:
214219
if not self.storage:
215220
return
221+
drained = 0
216222
for blob in self.storage.gets():
223+
if drained >= self._MAX_STORAGE_DRAIN_BATCH:
224+
break
217225
# give a few more seconds for blob lease operation
218226
# to reduce the chance of race (for perf consideration)
219227
if blob.lease(self._timeout + 5):
@@ -223,11 +231,17 @@ def _transmit_from_storage(self) -> None:
223231
result = self._transmit(envelopes)
224232
if result == ExportResult.FAILED_RETRYABLE:
225233
blob.lease(1)
234+
# Stop draining: the service is still under
235+
# pressure. Remaining blobs will be retried on
236+
# the next successful export cycle, avoiding a
237+
# burst of requests that re-triggers throttling.
238+
break
226239
else:
227240
blob.delete()
228241
else:
229242
# If blob.get() returns None, delete the corrupted blob
230243
blob.delete()
244+
drained += 1
231245

232246
def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None:
233247
if self.storage:

sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,60 @@ def test_transmit_from_storage_blob_get_returns_none(self):
423423
blob_mock.delete.assert_called_once() # Corrupted blob should be deleted
424424
transmit_mock.assert_not_called() # No transmission should occur
425425

426+
def test_transmit_from_storage_stops_on_retryable_failure(self):
    """Verify that a retryable failure halts the storage drain.

    Three leasable blobs are queued and ``_transmit`` is patched to return
    ``ExportResult.FAILED_RETRYABLE`` (e.g. a 429).  The drain must stop
    after the first blob, keep that blob in storage for a later retry, and
    leave the remaining blobs untouched so no retry storm is produced.
    """
    exporter = BaseExporter()
    exporter.storage = mock.Mock()
    envelope_mock = {"name": "test", "time": "time"}

    def make_blob():
        # A blob that can always be leased and holds one stored envelope.
        blob = mock.Mock()
        blob.lease.return_value = True
        blob.get.return_value = [envelope_mock]
        return blob

    blob1, blob2, blob3 = make_blob(), make_blob(), make_blob()
    exporter.storage.gets.return_value = [blob1, blob2, blob3]

    with mock.patch.object(exporter, "_transmit") as transmit_mock:
        # First blob transmit fails with retryable error
        transmit_mock.return_value = ExportResult.FAILED_RETRYABLE
        exporter._transmit_from_storage()

    # Only the first blob should have been attempted; loop should break
    transmit_mock.assert_called_once()
    blob1.lease.assert_called()
    # The failed blob is re-leased briefly (lease(1)) and must NOT be
    # deleted, otherwise the telemetry would be lost instead of retried.
    blob1.lease.assert_any_call(1)
    blob1.delete.assert_not_called()
    # blob2 and blob3 should not have been touched
    for untouched in (blob2, blob3):
        untouched.lease.assert_not_called()
        untouched.get.assert_not_called()
        untouched.delete.assert_not_called()
453+
454+
def test_transmit_from_storage_caps_drain_batch_size(self):
    """Verify that one drain pass handles at most the configured batch size.

    ``_MAX_STORAGE_DRAIN_BATCH + 5`` leasable blobs are queued and
    ``_transmit`` is patched to always succeed.  Exactly
    ``_MAX_STORAGE_DRAIN_BATCH`` blobs must be transmitted and deleted;
    the surplus must be left alone so the service is not flooded on
    recovery from throttling.
    """
    exporter = BaseExporter()
    exporter.storage = mock.Mock()
    envelope_mock = {"name": "test", "time": "time"}
    cap = exporter._MAX_STORAGE_DRAIN_BATCH
    num_blobs = cap + 5

    blobs = []
    for _ in range(num_blobs):
        blob = mock.Mock()
        blob.lease.return_value = True
        blob.get.return_value = [envelope_mock]
        blobs.append(blob)
    exporter.storage.gets.return_value = blobs

    with mock.patch.object(exporter, "_transmit") as transmit_mock:
        transmit_mock.return_value = ExportResult.SUCCESS
        exporter._transmit_from_storage()

    # Should only process _MAX_STORAGE_DRAIN_BATCH blobs
    self.assertEqual(transmit_mock.call_count, cap)
    # Blobs within the cap were sent successfully and therefore deleted
    for processed in blobs[:cap]:
        processed.delete.assert_called_once()
    # The extra blobs beyond the cap should not be leased or deleted
    for skipped in blobs[cap:]:
        skipped.lease.assert_not_called()
        skipped.delete.assert_not_called()
479+
426480
def test_telemetry_item_dict_roundtrip(self):
427481
"""Test that TelemetryItem correctly round-trips through as_dict() -> TelemetryItem(dict)
428482
for all telemetry data types used in offline storage."""

0 commit comments

Comments
 (0)