Skip to content

Commit bdb17b2

Browse files
anuraaga, lzchen, xrmx
authored
feat(sdk): implement processor metrics (#5012)
* feat(sdk): add metrics for span and log processors
* Most
* Finish
* Changelog
* git add
* Format
* Fix
* Format
* logger emit

---------

Co-authored-by: Leighton Chen <lechen@microsoft.com>
Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
1 parent 2a8014e commit bdb17b2

File tree

7 files changed

+703
-13
lines changed

7 files changed

+703
-13
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3131
([#4935](https://github.com/open-telemetry/opentelemetry-python/pull/4935))
3232
- `opentelemetry-sdk`: implement metric reader metrics
3333
([#4970](https://github.com/open-telemetry/opentelemetry-python/pull/4970))
34+
- `opentelemetry-sdk`: implement processor metrics
35+
([#5012](https://github.com/open-telemetry/opentelemetry-python/pull/5012))
3436
- `opentelemetry-sdk`: upgrade vendored OTel configuration schema from v1.0.0-rc.3 to v1.0.0
3537
([#4965](https://github.com/open-telemetry/opentelemetry-python/pull/4965))
3638
- improve check-links ci job

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,27 @@
3030
get_value,
3131
set_value,
3232
)
33+
from opentelemetry.metrics import MeterProvider, get_meter_provider
3334
from opentelemetry.sdk._logs import (
3435
LogRecordProcessor,
3536
ReadableLogRecord,
3637
ReadWriteLogRecord,
3738
)
38-
from opentelemetry.sdk._shared_internal import BatchProcessor, DuplicateFilter
39+
from opentelemetry.sdk._shared_internal import (
40+
BatchProcessor,
41+
DuplicateFilter,
42+
ProcessorMetrics,
43+
)
3944
from opentelemetry.sdk.environment_variables import (
4045
OTEL_BLRP_EXPORT_TIMEOUT,
4146
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
4247
OTEL_BLRP_MAX_QUEUE_SIZE,
4348
OTEL_BLRP_SCHEDULE_DELAY,
4449
)
4550
from opentelemetry.sdk.resources import Resource
51+
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
52+
OtelComponentTypeValues,
53+
)
4654

4755
_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
4856
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
@@ -170,9 +178,19 @@ class SimpleLogRecordProcessor(LogRecordProcessor):
170178
propagating to the application.
171179
"""
172180

173-
def __init__(self, exporter: LogRecordExporter):
181+
def __init__(
182+
self,
183+
exporter: LogRecordExporter,
184+
*,
185+
meter_provider: MeterProvider | None = None,
186+
):
174187
self._exporter = exporter
175188
self._shutdown = False
189+
self._metrics = ProcessorMetrics(
190+
"logs",
191+
OtelComponentTypeValues.SIMPLE_LOG_PROCESSOR,
192+
meter_provider or get_meter_provider(),
193+
)
176194

177195
def on_emit(self, log_record: ReadWriteLogRecord):
178196
# Prevent entering a recursive loop.
@@ -193,6 +211,7 @@ def on_emit(self, log_record: ReadWriteLogRecord):
193211
set_value(_ON_EMIT_RECURSION_COUNT_KEY, cnt + 1), # pyright: ignore[reportOperatorIssue]
194212
)
195213
)
214+
error: Exception | None = None
196215
try:
197216
if self._shutdown:
198217
_logger.warning("Processor is already shutdown, ignoring call")
@@ -211,9 +230,11 @@ def on_emit(self, log_record: ReadWriteLogRecord):
211230
limits=log_record.limits,
212231
)
213232
self._exporter.export((readable_log_record,))
214-
except Exception: # pylint: disable=broad-exception-caught
233+
except Exception as err: # pylint: disable=broad-exception-caught
234+
error = err
215235
_logger.exception("Exception while exporting logs.")
216236
finally:
237+
self._metrics.finish_items(1, error)
217238
detach(token)
218239

219240
def shutdown(self):
@@ -246,6 +267,8 @@ def __init__(
246267
max_export_batch_size: int | None = None,
247268
export_timeout_millis: float | None = None,
248269
max_queue_size: int | None = None,
270+
*,
271+
meter_provider: MeterProvider | None = None,
249272
):
250273
if max_queue_size is None:
251274
max_queue_size = BatchLogRecordProcessor._default_max_queue_size()
@@ -276,6 +299,12 @@ def __init__(
276299
export_timeout_millis,
277300
max_queue_size,
278301
"Log",
302+
ProcessorMetrics(
303+
"logs",
304+
OtelComponentTypeValues.BATCHING_LOG_PROCESSOR,
305+
meter_provider or get_meter_provider(),
306+
capacity=max_queue_size,
307+
),
279308
)
280309

281310
def on_emit(self, log_record: ReadWriteLogRecord) -> None:

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
detach,
3737
set_value,
3838
)
39+
from opentelemetry.sdk._shared_internal._processor_metrics import (
40+
ProcessorMetrics,
41+
)
3942
from opentelemetry.util._once import Once
4043

4144

@@ -98,6 +101,7 @@ def __init__(
98101
export_timeout_millis: float,
99102
max_queue_size: int,
100103
exporting: str,
104+
metrics: ProcessorMetrics,
101105
):
102106
self._bsp_reset_once = Once()
103107
self._exporter = exporter
@@ -127,6 +131,9 @@ def __init__(
127131
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pyright: ignore[reportOptionalCall] pylint: disable=unnecessary-lambda
128132
self._pid = os.getpid()
129133

134+
metrics.register_queue_size(lambda: len(self._queue))
135+
self._metrics = metrics
136+
130137
def _should_export_batch(
131138
self, batch_strategy: BatchExportStrategy, num_iterations: int
132139
) -> bool:
@@ -177,23 +184,27 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
177184
while self._should_export_batch(batch_strategy, iteration):
178185
iteration += 1
179186
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
187+
error: Exception | None = None
188+
count = 0
180189
try:
190+
count = min(
191+
self._max_export_batch_size,
192+
len(self._queue),
193+
)
181194
self._exporter.export(
182195
[
183196
# Oldest records are at the back, so pop from there.
184197
self._queue.pop()
185-
for _ in range(
186-
min(
187-
self._max_export_batch_size,
188-
len(self._queue),
189-
)
190-
)
198+
for _ in range(count)
191199
]
192200
)
193-
except Exception: # pylint: disable=broad-exception-caught
201+
except Exception as err: # pylint: disable=broad-exception-caught
202+
error = err
194203
_logger.exception(
195204
"Exception while exporting %s.", self._exporting
196205
)
206+
finally:
207+
self._metrics.finish_items(count, error)
197208
detach(token)
198209

199210
def emit(self, data: Telemetry) -> None:
@@ -204,6 +215,7 @@ def emit(self, data: Telemetry) -> None:
204215
self._bsp_reset_once.do_once(self._at_fork_reinit)
205216
if len(self._queue) == self._max_queue_size:
206217
_logger.warning("Queue full, dropping %s.", self._exporting)
218+
self._metrics.drop_items(1)
207219
# This will drop a log from the right side if the queue is at _max_queue_size.
208220
self._queue.appendleft(data)
209221
if len(self._queue) >= self._max_export_batch_size:
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from collections import Counter
18+
from collections.abc import Callable
19+
from typing import Literal
20+
21+
from opentelemetry.metrics import CallbackOptions, MeterProvider, Observation
22+
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
23+
OTEL_COMPONENT_NAME,
24+
OTEL_COMPONENT_TYPE,
25+
OtelComponentTypeValues,
26+
)
27+
from opentelemetry.semconv._incubating.metrics.otel_metrics import (
28+
OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE,
29+
OTEL_SDK_PROCESSOR_SPAN_QUEUE_SIZE,
30+
create_otel_sdk_processor_log_processed,
31+
create_otel_sdk_processor_log_queue_capacity,
32+
create_otel_sdk_processor_span_processed,
33+
create_otel_sdk_processor_span_queue_capacity,
34+
)
35+
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
36+
37+
_component_counter = Counter()
38+
39+
40+
class ProcessorMetrics:
    """Records SDK self-observability metrics for one span or log processor.

    Every instance is tagged with a component type and a component name of
    the form ``<component type>/<n>``, where ``n`` is the number of
    ``ProcessorMetrics`` created earlier in this process for the same
    component type (tracked in the module-level ``_component_counter``).
    """

    def __init__(
        self,
        signal: Literal["traces", "logs"],
        component_type: OtelComponentTypeValues,
        meter_provider: MeterProvider,
        *,
        capacity: int | None = None,
    ) -> None:
        """Create the instruments used by one processor instance.

        Args:
            signal: ``"traces"`` selects the span instruments; any other
                value selects the log instruments.
            component_type: Component type whose ``value`` is used for the
                component-type attribute and as the prefix of the
                component name.
            meter_provider: Provider used to obtain the
                ``"opentelemetry-sdk"`` meter.
            capacity: Queue capacity of the processor. When given, it is
                added once to the queue-capacity instrument; when ``None``
                no queue-capacity instrument is created.
        """
        self._signal = signal
        self._meter = meter_provider.get_meter("opentelemetry-sdk")

        # NOTE(review): this read-then-write on the shared counter is not
        # atomic; confirm processors are never constructed concurrently if
        # unique component names are required.
        type_name = component_type.value
        index = _component_counter[type_name]
        _component_counter[type_name] = index + 1

        self._standard_attrs = {
            OTEL_COMPONENT_TYPE: type_name,
            OTEL_COMPONENT_NAME: f"{type_name}/{index}",
        }
        # Attributes for items that were dropped because the queue was full.
        self._dropped_attrs = {
            **self._standard_attrs,
            ERROR_TYPE: "queue_full",
        }

        if signal == "traces":
            create_processed = create_otel_sdk_processor_span_processed
            create_queue_capacity = (
                create_otel_sdk_processor_span_queue_capacity
            )
        else:
            create_processed = create_otel_sdk_processor_log_processed
            create_queue_capacity = (
                create_otel_sdk_processor_log_queue_capacity
            )

        self._processed = create_processed(self._meter)

        # Always define the attribute so the instance has the same shape
        # whether or not the processor has a queue.
        self._queue_capacity = None
        if capacity is not None:
            self._queue_capacity = create_queue_capacity(self._meter)
            self._queue_capacity.add(capacity, self._standard_attrs)

    def register_queue_size(self, get_queue_size: Callable[[], int]) -> None:
        """Register an observable instrument reporting the queue size.

        Args:
            get_queue_size: Zero-argument callable returning the current
                number of queued items; it is invoked each time the
                instrument's callback runs.
        """

        def record_queue_size(
            _options: CallbackOptions,
        ) -> tuple[Observation]:
            return (Observation(get_queue_size(), self._standard_attrs),)

        if self._signal == "traces":
            queue_size_name = OTEL_SDK_PROCESSOR_SPAN_QUEUE_SIZE
            queue_size_description = "The number of spans in the queue of a given instance of an SDK span processor."
            queue_size_unit = "{span}"
        else:
            queue_size_name = OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE
            queue_size_description = "The number of logs in the queue of a given instance of an SDK log processor."
            queue_size_unit = "{log}"

        self._meter.create_observable_up_down_counter(
            queue_size_name,
            callbacks=(record_queue_size,),
            description=queue_size_description,
            unit=queue_size_unit,
        )

    def drop_items(self, count: int) -> None:
        """Count ``count`` items as processed with error type ``queue_full``."""
        self._processed.add(count, self._dropped_attrs)

    def finish_items(self, count: int, error: Exception | None) -> None:
        """Count ``count`` items as processed.

        Args:
            count: Number of items handed to the exporter.
            error: Exception raised while exporting, or ``None`` on
                success. When set, its class name is recorded as the
                error-type attribute.
        """
        attrs = self._standard_attrs
        # Compare against None explicitly: an exception whose class defines
        # __len__ or __bool__ can be falsy and must still count as an error.
        if error is not None:
            attrs = {**attrs, ERROR_TYPE: type(error).__name__}
        self._processed.add(count, attrs)

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,18 @@
2626
detach,
2727
set_value,
2828
)
29-
from opentelemetry.sdk._shared_internal import BatchProcessor
29+
from opentelemetry.metrics import MeterProvider, get_meter_provider
30+
from opentelemetry.sdk._shared_internal import BatchProcessor, ProcessorMetrics
3031
from opentelemetry.sdk.environment_variables import (
3132
OTEL_BSP_EXPORT_TIMEOUT,
3233
OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
3334
OTEL_BSP_MAX_QUEUE_SIZE,
3435
OTEL_BSP_SCHEDULE_DELAY,
3536
)
3637
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
38+
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
39+
OtelComponentTypeValues,
40+
)
3741

3842
_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
3943
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
@@ -91,8 +95,18 @@ class SimpleSpanProcessor(SpanProcessor):
9195
passes ended spans directly to the configured `SpanExporter`.
9296
"""
9397

94-
def __init__(self, span_exporter: SpanExporter):
98+
def __init__(
99+
self,
100+
span_exporter: SpanExporter,
101+
*,
102+
meter_provider: MeterProvider | None = None,
103+
):
95104
self.span_exporter = span_exporter
105+
self._metrics = ProcessorMetrics(
106+
"traces",
107+
OtelComponentTypeValues.SIMPLE_SPAN_PROCESSOR,
108+
meter_provider or get_meter_provider(),
109+
)
96110

97111
def on_start(
98112
self, span: Span, parent_context: typing.Optional[Context] = None
@@ -106,11 +120,15 @@ def on_end(self, span: ReadableSpan) -> None:
106120
if not (span.context and span.context.trace_flags.sampled):
107121
return
108122
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
123+
error: Exception | None = None
109124
try:
110125
self.span_exporter.export((span,))
111126
# pylint: disable=broad-exception-caught
112-
except Exception:
127+
except Exception as err:
128+
error = err
113129
logger.exception("Exception while exporting Span.")
130+
finally:
131+
self._metrics.finish_items(1, error)
114132
detach(token)
115133

116134
def shutdown(self) -> None:
@@ -145,6 +163,8 @@ def __init__(
145163
schedule_delay_millis: float | None = None,
146164
max_export_batch_size: int | None = None,
147165
export_timeout_millis: float | None = None,
166+
*,
167+
meter_provider: MeterProvider | None = None,
148168
):
149169
if max_queue_size is None:
150170
max_queue_size = BatchSpanProcessor._default_max_queue_size()
@@ -176,6 +196,12 @@ def __init__(
176196
export_timeout_millis,
177197
max_queue_size,
178198
"Span",
199+
ProcessorMetrics(
200+
"traces",
201+
OtelComponentTypeValues.BATCHING_SPAN_PROCESSOR,
202+
meter_provider or get_meter_provider(),
203+
capacity=max_queue_size,
204+
),
179205
)
180206

181207
# Added for backward compatibility. Not recommended to directly access/use underlying exporter.

0 commit comments

Comments
 (0)