Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .github/workflows/core_contrib_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,36 @@ jobs:
- name: Run tests
run: tox -e py310-test-instrumentation-anthropic-latest -- -ra

  # Run the Anthropic instrumentation tests against pydantic v1 on
  # Python 3.10, with the core repo checked out beside the contrib repo
  # so tox can install it from the local path.
  py310-test-instrumentation-anthropic-pydantic1:
    name: instrumentation-anthropic-pydantic1
    runs-on: ubuntu-latest
    timeout-minutes: 30
    steps:
      - name: Checkout contrib repo @ SHA - ${{ env.CONTRIB_REPO_SHA }}
        uses: actions/checkout@v4
        with:
          repository: open-telemetry/opentelemetry-python-contrib
          ref: ${{ env.CONTRIB_REPO_SHA }}

      - name: Checkout core repo @ SHA - ${{ env.CORE_REPO_SHA }}
        uses: actions/checkout@v4
        with:
          repository: open-telemetry/opentelemetry-python
          ref: ${{ env.CORE_REPO_SHA }}
          path: opentelemetry-python

      - name: Set up Python 3.10
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"
          architecture: "x64"

      - name: Install tox
        run: pip install tox-uv

      - name: Run tests
        run: tox -e py310-test-instrumentation-anthropic-pydantic1 -- -ra

py310-test-instrumentation-claude-agent-sdk-oldest:
name: instrumentation-claude-agent-sdk-oldest
runs-on: ubuntu-latest
Expand Down
19 changes: 19 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,25 @@ jobs:
- name: Run tests
run: tox -e py314-test-instrumentation-anthropic-latest -- -ra

  # Same pydantic v1 test environment as the contrib workflow, but run
  # against this repository's own checkout (github.sha) on Ubuntu.
  py310-test-instrumentation-anthropic-pydantic1_ubuntu-latest:
    name: instrumentation-anthropic-pydantic1 3.10 Ubuntu
    runs-on: ubuntu-latest
    timeout-minutes: 30
    steps:
      - name: Checkout repo @ SHA - ${{ github.sha }}
        uses: actions/checkout@v4

      - name: Set up Python 3.10
        uses: actions/setup-python@v5
        with:
          python-version: "3.10"

      - name: Install tox
        run: pip install tox-uv

      - name: Run tests
        run: tox -e py310-test-instrumentation-anthropic-pydantic1 -- -ra

py310-test-instrumentation-claude-agent-sdk-oldest_ubuntu-latest:
name: instrumentation-claude-agent-sdk-oldest 3.10 Ubuntu
runs-on: ubuntu-latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add Pydantic validation for Anthropic message extractor inputs and responses
([#4475](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4475))
- Add async Anthropic message stream wrappers and manager wrappers, with wrapper
tests ([#4346](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4346))
- `AsyncMessagesStreamWrapper` for async message stream telemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Sequence
import logging
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, List, Optional, TypeVar, Union

from anthropic.types import MessageDeltaUsage
from pydantic import (
BaseModel,
Field,
StrictBool,
StrictFloat,
StrictInt,
StrictStr,
ValidationError,
)

from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
Expand All @@ -40,8 +50,15 @@
normalize_finish_reason,
)

_PYDANTIC_V2 = hasattr(BaseModel, "model_validate")

if _PYDANTIC_V2:
from pydantic import ConfigDict
else:
ConfigDict = None

if TYPE_CHECKING:
from collections.abc import Iterable, Mapping
from collections.abc import Iterable, Sequence

import httpx
from anthropic.resources.messages import Messages
Expand All @@ -57,17 +74,36 @@
)


@dataclass
class MessageRequestParams:
model: str | None = None
max_tokens: int | None = None
temperature: float | None = None
top_k: int | None = None
top_p: float | None = None
stop_sequences: Sequence[str] | None = None
stream: bool | None = None
messages: Iterable[MessageParam] | None = None
system: str | Iterable[TextBlockParam] | None = None
_logger = logging.getLogger(__name__)
ModelT = TypeVar("ModelT", bound=BaseModel)


class _ExtractorModel(BaseModel):
    """Shared base for the extractor's validation models.

    Configured to ignore unknown fields and to accept attribute-bearing
    objects (not only mappings), on both pydantic v1 and v2.
    """

    if _PYDANTIC_V2:
        # Narrows ConfigDict for type checkers; it is only imported at
        # module level when pydantic v2 was detected.
        assert ConfigDict is not None
        model_config = ConfigDict(extra="ignore", from_attributes=True)
    else:
        # pydantic v1 spelling of the same configuration; orm_mode is
        # the v1 equivalent of from_attributes.
        class Config:
            extra = "ignore"
            orm_mode = True


class _InputMessageParamModel(_ExtractorModel):
    """Minimal shape of one request message: a role plus arbitrary content."""

    role: Optional[StrictStr] = None
    content: Any = None


class MessageRequestParams(_ExtractorModel):
    """Validated subset of the Anthropic Messages API request parameters.

    Every field is optional so that a partially-populated or unexpected
    request never fails extraction; strict types reject mistyped values
    instead of coercing them.
    """

    model: Optional[StrictStr] = None
    max_tokens: Optional[StrictInt] = None
    # Sampling parameters accept ints as well (e.g. temperature=1).
    temperature: Optional[Union[StrictFloat, StrictInt]] = None
    top_k: Optional[StrictInt] = None
    top_p: Optional[Union[StrictFloat, StrictInt]] = None
    stop_sequences: Optional[List[StrictStr]] = None
    stream: Optional[StrictBool] = None
    messages: Optional[List[_InputMessageParamModel]] = None
    system: Optional[Union[StrictStr, List[Any]]] = None


GEN_AI_USAGE_CACHE_CREATION_INPUT_TOKENS = (
Expand All @@ -76,24 +112,89 @@ class MessageRequestParams:
GEN_AI_USAGE_CACHE_READ_INPUT_TOKENS = "gen_ai.usage.cache_read.input_tokens"


@dataclass
class UsageTokens:
input_tokens: int | None = None
output_tokens: int | None = None
cache_creation_input_tokens: int | None = None
cache_read_input_tokens: int | None = None
class UsageTokens(_ExtractorModel):
    """Normalized token counts returned by extract_usage_tokens."""

    input_tokens: Optional[StrictInt] = None
    output_tokens: Optional[StrictInt] = None
    cache_creation_input_tokens: Optional[StrictInt] = None
    cache_read_input_tokens: Optional[StrictInt] = None


class _UsageModel(_ExtractorModel):
    """Validation model for raw usage payloads from the API.

    Same fields as UsageTokens but kept separate: this one validates
    incoming data, while UsageTokens is the public result type.
    """

    input_tokens: Optional[StrictInt] = None
    output_tokens: Optional[StrictInt] = None
    cache_creation_input_tokens: Optional[StrictInt] = None
    cache_read_input_tokens: Optional[StrictInt] = None


class _MessageResultModel(_ExtractorModel):
    """Minimal shape of an Anthropic Message response used for telemetry."""

    role: Optional[StrictStr] = None
    content: List[Any] = Field(default_factory=list)
    stop_reason: Optional[StrictStr] = None
    model: Optional[StrictStr] = None
    id: Optional[StrictStr] = None
    usage: Optional[_UsageModel] = None


def _rebuild_model(model_type: type[BaseModel]) -> None:
    """Resolve forward references for *model_type* on either pydantic major.

    Needed because ``from __future__ import annotations`` leaves every
    annotation as a string until explicitly rebuilt.
    """
    if _PYDANTIC_V2:
        model_type.model_rebuild(_types_namespace=globals())
    else:
        # getattr avoids a static reference to the v1-only method —
        # presumably to satisfy type checkers running against v2 stubs;
        # TODO confirm before simplifying to a direct call.
        update_forward_refs = getattr(model_type, "update_forward_refs")
        update_forward_refs(**globals())


# Rebuild every extractor model now that all of them are defined, so
# string annotations (deferred by `from __future__ import annotations`)
# resolve against this module's globals.
for _model_type in (
    _InputMessageParamModel,
    MessageRequestParams,
    UsageTokens,
    _UsageModel,
    _MessageResultModel,
):
    _rebuild_model(_model_type)


def _validate_model(
    model_type: type[ModelT], value: object, context: str
) -> ModelT | None:
    """Validate *value* against *model_type*, returning None on failure.

    Uses ``model_validate`` on pydantic v2; on v1 it picks ``parse_obj``
    for mappings and ``from_orm`` for attribute-bearing objects. A
    ValidationError is swallowed and logged at debug level so telemetry
    extraction never breaks the instrumented call.
    """
    try:
        if _PYDANTIC_V2:
            return model_type.model_validate(value)
        # pydantic v1: choose the validator by the input's shape; getattr
        # keeps static checkers with v2 stubs from flagging v1-only names.
        method = "parse_obj" if isinstance(value, Mapping) else "from_orm"
        validator = getattr(model_type, method)
        return validator(value)
    except ValidationError:
        _logger.debug(
            "Anthropic messages extractor validation failed for %s",
            context,
            exc_info=True,
        )
        return None


def _validate_usage(usage: object) -> _UsageModel | None:
    # Thin wrapper that fixes the debug-log context for usage payloads.
    return _validate_model(_UsageModel, usage, "usage payload")


def _validate_message_result(result: object) -> _MessageResultModel | None:
    # Thin wrapper that fixes the debug-log context for message payloads.
    return _validate_model(_MessageResultModel, result, "message payload")


def extract_usage_tokens(
usage: Usage | MessageDeltaUsage | None,
usage: Usage | MessageDeltaUsage | _UsageModel | None,
) -> UsageTokens:
if usage is None:
return UsageTokens()

input_tokens = usage.input_tokens
output_tokens = usage.output_tokens
cache_creation_input_tokens = usage.cache_creation_input_tokens
cache_read_input_tokens = usage.cache_read_input_tokens
validated_usage = _validate_usage(usage)
if validated_usage is None:
return UsageTokens()

input_tokens = validated_usage.input_tokens
output_tokens = validated_usage.output_tokens
cache_creation_input_tokens = validated_usage.cache_creation_input_tokens
cache_read_input_tokens = validated_usage.cache_read_input_tokens

if (
input_tokens is None
Expand All @@ -117,37 +218,54 @@ def extract_usage_tokens(


def get_input_messages(
    messages: Iterable[MessageParam] | list[_InputMessageParamModel] | None,
) -> list[InputMessage]:
    """Convert request messages into telemetry InputMessage objects.

    Entries that fail validation or have no role are skipped rather than
    raising, so malformed input never breaks the instrumented call.
    Already-validated models are passed through without re-validating.
    """
    if messages is None:
        return []
    result: list[InputMessage] = []
    for message in messages:
        validated_message = (
            message
            if isinstance(message, _InputMessageParamModel)
            else _validate_model(
                _InputMessageParamModel, message, "input message"
            )
        )
        if validated_message is None or validated_message.role is None:
            # Best-effort telemetry: drop the malformed entry and go on.
            continue
        role = validated_message.role
        parts = convert_content_to_parts(validated_message.content)
        result.append(InputMessage(role=role, parts=parts))
    return result


def get_system_instruction(
    system: str | Iterable[TextBlockParam] | list[Any] | None,
) -> list[MessagePart]:
    """Convert the request ``system`` prompt into telemetry message parts.

    Returns an empty list when no system prompt was supplied; otherwise
    delegates to convert_content_to_parts, which handles both the plain
    string and the block-list forms.
    """
    if system is None:
        return []
    return convert_content_to_parts(system)


def get_output_messages_from_message(
message: Message | None,
message: Message | _MessageResultModel | None,
) -> list[OutputMessage]:
if message is None:
return []

parts = convert_content_to_parts(message.content)
finish_reason = normalize_finish_reason(message.stop_reason)
validated_message = (
message
if isinstance(message, _MessageResultModel)
else _validate_message_result(message)
)
if validated_message is None:
return []

parts = convert_content_to_parts(validated_message.content)
finish_reason = normalize_finish_reason(validated_message.stop_reason)
return [
OutputMessage(
role=message.role,
role=validated_message.role or "assistant",
parts=parts,
finish_reason=finish_reason or "",
)
Expand All @@ -162,14 +280,18 @@ def set_invocation_response_attributes(
if message is None:
return

invocation.response_model_name = message.model
invocation.response_id = message.id
validated_message = _validate_message_result(message)
if validated_message is None:
return

invocation.response_model_name = validated_message.model
invocation.response_id = validated_message.id

finish_reason = normalize_finish_reason(message.stop_reason)
finish_reason = normalize_finish_reason(validated_message.stop_reason)
if finish_reason:
invocation.finish_reasons = [finish_reason]

tokens = extract_usage_tokens(message.usage)
tokens = extract_usage_tokens(validated_message.usage)
invocation.input_tokens = tokens.input_tokens
invocation.output_tokens = tokens.output_tokens
if tokens.cache_creation_input_tokens is not None:
Expand All @@ -182,7 +304,9 @@ def set_invocation_response_attributes(
)

if capture_content:
invocation.output_messages = get_output_messages_from_message(message)
invocation.output_messages = get_output_messages_from_message(
validated_message
)


def extract_params( # pylint: disable=too-many-locals
Expand All @@ -207,17 +331,22 @@ def extract_params( # pylint: disable=too-many-locals
timeout: float | httpx.Timeout | None = None,
**_kwargs: object,
) -> MessageRequestParams:
return MessageRequestParams(
model=model,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
top_k=top_k,
stop_sequences=stop_sequences,
stream=stream,
messages=messages,
system=system,
params = _validate_model(
MessageRequestParams,
{
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
"top_p": top_p,
"top_k": top_k,
"stop_sequences": stop_sequences,
"stream": stream,
"messages": messages,
"system": system,
},
"request parameters",
)
return params or MessageRequestParams()


def _set_server_address_and_port(
Expand Down
Loading
Loading