forked from stapi-spec/pystapi
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstapi_api_io.py
More file actions
224 lines (184 loc) · 8.16 KB
/
stapi_api_io.py
File metadata and controls
224 lines (184 loc) · 8.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import json
import logging
import urllib
import urllib.parse
from collections.abc import Callable, Iterator
from typing import Any
import httpx
from httpx import Client as Session
from httpx import Request
from httpx._types import TimeoutTypes
from pydantic import AnyUrl
from stapi_pydantic import Link
from .exceptions import APIError
logger = logging.getLogger(__name__)
class StapiIO:
def __init__(
self,
root_url: AnyUrl,
headers: dict[str, str] | None = None,
parameters: dict[str, Any] | None = None,
request_modifier: Callable[[Request], Request] | None = None,
timeout: TimeoutTypes | None = None,
max_retries: int | None = 5,
) -> None:
"""Initialize class for API IO.
Args:
root_url: The root URL of the STAPI API
headers: Optional dictionary of headers to include in all requests
parameters: Optional dictionary of query string parameters to
include in all requests
request_modifier: Optional callable that can be used to modify Request
objects before they are sent. If provided, the callable receives a
`httpx.Request` and must either modify the object directly or return
a new / modified request instance
timeout: Optional timeout configuration. Can be:
- None to disable timeouts
- float for a default timeout
- tuple of (connect, read, write, pool) timeouts, each being float or None
- httpx.Timeout instance for fine-grained control
See `httpx timeouts <https://www.python-httpx.org/advanced/timeouts/>`__
for details
max_retries: Optional number of times to retry requests. Set to ``None`` to
disable retries. Defaults to 5
"""
self.root_url = root_url
transport = None
if max_retries is not None:
transport = httpx.HTTPTransport(retries=max_retries)
self.session = Session(transport=transport, timeout=timeout)
self.update(
headers=headers,
parameters=parameters,
request_modifier=request_modifier,
)
def update(
self,
headers: dict[str, str] | None = None,
parameters: dict[str, Any] | None = None,
request_modifier: Callable[[Request], Request] | None = None,
) -> None:
"""Updates this Stapi's headers, parameters, and/or request_modifier.
Args:
headers: Optional dictionary of headers to include in all requests
parameters: Optional dictionary of query string parameters to
include in all requests
request_modifier: Optional callable that can be used to modify Request
objects before they are sent. If provided, the callable receives a
`httpx.Request` and must either modify the object directly or return
a new / modified request instance
"""
self.session.headers.update(headers or {})
self.session.params.merge(parameters or {})
self._req_modifier = request_modifier
def _read_text(
self,
href: str,
method: str = "GET",
headers: dict[str, str] | None = None,
parameters: dict[str, Any] | None = None,
) -> str:
"""Read text from the given URI.
Args:
href: The URL to read from
method: The HTTP method to use. Defaults to "GET"
headers: Optional dictionary of additional headers to include in the request
parameters: Optional dictionary of parameters to include in the request.
For GET requests, these are added as query parameters.
For POST requests, these are sent as JSON in the request body
Returns:
str: The response text from the server
"""
return self.request(href, method=method, headers=headers, parameters=parameters if parameters else None)
def request(
self,
href: str,
method: str,
headers: dict[str, str] | None = None,
parameters: dict[str, Any] | None = None,
) -> str:
"""Makes a request to an http endpoint
Args:
href: The request URL
method: The http method to use, 'GET' or 'POST'. Defaults to None, which will result in 'GET' being used.
headers: Additional headers to include in request. Defaults to None.
parameters: Optional dictionary of parameters to include in the request.
For GET requests, these are added as query parameters.
For POST requests, these are sent as JSON in the request body.
Raises:
APIError: raised if the server returns an error response
Returns:
The decoded response text from the endpoint
"""
if method == "POST":
request = Request(method=method, url=href, headers=headers, json=parameters)
else:
request = Request(method=method, url=href, headers=headers, params=parameters if parameters else None)
modified = self._req_modifier(request) if self._req_modifier else request
# Log the request details
# NOTE can we mask header values?
msg = f"{modified.method} {modified.url} Headers: {modified.headers}"
if method == "POST" and hasattr(modified, "json"):
msg += f" Payload: {json.dumps(modified.json)}"
logger.debug(msg)
try:
resp = self.session.send(modified)
except Exception as err:
logger.debug(err)
raise APIError(f"Error sending request: {err=}")
# NOTE what about other successful status codes?
if resp.status_code != 200:
raise APIError.from_response(resp)
try:
return resp.text
except Exception as err:
raise APIError(str(err))
def read_json(self, endpoint: str, method: str = "GET", parameters: dict[str, Any] | None = None) -> dict[str, Any]:
"""Read JSON from a URL.
Args:
endpoint: The URL to read from
method: The HTTP method to use. Defaults to "GET"
parameters: Optional dictionary of parameters to include in the request.
For GET requests, these are added as query parameters.
For POST requests, these are sent as JSON in the request body
Returns:
The parsed JSON response
"""
href = urllib.parse.urljoin(str(self.root_url), endpoint)
if method == "POST" and parameters is None:
parameters = {}
text = self._read_text(href, method=method, parameters=parameters)
return json.loads(text) # type: ignore[no-any-return]
def _get_next_page(self, link: Link, lookup_key: str) -> tuple[dict[str, Any] | None, Link | None]:
page = self.read_json(str(link.href), method=link.method or "GET", parameters=link.body)
next_link = next((link for link in page.get("links", []) if link["rel"] == "next"), None)
if next_link is not None:
next_link = Link.model_validate(next_link)
if page.get(lookup_key):
return page, next_link
return None, None
def get_pages(
self,
link: Link,
lookup_key: str | None = None,
) -> Iterator[dict[str, Any]]:
"""Iterator that yields dictionaries for each page at a STAPI paging
endpoint.
Args:
link: The link to read from
lookup_key: The key in the response JSON that contains the iterable data.
# TODO update endpoint examples
Returns:
Iterator that yields dictionaries for each page
"""
if not lookup_key:
lookup_key = "features"
first_page, next_link = self._get_next_page(link, lookup_key)
if first_page is None:
return None
yield first_page
while next_link:
next_page, next_link = self._get_next_page(next_link, lookup_key)
if next_page is None:
return None
yield next_page