Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/source/_static/MetadataIO.drawio.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions docs/source/aind_data_schema_models/modalities.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Modalities
| `MAPSEQ` | `Multiplexed analysis of projections by sequencing` | `MAPseq` |
| `MERFISH` | `Multiplexed error-robust fluorescence in situ hybridization` | `merfish` |
| `MRI` | `Magnetic resonance imaging` | `MRI` |
| `ONE_PHOTON` | `One-photon imaging` | `one-photon` |
| `POPHYS` | `Planar optical physiology` | `pophys` |
| `SCRNASEQ` | `Single cell RNA sequencing` | `scRNAseq` |
| `SLAP2` | `Random access projection microscopy` | `slap2` |
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ I want to...
:maxdepth: 1

example_workflow/example_workflow
inheritance


.. toctree::
Expand Down
72 changes: 72 additions & 0 deletions docs/source/inheritance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Derived metadata

The subject and procedures core files are tied to a single subject, while the rest of the core files are related to an individual acquisition of data. Because of this, metadata inheritance for derived assets depends on how you combine assets across subjects and acquisitions. The following diagram demonstrates the basic principle, and a helper function `Metadata.from_metadata` exists to make it easy to inherit the correct metadata in your derived assets.

![Metadata inheritance](_static/MetadataIO.drawio.svg)

The four specific principles to follow are:

- All derived assets need an updated **Data Description**
- If a derived asset is related to a single subject, inherit the **Subject** and **Procedures** unchanged. Otherwise, drop these files.
- If a derived asset is related to a single acquisition, inherit the **Instrument** and **Acquisition** unchanged. Otherwise, drop these files.
- If a derived asset is related to a single acquisition, *accumulate* **Processing** and **Quality Control**. Otherwise, start these files from scratch in the new asset.

Most users should rely on the `Metadata.from_metadata` function, which implements all four of these rules for you. Load your core files and validate them as a `Metadata` object, create any new `Processing` or `QualityControl` core data that was generated during your processing or analysis, then pass all three objects to the function.

## Example

```python
from datetime import datetime, timezone

from aind_data_schema.core.metadata import Metadata
from aind_data_schema.core.processing import DataProcess, Processing, ProcessName, ProcessStage
from aind_data_schema.core.quality_control import QCMetric, QCStatus, QualityControl, Stage, Status
from aind_data_schema.components.identifiers import Code
from aind_data_schema_models.modalities import Modality

# Load and validate source metadata (e.g. from a JSON file).
# The context manager guarantees the file handle is closed after reading.
with open("metadata.nd.json", encoding="utf-8") as f:
    source = Metadata.model_validate_json(f.read())

# Define the new processing you performed
new_processing = Processing.create_with_sequential_process_graph(
    data_processes=[
        DataProcess(
            process_type=ProcessName.IMAGE_TILE_FUSING,
            name="Tile fusing",
            experimenters=["Dr. Dan"],
            stage=ProcessStage.PROCESSING,
            start_date_time=datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc),
            end_date_time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc),
            code=Code(url="https://github.com/my-org/my-pipeline", version="1.0.0"),
        ),
    ]
)

# Define any new QC metrics
new_qc = QualityControl(
    metrics=[
        QCMetric(
            name="Fused image SNR",
            modality=Modality.SPIM,
            stage=Stage.PROCESSING,
            value=42.5,
            status_history=[
                QCStatus(evaluator="Automated", status=Status.PASS, timestamp=datetime.now(timezone.utc))
            ],
            tags={"step": "fusing"},
        ),
    ],
    default_grouping=["step"],
)

# Create the derived metadata -- this applies all four inheritance rules
derived = Metadata.from_metadata(
    source,
    process_name="tile-fusing",
    location="s3://my-bucket/derived-asset",
    new_processing=new_processing,
    new_quality_control=new_qc,
)

# Write the derived metadata out to the chosen output directory
derived.write_standard_file(output_directory="path/to/output")
```
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "aind-data-schema"
description = "A library that defines AIND data schema and validates JSON files."
license = {text = "MIT"}
requires-python = ">=3.10"
requires-python = ">=3.10,<3.14"
classifiers = [
"Programming Language :: Python :: 3"
]
Expand Down
95 changes: 94 additions & 1 deletion src/aind_data_schema/core/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import logging
import warnings
from typing import Dict, Literal, Optional, get_args
from typing import Dict, List, Literal, Optional, Union, get_args

from aind_data_schema_models.modalities import Modality
from pydantic import (
Expand Down Expand Up @@ -399,6 +399,99 @@ def validate_data_description_name_time_consistency(self):

return self

@classmethod
def from_metadata(
    cls,
    metadata: "Union[Metadata, List[Metadata]]",
    process_name: str,
    location: str,
    new_processing: Optional[Processing] = None,
    new_quality_control: Optional[QualityControl] = None,
    **data_description_kwargs,
) -> "Metadata":
    """Build the Metadata for a derived asset from one or more source Metadata objects.

    The inheritance rules applied are:
    1. The derived asset always receives a new DataDescription, produced by
       DataDescription.from_data_description from the first source that has one.
    2. Subject and Procedures are carried over only when all sources share a
       single subject; otherwise they are dropped.
    3. Instrument and Acquisition are carried over only when all sources share a
       single acquisition; otherwise they are dropped.
    4. Processing and QualityControl from the sources are accumulated with the
       new ones only when all sources share a single acquisition; otherwise only
       the new Processing/QualityControl are used.

    Parameters
    ----------
    metadata : Metadata or List[Metadata]
        Source metadata object(s) to derive from.
    process_name : str
        Name of the process that created this derived asset.
    location : str
        Location of the new derived data asset.
    new_processing : Optional[Processing]
        New processing performed to create this derived asset.
    new_quality_control : Optional[QualityControl]
        New quality control performed on this derived asset.
    **data_description_kwargs
        Extra keyword arguments forwarded to DataDescription.from_data_description.

    Returns
    -------
    Metadata
        A new Metadata object for the derived asset.

    Raises
    ------
    ValueError
        If no source Metadata is supplied, or if none of the sources has a
        data_description.
    """
    # Imported inside the method rather than at module level.
    from aind_data_schema.utils.inheritance import (
        _accumulate_processing,
        _accumulate_quality_control,
        _inherit_instrument_and_acquisition,
        _inherit_subject_and_procedures,
    )

    # Normalise the input so the rest of the method always works on a list.
    sources = [metadata] if isinstance(metadata, Metadata) else list(metadata)
    if len(sources) == 0:
        raise ValueError("At least one source Metadata object is required.")

    # The first available data description is the template for the derived one.
    template_dd = next(
        (src.data_description for src in sources if src.data_description),
        None,
    )
    if template_dd is None:
        raise ValueError("At least one source Metadata must have a data_description.")

    source_names = []
    for src in sources:
        description = src.data_description
        if description and description.name:
            source_names.append(description.name)

    # Only pass an explicit source list when several named sources are combined.
    explicit_sources = source_names if len(source_names) > 1 else None
    derived_dd = DataDescription.from_data_description(
        template_dd,
        process_name=process_name,
        source_data=explicit_sources,
        **data_description_kwargs,
    )

    subject, procedures = _inherit_subject_and_procedures(sources)
    instrument, acquisition = _inherit_instrument_and_acquisition(sources)
    processing = _accumulate_processing(sources, new_processing)
    quality_control = _accumulate_quality_control(sources, new_quality_control)

    return cls(
        name=derived_dd.name,
        location=location,
        data_description=derived_dd,
        subject=subject,
        procedures=procedures,
        instrument=instrument,
        acquisition=acquisition,
        processing=processing,
        quality_control=quality_control,
    )


def create_metadata_json(
name: str,
Expand Down
133 changes: 133 additions & 0 deletions src/aind_data_schema/utils/inheritance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""Helper functions for metadata inheritance in derived assets"""

from typing import List, Optional, Tuple

from aind_data_schema_models.data_name_patterns import DataLevel

from aind_data_schema.core.data_description import DataDescription
from aind_data_schema.core.processing import Processing
from aind_data_schema.core.quality_control import QualityControl


def _get_root_asset_name(data_description: DataDescription) -> Optional[str]:
    """Return the asset name used to identify the acquisition behind a data description.

    A RAW description returns its own name. A DERIVED description with a name
    returns the "input" field of its parsed name. Anything else returns None.
    """
    level = data_description.data_level
    asset_name = data_description.name

    if level == DataLevel.RAW:
        return asset_name
    if level != DataLevel.DERIVED or not asset_name:
        return None

    name_parts = DataDescription.parse_name(asset_name, DataLevel.DERIVED)
    return name_parts.get("input")


def _get_unique_subject_ids(metadata_list) -> List[str]:
    """Collect the distinct subject IDs referenced by a list of Metadata objects.

    For each entry the ID is taken from ``subject.subject_id`` when a subject is
    present; otherwise it falls back to ``data_description.subject_id`` when that
    is set. Entries with neither contribute nothing.

    Returns the unique IDs as a sorted list, so the result does not depend on
    set iteration order.
    """
    subject_ids = set()
    for m in metadata_list:
        if m.subject:
            subject_ids.add(m.subject.subject_id)
            continue
        description = m.data_description
        if description and description.subject_id:
            subject_ids.add(description.subject_id)
    # Sorting gives a deterministic order; plain list(set) varies between runs.
    return sorted(subject_ids)


def _get_unique_acquisition_names(metadata_list) -> List[str]:
    """Collect the distinct root asset names referenced by a list of Metadata objects.

    Entries without a data_description, or whose root name resolves to a falsy
    value, are skipped.

    Returns the unique names as a sorted list, so the result does not depend on
    set iteration order.
    """
    names = set()
    for m in metadata_list:
        if not m.data_description:
            continue
        root = _get_root_asset_name(m.data_description)
        if root:
            names.add(root)
    # Sorting gives a deterministic order; plain list(set) varies between runs.
    return sorted(names)


def _is_single_subject(metadata_list) -> bool:
    """Return True when exactly one distinct subject ID is found across the metadata objects"""
    distinct_subjects = _get_unique_subject_ids(metadata_list)
    return len(distinct_subjects) == 1


def _is_single_acquisition(metadata_list) -> bool:
    """Return True when exactly one distinct root asset name is found across the metadata objects"""
    distinct_acquisitions = _get_unique_acquisition_names(metadata_list)
    return len(distinct_acquisitions) == 1


def _inherit_subject_and_procedures(metadata_list) -> Tuple:
    """Pick the (subject, procedures) pair to inherit.

    When the sources resolve to a single subject, the pair comes from the first
    metadata object that has a subject or procedures. In every other case the
    result is (None, None).
    """
    if _is_single_subject(metadata_list):
        for source in metadata_list:
            if source.subject or source.procedures:
                return source.subject, source.procedures
    return None, None


def _inherit_instrument_and_acquisition(metadata_list) -> Tuple:
    """Pick the (instrument, acquisition) pair to inherit.

    When the sources resolve to a single acquisition, the pair comes from the
    first metadata object that has an instrument or acquisition. In every other
    case the result is (None, None).
    """
    if _is_single_acquisition(metadata_list):
        for source in metadata_list:
            if source.instrument or source.acquisition:
                return source.instrument, source.acquisition
    return None, None


def _accumulate_processing(
    metadata_list,
    new_processing: Optional[Processing] = None,
) -> Optional[Processing]:
    """Combine source processing with the newly supplied processing.

    If the sources span more than one acquisition (or none can be determined),
    the new processing is returned untouched. Otherwise every source processing
    is added together in list order, followed by the new processing. Returns
    None when there is nothing to combine.
    """
    if not _is_single_acquisition(metadata_list):
        return new_processing

    # Gather the pieces in order: sources first, then the new processing.
    pieces = [source.processing for source in metadata_list if source.processing]
    if new_processing:
        pieces.append(new_processing)

    if not pieces:
        return None

    # Left fold using the "+" operator of the Processing objects.
    combined = pieces[0]
    for piece in pieces[1:]:
        combined = combined + piece
    return combined


def _accumulate_quality_control(
    metadata_list,
    new_quality_control: Optional[QualityControl] = None,
) -> Optional[QualityControl]:
    """Combine source quality control with the newly supplied quality control.

    If the sources span more than one acquisition (or none can be determined),
    the new QC is returned untouched. Otherwise every source QC is added together
    in list order, followed by the new QC. Returns None when there is nothing to
    combine.
    """
    if not _is_single_acquisition(metadata_list):
        return new_quality_control

    # Gather the pieces in order: sources first, then the new QC.
    pieces = [source.quality_control for source in metadata_list if source.quality_control]
    if new_quality_control:
        pieces.append(new_quality_control)

    if not pieces:
        return None

    # Left fold using the "+" operator of the QualityControl objects.
    combined = pieces[0]
    for piece in pieces[1:]:
        combined = combined + piece
    return combined
Loading
Loading