forked from koalalorenzo/python-digitalocean
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbaseapi.py
More file actions
230 lines (180 loc) · 7.26 KB
/
Copy pathbaseapi.py
File metadata and controls
230 lines (180 loc) · 7.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# -*- coding: utf-8 -*-
import os
import json
import logging
import requests
from . import __name__, __version__
try:
import urlparse
except ImportError:
from urllib import parse as urlparse
# HTTP verbs accepted as the ``type`` argument of the request helpers.
GET, POST, DELETE, PUT = 'GET', 'POST', 'DELETE', 'PUT'

# Name of the environment variable that holds the optional request timeout
# (read and parsed as a float by ``BaseAPI.get_timeout``).
REQUEST_TIMEOUT_ENV_VAR = 'PYTHON_DIGITALOCEAN_REQUEST_TIMEOUT_SEC'
class Error(Exception):
    """Root of the exception hierarchy defined by this module."""
class TokenError(Error):
    """Raised when a request is attempted without an access token."""
class DataReadError(Error):
    """Raised when the API answers with an unsuccessful (non-OK) response."""
class JSONReadError(Error):
    """Raised when a response body cannot be decoded as JSON."""
class NotFoundError(Error):
    """Raised when the API answers with HTTP status 404."""
class EndPointError(Error):
    """Raised when the configured API end point is not a valid URL."""
class BaseAPI(object):
    """
    Base class for objects that talk to the DigitalOcean API.

    The access token and the API end point default to the values of the
    DIGITALOCEAN_ACCESS_TOKEN and DIGITALOCEAN_END_POINT environment
    variables. Both can be overridden by passing ``token`` / ``end_point``
    as keyword arguments to the constructor.
    """
    token = ""
    end_point = "https://api.digitalocean.com/v2/"

    def __init__(self, *args, **kwargs):
        """
        Set up the token, end point, logger and HTTP session.

        Every keyword argument is stored as an attribute of the instance,
        overriding the environment-derived defaults.

        Raises:
            EndPointError: if the end point has no scheme or no host.
        """
        self.token = os.getenv("DIGITALOCEAN_ACCESS_TOKEN", "")
        self.end_point = os.getenv("DIGITALOCEAN_END_POINT", "https://api.digitalocean.com/v2/")
        self._log = logging.getLogger(__name__)
        self._session = requests.Session()

        for attr, value in kwargs.items():
            setattr(self, attr, value)

        parsed_url = urlparse.urlparse(self.end_point)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise EndPointError("Provided end point is not a valid URL. Please use a valid URL")

        # A bare "scheme://host" gets a trailing slash so that relative
        # URLs are joined below the host.
        if not parsed_url.path:
            self.end_point += '/'

    def __getstate__(self):
        """Return the instance state for pickling, without the logger."""
        state = self.__dict__.copy()
        # The logger is not pickleable due to using thread.lock
        state.pop('_log', None)
        return state

    def __setstate__(self, state):
        """Restore pickled state and re-create the logger."""
        self.__dict__ = state
        self._log = logging.getLogger(__name__)

    def __perform_request(self, url, type=GET, params=None):
        """
        Perform the actual HTTP request and return the response object.

        Args:
            url: URL of the resource, absolute or relative to the end point.
            type: one of GET, POST, PUT or DELETE.
            params: payload; sent as query parameters for GET and as a JSON
                encoded body for the other methods.

        Raises:
            TokenError: if no token is configured.
            KeyError: if ``type`` is not one of the supported methods.
        """
        if params is None:
            params = {}

        if not self.token:
            raise TokenError("No token provided. Please use a valid token")

        url = urlparse.urljoin(self.end_point, url)

        # Lookup table giving, per HTTP method: the session method to call,
        # the base headers, the name of the keyword argument that carries
        # the payload, and the function that encodes the payload.
        # The header dicts are created on every call, so updating them
        # below never leaks state between requests.
        lookup = {
            GET: (self._session.get, {'Content-type': 'application/json'},
                  'params', lambda payload_data: payload_data),
            POST: (self._session.post, {'Content-type': 'application/json'},
                   'data', json.dumps),
            PUT: (self._session.put, {'Content-type': 'application/json'},
                  'data', json.dumps),
            DELETE: (self._session.delete,
                     {'content-type': 'application/json'},
                     'data', json.dumps),
        }

        requests_method, headers, payload, transform = lookup[type]
        agent = "{0}/{1} {2}/{3}".format('python-digitalocean',
                                         __version__,
                                         requests.__name__,
                                         requests.__version__)
        headers.update({'Authorization': 'Bearer ' + self.token,
                        'User-Agent': agent})
        kwargs = {'headers': headers, payload: transform(params)}

        timeout = self.get_timeout()
        if timeout:
            kwargs['timeout'] = timeout

        # Never write the token to the log: log a copy of the headers whose
        # Authorization value is masked. (Masking with str.replace breaks
        # when the token is whitespace-only.)
        log_headers = dict(headers, Authorization='Bearer TOKEN')
        self._log.debug('%s %s %s:%s %s %s',
                        type, url, payload, params, log_headers, timeout)

        return requests_method(url, **kwargs)

    def __deal_with_pagination(self, url, method, params, data):
        """
        Follow the "next" links of a paginated response and merge all pages.

        List values found under the same key are concatenated; any other
        value is overwritten by the one from the most recent page.

        Args:
            url: URL of the first request (replaced by each "next" link).
            method: HTTP method used for the follow-up requests.
            params: parameters of the first request; not modified.
            data: decoded body of the first page; it is extended in place
                and returned.
        """
        # Work on a copy so the caller's dictionary is left untouched.
        params = dict(params)
        all_data = data
        while data.get("links", {}).get("pages", {}).get("next"):
            # partition() tolerates a link without a query string, where
            # split("?", 1) with two-target unpacking would raise.
            url, _, query = data["links"]["pages"]["next"].partition("?")

            # Merge the query parameters
            params.update(urlparse.parse_qs(query))

            data = self.__perform_request(url, method, params).json()

            # Merge the dictionaries
            for key, value in data.items():
                if isinstance(value, list) and key in all_data:
                    all_data[key] += value
                else:
                    all_data[key] = value

        return all_data

    def __init_ratelimit(self, headers):
        """Store the rate limit values found in the response headers."""
        # Add the account requests/hour limit
        self.ratelimit_limit = headers.get('Ratelimit-Limit', None)
        # Add the account requests remaining
        self.ratelimit_remaining = headers.get('Ratelimit-Remaining', None)
        # Add the account requests limit reset time
        self.ratelimit_reset = headers.get('Ratelimit-Reset', None)

    def get_timeout(self):
        """
        Return the request timeout configured through the environment.

        The value is read from the environment variable named by
        REQUEST_TIMEOUT_ENV_VAR and parsed as a float.

        Returns:
            The timeout as a float, or None when the variable is unset,
            empty, or not a valid number (an error is logged in that case).
        """
        timeout_str = os.environ.get(REQUEST_TIMEOUT_ENV_VAR)
        if timeout_str:
            try:
                return float(timeout_str)
            except ValueError:
                self._log.error('Failed parsing the request read timeout of '
                                '"%s". Please use a valid float number!',
                                timeout_str)
        return None

    def get_data(self, url, type=GET, params=None):
        """
        Call the API and return the decoded response, checking for errors.

        Pagination is automatically detected and handled, unless the caller
        asked for a specific page through a "page" parameter.

        Args:
            url: URL of the resource, absolute or relative to the end point.
            type: one of GET, POST, PUT or DELETE.
            params: request payload; not modified.

        Returns:
            True when the response status is 202 or 204, otherwise the
            decoded JSON body (with all pages merged when paginated).

        Raises:
            NotFoundError: if the response status is 404.
            JSONReadError: if the response body is not valid JSON.
            DataReadError: if the response is not successful.
        """
        if params is None:
            params = dict()

        # If per_page is not set, make sure it has a sane default
        if type == GET:
            # Copy first so the default is not written into the caller's
            # dictionary.
            params = dict(params)
            params.setdefault("per_page", 200)

        req = self.__perform_request(url, type, params)
        if req.status_code in (202, 204):
            return True

        if req.status_code == 404:
            raise NotFoundError()

        try:
            data = req.json()
        except ValueError as e:
            raise JSONReadError(
                'Read failed from DigitalOcean: %s' % str(e)
            )

        if not req.ok:
            # Prefer the "message" field, fall back to "id", and finally to
            # the status code, so that an incomplete error payload still
            # produces a DataReadError instead of an IndexError.
            msg = None
            if isinstance(data, dict):
                msg = data.get("message", data.get("id"))
            if msg is None:
                msg = 'Request failed with HTTP status %s' % req.status_code
            raise DataReadError(msg)

        # init request limits
        self.__init_ratelimit(req.headers)

        # If the response links to a next page and the caller did not ask
        # for a specific page, fetch and merge the remaining pages.
        pages = data.get("links", {}).get("pages", {})
        if pages.get("next") and "page" not in params:
            return self.__deal_with_pagination(url, type, params, data)
        return data

    def __str__(self):
        return "<%s>" % self.__class__.__name__

    def __unicode__(self):
        return u"%s" % self.__str__()

    def __repr__(self):
        return str(self)