diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index c1d51e3fd3..4ea8c0f205 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -14,6 +14,7 @@ import gzip import logging +import os import random import threading import zlib @@ -203,6 +204,28 @@ def __init__( ) self._max_export_batch_size: int | None = max_export_batch_size self._shutdown = False + if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=self._reset_session_after_fork) + + def _reset_session_after_fork(self) -> None: + """ + Reset exporter session in the child process after fork. + + We close the existing session to avoid finalizer warnings if file + descriptors were already closed, then create a new session with the same + headers to prevent reusing the parent's connection state. + """ + try: + headers = self._session.headers.copy() + self._session.close() + + self._session = requests.Session() + self._session.headers.update(headers) + except Exception: + _logger.debug( + "Exception occurred while resetting exporter session", + exc_info=True, + ) def _export( self, serialized_data: bytes, timeout_sec: Optional[float] = None diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index 8fe57f3553..8ac8621f09 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint: disable=too-many-lines +import os import threading import time +import unittest from logging import WARNING from os import environ from typing import List @@ -1316,6 +1317,36 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post): assert after - before < 0.2 + @unittest.skipUnless( + hasattr(os, "register_at_fork"), "fork session reset not available" + ) + def test_metric_exporter_register_at_fork_resets_session(self): + initial_session = MagicMock(spec=requests.Session) + initial_session.headers = {"preexisting": "yes"} + + new_session = MagicMock(spec=requests.Session) + new_session.headers = {} + + with patch("os.register_at_fork") as mock_register_at_fork, patch( + "opentelemetry.exporter.otlp.proto.http.metric_exporter.requests.Session", + return_value=new_session, + ): + exporter = OTLPMetricExporter( + session=initial_session, headers={"x-test": "1"} + ) + after_in_child = mock_register_at_fork.call_args.kwargs[ + "after_in_child" + ] + after_in_child() + + initial_session.close.assert_called_once() + self.assertEqual(exporter._session, new_session) + self.assertEqual(exporter._session.headers.get("x-test"), "1") + self.assertEqual( + exporter._session.headers.get("Content-Type"), + "application/x-protobuf", + ) + self.assertEqual(exporter._session.headers.get("preexisting"), "yes") def _resource_metrics( index: int, scope_metrics: List[pb2.ScopeMetrics]