Skip to content

Commit 4cd23c3

Browse files
committed
stabilize e2e readbacks in ci
1 parent a20e811 commit 4cd23c3

6 files changed

Lines changed: 259 additions & 24 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ jobs:
199199
- name: Run the end-to-end tests
200200
run: |
201201
python --version
202-
uv run --frozen pytest -s -v --log-cli-level=INFO tests/e2e
202+
uv run --frozen pytest -n auto --dist loadfile -s -v --log-cli-level=INFO tests/e2e
203203
204204
all-tests-passed:
205205
# This allows us to have a branch protection rule for tests and deploys with matrix

tests/e2e/test_core_sdk.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def test_create_session_score():
152152
sleep(2)
153153

154154
# Retrieve and verify
155-
score = langfuse.api.scores.get_by_id(score_id)
155+
score = get_api().scores.get_by_id(score_id)
156156

157157
# find the score by name (server may transform the id format)
158158
assert score is not None
@@ -1841,26 +1841,26 @@ def test_get_observations():
18411841
def test_get_trace_not_found():
18421842
# Attempt to fetch a non-existent trace using the API
18431843
with pytest.raises(Exception):
1844-
get_api().trace.get(create_uuid())
1844+
get_api(retry=False).trace.get(create_uuid())
18451845

18461846

18471847
def test_get_observation_not_found():
18481848
# Attempt to fetch a non-existent observation using the API
18491849
with pytest.raises(Exception):
1850-
get_api().legacy.observations_v1.get(create_uuid())
1850+
get_api(retry=False).legacy.observations_v1.get(create_uuid())
18511851

18521852

18531853
def test_get_traces_empty():
18541854
# Fetch traces with a filter that should return no results
1855-
response = get_api().trace.list(name=create_uuid())
1855+
response = get_api(retry=False).trace.list(name=create_uuid())
18561856

18571857
assert len(response.data) == 0
18581858
assert response.meta.total_items == 0
18591859

18601860

18611861
def test_get_observations_empty():
18621862
# Fetch observations with a filter that should return no results
1863-
response = get_api().legacy.observations_v1.get_many(name=create_uuid())
1863+
response = get_api(retry=False).legacy.observations_v1.get_many(name=create_uuid())
18641864

18651865
assert len(response.data) == 0
18661866
assert response.meta.total_items == 0

tests/support/api_wrapper.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import os
2-
from time import sleep
32

43
import httpx
54

5+
from langfuse.api.commons.errors.not_found_error import NotFoundError
6+
from tests.support.retry import is_not_found_payload, retry_until_ready
7+
68

79
class LangfuseAPI:
810
def __init__(self, username=None, password=None, base_url=None):
@@ -11,28 +13,32 @@ def __init__(self, username=None, password=None, base_url=None):
1113
self.auth = (username, password)
1214
self.BASE_URL = base_url if base_url else os.environ["LANGFUSE_BASE_URL"]
1315

16+
def _get_json(self, url, params=None):
17+
def _request():
18+
response = httpx.get(url, params=params, auth=self.auth)
19+
payload = response.json()
20+
21+
if response.status_code == 404 and is_not_found_payload(payload):
22+
raise NotFoundError(body=payload, headers=dict(response.headers))
23+
24+
return payload
25+
26+
return retry_until_ready(_request)
27+
1428
def get_observation(self, observation_id):
15-
sleep(1)
1629
url = f"{self.BASE_URL}/api/public/observations/{observation_id}"
17-
response = httpx.get(url, auth=self.auth)
18-
return response.json()
30+
return self._get_json(url)
1931

2032
def get_scores(self, page=None, limit=None, user_id=None, name=None):
21-
sleep(1)
2233
params = {"page": page, "limit": limit, "userId": user_id, "name": name}
2334
url = f"{self.BASE_URL}/api/public/scores"
24-
response = httpx.get(url, params=params, auth=self.auth)
25-
return response.json()
35+
return self._get_json(url, params=params)
2636

2737
def get_traces(self, page=None, limit=None, user_id=None, name=None):
28-
sleep(1)
2938
params = {"page": page, "limit": limit, "userId": user_id, "name": name}
3039
url = f"{self.BASE_URL}/api/public/traces"
31-
response = httpx.get(url, params=params, auth=self.auth)
32-
return response.json()
40+
return self._get_json(url, params=params)
3341

3442
def get_trace(self, trace_id):
35-
sleep(1)
3643
url = f"{self.BASE_URL}/api/public/traces/{trace_id}"
37-
response = httpx.get(url, auth=self.auth)
38-
return response.json()
44+
return self._get_json(url)

tests/support/retry.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from __future__ import annotations
2+
3+
import os
4+
from time import monotonic, sleep
5+
from typing import Callable, TypeVar
6+
7+
from langfuse.api.commons.errors.not_found_error import NotFoundError
8+
from langfuse.api.core.api_error import ApiError
9+
10+
T = TypeVar("T")
11+
12+
DEFAULT_RETRY_TIMEOUT_SECONDS = float(
13+
os.environ.get("LANGFUSE_E2E_READ_TIMEOUT_SECONDS", "12")
14+
)
15+
DEFAULT_RETRY_INTERVAL_SECONDS = float(
16+
os.environ.get("LANGFUSE_E2E_READ_INTERVAL_SECONDS", "0.25")
17+
)
18+
19+
20+
def is_eventual_consistency_error(error: Exception) -> bool:
21+
if isinstance(error, NotFoundError):
22+
return True
23+
24+
if not isinstance(error, ApiError):
25+
return False
26+
27+
body = error.body
28+
return isinstance(body, dict) and body.get("error") == "LangfuseNotFoundError"
29+
30+
31+
def is_not_found_payload(payload: object) -> bool:
32+
return isinstance(payload, dict) and payload.get("error") == "LangfuseNotFoundError"
33+
34+
35+
def retry_until_ready(
36+
operation: Callable[[], T],
37+
*,
38+
is_retryable_error: Callable[[Exception], bool] = is_eventual_consistency_error,
39+
is_result_ready: Callable[[T], bool] | None = None,
40+
timeout_seconds: float = DEFAULT_RETRY_TIMEOUT_SECONDS,
41+
interval_seconds: float = DEFAULT_RETRY_INTERVAL_SECONDS,
42+
) -> T:
43+
deadline = monotonic() + timeout_seconds
44+
last_error: Exception | None = None
45+
46+
while True:
47+
try:
48+
result = operation()
49+
except Exception as error:
50+
if not is_retryable_error(error) or monotonic() >= deadline:
51+
raise
52+
53+
last_error = error
54+
else:
55+
if is_result_ready is None or is_result_ready(result):
56+
return result
57+
58+
if monotonic() >= deadline:
59+
return result
60+
61+
sleep(interval_seconds)
62+
63+
if monotonic() >= deadline and last_error is not None:
64+
raise last_error

tests/support/utils.py

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,72 @@
11
import base64
22
import os
3-
from time import sleep
3+
from typing import Any
44
from uuid import uuid4
55

66
from langfuse.api import LangfuseAPI
7+
from tests.support.retry import retry_until_ready
8+
9+
READ_METHOD_NAMES = {"get", "get_by_id", "get_many", "get_run", "list"}
10+
PAGINATION_ARGUMENTS = {"limit", "page"}
11+
12+
13+
def _has_filters(kwargs: dict[str, Any]) -> bool:
14+
return any(
15+
key not in PAGINATION_ARGUMENTS and value is not None
16+
for key, value in kwargs.items()
17+
)
18+
19+
20+
class _RetryingApiProxy:
21+
def __init__(self, target: Any):
22+
self._target = target
23+
24+
def __getattr__(self, name: str) -> Any:
25+
attr = getattr(self._target, name)
26+
27+
if callable(attr):
28+
if name not in READ_METHOD_NAMES:
29+
return attr
30+
31+
def _call(*args: Any, **kwargs: Any) -> Any:
32+
return retry_until_ready(
33+
lambda: attr(*args, **kwargs),
34+
is_result_ready=_result_ready(name, kwargs),
35+
)
36+
37+
return _call
38+
39+
if isinstance(attr, (str, bytes, int, float, bool, list, dict, tuple, set)):
40+
return attr
41+
42+
if attr is None:
43+
return None
44+
45+
return _RetryingApiProxy(attr)
46+
47+
48+
def _result_ready(method_name: str, kwargs: dict[str, Any]):
49+
if method_name not in {"get_many", "list"} or not _has_filters(kwargs):
50+
return None
51+
52+
def _has_data(result: Any) -> bool:
53+
data = getattr(result, "data", None)
54+
return data is None or len(data) > 0
55+
56+
return _has_data
757

858

959
def create_uuid():
1060
return str(uuid4())
1161

1262

13-
def get_api():
14-
sleep(2)
15-
16-
return LangfuseAPI(
63+
def get_api(*, retry: bool = True):
64+
client = LangfuseAPI(
1765
username=os.environ.get("LANGFUSE_PUBLIC_KEY"),
1866
password=os.environ.get("LANGFUSE_SECRET_KEY"),
1967
base_url=os.environ.get("LANGFUSE_BASE_URL"),
2068
)
69+
return _RetryingApiProxy(client) if retry else client
2170

2271

2372
def encode_file_to_base64(image_path) -> str:

tests/unit/test_e2e_support.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
from types import SimpleNamespace
2+
3+
from langfuse.api.commons.errors.not_found_error import NotFoundError
4+
from tests.support.api_wrapper import LangfuseAPI as SupportLangfuseAPI
5+
from tests.support.utils import get_api
6+
7+
8+
def test_get_api_retries_not_found(monkeypatch):
9+
monkeypatch.setattr("tests.support.retry.sleep", lambda _: None)
10+
11+
attempts = {"count": 0}
12+
13+
class FakeTraceService:
14+
def get(self, trace_id):
15+
attempts["count"] += 1
16+
17+
if attempts["count"] < 3:
18+
raise NotFoundError(
19+
body={
20+
"error": "LangfuseNotFoundError",
21+
"message": f"Trace {trace_id} not found within authorized project",
22+
}
23+
)
24+
25+
return {"id": trace_id}
26+
27+
class FakeClient:
28+
trace = FakeTraceService()
29+
30+
monkeypatch.setattr("tests.support.utils.LangfuseAPI", lambda **_: FakeClient())
31+
32+
trace = get_api().trace.get("trace-123")
33+
34+
assert trace == {"id": "trace-123"}
35+
assert attempts["count"] == 3
36+
37+
38+
def test_get_api_retries_filtered_lists(monkeypatch):
39+
monkeypatch.setattr("tests.support.retry.sleep", lambda _: None)
40+
41+
attempts = {"count": 0}
42+
43+
class FakeTraceService:
44+
def list(self, **kwargs):
45+
attempts["count"] += 1
46+
47+
if attempts["count"] < 3:
48+
return SimpleNamespace(data=[])
49+
50+
return SimpleNamespace(data=[kwargs["name"]])
51+
52+
class FakeClient:
53+
trace = FakeTraceService()
54+
55+
monkeypatch.setattr("tests.support.utils.LangfuseAPI", lambda **_: FakeClient())
56+
57+
response = get_api().trace.list(name="ready-trace")
58+
59+
assert response.data == ["ready-trace"]
60+
assert attempts["count"] == 3
61+
62+
63+
def test_get_api_retry_can_be_disabled(monkeypatch):
64+
attempts = {"count": 0}
65+
66+
class FakeTraceService:
67+
def list(self, **kwargs):
68+
attempts["count"] += 1
69+
return SimpleNamespace(data=[])
70+
71+
class FakeClient:
72+
trace = FakeTraceService()
73+
74+
monkeypatch.setattr("tests.support.utils.LangfuseAPI", lambda **_: FakeClient())
75+
76+
response = get_api(retry=False).trace.list(name="missing-trace")
77+
78+
assert response.data == []
79+
assert attempts["count"] == 1
80+
81+
82+
def test_raw_api_wrapper_retries_not_found_payload(monkeypatch):
83+
monkeypatch.setattr("tests.support.retry.sleep", lambda _: None)
84+
85+
attempts = {"count": 0}
86+
87+
class FakeResponse:
88+
def __init__(self, status_code, payload):
89+
self.status_code = status_code
90+
self._payload = payload
91+
self.headers = {}
92+
93+
def json(self):
94+
return self._payload
95+
96+
def fake_get(*args, **kwargs):
97+
attempts["count"] += 1
98+
99+
if attempts["count"] < 3:
100+
return FakeResponse(
101+
404,
102+
{
103+
"error": "LangfuseNotFoundError",
104+
"message": "Trace trace-123 not found within authorized project",
105+
},
106+
)
107+
108+
return FakeResponse(200, {"id": "trace-123", "observations": []})
109+
110+
monkeypatch.setattr("tests.support.api_wrapper.httpx.get", fake_get)
111+
112+
api = SupportLangfuseAPI(username="user", password="pass", base_url="http://test")
113+
trace = api.get_trace("trace-123")
114+
115+
assert trace["id"] == "trace-123"
116+
assert attempts["count"] == 3

0 commit comments

Comments
 (0)