diff --git a/packages/opentelemetry-instrumentation-azure-search/README.md b/packages/opentelemetry-instrumentation-azure-search/README.md new file mode 100644 index 0000000000..b2964d3bde --- /dev/null +++ b/packages/opentelemetry-instrumentation-azure-search/README.md @@ -0,0 +1,57 @@ +# OpenTelemetry Azure AI Search Instrumentation + +This package provides OpenTelemetry instrumentation for [Azure AI Search](https://learn.microsoft.com/en-us/azure/search/search-what-is-azure-search) (formerly Azure Cognitive Search). + +## Installation + +```bash +pip install opentelemetry-instrumentation-azure-search +``` + +Or with the Azure Search SDK included: + +```bash +pip install 'opentelemetry-instrumentation-azure-search[instruments]' +``` + +## Usage + +### Auto-instrumentation via Traceloop SDK + +```python +from traceloop.sdk import Traceloop + +Traceloop.init(app_name="my_app") +``` + +### Manual instrumentation + +```python +from opentelemetry.instrumentation.azure_search import AzureSearchInstrumentor + +AzureSearchInstrumentor().instrument() +``` + +## Instrumented methods + +| Method | Span name | +|---|---| +| `SearchClient.search()` | `azure_search.search` | +| `SearchClient.upload_documents()` | `azure_search.upload_documents` | +| `SearchClient.merge_documents()` | `azure_search.merge_documents` | +| `SearchClient.merge_or_upload_documents()` | `azure_search.merge_or_upload_documents` | +| `SearchClient.delete_documents()` | `azure_search.delete_documents` | + +## Span attributes + +| Attribute | Description | +|---|---| +| `db.system` | Always `azure_search` | +| `server.address` | Azure Search endpoint URL | +| `azure_search.index_name` | Name of the index being queried | +| `azure_search.search_text` | The search query string | +| `azure_search.top` | Max results requested | +| `azure_search.filter` | OData filter expression | +| `azure_search.duration` | Operation duration in seconds | +| `azure_search.affected_documents` | Number of documents in write 
"""OpenTelemetry Azure AI Search instrumentation"""

import logging
import time
from typing import Collection

from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    unwrap,
)
from opentelemetry.instrumentation.azure_search.version import __version__

logger = logging.getLogger(__name__)

_instruments = ("azure-search-documents >= 11.0.0",)

# One entry per patched method: the class that owns it, the method name, and
# the name given to the span created around each call.
WRAPPED_METHODS = [
    {
        "object": "SearchClient",
        "method": "search",
        "span_name": "azure_search.search",
    },
    {
        "object": "SearchClient",
        "method": "upload_documents",
        "span_name": "azure_search.upload_documents",
    },
    {
        "object": "SearchClient",
        "method": "merge_documents",
        "span_name": "azure_search.merge_documents",
    },
    {
        "object": "SearchClient",
        "method": "merge_or_upload_documents",
        "span_name": "azure_search.merge_or_upload_documents",
    },
    {
        "object": "SearchClient",
        "method": "delete_documents",
        "span_name": "azure_search.delete_documents",
    },
]


def _set_input_attributes(span, instance, method, args, kwargs):
    """Record request-side attributes on ``span``.

    The index name and endpoint are read from ``instance`` when it has the
    ``_index_name`` / ``_endpoint`` attributes. For ``method == "search"`` the
    search text (keyword ``search_text`` or first positional argument), ``top``
    and ``filter`` are recorded as well.

    Any exception is logged at debug level and swallowed so that attribute
    collection can never break the wrapped call.
    """
    try:
        if hasattr(instance, "_index_name"):
            span.set_attribute("azure_search.index_name", instance._index_name)
        if hasattr(instance, "_endpoint"):
            span.set_attribute("server.address", str(instance._endpoint))
        if method != "search":
            return

        search_text = kwargs.get("search_text") or (args[0] if args else None)
        if search_text:
            span.set_attribute("azure_search.search_text", str(search_text))

        top = kwargs.get("top")
        # Compare against None rather than truthiness so that an explicit
        # ``top=0`` is still recorded.
        if top is not None:
            span.set_attribute("azure_search.top", int(top))

        filter_expr = kwargs.get("filter")
        if filter_expr:
            span.set_attribute("azure_search.filter", str(filter_expr))
    except Exception as e:
        logger.debug("Failed to set input attributes: %s", e)


def _set_response_attributes(span, method, response):
    """Record result counts for document write operations on ``span``.

    Returns:
        ``(affected, succeeded)`` when ``method`` is one of the write methods
        and ``response`` is a list or tuple; ``(None, None)`` otherwise or
        when counting fails. ``succeeded`` counts the items whose
        ``succeeded`` attribute is truthy.
    """
    try:
        is_write = method in (
            "upload_documents",
            "merge_documents",
            "merge_or_upload_documents",
            "delete_documents",
        )
        # Only count materialised sequences: iterating an arbitrary iterable
        # here could exhaust a one-shot iterator before the caller sees it.
        if is_write and isinstance(response, (list, tuple)):
            affected = len(response)
            succeeded = sum(1 for r in response if getattr(r, "succeeded", False))
            span.set_attribute("azure_search.affected_documents", affected)
            span.set_attribute("azure_search.succeeded_documents", succeeded)
            return affected, succeeded
    except Exception as e:
        logger.debug("Failed to set response attributes: %s", e)
    return None, None


def _wrap(tracer, to_wrap):
    """Build the ``wrapt`` wrapper for one ``WRAPPED_METHODS`` entry.

    The returned wrapper opens a CLIENT span named ``to_wrap["span_name"]``
    around the wrapped call, records input/response attributes and the call
    duration, and re-raises any exception after marking the span as failed.
    """
    # These never change between calls, so look them up once.
    method = to_wrap.get("method")
    span_name = to_wrap.get("span_name")

    def wrapper(wrapped, instance, args, kwargs):
        if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
            return wrapped(*args, **kwargs)

        # The except branch below records the exception and the error status
        # itself, so the context manager is told not to do it a second time.
        with tracer.start_as_current_span(
            span_name,
            kind=SpanKind.CLIENT,
            attributes={"db.system": "azure_search"},
            record_exception=False,
            set_status_on_exception=False,
        ) as span:
            if span.is_recording():
                _set_input_attributes(span, instance, method, args, kwargs)

            # perf_counter is monotonic, so the measured duration cannot be
            # skewed (or go negative) when the system clock is adjusted.
            started = time.perf_counter()
            try:
                # NOTE(review): SearchClient.search appears to return a lazily
                # evaluated pager, so the request may only be sent after this
                # span has ended - confirm against the SDK.
                response = wrapped(*args, **kwargs)
            except Exception as e:
                span.set_attribute(ERROR_TYPE, e.__class__.__name__)
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise
            finally:
                duration = time.perf_counter() - started
                span.set_attribute("azure_search.duration", round(duration, 4))

            if span.is_recording():
                affected, succeeded = _set_response_attributes(span, method, response)
                if affected is not None and succeeded is not None and succeeded < affected:
                    span.set_status(Status(StatusCode.ERROR, f"{affected - succeeded} document(s) failed to index"))
                else:
                    span.set_status(Status(StatusCode.OK))

            return response

    return wrapper


class AzureSearchInstrumentor(BaseInstrumentor):
    """An instrumentor for Azure AI Search client library."""

    def instrumentation_dependencies(self) -> Collection[str]:
        """Return the package requirement this instrumentation targets."""
        return _instruments

    def _instrument(self, **kwargs):
        """Patch every method listed in ``WRAPPED_METHODS``.

        An optional ``tracer_provider`` keyword is passed to ``get_tracer``.
        """
        tracer_provider = kwargs.get("tracer_provider")
        tracer = get_tracer(__name__, __version__, tracer_provider)

        for wrapped_method in WRAPPED_METHODS:
            wrap_function_wrapper(
                "azure.search.documents",
                f"{wrapped_method['object']}.{wrapped_method['method']}",
                _wrap(tracer, wrapped_method),
            )

    def _uninstrument(self, **kwargs):
        """Remove the patches; a failure to unwrap one method is only logged."""
        for wrapped_method in WRAPPED_METHODS:
            try:
                unwrap(
                    f"azure.search.documents.{wrapped_method['object']}",
                    wrapped_method["method"],
                )
            except Exception as e:
                logger.debug(
                    "Failed to unwrap %s.%s: %s",
                    wrapped_method["object"],
                    wrapped_method["method"],
                    e,
                )
b/packages/opentelemetry-instrumentation-azure-search/poetry.toml new file mode 100644 index 0000000000..ab1033bd37 --- /dev/null +++ b/packages/opentelemetry-instrumentation-azure-search/poetry.toml @@ -0,0 +1,2 @@ +[virtualenvs] +in-project = true diff --git a/packages/opentelemetry-instrumentation-azure-search/pyproject.toml b/packages/opentelemetry-instrumentation-azure-search/pyproject.toml new file mode 100644 index 0000000000..54ec45e652 --- /dev/null +++ b/packages/opentelemetry-instrumentation-azure-search/pyproject.toml @@ -0,0 +1,69 @@ +[project] +name = "opentelemetry-instrumentation-azure-search" +version = "0.1.0" +description = "OpenTelemetry Azure AI Search instrumentation" +authors = [ + { name = "Traceloop", email = "dev@traceloop.com" }, +] +license = "Apache-2.0" +readme = "README.md" +requires-python = ">=3.10,<4" +dependencies = [ + "opentelemetry-api>=1.38.0,<2", + "opentelemetry-instrumentation>=0.59b0", + "opentelemetry-semantic-conventions>=0.63b1", +] + +[project.urls] +Repository = "https://github.com/traceloop/openllmetry/tree/main/packages/opentelemetry-instrumentation-azure-search" + +[project.optional-dependencies] +instruments = ["azure-search-documents>=11.0.0"] + +[project.entry-points."opentelemetry_instrumentor"] +azure_search = "opentelemetry.instrumentation.azure_search:AzureSearchInstrumentor" + +[dependency-groups] +dev = [ + "autopep8>=2.2.0,<3", + "pytest-sugar==1.0.0", + "pytest>=8.2.2,<9", + "ruff>=0.4.0", +] +test = [ + "opentelemetry-sdk>=1.38.0,<2", + "azure-search-documents>=11.0.0", + "pytest>=8.2.2,<9", + "pytest-sugar==1.0.0", + "pytest-recording>=0.13.1,<0.14.0", + "vcrpy>=8.0.0,<9", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["opentelemetry"] + +[tool.coverage.run] +branch = true +source = ["opentelemetry/instrumentation/azure_search"] + +[tool.coverage.report] +exclude_lines = ["if TYPE_CHECKING:"] +show_missing = true + 
"""Manual smoke check: run one instrumented ``search`` call and print its span.

Run it directly (``python test_trace.py``). All work lives in ``main()`` behind
a ``__main__`` guard because the file name matches pytest's ``test_*.py``
pattern: importing it must not instrument the SDK or replace the global
tracer provider as a side effect.
"""
from unittest.mock import MagicMock, patch

from azure.search.documents import SearchClient
from opentelemetry import trace
from opentelemetry.instrumentation.azure_search import AzureSearchInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor


def main():
    """Instrument ``SearchClient``, issue one mocked search, print the span."""
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)

    AzureSearchInstrumentor().instrument()

    # Bypass __init__ so no credentials are needed; only the two attributes
    # read by the instrumentation are populated.
    client = SearchClient.__new__(SearchClient)
    client._index_name = "demo-index"
    client._endpoint = "https://demo.search.windows.net"

    canned_response = MagicMock(
        http_response=MagicMock(status_code=200, text=lambda encoding=None: '{"value": []}', headers={})
    )
    with patch("azure.core.pipeline.Pipeline.run", return_value=canned_response):
        try:
            client.search("azure openai", top=5)
        except Exception as exc:
            # Best effort: the span is what matters here, but report the
            # failure instead of hiding it.
            print(f"search raised {exc!r}")

    print("Done")


if __name__ == "__main__":
    main()
"""Span tests for the document write methods of the instrumented ``SearchClient``."""
import pytest
from unittest.mock import patch, MagicMock
from azure.search.documents import SearchClient
from opentelemetry.trace import StatusCode


def _make_client():
    """Return a ``SearchClient`` created without ``__init__``, with index name and endpoint set."""
    bare_client = SearchClient.__new__(SearchClient)
    bare_client._index_name = "test-index"
    bare_client._endpoint = "https://test.search.windows.net"
    return bare_client


def _mock_pipeline_response():
    """Return a mock pipeline response whose HTTP response is a 200 with an empty ``value`` list."""
    http_response = MagicMock(
        status_code=200,
        text=lambda encoding=None: '{"value": []}',
        headers={},
    )
    return MagicMock(http_response=http_response)


def _only_span(exporter):
    """Assert that exactly one span was exported and return it."""
    finished = exporter.get_finished_spans()
    assert len(finished) == 1
    return finished[0]


def test_upload_documents_creates_span(exporter):
    """``upload_documents`` produces one ``azure_search.upload_documents`` span."""
    search_client = _make_client()
    with patch("azure.core.pipeline.Pipeline.run", return_value=_mock_pipeline_response()):
        # Any exception from the call is deliberately ignored; only the span is checked.
        try:
            search_client.upload_documents(documents=[{"id": "1"}, {"id": "2"}])
        except Exception:
            pass

    assert _only_span(exporter).name == "azure_search.upload_documents"


def test_merge_documents_creates_span(exporter):
    """``merge_documents`` produces one ``azure_search.merge_documents`` span."""
    search_client = _make_client()
    with patch("azure.core.pipeline.Pipeline.run", return_value=_mock_pipeline_response()):
        # Any exception from the call is deliberately ignored; only the span is checked.
        try:
            search_client.merge_documents(documents=[{"id": "1", "title": "updated"}])
        except Exception:
            pass

    assert _only_span(exporter).name == "azure_search.merge_documents"


def test_delete_documents_error_captured(exporter):
    """A ``RuntimeError`` raised under ``delete_documents`` propagates and marks the span as ERROR."""
    search_client = _make_client()
    failing_index = patch(
        "azure.search.documents._patch.SearchClient.index_documents",
        side_effect=RuntimeError("Auth failed"),
    )
    with failing_index:
        with pytest.raises(RuntimeError):
            search_client.delete_documents(documents=[{"id": "1"}])

    assert _only_span(exporter).status.status_code == StatusCode.ERROR
def test_search_error_captured(exporter):
    """A ``search`` whose pipeline raises still yields exactly one ``azure_search.search`` span.

    Only the span's existence and name are asserted; any exception raised by
    the call or by consuming its results is ignored.
    """
    search_client = _make_client()
    failing_pipeline = patch(
        "azure.core.pipeline.Pipeline.run",
        side_effect=Exception("Service unavailable"),
    )
    with failing_pipeline:
        try:
            # Consume the results so a deferred request, if any, is attempted.
            list(search_client.search("query"))
        except Exception:
            pass

    finished = exporter.get_finished_spans()
    assert len(finished) == 1
    assert finished[0].name == "azure_search.search"