Skip to content

Commit a07e039

Browse files
authored
Add _ViewInstrumentMatch (open-telemetry#2400)
1 parent ec3053e commit a07e039

File tree

2 files changed

+225
-0
lines changed

2 files changed

+225
-0
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
from logging import getLogger
17+
from threading import Lock
18+
from typing import Iterable, Set
19+
20+
from opentelemetry.sdk._metrics.aggregation import (
21+
_convert_aggregation_temporality,
22+
)
23+
from opentelemetry.sdk._metrics.measurement import Measurement
24+
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
25+
from opentelemetry.sdk.resources import Resource
26+
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
27+
28+
_logger = getLogger(__name__)
29+
30+
31+
class _ViewInstrumentMatch:
32+
def __init__(
33+
self,
34+
name: str,
35+
unit: str,
36+
description: str,
37+
aggregation: type,
38+
instrumentation_info: InstrumentationInfo,
39+
resource: Resource,
40+
attribute_keys: Set[str] = None,
41+
):
42+
self._name = name
43+
self._unit = unit
44+
self._description = description
45+
self._aggregation = aggregation
46+
self._instrumentation_info = instrumentation_info
47+
self._resource = resource
48+
self._attribute_keys = attribute_keys
49+
self._attributes_aggregation = {}
50+
self._attributes_previous_point = {}
51+
self._lock = Lock()
52+
53+
def consume_measurement(self, measurement: Measurement) -> None:
54+
55+
if self._attribute_keys is not None:
56+
57+
attributes = {}
58+
59+
for key, value in measurement.attributes.items():
60+
if key in self._attribute_keys:
61+
attributes[key] = value
62+
else:
63+
attributes = measurement.attributes
64+
65+
attributes = frozenset(attributes.items())
66+
67+
if attributes not in self._attributes_aggregation.keys():
68+
with self._lock:
69+
self._attributes_aggregation[attributes] = self._aggregation()
70+
71+
self._attributes_aggregation[attributes].aggregate(measurement.value)
72+
73+
def collect(self, temporality: int) -> Iterable[Metric]:
74+
75+
with self._lock:
76+
for (
77+
attributes,
78+
aggregation,
79+
) in self._attributes_aggregation.items():
80+
81+
previous_point = self._attributes_previous_point.get(
82+
attributes
83+
)
84+
85+
current_point = aggregation.collect()
86+
87+
# pylint: disable=assignment-from-none
88+
self._attributes_previous_point[
89+
attributes
90+
] = _convert_aggregation_temporality(
91+
previous_point,
92+
current_point,
93+
AggregationTemporality.CUMULATIVE,
94+
)
95+
96+
if current_point is not None:
97+
98+
yield Metric(
99+
attributes=dict(attributes),
100+
description=self._description,
101+
instrumentation_info=self._instrumentation_info,
102+
name=self._name,
103+
resource=self._resource,
104+
unit=self._unit,
105+
point=_convert_aggregation_temporality(
106+
previous_point,
107+
current_point,
108+
temporality,
109+
),
110+
)
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from unittest import TestCase
16+
from unittest.mock import Mock
17+
18+
from opentelemetry.sdk._metrics._view_instrument_match import (
19+
_ViewInstrumentMatch,
20+
)
21+
from opentelemetry.sdk._metrics.measurement import Measurement
22+
from opentelemetry.sdk._metrics.point import Metric
23+
24+
25+
class Test_ViewInstrumentMatch(TestCase):
26+
@classmethod
27+
def setUpClass(cls):
28+
29+
cls.mock_aggregation_instance = Mock()
30+
cls.mock_aggregation_class = Mock(
31+
return_value=cls.mock_aggregation_instance
32+
)
33+
cls.mock_resource = Mock()
34+
cls.mock_instrumentation_info = Mock()
35+
36+
def test_consume_measurement(self):
37+
38+
view_instrument_match = _ViewInstrumentMatch(
39+
"name",
40+
"unit",
41+
"description",
42+
self.mock_aggregation_class,
43+
self.mock_instrumentation_info,
44+
self.mock_resource,
45+
{"a", "c"},
46+
)
47+
48+
view_instrument_match.consume_measurement(
49+
Measurement(value=0, attributes={"c": "d", "f": "g"})
50+
)
51+
self.assertEqual(
52+
view_instrument_match._attributes_aggregation,
53+
{frozenset([("c", "d")]): self.mock_aggregation_instance},
54+
)
55+
56+
view_instrument_match.consume_measurement(
57+
Measurement(value=0, attributes={"w": "x", "y": "z"})
58+
)
59+
60+
self.assertEqual(
61+
view_instrument_match._attributes_aggregation,
62+
{
63+
frozenset(): self.mock_aggregation_instance,
64+
frozenset([("c", "d")]): self.mock_aggregation_instance,
65+
},
66+
)
67+
68+
view_instrument_match = _ViewInstrumentMatch(
69+
"name",
70+
"unit",
71+
"description",
72+
self.mock_aggregation_class,
73+
self.mock_instrumentation_info,
74+
self.mock_resource,
75+
)
76+
77+
view_instrument_match.consume_measurement(
78+
Measurement(value=0, attributes={"c": "d", "f": "g"})
79+
)
80+
self.assertEqual(
81+
view_instrument_match._attributes_aggregation,
82+
{
83+
frozenset(
84+
[("c", "d"), ("f", "g")]
85+
): self.mock_aggregation_instance
86+
},
87+
)
88+
89+
def test_collect(self):
90+
91+
view_instrument_match = _ViewInstrumentMatch(
92+
"name",
93+
"unit",
94+
"description",
95+
self.mock_aggregation_class,
96+
self.mock_instrumentation_info,
97+
self.mock_resource,
98+
{"a", "c"},
99+
)
100+
101+
view_instrument_match.consume_measurement(
102+
Measurement(value=0, attributes={"c": "d", "f": "g"})
103+
)
104+
self.assertEqual(
105+
next(view_instrument_match.collect(1)),
106+
Metric(
107+
attributes={"c": "d"},
108+
description="description",
109+
instrumentation_info=self.mock_instrumentation_info,
110+
name="name",
111+
resource=self.mock_resource,
112+
unit="unit",
113+
point=None,
114+
),
115+
)

0 commit comments

Comments
 (0)