Skip to content

Commit ec06b56

Browse files
Merge pull request #184 from fair-workflows/fair-prov-object
Fair prov object
2 parents 9f5c83c + 692c1db commit ec06b56

4 files changed

Lines changed: 115 additions & 34 deletions

File tree

fairworkflows/fairstep.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from fairworkflows import namespaces, LinguisticSystem, LINGSYS_ENGLISH, LINGSYS_PYTHON
1414
from fairworkflows.config import DUMMY_FAIRWORKFLOWS_URI, IS_FAIRSTEP_RETURN_VALUE_PARAMETER_NAME, \
1515
LOGGER
16+
from fairworkflows.prov import prov_logger, StepRetroProv
1617
from fairworkflows.rdf_wrapper import RdfWrapper, replace_in_rdf
1718

1819

@@ -441,7 +442,7 @@ def _modify_function(func):
441442
def _add_logging(func):
442443
@functools.wraps(func)
443444
def _wrapper(*func_args, **func_kwargs):
444-
LOGGER.info(f'Running step: {func.__name__}')
445+
prov_logger.add(StepRetroProv(step=fairstep))
445446
return func(*func_args, **func_kwargs)
446447
return _wrapper
447448
func._fairstep = fairstep

fairworkflows/fairworkflow.py

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,18 @@
77
from tempfile import TemporaryDirectory
88
from typing import Iterator, Optional, Callable
99

10-
import nanopub
1110
import networkx as nx
1211
import noodles
1312
import rdflib
1413
from noodles.interface import PromisedObject
15-
from rdflib import RDF, RDFS, DCTERMS
14+
from rdflib import RDF
1615
from rdflib.tools.rdf2dot import rdf2dot
1716
from requests import HTTPError
1817

18+
from fairworkflows import namespaces, LinguisticSystem, LINGSYS_PYTHON
1919
from fairworkflows.config import LOGGER
20-
from fairworkflows import namespaces, LinguisticSystem, LINGSYS_ENGLISH, LINGSYS_PYTHON
2120
from fairworkflows.fairstep import FairStep
21+
from fairworkflows.prov import WorkflowRetroProv, prov_logger
2222
from fairworkflows.rdf_wrapper import RdfWrapper
2323

2424

@@ -363,47 +363,31 @@ def execute(self, *args, **kwargs):
363363
Returns a tuple (result, retroprov), where result is the final output of the executed
364364
workflow and retroprov is the retrospective provenance logged during execution.
365365
"""
366-
367366
if not hasattr(self, 'workflow_level_promise'):
368367
raise ValueError('Cannot execute workflow as no noodles step_level_promise has been constructed.')
369-
370-
log = io.StringIO()
371-
log_handler = logging.StreamHandler(log)
372-
formatter = logging.Formatter('%(asctime)s - %(message)s')
373-
log_handler.setFormatter(formatter)
374-
375-
LOGGER.setLevel(logging.INFO)
376-
LOGGER.handlers = [log_handler]
368+
prov_logger.empty()
377369
self.workflow_level_promise = noodles.workflow.from_call(
378370
noodles.get_workflow(self.workflow_level_promise).root_node.foo, args, kwargs, {})
379371
result = noodles.run_single(self.workflow_level_promise)
380372

381373
# Generate the retrospective provenance as a (nano-) Publication object
382-
retroprov = self._generate_retrospective_prov_publication(log.getvalue())
374+
retroprov = self._generate_retrospective_prov_publication()
383375

384376
return result, retroprov
385377

386-
def _generate_retrospective_prov_publication(self, log:str) -> nanopub.Publication:
378+
def _generate_retrospective_prov_publication(self) -> WorkflowRetroProv:
387379
"""
388380
Utility method for generating a WorkflowRetroProv object for the retrospective
389381
provenance of this workflow. Collects the step provenance items that
390382
prov_logger recorded during execution.
391383
"""
392-
log_message = rdflib.Literal(log)
393-
this_retroprov = rdflib.BNode('retroprov')
394-
if self.uri is None or self.uri == 'None': # TODO: This is horrific
395-
this_workflow = rdflib.URIRef('http://www.example.org/unpublishedworkflow')
384+
if self._is_published:
385+
workflow_uri = rdflib.URIRef(self.uri)
396386
else:
397-
this_workflow = rdflib.URIRef(self.uri)
398-
399-
retroprov_assertion = rdflib.Graph()
400-
retroprov_assertion.add((this_retroprov, rdflib.RDF.type, namespaces.PROV.Activity))
401-
retroprov_assertion.add((this_retroprov, namespaces.PROV.wasDerivedFrom, this_workflow))
402-
retroprov_assertion.add((this_retroprov, RDFS.label, log_message))
403-
retroprov = nanopub.Publication.from_assertion(assertion_rdf=retroprov_assertion)
404-
405-
return retroprov
387+
workflow_uri = rdflib.URIRef('http://www.example.org/unpublishedworkflow')
406388

389+
step_provs = prov_logger.get_all()
390+
return WorkflowRetroProv(self, workflow_uri, step_provs)
407391

408392
def draw(self, filepath):
409393
"""Visualize workflow.

fairworkflows/prov.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import threading
2+
from datetime import datetime
3+
from typing import List, Iterator
4+
5+
import rdflib
6+
7+
from fairworkflows import namespaces
8+
from fairworkflows.rdf_wrapper import RdfWrapper
9+
10+
11+
class ProvLogger:
    """In-memory collector of provenance items, guarded by a lock.

    Items are appended with ``add`` and drained with ``get_all``. Every
    access to ``items`` is made while holding ``lock``.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        """Append ``item`` to the collected items."""
        with self.lock:
            self.items.append(item)

    def get_all(self):
        """Return all collected items in insertion order and reset the collection."""
        with self.lock:
            # Swap in a fresh list so the caller owns the returned one exclusively.
            items, self.items = self.items, []
        return items

    def empty(self):
        """Discard all collected items."""
        # Hold the lock like add()/get_all() do, so a reset cannot interleave
        # with a concurrent add or drain.
        with self.lock:
            self.items = []
27+
28+
29+
prov_logger = ProvLogger()
30+
31+
32+
class RetroProv(RdfWrapper):
33+
def __init__(self):
34+
super().__init__(uri=None, ref_name='retroprov')
35+
self.timestamp = datetime.now()
36+
37+
38+
class StepRetroProv(RetroProv):
39+
def __init__(self, step):
40+
super().__init__()
41+
self.set_attribute(rdflib.RDF.type, namespaces.PPLAN.Activity)
42+
self.step = step
43+
self.step_uri = step.uri
44+
45+
@property
46+
def step_uri(self):
47+
"""Refers to URI of step associated to this provenance.
48+
49+
Matches the predicate prov:wasDerivedFrom associated to this retrospective provenance
50+
"""
51+
return self.get_attribute(namespaces.PROV.wasDerivedFrom)
52+
53+
@step_uri.setter
54+
def step_uri(self, value):
55+
self.set_attribute(namespaces.PPLAN.correspondsToStep, rdflib.URIRef(value), overwrite=True)
56+
57+
def __str__(self):
58+
"""String representation."""
59+
s = f'Step retrospective provenance.\n'
60+
s += self._rdf.serialize(format='trig').decode('utf-8')
61+
return s
62+
63+
64+
class WorkflowRetroProv(RetroProv):
65+
def __init__(self, workflow, workflow_uri, step_provs: List[StepRetroProv]):
66+
super().__init__()
67+
self.set_attribute(rdflib.RDF.type, namespaces.PPLAN.Bundle)
68+
self.workflow = workflow
69+
self.workflow_uri = workflow_uri
70+
self._step_provs = step_provs
71+
72+
@property
73+
def workflow_uri(self):
74+
"""Refers to URI of step associated to this provenance.
75+
76+
Matches the predicate prov:wasDerivedFrom associated to this retrospective provenance
77+
"""
78+
return self.get_attribute(namespaces.PROV.wasDerivedFrom)
79+
80+
@workflow_uri.setter
81+
def workflow_uri(self, value):
82+
self.set_attribute(namespaces.PROV.wasDerivedFrom, rdflib.URIRef(value), overwrite=True)
83+
84+
def __iter__(self) -> Iterator[StepRetroProv]:
85+
"""Iterate over StepRetroProv that were part of the execution of the workflow."""
86+
yield from self._step_provs
87+
88+
def __len__(self) -> int:
89+
return len(self._step_provs)
90+
91+
def __str__(self):
92+
"""String representation."""
93+
s = f'Workflow retrospective provenance.\n'
94+
s += self._rdf.serialize(format='trig').decode('utf-8')
95+
return s

tests/test_fairworkflow.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from conftest import skip_if_nanopub_server_unavailable, read_rdf_test_resource
1111
from fairworkflows import FairWorkflow, FairStep, namespaces, FairVariable, is_fairstep, is_fairworkflow
12+
from fairworkflows.prov import WorkflowRetroProv, StepRetroProv
1213
from fairworkflows.rdf_wrapper import replace_in_rdf
1314
from nanopub import Publication
1415

@@ -351,11 +352,11 @@ def my_workflow(in1, in2, in3):
351352
result, prov = fw.execute(1, 4, 3)
352353
assert result == -66
353354

354-
assert isinstance(prov, Publication)
355-
356-
prov_log = str(list(prov.assertion.objects(rdflib.URIRef(f'{DUMMY_NANOPUB_URI}#retroprov'),
357-
rdflib.RDFS.label))[0])
358-
assert 'Running step: add' in prov_log
355+
assert isinstance(prov, WorkflowRetroProv)
356+
assert len(prov) == 4
357+
for step_prov in prov:
358+
assert isinstance(step_prov, StepRetroProv)
359+
assert step_prov.step in fw._steps.values()
359360

360361
def test_workflow_complex_serialization(self):
361362
class OtherType:
@@ -375,7 +376,7 @@ def process_image(im: OtherType):
375376
result, prov = fw.execute(obj)
376377
assert isinstance(result, type(obj))
377378
assert result.message == obj.message
378-
assert isinstance(prov, Publication)
379+
assert isinstance(prov, WorkflowRetroProv)
379380

380381
def test_workflow_non_decorated_step(self):
381382
def return_value(a: float) -> float:

0 commit comments

Comments
 (0)