Skip to content

Commit 77c5e76

Browse files
committed
feat(http): add automatic retry for unknown field errors in API requests
1 parent f7eb55c commit 77c5e76

File tree

3 files changed

+414
-0
lines changed

3 files changed

+414
-0
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
"""HTTP client patch for handling unknown field errors.
2+
3+
This module provides functionality to automatically retry API requests when the server
4+
rejects unknown or unexpected fields. This enables backward compatibility when using
5+
newer SDK versions with older server versions.
6+
"""
7+
8+
import logging
9+
from typing import Any, Set
10+
11+
import httpx
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
def patch_client_for_unknown_field_retry(client: httpx.Client) -> None:
17+
"""Patch an httpx.Client instance to automatically retry on unknown field errors.
18+
19+
When the server returns a 400/422 error with unrecognized_keys, this automatically
20+
retries the request with those fields removed.
21+
22+
Args:
23+
client: The httpx.Client instance to patch
24+
25+
Example:
26+
>>> client = httpx.Client()
27+
>>> patch_client_for_unknown_field_retry(client)
28+
>>> # Now all requests through this client will handle unknown field errors
29+
"""
30+
original_request = client.request
31+
32+
def request_with_retry(
33+
method: str, url: str | httpx.URL, **kwargs: Any
34+
) -> httpx.Response:
35+
"""Wrapped request that handles unknown field errors with retry."""
36+
response = original_request(method, url, **kwargs)
37+
38+
# Retry if server rejected unrecognized keys
39+
if response.status_code in [400, 422] and "json" in kwargs:
40+
try:
41+
unknown_keys = _extract_unknown_keys(response)
42+
if unknown_keys:
43+
logger.warning(
44+
"Server rejected unrecognized keys %s for %s %s. Retrying without these fields.",
45+
unknown_keys,
46+
method,
47+
url,
48+
)
49+
kwargs["json"] = _remove_fields(kwargs["json"], unknown_keys)
50+
response = original_request(method, url, **kwargs)
51+
except Exception as e:
52+
logger.debug("Failed to parse unknown field error: %s", e)
53+
54+
return response
55+
56+
client.request = request_with_retry # type: ignore[method-assign]
57+
58+
59+
def patch_async_client_for_unknown_field_retry(client: httpx.AsyncClient) -> None:
60+
"""Patch an httpx.AsyncClient instance to automatically retry on unknown field errors.
61+
62+
Async version of patch_client_for_unknown_field_retry.
63+
64+
Args:
65+
client: The httpx.AsyncClient instance to patch
66+
"""
67+
original_request = client.request
68+
69+
async def request_with_retry(
70+
method: str, url: str | httpx.URL, **kwargs: Any
71+
) -> httpx.Response:
72+
"""Wrapped async request that handles unknown field errors with retry."""
73+
response = await original_request(method, url, **kwargs)
74+
75+
# Retry if server rejected unrecognized keys
76+
if response.status_code in [400, 422] and "json" in kwargs:
77+
try:
78+
unknown_keys = _extract_unknown_keys(response)
79+
if unknown_keys:
80+
logger.warning(
81+
"Server rejected unrecognized keys %s for %s %s. Retrying without these fields.",
82+
unknown_keys,
83+
method,
84+
url,
85+
)
86+
kwargs["json"] = _remove_fields(kwargs["json"], unknown_keys)
87+
response = await original_request(method, url, **kwargs)
88+
except Exception as e:
89+
logger.debug("Failed to parse unknown field error: %s", e)
90+
91+
return response
92+
93+
client.request = request_with_retry # type: ignore[method-assign]
94+
95+
96+
def _extract_unknown_keys(response: httpx.Response) -> Set[str]:
97+
"""Extract unknown keys from server error response.
98+
99+
Args:
100+
response: The HTTP response from the server
101+
102+
Returns:
103+
Set of field names that were rejected as unrecognized
104+
"""
105+
body = response.json()
106+
if isinstance(body, dict) and "error" in body:
107+
unknown_keys = set()
108+
for error in body.get("error", []):
109+
if isinstance(error, dict) and error.get("code") == "unrecognized_keys":
110+
unknown_keys.update(error.get("keys", []))
111+
return unknown_keys
112+
return set()
113+
114+
115+
def _remove_fields(data: Any, fields: Set[str]) -> Any:
116+
"""Remove specified fields from nested dict/list structures.
117+
118+
Args:
119+
data: The data structure to filter (dict, list, or primitive)
120+
fields: Set of field names to remove
121+
122+
Returns:
123+
Filtered data structure with specified fields removed
124+
"""
125+
if isinstance(data, dict):
126+
return {
127+
k: _remove_fields(v, fields) for k, v in data.items() if k not in fields
128+
}
129+
elif isinstance(data, list):
130+
return [_remove_fields(item, fields) for item in data]
131+
else:
132+
return data

langfuse/_client/resource_manager.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
LANGFUSE_RELEASE,
3636
LANGFUSE_TRACING_ENVIRONMENT,
3737
)
38+
from langfuse._client.http_retry_patch import (
39+
patch_async_client_for_unknown_field_retry,
40+
patch_client_for_unknown_field_retry,
41+
)
3842
from langfuse._client.span_processor import LangfuseSpanProcessor
3943
from langfuse._task_manager.media_manager import MediaManager
4044
from langfuse._task_manager.media_upload_consumer import MediaUploadConsumer
@@ -213,6 +217,10 @@ def _initialize_instance(
213217
client_headers = additional_headers if additional_headers else {}
214218
self.httpx_client = httpx.Client(timeout=timeout, headers=client_headers)
215219

220+
# Patch the httpx client to handle unknown field errors automatically
221+
# This enables backward compatibility when newer SDK versions interact with older servers
222+
patch_client_for_unknown_field_retry(self.httpx_client)
223+
216224
self.api = FernLangfuse(
217225
base_url=base_url,
218226
username=self.public_key,
@@ -223,13 +231,19 @@ def _initialize_instance(
223231
httpx_client=self.httpx_client,
224232
timeout=timeout,
225233
)
234+
235+
# Create and patch async client for async API operations
236+
async_httpx_client = httpx.AsyncClient(timeout=timeout)
237+
patch_async_client_for_unknown_field_retry(async_httpx_client)
238+
226239
self.async_api = AsyncFernLangfuse(
227240
base_url=base_url,
228241
username=self.public_key,
229242
password=secret_key,
230243
x_langfuse_sdk_name="python",
231244
x_langfuse_sdk_version=langfuse_version,
232245
x_langfuse_public_key=self.public_key,
246+
httpx_client=async_httpx_client,
233247
timeout=timeout,
234248
)
235249
score_ingestion_client = LangfuseClient(

0 commit comments

Comments
 (0)