Skip to content

Commit d4f9ae7

Browse files
JP-MYxrmxemdnetoaabmass
authored
New APIs to add/remove metric readers at run-time (open-telemetry#4863)
* New APIs to add/remove metric readers at run-time * Updated typehints for `_collect` callback. Added Changelog entry for the public API * Updated tests and fixed lint --------- Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com> Co-authored-by: Emídio Neto <9735060+emdneto@users.noreply.github.com> Co-authored-by: Aaron Abbott <aaronabbott@google.com>
1 parent 9726176 commit d4f9ae7

11 files changed

Lines changed: 230 additions & 73 deletions

File tree

.changelog/4863.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
`opentelemetry-sdk`: add `add_metric_reader` / `remove_metric_reader` public APIs to register / unregister metric readers at runtime.

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -490,11 +490,12 @@ def __init__(
490490
)
491491
),
492492
resource=resource,
493-
metric_readers=metric_readers,
494493
views=views,
495494
)
495+
self._metric_readers = metric_readers
496496
self._measurement_consumer = SynchronousMeasurementConsumer(
497-
sdk_config=self._sdk_config
497+
sdk_config=self._sdk_config,
498+
metric_readers=metric_readers,
498499
)
499500
disabled = environ.get(OTEL_SDK_DISABLED, "")
500501
self._disabled = disabled.lower().strip() == "true"
@@ -509,7 +510,7 @@ def __init__(
509510
_meter_configurator or _default_meter_configurator
510511
)
511512

512-
for metric_reader in self._sdk_config.metric_readers:
513+
for metric_reader in self._metric_readers:
513514
with self._all_metric_readers_lock:
514515
if metric_reader in self._all_metric_readers:
515516
# pylint: disable=broad-exception-raised
@@ -560,7 +561,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
560561

561562
metric_reader_error = {}
562563

563-
for metric_reader in self._sdk_config.metric_readers:
564+
for metric_reader in self._metric_readers:
564565
current_ts = time_ns()
565566
try:
566567
if current_ts >= deadline_ns:
@@ -605,7 +606,7 @@ def _shutdown():
605606

606607
metric_reader_error = {}
607608

608-
for metric_reader in self._sdk_config.metric_readers:
609+
for metric_reader in self._metric_readers:
609610
current_ts = time_ns()
610611
try:
611612
if current_ts >= deadline_ns:
@@ -675,3 +676,36 @@ def get_meter(
675676
),
676677
)
677678
return self._meters[instrumentation_scope]
679+
680+
def add_metric_reader(
681+
self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader"
682+
) -> None:
683+
with self._all_metric_readers_lock:
684+
if metric_reader in self._all_metric_readers:
685+
_logger.warning(
686+
"MetricReader '%s' has been registered already!",
687+
metric_reader,
688+
)
689+
return
690+
self._measurement_consumer.add_metric_reader(metric_reader)
691+
# pylint: disable-next=protected-access
692+
metric_reader._set_collect_callback(
693+
self._measurement_consumer.collect
694+
)
695+
self._all_metric_readers.add(metric_reader)
696+
697+
def remove_metric_reader(
698+
self,
699+
metric_reader: "opentelemetry.sdk.metrics.export.MetricReader",
700+
) -> None:
701+
with self._all_metric_readers_lock:
702+
if metric_reader not in self._all_metric_readers:
703+
_logger.warning(
704+
"MetricReader '%s' has not been registered!", metric_reader
705+
)
706+
return
707+
self._measurement_consumer.remove_metric_reader(metric_reader)
708+
# pylint: disable-next=protected-access
709+
metric_reader._set_collect_callback(None)
710+
metric_reader.shutdown()
711+
self._all_metric_readers.remove(metric_reader)

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,16 @@ def __init__(
223223
*,
224224
otel_component_type: OtelComponentTypeValues | None = None,
225225
) -> None:
226-
self._collect: Callable[
227-
[
228-
opentelemetry.sdk.metrics.export.MetricReader,
229-
AggregationTemporality,
230-
],
231-
Iterable[opentelemetry.sdk.metrics.export.Metric],
232-
] = None
226+
self._collect: (
227+
Callable[
228+
[
229+
opentelemetry.sdk.metrics.export.MetricReader,
230+
AggregationTemporality,
231+
],
232+
Iterable[opentelemetry.sdk.metrics.export.Metric],
233+
]
234+
| None
235+
) = None
233236

234237
self._instrument_class_temporality = {
235238
_Counter: AggregationTemporality.CUMULATIVE,
@@ -373,7 +376,8 @@ def _set_collect_callback(
373376
AggregationTemporality,
374377
],
375378
MetricsData,
376-
],
379+
]
380+
| None,
377381
) -> None:
378382
"""This function is internal to the SDK. It should not be called or overridden by users"""
379383
self._collect = func

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33

44
# pylint: disable=unused-import
55

6+
import weakref
67
from abc import ABC, abstractmethod
7-
from collections.abc import Mapping
8+
from collections.abc import Iterable, Mapping
89
from threading import Lock
910
from time import time_ns
1011

1112
# This kind of import is needed to avoid Sphinx errors.
1213
import opentelemetry.sdk.metrics
1314
import opentelemetry.sdk.metrics._internal.instrument
14-
import opentelemetry.sdk.metrics._internal.sdk_configuration
1515
from opentelemetry.metrics._internal.instrument import CallbackOptions
1616
from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
1717
from opentelemetry.sdk.metrics._internal.measurement import Measurement
@@ -48,10 +48,10 @@ class SynchronousMeasurementConsumer(MeasurementConsumer):
4848
def __init__(
4949
self,
5050
sdk_config: "opentelemetry.sdk.metrics._internal.sdk_configuration.SdkConfiguration",
51+
metric_readers: Iterable["opentelemetry.sdk.metrics.MetricReader"],
5152
) -> None:
5253
self._lock = Lock()
5354
self._sdk_config = sdk_config
54-
# should never be mutated
5555
self._reader_storages: Mapping[
5656
opentelemetry.sdk.metrics.export.MetricReader, MetricReaderStorage
5757
] = {
@@ -60,7 +60,7 @@ def __init__(
6060
reader._instrument_class_temporality,
6161
reader._instrument_class_aggregation,
6262
)
63-
for reader in sdk_config.metric_readers
63+
for reader in metric_readers
6464
}
6565
self._async_instruments: list[
6666
opentelemetry.sdk.metrics._internal.instrument._Asynchronous
@@ -75,7 +75,9 @@ def consume_measurement(self, measurement: Measurement) -> None:
7575
measurement.context,
7676
)
7777
)
78-
for reader_storage in self._reader_storages.values():
78+
with self._lock:
79+
reader_storages = weakref.WeakSet(self._reader_storages.values())
80+
for reader_storage in reader_storages:
7981
reader_storage.consume_measurement(
8082
measurement, should_sample_exemplar
8183
)
@@ -132,3 +134,23 @@ def collect(
132134
result = self._reader_storages[metric_reader].collect()
133135

134136
return result
137+
138+
def add_metric_reader(
139+
self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
140+
) -> None:
141+
"""Registers a new metric reader."""
142+
with self._lock:
143+
self._reader_storages[metric_reader] = MetricReaderStorage(
144+
self._sdk_config,
145+
# pylint: disable-next=protected-access
146+
metric_reader._instrument_class_temporality,
147+
# pylint: disable-next=protected-access
148+
metric_reader._instrument_class_aggregation,
149+
)
150+
151+
def remove_metric_reader(
152+
self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
153+
) -> None:
154+
"""Unregisters the given metric reader."""
155+
with self._lock:
156+
self._reader_storages.pop(metric_reader)

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,4 @@
1515
class SdkConfiguration:
1616
exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter"
1717
resource: "opentelemetry.sdk.resources.Resource"
18-
metric_readers: Sequence["opentelemetry.sdk.metrics.export.MetricReader"]
1918
views: Sequence["opentelemetry.sdk.metrics.view.View"]

opentelemetry-sdk/tests/_configuration/test_meter_provider.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def test_none_config_uses_supplied_resource(self):
108108

109109
def test_none_config_no_readers(self):
110110
provider = create_meter_provider(None)
111-
self.assertEqual(len(provider._sdk_config.metric_readers), 0)
111+
self.assertEqual(len(provider._metric_readers), 0)
112112

113113
def test_none_config_uses_trace_based_exemplar_filter(self):
114114
provider = create_meter_provider(None)
@@ -139,7 +139,7 @@ def test_none_config_does_not_read_interval_env_var(self):
139139
)
140140
with patch.dict(os.environ, {"OTEL_METRIC_EXPORT_INTERVAL": "999999"}):
141141
provider = create_meter_provider(config)
142-
reader = provider._sdk_config.metric_readers[0]
142+
reader = provider._metric_readers[0]
143143
self.assertIsInstance(reader, PeriodicExportingMetricReader)
144144
self.assertEqual(reader._export_interval_millis, 60000.0)
145145

@@ -163,11 +163,6 @@ def test_configure_with_config_sets_global(self):
163163
arg = mock_set.call_args[0][0]
164164
self.assertIsInstance(arg, MeterProvider)
165165

166-
def test_empty_readers_list(self):
167-
config = MeterProviderConfig(readers=[])
168-
provider = create_meter_provider(config)
169-
self.assertEqual(len(provider._sdk_config.metric_readers), 0)
170-
171166

172167
class TestCreateMetricReaders(unittest.TestCase):
173168
@staticmethod
@@ -189,7 +184,7 @@ def test_console_exporter(self):
189184
PushMetricExporterConfig(console=ConsoleMetricExporterConfig())
190185
)
191186
provider = create_meter_provider(config)
192-
reader = provider._sdk_config.metric_readers[0]
187+
reader = provider._metric_readers[0]
193188
self.assertIsInstance(reader, PeriodicExportingMetricReader)
194189
self.assertIsInstance(reader._exporter, ConsoleMetricExporter)
195190

@@ -198,15 +193,15 @@ def test_periodic_reader_default_interval(self):
198193
PushMetricExporterConfig(console=ConsoleMetricExporterConfig())
199194
)
200195
provider = create_meter_provider(config)
201-
reader = provider._sdk_config.metric_readers[0]
196+
reader = provider._metric_readers[0]
202197
self.assertEqual(reader._export_interval_millis, 60000.0)
203198

204199
def test_periodic_reader_default_timeout(self):
205200
config = self._make_periodic_config(
206201
PushMetricExporterConfig(console=ConsoleMetricExporterConfig())
207202
)
208203
provider = create_meter_provider(config)
209-
reader = provider._sdk_config.metric_readers[0]
204+
reader = provider._metric_readers[0]
210205
self.assertEqual(reader._export_timeout_millis, 30000.0)
211206

212207
def test_periodic_reader_explicit_interval(self):
@@ -215,7 +210,7 @@ def test_periodic_reader_explicit_interval(self):
215210
interval=5000,
216211
)
217212
provider = create_meter_provider(config)
218-
reader = provider._sdk_config.metric_readers[0]
213+
reader = provider._metric_readers[0]
219214
self.assertEqual(reader._export_interval_millis, 5000.0)
220215

221216
def test_periodic_reader_explicit_timeout(self):
@@ -224,7 +219,7 @@ def test_periodic_reader_explicit_timeout(self):
224219
timeout=10000,
225220
)
226221
provider = create_meter_provider(config)
227-
reader = provider._sdk_config.metric_readers[0]
222+
reader = provider._metric_readers[0]
228223
self.assertEqual(reader._export_timeout_millis, 10000.0)
229224

230225
def test_otlp_http_missing_package_raises(self):
@@ -347,7 +342,7 @@ def test_pull_prometheus_creates_reader(self):
347342

348343
mock_reader_cls.assert_called_once_with(disable_target_info=True)
349344
mock_start_server.assert_called_once_with(port=9090, addr="0.0.0.0")
350-
self.assertEqual(len(provider._sdk_config.metric_readers), 1)
345+
self.assertEqual(len(provider._metric_readers), 1)
351346

352347
def test_pull_prometheus_defaults(self):
353348
mock_reader_cls = MagicMock()
@@ -375,7 +370,7 @@ def test_pull_prometheus_defaults(self):
375370

376371
mock_reader_cls.assert_called_once_with(disable_target_info=False)
377372
mock_start_server.assert_called_once_with(port=9464, addr="localhost")
378-
self.assertEqual(len(provider._sdk_config.metric_readers), 1)
373+
self.assertEqual(len(provider._metric_readers), 1)
379374

380375
def test_pull_prometheus_missing_package_raises(self):
381376
with patch.dict(
@@ -432,7 +427,7 @@ def test_pull_plugin_loads_via_entry_point(self):
432427
]
433428
)
434429
provider = create_meter_provider(config)
435-
self.assertEqual(len(provider._sdk_config.metric_readers), 1)
430+
self.assertEqual(len(provider._metric_readers), 1)
436431
mock_class.assert_called_once_with(port=8080)
437432
mock_entry_points.assert_called_once_with(
438433
group="opentelemetry_pull_metric_exporter",
@@ -547,7 +542,7 @@ def test_plugin_metric_exporter_loaded_via_entry_point(self):
547542
PushMetricExporterConfig(my_custom_exporter={})
548543
)
549544
provider = create_meter_provider(config)
550-
self.assertEqual(len(provider._sdk_config.metric_readers), 1)
545+
self.assertEqual(len(provider._metric_readers), 1)
551546

552547
def test_unknown_metric_exporter_raises_configuration_error(self):
553548
with patch(
@@ -581,7 +576,7 @@ def test_multiple_readers(self):
581576
]
582577
)
583578
provider = create_meter_provider(config)
584-
self.assertEqual(len(provider._sdk_config.metric_readers), 2)
579+
self.assertEqual(len(provider._metric_readers), 2)
585580

586581

587582
class TestTemporalityAndAggregation(unittest.TestCase):
@@ -605,7 +600,7 @@ def _make_console_config(temporality=None, histogram_agg=None):
605600
@staticmethod
606601
def _get_exporter(config):
607602
provider = create_meter_provider(config)
608-
return provider._sdk_config.metric_readers[0]._exporter
603+
return provider._metric_readers[0]._exporter
609604

610605
def test_default_temporality_is_cumulative(self):
611606
exporter = self._get_exporter(self._make_console_config())

0 commit comments

Comments
 (0)