Skip to content

Commit 988fa6e

Browse files
committed
Add sync streaming support for Anthropic instrumentation
- Add support for Messages.create(stream=True) with StreamWrapper - Add support for Messages.stream() with MessageStreamManagerWrapper - Add MessageWrapper for non-streaming response telemetry - Rename MessageCreateParams to MessageRequestParams - Add comprehensive tests for sync streaming functionality
1 parent 262a097 commit 988fa6e

14 files changed

Lines changed: 1916 additions & 29 deletions

instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/__init__.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@
5454
)
5555

5656
from opentelemetry.instrumentation.anthropic.package import _instruments
57-
from opentelemetry.instrumentation.anthropic.patch import messages_create
57+
from opentelemetry.instrumentation.anthropic.patch import (
58+
messages_create,
59+
messages_stream,
60+
)
5861
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
5962
from opentelemetry.instrumentation.utils import unwrap
6063
from opentelemetry.util.genai.handler import TelemetryHandler
@@ -103,6 +106,13 @@ def _instrument(self, **kwargs: Any) -> None:
103106
wrapper=messages_create(handler),
104107
)
105108

109+
# Patch Messages.stream
110+
wrap_function_wrapper(
111+
module="anthropic.resources.messages",
112+
name="Messages.stream",
113+
wrapper=messages_stream(handler),
114+
)
115+
106116
def _uninstrument(self, **kwargs: Any) -> None:
107117
"""Disable Anthropic instrumentation.
108118
@@ -114,3 +124,7 @@ def _uninstrument(self, **kwargs: Any) -> None:
114124
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
115125
"create",
116126
)
127+
unwrap(
128+
anthropic.resources.messages.Messages, # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType,reportUnknownArgumentType]
129+
"stream",
130+
)

instrumentation-genai/opentelemetry-instrumentation-anthropic/src/opentelemetry/instrumentation/anthropic/patch.py

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,35 +14,40 @@
1414

1515
"""Patching functions for Anthropic instrumentation."""
1616

17-
from typing import TYPE_CHECKING, Any, Callable
17+
from typing import TYPE_CHECKING, Any, Callable, Union
1818

1919
from opentelemetry.semconv._incubating.attributes import (
2020
gen_ai_attributes as GenAIAttributes,
2121
)
2222
from opentelemetry.util.genai.handler import TelemetryHandler
23-
from opentelemetry.util.genai.types import LLMInvocation
23+
from opentelemetry.util.genai.types import Error, LLMInvocation
2424

2525
from .utils import (
26+
MessageStreamManagerWrapper,
27+
MessageWrapper,
28+
StreamWrapper,
2629
extract_params,
2730
get_llm_request_attributes,
2831
)
2932

3033
if TYPE_CHECKING:
34+
from anthropic._streaming import Stream
35+
from anthropic.lib.streaming import MessageStreamManager
3136
from anthropic.resources.messages import Messages
32-
from anthropic.types import Message
37+
from anthropic.types import Message, RawMessageStreamEvent
3338

3439

3540
def messages_create(
3641
handler: TelemetryHandler,
37-
) -> Callable[..., "Message"]:
42+
) -> Callable[..., Union["Message", "Stream[RawMessageStreamEvent]"]]:
3843
"""Wrap the `create` method of the `Messages` class to trace it."""
3944

4045
def traced_method(
41-
wrapped: Callable[..., "Message"],
46+
wrapped: Callable[..., Union["Message", "Stream[RawMessageStreamEvent]"]],
4247
instance: "Messages",
4348
args: tuple[Any, ...],
4449
kwargs: dict[str, Any],
45-
) -> "Message":
50+
) -> Union["Message", StreamWrapper]:
4651
params = extract_params(*args, **kwargs)
4752
attributes = get_llm_request_attributes(params, instance)
4853
request_model = str(
@@ -57,22 +62,60 @@ def traced_method(
5762
attributes=attributes,
5863
)
5964

60-
with handler.llm(invocation) as invocation:
65+
is_streaming = kwargs.get("stream", False)
66+
67+
# Use manual lifecycle management for both streaming and non-streaming
68+
handler.start_llm(invocation)
69+
try:
6170
result = wrapped(*args, **kwargs)
71+
if is_streaming:
72+
return StreamWrapper(result, handler, invocation)
73+
wrapper = MessageWrapper(result, handler, invocation)
74+
return wrapper.message
75+
except Exception as exc:
76+
handler.fail_llm(
77+
invocation, Error(message=str(exc), type=type(exc))
78+
)
79+
raise
6280

63-
if result.model:
64-
invocation.response_model_name = result.model
81+
return traced_method
6582

66-
if result.id:
67-
invocation.response_id = result.id
6883

69-
if result.stop_reason:
70-
invocation.finish_reasons = [result.stop_reason]
84+
def messages_stream(
85+
handler: TelemetryHandler,
86+
) -> Callable[..., "MessageStreamManager"]:
87+
"""Wrap the `stream` method of the `Messages` class to trace it."""
7188

72-
if result.usage:
73-
invocation.input_tokens = result.usage.input_tokens
74-
invocation.output_tokens = result.usage.output_tokens
89+
def traced_method(
90+
wrapped: Callable[..., "MessageStreamManager"],
91+
instance: "Messages",
92+
args: tuple[Any, ...],
93+
kwargs: dict[str, Any],
94+
) -> MessageStreamManagerWrapper:
95+
params = extract_params(*args, **kwargs)
96+
attributes = get_llm_request_attributes(params, instance)
97+
request_model = str(
98+
attributes.get(GenAIAttributes.GEN_AI_REQUEST_MODEL)
99+
or params.model
100+
or "unknown"
101+
)
102+
103+
invocation = LLMInvocation(
104+
request_model=request_model,
105+
provider="anthropic",
106+
attributes=attributes,
107+
)
75108

76-
return result
109+
# Start the span before calling the wrapped method
110+
handler.start_llm(invocation)
111+
try:
112+
result = wrapped(*args, **kwargs)
113+
# Return wrapped MessageStreamManager
114+
return MessageStreamManagerWrapper(result, handler, invocation)
115+
except Exception as exc:
116+
handler.fail_llm(
117+
invocation, Error(message=str(exc), type=type(exc))
118+
)
119+
raise
77120

78121
return traced_method

0 commit comments

Comments
 (0)