-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
294 lines (233 loc) · 11.7 KB
/
Copy pathclient.py
File metadata and controls
294 lines (233 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
"""
TINDClient — the main entry point for interacting with the TIND DA API.
"""
import json
import logging
import os
import re
from datetime import datetime, timezone
from io import StringIO
from pathlib import Path
from typing import Any, Iterator
import xml.etree.ElementTree as E
from pymarc import Record
from pymarc.marcxml import parse_xml_to_array
from .api import tind_get, tind_download
from .errors import RecordNotFoundError, TINDError
logger = logging.getLogger(__name__)
NS = "http://www.loc.gov/MARC21/slim"
E.register_namespace("", NS)
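# Default-namespace attribute that ElementTree adds to each serialized record; it is
# stripped from records written inside a <collection> that already declares it.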
_NS_DECL: str = f' xmlns="{NS}"'
class TINDClient:
"""Client for interacting with a TIND DA instance.
:param str api_key: Your TIND API token.
:param str api_url: Base URL of the TIND instance, e.g. ``https://tind.example.edu``.
:param str default_storage_dir: Default directory used by :meth:`fetch_file`
when no ``output_dir`` is supplied.
"""
def __init__(
self,
api_key: str = "",
api_url: str = "",
default_storage_dir: str = "./tmp",
) -> None:
self.api_key = api_key or os.environ.get("TIND_API_KEY", "")
self.api_url = api_url or os.environ.get("TIND_API_URL", "")
self.default_storage_dir = default_storage_dir
def fetch_metadata(self, record: str) -> Record:
"""Fetch the MARC XML metadata for a given record.
:param str record: The record ID for which to fetch metadata.
:raises AuthorizationError: When the TIND API key is invalid.
:raises RecordNotFoundError: When the record ID is invalid or not found.
:returns Record: A PyMARC MARC record of the requested record.
"""
status, response = tind_get(
f"record/{record}/",
api_key=self.api_key,
api_url=self.api_url,
params={"of": "xm"},
)
if status == 404 or len(response.strip()) == 0:
raise RecordNotFoundError(f"Record {record} not found in TIND.")
records: list[Record] = parse_xml_to_array(StringIO(response))
# When the record does not match any records, we may receive a zero-length array of
# records. Additionally, if the XML is malformed, the parser function may return
# multiple records. We need to ensure that exactly one record is parsed.
if len(records) != 1:
raise RecordNotFoundError(f"Record {record} did not match exactly one record in TIND.")
return records[0]
def fetch_file(
self, file_url: str, output_dir: str = "", meta_mtime: datetime | None = None
) -> str:
"""Download a file from TIND and save it locally.
If the file already exists in the output directory and was modified at or after a supplied
``meta_mtime`` timestamp, the file will not be re-downloaded.
:param str file_url: The TIND file download URL.
:param str output_dir: Directory in which to save the file.
Falls back to ``default_storage_dir`` when empty.
:param datetime meta_mtime: Optional modified timestamp from the file metadata returned by
TIND. If not specified, the file will always be downloaded.
:raises AuthorizationError: When the TIND API key is invalid or the file is restricted.
:raises ValueError: When ``file_url`` is not a valid TIND file download URL.
:raises RecordNotFoundError: When the file is invalid or not found.
:returns str: The full path to the locally saved file.
"""
if not re.match(r"^http.*/download(/)?(\?version=\d+)?$", file_url):
raise ValueError("URL is not a valid TIND file download URL.")
output_target = output_dir or self.default_storage_dir
expected_filename = file_url.split("?")[0].rstrip("/").split("/")[-2]
expected_path = Path(output_target) / expected_filename
if meta_mtime and expected_path.exists():
local_mtime = datetime.fromtimestamp(expected_path.stat().st_mtime, tz=timezone.utc)
if local_mtime >= meta_mtime:
logger.debug("Cached file at (%s) is newer; skipping download.", expected_path)
return str(expected_path)
(status, saved_to) = tind_download(file_url, output_dir=output_target, api_key=self.api_key)
if status != 200 or not saved_to:
raise RecordNotFoundError("Referenced file could not be downloaded.")
return saved_to
def fetch_file_metadata(self, record: str) -> list[dict[str, Any]]:
"""Fetch file metadata for a given TIND record.
:param str record: The record ID in TIND to fetch file metadata for.
:raises AuthorizationError: When the TIND API key is invalid.
:raises TINDError: For any response other than 200.
:returns list: A list of file metadata dicts for the given record.
"""
status, files = tind_get(
f"record/{record}/files", api_key=self.api_key, api_url=self.api_url
)
if status != 200:
raise TINDError.from_json(status, files)
return json.loads(files) # type: ignore[no-any-return]
def fetch_ids_search(self, query: str) -> list[str]:
"""Return a list of TIND record IDs matching a search query.
:param str query: The query string to search for in TIND.
:returns list[str]: A list of TIND record IDs.
"""
status, rec_ids = tind_get(
"search", api_key=self.api_key, api_url=self.api_url, params={"p": query}
)
if status != 200:
raise TINDError.from_json(status, rec_ids)
j = json.loads(rec_ids)
hits = j.get("hits", []) if isinstance(j, dict) else []
if not isinstance(hits, list):
return []
return [str(item) for item in hits]
def fetch_marc_by_ids(self, ids: list[str]) -> list[Record]:
"""Fetch MARC records for a list of TIND record IDs.
:param list[str] ids: The TIND record IDs to fetch.
:returns list[Record]: A list of PyMARC records.
"""
return [self.fetch_metadata(item) for item in ids]
def fetch_search_metadata(self, query: str) -> list[Record]:
"""Return PyMARC records matching a search query.
:param str query: The TIND search query.
:returns list[Record]: A list of PyMARC records that match the given query.
"""
ids = self.fetch_ids_search(query)
return self.fetch_marc_by_ids(ids)
def search(self, query: str, result_format: str = "xml") -> list[Any]:
"""Search TIND and return results as XML strings or PyMARC records.
:param str query: A TIND search string.
:param str result_format: ``'xml'`` for XML strings, ``'pymarc'`` for PyMARC records.
:raises ValueError: When ``result_format`` is neither ``'xml'`` nor ``'pymarc'``.
:returns list: Records as XML strings or PyMARC Record objects.
"""
if result_format not in ("xml", "pymarc"):
raise ValueError(
f"Unexpected result format: {result_format} is neither 'xml' nor 'pymarc'"
)
recs: list[Any] = []
search_id = None
while True:
response = self._search_request(query, search_id=search_id)
xml, search_id = self._retrieve_xml_search_id(response)
collection = xml.find("{http://www.loc.gov/MARC21/slim}collection")
records = list(collection) if collection is not None else []
if result_format == "pymarc":
recs = recs + parse_xml_to_array(StringIO(response))
else:
for record in records:
recs.append(E.tostring(record, encoding="unicode"))
if not search_id or not records:
break
return recs
def write_search_results_to_file(
self, query: str = "", output_file_name: str = "tind.xml", output_dir: str = ""
) -> int:
"""Search TIND and stream results to an XML file.
:param str query: A TIND search query string.
:param str output_file_name: filename for the output XML file.
:param str output_dir: Directory in which to save the file.
Falls back to ``default_storage_dir`` when empty.
:returns int: The number of records written to the file.
"""
total_hits = len(self.fetch_ids_search(query))
if total_hits == 0:
return 0
recs_written = 0
output_path = Path(output_dir or self.default_storage_dir) / output_file_name
try:
with output_path.open("w", encoding="utf-8") as f:
f.write(f'<?xml version="1.0" encoding="UTF-8"?>\n<collection xmlns="{NS}">\n')
for record in self._iter_xml_records(query):
record_xml = E.tostring(record, encoding="unicode")
f.write(record_xml.replace(_NS_DECL, ""))
f.write("\n")
recs_written += 1
if recs_written == 0:
# We expected records but didn't receive any through pagination
raise TINDError(f"Matched {total_hits} tind ids, but API did not return any.")
f.write("</collection>\n")
except Exception:
output_path.unlink(missing_ok=True)
raise
if recs_written != total_hits:
raise TINDError(f"Expected {total_hits} records, but wrote {recs_written} to file.")
return recs_written
def _iter_xml_records(self, query: str) -> Iterator[E.Element]:
"""Yield every ``<record>`` element from all pages of a search.
Issues the initial search request, then yields records one at a time,
and continues to issue paginated search requests until all records have been yielded.
:param str query: A TIND search query string.
:yields: An iterator of XML elements representing the search results.
"""
search_id: str = ""
while True:
response = self._search_request(query, search_id=search_id)
xml, search_id = self._retrieve_xml_search_id(response)
collection = xml.find(f"{{{NS}}}collection")
if collection is None or len(collection) == 0:
break
yield from collection
if not search_id:
break
def _search_request(self, query: str, *, search_id: str | None = None) -> str:
"""Retrieve a page of MARC data records.
:param str query: The TIND search query.
:param str|None search_id: The search_id for pagination.
:returns str: A page of MARC records in XML format.
"""
params: dict[str, str] = {"format": "xml", "p": query}
if search_id:
params["search_id"] = search_id
status, response = tind_get(
"search", api_key=self.api_key, api_url=self.api_url, params=params
)
if status != 200:
raise TINDError(f"Status {status} while retrieving TIND record")
return response
def _retrieve_xml_search_id(self, response: str) -> tuple[E.Element, str]:
"""Parse a TIND search response and extract the pagination search_id.
:param str response: The string returned from the TIND search call.
:returns: A parsable XML element and the search ID for the next page.
:rtype: tuple[xml.etree.ElementTree.Element, str]
"""
try:
xml = E.fromstring(response)
except E.ParseError as e:
raise TINDError(f"Failed to parse xml response: {e}") from e
search_id = xml.findtext("search_id", default="")
return xml, search_id