Skip to content

Commit 96ebbf6

Browse files
fix(litellm): Avoid double span exits when streaming (#5933)
Avoid an unhandled exception by only exiting the span on the final invocation of `_success_callback` when litellm streams a response. Remove the shared span reference in the metadata dictionary immediately before the span is finished, to avoid race conditions between concurrent `_success_callback` invocations. The `litellm.success_callback` callbacks are fired multiple times when streaming a response with litellm.
1 parent 7e22b5d commit 96ebbf6

File tree

3 files changed: +181 −20 lines changed

sentry_sdk/integrations/litellm.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ def _success_callback(
168168
) -> None:
169169
"""Handle successful completion."""
170170

171-
span = _get_metadata_dict(kwargs).get("_sentry_span")
171+
metadata = _get_metadata_dict(kwargs)
172+
span = metadata.get("_sentry_span")
172173
if span is None:
173174
return
174175

@@ -220,8 +221,13 @@ def _success_callback(
220221
)
221222

222223
finally:
223-
# Always finish the span and clean up
224-
span.__exit__(None, None, None)
224+
is_streaming = kwargs.get("stream")
225+
# Callback is fired multiple times when streaming a response.
226+
# Streaming flag checked at https://github.com/BerriAI/litellm/blob/33c3f13443eaf990ac8c6e3da78bddbc2b7d0e7a/litellm/litellm_core_utils/litellm_logging.py#L1603
227+
if is_streaming is not True or "complete_streaming_response" in kwargs:
228+
span = metadata.pop("_sentry_span", None)
229+
if span is not None:
230+
span.__exit__(None, None, None)
225231

226232

227233
def _failure_callback(

tests/conftest.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,6 +1110,120 @@ def inner(response_content, serialize_pydantic=False, request_headers=None):
11101110
return inner
11111111

11121112

1113+
@pytest.fixture
def streaming_chat_completions_model_response():
    """Return the chunk sequence of a streamed chat completion.

    The chunks mimic what the OpenAI API emits for a streamed response:
    an initial role-only delta, several content deltas that concatenate
    to ``"Test response"``, and a final empty delta carrying
    ``finish_reason="stop"`` plus token usage.
    """
    ChoiceDelta = openai.types.chat.chat_completion_chunk.ChoiceDelta

    def _chunk(delta, finish_reason=None, usage=None):
        # Every chunk shares the same envelope; only the delta, the
        # finish_reason, and (for the last chunk) usage differ.
        kwargs = {}
        if usage is not None:
            # Only pass `usage` when set so the intermediate chunks are
            # constructed exactly as before (field left at its default).
            kwargs["usage"] = usage
        return openai.types.chat.ChatCompletionChunk(
            id="chatcmpl-test",
            object="chat.completion.chunk",
            created=10000000,
            model="gpt-3.5-turbo",
            choices=[
                openai.types.chat.chat_completion_chunk.Choice(
                    index=0,
                    delta=delta,
                    finish_reason=finish_reason,
                ),
            ],
            **kwargs,
        )

    content_pieces = ("Tes", "t r", "esp", "ons", "e")
    return [
        # Opening chunk: announces the assistant role, no content yet.
        _chunk(ChoiceDelta(role="assistant")),
        # Content chunks: stream the text in small pieces.
        *(_chunk(ChoiceDelta(content=piece)) for piece in content_pieces),
        # Closing chunk: empty delta, stop reason, and token accounting.
        _chunk(
            ChoiceDelta(),
            finish_reason="stop",
            usage=openai.types.CompletionUsage(
                prompt_tokens=10,
                completion_tokens=20,
                total_tokens=30,
            ),
        ),
    ]
1225+
1226+
11131227
@pytest.fixture
11141228
def nonstreaming_responses_model_response():
11151229
return openai.types.responses.Response(

tests/integrations/litellm/test_litellm.py

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,28 @@ async def __call__(self, *args, **kwargs):
3131
)
3232
from sentry_sdk.utils import package_version
3333

34+
from openai import OpenAI
35+
36+
from concurrent.futures import ThreadPoolExecutor
37+
38+
import litellm.utils as litellm_utils
39+
from litellm.litellm_core_utils import streaming_handler
40+
from litellm.litellm_core_utils import thread_pool_executor
41+
from litellm.litellm_core_utils import litellm_logging
42+
3443

3544
LITELLM_VERSION = package_version("litellm")
3645

3746

47+
@pytest.fixture()
def reset_litellm_executor():
    """Restore litellm's shared thread-pool executor after a test.

    Runs only teardown (code after ``yield``): it installs a fresh
    ``ThreadPoolExecutor`` on ``thread_pool_executor`` and points the
    other litellm modules' module-level ``executor`` references at it.

    NOTE(review): this assumes a test may shut down the shared executor
    (e.g. via ``executor.shutdown(wait=True)``) and that the listed
    modules all alias the same executor object — confirm against the
    installed litellm version.
    """
    yield
    # Recreate the pool first, then repoint every module-level alias at
    # the new instance so they stay in sync.
    thread_pool_executor.executor = ThreadPoolExecutor(max_workers=100)
    litellm_utils.executor = thread_pool_executor.executor
    streaming_handler.executor = thread_pool_executor.executor
    litellm_logging.executor = thread_pool_executor.executor
54+
55+
3856
@pytest.fixture
3957
def clear_litellm_cache():
4058
"""
@@ -212,7 +230,14 @@ def test_nonstreaming_chat_completion(
212230
],
213231
)
214232
def test_streaming_chat_completion(
215-
sentry_init, capture_events, send_default_pii, include_prompts
233+
reset_litellm_executor,
234+
sentry_init,
235+
capture_events,
236+
send_default_pii,
237+
include_prompts,
238+
get_model_response,
239+
server_side_event_chunks,
240+
streaming_chat_completions_model_response,
216241
):
217242
sentry_init(
218243
integrations=[LiteLLMIntegration(include_prompts=include_prompts)],
@@ -222,29 +247,45 @@ def test_streaming_chat_completion(
222247
events = capture_events()
223248

224249
messages = [{"role": "user", "content": "Hello!"}]
225-
mock_response = MockCompletionResponse()
226250

227-
with start_transaction(name="litellm test"):
228-
kwargs = {
229-
"model": "gpt-3.5-turbo",
230-
"messages": messages,
231-
"stream": True,
232-
}
251+
client = OpenAI(api_key="z")
233252

234-
_input_callback(kwargs)
235-
_success_callback(
236-
kwargs,
237-
mock_response,
238-
datetime.now(),
239-
datetime.now(),
240-
)
253+
model_response = get_model_response(
254+
server_side_event_chunks(
255+
streaming_chat_completions_model_response,
256+
include_event_type=False,
257+
),
258+
request_headers={"X-Stainless-Raw-Response": "True"},
259+
)
260+
261+
with mock.patch.object(
262+
client.completions._client._client,
263+
"send",
264+
return_value=model_response,
265+
):
266+
with start_transaction(name="litellm test"):
267+
response = litellm.completion(
268+
model="gpt-3.5-turbo",
269+
messages=messages,
270+
client=client,
271+
stream=True,
272+
)
273+
for _ in response:
274+
pass
275+
276+
streaming_handler.executor.shutdown(wait=True)
241277

242278
assert len(events) == 1
243279
(event,) = events
244280

245281
assert event["type"] == "transaction"
246-
assert len(event["spans"]) == 1
247-
(span,) = event["spans"]
282+
chat_spans = list(
283+
x
284+
for x in event["spans"]
285+
if x["op"] == OP.GEN_AI_CHAT and x["origin"] == "auto.ai.litellm"
286+
)
287+
assert len(chat_spans) == 1
288+
span = chat_spans[0]
248289

249290
assert span["op"] == OP.GEN_AI_CHAT
250291
assert span["data"][SPANDATA.GEN_AI_RESPONSE_STREAMING] is True

0 commit comments

Comments (0)