Skip to content

Commit 7352922

Browse files
committed
perf: memory usage on repeat instantiation
1 parent 0a3caa2 commit 7352922

File tree

5 files changed

+36
-25
lines changed

5 files changed

+36
-25
lines changed

langfuse/_task_manager/task_manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ def shutdown(self):
191191
"""Flush all messages and cleanly shutdown the client."""
192192
self._log.debug("shutdown initiated")
193193

194+
# Unregister the atexit handler first
195+
atexit.unregister(self.shutdown)
196+
194197
self.flush()
195198
self.join()
196199

langfuse/client.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
UpdateGenerationBody,
3737
)
3838
from langfuse.api.resources.ingestion.types.update_span_body import UpdateSpanBody
39+
from langfuse.api.resources.media import GetMediaResponse
3940
from langfuse.api.resources.observations.types.observations_views import (
4041
ObservationsViews,
4142
)
@@ -49,7 +50,6 @@
4950
from langfuse.api.resources.utils.resources.pagination.types.meta_response import (
5051
MetaResponse,
5152
)
52-
from langfuse.api.resources.media import GetMediaResponse
5353
from langfuse.model import (
5454
ChatMessageDict,
5555
ChatPromptClient,
@@ -74,8 +74,8 @@
7474
from langfuse.api.client import FernLangfuse
7575
from langfuse.environment import get_common_release_envs
7676
from langfuse.logging import clean_logger
77-
from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails
7877
from langfuse.media import LangfuseMedia
78+
from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails
7979
from langfuse.request import LangfuseClient
8080
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel
8181
from langfuse.utils import (
@@ -2018,6 +2018,14 @@ def shutdown(self):
20182018
As the SDK calls join() already on shutdown, refer to flush() to ensure all events arive at the Langfuse API.
20192019
"""
20202020
try:
2021+
self.prompt_cache._task_manager.shutdown()
2022+
2023+
# In logging.py, a handler is attached to the httpx logger.
2024+
# To avoid a memory leak on singleton reset, remove all handlers
2025+
httpx_logger = logging.getLogger("httpx")
2026+
for handler in httpx_logger.handlers:
2027+
httpx_logger.removeHandler(handler)
2028+
20212029
return self.task_manager.shutdown()
20222030
except Exception as e:
20232031
self.log.exception(e)

langfuse/decorators/langfuse_decorator.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,50 @@
11
import asyncio
2+
import inspect
3+
import json
4+
import logging
25
from collections import defaultdict
36
from contextvars import ContextVar
47
from datetime import datetime
58
from functools import wraps
6-
import httpx
7-
import inspect
8-
import json
9-
import logging
109
from typing import (
1110
Any,
11+
AsyncGenerator,
1212
Callable,
1313
DefaultDict,
14+
Dict,
15+
Generator,
16+
Iterable,
1417
List,
15-
Optional,
16-
Union,
1718
Literal,
18-
Dict,
19+
Optional,
1920
Tuple,
20-
Iterable,
21-
AsyncGenerator,
22-
Generator,
2321
TypeVar,
22+
Union,
2423
cast,
2524
overload,
2625
)
2726

27+
import httpx
28+
from pydantic import BaseModel
2829
from typing_extensions import ParamSpec
2930

3031
from langfuse.api import UsageDetails
3132
from langfuse.client import (
3233
Langfuse,
33-
StatefulSpanClient,
34-
StatefulTraceClient,
35-
StatefulGenerationClient,
36-
PromptClient,
37-
ModelUsage,
3834
MapValue,
35+
ModelUsage,
36+
PromptClient,
3937
ScoreDataType,
38+
StatefulGenerationClient,
39+
StatefulSpanClient,
40+
StatefulTraceClient,
4041
StateType,
4142
)
4243
from langfuse.serializer import EventSerializer
4344
from langfuse.types import ObservationParams, SpanLevel
4445
from langfuse.utils import _get_timestamp
45-
from langfuse.utils.langfuse_singleton import LangfuseSingleton
4646
from langfuse.utils.error_logging import catch_and_log_errors
47-
48-
from pydantic import BaseModel
47+
from langfuse.utils.langfuse_singleton import LangfuseSingleton
4948

5049
_observation_stack_context: ContextVar[
5150
List[Union[StatefulTraceClient, StatefulSpanClient, StatefulGenerationClient]]

langfuse/prompt_cache.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
"""@private"""
22

3-
from datetime import datetime
4-
from typing import List, Optional, Dict, Set
5-
from threading import Thread
63
import atexit
74
import logging
5+
from datetime import datetime
86
from queue import Empty, Queue
7+
from threading import Thread
8+
from typing import Dict, List, Optional, Set
99

1010
from langfuse.model import PromptClient
1111

12-
1312
DEFAULT_PROMPT_CACHE_TTL_SECONDS = 60
1413

1514
DEFAULT_PROMPT_CACHE_REFRESH_WORKERS = 1
@@ -114,6 +113,8 @@ def shutdown(self):
114113
f"Shutting down prompt refresh task manager, {len(self._consumers)} consumers,..."
115114
)
116115

116+
atexit.unregister(self.shutdown)
117+
117118
for consumer in self._consumers:
118119
consumer.pause()
119120

langfuse/utils/langfuse_singleton.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import httpx
21
import threading
32
from typing import Optional
43

4+
import httpx
55

66
from langfuse import Langfuse
77
from langfuse.types import MaskFunction

0 commit comments

Comments
 (0)