diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0e7f507360..15b6a43045 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@
 
 #### New Features
 
+- Added support for `Session.client_telemetry`.
 - Added support for `Session.udf_profiler`.
 
 ## 1.41.0 (YYYY-MM-DD)
diff --git a/setup.py b/setup.py
index b666ce9b4f..1a64b40001 100644
--- a/setup.py
+++ b/setup.py
@@ -231,6 +231,7 @@ def run(self):
         "opentelemetry": [
             "opentelemetry-api>=1.0.0, <2.0.0",
             "opentelemetry-sdk>=1.0.0, <2.0.0",
+            "opentelemetry-exporter-otlp>=1.0.0, <2.0.0",
         ],
     },
     classifiers=[
diff --git a/src/snowflake/snowpark/_internal/event_table_telemetry.py b/src/snowflake/snowpark/_internal/event_table_telemetry.py
new file mode 100644
index 0000000000..6bd8a7ee94
--- /dev/null
+++ b/src/snowflake/snowpark/_internal/event_table_telemetry.py
@@ -0,0 +1,552 @@
+#
+# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
+#
+import importlib
+import logging
+from abc import ABC
+from logging import getLogger
+from typing import Dict, Optional, Tuple
+from snowflake.connector.options import MissingOptionalDependency, ModuleLikeObject
+from snowflake.connector.wif_util import create_attestation
+
+import snowflake.snowpark
+import requests
+
+from snowflake.snowpark._internal.utils import parse_table_name
+
+_logger = getLogger(__name__)
+
+DEFAULT_EVENT_TABLE = "snowflake.telemetry.events"
+SERVICE_NAME = "snow.snowpark.client"
+# Seconds to wait for the endpoint-discovery request before giving up.
+ENDPOINT_REQUEST_TIMEOUT = 30
+# Submodules that this module dereferences as attributes of ``opentelemetry``.
+# Each is imported explicitly because importing a package does not import its
+# submodules.
+_REQUIRED_OPENTELEMETRY_SUBMODULES = (
+    "_logs",
+    "trace",
+    "util.types",
+    "sdk.resources",
+    "sdk.trace",
+    "sdk.trace.export",
+    "sdk._logs",
+    "sdk._logs.export",
+    "exporter.otlp.proto.http.trace_exporter",
+    "exporter.otlp.proto.http._log_exporter",
+)
+
+
+class MissingOpenTelemetry(MissingOptionalDependency):
+    """Placeholder returned when the ``opentelemetry`` extra is not installed."""
+
+    _dep_name = "opentelemetry"
+
+
+def _import_or_missing_opentelemetry() -> Tuple[ModuleLikeObject, bool]:
+    """Import OpenTelemetry together with every submodule this module uses.
+
+    Returns:
+        ``(opentelemetry, True)`` when all imports succeed, otherwise
+        ``(MissingOpenTelemetry(), False)``.
+    """
+    try:
+        opentelemetry = importlib.import_module("opentelemetry")
+        for submodule in _REQUIRED_OPENTELEMETRY_SUBMODULES:
+            importlib.import_module(f"opentelemetry.{submodule}")
+        return opentelemetry, True
+    except ImportError:
+        return MissingOpenTelemetry(), False
+
+
+opentelemetry, installed_opentelemetry = _import_or_missing_opentelemetry()
+
+BaseLogProvider = opentelemetry._logs.LoggerProvider if installed_opentelemetry else ABC
+BaseTraceProvider = (
+    opentelemetry.trace.TracerProvider if installed_opentelemetry else ABC
+)
+Attributes = opentelemetry.util.types.Attributes if installed_opentelemetry else ABC
+
+
+class RetryWithTokenRefreshAdapter(requests.adapters.HTTPAdapter):
+    """HTTP adapter that retries a request with a refreshed auth header.
+
+    Args:
+        session_instance: Source of fresh headers. Either an object that defines
+            ``_get_external_telemetry_auth_token`` (the owning
+            ``EventTableTelemetry``) or a Snowpark ``Session``, whose
+            ``client_telemetry`` is used instead.
+        header: Headers merged into every outgoing request.
+        max_retries: Number of extra attempts made after the first one.
+    """
+
+    def __init__(
+        self,
+        session_instance: "snowflake.snowpark.Session",
+        header: Dict,
+        max_retries: int = 3,
+    ) -> None:
+        super().__init__()
+        self.snowpark_session = session_instance
+        # Stored apart from ``HTTPAdapter.max_retries``: the base class hands
+        # that attribute to urllib3 as its own retry policy, so overwriting it
+        # would stack transport-level retries on top of the loop in ``send``.
+        self.auth_max_retries = max_retries
+        self.header = header
+        self.retryable_status_code = [401]
+
+    def _refresh_header(self) -> Dict:
+        """Return a newly issued auth header from the token source."""
+        source = self.snowpark_session
+        if not hasattr(source, "_get_external_telemetry_auth_token"):
+            # A Snowpark Session does not define the token helper itself; it is
+            # defined on the EventTableTelemetry object the session exposes.
+            source = source.client_telemetry
+        return source._get_external_telemetry_auth_token()
+
+    def send(self, request, **kwargs):
+        """Send ``request``, refreshing the auth header and retrying on failure.
+
+        A response whose status is listed in ``retryable_status_code`` or a
+        ``requests`` transport error triggers a header refresh and one more
+        attempt. Once the extra attempts are used up, the last response is
+        returned or the last transport error is re-raised.
+        """
+        for attempt in range(self.auth_max_retries + 1):
+            is_last_attempt = attempt >= self.auth_max_retries
+            try:
+                request.headers.update(self.header)
+                response = super().send(request, **kwargs)
+            except requests.exceptions.RequestException:
+                if is_last_attempt:
+                    raise
+                self.header = self._refresh_header()
+                continue
+
+            needs_new_token = response.status_code in self.retryable_status_code
+            if needs_new_token and not is_last_attempt:
+                self.header = self._refresh_header()
+                continue
+            return response
+
+
+class ProxyTracerProvider(BaseTraceProvider):
+    """Tracer provider that forwards to a real provider only while enabled."""
+
+    def __init__(self, real_provider=None) -> None:
+        super().__init__()
+        self._real_provider = real_provider
+        self._enabled = real_provider is not None
+
+    def set_real_provider(self, provider):
+        self._real_provider = provider
+        self._enabled = provider is not None
+
+    def disable(self):
+        self._enabled = False
+
+    def enable(self):
+        self._enabled = True
+
+    def get_tracer(
+        self,
+        instrumenting_module_name: str,
+        instrumenting_library_version: Optional[str] = None,
+        schema_url: Optional[str] = None,
+        attributes: Optional[Attributes] = None,
+    ) -> "opentelemetry.trace.Tracer":
+        if self._enabled and self._real_provider:
+            return self._real_provider.get_tracer(
+                instrumenting_module_name,
+                instrumenting_library_version,
+                schema_url,
+                attributes,
+            )
+        else:
+            # Return a no-op tracer when disabled
+            return opentelemetry.trace.NoOpTracer()
+
+    def shutdown(self):
+        if self._real_provider:
+            self._real_provider.shutdown()
+        self._real_provider = None
+        self._enabled = False
+
+    # Delegate span processor methods to real provider
+    def add_span_processor(self, processor):
+        if self._real_provider:
+            self._real_provider.add_span_processor(processor)
+
+    def force_flush(self, timeout_millis=None):
+        """Flush the real provider; ``None`` means the provider's own default.
+
+        Returns the provider's result, or ``True`` when there is no provider.
+        """
+        if not self._real_provider:
+            return True
+        if timeout_millis is None:
+            # Forwarding ``None`` would replace the provider's numeric default.
+            return self._real_provider.force_flush()
+        return self._real_provider.force_flush(timeout_millis)
+
+
+class ProxyLogProvider(BaseLogProvider):
+    """Logger provider that forwards to a real provider only while enabled."""
+
+    def __init__(self, real_provider=None) -> None:
+        super().__init__()
+        self._real_provider = real_provider
+        self._enabled = real_provider is not None
+
+    def set_real_provider(self, provider):
+        self._real_provider = provider
+        self._enabled = provider is not None
+
+    def disable(self):
+        self._enabled = False
+
+    def enable(self):
+        self._enabled = True
+
+    def get_logger(
+        self,
+        instrumenting_module_name: str,
+        instrumenting_library_version: Optional[str] = None,
+        schema_url: Optional[str] = None,
+        attributes: Optional[Attributes] = None,
+    ) -> "opentelemetry._logs.Logger":
+        if self._enabled and self._real_provider:
+            return self._real_provider.get_logger(
+                instrumenting_module_name,
+                instrumenting_library_version,
+                schema_url,
+                attributes,
+            )
+        else:
+            # Return a no-op logger when disabled
+            return opentelemetry._logs.NoOpLogger(name="noop")
+
+    def shutdown(self):
+        if self._real_provider:
+            self._real_provider.shutdown()
+        self._real_provider = None
+        self._enabled = False
+
+    def add_log_record_processor(self, processor):
+        if self._real_provider:
+            self._real_provider.add_log_record_processor(processor)
+
+    def force_flush(self, timeout_millis=None):
+        """Flush the real provider; ``None`` means the provider's own default.
+
+        Returns the provider's result, or ``True`` when there is no provider.
+        """
+        if not self._real_provider:
+            return True
+        if timeout_millis is None:
+            # Forwarding ``None`` would replace the provider's numeric default.
+            return self._real_provider.force_flush()
+        return self._real_provider.force_flush(timeout_millis)
+
+
+class EventTableTelemetry:
+    """Sends client-side traces and logs of a session to a Snowflake event table."""
+
+    def __init__(self, session: "snowflake.snowpark.Session") -> None:
+        self._tracer_provider = None
+        self._span_processor = None
+        self._proxy_tracer_provider = None
+        self._proxy_log_provider = None
+        self._logger_provider = None
+        self._log_processor = None
+        self._log_handler = None
+        self._attestation = None
+        self._event_table = None
+        # Adapters are kept so a later enable call can swap in new headers.
+        self._trace_adapter = None
+        self._log_adapter = None
+        self._tracer_provider_enabled = False
+        self._logger_provider_enabled = False
+        self.session = session
+
+    def enable_event_table_telemetry_collection(
+        self,
+        event_table: str = DEFAULT_EVENT_TABLE,
+        log_level: Optional[int] = None,
+        enable_trace_level: bool = False,
+    ) -> None:
+        """
+        Enable user to send telemetry to designated event table when necessary dependencies are installed.
+
+        Only traces and logs between `client_telemetry.enable_event_table_telemetry_collection` and
+        `client_telemetry.disable_event_table_telemetry_collection` will be sent to event table.
+        You can call `client_telemetry.enable_event_table_telemetry_collection` again to re-enable external
+        telemetry after it is turned off.
+
+        Args:
+            event_table: Name of the target event table. A name that is not fully
+                qualified is resolved against the session.
+            log_level: Minimum ``logging`` level to export, or ``None`` to leave
+                log export untouched.
+            enable_trace_level: Whether to export traces.
+
+        Note:
+            This function requires the `opentelemetry` extra from Snowpark.
+            Install it via pip:
+            .. code-block:: bash
+
+                pip install "snowflake-snowpark-python[opentelemetry]"
+
+        Examples 1
+            .. code-block:: python
+
+            ext = session.client_telemetry
+            ext.enable_event_table_telemetry_collection("snowflake.telemetry.events", logging.INFO, True)
+            tracer = trace.get_tracer("my_tracer")
+            with tracer.start_as_current_span("code_store") as span:
+                span.set_attribute("code.lineno", "21")
+                span.set_attribute("code.content", "session.sql(...)")
+                logging.info("Trace being sent to event table")
+            ext.disable_event_table_telemetry_collection()
+
+        Examples 2
+            .. code-block:: python
+
+            ext = session.client_telemetry
+            logging.info("log before enable event table telemetry collection")  # this log is not sent to event table
+            ext.enable_event_table_telemetry_collection("snowflake.telemetry.events", logging.INFO, True)
+            tracer = trace.get_tracer("my_tracer")
+            with tracer.start_as_current_span("code_store") as span:
+                span.set_attribute("code.lineno", "21")
+                span.set_attribute("code.content", "session.sql(...)")
+                logging.info("Trace being sent to event table")
+            ext.disable_event_table_telemetry_collection()
+            logging.info("out of scope log")  # this log is not sent to event table
+            ext.enable_event_table_telemetry_collection("db.sc.external_et", logging.DEBUG, True)
+            logging.debug("debug log")  # this log is sent to event table because event table telemetry collection is re-enabled
+            ext.disable_event_table_telemetry_collection()
+
+        """
+        if not installed_opentelemetry:
+            _logger.debug(
+                f"Opentelemetry dependencies are missing, no telemetry export into event table: {event_table}"
+            )
+            return
+
+        if log_level is None and not enable_trace_level:
+            _logger.warning(
+                f"Snowpark python log_level and trace_level are not enabled to collect telemetry into event table: {event_table}."
+            )
+            return
+
+        if len(parse_table_name(event_table)) != 3:
+            event_table = self.session.get_fully_qualified_name_if_possible(event_table)
+            _logger.warning(
+                f"Input event table is converted to fully qualified name: {event_table}."
+            )
+
+        self._event_table = event_table
+
+        try:
+            # One header serves both the endpoint lookup and the exporters.
+            header = self._get_external_telemetry_auth_token()
+            url = f"https://{self.session.connection.host}:{self.session.connection.port}/observability/event-table/hostname"
+            response = requests.get(
+                url, headers=header, timeout=ENDPOINT_REQUEST_TIMEOUT
+            )
+            response.raise_for_status()
+            endpoint = response.text
+        except Exception as e:
+            _logger.debug(
+                f"failed to acquire event table endpoint with:{str(e)}, no external telemetry will be collected"
+            )
+            return
+
+        resource = opentelemetry.sdk.resources.Resource.create(
+            {"service.name": SERVICE_NAME}
+        )
+
+        if enable_trace_level:
+            if self._proxy_tracer_provider is None:
+                self._init_trace_level(endpoint, header, resource)
+            else:
+                if self._trace_adapter is not None:
+                    # Later exports must carry the newly requested event table.
+                    self._trace_adapter.header = header
+                if not self._tracer_provider_enabled:
+                    self._enable_tracer_provider()
+
+        if log_level is not None:
+            if self._log_handler is None:
+                self._init_log_level(endpoint, header, resource, log_level)
+            else:
+                if self._log_adapter is not None:
+                    self._log_adapter.header = header
+                # Honour the level requested by this call, not the first one.
+                self._log_handler.setLevel(log_level)
+                if self._logger_provider and not self._logger_provider_enabled:
+                    self._enable_logger_provider()
+
+    def disable_event_table_telemetry_collection(self) -> None:
+        """Stop exporting traces and logs until collection is enabled again."""
+        if self._tracer_provider:
+            self._disable_tracer_provider()
+
+        if self._logger_provider:
+            self._disable_logger_provider()
+
+    def _get_external_telemetry_auth_token(self) -> Dict:
+        """Create a new attestation and return the request headers built from it."""
+        self._attestation = create_attestation(
+            self.session.connection.auth_class.provider,
+            self.session.connection.auth_class.entra_resource,
+            self.session.connection.auth_class.token,
+            session_manager=(
+                self.session.connection._session_manager.clone(max_retries=0)
+                if self.session.connection
+                else None
+            ),
+        )
+        headers = {
+            "Authorization": f"Bearer WIF.AWS.{self._attestation.credential}",
+        }
+        if self._event_table is not None:
+            headers["event-table"] = self._event_table
+
+        return headers
+
+    @staticmethod
+    def _clear_pending_records(processor) -> None:
+        """Empty the batch queue of ``processor`` when it exposes one.
+
+        The queue is a private detail of the SDK batch processors, so every
+        attribute is looked up defensively instead of being assumed to exist.
+        """
+        batch_processor = getattr(processor, "_batch_processor", None)
+        pending = getattr(batch_processor, "_queue", None)
+        if pending is not None:
+            pending.clear()
+
+    def _disable_tracer_provider(self) -> None:
+        if self._proxy_tracer_provider and self._tracer_provider_enabled:
+            self._proxy_tracer_provider.disable()
+            self._tracer_provider_enabled = False
+
+    def _enable_tracer_provider(self) -> None:
+        # Clear the batch processor's internal queue so that span collected during disable is not exported
+        self._clear_pending_records(self._span_processor)
+        if self._proxy_tracer_provider and not self._tracer_provider_enabled:
+            self._proxy_tracer_provider.enable()
+            self._tracer_provider_enabled = True
+
+    def _disable_logger_provider(self) -> None:
+        if self._proxy_log_provider and self._logger_provider_enabled:
+            self._proxy_log_provider.disable()
+            self._logger_provider_enabled = False
+
+    def _enable_logger_provider(self) -> None:
+        # Same as for spans: drop records collected while collection was disabled.
+        self._clear_pending_records(self._log_processor)
+        if self._proxy_log_provider and not self._logger_provider_enabled:
+            self._proxy_log_provider.enable()
+            self._logger_provider_enabled = True
+
+    def _opentelemetry_shutdown(self) -> None:
+        """Shut down processors and providers without ever raising.
+
+        Called while the session is closing; a failure here must not prevent
+        the remaining components, or the session itself, from shutting down.
+        """
+        shutdown_targets = (
+            self._span_processor,
+            self._log_processor,
+            self._proxy_tracer_provider if self._tracer_provider is not None else None,
+            self._proxy_log_provider if self._logger_provider is not None else None,
+        )
+        for target in shutdown_targets:
+            if target is None:
+                continue
+            try:
+                target.shutdown()
+            except Exception as e:
+                _logger.debug(
+                    f"failed to shut down event table telemetry component: {str(e)}"
+                )
+
+    def _init_trace_level(
+        self,
+        endpoint: str,
+        header: dict,
+        resource: "opentelemetry.sdk.resources.Resource",
+    ) -> None:
+        """Create the trace export pipeline and install the proxy tracer provider."""
+        url = f"https://{endpoint}/v1/traces"
+
+        self._proxy_tracer_provider = ProxyTracerProvider()
+        opentelemetry.trace.set_tracer_provider(self._proxy_tracer_provider)
+
+        self._tracer_provider = opentelemetry.sdk.trace.TracerProvider(
+            resource=resource
+        )
+
+        # ``self`` is passed because the token helper is defined on this class.
+        self._trace_adapter = RetryWithTokenRefreshAdapter(self, header)
+        trace_session = requests.Session()
+        trace_session.headers.update(header)
+        trace_session.mount("https://", self._trace_adapter)
+        trace_session.mount("http://", self._trace_adapter)
+
+        exporter = (
+            opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter(
+                endpoint=url, session=trace_session
+            )
+        )
+        self._span_processor = opentelemetry.sdk.trace.export.BatchSpanProcessor(
+            exporter
+        )
+        self._tracer_provider.add_span_processor(self._span_processor)
+        self._proxy_tracer_provider.set_real_provider(self._tracer_provider)
+        self._proxy_tracer_provider.enable()
+
+        self._tracer_provider_enabled = True
+
+    def _init_log_level(
+        self,
+        endpoint: str,
+        header: dict,
+        resource: "opentelemetry.sdk.resources.Resource",
+        log_level: int,
+    ) -> None:
+        """Create the log export pipeline and attach its handler to the root logger."""
+        url = f"https://{endpoint}/v1/logs"
+        self._proxy_log_provider = ProxyLogProvider()
+        opentelemetry._logs.set_logger_provider(self._proxy_log_provider)
+
+        self._logger_provider = opentelemetry.sdk._logs.LoggerProvider(
+            resource=resource
+        )
+
+        # ``self`` is passed because the token helper is defined on this class.
+        self._log_adapter = RetryWithTokenRefreshAdapter(self, header)
+        log_session = requests.Session()
+        log_session.headers.update(header)
+        log_session.mount("https://", self._log_adapter)
+        log_session.mount("http://", self._log_adapter)
+
+        exporter = opentelemetry.exporter.otlp.proto.http._log_exporter.OTLPLogExporter(
+            endpoint=url, session=log_session
+        )
+        self._log_processor = opentelemetry.sdk._logs.export.BatchLogRecordProcessor(
+            exporter
+        )
+        self._logger_provider.add_log_record_processor(self._log_processor)
+
+        self._proxy_log_provider.set_real_provider(self._logger_provider)
+        self._proxy_log_provider.enable()
+
+        self._log_handler = opentelemetry.sdk._logs.LoggingHandler(
+            logger_provider=self._proxy_log_provider, level=log_level
+        )
+        logging.getLogger().addHandler(self._log_handler)
+
+        self._logger_provider_enabled = True
diff --git a/src/snowflake/snowpark/session.py b/src/snowflake/snowpark/session.py
index 17648c5f05..ac6251e784 100644
--- a/src/snowflake/snowpark/session.py
+++ b/src/snowflake/snowpark/session.py
@@ -80,6 +80,7 @@
     with_src_position,
 )
 from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages
+from snowflake.snowpark._internal.event_table_telemetry import EventTableTelemetry
 from snowflake.snowpark._internal.packaging_utils import (
     DEFAULT_PACKAGES,
     ENVIRONMENT_METADATA_FILE_NAME,
@@ -812,6 +813,7 @@ def __init__(
         self._udf_profiler = UDFProfiler(session=self)
         self._dataframe_profiler = DataframeProfiler(session=self)
         self._catalog = None
+        self._client_telemetry = EventTableTelemetry(session=self)
 
         self._ast_batch = AstBatch(self)
 
@@ -828,6 +830,7 @@ def _close_at_exit(self) -> None:
         """
         with _session_management_lock:
             try:
+                self._client_telemetry._opentelemetry_shutdown()
                 self.close()
             except Exception:
                 pass
@@ -4317,6 +4320,57 @@ def stored_procedure_profiler(self) -> StoredProcedureProfiler:
         """
         return self._sp_profiler
 
+    @property
+    def client_telemetry(self) -> EventTableTelemetry:
+        """
+        Returns a :class:`event_table_telemetry.EventTableTelemetry` object that you can use to send telemetry to snowflake event table.
+        See details of how to use this object in :class:`event_table_telemetry.EventTableTelemetry`.
+
+        `Session.client_telemetry` object enable user to send telemetry to designated event table
+        when necessary dependencies are installed. Only traces and logs between
+        `client_telemetry.enable_event_table_telemetry_collection` and `client_telemetry.disable_event_table_telemetry_collection`
+        will be sent to event table. You can call `client_telemetry.enable_event_table_telemetry_collection` again to re-enable external
+        telemetry after it is turned off.
+
+        Note:
+            This function requires the `opentelemetry` extra from Snowpark.
+            Install it via pip:
+            .. code-block:: bash
+
+                pip install "snowflake-snowpark-python[opentelemetry]"
+
+        Examples 1
+            .. code-block:: python
+
+            ext = session.client_telemetry
+            ext.enable_event_table_telemetry_collection("snowflake.telemetry.events", logging.INFO, True)
+            tracer = trace.get_tracer("my_tracer")
+            with tracer.start_as_current_span("code_store") as span:
+                span.set_attribute("code.lineno", "21")
+                span.set_attribute("code.content", "session.sql(...)")
+                logging.info("Trace being sent to event table")
+            ext.disable_event_table_telemetry_collection()
+
+        Examples 2
+            .. code-block:: python
+
+            ext = session.client_telemetry
+            logging.info("log before enable external telemetry")  # this log is not sent to event table
+            ext.enable_event_table_telemetry_collection("snowflake.telemetry.events", logging.INFO, True)
+            tracer = trace.get_tracer("my_tracer")
+            with tracer.start_as_current_span("code_store") as span:
+                span.set_attribute("code.lineno", "21")
+                span.set_attribute("code.content", "session.sql(...)")
+                logging.info("Trace being sent to event table")
+            ext.disable_event_table_telemetry_collection()
+            logging.info("out of scope log")  # this log is not sent to event table
+            ext.enable_event_table_telemetry_collection("db.sc.external_et", logging.DEBUG, True)
+            logging.debug("debug log")  # this log is sent to event table because external telemetry is re-enabled
+            ext.disable_event_table_telemetry_collection()
+
+        """
+        return self._client_telemetry
+
     @property
     def udf_profiler(self) -> UDFProfiler:
         """
diff --git a/tests/integ/test_external_telemetry.py b/tests/integ/test_external_telemetry.py
new file mode 100644
index 0000000000..0f3b73bd05
--- /dev/null
+++ b/tests/integ/test_external_telemetry.py
@@ -0,0 +1,214 @@
+#
+# Copyright (c) 2012-2025 Snowflake Computing Inc. All rights reserved.
+#
+
+import logging
+import threading
+from unittest.mock import patch, MagicMock
+import pytest
+
+from snowflake.snowpark._internal.event_table_telemetry import EventTableTelemetry
+
+try:
+    from opentelemetry import trace
+    from opentelemetry.sdk.resources import Resource  # noqa: F401
+    from opentelemetry.sdk.trace import TracerProvider  # noqa: F401
+    from opentelemetry.sdk.trace.export import (  # noqa: F401
+        BatchSpanProcessor,
+        SpanExportResult,
+    )  # noqa: F401
+    from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
+        OTLPSpanExporter,
+    )
+    from opentelemetry._logs import set_logger_provider  # noqa: F401
+    from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler  # noqa: F401
+    from opentelemetry.sdk._logs.export import BatchLogRecordProcessor  # noqa: F401
+    from opentelemetry.exporter.otlp.proto.http._log_exporter import (
+        OTLPLogExporter,
+    )
+    from opentelemetry.exporter.otlp.proto.http import Compression  # noqa: F401
+    from opentelemetry.sdk._logs._internal.export import LogExportResult
+
+    dependencies_missing = False
+except Exception:
+    dependencies_missing = True
+
+pytestmark = [
+    pytest.mark.skipif(
+        "config.getoption('local_testing_mode', default=False)",
+        reason="feature not available in local testing",
+    ),
+    pytest.mark.skipif(
+        dependencies_missing,
+        reason="opentelemetry is not installed",
+    ),
+]
+
+mock_tracer_results = []
+mock_log_results = []
+lock = threading.RLock()
+
+
+class MockOTLPLogExporter(OTLPLogExporter):
+    def export(self, batch):
+        if self._shutdown:
+            return LogExportResult.FAILURE
+
+        with lock:
+            mock_log_results.extend(batch)
+        return LogExportResult.SUCCESS
+
+
+class MockOTLPSpanExporter(OTLPSpanExporter):
+    def export(self, batch):
+        if self._shutdown:
+            return SpanExportResult.FAILURE
+        with lock:
+            mock_tracer_results.extend(batch)
+        return SpanExportResult.SUCCESS
+
+
+class FakeAttestation:
+    def __init__(self) -> None:
+        self.credential = "mock_cred"
+
+
+# mock exporter
+def make_mock_trace_exporter(*args, **kwargs):
+    exporter = MockOTLPSpanExporter(*args, **kwargs)
+    return exporter
+
+
+def make_mock_log_exporter(*args, **kwargs):
+    exporter = MockOTLPLogExporter(*args, **kwargs)
+    return exporter
+
+
+def create_mock_response(current_endpoint):
+    # mock authentication
+    fake_response = MagicMock()
+    fake_response.status_code = 200
+    fake_response.text = current_endpoint
+    return fake_response
+
+
+@pytest.fixture(scope="module", autouse=True)
+def mock_session(session):
+    session.connection.auth_class.provider = "mock_provider"
+    session.connection.auth_class.entra_resource = "mock_resource"
+    session.connection.auth_class.token = "mock_token"
+    return session
+
+
+def test_end_to_end(session):
+    external_telemetry = session.client_telemetry
+
+    mock_response = create_mock_response("https://fake_endpoint")
+
+    # test with mock exporter and authentication
+    with (
+        patch(
+            "snowflake.snowpark._internal.event_table_telemetry.create_attestation",
+            return_value=FakeAttestation(),
+        ),
+        patch("requests.get", return_value=mock_response),
+        patch(
+            "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
+            side_effect=make_mock_trace_exporter,
+        ),
+        patch(
+            "opentelemetry.exporter.otlp.proto.http._log_exporter.OTLPLogExporter",
+            side_effect=make_mock_log_exporter,
+        ),
+    ):
+        # out of scope trace and log
+        tracer = trace.get_tracer("external_telemetry")
+        with tracer.start_as_current_span("code_store") as span:
+            span.set_attribute("code.pos", "before_enable")
+            logging.info("log before enable")
+
+        assert len(mock_tracer_results) == 0
+        assert len(mock_log_results) == 0
+
+        external_telemetry.enable_event_table_telemetry_collection(
+            "db.sc.tb", logging.INFO, True
+        )
+
+        # in scope trace and log
+        tracer = trace.get_tracer("external_telemetry")
+        with tracer.start_as_current_span("code_store") as span:
+            span.set_attribute("code.lineno", "21")
+            span.set_attribute("code.content", "session.sql(...)")
+            logging.info("Trace being sent to event table")
+            logging.info("second log recorded")
+
+        external_telemetry.disable_event_table_telemetry_collection()
+        # force batch processor to send telemetry
+        external_telemetry._proxy_tracer_provider.force_flush(1000)
+        external_telemetry._proxy_log_provider.force_flush(1000)
+        assert mock_tracer_results[0].attributes == {
+            "code.lineno": "21",
+            "code.content": "session.sql(...)",
+        }
+        assert mock_log_results[0].log_record.body == "Trace being sent to event table"
+        assert mock_log_results[1].log_record.body == "second log recorded"
+
+        # clean up after disable
+        mock_log_results.clear()
+        mock_tracer_results.clear()
+
+        # out of scope trace and log
+        with tracer.start_as_current_span("code_store") as span:
+            span.set_attribute("code.pos", "after_enable")
+            logging.info("log after enable")
+
+        assert len(mock_tracer_results) == 0
+        assert len(mock_log_results) == 0
+
+        # re-enable external telemetry
+        external_telemetry.enable_event_table_telemetry_collection(
+            "db.sc.tb", logging.INFO, True
+        )
+
+        tracer = trace.get_tracer("external_telemetry")
+        with tracer.start_as_current_span("code_store") as span:
+            span.set_attribute("code.lineno", "21")
+            span.set_attribute("code.content", "session.sql(...)")
+            logging.info("Trace being sent to event table")
+            logging.info("second log recorded")
+
+        # force batch processor to send telemetry
+        external_telemetry._proxy_tracer_provider.force_flush(1000)
+        external_telemetry._proxy_log_provider.force_flush(1000)
+        assert mock_tracer_results[0].attributes == {
+            "code.lineno": "21",
+            "code.content": "session.sql(...)",
+        }
+        assert mock_log_results[0].log_record.body == "Trace being sent to event table"
+        assert mock_log_results[1].log_record.body == "second log recorded"
+
+
+def test_negative_case(session, caplog):
+    external_telemetry = EventTableTelemetry(session)
+    external_telemetry.enable_event_table_telemetry_collection("db.sc.tb", None, False)
+    assert (
+        "Snowpark python log_level and trace_level are not enabled to collect telemetry into event table:"
+        in caplog.text
+    )
+
+    external_telemetry.enable_event_table_telemetry_collection(
+        "no_fully_qualified", logging.INFO, True
+    )
+    assert "Input event table is converted to fully qualified name:" in caplog.text
+
+    with patch(
+        "snowflake.snowpark._internal.event_table_telemetry.installed_opentelemetry",
+        False,
+    ):
+        external_telemetry.enable_event_table_telemetry_collection(
+            "db.sc.tb", logging.INFO, True
+        )
+    assert (
+        "Opentelemetry dependencies are missing, no telemetry export into event table:"
+        in caplog.text
+    )