Skip to content

Commit a3d38a7

Browse files
gen-ai instrumentation(feat): anthropic messages stream method instrumentation (#4499)
1 parent 5a7197e commit a3d38a7

17 files changed

Lines changed: 1597 additions & 259 deletions

AGENTS.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ uv run tox -e typecheck
6565
- The monorepo uses `uv` workspaces.
6666
- `tox.ini` defines the test matrix - check it for available test environments.
6767
- Do not add `type: ignore` comments. If a type error arises, solve it properly or write a follow-up plan to address it in another PR.
68+
- When a file uses `from __future__ import annotations`, do not quote type annotations just to
69+
avoid forward references. Keep quotes only for expressions still evaluated at runtime, such as
70+
`typing.cast(...)`, unless the referenced type is imported at runtime.
6871
- Whenever applicable, all code changes should have tests that actually validate the changes.
6972

7073
## Instrumentation rules

instrumentation-genai/AGENTS.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ except Exception as exc:
6161
raise
6262
```
6363

64+
Content capture decisions must come from the shared handler, not from instrumentation-local
65+
environment checks or duplicated helper logic. Evaluate the handler's content-capture API once
66+
when creating wrappers (for example, `capture_content = handler.should_capture_content()`) and
67+
pass that value through invocation/request helpers.
68+
6469
## 4. Semantic conventions
6570

6671
Attributes, spans, events, and metrics follow the

instrumentation-genai/opentelemetry-instrumentation-anthropic/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1414

1515
### Added
1616

17+
- Add instrumentation for Anthropic `Messages.stream()` helper method
18+
([#4499](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4499))
1719
- Add async Anthropic message stream wrappers and manager wrappers, with wrapper
1820
tests ([#4346](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4346))
1921
- `AsyncMessagesStreamWrapper` for async message stream telemetry

instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/__init__.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,16 @@
4242
wrap_function_wrapper, # pyright: ignore[reportUnknownVariableType]
4343
)
4444

45-
from opentelemetry.instrumentation.anthropic.package import _instruments
46-
from opentelemetry.instrumentation.anthropic.patch import (
47-
messages_create,
48-
)
4945
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
5046
from opentelemetry.instrumentation.utils import unwrap
5147
from opentelemetry.util.genai.handler import TelemetryHandler
5248

49+
from .package import _instruments
50+
from .patch import (
51+
messages_create,
52+
messages_stream,
53+
)
54+
5355

5456
class AnthropicInstrumentor(BaseInstrumentor):
5557
"""An instrumentor for the Anthropic Python SDK.
@@ -88,12 +90,17 @@ def _instrument(self, **kwargs: Any) -> None:
8890
logger_provider=logger_provider,
8991
)
9092

91-
# Patch Messages.create
93+
# Patch Messages.create and Messages.stream
9294
wrap_function_wrapper(
9395
"anthropic.resources.messages",
9496
"Messages.create",
9597
messages_create(handler),
9698
)
99+
wrap_function_wrapper(
100+
"anthropic.resources.messages",
101+
"Messages.stream",
102+
messages_stream(handler),
103+
)
97104

98105
def _uninstrument(self, **kwargs: Any) -> None:
99106
"""Disable Anthropic instrumentation.
@@ -106,3 +113,7 @@ def _uninstrument(self, **kwargs: Any) -> None:
106113
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
107114
"create",
108115
)
116+
unwrap(
117+
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
118+
"stream",
119+
)

instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/messages_extractors.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
from opentelemetry.semconv._incubating.attributes import (
1717
server_attributes as ServerAttributes,
1818
)
19+
from opentelemetry.util.genai.invocation import InferenceInvocation
1920
from opentelemetry.util.genai.types import (
2021
InputMessage,
21-
LLMInvocation,
2222
MessagePart,
2323
OutputMessage,
2424
)
@@ -144,7 +144,7 @@ def get_output_messages_from_message(
144144

145145

146146
def set_invocation_response_attributes(
147-
invocation: LLMInvocation,
147+
invocation: InferenceInvocation,
148148
message: Message | None,
149149
capture_content: bool,
150150
) -> None:
@@ -209,18 +209,15 @@ def extract_params( # pylint: disable=too-many-locals
209209
)
210210

211211

212-
def _set_server_address_and_port(
212+
def get_server_address_and_port(
213213
client_instance: "Messages",
214-
attributes: dict[str, AttributeValue | None],
215-
) -> None:
214+
) -> tuple[str | None, int | None]:
216215
base_url = client_instance._client.base_url
217-
host = base_url.host
218-
if host:
219-
attributes[ServerAttributes.SERVER_ADDRESS] = host
220-
221216
port = base_url.port
222-
if port and port != 443 and port > 0:
223-
attributes[ServerAttributes.SERVER_PORT] = port
217+
return (
218+
base_url.host or None,
219+
port if port and port != 443 and port > 0 else None,
220+
)
224221

225222

226223
def get_llm_request_attributes(
@@ -236,5 +233,9 @@ def get_llm_request_attributes(
236233
GenAIAttributes.GEN_AI_REQUEST_TOP_K: params.top_k,
237234
GenAIAttributes.GEN_AI_REQUEST_STOP_SEQUENCES: params.stop_sequences,
238235
}
239-
_set_server_address_and_port(client_instance, attributes)
236+
address, port = get_server_address_and_port(client_instance)
237+
if address is not None:
238+
attributes[ServerAttributes.SERVER_ADDRESS] = address
239+
if port is not None:
240+
attributes[ServerAttributes.SERVER_PORT] = port
240241
return {k: v for k, v in attributes.items() if v is not None}

instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py

Lines changed: 79 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,25 @@
1515
gen_ai_attributes as GenAIAttributes,
1616
)
1717
from opentelemetry.util.genai.handler import TelemetryHandler
18-
from opentelemetry.util.genai.types import (
19-
Error,
20-
LLMInvocation, # TODO: migrate to InferenceInvocation
21-
)
22-
from opentelemetry.util.genai.utils import (
23-
should_capture_content_on_spans_in_experimental_mode,
24-
)
18+
from opentelemetry.util.genai.invocation import InferenceInvocation
2519

2620
from .messages_extractors import (
2721
extract_params,
2822
get_input_messages,
2923
get_llm_request_attributes,
24+
get_server_address_and_port,
3025
get_system_instruction,
3126
)
3227
from .wrappers import (
28+
MessagesStreamManagerWrapper,
3329
MessagesStreamWrapper,
3430
MessageWrapper,
3531
)
3632

3733
if TYPE_CHECKING:
34+
from anthropic.lib.streaming._messages import ( # pylint: disable=no-name-in-module
35+
MessageStreamManager,
36+
)
3837
from anthropic.resources.messages import Messages
3938
from anthropic.types import RawMessageStreamEvent
4039

@@ -48,73 +47,109 @@ def messages_create(
4847
) -> Callable[
4948
...,
5049
Union[
51-
"AnthropicMessage",
52-
"AnthropicStream[RawMessageStreamEvent]",
50+
AnthropicMessage,
51+
AnthropicStream[RawMessageStreamEvent],
5352
MessagesStreamWrapper[None],
5453
],
5554
]:
5655
"""Wrap the `create` method of the `Messages` class to trace it."""
57-
capture_content = should_capture_content_on_spans_in_experimental_mode()
56+
capture_content = handler.should_capture_content()
5857

5958
def traced_method(
6059
wrapped: Callable[
6160
...,
6261
Union[
63-
"AnthropicMessage",
64-
"AnthropicStream[RawMessageStreamEvent]",
62+
AnthropicMessage,
63+
AnthropicStream[RawMessageStreamEvent],
6564
],
6665
],
67-
instance: "Messages",
66+
instance: Messages,
6867
args: tuple[Any, ...],
6968
kwargs: dict[str, Any],
7069
) -> Union[
71-
"AnthropicMessage",
72-
"AnthropicStream[RawMessageStreamEvent]",
70+
AnthropicMessage,
71+
AnthropicStream[RawMessageStreamEvent],
7372
MessagesStreamWrapper[None],
7473
]:
75-
params = extract_params(*args, **kwargs)
76-
attributes = get_llm_request_attributes(params, instance)
77-
request_model_attribute = attributes.get(
78-
GenAIAttributes.GEN_AI_REQUEST_MODEL
79-
)
80-
request_model = (
81-
request_model_attribute
82-
if isinstance(request_model_attribute, str)
83-
else params.model
74+
invocation = _create_invocation(
75+
handler, instance, args, kwargs, capture_content
8476
)
85-
86-
invocation = LLMInvocation(
87-
request_model=request_model,
88-
provider=ANTHROPIC,
89-
input_messages=get_input_messages(params.messages)
90-
if capture_content
91-
else [],
92-
system_instruction=get_system_instruction(params.system)
93-
if capture_content
94-
else [],
95-
attributes=attributes,
96-
)
97-
98-
# Use manual lifecycle management for both streaming and non-streaming
99-
handler.start_llm(invocation)
10077
try:
10178
result = wrapped(*args, **kwargs)
10279
if isinstance(result, AnthropicStream):
10380
return MessagesStreamWrapper(
104-
result, handler, invocation, capture_content
81+
result, invocation, capture_content
10582
)
10683

10784
wrapper = MessageWrapper(result, capture_content)
10885
wrapper.extract_into(invocation)
109-
handler.stop_llm(invocation)
86+
invocation.stop()
11087
return wrapper.message
11188
except Exception as exc:
112-
handler.fail_llm(
113-
invocation, Error(message=str(exc), type=type(exc))
114-
)
89+
invocation.fail(exc)
11590
raise
11691

11792
return cast(
11893
'Callable[..., Union["AnthropicMessage", "AnthropicStream[RawMessageStreamEvent]", MessagesStreamWrapper[None]]]',
11994
traced_method,
12095
)
96+
97+
98+
def _create_invocation(
99+
handler: TelemetryHandler,
100+
instance: Messages,
101+
args: tuple[Any, ...],
102+
kwargs: dict[str, Any],
103+
capture_content: bool,
104+
) -> InferenceInvocation:
105+
params = extract_params(*args, **kwargs)
106+
attributes = get_llm_request_attributes(params, instance)
107+
request_model_attribute = attributes.get(
108+
GenAIAttributes.GEN_AI_REQUEST_MODEL
109+
)
110+
request_model = (
111+
request_model_attribute
112+
if isinstance(request_model_attribute, str)
113+
else params.model
114+
)
115+
116+
server_address, server_port = get_server_address_and_port(instance)
117+
invocation = handler.start_inference(
118+
provider=ANTHROPIC,
119+
request_model=request_model,
120+
server_address=server_address,
121+
server_port=server_port,
122+
)
123+
invocation.input_messages = (
124+
get_input_messages(params.messages) if capture_content else []
125+
)
126+
invocation.system_instruction = (
127+
get_system_instruction(params.system) if capture_content else []
128+
)
129+
invocation.attributes = attributes
130+
return invocation
131+
132+
133+
def messages_stream(
134+
handler: TelemetryHandler,
135+
) -> Callable[..., MessagesStreamManagerWrapper[Any]]:
136+
"""Wrap the sync `stream` method of the `Messages` class."""
137+
capture_content = handler.should_capture_content()
138+
139+
def traced_method(
140+
wrapped: Callable[..., MessageStreamManager],
141+
instance: Messages,
142+
args: tuple[Any, ...],
143+
kwargs: dict[str, Any],
144+
) -> MessagesStreamManagerWrapper[Any]:
145+
return MessagesStreamManagerWrapper(
146+
wrapped(*args, **kwargs),
147+
lambda: _create_invocation(
148+
handler, instance, args, kwargs, capture_content
149+
),
150+
capture_content,
151+
)
152+
153+
return cast(
154+
"Callable[..., MessagesStreamManagerWrapper[Any]]", traced_method
155+
)

instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/utils.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@
77

88
import base64
99
import json
10+
from collections.abc import Mapping
1011
from dataclasses import dataclass
11-
from typing import TYPE_CHECKING, Any
12+
from typing import TYPE_CHECKING, Any, cast
1213

1314
from anthropic.types import (
1415
InputJSONDelta,
@@ -32,7 +33,7 @@
3233
)
3334

3435
if TYPE_CHECKING:
35-
from collections.abc import Iterable, Mapping
36+
from collections.abc import Iterable
3637

3738
from anthropic.types import (
3839
ContentBlock,
@@ -149,12 +150,9 @@ def _convert_content_block_to_part(
149150
id=block.tool_use_id,
150151
)
151152

152-
# ContentBlockParam variants are TypedDicts (dicts at runtime);
153-
# newer SDK versions may add Pydantic block types not handled above.
154-
if isinstance(block, dict):
155-
return _convert_dict_block_to_part(block)
156-
157-
return None
153+
if not hasattr(block, "get"):
154+
return None
155+
return _convert_dict_block_to_part(cast(Mapping[str, Any], block))
158156

159157

160158
def convert_content_to_parts(

0 commit comments

Comments
 (0)