Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def __init__(
)

@final
def collect(self, timeout_millis: float = 10_000) -> None:
def collect(self, timeout_millis: float = 10_000) -> bool | None:
"""Collects the metrics from the internal SDK state and
invokes the `_receive_metrics` with the collection.

Expand All @@ -350,7 +350,7 @@ def collect(self, timeout_millis: float = 10_000) -> None:
_logger.warning(
"Cannot call collect on a MetricReader until it is registered on a MeterProvider"
)
return
return None

start_time = perf_counter()
try:
Expand All @@ -359,10 +359,11 @@ def collect(self, timeout_millis: float = 10_000) -> None:
self._metrics.record_collection(perf_counter() - start_time)

if metrics is not None:
self._receive_metrics(
return self._receive_metrics(
metrics,
timeout_millis=timeout_millis,
)
return None

@final
def _set_collect_callback(
Expand All @@ -384,8 +385,16 @@ def _receive_metrics(
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
"""Called by `MetricReader.collect` when it receives a batch of metrics"""
) -> bool | None:
"""Called by `MetricReader.collect` when it receives a batch of metrics.

Subclasses should return ``True`` on success and ``False`` on failure.

.. note::
Existing subclasses that return ``None`` (the old implicit default)
will be treated as vacuous success by ``force_flush``, preserving
backward-compatible behaviour.
"""

def _set_meter_provider(self, meter_provider: MeterProvider) -> None:
self._metrics = create_metric_reader_metrics(
Expand All @@ -397,8 +406,8 @@ def _set_meter_provider(self, meter_provider: MeterProvider) -> None:
)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
self.collect(timeout_millis=timeout_millis)
return True
result = self.collect(timeout_millis=timeout_millis)
return result is not False

@abstractmethod
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
Expand Down Expand Up @@ -451,9 +460,10 @@ def _receive_metrics(
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
) -> bool:
with self._lock:
self._metrics_data = metrics_data
return True

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass
Expand Down Expand Up @@ -569,17 +579,19 @@ def _receive_metrics(
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
) -> bool:
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
# pylint: disable=broad-exception-caught,invalid-name
try:
with self._export_lock:
self._exporter.export(
result = self._exporter.export(
metrics_data, timeout_millis=timeout_millis
)
except Exception:
return result is MetricExportResult.SUCCESS
except Exception: # pylint: disable=broad-exception-caught
_logger.exception("Exception while exporting metrics")
detach(token)
return False
finally:
detach(token)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
deadline_ns = time_ns() + timeout_millis * 10**6
Expand All @@ -598,6 +610,6 @@ def _shutdown():
self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
super().force_flush(timeout_millis=timeout_millis)
self._exporter.force_flush(timeout_millis=timeout_millis)
return True
if not super().force_flush(timeout_millis=timeout_millis):
return False
return self._exporter.force_flush(timeout_millis=timeout_millis)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import pytest

import opentelemetry.sdk.metrics._internal.export as _export_module
from opentelemetry.sdk.environment_variables import (
OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED,
)
Expand Down Expand Up @@ -355,3 +356,52 @@ def test_metric_reader_metrics(self):
self.assertTrue(name.startswith("periodic_metric_reader/"))

mp.shutdown()

def test_force_flush_returns_true_on_success(self):
exporter = FakeMetricsExporter()
pmr = self._create_periodic_reader(metrics, exporter)
result = pmr.force_flush(timeout_millis=5_000)
self.assertTrue(result)
pmr.shutdown()

def test_force_flush_returns_false_on_export_failure(self):
exporter = FakeMetricsExporter()
exporter.export = Mock(return_value=MetricExportResult.FAILURE)
pmr = self._create_periodic_reader(metrics, exporter)
result = pmr.force_flush(timeout_millis=5_000)
self.assertFalse(result)
pmr.shutdown()

def test_force_flush_skips_exporter_flush_when_collect_fails(self):
exporter = FakeMetricsExporter()
exporter.force_flush = Mock(return_value=True)
pmr = PeriodicExportingMetricReader(
exporter, export_interval_millis=math.inf
)
# No collect callback registered → collect returns None → force_flush
# on base treats None as not-False (success), so wire up a failing one
exporter.export = Mock(return_value=MetricExportResult.FAILURE)

def _collect_failure(reader, timeout_millis):
return metrics

pmr._set_collect_callback(_collect_failure)
exporter.export = Mock(return_value=MetricExportResult.FAILURE)
result = pmr.force_flush(timeout_millis=5_000)
self.assertFalse(result)
exporter.force_flush.assert_not_called()
pmr.shutdown()

def test_detach_called_on_export_failure(self):
"""detach(token) must run in finally even when export returns FAILURE."""
exporter = FakeMetricsExporter()
exporter.export = Mock(return_value=MetricExportResult.FAILURE)
pmr = self._create_periodic_reader(metrics, exporter)

with patch(
"opentelemetry.sdk.metrics._internal.export.detach",
wraps=_export_module.detach,
) as mock_detach:
Comment thread
ravitheja4531-cell marked this conversation as resolved.
pmr.force_flush(timeout_millis=5_000)
pmr.shutdown()
Comment thread
ravitheja4531-cell marked this conversation as resolved.
self.assertTrue(mock_detach.called)