Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions packages/opentelemetry-instrumentation-azure-search/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# OpenTelemetry Azure AI Search Instrumentation

This package provides OpenTelemetry instrumentation for [Azure AI Search](https://learn.microsoft.com/en-us/azure/search/search-what-is-azure-search) (formerly Azure Cognitive Search).

## Installation

```bash
pip install opentelemetry-instrumentation-azure-search
```

Or with the Azure Search SDK included:

```bash
pip install 'opentelemetry-instrumentation-azure-search[instruments]'
```

## Usage

### Auto-instrumentation via Traceloop SDK

```python
from traceloop.sdk import Traceloop

Traceloop.init(app_name="my_app")
```

### Manual instrumentation

```python
from opentelemetry.instrumentation.azure_search import AzureSearchInstrumentor

AzureSearchInstrumentor().instrument()
```

## Instrumented methods

| Method | Span name |
|---|---|
| `SearchClient.search()` | `azure_search.search` |
| `SearchClient.upload_documents()` | `azure_search.upload_documents` |
| `SearchClient.merge_documents()` | `azure_search.merge_documents` |
| `SearchClient.merge_or_upload_documents()` | `azure_search.merge_or_upload_documents` |
| `SearchClient.delete_documents()` | `azure_search.delete_documents` |

## Span attributes

| Attribute | Description |
|---|---|
| `db.system` | Always `azure_search` |
| `server.address` | Azure Search endpoint URL |
| `azure_search.index_name` | Name of the index being queried |
| `azure_search.search_text` | The search query string |
| `azure_search.top` | Max results requested |
| `azure_search.filter` | OData filter expression |
| `azure_search.duration` | Operation duration in seconds |
| `azure_search.affected_documents` | Number of documents in write operations |
| `azure_search.succeeded_documents` | Number of successfully written documents |
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""OpenTelemetry Azure AI Search instrumentation"""

import logging
import time
from typing import Collection

from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
unwrap,
)
from opentelemetry.instrumentation.azure_search.version import __version__

logger = logging.getLogger(__name__)

_instruments = ("azure-search-documents >= 11.0.0",)

WRAPPED_METHODS = [
{
"object": "SearchClient",
"method": "search",
"span_name": "azure_search.search",
},
{
"object": "SearchClient",
"method": "upload_documents",
"span_name": "azure_search.upload_documents",
},
{
"object": "SearchClient",
"method": "merge_documents",
"span_name": "azure_search.merge_documents",
},
{
"object": "SearchClient",
"method": "merge_or_upload_documents",
"span_name": "azure_search.merge_or_upload_documents",
},
{
"object": "SearchClient",
"method": "delete_documents",
"span_name": "azure_search.delete_documents",
},
]


def _set_input_attributes(span, instance, method, args, kwargs):
try:
if hasattr(instance, "_index_name"):
span.set_attribute("azure_search.index_name", instance._index_name)
if hasattr(instance, "_endpoint"):
span.set_attribute("server.address", str(instance._endpoint))
if method == "search":
search_text = kwargs.get("search_text") or (args[0] if args else None)
if search_text:
span.set_attribute("azure_search.search_text", str(search_text))
top = kwargs.get("top")
if top:
span.set_attribute("azure_search.top", int(top))
filter_expr = kwargs.get("filter")
if filter_expr:
span.set_attribute("azure_search.filter", str(filter_expr))
except Exception as e:
logger.debug("Failed to set input attributes: %s", e)


def _set_response_attributes(span, method, response):
try:
if method in ("upload_documents", "merge_documents",
"merge_or_upload_documents", "delete_documents"):
if hasattr(response, "__iter__"):
affected = 0
succeeded = 0
for r in response:
affected += 1
if getattr(r, "succeeded", False):
succeeded += 1
span.set_attribute("azure_search.affected_documents", affected)
span.set_attribute("azure_search.succeeded_documents", succeeded)
return affected, succeeded
except Exception as e:
logger.debug("Failed to set response attributes: %s", e)
return None, None


def _wrap(tracer, to_wrap):
def wrapper(wrapped, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return wrapped(*args, **kwargs)

method = to_wrap.get("method")
span_name = to_wrap.get("span_name")

with tracer.start_as_current_span(
span_name,
kind=SpanKind.CLIENT,
attributes={"db.system": "azure_search"},
record_exception=False,
set_status_on_exception=False,
) as span:
if span.is_recording():
_set_input_attributes(span, instance, method, args, kwargs)

start_time = time.time()
try:
response = wrapped(*args, **kwargs)
except Exception as e:
span.set_attribute(ERROR_TYPE, e.__class__.__name__)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
finally:
duration = time.time() - start_time
span.set_attribute("azure_search.duration", round(duration, 4))

if span.is_recording():
affected, succeeded = _set_response_attributes(span, method, response)
if affected is not None and succeeded is not None and succeeded < affected:
span.set_status(Status(StatusCode.ERROR, f"{affected - succeeded} document(s) failed to index"))
else:
span.set_status(Status(StatusCode.OK))

return response

return wrapper


class AzureSearchInstrumentor(BaseInstrumentor):
"""An instrumentor for Azure AI Search client library."""

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)

for wrapped_method in WRAPPED_METHODS:
wrap_function_wrapper(
"azure.search.documents",
f"{wrapped_method['object']}.{wrapped_method['method']}",
_wrap(tracer, wrapped_method),
)

def _uninstrument(self, **kwargs):
for wrapped_method in WRAPPED_METHODS:
try:
unwrap(
f"azure.search.documents.{wrapped_method['object']}",
wrapped_method["method"],
)
except Exception as e:
logger.debug(
"Failed to unwrap %s.%s: %s",
wrapped_method["object"],
wrapped_method["method"],
e,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.1.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[virtualenvs]
in-project = true
69 changes: 69 additions & 0 deletions packages/opentelemetry-instrumentation-azure-search/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
[project]
name = "opentelemetry-instrumentation-azure-search"
version = "0.1.0"
description = "OpenTelemetry Azure AI Search instrumentation"
authors = [
{ name = "Traceloop", email = "dev@traceloop.com" },
]
license = "Apache-2.0"
readme = "README.md"
requires-python = ">=3.10,<4"
dependencies = [
"opentelemetry-api>=1.38.0,<2",
"opentelemetry-instrumentation>=0.59b0",
"opentelemetry-semantic-conventions>=0.63b1",
]

[project.urls]
Repository = "https://github.com/traceloop/openllmetry/tree/main/packages/opentelemetry-instrumentation-azure-search"

[project.optional-dependencies]
instruments = ["azure-search-documents>=11.0.0"]

[project.entry-points."opentelemetry_instrumentor"]
azure_search = "opentelemetry.instrumentation.azure_search:AzureSearchInstrumentor"

[dependency-groups]
dev = [
"autopep8>=2.2.0,<3",
"pytest-sugar==1.0.0",
"pytest>=8.2.2,<9",
"ruff>=0.4.0",
]
test = [
"opentelemetry-sdk>=1.38.0,<2",
"azure-search-documents>=11.0.0",
"pytest>=8.2.2,<9",
"pytest-sugar==1.0.0",
"pytest-recording>=0.13.1,<0.14.0",
"vcrpy>=8.0.0,<9",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["opentelemetry"]

[tool.coverage.run]
branch = true
source = ["opentelemetry/instrumentation/azure_search"]

[tool.coverage.report]
exclude_lines = ["if TYPE_CHECKING:"]
show_missing = true

[tool.ruff]
line-length = 120
exclude = [
".git",
"__pycache__",
"build",
"dist",
".venv",
".pytest_cache",
]

[tool.ruff.lint]
select = ["E", "F", "W"]
26 changes: 26 additions & 0 deletions packages/opentelemetry-instrumentation-azure-search/test_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter
from opentelemetry.instrumentation.azure_search import AzureSearchInstrumentor
from unittest.mock import patch, MagicMock
from azure.search.documents import SearchClient

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

AzureSearchInstrumentor().instrument()

client = SearchClient.__new__(SearchClient)
client._index_name = "demo-index"
client._endpoint = "https://demo.search.windows.net"

with patch("azure.core.pipeline.Pipeline.run", return_value=MagicMock(
http_response=MagicMock(status_code=200, text=lambda encoding=None: '{"value": []}', headers={})
)):
try:
client.search("azure openai", top=5)
except Exception:
pass

print("Done")
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import pytest
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.azure_search import AzureSearchInstrumentor


@pytest.fixture(scope="session")
def exporter():
exporter = InMemorySpanExporter()
processor = SimpleSpanProcessor(exporter)
provider = TracerProvider()
provider.add_span_processor(processor)
instrumentor = AzureSearchInstrumentor()
instrumentor.instrument(tracer_provider=provider)
yield exporter
instrumentor.uninstrument()


@pytest.fixture(autouse=True)
def clear_exporter(exporter):
exporter.clear()
Loading