diff --git a/docs/source/_static/MetadataIO.drawio.svg b/docs/source/_static/MetadataIO.drawio.svg new file mode 100644 index 000000000..e62abbe16 --- /dev/null +++ b/docs/source/_static/MetadataIO.drawio.svg @@ -0,0 +1,4 @@ + + + +
Data Desc
Raw asset
Derived asset
(processed)
Derived asset
(analysis)
Single subject analysis
Multi-subject analysis
Subject
Procedures
Instrument
Acquisition
Quality Control
Processing
Updated
Inherited
Inherited
+new processes
+new metrics
Updated
Inherited
Inherited
+new processes
+new metrics
Data Desc
Subject
Procedures
Instrument
Acquisition
Quality Control
Processing
Updated
Inherited
Inherited
+new processes
+new metrics
Updated
Only new processing
Only new QC
Data Desc
Subject
Procedures
Instrument
Acquisition
Quality Control
Processing
Updated
Inherited
Inherited
+new processes
+new metrics
\ No newline at end of file diff --git a/docs/source/aind_data_schema_models/modalities.md b/docs/source/aind_data_schema_models/modalities.md index 33a654594..75efe770e 100644 --- a/docs/source/aind_data_schema_models/modalities.md +++ b/docs/source/aind_data_schema_models/modalities.md @@ -23,6 +23,7 @@ Modalities | `MAPSEQ` | `Multiplexed analysis of projections by sequencing` | `MAPseq` | | `MERFISH` | `Multiplexed error-robust fluorescence in situ hybridization` | `merfish` | | `MRI` | `Magnetic resonance imaging` | `MRI` | +| `ONE_PHOTON` | `One-photon imaging` | `one-photon` | | `POPHYS` | `Planar optical physiology` | `pophys` | | `SCRNASEQ` | `Single cell RNA sequencing` | `scRNAseq` | | `SLAP2` | `Random access projection microscopy` | `slap2` | diff --git a/docs/source/index.rst b/docs/source/index.rst index fe05a8c66..19bbbdb6f 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -45,6 +45,7 @@ I want to... :maxdepth: 1 example_workflow/example_workflow + inheritance .. toctree:: diff --git a/docs/source/inheritance.md b/docs/source/inheritance.md new file mode 100644 index 000000000..a1af09199 --- /dev/null +++ b/docs/source/inheritance.md @@ -0,0 +1,72 @@ +# Derived metadata + +The subject and procedures core files are tied to a single subject, while the rest of the core files are related to an individual acquisition of data. Because of this, metadata inheritance for derived assets depends on how you combine assets across subjects and acquisitions. The following diagram illustrates the basic principle, and a helper function `Metadata.from_metadata` exists to make it easy to inherit the correct metadata in your derived assets. + +![Metadata inheritance](_static/MetadataIO.drawio.svg) + +The four specific principles to follow are: + +- All derived assets need an updated **Data Description** +- If a derived asset is related to a single subject, inherit the **Subject** and **Procedures** unchanged. Otherwise, drop these files. 
+- If a derived asset is related to a single acquisition, inherit the **Instrument** and **Acquisition** unchanged. Otherwise, drop these files. +- If a derived asset is related to a single acquisition, *accumulate* **Processing** and **Quality Control**. Otherwise, start these files from scratch in the new asset. + +Most users should rely on the `Metadata.from_metadata` function, which implements all four of these rules for you. Load your core files and validate them as a `Metadata` object, build any new `Processing` or `QualityControl` objects generated during your processing or analysis, then pass all three objects to the function. + +## Example + +```python +from datetime import datetime, timezone + +from aind_data_schema.core.metadata import Metadata +from aind_data_schema.core.processing import DataProcess, Processing, ProcessName, ProcessStage +from aind_data_schema.core.quality_control import QCMetric, QCStatus, QualityControl, Stage, Status +from aind_data_schema.components.identifiers import Code +from aind_data_schema_models.modalities import Modality + +# Load and validate source metadata (e.g. from a JSON file) +source = Metadata.model_validate_json(open("metadata.nd.json").read()) + +# Define the new processing you performed +new_processing = Processing.create_with_sequential_process_graph( + data_processes=[ + DataProcess( + process_type=ProcessName.IMAGE_TILE_FUSING, + name="Tile fusing", + experimenters=["Dr. 
Dan"], + stage=ProcessStage.PROCESSING, + start_date_time=datetime(2024, 1, 15, 10, 0, 0, tzinfo=timezone.utc), + end_date_time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=timezone.utc), + code=Code(url="https://github.com/my-org/my-pipeline", version="1.0.0"), + ), + ] +) + +# Define any new QC metrics +new_qc = QualityControl( + metrics=[ + QCMetric( + name="Fused image SNR", + modality=Modality.SPIM, + stage=Stage.PROCESSING, + value=42.5, + status_history=[ + QCStatus(evaluator="Automated", status=Status.PASS, timestamp=datetime.now(timezone.utc)) + ], + tags={"step": "fusing"}, + ), + ], + default_grouping=["step"], +) + +# Create the derived metadata -- this applies all four inheritance rules +derived = Metadata.from_metadata( + source, + process_name="tile-fusing", + location="s3://my-bucket/derived-asset", + new_processing=new_processing, + new_quality_control=new_qc, +) + +derived.write_standard_file(output_directory="path/to/output") +``` diff --git a/pyproject.toml b/pyproject.toml index 6d3388d68..a484437a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "aind-data-schema" description = "A library that defines AIND data schema and validates JSON files." 
license = {text = "MIT"} -requires-python = ">=3.10" +requires-python = ">=3.10,<3.14" classifiers = [ "Programming Language :: Python :: 3" ] diff --git a/src/aind_data_schema/core/metadata.py b/src/aind_data_schema/core/metadata.py index 0dc3783f9..fe404e8c2 100644 --- a/src/aind_data_schema/core/metadata.py +++ b/src/aind_data_schema/core/metadata.py @@ -4,7 +4,7 @@ import json import logging import warnings -from typing import Dict, Literal, Optional, get_args +from typing import Dict, List, Literal, Optional, Union, get_args from aind_data_schema_models.modalities import Modality from pydantic import ( @@ -399,6 +399,99 @@ def validate_data_description_name_time_consistency(self): return self + @classmethod + def from_metadata( + cls, + metadata: "Union[Metadata, List[Metadata]]", + process_name: str, + location: str, + new_processing: Optional[Processing] = None, + new_quality_control: Optional[QualityControl] = None, + **data_description_kwargs, + ) -> "Metadata": + """Create a derived Metadata object from one or more source Metadata objects. + + Applies four inheritance rules: + 1. All derived assets get an updated DataDescription (DERIVED level). + 2. If all sources share a single subject, inherit Subject and Procedures. + Otherwise, drop them. + 3. If all sources share a single acquisition, inherit Instrument and Acquisition. + Otherwise, drop them. + 4. If all sources share a single acquisition, accumulate Processing and + QualityControl from the sources with the new ones. Otherwise, only use the + new Processing/QualityControl. + + Parameters + ---------- + metadata : Metadata or List[Metadata] + Source metadata object(s) to derive from. + process_name : str + Name of the process that created this derived asset. + location : str + Location of the new derived data asset. + new_processing : Optional[Processing] + New processing performed to create this derived asset. 
+ new_quality_control : Optional[QualityControl] + New quality control performed on this derived asset. + **data_description_kwargs + Additional keyword arguments passed to DataDescription.from_data_description. + + Returns + ------- + Metadata + A new Metadata object for the derived asset. + """ + from aind_data_schema.utils.inheritance import ( + _accumulate_processing, + _accumulate_quality_control, + _inherit_instrument_and_acquisition, + _inherit_subject_and_procedures, + ) + + if isinstance(metadata, Metadata): + metadata_list = [metadata] + else: + metadata_list = list(metadata) + + if not metadata_list: + raise ValueError("At least one source Metadata object is required.") + + first_dd = None + for m in metadata_list: + if m.data_description: + first_dd = m.data_description + break + if first_dd is None: + raise ValueError("At least one source Metadata must have a data_description.") + + source_names = [ + m.data_description.name for m in metadata_list if m.data_description and m.data_description.name + ] + + derived_dd = DataDescription.from_data_description( + first_dd, + process_name=process_name, + source_data=source_names if len(source_names) > 1 else None, + **data_description_kwargs, + ) + + subject, procedures = _inherit_subject_and_procedures(metadata_list) + instrument, acquisition = _inherit_instrument_and_acquisition(metadata_list) + processing = _accumulate_processing(metadata_list, new_processing) + quality_control = _accumulate_quality_control(metadata_list, new_quality_control) + + return cls( + name=derived_dd.name, + location=location, + data_description=derived_dd, + subject=subject, + procedures=procedures, + instrument=instrument, + acquisition=acquisition, + processing=processing, + quality_control=quality_control, + ) + def create_metadata_json( name: str, diff --git a/src/aind_data_schema/utils/inheritance.py b/src/aind_data_schema/utils/inheritance.py new file mode 100644 index 000000000..20783ad40 --- /dev/null +++ 
b/src/aind_data_schema/utils/inheritance.py @@ -0,0 +1,133 @@ +"""Helper functions for metadata inheritance in derived assets""" + +from typing import List, Optional, Tuple + +from aind_data_schema_models.data_name_patterns import DataLevel + +from aind_data_schema.core.data_description import DataDescription +from aind_data_schema.core.processing import Processing +from aind_data_schema.core.quality_control import QualityControl + + +def _get_root_asset_name(data_description: DataDescription) -> Optional[str]: + """Return the original raw asset name that this data description traces back to""" + if data_description.data_level == DataLevel.RAW: + return data_description.name + if data_description.data_level == DataLevel.DERIVED and data_description.name: + parsed = DataDescription.parse_name(data_description.name, DataLevel.DERIVED) + return parsed.get("input") + return None + + +def _get_unique_subject_ids(metadata_list) -> List[str]: + """Extract unique subject IDs from a list of Metadata objects""" + subject_ids = set() + for m in metadata_list: + if m.subject: + subject_ids.add(m.subject.subject_id) + elif m.data_description and m.data_description.subject_id: + subject_ids.add(m.data_description.subject_id) + return list(subject_ids) + + +def _get_unique_acquisition_names(metadata_list) -> List[str]: + """Extract unique root raw asset names from a list of Metadata objects""" + names = set() + for m in metadata_list: + if m.data_description: + root = _get_root_asset_name(m.data_description) + if root: + names.add(root) + return list(names) + + +def _is_single_subject(metadata_list) -> bool: + """Check whether all metadata objects refer to the same subject""" + return len(_get_unique_subject_ids(metadata_list)) == 1 + + +def _is_single_acquisition(metadata_list) -> bool: + """Check whether all metadata objects refer to the same acquisition""" + return len(_get_unique_acquisition_names(metadata_list)) == 1 + + +def _inherit_subject_and_procedures(metadata_list) -> 
Tuple: + """Return (subject, procedures) from the first metadata that has them, or (None, None)""" + if not _is_single_subject(metadata_list): + return None, None + for m in metadata_list: + subject = m.subject + procedures = m.procedures + if subject or procedures: + return subject, procedures + return None, None + + +def _inherit_instrument_and_acquisition(metadata_list) -> Tuple: + """Return (instrument, acquisition) from the first metadata that has them, or (None, None)""" + if not _is_single_acquisition(metadata_list): + return None, None + for m in metadata_list: + instrument = m.instrument + acquisition = m.acquisition + if instrument or acquisition: + return instrument, acquisition + return None, None + + +def _accumulate_processing( + metadata_list, + new_processing: Optional[Processing] = None, +) -> Optional[Processing]: + """Accumulate processing from source metadata and new processing. + + If single acquisition, combine all existing processing with the new one. + If multiple acquisitions, only return the new processing. + """ + if not _is_single_acquisition(metadata_list): + return new_processing + + accumulated = None + for m in metadata_list: + if m.processing: + if accumulated is None: + accumulated = m.processing + else: + accumulated = accumulated + m.processing + + if new_processing: + if accumulated is None: + accumulated = new_processing + else: + accumulated = accumulated + new_processing + + return accumulated + + +def _accumulate_quality_control( + metadata_list, + new_quality_control: Optional[QualityControl] = None, +) -> Optional[QualityControl]: + """Accumulate quality control from source metadata and new QC. + + If single acquisition, combine all existing QC with the new one. + If multiple acquisitions, only return the new QC. 
+ """ + if not _is_single_acquisition(metadata_list): + return new_quality_control + + accumulated = None + for m in metadata_list: + if m.quality_control: + if accumulated is None: + accumulated = m.quality_control + else: + accumulated = accumulated + m.quality_control + + if new_quality_control: + if accumulated is None: + accumulated = new_quality_control + else: + accumulated = accumulated + new_quality_control + + return accumulated diff --git a/tests/test_inheritance.py b/tests/test_inheritance.py new file mode 100644 index 000000000..c407eff3a --- /dev/null +++ b/tests/test_inheritance.py @@ -0,0 +1,418 @@ +"""Tests for Metadata.from_metadata inheritance logic""" + +import unittest +from datetime import datetime, timezone + +from aind_data_schema_models.data_name_patterns import DataLevel +from aind_data_schema_models.modalities import Modality +from aind_data_schema_models.organizations import Organization + +from aind_data_schema.components.identifiers import Code, Person +from aind_data_schema.core.data_description import DataDescription, Funding +from aind_data_schema.core.metadata import Metadata +from aind_data_schema.core.processing import DataProcess, Processing, ProcessName, ProcessStage +from aind_data_schema.core.quality_control import QCMetric, QCStatus, QualityControl, Stage, Status +from aind_data_schema.core.subject import Subject +from aind_data_schema.utils.inheritance import ( + _accumulate_processing, + _accumulate_quality_control, + _get_root_asset_name, + _get_unique_subject_ids, + _inherit_instrument_and_acquisition, + _inherit_subject_and_procedures, +) + +from examples.ephys_instrument import inst as example_inst +from examples.processing import p as example_processing +from examples.quality_control import q as example_qc +from examples.subject import s as example_subject + + +t = datetime(2022, 11, 22, 8, 43, 00, tzinfo=timezone.utc) + +example_code = Code(url="https://github.com/example", version="0.1") + + +_counter = 0 + + +def 
_make_metadata(subject_id="123456"): + """Helper to create a Metadata object with a given subject ID and unique creation time""" + global _counter + _counter += 1 + dd = DataDescription( + modalities=[Modality.ECEPHYS], + subject_id=subject_id, + creation_time=datetime(2022, 2, 21, 16, 30, _counter, tzinfo=timezone.utc), + institution=Organization.AIND, + investigators=[Person(name="Jane Smith")], + funding_source=[Funding(funder=Organization.AI)], + project_name="Test project", + data_level=DataLevel.RAW, + ) + sub = Subject.model_validate(example_subject.model_dump()) + sub.subject_id = subject_id + return Metadata( + name=dd.name, + location=f"s3://bucket/{dd.name}", + subject=sub, + data_description=dd, + processing=example_processing, + quality_control=example_qc, + ) + + +class TestFromMetadataSingleSource(unittest.TestCase): + """Tests for Metadata.from_metadata with a single source""" + + def setUp(self): + """Create a single source Metadata object and some new processing and QC to add""" + self.source = _make_metadata() + self.new_processing = Processing.create_with_sequential_process_graph( + data_processes=[ + DataProcess( + process_type=ProcessName.ANALYSIS, + name="Derived analysis", + experimenters=["Dr. 
Test"], + stage=ProcessStage.ANALYSIS, + start_date_time=t, + end_date_time=t, + code=example_code, + ), + ] + ) + self.new_qc = QualityControl( + metrics=[ + QCMetric( + name="Derived metric", + modality=Modality.ECEPHYS, + stage=Stage.PROCESSING, + value=0.95, + status_history=[QCStatus(evaluator="Auto", status=Status.PASS, timestamp=t)], + tags={"step": "derived"}, + ), + ], + default_grouping=["modality"], + ) + + def test_single_source_inherits_subject(self): + """Subject should be inherited from the single source""" + result = Metadata.from_metadata( + self.source, + process_name="my-analysis", + location="s3://bucket/derived", + ) + self.assertIsNotNone(result.subject) + self.assertEqual(result.subject.subject_id, "123456") + + def test_single_source_data_description_is_derived(self): + """Data description should be updated to data level DERIVED and name should include process name""" + result = Metadata.from_metadata( + self.source, + process_name="my-analysis", + location="s3://bucket/derived", + ) + self.assertIsNotNone(result.data_description) + self.assertEqual(result.data_description.data_level, DataLevel.DERIVED) + self.assertIn("my-analysis", result.data_description.name) + + def test_single_source_accumulates_processing(self): + """Processing from source should be accumulated with new processing""" + result = Metadata.from_metadata( + self.source, + process_name="my-analysis", + location="s3://bucket/derived", + new_processing=self.new_processing, + ) + self.assertIsNotNone(result.processing) + process_names = [dp.name for dp in result.processing.data_processes] + self.assertIn("Derived analysis", process_names) + self.assertGreater( + len(result.processing.data_processes), + len(self.new_processing.data_processes), + ) + + def test_single_source_accumulates_qc(self): + """Quality control metrics from source should be accumulated with new metrics""" + result = Metadata.from_metadata( + self.source, + process_name="my-analysis", + 
location="s3://bucket/derived", + new_quality_control=self.new_qc, + ) + self.assertIsNotNone(result.quality_control) + metric_names = [m.name for m in result.quality_control.metrics] + self.assertIn("Derived metric", metric_names) + self.assertGreater( + len(result.quality_control.metrics), + len(self.new_qc.metrics), + ) + + def test_single_source_no_new_processing(self): + """Processing should remain unchanged when no new processing is provided""" + result = Metadata.from_metadata( + self.source, + process_name="my-analysis", + location="s3://bucket/derived", + ) + self.assertIsNotNone(result.processing) + self.assertEqual( + len(result.processing.data_processes), + len(example_processing.data_processes), + ) + + def test_accepts_single_metadata_not_list(self): + """Method should accept a single Metadata object, not just a list""" + result = Metadata.from_metadata( + self.source, + process_name="my-analysis", + location="s3://bucket/derived", + ) + self.assertIsNotNone(result.data_description) + + +class TestFromMetadataMultipleSameSubject(unittest.TestCase): + """Tests for multiple sources with same subject but different acquisitions""" + + def setUp(self): + """Create two metadata objects with same subject""" + self.source1 = _make_metadata(subject_id="123456") + self.source2 = _make_metadata(subject_id="123456") + + def test_same_subject_inherits_subject(self): + """Subject should be inherited when all sources have the same subject""" + result = Metadata.from_metadata( + [self.source1, self.source2], + process_name="merge", + location="s3://bucket/derived", + ) + self.assertIsNotNone(result.subject) + self.assertEqual(result.subject.subject_id, "123456") + + def test_different_acquisitions_drops_instrument_and_acquisition(self): + """Instrument and acquisition should be dropped when sources have different acquisitions""" + result = Metadata.from_metadata( + [self.source1, self.source2], + process_name="merge", + location="s3://bucket/derived", + ) + 
self.assertIsNone(result.instrument) + self.assertIsNone(result.acquisition) + + def test_different_acquisitions_does_not_accumulate_processing(self): + """Processing should not be accumulated when sources have different acquisitions""" + new_proc = Processing.create_with_sequential_process_graph( + data_processes=[ + DataProcess( + process_type=ProcessName.ANALYSIS, + name="New step", + experimenters=["Dr. Test"], + stage=ProcessStage.ANALYSIS, + start_date_time=t, + end_date_time=t, + code=example_code, + ), + ] + ) + result = Metadata.from_metadata( + [self.source1, self.source2], + process_name="merge", + location="s3://bucket/derived", + new_processing=new_proc, + ) + self.assertEqual(len(result.processing.data_processes), 1) + self.assertEqual(result.processing.data_processes[0].name, "New step") + + def test_different_acquisitions_does_not_accumulate_qc(self): + """Quality control should not be accumulated when sources have different acquisitions""" + new_qc = QualityControl( + metrics=[ + QCMetric( + name="New QC", + modality=Modality.ECEPHYS, + stage=Stage.PROCESSING, + value=1.0, + status_history=[QCStatus(evaluator="Auto", status=Status.PASS, timestamp=t)], + tags={"step": "new"}, + ), + ], + default_grouping=["modality"], + ) + result = Metadata.from_metadata( + [self.source1, self.source2], + process_name="merge", + location="s3://bucket/derived", + new_quality_control=new_qc, + ) + self.assertEqual(len(result.quality_control.metrics), 1) + + def test_source_data_lists_both_sources(self): + """Result should list both source assets in source_data field""" + result = Metadata.from_metadata( + [self.source1, self.source2], + process_name="merge", + location="s3://bucket/derived", + ) + self.assertIsNotNone(result.data_description.source_data) + self.assertEqual(len(result.data_description.source_data), 2) + + +class TestFromMetadataDifferentSubjects(unittest.TestCase): + """Tests for multiple sources with different subjects""" + + def setUp(self): + 
"""Create two metadata objects with different subjects""" + self.source1 = _make_metadata(subject_id="123456") + self.source2 = _make_metadata(subject_id="789012") + + def test_different_subjects_drops_subject(self): + """Subject should be dropped when sources have different subjects""" + new_proc = Processing.create_with_sequential_process_graph( + data_processes=[ + DataProcess( + process_type=ProcessName.ANALYSIS, + name="New step", + experimenters=["Dr. Test"], + stage=ProcessStage.ANALYSIS, + start_date_time=t, + end_date_time=t, + code=example_code, + ), + ] + ) + result = Metadata.from_metadata( + [self.source1, self.source2], + process_name="merge", + location="s3://bucket/derived", + new_processing=new_proc, + ) + self.assertIsNone(result.subject) + + def test_different_subjects_drops_procedures(self): + """Procedures should be dropped when sources have different subjects""" + new_proc = Processing.create_with_sequential_process_graph( + data_processes=[ + DataProcess( + process_type=ProcessName.ANALYSIS, + name="New step", + experimenters=["Dr. 
Test"], + stage=ProcessStage.ANALYSIS, + start_date_time=t, + end_date_time=t, + code=example_code, + ), + ] + ) + result = Metadata.from_metadata( + [self.source1, self.source2], + process_name="merge", + location="s3://bucket/derived", + new_processing=new_proc, + ) + self.assertIsNone(result.procedures) + + +class TestFromMetadataEdgeCases(unittest.TestCase): + """Tests for edge cases""" + + def test_empty_list_raises(self): + """Empty source list should raise ValueError""" + with self.assertRaises(ValueError): + Metadata.from_metadata([], process_name="x", location="s3://bucket/x") + + def test_no_data_description_raises(self): + """Source without data_description should raise ValueError""" + m = Metadata( + name="test", + location="s3://bucket/test", + processing=example_processing, + ) + with self.assertRaises(ValueError): + Metadata.from_metadata(m, process_name="x", location="s3://bucket/x") + + def test_result_name_matches_data_description(self): + """Result name should match its data_description name""" + source = _make_metadata() + result = Metadata.from_metadata( + source, + process_name="my-pipeline", + location="s3://bucket/derived", + ) + self.assertEqual(result.name, result.data_description.name) + + +class TestInternalHelpers(unittest.TestCase): + """Direct tests for internal helper functions to ensure full coverage""" + + def setUp(self): + """Create source metadata and derived metadata for testing""" + self.source = _make_metadata() + self.derived = Metadata.from_metadata( + self.source, + process_name="test-pipeline", + location="s3://bucket/derived", + ) + + def test_get_root_asset_name_derived(self): + """_get_root_asset_name should return source asset name for derived data""" + root = _get_root_asset_name(self.derived.data_description) + self.assertEqual(root, self.source.data_description.name) + + def test_get_root_asset_name_returns_none_for_non_raw_non_derived(self): + """_get_root_asset_name should return None for non-raw, non-derived 
data levels""" + simulated_dd = self.source.data_description.model_copy(update={"data_level": DataLevel.SIMULATED}) + self.assertIsNone(_get_root_asset_name(simulated_dd)) + + def test_get_unique_subject_ids_from_data_description(self): + """_get_unique_subject_ids should extract subject ID from data_description when subject is None""" + no_subject = self.source.model_copy(update={"subject": None}) + ids = _get_unique_subject_ids([no_subject]) + self.assertEqual(ids, ["123456"]) + + def test_inherit_subject_and_procedures_returns_none_when_no_subject_or_procedures(self): + """_inherit_subject_and_procedures should return None when source has neither subject nor procedures""" + no_subject = self.source.model_copy(update={"subject": None, "procedures": None}) + subject, procedures = _inherit_subject_and_procedures([no_subject]) + self.assertIsNone(subject) + self.assertIsNone(procedures) + + def test_inherit_instrument_and_acquisition_returns_instrument_when_set(self): + """_inherit_instrument_and_acquisition should return instrument when it is set""" + with_inst = self.source.model_copy(update={"instrument": example_inst}) + instrument, acquisition = _inherit_instrument_and_acquisition([with_inst]) + self.assertIs(instrument, example_inst) + self.assertIsNone(acquisition) + + def test_accumulate_processing_two_same_acquisition_sources(self): + """_accumulate_processing should combine processing from multiple sources with same acquisition""" + source_copy = Metadata.model_validate(self.source.model_dump()) + result = _accumulate_processing([self.source, source_copy]) + self.assertEqual( + len(result.data_processes), + 2 * len(example_processing.data_processes), + ) + + def test_accumulate_processing_no_source_processing(self): + """_accumulate_processing should return new_processing when source has no processing""" + no_proc = self.source.model_copy(update={"processing": None}) + result = _accumulate_processing([no_proc], new_processing=example_processing) + 
self.assertIs(result, example_processing) + + def test_accumulate_quality_control_two_same_acquisition_sources(self): + """_accumulate_quality_control should combine metrics from multiple sources with same acquisition""" + source_copy = Metadata.model_validate(self.source.model_dump()) + result = _accumulate_quality_control([self.source, source_copy]) + self.assertEqual( + len(result.metrics), + 2 * len(example_qc.metrics), + ) + + def test_accumulate_quality_control_no_source_qc(self): + """_accumulate_quality_control should return new_quality_control when source has no QC""" + no_qc = self.source.model_copy(update={"quality_control": None}) + result = _accumulate_quality_control([no_qc], new_quality_control=example_qc) + self.assertIs(result, example_qc) + + +if __name__ == "__main__": + unittest.main()