Skip to content

Commit 692c1db

Browse files
Introduce StepRetroProv
1 parent 0eb131b commit 692c1db

4 files changed

Lines changed: 83 additions & 39 deletions

File tree

fairworkflows/fairstep.py

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
from fairworkflows import namespaces, LinguisticSystem, LINGSYS_ENGLISH, LINGSYS_PYTHON
1414
from fairworkflows.config import DUMMY_FAIRWORKFLOWS_URI, IS_FAIRSTEP_RETURN_VALUE_PARAMETER_NAME, \
1515
LOGGER
16+
from fairworkflows.prov import prov_logger, StepRetroProv
1617
from fairworkflows.rdf_wrapper import RdfWrapper, replace_in_rdf
1718

1819

@@ -441,7 +442,7 @@ def _modify_function(func):
441442
def _add_logging(func):
442443
@functools.wraps(func)
443444
def _wrapper(*func_args, **func_kwargs):
444-
LOGGER.info(f'Running step: {func.__name__}')
445+
prov_logger.add(StepRetroProv(step=fairstep))
445446
return func(*func_args, **func_kwargs)
446447
return _wrapper
447448
func._fairstep = fairstep

fairworkflows/fairworkflow.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from fairworkflows import namespaces, LinguisticSystem, LINGSYS_PYTHON
1919
from fairworkflows.config import LOGGER
2020
from fairworkflows.fairstep import FairStep
21-
from fairworkflows.prov import WorkflowRetroProv
21+
from fairworkflows.prov import WorkflowRetroProv, prov_logger
2222
from fairworkflows.rdf_wrapper import RdfWrapper
2323

2424

@@ -363,38 +363,31 @@ def execute(self, *args, **kwargs):
363363
Returns a tuple (result, retroprov), where result is the final output of the executed
364364
workflow and retroprov is the retrospective provenance logged during execution.
365365
"""
366-
367366
if not hasattr(self, 'workflow_level_promise'):
368367
raise ValueError('Cannot execute workflow as no noodles step_level_promise has been constructed.')
369-
370-
log = io.StringIO()
371-
log_handler = logging.StreamHandler(log)
372-
formatter = logging.Formatter('%(asctime)s - %(message)s')
373-
log_handler.setFormatter(formatter)
374-
375-
LOGGER.setLevel(logging.INFO)
376-
LOGGER.handlers = [log_handler]
368+
prov_logger.empty()
377369
self.workflow_level_promise = noodles.workflow.from_call(
378370
noodles.get_workflow(self.workflow_level_promise).root_node.foo, args, kwargs, {})
379371
result = noodles.run_single(self.workflow_level_promise)
380372

381373
# Generate the retrospective provenance as a (nano-) Publication object
382-
retroprov = self._generate_retrospective_prov_publication(log.getvalue())
374+
retroprov = self._generate_retrospective_prov_publication()
383375

384376
return result, retroprov
385377

386-
def _generate_retrospective_prov_publication(self, log: str) -> WorkflowRetroProv:
378+
def _generate_retrospective_prov_publication(self) -> WorkflowRetroProv:
    """
    Utility method for generating the retrospective provenance object of
    this workflow.

    The step-level provenance is taken from the module-level ``prov_logger``:
    every item it currently holds is collected and handed to the returned
    ``WorkflowRetroProv``.

    Returns:
        A ``WorkflowRetroProv`` built from this workflow, its URI and the
        collected step provenance items.
    """
    if self._is_published:
        workflow_uri = rdflib.URIRef(self.uri)
    else:
        # Placeholder URI used while the workflow has not been published.
        workflow_uri = rdflib.URIRef('http://www.example.org/unpublishedworkflow')

    step_provs = prov_logger.get_all()
    return WorkflowRetroProv(self, workflow_uri, step_provs)
398391

399392
def draw(self, filepath):
400393
"""Visualize workflow.

fairworkflows/prov.py

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,95 @@
1+
import threading
2+
from datetime import datetime
3+
from typing import List, Iterator
4+
15
import rdflib
26

37
from fairworkflows import namespaces
48
from fairworkflows.rdf_wrapper import RdfWrapper
59

610

11+
class ProvLogger:
    """In-memory collector of provenance items, guarded by a lock.

    Items are appended with :meth:`add` and handed out (and removed) with
    :meth:`get_all`. Every access to ``items`` made by these methods happens
    while holding ``lock``.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        """Append ``item`` to the collected items."""
        with self.lock:
            self.items.append(item)

    def get_all(self):
        """Return all collected items and reset the collector.

        The internal list is swapped for a fresh empty one, so a second call
        without intervening :meth:`add` calls returns an empty list.
        """
        with self.lock:
            items, self.items = self.items, []
        return items

    def empty(self):
        """Discard all collected items."""
        # Take the lock like add()/get_all() do, so a concurrent add() cannot
        # interleave with the reset.
        with self.lock:
            self.items = []
27+
28+
29+
prov_logger = ProvLogger()
30+
31+
732
class RetroProv(RdfWrapper):
    """Base class for retrospective provenance objects.

    Initialises the underlying ``RdfWrapper`` with no URI and the reference
    name ``'retroprov'``, and records when the object was created.
    """

    def __init__(self):
        super().__init__(uri=None, ref_name='retroprov')
        # Creation time of this provenance object (naive local time, as
        # returned by datetime.now() without a tz argument).
        self.timestamp = datetime.now()
36+
37+
38+
class StepRetroProv(RetroProv):
    """Retrospective provenance of a single step.

    Typed as ``p-plan:Activity`` and linked to the step's URI through
    ``p-plan:correspondsToStep``.
    """

    def __init__(self, step):
        """Create the provenance for ``step``.

        Args:
            step: the step object this provenance belongs to; its ``uri``
                attribute is stored as the ``step_uri``.
        """
        super().__init__()
        self.set_attribute(rdflib.RDF.type, namespaces.PPLAN.Activity)
        self.step = step
        self.step_uri = step.uri

    @property
    def step_uri(self):
        """Refers to URI of step associated to this provenance.

        Matches the predicate p-plan:correspondsToStep associated to this
        retrospective provenance.
        """
        # Read the same predicate the setter writes; reading
        # prov:wasDerivedFrom here would never return the value that was set.
        return self.get_attribute(namespaces.PPLAN.correspondsToStep)

    @step_uri.setter
    def step_uri(self, value):
        self.set_attribute(namespaces.PPLAN.correspondsToStep, rdflib.URIRef(value), overwrite=True)

    def __str__(self):
        """String representation."""
        serialized = self._rdf.serialize(format='trig')
        # serialize() may return bytes or str depending on the rdflib
        # version; only decode when bytes were returned.
        if isinstance(serialized, bytes):
            serialized = serialized.decode('utf-8')
        return 'Step retrospective provenance.\n' + serialized
3762

3863

39-
class StepRetroProv(RetroProv):
40-
def __init__(self, prov_was_derived_from, log_message):
41-
super().__init__(prov_was_derived_from, log_message)
64+
class WorkflowRetroProv(RetroProv):
    """Retrospective provenance of a workflow.

    Typed as ``p-plan:Bundle`` and linked to the workflow's URI through
    ``prov:wasDerivedFrom``. Iterating over the object yields the
    ``StepRetroProv`` items it was constructed with, and ``len()`` gives
    their number.
    """

    def __init__(self, workflow, workflow_uri, step_provs: List[StepRetroProv]):
        """Create the provenance for ``workflow``.

        Args:
            workflow: the workflow object this provenance belongs to.
            workflow_uri: URI stored under ``prov:wasDerivedFrom``.
            step_provs: the step-level provenance items of the workflow.
        """
        super().__init__()
        self.set_attribute(rdflib.RDF.type, namespaces.PPLAN.Bundle)
        self.workflow = workflow
        self.workflow_uri = workflow_uri
        self._step_provs = step_provs

    @property
    def workflow_uri(self):
        """Refers to URI of workflow associated to this provenance.

        Matches the predicate prov:wasDerivedFrom associated to this
        retrospective provenance.
        """
        return self.get_attribute(namespaces.PROV.wasDerivedFrom)

    @workflow_uri.setter
    def workflow_uri(self, value):
        self.set_attribute(namespaces.PROV.wasDerivedFrom, rdflib.URIRef(value), overwrite=True)

    def __iter__(self) -> Iterator[StepRetroProv]:
        """Iterate over StepRetroProv that were part of the execution of the workflow."""
        yield from self._step_provs

    def __len__(self) -> int:
        """Return the number of step provenance items."""
        return len(self._step_provs)

    def __str__(self):
        """String representation."""
        serialized = self._rdf.serialize(format='trig')
        # serialize() may return bytes or str depending on the rdflib
        # version; only decode when bytes were returned.
        if isinstance(serialized, bytes):
            serialized = serialized.decode('utf-8')
        return 'Workflow retrospective provenance.\n' + serialized

tests/test_fairworkflow.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from conftest import skip_if_nanopub_server_unavailable, read_rdf_test_resource
1111
from fairworkflows import FairWorkflow, FairStep, namespaces, FairVariable, is_fairstep, is_fairworkflow
12-
from fairworkflows.prov import WorkflowRetroProv
12+
from fairworkflows.prov import WorkflowRetroProv, StepRetroProv
1313
from fairworkflows.rdf_wrapper import replace_in_rdf
1414
from nanopub import Publication
1515

@@ -353,8 +353,10 @@ def my_workflow(in1, in2, in3):
353353
assert result == -66
354354

355355
assert isinstance(prov, WorkflowRetroProv)
356-
prov_log = str(list(prov.rdf.objects(prov.self_ref, rdflib.RDFS.label))[0])
357-
assert 'Running step: add' in prov_log
356+
assert len(prov) == 4
357+
for step_prov in prov:
358+
assert isinstance(step_prov, StepRetroProv)
359+
assert step_prov.step in fw._steps.values()
358360

359361
def test_workflow_complex_serialization(self):
360362
class OtherType:

0 commit comments

Comments
 (0)