-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexception.py
More file actions
151 lines (119 loc) · 3.62 KB
/
exception.py
File metadata and controls
151 lines (119 loc) · 3.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""异常定义"""
from typing import Optional
class AgentRunError(Exception):
"""AgentRun SDK 基础异常类"""
def __init__(
self,
message: str,
**kwargs,
):
"""初始化异常
Args:
message: 错误消息
details: 详细信息
"""
msg = message or ""
if kwargs is not None:
msg += self.kwargs_str(**kwargs)
super().__init__(msg)
self.message = message
self.details = kwargs
@classmethod
def kwargs_str(cls, **kwargs) -> str:
"""获取详细信息字符串
Returns:
str: 详细信息字符串
"""
if not kwargs:
return ""
# return ", ".join([f"{key}={value!s}" for key, value in kwargs.items()])
import json
return json.dumps(
kwargs,
ensure_ascii=False,
default=lambda x: x.__dict__,
)
def details_str(self) -> str:
return self.kwargs_str(**self.details)
def __str__(self) -> str:
return self.message
class HTTPError(AgentRunError):
"""HTTP 异常类"""
def __init__(
self,
status_code: int,
message: str,
request_id: Optional[str] = None,
**kwargs,
):
self.status_code = status_code
self.request_id = request_id
self.details = kwargs
super().__init__(message, **kwargs)
def __str__(self) -> str:
return (
f"HTTP {self.status_code}: {self.message}. Request ID:"
f" {self.request_id}. Details: {self.details_str()}"
)
def to_resource_error(
self, resource_type: str, resource_id: Optional[str] = ""
):
if self.status_code == 404 and (
"does not exist" in self.message or "not found" in self.message
):
return ResourceNotExistError(resource_type, resource_id)
elif (
self.status_code == 400
or self.status_code == 409
or self.status_code == 500
) and ("already exists" in self.message):
# TODO: ModelProxy already exists returns 500
return ResourceAlreadyExistError(resource_type, resource_id)
else:
return self
class ClientError(HTTPError):
"""客户端异常类"""
def __init__(
self,
status_code: int,
message: str,
request_id: Optional[str] = None,
**kwargs,
):
super().__init__(status_code, message, request_id=request_id, **kwargs)
class ServerError(HTTPError):
"""服务端异常类"""
def __init__(
self, status_code: int, message: str, request_id: Optional[str] = None
):
super().__init__(status_code, message, request_id=request_id)
class ResourceNotExistError(AgentRunError):
"""资源不存在异常"""
def __init__(
self,
resource_type: str,
resource_id: Optional[str] = "",
):
super().__init__(
f"Resource {resource_type}({resource_id}) does not exist."
)
class ResourceAlreadyExistError(AgentRunError):
"""资源已存在异常"""
def __init__(
self,
resource_type: str,
resource_id: Optional[str] = "",
):
super().__init__(
f"Resource {resource_type}({resource_id}) already exists."
)
class DeleteResourceError(AgentRunError):
"""删除资源异常"""
def __init__(
self,
message: Optional[str] = None,
):
msg = "Failed to delete resource."
if message:
msg += f" Reason: {message}"
super().__init__(msg)