Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions opentelemetry-api/tests/metrics/test_instruments.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
# type: ignore

import threading
from inspect import Signature, isabstract, signature
from unittest import TestCase

Expand All @@ -32,8 +33,6 @@
_Gauge,
)

# FIXME Test that the instrument methods can be called concurrently safely.


class ChildInstrument(Instrument):
# pylint: disable=useless-parent-delegation
Expand Down Expand Up @@ -724,3 +723,44 @@ def test_description_check(self):
],
"",
)


class TestConcurrency(TestCase):
def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100):
"""Helper: run fn concurrently across threads and assert no exceptions."""
errors = []

def worker():
try:
for _ in range(calls_per_thread):
fn()
except Exception as exc: # pylint: disable=broad-except
errors.append(exc)

threads = [threading.Thread(target=worker) for _ in range(num_threads)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

self.assertEqual([], errors)

def test_counter_add_concurrent(self):
"""Test that Counter.add can be called concurrently safely."""
counter = NoOpCounter("name")
self._run_concurrently(lambda: counter.add(1))
Comment thread
chimchim89 marked this conversation as resolved.
Outdated

def test_up_down_counter_add_concurrent(self):
"""Test that UpDownCounter.add can be called concurrently safely."""
up_down_counter = NoOpUpDownCounter("name")
self._run_concurrently(lambda: up_down_counter.add(1))

def test_histogram_record_concurrent(self):
"""Test that Histogram.record can be called concurrently safely."""
histogram = NoOpHistogram("name")
self._run_concurrently(lambda: histogram.record(1))

def test_gauge_set_concurrent(self):
"""Test that Gauge.set can be called concurrently safely."""
gauge = NoOpMeter("name").create_gauge("name")
self._run_concurrently(lambda: gauge.set(1))
73 changes: 71 additions & 2 deletions opentelemetry-api/tests/metrics/test_meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
# limitations under the License.
# type: ignore

import threading
from logging import WARNING
from unittest import TestCase
from unittest.mock import Mock, patch

from opentelemetry.metrics import Meter, NoOpMeter

# FIXME Test that the meter methods can be called concurrently safely.


class ChildMeter(Meter):
# pylint: disable=signature-differs
Expand Down Expand Up @@ -195,3 +194,73 @@ def test_create_observable_up_down_counter(self):
self.assertTrue(
Meter.create_observable_up_down_counter.__isabstractmethod__
)


class TestConcurrency(TestCase):
Comment thread
chimchim89 marked this conversation as resolved.
Outdated
def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100):
"""Helper: run fn concurrently across threads and assert no exceptions."""
errors = []

def worker():
try:
for _ in range(calls_per_thread):
fn()
except Exception as exc: # pylint: disable=broad-except
errors.append(exc)

threads = [threading.Thread(target=worker) for _ in range(num_threads)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

self.assertEqual([], errors)

def test_create_counter_concurrent(self):
Comment thread
chimchim89 marked this conversation as resolved.
"""Test that Meter.create_counter can be called concurrently safely."""
meter = NoOpMeter("name")
self._run_concurrently(lambda: meter.create_counter("counter"))

def test_create_up_down_counter_concurrent(self):
"""Test that Meter.create_up_down_counter can be called concurrently safely."""
meter = NoOpMeter("name")
self._run_concurrently(
lambda: meter.create_up_down_counter("up_down_counter")
)

def test_create_observable_counter_concurrent(self):
"""Test that Meter.create_observable_counter can be called concurrently safely."""
meter = NoOpMeter("name")
self._run_concurrently(
lambda: meter.create_observable_counter(
"observable_counter", lambda options: []
)
)

def test_create_histogram_concurrent(self):
"""Test that Meter.create_histogram can be called concurrently safely."""
meter = NoOpMeter("name")
self._run_concurrently(lambda: meter.create_histogram("histogram"))

def test_create_gauge_concurrent(self):
"""Test that Meter.create_gauge can be called concurrently safely."""
meter = NoOpMeter("name")
self._run_concurrently(lambda: meter.create_gauge("gauge"))

def test_create_observable_gauge_concurrent(self):
"""Test that Meter.create_observable_gauge can be called concurrently safely."""
meter = NoOpMeter("name")
self._run_concurrently(
lambda: meter.create_observable_gauge(
"observable_gauge", lambda options: []
)
)

def test_create_observable_up_down_counter_concurrent(self):
"""Test that Meter.create_observable_up_down_counter can be called concurrently safely."""
meter = NoOpMeter("name")
self._run_concurrently(
lambda: meter.create_observable_up_down_counter(
"observable_up_down_counter", lambda options: []
)
)
34 changes: 32 additions & 2 deletions opentelemetry-api/tests/metrics/test_meter_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

# pylint: disable=protected-access

import threading
from unittest import TestCase
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -48,8 +49,6 @@
reset_metrics_globals,
)

# FIXME Test that the instrument methods can be called concurrently safely.


@fixture
def reset_meter_provider():
Expand Down Expand Up @@ -165,6 +164,37 @@ def test_get_meter_wrapper(self):
self.assertEqual(meter.schema_url, "schema_url")


class TestConcurrency(TestCase):
def _run_concurrently(self, fn, num_threads=10, calls_per_thread=100):
"""Helper: run fn concurrently across threads and assert no exceptions."""
errors = []

def worker():
try:
for _ in range(calls_per_thread):
fn()
except Exception as exc: # pylint: disable=broad-except
errors.append(exc)

threads = [threading.Thread(target=worker) for _ in range(num_threads)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

self.assertEqual([], errors)

def test_no_op_meter_provider_get_meter_concurrent(self):
"""Test that NoOpMeterProvider.get_meter can be called concurrently safely."""
meter_provider = NoOpMeterProvider()
self._run_concurrently(lambda: meter_provider.get_meter("name"))

def test_proxy_meter_provider_get_meter_concurrent(self):
"""Test that _ProxyMeterProvider.get_meter can be called concurrently safely."""
meter_provider = _ProxyMeterProvider()
self._run_concurrently(lambda: meter_provider.get_meter("name"))


class TestProxy(MetricsGlobalsTest, TestCase):
def test_global_proxy_meter_provider(self):
# Global get_meter_provider() should initially be a _ProxyMeterProvider
Expand Down
Loading