Skip to content

Commit ec3053e

Browse files
ocelotlaabmass
andauthored
Merge Asynchronous and Synchronous sum aggregations (open-telemetry#2379)
* Merge Asynchronous and Synchronous sum aggregations Fixes open-telemetry#2353 * Don't use None for self._value * Remove temporality check from sum aggregate * Update opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py Co-authored-by: Aaron Abbott <aaronabbott@google.com> * Add cumulative test cases * Fix lint * Set value to None This is done in order to identify the situation when aggregate has not been called before collect is. * Refactor collect * Fix initial value * Update opentelemetry-sdk/tests/metrics/test_aggregation.py Co-authored-by: Aaron Abbott <aaronabbott@google.com> * Undo changes to lastvalueaggrgation * Fix test case Co-authored-by: Aaron Abbott <aaronabbott@google.com>
1 parent 8006a49 commit ec3053e

File tree

2 files changed

+79
-124
lines changed

2 files changed

+79
-124
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_metrics/aggregation.py

Lines changed: 34 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,6 @@
3535
_logger = getLogger(__name__)
3636

3737

38-
class _InstrumentMonotonicityAwareAggregation:
39-
def __init__(self, instrument_is_monotonic: bool):
40-
self._instrument_is_monotonic = instrument_is_monotonic
41-
super().__init__()
42-
43-
4438
class Aggregation(ABC, Generic[_PointVarT]):
4539
def __init__(self):
4640
self._lock = Lock()
@@ -54,16 +48,27 @@ def collect(self) -> Optional[_PointVarT]:
5448
pass
5549

5650

57-
class SynchronousSumAggregation(
58-
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
59-
):
60-
def __init__(self, instrument_is_monotonic: bool):
61-
super().__init__(instrument_is_monotonic)
62-
self._value = 0
51+
class SumAggregation(Aggregation[Sum]):
52+
def __init__(
53+
self,
54+
instrument_is_monotonic: bool,
55+
instrument_temporality: AggregationTemporality,
56+
):
57+
super().__init__()
58+
6359
self._start_time_unix_nano = _time_ns()
60+
self._instrument_temporality = instrument_temporality
61+
self._instrument_is_monotonic = instrument_is_monotonic
62+
63+
if self._instrument_temporality is AggregationTemporality.DELTA:
64+
self._value = 0
65+
else:
66+
self._value = None
6467

6568
def aggregate(self, measurement: Measurement) -> None:
6669
with self._lock:
70+
if self._value is None:
71+
self._value = 0
6772
self._value = self._value + measurement.value
6873

6974
def collect(self) -> Optional[Sum]:
@@ -73,47 +78,32 @@ def collect(self) -> Optional[Sum]:
7378
"""
7479
now = _time_ns()
7580

76-
with self._lock:
77-
value = self._value
78-
start_time_unix_nano = self._start_time_unix_nano
79-
80-
self._value = 0
81-
self._start_time_unix_nano = now + 1
82-
83-
return Sum(
84-
aggregation_temporality=AggregationTemporality.DELTA,
85-
is_monotonic=self._instrument_is_monotonic,
86-
start_time_unix_nano=start_time_unix_nano,
87-
time_unix_nano=now,
88-
value=value,
89-
)
81+
if self._instrument_temporality is AggregationTemporality.DELTA:
9082

83+
with self._lock:
84+
value = self._value
85+
start_time_unix_nano = self._start_time_unix_nano
9186

92-
class AsynchronousSumAggregation(
93-
_InstrumentMonotonicityAwareAggregation, Aggregation[Sum]
94-
):
95-
def __init__(self, instrument_is_monotonic: bool):
96-
super().__init__(instrument_is_monotonic)
97-
self._value = None
98-
self._start_time_unix_nano = _time_ns()
87+
self._value = 0
88+
self._start_time_unix_nano = now + 1
9989

100-
def aggregate(self, measurement: Measurement) -> None:
101-
with self._lock:
102-
self._value = measurement.value
90+
return Sum(
91+
aggregation_temporality=AggregationTemporality.DELTA,
92+
is_monotonic=self._instrument_is_monotonic,
93+
start_time_unix_nano=start_time_unix_nano,
94+
time_unix_nano=now,
95+
value=value,
96+
)
10397

104-
def collect(self) -> Optional[Sum]:
105-
"""
106-
Atomically return a point for the current value of the metric.
107-
"""
10898
if self._value is None:
10999
return None
110100

111101
return Sum(
112-
start_time_unix_nano=self._start_time_unix_nano,
113-
time_unix_nano=_time_ns(),
114-
value=self._value,
115102
aggregation_temporality=AggregationTemporality.CUMULATIVE,
116103
is_monotonic=self._instrument_is_monotonic,
104+
start_time_unix_nano=self._start_time_unix_nano,
105+
time_unix_nano=now,
106+
value=self._value,
117107
)
118108

119109

@@ -180,7 +170,7 @@ def aggregate(self, measurement: Measurement) -> None:
180170

181171
self._bucket_counts[bisect_left(self._boundaries, value)] += 1
182172

183-
def collect(self) -> Optional[Histogram]:
173+
def collect(self) -> Histogram:
184174
"""
185175
Atomically return a point for the current value of the metric.
186176
"""

opentelemetry-sdk/tests/metrics/test_aggregation.py

Lines changed: 45 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -20,60 +20,74 @@
2020

2121
from opentelemetry.sdk._metrics.aggregation import (
2222
AggregationTemporality,
23-
AsynchronousSumAggregation,
2423
ExplicitBucketHistogramAggregation,
2524
LastValueAggregation,
26-
SynchronousSumAggregation,
25+
SumAggregation,
2726
_convert_aggregation_temporality,
28-
_InstrumentMonotonicityAwareAggregation,
2927
)
3028
from opentelemetry.sdk._metrics.measurement import Measurement
3129
from opentelemetry.sdk._metrics.point import Gauge, Sum
3230

3331

3432
class TestSynchronousSumAggregation(TestCase):
35-
def test_instrument_monotonicity_awareness(self):
33+
def test_aggregate_delta(self):
3634
"""
37-
`SynchronousSumAggregation` is aware of the instrument monotonicity
35+
`SynchronousSumAggregation` aggregates data for sum metric points
3836
"""
3937

40-
synchronous_sum_aggregation = SynchronousSumAggregation(True)
41-
self.assertIsInstance(
42-
synchronous_sum_aggregation,
43-
_InstrumentMonotonicityAwareAggregation,
38+
synchronous_sum_aggregation = SumAggregation(
39+
True, AggregationTemporality.DELTA
4440
)
45-
self.assertTrue(synchronous_sum_aggregation._instrument_is_monotonic)
4641

47-
synchronous_sum_aggregation = SynchronousSumAggregation(False)
48-
self.assertFalse(synchronous_sum_aggregation._instrument_is_monotonic)
42+
synchronous_sum_aggregation.aggregate(Measurement(1))
43+
synchronous_sum_aggregation.aggregate(Measurement(2))
44+
synchronous_sum_aggregation.aggregate(Measurement(3))
4945

50-
def test_aggregate(self):
46+
self.assertEqual(synchronous_sum_aggregation._value, 6)
47+
48+
synchronous_sum_aggregation = SumAggregation(
49+
True, AggregationTemporality.DELTA
50+
)
51+
52+
synchronous_sum_aggregation.aggregate(Measurement(1))
53+
synchronous_sum_aggregation.aggregate(Measurement(-2))
54+
synchronous_sum_aggregation.aggregate(Measurement(3))
55+
56+
self.assertEqual(synchronous_sum_aggregation._value, 2)
57+
58+
def test_aggregate_cumulative(self):
5159
"""
5260
`SynchronousSumAggregation` aggregates data for sum metric points
5361
"""
5462

55-
synchronous_sum_aggregation = SynchronousSumAggregation(True)
63+
synchronous_sum_aggregation = SumAggregation(
64+
True, AggregationTemporality.CUMULATIVE
65+
)
5666

5767
synchronous_sum_aggregation.aggregate(Measurement(1))
5868
synchronous_sum_aggregation.aggregate(Measurement(2))
5969
synchronous_sum_aggregation.aggregate(Measurement(3))
6070

6171
self.assertEqual(synchronous_sum_aggregation._value, 6)
6272

63-
synchronous_sum_aggregation = SynchronousSumAggregation(True)
73+
synchronous_sum_aggregation = SumAggregation(
74+
True, AggregationTemporality.CUMULATIVE
75+
)
6476

6577
synchronous_sum_aggregation.aggregate(Measurement(1))
6678
synchronous_sum_aggregation.aggregate(Measurement(-2))
6779
synchronous_sum_aggregation.aggregate(Measurement(3))
6880

6981
self.assertEqual(synchronous_sum_aggregation._value, 2)
7082

71-
def test_collect(self):
83+
def test_collect_delta(self):
7284
"""
7385
`SynchronousSumAggregation` collects sum metric points
7486
"""
7587

76-
synchronous_sum_aggregation = SynchronousSumAggregation(True)
88+
synchronous_sum_aggregation = SumAggregation(
89+
True, AggregationTemporality.DELTA
90+
)
7791

7892
synchronous_sum_aggregation.aggregate(Measurement(1))
7993
first_sum = synchronous_sum_aggregation.collect()
@@ -91,87 +105,37 @@ def test_collect(self):
91105
second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
92106
)
93107

94-
95-
class TestAsynchronousSumAggregation(TestCase):
96-
def test_instrument_monotonicity_awareness(self):
108+
def test_collect_cumulative(self):
97109
"""
98-
`AsynchronousSumAggregation` is aware of the instrument monotonicity
110+
`SynchronousSumAggregation` collects sum metric points
99111
"""
100112

101-
asynchronous_sum_aggregation = AsynchronousSumAggregation(True)
102-
self.assertIsInstance(
103-
asynchronous_sum_aggregation,
104-
_InstrumentMonotonicityAwareAggregation,
113+
sum_aggregation = SumAggregation(
114+
True, AggregationTemporality.CUMULATIVE
105115
)
106-
self.assertTrue(asynchronous_sum_aggregation._instrument_is_monotonic)
107-
108-
asynchronous_sum_aggregation = AsynchronousSumAggregation(False)
109-
self.assertFalse(asynchronous_sum_aggregation._instrument_is_monotonic)
110-
111-
def test_aggregate(self):
112-
"""
113-
`AsynchronousSumAggregation` aggregates data for sum metric points
114-
"""
115-
116-
asynchronous_sum_aggregation = AsynchronousSumAggregation(True)
117-
118-
asynchronous_sum_aggregation.aggregate(Measurement(1))
119-
self.assertEqual(asynchronous_sum_aggregation._value, 1)
120-
121-
asynchronous_sum_aggregation.aggregate(Measurement(2))
122-
self.assertEqual(asynchronous_sum_aggregation._value, 2)
123-
124-
asynchronous_sum_aggregation.aggregate(Measurement(3))
125-
self.assertEqual(asynchronous_sum_aggregation._value, 3)
126-
127-
asynchronous_sum_aggregation = AsynchronousSumAggregation(True)
128-
129-
asynchronous_sum_aggregation.aggregate(Measurement(1))
130-
self.assertEqual(asynchronous_sum_aggregation._value, 1)
131-
132-
asynchronous_sum_aggregation.aggregate(Measurement(-2))
133-
self.assertEqual(asynchronous_sum_aggregation._value, -2)
134-
135-
asynchronous_sum_aggregation.aggregate(Measurement(3))
136-
self.assertEqual(asynchronous_sum_aggregation._value, 3)
137-
138-
def test_collect(self):
139-
"""
140-
`AsynchronousSumAggregation` collects sum metric points
141-
"""
142-
143-
asynchronous_sum_aggregation = AsynchronousSumAggregation(True)
144116

145-
self.assertIsNone(asynchronous_sum_aggregation.collect())
146-
147-
asynchronous_sum_aggregation.aggregate(Measurement(1))
148-
first_sum = asynchronous_sum_aggregation.collect()
117+
sum_aggregation.aggregate(Measurement(1))
118+
first_sum = sum_aggregation.collect()
149119

150120
self.assertEqual(first_sum.value, 1)
151121
self.assertTrue(first_sum.is_monotonic)
152122

153-
asynchronous_sum_aggregation.aggregate(Measurement(1))
154-
second_sum = asynchronous_sum_aggregation.collect()
123+
sum_aggregation.aggregate(Measurement(1))
124+
second_sum = sum_aggregation.collect()
155125

156-
self.assertEqual(second_sum.value, 1)
126+
self.assertEqual(second_sum.value, 2)
157127
self.assertTrue(second_sum.is_monotonic)
158128

159129
self.assertEqual(
160130
second_sum.start_time_unix_nano, first_sum.start_time_unix_nano
161131
)
162132

163-
164-
class TestLastValueAggregation(TestCase):
165-
def test_instrument_monotonicity_awareness(self):
166-
"""
167-
`LastValueAggregation` is not aware of the instrument monotonicity
168-
"""
169-
170-
sum_aggregation = LastValueAggregation()
171-
self.assertNotIsInstance(
172-
sum_aggregation, _InstrumentMonotonicityAwareAggregation
133+
self.assertIsNone(
134+
SumAggregation(True, AggregationTemporality.CUMULATIVE).collect()
173135
)
174136

137+
138+
class TestLastValueAggregation(TestCase):
175139
def test_aggregate(self):
176140
"""
177141
`LastValueAggregation` collects data for gauge metric points with delta
@@ -200,6 +164,7 @@ def test_collect(self):
200164

201165
last_value_aggregation.aggregate(Measurement(1))
202166
first_gauge = last_value_aggregation.collect()
167+
self.assertIsInstance(first_gauge, Gauge)
203168

204169
self.assertEqual(first_gauge.value, 1)
205170

0 commit comments

Comments
 (0)