-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhttp_client.py
More file actions
162 lines (140 loc) · 4.68 KB
/
http_client.py
File metadata and controls
162 lines (140 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from typing import Optional
from urllib.parse import quote, urlparse

import botocore.session
import httpx
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest


class AWSSignV4Auth(httpx.Auth):
    """httpx auth hook that signs outgoing requests with AWS Signature Version 4.

    Credentials are looked up through a botocore session every time a request
    is signed.

    Args:
        aws_region: AWS region used in the signature scope.
        service: AWS service name used in the signature scope. Defaults to
            ``"execute-api"``, the value this class always used before the
            parameter existed.
    """

    # The signature is computed from ``request.content``, so ask httpx to load
    # the request body before ``auth_flow`` runs. Without this flag a
    # streaming body has not been read yet when ``sign_request`` touches it.
    requires_request_body = True

    def __init__(
        self, aws_region: str = "eu-west-1", service: str = "execute-api"
    ) -> None:
        self.aws_region = aws_region
        self.service = service

    def auth_flow(self, request):
        """Sign *request* and yield it back to httpx for sending."""
        yield self.sign_request(request)

    def sign_request(self, request: httpx.Request) -> httpx.Request:
        """Sign an httpx request with AWS Signature Version 4.

        A botocore ``AWSRequest`` mirroring the method, URL and body of
        *request* is signed, and the headers of that signed request are then
        merged into *request*.

        Returns:
            The same ``httpx.Request`` object, with its headers updated.
        """
        session = botocore.session.get_session()
        credentials = session.get_credentials()

        aws_request = AWSRequest(
            method=request.method.upper(),
            url=str(request.url),
            data=request.content,
        )
        SigV4Auth(credentials, self.service, self.aws_region).add_auth(aws_request)

        # Carry the headers produced by signing over to the request httpx
        # will actually send.
        request.headers.update(dict(aws_request.headers))
        return request


class ApiKeyAuth(httpx.Auth):
    """httpx auth hook that attaches a fixed API key to every request."""

    # Name of the header that carries the key.
    _HEADER_NAME = "x-api-key"

    def __init__(self, api_key: str) -> None:
        """Remember *api_key* so it can be added to outgoing requests."""
        self.api_key = api_key

    def auth_flow(self, request):
        """Set the API-key header on *request* and yield it back to httpx."""
        request.headers[self._HEADER_NAME] = self.api_key
        yield request


class HttpClient:
    """Small synchronous HTTP client built on top of ``httpx.Client``.

    URLs without a host are resolved against ``base_url``, ``{name}``
    placeholders in the URL are filled from ``path_params``, and every
    response is passed through ``raise_for_status()`` before being returned.

    The instance can be used as a context manager so the underlying
    ``httpx.Client`` is closed when the block exits.
    """

    def __init__(
        self,
        base_url: str,
        auth: "httpx.Auth",
        user_agent: Optional[str] = None,
        aws_region="eu-west-1",
        timeout=30,
    ) -> None:
        """Create the client.

        Args:
            base_url: Prefix for URLs that do not contain a host.
            auth: httpx auth object applied to every request.
            user_agent: Optional ``User-Agent`` header value.
            aws_region: Stored on the instance as ``aws_region``.
            timeout: Timeout passed to every request (default 30).
        """
        self.aws_region = aws_region
        # Strip the trailing slash once so joining with a path never
        # produces a double slash.
        self.base_url = base_url.rstrip("/")
        self.auth = auth
        headers = {"Accept-Encoding": "gzip"}
        if user_agent:
            headers["User-Agent"] = user_agent
        self.client = httpx.Client(headers=headers)
        self.timeout = timeout

    def close(self) -> None:
        """Close the underlying ``httpx.Client``."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def _replace_path_params(self, url: str, path_params: Optional[dict]) -> str:
        """Substitute ``{name}`` placeholders in *url* with URL-quoted values.

        Args:
            url: URL that may contain ``{name}`` placeholders.
            path_params: Mapping of placeholder name to value, or ``None``.

        Returns:
            The URL with every given placeholder replaced.

        Raises:
            ValueError: If a value is ``None`` or its string form is empty
                or whitespace only.
        """
        if not path_params:
            return url
        for param, value in path_params.items():
            # Test for "missing" explicitly: a plain truthiness check would
            # also reject legitimate values such as 0.
            if value is None or not str(value).strip():
                raise ValueError(
                    f"Parameter {param} is required, cannot be empty or blank."
                )
            # NOTE(review): quote() leaves "/" unescaped by default, so a
            # value containing "/" spans several path segments - confirm
            # whether callers rely on that before tightening it.
            url = url.replace(f"{{{param}}}", quote(str(value)))
        return url

    def _normalize_url(self, url: str) -> str:
        """Return *url* unchanged if it has a host, else join it to ``base_url``.

        Exactly one ``/`` separates the base URL from a relative path, whether
        or not *url* starts with a slash. An empty *url* yields the base URL.
        """
        if urlparse(url).netloc:
            return url
        base = self.base_url.rstrip("/")
        # Insert the separator for paths like "users"; leave "", "?q=1" and
        # "#frag" attached directly to the base.
        if url and url[0] not in "/?#":
            url = f"/{url}"
        return f"{base}{url}"

    def _request(
        self,
        verb: str,
        url: str,
        path_params: Optional[dict],
        params: Optional[dict],
        headers: Optional[dict],
        **body,
    ):
        """Resolve the URL, send the request via ``self.client.<verb>``, check status.

        Extra keyword arguments (``json`` for the body-carrying verbs) are
        forwarded to the httpx call unchanged.

        Raises:
            httpx.HTTPStatusError: From ``raise_for_status()`` on an error
                status code.
        """
        target = self._replace_path_params(
            url=self._normalize_url(url), path_params=path_params
        )
        send = getattr(self.client, verb)
        response = send(
            url=target,
            params=params,
            headers=headers,
            auth=self.auth,
            timeout=self.timeout,
            **body,
        )
        response.raise_for_status()
        return response

    def get(
        self,
        url: str,
        path_params: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
    ):
        """Send a GET request and return the checked response."""
        return self._request("get", url, path_params, params, headers)

    def post(
        self,
        url: str,
        path_params: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        json: Optional[dict] = None,
    ):
        """Send a POST request with a JSON body and return the checked response."""
        return self._request("post", url, path_params, params, headers, json=json)

    def put(
        self,
        url: str,
        path_params: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        json: Optional[dict] = None,
    ):
        """Send a PUT request with a JSON body and return the checked response."""
        return self._request("put", url, path_params, params, headers, json=json)

    def patch(
        self,
        url: str,
        path_params: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        json: Optional[dict] = None,
    ):
        """Send a PATCH request with a JSON body and return the checked response."""
        return self._request("patch", url, path_params, params, headers, json=json)

    def delete(
        self,
        url: str,
        path_params: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
    ):
        """Send a DELETE request and return the checked response."""
        return self._request("delete", url, path_params, params, headers)