Skip to content

Commit bc86c77

Browse files
Implement batch preparation methods for digitized theses workflow
Why these changes are being introduced: * The batch preparation methods for the digitized theses workflow must be capable of performing the following functions: 1. Sync the batch folder from the digitized theses workflow S3 bucket to a minted batch folder in the DSC S3 bucket 2. Download metadata from Alma via SRU 3. Get an item from DSpace 4. Determine if an item is a valid 'replacement thesis', given DSpace item metadata 5. Organize the contents of the minted batch folder into 'new-theses/' and 'replacement-theses/' subfolders How this addresses that need: * Implement methods on DigitizedThesesWorkflow for batch preparation functions * Expand ItemSubmissionStatus to `CREATE_SUCCESS`, `CREATE_FAILED`, `CREATE_SKIPPED` * Add exceptions to handle item retrieval from DSpace Side effects of this change: * Though we've expanded ItemSubmissionStatus to include `CREATE_*` statuses, `BATCH_CREATED` remains to support backwards compatibility with other workflows. Updating all workflows to use `CREATE_*` will be in the DSC backlog for future work. Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/IN-1626
1 parent 0c852ac commit bc86c77

13 files changed

Lines changed: 1612 additions & 4 deletions

File tree

dsc/config.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ def warning_only_loggers(self) -> list:
8383

8484
# Workflow-specific env vars
8585
@property
86-
def dspace_credentials(self) -> str:
86+
def dspace_credentials(self) -> dict:
8787
value = os.getenv("DSPACE_CREDENTIALS")
8888
if not value:
8989
raise OSError("Env var 'DSPACE_CREDENTIALS' must be defined")
90-
return value
90+
credentials = json.loads(value)
91+
92+
return {"IR-8": credentials["ir-8"], "DDC-8": credentials["ddc-8"]}
9193

9294
@property
9395
def metadata_api_url(self) -> str:

dsc/db/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121

2222

2323
class ItemSubmissionStatus(StrEnum):
24+
CREATE_SUCCESS = "create_success"
25+
CREATE_FAILED = "create_failed"
26+
CREATE_SKIPPED = "create_skipped"
2427
BATCH_CREATED = "batch_created"
2528
SUBMIT_SUCCESS = "submit_success"
2629
SUBMIT_FAILED = "submit_failed"

dsc/exceptions.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,43 @@ class SQSMessageSendError(Exception):
3030
pass
3131

3232

33+
# Exceptions for DSpace client
34+
class DSpaceClientError(Exception):
35+
"""General exception raised when DSpace client action results in error."""
36+
37+
38+
class DSpaceClientCredentialsNotFoundError(DSpaceClientError):
39+
"""Raise when env var DSPACE_CREDENTIALS does not include submission system."""
40+
41+
42+
class DSpaceClientAuthenticationError(DSpaceClientError):
43+
"""Raise when DSpace client fails authentication.
44+
45+
Authentication may fail due to 401 Unauthorized or 403 Forbidden errors.
46+
"""
47+
48+
def __init__(
49+
self,
50+
dspace_url: str | float | None,
51+
dspace_user: str | float | None,
52+
):
53+
self.message = (
54+
f"Failed to authenticate to DSpace server at '{dspace_url}' with user "
55+
f"'{dspace_user}'. Please verify that the DSPACE_CREDENTIALS "
56+
"environment variable is set correctly and that the DSpace server is "
57+
"accessible."
58+
)
59+
60+
61+
class DSpaceClientSearchError(DSpaceClientError):
62+
"""Raise when DSpace client search operation results in error.
63+
64+
Search is performed by dspace_rest_client.client.search_objects,
65+
which returns None if the response from a GET request returns an
66+
exit code other than 200.
67+
"""
68+
69+
3370
# Exceptions for 'create-batch' step
3471
class BatchCreationFailedError(Exception):
3572
def __init__(self, errors: list[tuple]) -> None:
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
from dsc.workflows.digitized_theses.transformer import DigitizedThesesTransformer, NSMAP
1+
from dsc.workflows.digitized_theses.transformer import NSMAP, DigitizedThesesTransformer
2+
from dsc.workflows.digitized_theses.workflow import DigitizedTheses
23

3-
__all__ = ["DigitizedThesesTransformer", "NSMAP"]
4+
__all__ = ["NSMAP", "DigitizedTheses", "DigitizedThesesTransformer"]
Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
# ruff: noqa: FIX002, TD002, TD003
2+
import logging
3+
from collections.abc import Iterator
4+
from enum import StrEnum
5+
from typing import Any
6+
7+
import requests
8+
from dspace_rest_client.client import DSpaceClient
9+
from dspace_rest_client.models import Item as DSpaceItem
10+
from lxml import etree
11+
12+
from dsc import exceptions
13+
from dsc.config import Config
14+
from dsc.item_submission import ItemSubmission
15+
from dsc.utils.aws.s3 import S3Client, run_aws_cli_sync
16+
from dsc.workflows.base import Workflow
17+
from dsc.workflows.digitized_theses import NSMAP, DigitizedThesesTransformer
18+
19+
CONFIG = Config()
20+
logger = logging.getLogger(__name__)
21+
22+
23+
class MITThesesCommunityUUID(StrEnum):
24+
dev = "" # TODO: Update to uuid for 'MIT Theses' community in test
25+
prod = "6fc02cc2-0d14-4023-8a6f-d9900d0c4302"
26+
27+
28+
class DigitizedTheses(Workflow):
29+
"""Workflow for digitized theses from the Imaging Lab."""
30+
31+
workflow_name: str = "digitized-theses"
32+
metadata_transformer = DigitizedThesesTransformer
33+
34+
def __init__(self, batch_id: str):
35+
self._minted_batch_id: str | None = None
36+
self._dspace_client = None
37+
self.community_uuid = (
38+
MITThesesCommunityUUID.prod
39+
if CONFIG.workspace == "prod"
40+
else MITThesesCommunityUUID.dev
41+
)
42+
super().__init__(batch_id)
43+
44+
@property
45+
def metadata_mapping_path(self) -> str:
46+
raise NotImplementedError
47+
48+
@property
49+
def minted_batch_id(self) -> str:
50+
if not self._minted_batch_id:
51+
self._minted_batch_id = (
52+
f"{self.batch_id}-{self.run_date.strftime('%Y%m%dT%H%M%SZ')}"
53+
)
54+
return self._minted_batch_id
55+
56+
@property
57+
def dspace_client(self) -> DSpaceClient:
58+
if not self._dspace_client:
59+
logger.debug(
60+
f"Creating DSpace client for destination {self.submission_system}"
61+
)
62+
63+
try:
64+
credentials = CONFIG.dspace_credentials[self.submission_system]
65+
except KeyError as exception:
66+
raise exceptions.DSpaceClientCredentialsNotFoundError(
67+
f"No credentials for {self.submission_system}"
68+
) from exception
69+
70+
client = DSpaceClient(
71+
api_endpoint=credentials["url"],
72+
username=credentials["user"],
73+
password=credentials["password"],
74+
fake_user_agent=True,
75+
)
76+
authenticated = client.authenticate()
77+
if not authenticated:
78+
raise exceptions.DSpaceClientAuthenticationError(
79+
credentials["url"], credentials["user"]
80+
)
81+
self._dspace_client = client
82+
logger.info(
83+
f"Successfully authenticated to {credentials['url']} "
84+
f"as {credentials['url']}"
85+
)
86+
87+
return self._dspace_client
88+
89+
def get_batch_bitstream_uris(self) -> list[str]:
90+
raise NotImplementedError
91+
92+
def item_metadata_iter(self) -> Iterator[dict[str, Any]]:
93+
raise NotImplementedError
94+
95+
def prepare_batch(self, *, synced: bool = False) -> tuple[list, ...]: # noqa: ARG002
96+
"""Prepare a batch folder in the DSC S3 bucket.
97+
98+
This method assumes that all the files synced from the batch folder in
99+
the digitized-theses-workspace S3 bucket are intended for ingest. Each file
100+
(a PDF) represents a digitized thesis and is assigned the filename
101+
'<oclc-number>-MIT.pdf', which yield item identifiers formatted as
102+
'<oclc-number>-MIT'.
103+
104+
Unlike other workflows, successes, failures, and skips are recorded in the
105+
item submissions table in DynamoDB, as a means to collate all results for a batch
106+
in the batch creation report. This means, the method will always return an empty
107+
'errors' list.
108+
"""
109+
item_submissions = []
110+
errors: list[tuple] = [] # set but not used
111+
112+
# sync batch folder from DT S3 bucket to minted batch folder in DSC S3 bucket
113+
run_aws_cli_sync(
114+
source=f"s3://{CONFIG.s3_bucket_digitized_theses}/{self.batch_id}",
115+
destination=f"s3://{CONFIG.s3_bucket_submission_assets}/{self.workflow_name}/{self.minted_batch_id}",
116+
)
117+
118+
s3_client = S3Client()
119+
for file in s3_client.files_iter(
120+
bucket=self.s3_bucket, prefix=(f"{self.workflow_name}/{self.minted_batch_id}")
121+
):
122+
item_submission = ItemSubmission(
123+
batch_id=self.minted_batch_id,
124+
item_identifier=self.parse_item_identifier(file),
125+
workflow_name=self.workflow_name,
126+
)
127+
128+
# get MARC XML metadata from Alma
129+
try:
130+
self._download_metadata_from_alma(item_submission)
131+
except (
132+
requests.exceptions.HTTPError,
133+
exceptions.ItemMetadataNotFoundError,
134+
) as exception:
135+
item_submission.status = "create_failed"
136+
item_submission.status_details = str(exception)
137+
item_submissions.append(item_submission)
138+
continue
139+
140+
# check if item with the OCLC number exists
141+
try:
142+
dspace_item = self._get_item_from_dspace(item_submission)
143+
except exceptions.DSpaceClientSearchError as exception:
144+
item_submission.status = "create_skipped"
145+
item_submission.status_details = str(exception)
146+
item_submissions.append(item_submission)
147+
continue
148+
149+
# check if item submission is a 'Replacement thesis'
150+
if dspace_item and self._is_replacement_thesis(dspace_item):
151+
item_submission.status = "create_success"
152+
item_submission.status_details = "Replacement thesis"
153+
item_submissions.append(item_submission)
154+
else:
155+
item_submission.status = "create_status"
156+
item_submission.status_details = "New thesis"
157+
item_submissions.append(item_submission)
158+
159+
self._move_batch_files_to_theses_subfolders(item_submissions)
160+
161+
return item_submissions, errors
162+
163+
def _download_metadata_from_alma(self, item_submission: ItemSubmission) -> bytes:
164+
"""Download MARC XML metadata for an item submission from Alma.
165+
166+
This method writes an XML file with MARC metadata to the
167+
batch folder in S3 named: '<oclc-number>-MIT.xml'
168+
169+
For more information, see:
170+
https://developers.exlibrisgroup.com/alma/integrations/sru/.
171+
"""
172+
logger.debug(
173+
f"Retrieving metadata from Alma for an item with alma.oclc_control_number_035_a={item_submission.item_identifier}" # noqa: E501
174+
)
175+
s3_client = S3Client()
176+
177+
query_url = f"https://{CONFIG.metadata_api_url}"
178+
response = requests.get(
179+
query_url,
180+
params={
181+
"operation": "searchRetrieve",
182+
"recordSchema": "marcxml",
183+
"query": (
184+
f"alma.oclc_control_number_035_a={item_submission.item_identifier}"
185+
),
186+
},
187+
timeout=180,
188+
)
189+
190+
try:
191+
response.raise_for_status()
192+
except requests.exceptions.HTTPError as exception:
193+
# TODO: Custom error includes HTTPError message
194+
raise exceptions.ItemMetadataNotFoundError from exception
195+
196+
# get nested `<marc:record>` element from SRU response
197+
record_element = self._parse_record_from_sru_response(response.content)
198+
199+
# write an XML file to batch folder
200+
s3_client.put_file(
201+
bucket=self.s3_bucket,
202+
key=f"{self.workflow_name}/{self.minted_batch_id}/{item_submission.item_identifier}.xml",
203+
file_content=etree.tostring(
204+
record_element, xml_declaration=True, encoding="UTF-8"
205+
),
206+
)
207+
return response.content
208+
209+
def _get_item_from_dspace(self, item_submission: ItemSubmission) -> DSpaceItem:
210+
logger.debug(
211+
f"Searching DSpace for an item with dc.identifier.oclc={item_submission.item_identifier}" # noqa: E501
212+
)
213+
214+
dspace_objects = self.dspace_client.search_objects(
215+
query=f"dc.identifier.oclc:{item_submission.item_identifier}",
216+
scope=self.community_uuid,
217+
dso_type="item",
218+
)
219+
220+
if dspace_objects is None:
221+
raise exceptions.DSpaceClientSearchError(
222+
f"Failed search for item with dc.identifier.oclc={item_submission.item_identifier} " # noqa: E501
223+
f"in community {"<community>"}"
224+
)
225+
226+
if len(dspace_objects) > 1:
227+
raise exceptions.DSpaceClientSearchError(
228+
f"Expecting one item with dc.identifier.oclc={item_submission.item_identifier} " # noqa: E501
229+
f"in community {"<community>"}; found {len(dspace_objects)} items"
230+
)
231+
232+
if len(dspace_objects) == 0:
233+
return None
234+
235+
return dspace_objects[0]
236+
237+
def _move_batch_files_to_theses_subfolders(
238+
self, item_submissions: list[ItemSubmission]
239+
) -> None:
240+
s3_client = S3Client()
241+
242+
for item_submission in item_submissions:
243+
if (
244+
item_submission.status_details
245+
and "replacement" in item_submission.status_details.lower()
246+
):
247+
prefix = "replacement-theses"
248+
elif (
249+
item_submission.status_details
250+
and "new" in item_submission.status_details.lower()
251+
):
252+
prefix = "new-theses"
253+
else:
254+
logger.warning(
255+
f"Cannot move files associated with item_identifier={item_submission.item_identifier} " # noqa: E501
256+
"because thesis type (new vs. replacement) is unknown"
257+
)
258+
continue
259+
260+
logger.debug(
261+
"Moving files associated with "
262+
f"item_identifier={item_submission.item_identifier} to '{prefix}/'"
263+
)
264+
for file in s3_client.files_iter(
265+
bucket=self.s3_bucket,
266+
prefix=f"{self.workflow_name}/{self.minted_batch_id}",
267+
item_identifier=item_submission.item_identifier,
268+
):
269+
filename = file.rsplit("/", maxsplit=1)[-1]
270+
s3_client.move_file(
271+
source_file=file,
272+
destination_file=f"s3://{self.s3_bucket}/{self.workflow_name}/{self.minted_batch_id}/{prefix}/{filename}",
273+
)
274+
275+
@staticmethod
276+
def _is_replacement_thesis(item: DSpaceItem) -> bool:
277+
"""Determine if a DSpace item is a replacement thesis.
278+
279+
If an item already exists in DSpace, the thesis (bitstream) should
280+
only be replaced *if* it is not a student-submitted electronic PDF
281+
file, which is checked via the presence of a target string in the
282+
dc.description field.
283+
284+
This method returns True when the target string does *not* appear in
285+
any dc.description value (if any), indicating that
286+
the replacement is valid.
287+
"""
288+
# skip the record if any dc_description value matches this string
289+
target_string = "This electronic version was submitted by the student author."
290+
if item.metadata.get("dc.description"):
291+
dc_description_values = [
292+
entry.get("value", "") for entry in item.metadata["dc.description"]
293+
]
294+
295+
if target_string in "|".join(dc_description_values):
296+
return False
297+
298+
return True
299+
300+
@staticmethod
301+
def _parse_record_from_sru_response(content: bytes) -> etree._Element:
302+
root = etree.fromstring(content)
303+
number_of_records = root.find("sru:numberOfRecords", namespaces=NSMAP)
304+
305+
if number_of_records is None:
306+
# TODO: Custom error includes message "Unexpected response from Alma SRU"
307+
raise exceptions.ItemMetadataNotFoundError
308+
309+
if int(number_of_records.text) == 0:
310+
raise exceptions.ItemMetadataNotFoundError
311+
312+
if int(number_of_records.text) > 1:
313+
# TODO: Custom error includes message
314+
# "Unexpected response from Alma SRU, multiple records with OCLC"
315+
raise exceptions.ItemMetadataNotFoundError
316+
317+
return root.xpath("//marc:record", namespaces=NSMAP)[0]
318+
319+
@staticmethod
320+
def parse_item_identifier(filename: str) -> str:
321+
return filename.rsplit("/", maxsplit=1)[-1].removesuffix("-MIT.pdf")

0 commit comments

Comments
 (0)