Skip to content

Commit 86baed0

Browse files
committed
mock eyeflow run + structural changes using Pydantic
1 parent ed9f197 commit 86baed0

38 files changed

Lines changed: 1053 additions & 1039 deletions

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ license = { text = "GPL-3.0-only" }
1515
dependencies = [
1616
"numpy>=1.24",
1717
"h5py>=3.9",
18+
"pydantic>=2.6",
1819
"scipy>=1.10",
1920
"scikit-image>=0.20",
2021
"joblib>=1.3",

src/cli.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
from collections.abc import Callable, Sequence
2424
from pathlib import Path
2525

26+
from runtime_limits import configure_numeric_threads
27+
28+
configure_numeric_threads()
29+
2630
import h5py
2731

2832
from pipelines import (
@@ -31,7 +35,7 @@
3135
load_pipeline_catalog,
3236
)
3337
from pipelines.core.errors import format_pipeline_exception
34-
from utils.io import write_combined_results_h5
38+
from input_output import write_combined_results_h5
3539

3640

3741
def _build_pipeline_registry() -> dict[str, PipelineDescriptor]:

src/domain/steps/vessel_velocity_estimator.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import numpy as np
88

9+
from runtime_limits import cap_parallel_jobs
10+
911
from ._masks import elliptical_mask
1012
from .base import DomainStep as BaseStep
1113

@@ -56,15 +58,15 @@ def run(self, ctx):
5658
disk, dilation, inpaint = _skimage_dependencies()
5759
mask = dilation(vessel_mask, disk(local_background_dist)) #TODO add parameter
5860

59-
n_jobs = _cpu_count() #TODO add parameter for number of parallel jobs
61+
n_jobs = cap_parallel_jobs(_cpu_count())
6062

6163
print(f" - Inpainting fRMS with {n_jobs} parallel jobs")
6264

6365
def _inpaint_frame(frame, mask):
6466
return inpaint.inpaint_biharmonic(frame, mask)
6567

6668
fRMSbkg = _run_in_parallel(
67-
partial(_inpaint_frame, mask=mask), fRMS, n_jobs=-1, chunking=False
69+
partial(_inpaint_frame, mask=mask), fRMS, n_jobs=n_jobs, chunking=False
6870
)
6971

7072
# fRMSbkg = np.stack(np.array([inpaint.inpaint_biharmonic(frame, mask) for frame in fRMS]), axis=0)
@@ -130,6 +132,7 @@ def _run_in_parallel(func, iterable, n_jobs=-1, chunking=False):
130132
return np.stack([func(item) for item in iterable], axis=0)
131133
if n_jobs == -1:
132134
n_jobs = joblib.cpu_count()
135+
n_jobs = cap_parallel_jobs(n_jobs)
133136
results = joblib.Parallel(n_jobs=n_jobs, backend="threading")(
134137
joblib.delayed(func)(item) for item in iterable
135138
)

src/eye_flow.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
from pathlib import Path
1515
from tkinter import filedialog, messagebox, ttk
1616

17+
from runtime_limits import configure_numeric_threads
18+
19+
configure_numeric_threads()
20+
1721
from app_settings import (
1822
LAST_BATCH_LOG_FILENAME,
1923
AppSettingsStore,
@@ -34,14 +38,13 @@
3438

3539
from pipelines import PipelineDescriptor, load_pipeline_catalog
3640
from pipelines.core.errors import format_pipeline_exception
37-
from utils.io import (
41+
from input_output import (
42+
EyeFlowOutputManager,
3843
HOLO_SUFFIX,
3944
PipelineInputView,
4045
ResolvedHoloInput,
41-
append_result_group,
4246
create_zip_from_tree,
4347
holo_input_status,
44-
initialize_output_h5,
4548
open_h5,
4649
reset_output_dir,
4750
resolve_selected_holo_inputs,
@@ -2026,8 +2029,8 @@ def _run_pipelines_to_output(
20262029
if doppler_vision_h5 is not None
20272030
else None
20282031
)
2029-
initialize_output_h5(
2030-
work_h5,
2032+
output_manager = EyeFlowOutputManager(work_h5)
2033+
output_manager.initialize(
20312034
holodoppler_source_file=(
20322035
str(holodoppler_h5) if holodoppler_h5 is not None else None
20332036
),
@@ -2051,7 +2054,7 @@ def _run_pipelines_to_output(
20512054
raise RuntimeError(
20522055
format_pipeline_exception(exc, pipeline)
20532056
) from exc
2054-
append_result_group(work_h5, pipeline.name, result)
2057+
output_manager.append_pipeline_result(pipeline.name, result)
20552058
result.output_h5_path = str(output_h5_path)
20562059
self._log_batch(f"[OK] {pipeline.name}")
20572060
self._advance_progress()
Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,15 @@
3232
write_result_h5,
3333
write_value_dataset,
3434
)
35-
from .output_writer import (
36-
is_angioeye_output_key,
37-
is_root_mirrored_output_key,
35+
from .output_manager import (
36+
EyeFlowOutputManager,
3837
pack_dopplerview_analysis_outputs,
3938
pack_velocity_per_beat_outputs,
4039
systolic_index_base_for_path,
4140
)
4241
from .schema import (
42+
DOPPLER_VIEW_ANALYSIS_SCHEMA,
43+
DOPPLER_VIEW_SCHEMA,
4344
DOPPLERVIEW_ANALYSIS_ROOT,
4445
DOPPLERVIEW_ARTERIAL_VELOCITY_SIGNAL_PATH,
4546
DOPPLERVIEW_BEAT_INDICES_PATH,
@@ -58,38 +59,30 @@
5859
DV_H5_LAYOUT,
5960
DV_RETINAL_ARTERY_MASK_PATH,
6061
DV_RETINAL_VEIN_MASK_PATH,
61-
EYE_FLOW_POSTPROCESS_ROOT,
62-
EYE_FLOW_PROCESSING_ROOT,
63-
EYE_FLOW_ROOT,
6462
HD_BATCH_STRIDE_KEY,
6563
HD_CONFIG_DIR_NAME,
6664
HD_CONFIG_FILENAME,
6765
HD_H5_LAYOUT,
6866
HD_MOMENT0_PATH,
6967
HD_MOMENT2_PATH,
7068
HD_SAMPLING_FREQ_KEY,
69+
H5DatasetSpec,
70+
H5SourceSchema,
7171
HDF5_SUFFIXES,
72+
HOLODOPPLER_SCHEMA,
7273
HOLO_COMPANION_H5_LAYOUTS,
7374
HOLO_DATA_DIR_TEMPLATE,
7475
HOLO_H5_SUBDIR,
7576
HOLO_SUFFIX,
76-
LEGACY_PIPELINES_ROOT,
7777
HoloCompanionH5Layout,
78-
find_pipeline_group,
79-
find_postprocess_group,
80-
get_postprocess_root,
81-
get_processing_root,
78+
JsonConfigValueSpec,
8279
iter_metric_datasets,
83-
pipeline_path_candidates,
84-
postprocess_path_candidates,
8580
)
86-
from .sources import (
81+
from .inputs import (
8782
EyeFlowView,
83+
HoloInputStatus,
8884
MergedAttrs,
8985
PipelineInputView,
90-
)
91-
from .resolvers import (
92-
HoloInputStatus,
9386
ResolvedHoloInput,
9487
holo_input_status,
9588
resolve_holo_input,
@@ -106,8 +99,11 @@
10699
"HOLO_SUFFIX",
107100
"MetricsTree",
108101
"EyeFlowView",
102+
"EyeFlowOutputManager",
109103
"HoloCompanionH5Layout",
110104
"HoloInputStatus",
105+
"DOPPLER_VIEW_ANALYSIS_SCHEMA",
106+
"DOPPLER_VIEW_SCHEMA",
111107
"DOPPLERVIEW_ANALYSIS_ROOT",
112108
"DOPPLERVIEW_ARTERIAL_VELOCITY_SIGNAL_PATH",
113109
"DOPPLERVIEW_BEAT_INDICES_PATH",
@@ -131,6 +127,10 @@
131127
"HD_MOMENT0_PATH",
132128
"HD_MOMENT2_PATH",
133129
"HD_SAMPLING_FREQ_KEY",
130+
"H5DatasetSpec",
131+
"H5SourceSchema",
132+
"HOLODOPPLER_SCHEMA",
133+
"JsonConfigValueSpec",
134134
"MergedAttrs",
135135
"PipelineInputView",
136136
"ResolvedHoloInput",
@@ -145,18 +145,12 @@
145145
"extracted_zip_tree",
146146
"find_child_group_by_attr",
147147
"find_first_existing_path",
148-
"get_postprocess_root",
149-
"get_processing_root",
150148
"holo_input_status",
151149
"initialize_output_h5",
152150
"iter_metric_datasets",
153-
"is_angioeye_output_key",
154-
"is_root_mirrored_output_key",
155151
"open_h5",
156152
"pack_velocity_per_beat_outputs",
157153
"pack_dopplerview_analysis_outputs",
158-
"pipeline_path_candidates",
159-
"postprocess_path_candidates",
160154
"read_array",
161155
"read_dataset",
162156
"replace_file_in_zip",
@@ -174,8 +168,4 @@
174168
"write_metrics_trees_to_h5",
175169
"write_result_h5",
176170
"write_value_dataset",
177-
"EYE_FLOW_POSTPROCESS_ROOT",
178-
"EYE_FLOW_PROCESSING_ROOT",
179-
"EYE_FLOW_ROOT",
180-
"LEGACY_PIPELINES_ROOT",
181171
]
Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,6 @@ def set_attr_safe(h5obj: h5py.File | h5py.Group | h5py.Dataset, key: str, value)
165165
h5obj.attrs[key] = str(value)
166166

167167

168-
DEFAULT_COMPRESSION_THRESHOLD = 1_000_000
169-
170-
171168
def _normalize_dataset_payload(data, ds_attrs):
172169
original_class = None
173170
payload = data
@@ -180,11 +177,6 @@ def _normalize_dataset_payload(data, ds_attrs):
180177
elif isinstance(payload, (list, tuple)):
181178
payload = np.asarray(payload)
182179

183-
if isinstance(payload, np.ndarray):
184-
if payload.dtype.kind == "f" and payload.dtype.itemsize == 8:
185-
payload = payload.astype(np.float32)
186-
original_class = original_class or "float64"
187-
188180
if original_class is not None:
189181
ds_attrs = {} if ds_attrs is None else dict(ds_attrs)
190182
ds_attrs.setdefault("original_class", original_class)
@@ -193,17 +185,6 @@ def _normalize_dataset_payload(data, ds_attrs):
193185

194186

195187
def _get_dataset_creation_kwargs(payload: np.ndarray) -> dict[str, object]:
196-
if (
197-
isinstance(payload, np.ndarray)
198-
and payload.dtype.kind in "bBhHiIlLef"
199-
and payload.nbytes >= DEFAULT_COMPRESSION_THRESHOLD
200-
and payload.ndim > 0
201-
):
202-
return {
203-
"compression": "gzip",
204-
"compression_opts": 6,
205-
"chunks": tuple(min(s, 1024) for s in payload.shape),
206-
}
207188
return {}
208189

209190

@@ -363,25 +344,18 @@ def append_result_group(
363344
pipeline_name: str,
364345
result: "ProcessResult",
365346
) -> h5py.Group:
366-
from pipelines.core.base import ProcessResult
367-
from utils.io.output_writer import is_root_mirrored_output_key
368-
369-
pipelines_grp = (
370-
h5file["EyeFlow"] if "EyeFlow" in h5file else h5file.create_group("EyeFlow")
371-
)
372-
pipeline_grp = _get_or_replace_group(pipelines_grp, safe_h5_key(pipeline_name))
373-
pipeline_grp.attrs["pipeline"] = pipeline_name
347+
if "EyeFlow" in h5file:
348+
del h5file["EyeFlow"]
349+
h5file.attrs["last_pipeline"] = pipeline_name
374350
if result.attrs:
375351
for key, value in result.attrs.items():
376352
if key == "pipeline":
377353
continue
378-
set_attr_safe(pipeline_grp, key, value)
354+
set_attr_safe(h5file, key, value)
379355
for key, value in result.metrics.items():
380-
write_value_dataset(pipeline_grp, key, value)
381-
if is_root_mirrored_output_key(key):
382-
write_value_dataset(h5file, key, value)
356+
write_value_dataset(h5file, key, value)
383357
h5file.flush()
384-
return pipeline_grp
358+
return h5file
385359

386360

387361
def write_result_h5(

0 commit comments

Comments
 (0)