Skip to content

Commit 4f52160

Browse files
committed
context manager
1 parent ba1f62e commit 4f52160

File tree

4 files changed

+155
-142
lines changed

4 files changed

+155
-142
lines changed

exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_exporter_metrics.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
from __future__ import annotations
1616

1717
from collections import Counter
18+
from contextlib import contextmanager
19+
from dataclasses import dataclass
1820
from time import perf_counter
19-
from typing import TYPE_CHECKING, Callable
21+
from typing import TYPE_CHECKING, Iterator
2022

2123
from opentelemetry.metrics import MeterProvider, get_meter_provider
2224
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
@@ -48,6 +50,12 @@
4850
_component_counter = Counter()
4951

5052

53+
@dataclass
class ExportResult:
    """Mutable outcome holder for a single export operation.

    ``ExporterMetrics.export_operation`` creates an instance with no
    arguments and yields it to the caller. The caller assigns ``error`` and
    ``error_attrs`` when the export fails and leaves them untouched
    otherwise, so both fields must default to ``None``; without defaults the
    argument-less construction raises ``TypeError``.
    """

    # Exception raised by the failed export, if any.
    error: Exception | None = None
    # Extra attributes describing the failure (the exporters store a response
    # status code here); None when there is nothing to add.
    error_attrs: Attributes = None
57+
58+
5159
class ExporterMetrics:
5260
def __init__(
5361
self,
@@ -97,16 +105,18 @@ def __init__(
97105
self._exported = create_exported(meter)
98106
self._duration = create_otel_sdk_exporter_operation_duration(meter)
99107

100-
def start_export(
101-
self, num_items: int
102-
) -> Callable[[Exception | None, Attributes], None]:
108+
@contextmanager
109+
def export_operation(self, num_items: int) -> Iterator[ExportResult]:
103110
start_time = perf_counter()
104111
self._inflight.add(num_items, self._standard_attrs)
105112

106-
def finish_export(
107-
error: Exception | None,
108-
error_attrs: Attributes,
109-
):
113+
result = ExportResult()
114+
try:
115+
yield result
116+
finally:
117+
error = result.error
118+
error_attrs = result.error_attrs
119+
110120
end_time = perf_counter()
111121
self._inflight.add(-num_items, self._standard_attrs)
112122
exported_attrs = (
@@ -121,5 +131,3 @@ def finish_export(
121131
else exported_attrs
122132
)
123133
self._duration.record(end_time - start_time, duration_attrs)
124-
125-
return finish_export

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 73 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -441,86 +441,88 @@ def _export(
441441
logger.warning("Exporter already shutdown, ignoring batch")
442442
return self._result.FAILURE # type: ignore [reportReturnType]
443443

444-
finish_export = self._metrics.start_export(self._count_data(data))
445-
446-
# FIXME remove this check if the export type for traces
447-
# gets updated to a class that represents the proto
448-
# TracesData and use the code below instead.
449-
deadline_sec = time() + self._timeout
450-
for retry_num in range(_MAX_RETRYS):
451-
try:
452-
if self._client is None:
453-
return self._result.FAILURE
454-
self._client.Export(
455-
request=self._translate_data(data),
456-
metadata=self._headers,
457-
timeout=deadline_sec - time(),
458-
)
459-
finish_export(None, None)
460-
return self._result.SUCCESS # type: ignore [reportReturnType]
461-
except RpcError as error:
462-
retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue]
463-
"google.rpc.retryinfo-bin" # type: ignore [reportArgumentType]
464-
)
465-
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
466-
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
467-
if retry_info_bin is not None:
468-
retry_info = RetryInfo()
469-
retry_info.ParseFromString(retry_info_bin)
470-
backoff_seconds = (
471-
retry_info.retry_delay.seconds
472-
+ retry_info.retry_delay.nanos / 1.0e9
444+
with self._metrics.export_operation(self._count_data(data)) as result:
445+
# FIXME remove this check if the export type for traces
446+
# gets updated to a class that represents the proto
447+
# TracesData and use the code below instead.
448+
deadline_sec = time() + self._timeout
449+
for retry_num in range(_MAX_RETRYS):
450+
try:
451+
if self._client is None:
452+
return self._result.FAILURE
453+
self._client.Export(
454+
request=self._translate_data(data),
455+
metadata=self._headers,
456+
timeout=deadline_sec - time(),
473457
)
474-
475-
# For UNAVAILABLE errors, reinitialize the channel to force reconnection
476-
if error.code() == StatusCode.UNAVAILABLE and retry_num == 0: # type: ignore
477-
logger.debug(
478-
"Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error",
479-
self._exporting,
458+
return self._result.SUCCESS # type: ignore [reportReturnType]
459+
except RpcError as error:
460+
retry_info_bin = dict(error.trailing_metadata()).get( # type: ignore [reportAttributeAccessIssue]
461+
"google.rpc.retryinfo-bin" # type: ignore [reportArgumentType]
480462
)
481-
try:
482-
if self._channel:
483-
self._channel.close()
484-
except Exception as e:
463+
# multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
464+
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
465+
if retry_info_bin is not None:
466+
retry_info = RetryInfo()
467+
retry_info.ParseFromString(retry_info_bin)
468+
backoff_seconds = (
469+
retry_info.retry_delay.seconds
470+
+ retry_info.retry_delay.nanos / 1.0e9
471+
)
472+
473+
# For UNAVAILABLE errors, reinitialize the channel to force reconnection
474+
if (
475+
error.code() == StatusCode.UNAVAILABLE
476+
and retry_num == 0
477+
): # type: ignore
485478
logger.debug(
486-
"Error closing channel for %s exporter to %s: %s",
479+
"Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error",
480+
self._exporting,
481+
)
482+
try:
483+
if self._channel:
484+
self._channel.close()
485+
except Exception as e:
486+
logger.debug(
487+
"Error closing channel for %s exporter to %s: %s",
488+
self._exporting,
489+
self._endpoint,
490+
str(e),
491+
)
492+
# Enable channel reconnection for subsequent calls
493+
self._initialize_channel_and_stub()
494+
495+
if (
496+
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
497+
or retry_num + 1 == _MAX_RETRYS
498+
or backoff_seconds > (deadline_sec - time())
499+
or self._shutdown
500+
):
501+
logger.error(
502+
"Failed to export %s to %s, error code: %s",
487503
self._exporting,
488504
self._endpoint,
489-
str(e),
505+
error.code(), # type: ignore [reportAttributeAccessIssue]
506+
exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue]
490507
)
491-
# Enable channel reconnection for subsequent calls
492-
self._initialize_channel_and_stub()
493-
494-
if (
495-
error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue]
496-
or retry_num + 1 == _MAX_RETRYS
497-
or backoff_seconds > (deadline_sec - time())
498-
or self._shutdown
499-
):
500-
logger.error(
501-
"Failed to export %s to %s, error code: %s",
508+
result.error = error
509+
result.error_attrs = {
510+
RPC_RESPONSE_STATUS_CODE: error.code().name
511+
}
512+
return self._result.FAILURE # type: ignore [reportReturnType]
513+
logger.warning(
514+
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
515+
error.code(), # type: ignore [reportAttributeAccessIssue]
502516
self._exporting,
503517
self._endpoint,
504-
error.code(), # type: ignore [reportAttributeAccessIssue]
505-
exc_info=error.code() == StatusCode.UNKNOWN, # type: ignore [reportAttributeAccessIssue]
518+
backoff_seconds,
506519
)
507-
finish_export(
508-
error, {RPC_RESPONSE_STATUS_CODE: error.code().name}
509-
)
510-
return self._result.FAILURE # type: ignore [reportReturnType]
511-
logger.warning(
512-
"Transient error %s encountered while exporting %s to %s, retrying in %.2fs.",
513-
error.code(), # type: ignore [reportAttributeAccessIssue]
514-
self._exporting,
515-
self._endpoint,
516-
backoff_seconds,
517-
)
518-
shutdown = self._shutdown_in_progress.wait(backoff_seconds)
519-
if shutdown:
520-
logger.warning("Shutdown in progress, aborting retry.")
521-
break
522-
# Not possible to reach here but the linter is complaining.
523-
return self._result.FAILURE # type: ignore [reportReturnType]
520+
shutdown = self._shutdown_in_progress.wait(backoff_seconds)
521+
if shutdown:
522+
logger.warning("Shutdown in progress, aborting retry.")
523+
break
524+
# Not possible to reach here but the linter is complaining.
525+
return self._result.FAILURE # type: ignore [reportReturnType]
524526

525527
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
526528
"""

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py

Lines changed: 63 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -271,69 +271,69 @@ def _export_with_retries(
271271
_logger.warning("Exporter already shutdown, ignoring batch")
272272
return MetricExportResult.FAILURE
273273

274-
finish_export = self._metrics.start_export(num_items)
274+
with self._metrics.export_operation(num_items) as result:
275+
serialized_data = export_request.SerializeToString()
276+
deadline_sec = time() + self._timeout
277+
for retry_num in range(_MAX_RETRYS):
278+
# multiplying by a random number between .8 and 1.2 introduces a +/-20% jitter to each backoff.
279+
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
280+
export_error: Optional[Exception] = None
281+
try:
282+
resp = self._export(serialized_data, deadline_sec - time())
283+
if resp.ok:
284+
return MetricExportResult.SUCCESS
285+
except requests.exceptions.RequestException as error:
286+
reason = error
287+
export_error = error
288+
retryable = isinstance(error, ConnectionError)
289+
status_code = None
290+
else:
291+
reason = resp.reason
292+
retryable = _is_retryable(resp)
293+
status_code = resp.status_code
294+
295+
if not retryable:
296+
_logger.error(
297+
"Failed to export metrics batch code: %s, reason: %s",
298+
status_code,
299+
reason,
300+
)
301+
error_attrs = (
302+
{HTTP_RESPONSE_STATUS_CODE: status_code}
303+
if status_code is not None
304+
else None
305+
)
306+
result.error = export_error
307+
result.error_attrs = error_attrs
308+
return MetricExportResult.FAILURE
309+
if (
310+
retry_num + 1 == _MAX_RETRYS
311+
or backoff_seconds > (deadline_sec - time())
312+
or self._shutdown
313+
):
314+
_logger.error(
315+
"Failed to export metrics batch due to timeout, "
316+
"max retries or shutdown."
317+
)
318+
error_attrs = (
319+
{HTTP_RESPONSE_STATUS_CODE: status_code}
320+
if status_code is not None
321+
else None
322+
)
323+
result.error = export_error
324+
result.error_attrs = error_attrs
325+
return MetricExportResult.FAILURE
275326

276-
serialized_data = export_request.SerializeToString()
277-
deadline_sec = time() + self._timeout
278-
for retry_num in range(_MAX_RETRYS):
279-
# multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
280-
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2)
281-
export_error: Optional[Exception] = None
282-
try:
283-
resp = self._export(serialized_data, deadline_sec - time())
284-
if resp.ok:
285-
finish_export(None, None)
286-
return MetricExportResult.SUCCESS
287-
except requests.exceptions.RequestException as error:
288-
reason = error
289-
export_error = error
290-
retryable = isinstance(error, ConnectionError)
291-
status_code = None
292-
else:
293-
reason = resp.reason
294-
retryable = _is_retryable(resp)
295-
status_code = resp.status_code
296-
297-
if not retryable:
298-
_logger.error(
299-
"Failed to export metrics batch code: %s, reason: %s",
300-
status_code,
327+
_logger.warning(
328+
"Transient error %s encountered while exporting metrics batch, retrying in %.2fs.",
301329
reason,
330+
backoff_seconds,
302331
)
303-
error_attrs = (
304-
{HTTP_RESPONSE_STATUS_CODE: status_code}
305-
if status_code is not None
306-
else None
307-
)
308-
finish_export(export_error, error_attrs)
309-
return MetricExportResult.FAILURE
310-
if (
311-
retry_num + 1 == _MAX_RETRYS
312-
or backoff_seconds > (deadline_sec - time())
313-
or self._shutdown
314-
):
315-
_logger.error(
316-
"Failed to export metrics batch due to timeout, "
317-
"max retries or shutdown."
318-
)
319-
error_attrs = (
320-
{HTTP_RESPONSE_STATUS_CODE: status_code}
321-
if status_code is not None
322-
else None
323-
)
324-
finish_export(export_error, error_attrs)
325-
return MetricExportResult.FAILURE
326-
327-
_logger.warning(
328-
"Transient error %s encountered while exporting metrics batch, retrying in %.2fs.",
329-
reason,
330-
backoff_seconds,
331-
)
332-
shutdown = self._shutdown_in_progress.wait(backoff_seconds)
333-
if shutdown:
334-
_logger.warning("Shutdown in progress, aborting retry.")
335-
break
336-
return MetricExportResult.FAILURE
332+
shutdown = self._shutdown_in_progress.wait(backoff_seconds)
333+
if shutdown:
334+
_logger.warning("Shutdown in progress, aborting retry.")
335+
break
336+
return MetricExportResult.FAILURE
337337

338338
def export(
339339
self,
@@ -356,7 +356,9 @@ def export(
356356

357357
# If no batch size configured, export as single batch with retries as configured
358358
if self._max_export_batch_size is None:
359-
return self._export_with_retries(export_request, deadline_sec, num_items)
359+
return self._export_with_retries(
360+
export_request, deadline_sec, num_items
361+
)
360362

361363
# Else, export in batches of configured size
362364
batched_export_requests = _split_metrics_data(

exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,6 +1626,7 @@ def assert_standard_metric_attrs(self, attributes):
16261626
self.assertEqual(attributes["server.address"], "localhost")
16271627
self.assertEqual(attributes["server.port"], 4318)
16281628

1629+
16291630
def _resource_metrics(
16301631
index: int, scope_metrics: List[pb2.ScopeMetrics]
16311632
) -> pb2.ResourceMetrics:

0 commit comments

Comments
 (0)