-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathhttp_client.py
More file actions
83 lines (65 loc) · 3.01 KB
/
Copy pathhttp_client.py
File metadata and controls
83 lines (65 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
import logging
import typing
from typing import Dict, Type, TypeVar, Any, Generator
import httpx
import pydantic
from httpx import URL, Response
from pydantic import ValidationError
from cozeloop.internal import consts
from cozeloop.internal.httpclient.base_model import BaseResponse
logger = logging.getLogger(__name__)
T = TypeVar('T', bound=BaseResponse)
class HTTPClient:
def __init__(self):
self.sync_client = httpx.Client()
self.async_client = httpx.AsyncClient()
def request(self, method: str, url: URL | str, **kwargs: Any) -> Response:
return self.sync_client.request(method, url, **kwargs)
def stream(self, method: str, url: URL | str, **kwargs: Any):
"""Return synchronous stream context manager"""
return self.sync_client.stream(method, url, **kwargs)
async def arequest(self, method: str, url: URL | str, **kwargs: Any) -> Response:
return await self.async_client.request(method, url, **kwargs)
def astream(self, method: str, url: URL | str, **kwargs: Any):
"""Return asynchronous stream context manager"""
return self.async_client.stream(method, url, **kwargs)
def _check_oauth_error(body: Dict, http_code: int, log_id: str) -> None:
if http_code != 200 and "error_code" in body and "error_message" in body and "error" in body:
auth_error = consts.AuthErrorFormat(**body)
if auth_error.error_code:
logger.error(f"OAuth error, {auth_error}")
raise consts.AuthError(auth_error, http_code, log_id)
def parse_response(url: str, response: httpx.Response, response_model: Type[T]) -> T:
log_id = response.headers.get(consts.LOG_ID_HEADER, None)
http_code = response.status_code
try:
data = response.json()
_check_oauth_error(data, http_code, log_id)
except consts.AuthError as e:
raise e
except Exception as e:
logger.error(f"Failed to parse response. Path: {url}, http code: {http_code}, log id: {log_id}, error: {e}.")
raise consts.RemoteServiceError(http_code, -1, "", log_id) from e
code = data.get("code")
msg = data.get("msg")
if code and code != 0:
e = consts.RemoteServiceError(http_code=http_code, error_code=code, error_message=msg, log_id=log_id)
logger.error(
f"Call remote service failed. Path: {url}, {e}, log id: {log_id}.")
raise e
try:
res = None
if data is not None:
if pydantic.VERSION.startswith('1'):
res = response_model.parse_obj(data)
else:
res = response_model.model_validate(data)
else:
res = response_model()
except ValidationError as e:
logger.error(f"Failed to parse response. Path: {url}, http code: {http_code}, log id: {log_id}, error: {e}.")
raise consts.InternalError from e
logger.debug(f"Call remote service success. Path: {url}, response: {res}, log id: {log_id}")
return res