Skip to content

Commit c062045

Browse files
authored
fix: return clients in case of exceptions from client (#396)
1 parent ccc26da commit c062045

7 files changed

Lines changed: 153 additions & 30 deletions

File tree

langfuse/client.py

Lines changed: 56 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -464,11 +464,12 @@ def trace(
464464
event,
465465
)
466466

467+
except Exception as e:
468+
self.log.exception(e)
469+
finally:
467470
return StatefulTraceClient(
468471
self.client, new_id, StateType.TRACE, new_id, self.task_manager
469472
)
470-
except Exception as e:
471-
self.log.exception(e)
472473

473474
def score(
474475
self,
@@ -481,12 +482,13 @@ def score(
481482
observation_id: typing.Optional[str] = None,
482483
kwargs=None,
483484
):
485+
trace_id = self.trace_id if trace_id is None else trace_id
484486
try:
485487
new_id = str(uuid.uuid4()) if id is None else id
486488

487489
new_dict = {
488490
"id": new_id,
489-
"trace_id": self.trace_id if trace_id is None else trace_id,
491+
"trace_id": trace_id,
490492
"observation_id": observation_id,
491493
"name": name,
492494
"value": value,
@@ -504,9 +506,12 @@ def score(
504506
"type": "score-create",
505507
"body": new_body.dict(exclude_none=True),
506508
}
507-
508509
self.task_manager.add_task(event)
509510

511+
except Exception as e:
512+
print(f"exception {e}...")
513+
self.log.exception(e)
514+
finally:
510515
if observation_id is not None:
511516
return StatefulClient(
512517
self.client,
@@ -520,9 +525,6 @@ def score(
520525
self.client, new_id, StateType.TRACE, new_id, self.task_manager
521526
)
522527

523-
except Exception as e:
524-
self.log.exception(e)
525-
526528
def span(
527529
self,
528530
*,
@@ -581,15 +583,16 @@ def span(
581583
self.log.debug(f"Creating span {event}...")
582584
self.task_manager.add_task(event)
583585

586+
except Exception as e:
587+
self.log.exception(e)
588+
finally:
584589
return StatefulSpanClient(
585590
self.client,
586591
new_span_id,
587592
StateType.OBSERVATION,
588593
new_trace_id,
589594
self.task_manager,
590595
)
591-
except Exception as e:
592-
self.log.exception(e)
593596

594597
def event(
595598
self,
@@ -647,15 +650,16 @@ def event(
647650
self.log.debug(f"Creating event {event}...")
648651
self.task_manager.add_task(event)
649652

653+
except Exception as e:
654+
self.log.exception(e)
655+
finally:
650656
return StatefulSpanClient(
651657
self.client,
652658
event_id,
653659
StateType.OBSERVATION,
654660
new_trace_id,
655661
self.task_manager,
656662
)
657-
except Exception as e:
658-
self.log.exception(e)
659663

660664
def generation(
661665
self,
@@ -740,15 +744,16 @@ def generation(
740744
self.log.debug(f"Creating top-level generation {event} ...")
741745
self.task_manager.add_task(event)
742746

747+
except Exception as e:
748+
self.log.exception(e)
749+
finally:
743750
return StatefulGenerationClient(
744751
self.client,
745752
new_generation_id,
746753
StateType.OBSERVATION,
747754
new_trace_id,
748755
self.task_manager,
749756
)
750-
except Exception as e:
751-
self.log.exception(e)
752757

753758
def _generate_trace(self, trace_id: str, name: str):
754759
trace_dict = {
@@ -887,15 +892,16 @@ def generation(
887892
self.log.debug(f"Creating generation {new_body}...")
888893
self.task_manager.add_task(event)
889894

895+
except Exception as e:
896+
self.log.exception(e)
897+
finally:
890898
return StatefulGenerationClient(
891899
self.client,
892900
generation_id,
893901
StateType.OBSERVATION,
894902
self.trace_id,
895903
task_manager=self.task_manager,
896904
)
897-
except Exception as e:
898-
self.log.exception(e)
899905

900906
def span(
901907
self,
@@ -947,15 +953,16 @@ def span(
947953
}
948954

949955
self.task_manager.add_task(event)
956+
except Exception as e:
957+
self.log.exception(e)
958+
finally:
950959
return StatefulSpanClient(
951960
self.client,
952961
span_id,
953962
StateType.OBSERVATION,
954963
self.trace_id,
955964
task_manager=self.task_manager,
956965
)
957-
except Exception as e:
958-
self.log.exception(e)
959966

960967
def score(
961968
self,
@@ -996,15 +1003,17 @@ def score(
9961003
}
9971004

9981005
self.task_manager.add_task(event)
1006+
1007+
except Exception as e:
1008+
self.log.exception(e)
1009+
finally:
9991010
return StatefulClient(
10001011
self.client,
10011012
self.id,
10021013
StateType.OBSERVATION,
10031014
self.trace_id,
10041015
task_manager=self.task_manager,
10051016
)
1006-
except Exception as e:
1007-
self.log.exception(e)
10081017

10091018
def event(
10101019
self,
@@ -1054,11 +1063,12 @@ def event(
10541063
self.log.debug(f"Creating event {event}...")
10551064
self.task_manager.add_task(event)
10561065

1066+
except Exception as e:
1067+
self.log.exception(e)
1068+
finally:
10571069
return StatefulClient(
10581070
self.client, event_id, self.state_type, self.trace_id, self.task_manager
10591071
)
1060-
except Exception as e:
1061-
self.log.exception(e)
10621072

10631073
def get_trace_url(self):
10641074
return f"{self.client._client_wrapper._base_url}/trace/{self.trace_id}"
@@ -1130,15 +1140,16 @@ def update(
11301140
self.log.debug(f"Update generation {event}...")
11311141
self.task_manager.add_task(event)
11321142

1143+
except Exception as e:
1144+
self.log.exception(e)
1145+
finally:
11331146
return StatefulGenerationClient(
11341147
self.client,
11351148
self.id,
11361149
StateType.OBSERVATION,
11371150
self.trace_id,
11381151
task_manager=self.task_manager,
11391152
)
1140-
except Exception as e:
1141-
self.log.exception(e)
11421153

11431154
def end(
11441155
self,
@@ -1185,6 +1196,13 @@ def end(
11851196

11861197
except Exception as e:
11871198
self.log.warning(e)
1199+
return StatefulGenerationClient(
1200+
self.client,
1201+
self.id,
1202+
StateType.OBSERVATION,
1203+
self.trace_id,
1204+
task_manager=self.task_manager,
1205+
)
11881206

11891207

11901208
class StatefulSpanClient(StatefulClient):
@@ -1240,15 +1258,16 @@ def update(
12401258
}
12411259

12421260
self.task_manager.add_task(event)
1261+
except Exception as e:
1262+
self.log.exception(e)
1263+
finally:
12431264
return StatefulSpanClient(
12441265
self.client,
12451266
self.id,
12461267
StateType.OBSERVATION,
12471268
self.trace_id,
12481269
task_manager=self.task_manager,
12491270
)
1250-
except Exception as e:
1251-
self.log.exception(e)
12521271

12531272
def end(
12541273
self,
@@ -1283,6 +1302,14 @@ def end(
12831302

12841303
except Exception as e:
12851304
self.log.warning(e)
1305+
finally:
1306+
return StatefulSpanClient(
1307+
self.client,
1308+
self.id,
1309+
StateType.OBSERVATION,
1310+
self.trace_id,
1311+
task_manager=self.task_manager,
1312+
)
12861313

12871314
def get_langchain_handler(self):
12881315
from langfuse.callback import CallbackHandler
@@ -1345,15 +1372,17 @@ def update(
13451372
}
13461373

13471374
self.task_manager.add_task(event)
1375+
1376+
except Exception as e:
1377+
self.log.exception(e)
1378+
finally:
13481379
return StatefulTraceClient(
13491380
self.client,
13501381
self.id,
13511382
StateType.TRACE,
13521383
self.trace_id,
13531384
task_manager=self.task_manager,
13541385
)
1355-
except Exception as e:
1356-
self.log.exception(e)
13571386

13581387
def get_langchain_handler(self):
13591388
try:

langfuse/llama_index/llama_index.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class LlamaIndexCallbackHandler(
3636
LlamaIndexBaseCallbackHandler, LangfuseBaseCallbackHandler
3737
):
3838
"""[Alpha] LlamaIndex callback handler for Langfuse. This version is in alpha and may change in the future."""
39+
3940
log = logging.getLogger("langfuse")
4041

4142
def __init__(

langfuse/utils/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def _convert_usage_input(usage: typing.Union[pydantic.BaseModel, ModelUsage]):
4949

5050
if isinstance(usage, pydantic.BaseModel):
5151
usage = usage.dict()
52-
52+
5353
# sometimes we do not match the pydantic usage object
5454
# in these cases, we convert to dict manually
5555
if hasattr(usage, "__dict__"):
@@ -60,7 +60,7 @@ def _convert_usage_input(usage: typing.Union[pydantic.BaseModel, ModelUsage]):
6060

6161
if is_langfuse_usage:
6262
return usage
63-
63+
6464
is_openai_usage = any(
6565
k in usage
6666
for k in (

tests/test_core_sdk.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@
1010

1111
from langfuse import Langfuse
1212
from tests.api_wrapper import LangfuseAPI
13-
from tests.utils import CompletionUsage, LlmUsage, LlmUsageWithCost, create_uuid, get_api
13+
from tests.utils import (
14+
CompletionUsage,
15+
LlmUsage,
16+
LlmUsageWithCost,
17+
create_uuid,
18+
get_api,
19+
)
1420

1521

1622
@pytest.mark.asyncio

tests/test_core_sdk_unit.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from unittest.mock import Mock
2+
from langfuse.api.client import FernLangfuse
3+
from langfuse.client import (
4+
StatefulClient,
5+
StatefulGenerationClient,
6+
StatefulSpanClient,
7+
StatefulTraceClient,
8+
)
9+
import pytest
10+
from langfuse import Langfuse
11+
12+
13+
@pytest.fixture
14+
def langfuse():
15+
langfuse_instance = Langfuse(debug=False)
16+
langfuse_instance.client = Mock()
17+
langfuse_instance.task_manager = Mock()
18+
langfuse_instance.log = Mock()
19+
20+
return langfuse_instance
21+
22+
23+
@pytest.fixture
24+
def stateful_client():
25+
stateful_client = StatefulClient(Mock(), "test_id", Mock(), "test_trace", Mock())
26+
27+
return stateful_client
28+
29+
30+
@pytest.mark.parametrize(
31+
"trace_method, expected_client, kwargs",
32+
[
33+
(Langfuse.trace, StatefulTraceClient, {}),
34+
(Langfuse.generation, StatefulGenerationClient, {}),
35+
(Langfuse.span, StatefulSpanClient, {}),
36+
(Langfuse.score, StatefulClient, {"value": 1, "trace_id": "test_trace"}),
37+
],
38+
)
39+
def test_langfuse_returning_if_taskmanager_fails(
40+
langfuse, trace_method, expected_client, kwargs
41+
):
42+
trace_name = "test_trace"
43+
44+
mock_task_manager = langfuse.task_manager.add_task
45+
mock_task_manager.return_value = Exception("Task manager unable to process event")
46+
47+
body = {
48+
"name": trace_name,
49+
**kwargs,
50+
}
51+
52+
result = trace_method(langfuse, **body)
53+
mock_task_manager.assert_called()
54+
55+
assert isinstance(result, expected_client)
56+
57+
58+
@pytest.mark.parametrize(
59+
"trace_method, expected_client, kwargs",
60+
[
61+
(StatefulClient.generation, StatefulGenerationClient, {}),
62+
(StatefulClient.span, StatefulSpanClient, {}),
63+
(StatefulClient.score, StatefulClient, {"value": 1}),
64+
],
65+
)
66+
def test_stateful_client_returning_if_taskmanager_fails(
67+
stateful_client, trace_method, expected_client, kwargs
68+
):
69+
trace_name = "test_trace"
70+
71+
mock_task_manager = stateful_client.task_manager.add_task
72+
mock_task_manager.return_value = Exception("Task manager unable to process event")
73+
mock_client = stateful_client.client
74+
mock_client.return_value = FernLangfuse(base_url="http://localhost:8000")
75+
76+
body = {
77+
"name": trace_name,
78+
**kwargs,
79+
}
80+
81+
result = trace_method(stateful_client, **body)
82+
mock_task_manager.assert_called()
83+
84+
assert isinstance(result, expected_client)

0 commit comments

Comments
 (0)