Skip to content

Commit 71afb4c

Browse files
committed
improve logging parsing
1 parent 89e8657 commit 71afb4c

7 files changed

Lines changed: 240 additions & 8 deletions

File tree

src/mlpa/core/classes.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,34 @@ class PlayIntegrityTokenResponse(BaseModel):
8888
expires_in: int
8989

9090

91-
class AuthorizedChatRequest(ChatRequest):
91+
class AuthorizedRequestLogMixin:
    """Mixin exposing the structured log fields of an authorized request.

    The ``log_fields`` mapping is meant to be splatted into
    ``logger.contextualize(**log_fields)`` so the values end up as separate
    ``record.extra.*`` entries instead of being concatenated into the log
    message text.
    """

    # Declared for type checkers only; the concrete request classes that
    # inherit this mixin provide the actual values.
    user: str
    service_type: str
    purpose: str

    @property
    def log_fields(self) -> dict[str, str]:
        """Return the request-identifying fields as a flat ``str -> str`` dict.

        ``purpose`` falls back to ``"-"`` when empty, and ``model`` is only
        included when the instance has a truthy ``model`` attribute.
        """
        bound = dict(
            user=self.user,
            service_type=self.service_type,
            purpose=self.purpose or "-",
        )
        # Chat requests carry a `model`; search requests do not.
        chat_model = getattr(self, "model", None)
        if not chat_model:
            return bound
        bound["model"] = chat_model
        return bound
116+
117+
118+
class AuthorizedChatRequest(ChatRequest, AuthorizedRequestLogMixin):
92119
user: str
93120
service_type: str
94121
purpose: str = (
@@ -101,7 +128,7 @@ class SearchRequest(BaseModel):
101128
max_results: int = Field(ge=1, le=10)
102129

103130

104-
class AuthorizedSearchRequest(SearchRequest):
131+
class AuthorizedSearchRequest(SearchRequest, AuthorizedRequestLogMixin):
105132
user: str
106133
service_type: str
107134
purpose: str = (

src/mlpa/core/completions.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,26 @@ async def get_or_create_user_for_completion(
7171

7272
async def stream_completion(
    authorized_chat_request: AuthorizedChatRequest, request: Request
):
    """Stream a completion with the request's log fields bound for its lifetime.

    The ``logger.contextualize`` scope is opened inside this async generator,
    not by its caller, so it remains active for every chunk that is pulled
    from ``_stream_completion`` and for the inner generator's shutdown.

    Yields:
        Each chunk produced by ``_stream_completion``, unchanged.
    """
    bound_fields = authorized_chat_request.log_fields
    # NOTE(review): contextualize() restores its contextvar on exit; confirm
    # this generator is always finalized in the context that first iterated it.
    with logger.contextualize(**bound_fields):
        inner = _stream_completion(authorized_chat_request, request)
        try:
            async for piece in inner:
                yield piece
        finally:
            # Close the inner generator explicitly so its own cleanup (for
            # example on GeneratorExit) still runs with the fields bound.
            await inner.aclose()
90+
91+
92+
async def _stream_completion(
93+
authorized_chat_request: AuthorizedChatRequest, request: Request
7494
):
7595
"""
7696
Proxies a streaming request to LiteLLM.
@@ -297,6 +317,12 @@ async def _read_next_chunk(
297317

298318

299319
async def get_completion(authorized_chat_request: AuthorizedChatRequest):
    """Run ``_get_completion`` with the request's log fields bound.

    Every log record emitted while the inner call is awaited carries the
    values from ``authorized_chat_request.log_fields`` in ``record.extra``.

    Returns:
        Whatever ``_get_completion`` returns; exceptions propagate unchanged.
    """
    bound_fields = authorized_chat_request.log_fields
    with logger.contextualize(**bound_fields):
        response = await _get_completion(authorized_chat_request)
        return response
323+
324+
325+
async def _get_completion(authorized_chat_request: AuthorizedChatRequest):
300326
"""
301327
Proxies a non-streaming request to LiteLLM.
302328
"""

src/mlpa/core/logger.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,13 @@ async def _wrapper(self, *args, **kwargs):
129129
)
130130
try:
131131
response = await original(self, *args, **kwargs)
132-
except Exception:
133-
logger.error(f"HTTPX {method_name.upper()=} request failed for {url=}")
132+
except Exception as exc:
133+
# Include the exception type + repr: transport failures often
134+
# have an empty str(), so the bare URL alone was undiagnosable.
135+
logger.error(
136+
f"HTTPX {method_name.upper()=} request failed for {url=}: "
137+
f"{type(exc).__name__}: {exc!r}"
138+
)
134139
raise
135140
logger.debug(
136141
f"HTTPX {method_name.upper()} response <- {url=} {response.status_code=}",

src/mlpa/core/search.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818

1919

2020
async def get_search(authorized_search_request: AuthorizedSearchRequest):
    """Run ``_get_search`` with the request's log fields bound.

    Every log record emitted while the inner call is awaited carries the
    values from ``authorized_search_request.log_fields`` in ``record.extra``.

    Returns:
        Whatever ``_get_search`` returns; exceptions propagate unchanged.
    """
    bound_fields = authorized_search_request.log_fields
    with logger.contextualize(**bound_fields):
        outcome = await _get_search(authorized_search_request)
        return outcome
24+
25+
26+
async def _get_search(authorized_search_request: AuthorizedSearchRequest):
2127
start_time = time.perf_counter()
2228
body = sanitize_request_body(
2329
authorized_search_request.model_dump(exclude_none=True)

src/mlpa/core/utils.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,11 @@ def raise_and_log(
245245
HTTPException with the chosen status code and a sanitized error message.
246246
If the upstream error body contains a nested error message, it is extracted
247247
so clients receive the actual upstream detail in debug mode. (dev environment only)
248+
249+
Request-identifying fields (user / service_type / model / purpose) are not
250+
passed here; they are bound on the loguru contextvar by the proxy handler
251+
(``logger.contextualize(**req.log_fields)``) so they ride along in
252+
``record.extra`` automatically.
248253
"""
249254
response = getattr(e, "response", None)
250255
error_text = response.text if response is not None else ""
@@ -264,7 +269,16 @@ def raise_and_log(
264269
except (json.JSONDecodeError, AttributeError, TypeError):
265270
pass
266271
status_code = response_code or getattr(response, "status_code", None) or 500
267-
logger.error(f"{response_text_prefix or GENERIC_UPSTREAM_ERROR}: {detail_text}")
272+
# Transport errors (httpx ConnectError / RemoteProtocolError / ReadError /
273+
# timeouts) carry no `.response` and frequently stringify to "", which is
274+
# why these logs used to read "Failed to proxy request: " with no detail.
275+
# Fall back to the exception class name + repr, and attach the traceback
276+
# (logger.opt(exception=...)) so jsonPayload.record.exception is populated.
277+
exc_type = type(e).__name__
278+
logged_detail = detail_text or repr(e)
279+
logger.opt(exception=e).error(
280+
f"{response_text_prefix or GENERIC_UPSTREAM_ERROR}: {exc_type}: {logged_detail}"
281+
)
268282
if stream:
269283
error_msg = detail_text if env.MLPA_DEBUG else GENERIC_UPSTREAM_ERROR
270284
payload = {"code": status_code, "error": error_msg}

src/tests/unit/test_completions.py

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,36 @@
2828
LITELLM_HEADER_RESPONSE_DURATION_MS,
2929
env,
3030
)
31+
from mlpa.core.logger import logger as loguru_logger
3132
from mlpa.core.prometheus_metrics import PrometheusRejectionReason, PrometheusResult
3233
from tests.consts import SAMPLE_REQUEST, SUCCESSFUL_CHAT_RESPONSE
3334

3435

36+
@contextlib.contextmanager
def _capture_logs():
    """Collect loguru messages emitted while the ``with`` block is active.

    A temporary DEBUG-level sink appends every message to a list that is
    yielded to the caller. Each item exposes a ``.record`` dict, from which
    tests read ``message`` / ``level`` / ``exception`` / ``extra``. The sink
    is removed on exit even when the block raises.
    """
    captured = []
    handler_id = loguru_logger.add(
        captured.append,
        level="DEBUG",
        format="{message}",
    )
    try:
        yield captured
    finally:
        loguru_logger.remove(handler_id)
50+
51+
52+
def _proxy_error_records(records):
53+
return [
54+
item.record
55+
for item in records
56+
if item.record["level"].name == "ERROR"
57+
and "Failed to proxy request" in item.record["message"]
58+
]
59+
60+
3561
def _latency_count(spy, result: PrometheusResult, req=SAMPLE_REQUEST) -> float:
3662
return spy.histogram_count(
3763
"chat_completion_latency",
@@ -940,7 +966,7 @@ async def test_stream_completion_400_non_rate_limit_error(
940966
received_chunks[0]
941967
== b'data: {"code": 400, "error": "Upstream service returned an error"}\n\n'
942968
)
943-
mock_logger.error.assert_called_once()
969+
mock_logger.opt.return_value.error.assert_called_once()
944970
metrics_spy.assert_only({"chat_completion_latency"})
945971
assert _latency_count(metrics_spy, PrometheusResult.ERROR) == 1
946972

@@ -970,7 +996,7 @@ async def test_stream_completion_429_non_rate_limit_error(
970996
received_chunks[0]
971997
== b'data: {"code": 429, "error": "Upstream service returned an error"}\n\n'
972998
)
973-
mock_logger.error.assert_called_once()
999+
mock_logger.opt.return_value.error.assert_called_once()
9741000
metrics_spy.assert_only({"chat_completion_latency"})
9751001
assert _latency_count(metrics_spy, PrometheusResult.ERROR) == 1
9761002

@@ -1022,7 +1048,7 @@ async def test_stream_completion_429_invalid_json(
10221048
received_chunks[0]
10231049
== b'data: {"code": 429, "error": "Upstream service returned an error"}\n\n'
10241050
)
1025-
mock_logger.error.assert_called_once()
1051+
mock_logger.opt.return_value.error.assert_called_once()
10261052
metrics_spy.assert_only({"chat_completion_latency"})
10271053
assert _latency_count(metrics_spy, PrometheusResult.ERROR) == 1
10281054

@@ -1373,3 +1399,73 @@ async def test_get_completion_sanitizes_response_surrogates(mocker):
13731399
assert "\ud83e" not in data["choices"][0]["message"]["content"]
13741400
assert data["choices"][0]["message"]["content"].startswith("done ")
13751401
_httpx_encode_json(data) # must not raise
1402+
1403+
1404+
async def test_get_completion_empty_message_transport_error_is_diagnosable(mocker):
    """A transport error with an empty ``str()`` must still log usefully.

    The HTTP client is mocked so ``post`` raises ``RemoteProtocolError("")``,
    which has no ``.response``. The single resulting proxy ERROR record must
    name the exception type, must not end with the bare prefix, must carry
    the exception info, and must expose the request fields under ``extra``.
    """
    failing_client = AsyncMock()
    failing_client.post.side_effect = httpx.RemoteProtocolError("")
    mocker.patch("mlpa.core.completions.get_http_client", return_value=failing_client)
    mocker.patch.object(env, "MLPA_DEBUG", False)

    with _capture_logs() as captured, pytest.raises(HTTPException) as raised:
        await get_completion(SAMPLE_REQUEST)

    assert raised.value.status_code == 502

    matching = _proxy_error_records(captured)
    assert len(matching) == 1
    error_record = matching[0]

    # The message names the exception class instead of ending at the prefix.
    message = error_record["message"]
    assert "RemoteProtocolError" in message
    assert not message.rstrip().endswith("Failed to proxy request:")

    # Exception info is attached to the record.
    logged_exc = error_record["exception"]
    assert logged_exc is not None
    assert logged_exc.type is httpx.RemoteProtocolError

    # Request-identifying fields are present in the record's extra mapping.
    bound = error_record["extra"]
    assert bound["user"] == SAMPLE_REQUEST.user
    assert bound["model"] == SAMPLE_REQUEST.model
    assert bound["service_type"] == SAMPLE_REQUEST.service_type
1436+
1437+
1438+
async def test_stream_mid_stream_error_binds_request_fields(
    mocker, mock_request, metrics_spy
):
    """An error raised after the first SSE chunk must log with bound fields.

    The mocked upstream yields one role chunk and then raises
    ``RemoteProtocolError("")``. The stream must still emit an error chunk,
    and the single proxy ERROR record must carry the exception info plus the
    request fields under ``extra``.
    """
    first_chunk = (
        b'data: {"choices":[{"delta":{"role":"assistant","content":null}}]}\n\n'
    )

    async def _aiter_then_fail():
        yield first_chunk
        raise httpx.RemoteProtocolError("")

    _patch_mock_stream_client(mocker, _aiter_then_fail)
    mocker.patch.object(env, "MLPA_DEBUG", False)

    emitted = []
    with _capture_logs() as captured:
        async for piece in stream_completion(SAMPLE_REQUEST, mock_request):
            emitted.append(piece)

    assert any(b'"error"' in piece for piece in emitted)

    matching = _proxy_error_records(captured)
    assert len(matching) == 1
    error_record = matching[0]
    assert "RemoteProtocolError" in error_record["message"]
    assert error_record["exception"] is not None

    bound = error_record["extra"]
    assert bound["user"] == SAMPLE_REQUEST.user
    assert bound["model"] == SAMPLE_REQUEST.model
    assert bound["service_type"] == SAMPLE_REQUEST.service_type

src/tests/unit/test_logger.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import contextlib
2+
3+
import httpx
4+
import pytest
5+
from loguru import logger as loguru_logger
6+
7+
from mlpa.core.config import env
8+
from mlpa.core.logger import _enable_httpx_logging
9+
10+
11+
@contextlib.contextmanager
def _capture_logs():
    """Yield a list that receives every loguru message emitted in the block.

    A temporary DEBUG-level sink is registered on entry and removed on exit,
    including when the block raises. Each captured item exposes a ``.record``
    dict that the tests inspect.
    """
    captured = []
    handler_id = loguru_logger.add(
        captured.append,
        level="DEBUG",
        format="{message}",
    )
    try:
        yield captured
    finally:
        loguru_logger.remove(handler_id)
19+
20+
21+
async def test_httpx_wrapper_logs_exc_type_on_transport_failure(mocker):
    """The HTTPX logging wrapper must name the exception type on failure.

    Transport errors often stringify to ``""``, so a log line holding only
    the URL is undiagnosable. A ``MockTransport`` raises ``ConnectError("")``
    for a POST, and exactly one ERROR "request failed" line must mention both
    the exception class and the URL.
    """
    mocker.patch.object(env, "HTTPX_LOGGING", True)

    # Snapshot every verb helper on AsyncClient, not just get/post: which
    # methods _enable_httpx_logging() wraps is not visible from this test, so
    # restoring all of them keeps the global patch from leaking either way.
    # Re-assigning an untouched method to itself is a no-op.
    verb_names = ("get", "options", "head", "post", "put", "patch", "delete")
    originals = {name: getattr(httpx.AsyncClient, name) for name in verb_names}
    try:
        _enable_httpx_logging()

        def _raise(request: httpx.Request) -> httpx.Response:
            raise httpx.ConnectError("", request=request)

        transport = httpx.MockTransport(_raise)
        url = "http://litellm:8000/v1/chat/completions"
        with _capture_logs() as records:
            async with httpx.AsyncClient(transport=transport) as client:
                with pytest.raises(httpx.ConnectError):
                    await client.post(url)
    finally:
        for name, original in originals.items():
            setattr(httpx.AsyncClient, name, original)

    failures = [
        item.record["message"]
        for item in records
        if item.record["level"].name == "ERROR"
        and "request failed" in item.record["message"]
    ]
    assert len(failures) == 1
    msg = failures[0]
    assert "ConnectError" in msg
    assert url in msg

0 commit comments

Comments
 (0)