-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathapi_client_adapter.py
More file actions
188 lines (171 loc) · 7.29 KB
/
Copy pathapi_client_adapter.py
File metadata and controls
188 lines (171 loc) · 7.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from typing import Dict, Optional
from conductor.asyncio_client.adapters.models import GenerateTokenRequest
from conductor.asyncio_client.configuration import Configuration
from conductor.asyncio_client.http import rest
from conductor.asyncio_client.http.api_client import ApiClient
from conductor.asyncio_client.http.api_response import ApiResponse
from conductor.asyncio_client.http.api_response import T as ApiResponseT
from conductor.asyncio_client.http.exceptions import ApiException
logger = logging.getLogger(Configuration.get_logging_formatted_name(__name__))
class ApiClientAdapter(ApiClient):
def __init__(self, *args, **kwargs):
self._token_lock = asyncio.Lock()
super().__init__(*args, **kwargs)
async def call_api(
self,
method,
url,
header_params=None,
body=None,
post_params=None,
_request_timeout=None,
) -> rest.RESTResponse:
"""Makes the HTTP request (synchronous)
:param method: Method to call.
:param url: Path to method endpoint.
:param header_params: Header parameters to be
placed in the request header.
:param body: Request body.
:param post_params dict: Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
:param _request_timeout: timeout setting for this request.
:return: RESTResponse
"""
try:
logger.debug(
"HTTP request method: %s; url: %s; header_params: %s", method, url, header_params
)
response_data = await self.rest_client.request(
method,
url,
headers=header_params,
body=body,
post_params=post_params,
_request_timeout=_request_timeout,
)
if (
response_data.status == 401 # noqa: PLR2004 (Unauthorized status code)
and url != self.configuration.host + "/token"
):
logger.warning(
"HTTP response from: %s; status code: 401 - obtaining new token", url
)
async with self._token_lock:
# The lock is intentionally broad (covers the whole block including the token state)
# to avoid race conditions: without it, other coroutines could mis-evaluate
# token state during a context switch and trigger redundant refreshes
token_expired = (
self.configuration.token_update_time > 0
and time.time()
>= self.configuration.token_update_time
+ self.configuration.auth_token_ttl_sec
)
invalid_token = not self.configuration._http_config.api_key.get("api_key")
if invalid_token or token_expired:
token = await self.refresh_authorization_token()
else:
token = self.configuration._http_config.api_key["api_key"]
header_params["X-Authorization"] = token
response_data = await self.rest_client.request(
method,
url,
headers=header_params,
body=body,
post_params=post_params,
_request_timeout=_request_timeout,
)
except ApiException as e:
logger.error(
"HTTP request failed url: %s status: %s; reason: %s", url, e.status, e.reason
)
raise e
return response_data
def response_deserialize(
self,
response_data: rest.RESTResponse,
response_types_map: Optional[Dict[str, ApiResponseT]] = None,
) -> ApiResponse[ApiResponseT]:
"""Deserializes response into an object.
:param response_data: RESTResponse object to be deserialized.
:param response_types_map: dict of response types.
:return: ApiResponse
"""
msg = "RESTResponse.read() must be called before passing it to response_deserialize()"
assert response_data.data is not None, msg
response_type = response_types_map.get(str(response_data.status), None)
if (
not response_type
and isinstance(response_data.status, int)
and 100 <= response_data.status <= 599 # noqa: PLR2004
):
# if not found, look for '1XX', '2XX', etc.
response_type = response_types_map.get(str(response_data.status)[0] + "XX", None)
# deserialize response data
response_text = None
return_data = None
try:
if response_type == "bytearray":
return_data = response_data.data
elif response_type == "file":
return_data = self.__deserialize_file(response_data)
elif response_type is not None:
match = None
content_type = response_data.getheader("content-type")
if content_type is not None:
match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
encoding = match.group(1) if match else "utf-8"
response_text = response_data.data.decode(encoding)
return_data = self.deserialize(response_text, response_type, content_type)
finally:
if not 200 <= response_data.status <= 299: # noqa: PLR2004
logger.error("Unexpected response status code: %s", response_data.status)
raise ApiException.from_response(
http_resp=response_data,
body=response_text,
data=return_data,
)
return ApiResponse(
status_code=response_data.status,
data=return_data,
headers=response_data.getheaders(),
raw_data=response_data.data,
)
async def refresh_authorization_token(self):
obtain_new_token_response = await self.obtain_new_token()
token = obtain_new_token_response.get("token")
self.configuration._http_config.api_key["api_key"] = token
self.configuration.token_update_time = time.time()
logger.debug("New auth token been set")
return token
async def obtain_new_token(self):
body = GenerateTokenRequest(
key_id=self.configuration.auth_key,
key_secret=self.configuration.auth_secret,
)
_param = self.param_serialize(
method="POST",
resource_path="/token",
body=body.to_dict(),
)
response = await self.call_api(
*_param,
)
await response.read()
return json.loads(response.data)
@classmethod
def get_default(cls):
"""Return new instance of ApiClient.
This method returns newly created, based on default constructor,
object of ApiClient class or returns a copy of default
ApiClient.
:return: The ApiClient object.
"""
if cls._default is None:
cls._default = ApiClientAdapter()
return cls._default