Skip to content

Commit 218656c

Browse files
authored
Fix retry amplification storm in Azure Monitor OpenTelemetry exporter (#47002)
During sustained 429 throttling, failed telemetry accumulates as blob files in local storage. On recovery, _transmit_from_storage() drained all blobs in a tight loop, creating a burst of requests that could immediately re-trigger throttling.

Changes:
- Cap storage drain to 10 blobs per invocation (_MAX_STORAGE_DRAIN_BATCH) to spread retry load across export cycles
- Stop draining immediately when a retryable failure occurs, since the service is still under pressure
- Add tests for both drain cap and early termination behaviors
1 parent f1aaee2 commit 218656c

2 files changed

Lines changed: 69 additions & 2 deletions

File tree

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/export/_base.py

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -210,10 +210,18 @@ def __init__(self, **kwargs: Any) -> None:
210210
# Collect customer sdkstats metrics
211211
collect_customer_sdkstats(self)
212212

213+
# Maximum number of blobs to drain from storage per invocation.
214+
# Prevents a retry storm when many blobs have accumulated during
215+
# sustained throttling (e.g. 429).
216+
_MAX_STORAGE_DRAIN_BATCH = 10
217+
213218
def _transmit_from_storage(self) -> None:
214219
if not self.storage:
215220
return
221+
drained = 0
216222
for blob in self.storage.gets():
223+
if drained >= self._MAX_STORAGE_DRAIN_BATCH:
224+
break
217225
# give a few more seconds for blob lease operation
218226
# to reduce the chance of race (for perf consideration)
219227
if blob.lease(self._timeout + 5):
@@ -223,11 +231,16 @@ def _transmit_from_storage(self) -> None:
223231
result = self._transmit(envelopes)
224232
if result == ExportResult.FAILED_RETRYABLE:
225233
blob.lease(1)
226-
else:
227-
blob.delete()
234+
# Stop draining: the service is still under
235+
# pressure. Remaining blobs will be retried on
236+
# the next successful export cycle, avoiding a
237+
# burst of requests that re-triggers throttling.
238+
break
239+
blob.delete()
228240
else:
229241
# If blob.get() returns None, delete the corrupted blob
230242
blob.delete()
243+
drained += 1
231244

232245
def _handle_transmit_from_storage(self, envelopes: List[TelemetryItem], result: ExportResult) -> None:
233246
if self.storage:

sdk/monitor/azure-monitor-opentelemetry-exporter/tests/test_base_exporter.py

Lines changed: 54 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -423,6 +423,60 @@ def test_transmit_from_storage_blob_get_returns_none(self):
423423
blob_mock.delete.assert_called_once() # Corrupted blob should be deleted
424424
transmit_mock.assert_not_called() # No transmission should occur
425425

426+
def test_transmit_from_storage_stops_on_retryable_failure(self):
427+
"""Test that _transmit_from_storage stops draining blobs when
428+
a retryable failure (e.g. 429) occurs, preventing retry storms."""
429+
exporter = BaseExporter()
430+
exporter.storage = mock.Mock()
431+
envelope_mock = {"name": "test", "time": "time"}
432+
# Create three blobs in storage
433+
blob1 = mock.Mock()
434+
blob1.lease.return_value = True
435+
blob1.get.return_value = [envelope_mock]
436+
blob2 = mock.Mock()
437+
blob2.lease.return_value = True
438+
blob2.get.return_value = [envelope_mock]
439+
blob3 = mock.Mock()
440+
blob3.lease.return_value = True
441+
blob3.get.return_value = [envelope_mock]
442+
exporter.storage.gets.return_value = [blob1, blob2, blob3]
443+
with mock.patch.object(exporter, "_transmit") as transmit_mock:
444+
# First blob transmit fails with retryable error
445+
transmit_mock.return_value = ExportResult.FAILED_RETRYABLE
446+
exporter._transmit_from_storage()
447+
# Only the first blob should have been attempted; loop should break
448+
transmit_mock.assert_called_once()
449+
blob1.lease.assert_called()
450+
# blob2 and blob3 should not have been touched
451+
blob2.lease.assert_not_called()
452+
blob3.lease.assert_not_called()
453+
454+
def test_transmit_from_storage_caps_drain_batch_size(self):
455+
"""Test that _transmit_from_storage processes at most
456+
_MAX_STORAGE_DRAIN_BATCH blobs per invocation to prevent
457+
flooding the service on recovery from throttling."""
458+
exporter = BaseExporter()
459+
exporter.storage = mock.Mock()
460+
envelope_mock = {"name": "test", "time": "time"}
461+
num_blobs = exporter._MAX_STORAGE_DRAIN_BATCH + 5
462+
blobs = []
463+
for _ in range(num_blobs):
464+
b = mock.Mock()
465+
b.lease.return_value = True
466+
b.get.return_value = [envelope_mock]
467+
blobs.append(b)
468+
exporter.storage.gets.return_value = blobs
469+
with mock.patch.object(exporter, "_transmit") as transmit_mock:
470+
transmit_mock.return_value = ExportResult.SUCCESS
471+
exporter._transmit_from_storage()
472+
# Should only process _MAX_STORAGE_DRAIN_BATCH blobs
473+
self.assertEqual(transmit_mock.call_count, exporter._MAX_STORAGE_DRAIN_BATCH)
474+
# The extra blobs beyond the cap should not be deleted
475+
for i in range(exporter._MAX_STORAGE_DRAIN_BATCH):
476+
blobs[i].delete.assert_called_once()
477+
for i in range(exporter._MAX_STORAGE_DRAIN_BATCH, num_blobs):
478+
blobs[i].delete.assert_not_called()
479+
426480
def test_telemetry_item_dict_roundtrip(self):
427481
"""Test that TelemetryItem correctly round-trips through as_dict() -> TelemetryItem(dict)
428482
for all telemetry data types used in offline storage."""

0 commit comments

Comments
 (0)