Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 118 additions & 30 deletions api/share/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
"""
from http import HTTPStatus
import logging
from rdflib import Graph

from django.apps import apps
from django.db.models import Q
from celery.utils.time import get_exponential_backoff_interval
import requests

Expand All @@ -14,6 +16,7 @@
from framework.encryption import ensure_bytes
from framework.sentry import log_exception
from osf.external.gravy_valet.exceptions import GVException
from osf.metadata.rdfutils import OSF
from osf.metadata.osf_gathering import (
OsfmapPartition,
pls_get_magic_metadata_basket,
Expand Down Expand Up @@ -64,6 +67,102 @@ def _enqueue_update_share(osfresource):
enqueue_task(task__update_share.s(_osfguid_value))


def retry_shtrove_request(self_celery_task, _response):
try:
_response.raise_for_status()
except Exception as e:
log_exception(e)
if _response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
retry_after = _response.headers.get('Retry-After')
try:
countdown = int(retry_after)
except (TypeError, ValueError):
retries = getattr(self_celery_task.request, 'retries', 0)
countdown = get_exponential_backoff_interval(
factor=4,
retries=retries,
maximum=2 * 60,
full_jitter=True,
)
raise self_celery_task.retry(exc=e, countdown=countdown)

raise self_celery_task.retry(exc=e)


def cedar_record_to_turtle(referent, cedar_record):
graph = Graph()
iri = referent.get_semantic_iri()
full_metadata = {
'@id': iri,
OSF.hasCedarRecord: cedar_record.metadata,
}
graph.parse(data=full_metadata, format='json-ld')

return graph.serialize(format='turtle')


@celery_app.task(bind=True)
def share_update_cedar_metadata_record(self, referent_id, cedar_record_pk):
from osf.models import Guid, CedarMetadataRecord

guid = Guid.load(referent_id)
referent = guid.referent
cedar_record = CedarMetadataRecord.objects.filter(pk=cedar_record_pk).first()
if not cedar_record:
return

serialized_data = cedar_record_to_turtle(referent, cedar_record)
response = requests.post(
shtrove_ingest_url(),
params={
'focus_iri': referent.get_semantic_iri(),
'record_identifier': _shtrove_cedar_record_identifier(cedar_record._id, cedar_record.template.cedar_id),
'is_supplementary': True,
},
headers={
'Content-Type': 'text/turtle; charset=utf-8',
**_shtrove_auth_headers(referent),
},
data=ensure_bytes(serialized_data),
)
retry_shtrove_request(self, response)


@celery_app.task(bind=True)
def share_delete_cedar_metadata_record(
self,
cedar_referent___id,
cedar_record___id,
cedar_template_cedar_id,
):
from osf.models import Guid
referent = Guid.load(cedar_referent___id).referent
response = requests.delete(
shtrove_ingest_url(),
params={
'record_identifier': _shtrove_cedar_record_identifier(cedar_record___id, cedar_template_cedar_id),
},
headers=_shtrove_auth_headers(referent),
)
retry_shtrove_request(self, response)


def _schedule_cedar_record_updates(guid_instance):
for cedar_record in guid_instance.cedar_metadata_records.filter(
is_published=True,
template__should_index_for_search=True,
):
share_update_cedar_metadata_record.delay(guid_instance._id, cedar_record.pk)
for cedar_record in guid_instance.cedar_metadata_records.filter(
Q(is_published=False) | Q(template__should_index_for_search=False),
):
share_delete_cedar_metadata_record.delay(
cedar_record.guid._id,
cedar_record._id,
cedar_record.template.cedar_id,
)


@celery_app.task(
bind=True,
acks_late=True,
Expand Down Expand Up @@ -94,36 +193,21 @@ def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name
log_exception(e)
raise self.retry(exc=e)

try:
_response.raise_for_status()
except Exception as e:
log_exception(e)
if _response.status_code == HTTPStatus.TOO_MANY_REQUESTS:
retry_after = _response.headers.get('Retry-After')
try:
countdown = int(retry_after)
except (TypeError, ValueError):
retries = getattr(self.request, 'retries', 0)
countdown = get_exponential_backoff_interval(
factor=4,
retries=retries,
maximum=2 * 60,
full_jitter=True,
)
raise self.retry(exc=e, countdown=countdown)

if HTTPStatus(_response.status_code).is_server_error:
raise self.retry(exc=e)
else: # success response
if not _is_deletion:
# enqueue followup task for supplementary metadata
_next_partition = _next_osfmap_partition(_osfmap_partition)
if _next_partition is not None:
task__update_share.delay(
guid,
is_backfill=is_backfill,
osfmap_partition_name=_next_partition.name,
)
retry_shtrove_request(self, _response)
# success response
if _is_deletion:
return

# enqueue followup task for supplementary metadata
_next_partition = _next_osfmap_partition(_osfmap_partition)
if _next_partition is not None:
task__update_share.delay(
guid,
is_backfill=is_backfill,
osfmap_partition_name=_next_partition.name,
)
else:
_schedule_cedar_record_updates(_osfid_instance)


def pls_send_trove_record(osf_item, *, is_backfill: bool, osfmap_partition: OsfmapPartition):
Expand Down Expand Up @@ -179,6 +263,10 @@ def _shtrove_record_identifier(osf_item, osfmap_partition: OsfmapPartition):
)


def _shtrove_cedar_record_identifier(cedar_record___id, template_cedar_id) -> str:
return f'{cedar_record___id}/CedarMetadataRecord:{template_cedar_id}'


def _shtrove_auth_headers(osf_item):
_nonfile_item = (
osf_item.target
Expand Down
12 changes: 10 additions & 2 deletions api_tests/share/test_share_preprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,20 @@ def test_call_async_update_on_500_failure(self, mock_share_responses, preprint,
with expect_preprint_ingest_request(mock_share_responses, preprint, count=5):
preprint.update_search()

def test_no_call_async_update_on_400_failure(self, mock_share_responses, preprint, auth):
@mock.patch('api.share.utils.task__update_share.delay')
def test_no_call_async_update_on_400_failure(self, share_delay, mock_share_responses, preprint, auth):
with capture_notifications():
mock_share_responses.replace(responses.POST, shtrove_ingest_url(), status=400)
preprint.set_published(True, auth=auth, save=True)
with expect_preprint_ingest_request(mock_share_responses, preprint, count=1, error_response=True):
preprint.update_search()
try:
preprint.update_search()
except Exception as err:
share_delay.assert_not_called()
assert str(err).startswith("Retry in 180s: HTTPError('400 Client Error:")
assert len(mock_share_responses.calls) == 1
else:
pytest.fail('Expected Retry(HTTPError) to be raised')

def test_delete_from_share(self, mock_share_responses):
preprint = PreprintFactory()
Expand Down
17 changes: 0 additions & 17 deletions osf/models/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,23 +167,6 @@ def update_or_create_from_json(cls, provider_data, user):
related_name='required_by_providers',
)

def validate_required_metadata(self, obj):
"""
Raises ValidationError if obj does not have a CedarMetadataRecord for
this provider's required_metadata_template.
Does nothing when required_metadata_template is not set.
"""
if not self.required_metadata_template_id:
return
guid = obj.guids.first()
if guid is None or not guid.cedar_metadata_records.filter(
template_id=self.required_metadata_template_id
).exists():
raise ValidationError(
f'Submitted object must have a CEDAR metadata record for template '
f'"{self.required_metadata_template.schema_name}" to be submitted to this collection.'
)

Comment thread
aaxelb marked this conversation as resolved.
def __repr__(self):
return ('(name={self.name!r}, default_license={self.default_license!r}, '
'allow_submissions={self.allow_submissions!r}) with id {self.id!r}').format(self=self)
Expand Down
8 changes: 8 additions & 0 deletions osf_tests/metadata/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ def assert_graphs_equal(actual_rdflib_graph, expected_rdflib_graph, label=''):
))


def assert_equivalent_turtle(actual_turtle, expected_turtle, label):
_actual = rdflib.Graph()
_actual.parse(data=actual_turtle, format='turtle')
_expected = rdflib.Graph()
_expected.parse(data=expected_turtle, format='turtle')
assert_graphs_equal(_actual, _expected, label=label)


def _get_graph_and_focuses(triples):
_graph = rdflib.Graph()
_focuses = set()
Expand Down
12 changes: 2 additions & 10 deletions osf_tests/metadata/test_serialized_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from unittest import mock

import pytest
import rdflib

from osf import models as osfdb
from osf.metadata.osf_gathering import OsfmapPartition
Expand All @@ -14,7 +13,7 @@
from osf.models.licenses import NodeLicense
from api_tests.utils import create_test_file
from osf_tests import factories
from osf_tests.metadata._utils import assert_graphs_equal
from osf_tests.metadata._utils import assert_equivalent_turtle
from tests.base import OsfTestCase


Expand Down Expand Up @@ -416,18 +415,11 @@ def _assert_expected_file(self, filename, actual_metadata):
if filename.endswith('.turtle'):
# HACK: because the turtle serializer may output things in different order
# TODO: stable turtle serializer (or another primitive rdf serialization)
self._assert_equivalent_turtle(actual_metadata, _expected_metadata, filename)
assert_equivalent_turtle(actual_metadata, _expected_metadata, filename)
else:
# note: ignore trailing spaces
self.assertEqual(actual_metadata.rstrip(), _expected_metadata.rstrip())

def _assert_equivalent_turtle(self, actual_turtle, expected_turtle, filename):
_actual = rdflib.Graph()
_actual.parse(data=actual_turtle, format='turtle')
_expected = rdflib.Graph()
_expected.parse(data=expected_turtle, format='turtle')
assert_graphs_equal(_actual, _expected, label=filename)

# def _write_expected_file(self, filename, expected_metadata):
# '''for updating expected metadata files from current serializers
# (be careful to check that changes are, in fact, expected)
Expand Down
Loading
Loading