diff --git a/examples/propagate_noise.py b/examples/propagate_noise.py new file mode 100644 index 0000000..cf41b07 --- /dev/null +++ b/examples/propagate_noise.py @@ -0,0 +1,118 @@ +import numpy as np +import matplotlib.pyplot as plt +import seaborn as sns + +import mritk + + +def main(c_target, SNR=25): + # Physical Parameters + r1 = 3.2 # Longitudinal relaxivity (s^-1 L mmol^-1) + T10 = 4.5 # Native T1 time of CSF (seconds) + # SNR = 25.0 # Signal-to-Noise Ratio + sigma = 1.0 / SNR # Theoretical max signals approach 1.0 (based on M0=1.0) + M0 = 1.0 + # # Sequence Parameters + TR = 9.6 # Taken from Gonzo paper + TI = 2.65 # Taken from Gonzo paper + t_LL = np.linspace(0.115, 2.754, 14) # Look-Locker: 14 data points over 2.75s same as Gonzo + + N = 5000 + + T1_target = mritk.concentration.T1_from_concentration_expr(c=c_target, t1_0=T10, r1=r1) + np.random.seed(42) # For reproducibility + c_true_array = np.full(N, c_target) + T1_true = mritk.concentration.T1_from_concentration_expr(c=c_true_array, t1_0=T10, r1=r1) + + # Generate Noisy T1 Estimates + T1_est_LL = mritk.looklocker.T1_to_noisy_T1_looklocker( + T1_true, + t_LL=t_LL, + M0=M0, + sigma=sigma, + ) + T1_est_LL /= 1000.0 # Convert ms to seconds for consistency + T1_range = np.linspace(0.1, 10.0, 5000) + S_SE_range, S_IR_range = mritk.mixed.T1_to_mixed_signals(T1_range, TR=TR, TI=TI) + ratio_range = S_IR_range / S_SE_range + T1_est_mixed = mritk.mixed.T1_to_noisy_T1_mixed( + T1_true, + TR=TR, + TI=TI, + f_grid=ratio_range, + t_grid=T1_range, + sigma=sigma, + ) + + T1_est_hybrid = mritk.hybrid.compute_hybrid_t1_array( + ll_data=T1_est_LL, + mixed_data=T1_est_mixed, + mask=None, + threshold=1.5, + ) + + T1_mixed = T1_est_mixed[~np.isnan(T1_est_mixed)] + T1_LL = T1_est_LL[~np.isnan(T1_est_LL)] + T1_hybrid = T1_est_hybrid[~np.isnan(T1_est_hybrid)] + + c_mixed = mritk.concentration.concentration_from_T1_expr(T1_mixed, t1_0=T10, r1=r1) + c_LL = mritk.concentration.concentration_from_T1_expr(T1_LL, t1_0=T10, r1=r1) + 
c_hybrid = mritk.concentration.concentration_from_T1_expr(T1_hybrid, t1_0=T10, r1=r1) + + # Create 3 subplots sharing the x-axis + fig, axes = plt.subplots(3, 2, figsize=(10, 10)) # , sharex="col") + + # 1. Mixed Sequence Subplot + sns.histplot(T1_mixed, bins=60, stat="density", color="orange", alpha=0.5, ax=axes[0, 0], label="Mixed Distribution") + sns.kdeplot(T1_mixed, color="darkorange", linestyle="-", ax=axes[0, 0]) + axes[0, 0].axvline(T1_target, color="red", linestyle="solid", linewidth=2, label=f"True T1 = {T1_target:.2f} s") + axes[0, 0].set_title("Mixed Sequence T1 Estimation") + axes[0, 0].legend() + + sns.histplot(c_mixed, bins=60, stat="density", color="orange", alpha=0.5, ax=axes[0, 1], label="Mixed Distribution") + sns.kdeplot(c_mixed, color="darkorange", linestyle="-", ax=axes[0, 1]) + axes[0, 1].axvline(c_target, color="red", linestyle="solid", linewidth=2, label=f"True c = {c_target}") + axes[0, 1].set_title("Mixed Sequence Concentration Estimation") + axes[0, 1].legend() + + # 2. Look-Locker Sequence Subplot + sns.histplot(T1_LL, bins=60, stat="density", color="blue", alpha=0.5, ax=axes[1, 0], label="Look-Locker Distribution") + sns.kdeplot(T1_LL, color="darkblue", linestyle="-", ax=axes[1, 0]) + axes[1, 0].axvline(T1_target, color="red", linestyle="solid", linewidth=2, label=f"True T1 = {T1_target:.2f} s") + axes[1, 0].set_title("Look-Locker Sequence T1 Estimation") + axes[1, 0].legend() + + sns.histplot(c_LL, bins=60, stat="density", color="blue", alpha=0.5, ax=axes[1, 1], label="Look-Locker Distribution") + sns.kdeplot(c_LL, color="darkblue", linestyle="-", ax=axes[1, 1]) + axes[1, 1].axvline(c_target, color="red", linestyle="solid", linewidth=2, label=f"True c = {c_target}") + axes[1, 1].set_title("Look-Locker Sequence Concentration Estimation") + axes[1, 1].legend() + + # 3. 
Hybrid Logic Subplot + sns.histplot(T1_hybrid, bins=60, stat="density", color="purple", alpha=0.5, ax=axes[2, 0], label="Hybrid Distribution") + sns.kdeplot(T1_hybrid, color="indigo", linestyle="-", ax=axes[2, 0]) + axes[2, 0].axvline(T1_target, color="red", linestyle="solid", linewidth=2, label=f"True T1 = {T1_target:.2f} s") + axes[2, 0].set_title("Final Hybrid Pipeline T1 Estimation") + axes[2, 0].set_xlabel("T1 Relaxation Time (seconds)") + axes[2, 0].legend() + + sns.histplot(c_hybrid, bins=60, stat="density", color="purple", alpha=0.5, ax=axes[2, 1], label="Hybrid Distribution") + sns.kdeplot(c_hybrid, color="indigo", linestyle="-", ax=axes[2, 1]) + axes[2, 1].axvline(c_target, color="red", linestyle="solid", linewidth=2, label=f"True c = {c_target}") + axes[2, 1].set_title("Final Hybrid Pipeline Concentration Estimation") + axes[2, 1].set_xlabel("Concentration (mmol/L)") + axes[2, 1].legend() + + for ax in axes.flatten(): + ax.set_ylabel("Density") + + fig.tight_layout() + fig.savefig(f"hybrid_pipeline_histograms_c{c_target}_{SNR}_{TR}_{TI}.png", dpi=300) + + +if __name__ == "__main__": + # for c_target in [0.0, 0.05, 0.1]: + # for SNR in [7.0, 25.0]: + # print(f"Running main() with c_target={c_target}, SNR={SNR}") + # main(c_target=c_target, SNR=SNR) + main(c_target=0.05, SNR=25) diff --git a/src/mritk/__init__.py b/src/mritk/__init__.py index 45be4fb..d805c50 100644 --- a/src/mritk/__init__.py +++ b/src/mritk/__init__.py @@ -16,6 +16,7 @@ statistics, utils, ) +from .data import MRIData meta = metadata("mritk") __version__ = meta["Version"] @@ -35,4 +36,5 @@ "hybrid", "r1", "statistics", + "MRIData", ] diff --git a/src/mritk/concentration.py b/src/mritk/concentration.py index cf1960c..23115f2 100644 --- a/src/mritk/concentration.py +++ b/src/mritk/concentration.py @@ -6,6 +6,7 @@ import argparse import logging +import typing from collections.abc import Callable from pathlib import Path @@ -16,8 +17,10 @@ logger = logging.getLogger(__name__) +T = 
typing.TypeVar("T", np.ndarray, float) -def concentration_from_T1_expr(t1: np.ndarray, t1_0: np.ndarray, r1: float) -> np.ndarray: + +def concentration_from_T1_expr(t1: T, t1_0: T, r1: float) -> T: """ Computes tracer concentration from T1 relaxation times. @@ -34,7 +37,7 @@ def concentration_from_T1_expr(t1: np.ndarray, t1_0: np.ndarray, r1: float) -> n return (1.0 / r1) * ((1.0 / t1) - (1.0 / t1_0)) -def concentration_from_R1_expr(r1_map: np.ndarray, r1_0_map: np.ndarray, r1: float) -> np.ndarray: +def concentration_from_R1_expr(r1_map: T, r1_0_map: T, r1: float) -> T: """ Computes tracer concentration from R1 relaxation rates. @@ -51,6 +54,42 @@ def concentration_from_R1_expr(r1_map: np.ndarray, r1_0_map: np.ndarray, r1: flo return (1.0 / r1) * (r1_map - r1_0_map) +def R1_from_concentration_expr(c: T, r1_0: T, r1: float) -> T: + """ + Computes post-contrast R1 relaxation rates from tracer concentration. + + Formula: R1 = C * r1 + R1_0 + + Args: + c (np.ndarray): Array of tracer concentrations. + r1_0 (np.ndarray): Array of pre-contrast (baseline) R1 relaxation rates. + r1 (float): Relaxivity of the contrast agent. + + Returns: + np.ndarray: Computed post-contrast R1 array. + """ + return c * r1 + r1_0 + + +def T1_from_concentration_expr(c: T, t1_0: T, r1: float) -> T: + """ + Computes post-contrast T1 relaxation times from tracer concentration. + + Formula: T1 = 1 / (C * r1 + (1 / T1_0)) + + Args: + c (np.ndarray | float): Array of tracer concentrations. + t1_0 (np.ndarray | float): Array of pre-contrast (baseline) T1 relaxation times. + r1 (float): Relaxivity of the contrast agent. + + Returns: + np.ndarray | float: Computed post-contrast T1 array. + """ + # Note: In a robust pipeline, you might want to mask or handle cases + # where t1_0 is 0 to avoid division by zero errors. 
+ return 1.0 / (c * r1 + (1.0 / t1_0)) + + def compute_concentration_from_T1_array( t1_data: np.ndarray, t10_data: np.ndarray, r1: float, mask: np.ndarray | None = None ) -> np.ndarray: diff --git a/src/mritk/data.py b/src/mritk/data.py index b293565..7a6f4a0 100644 --- a/src/mritk/data.py +++ b/src/mritk/data.py @@ -6,7 +6,7 @@ from pathlib import Path -from typing import Optional +from typing import Callable, Optional import nibabel import numpy as np @@ -178,6 +178,42 @@ def find_nearest_valid_voxels(query_indices: np.ndarray, mask: np.ndarray, k: in return dof_neighbours +def aggregate_nearest_valid_voxels( + data: np.ndarray, query_indices: np.ndarray, mask: np.ndarray | None = None, k: int = 10, agg_func: Callable = np.nanmedian +) -> np.ndarray: + """Finds the `k` nearest valid voxels for each query index and aggregates their values. + + Args: + data: (X, Y, Z) array of data values to sample from. + query_indices: (N, 3) array of voxel indices to find neighbors for. + mask: (X, Y, Z) boolean mask array indicating which voxels are valid neighbors. + k: Number of nearest neighbors to find. + agg_func: Aggregation function to apply over the `k` neighbors (default: np.median). + Must accept an `axis` keyword argument. + + Returns: + (N,) array of aggregated values for each query point. + """ + + if mask is None: + # If no mask is provided, consider all non-nan voxels as valid. + mask = np.isfinite(data) + # 1. Get the spatial indices of the nearest valid voxels. + # Returned shape is (3, k, N) where 3 corresponds to the x, y, z dimensions. + nearest_coords = find_nearest_valid_voxels(query_indices, mask, k) + + # 2. Extract the data values using advanced numpy indexing. + # nearest_coords[0], nearest_coords[1], and nearest_coords[2] each have shape (k, N). + # The resulting extracted_values array will also have shape (k, N). + extracted_values = data[nearest_coords[0], nearest_coords[1], nearest_coords[2]] + + # 3. 
Aggregate the extracted values across the neighbor dimension (axis=0). + # The output will be collapsed to shape (N,). + aggregated_values = agg_func(extracted_values, axis=0) + + return aggregated_values + + def apply_affine(T: np.ndarray, X: np.ndarray) -> np.ndarray: """Apply a homogeneous affine transformation matrix to a set of points. diff --git a/src/mritk/datasets.py b/src/mritk/datasets.py index bc6904d..a0156f8 100644 --- a/src/mritk/datasets.py +++ b/src/mritk/datasets.py @@ -38,7 +38,7 @@ def get_datasets() -> dict[str, Dataset]: name="Test Data", description="A small test dataset for testing functionality (based on the Gonzo dataset).", license="CC-BY-4.0", - links={"mritk-test-data.zip": download_link_google_drive("1YVXoV1UhmpkMIeaNKeS9eqCsdMULwKBO")}, + links={"mritk-test-data.zip": download_link_google_drive("1IYbomfJ38REUstbCdiqc3W39RaHrZfje")}, ), "gonzo": Dataset( name="The Gonzo Dataset", diff --git a/src/mritk/hybrid.py b/src/mritk/hybrid.py index 6aa4cdf..a0b7776 100644 --- a/src/mritk/hybrid.py +++ b/src/mritk/hybrid.py @@ -16,7 +16,9 @@ logger = logging.getLogger(__name__) -def compute_hybrid_t1_array(ll_data: np.ndarray, mixed_data: np.ndarray, mask: np.ndarray, threshold: float) -> np.ndarray: +def compute_hybrid_t1_array( + ll_data: np.ndarray, mixed_data: np.ndarray, mask: np.ndarray | None = None, threshold: float = 1500 +) -> np.ndarray: """ Creates a hybrid T1 array by selectively substituting Look-Locker voxels with Mixed voxels. @@ -27,15 +29,19 @@ def compute_hybrid_t1_array(ll_data: np.ndarray, mixed_data: np.ndarray, mask: n ll_data (np.ndarray): 3D numpy array of Look-Locker T1 values. mixed_data (np.ndarray): 3D numpy array of Mixed T1 values. mask (np.ndarray): 3D boolean mask (typically eroded CSF). - threshold (float): T1 threshold value (in ms). + threshold (float): T1 threshold value (in ms), default is 1500 ms. Returns: np.ndarray: Hybrid 3D T1 array. 
""" logger.debug("Computing hybrid T1 array with threshold %.2f ms.", threshold) hybrid = ll_data.copy() - newmask = mask & (ll_data > threshold) & (mixed_data > threshold) + newmask = (ll_data > threshold) & (mixed_data > threshold) + if mask is not None: + newmask &= mask + logger.debug("Substituting %d voxels based on threshold and mask criteria.", np.sum(newmask)) hybrid[newmask] = mixed_data[newmask] + return hybrid diff --git a/src/mritk/looklocker.py b/src/mritk/looklocker.py index c799444..a24a848 100644 --- a/src/mritk/looklocker.py +++ b/src/mritk/looklocker.py @@ -8,21 +8,69 @@ import logging import shutil import tempfile +import warnings from collections.abc import Callable +from dataclasses import dataclass from functools import partial from pathlib import Path -from typing import Optional import numpy as np +import scipy.optimize import skimage import tqdm from .data import MRIData -from .utils import fit_voxel, mri_facemask, nan_filter_gaussian, run_dcm2niix +from .utils import mri_facemask, nan_filter_gaussian, run_dcm2niix logger = logging.getLogger(__name__) +def T1_to_noisy_T1_looklocker(T1_true: np.ndarray, t_LL: np.ndarray, sigma: float = 0.04, M0: float = 1.0) -> np.ndarray: + """Simulates noisy Look-Locker T1 estimation from true T1 values. + + Args: + T1_true (np.ndarray): Array of true T1 values in milliseconds. + t_LL (np.ndarray): Array of Look-Locker trigger times in seconds. + sigma (float, optional): Standard deviation of the noise. Defaults to 0.04. + M0 (float, optional): Scaling factor for the signal amplitude. Defaults to 1.0. + + Returns: + np.ndarray: Array of estimated T1 values from the noisy Look-Locker signals. + + Notes: + We first generate the theoretical Look-Locker signal time series + for the given T1 values and trigger times, then add Rician noise + to simulate realistic measurement conditions. Finally, we estimate + T1 from the noisy signals using the same fitting procedure + as applied to real data. 
+ """ + S_LL_t = T1_to_LL_signal(T1_true, t_LL=t_LL, M0=M0) + real_LL = S_LL_t + np.random.normal(0, sigma, S_LL_t.shape) + imag_LL = np.random.normal(0, sigma, S_LL_t.shape) + S_LL_noisy = np.sqrt(real_LL**2 + imag_LL**2) + + return estimate_LL_T1(S_LL_noisy, t_LL) + + +def T1_to_LL_signal(T1: np.ndarray, t_LL: np.ndarray, M0: float = 1.0) -> np.ndarray: + """Converts T1 values to theoretical Look-Locker signal magnitudes at specified trigger times. + + Args: + T1 (np.ndarray): Array of T1 values in milliseconds. + t_LL (np.ndarray): Array of Look-Locker trigger times in seconds. + M0 (float, optional): Scaling factor for the signal amplitude. Defaults to 1.0. + + Returns: + np.ndarray: 2D array of shape (N_T1, N_timepoints) containing the theoretical + Look-Locker signal magnitudes for each T1 value at each trigger time. + """ + T1 = np.atleast_1d(T1) + # M(t) = M0 (1 − 2exp(−t/T1)) + # Output shape (N, M) where N is the number of T1 values + # and M is the number of time points + return M0 * (1.0 - 2.0 * np.exp(-np.outer(1.0 / T1, t_LL))) + + def read_dicom_trigger_times(dicomfile: Path) -> np.ndarray: """ Extracts unique nominal cardiac trigger delay times from DICOM functional groups. @@ -44,13 +92,13 @@ def read_dicom_trigger_times(dicomfile: Path) -> np.ndarray: return np.unique(all_frame_times) -def remove_outliers(data: np.ndarray, mask: np.ndarray, t1_low: float, t1_high: float) -> np.ndarray: +def remove_outliers(data: np.ndarray, mask: np.ndarray | None = None, t1_low: float = 50, t1_high: float = 5000) -> np.ndarray: """ Applies a mask and removes values outside the physiological T1 range. Args: data (np.ndarray): 3D array of T1 values. - mask (np.ndarray): 3D boolean mask of the brain/valid area. + mask (np.ndarray | None): 3D boolean mask of the brain/valid area. t1_low (float): Lower physiological limit. t1_high (float): Upper physiological limit. 
@@ -59,7 +107,8 @@ def remove_outliers(data: np.ndarray, mask: np.ndarray, t1_low: float, t1_high: """ logger.info("Removing outliers from T1 map with physiological range [%f, %f].", t1_low, t1_high) processed = data.copy() - processed[~mask] = np.nan + if mask is not None: + processed[~mask] = np.nan outliers = (processed < t1_low) | (processed > t1_high) processed[outliers] = np.nan return processed @@ -130,132 +179,252 @@ def compute_looklocker_t1_array(data: np.ndarray, time_s: np.ndarray, t1_roof: f voxel_mask = np.array(np.where(valid_voxels)).T d_masked = np.array([data_normalized[i, j, k] for (i, j, k) in voxel_mask]) - logger.debug(f"Starting fitting for {len(d_masked)} voxels.") - with tqdm.tqdm(total=len(d_masked), desc="Fitting Look-Locker Voxels") as pbar: - voxel_fitter = partial(fit_voxel, time_s, pbar) + i, j, k = voxel_mask.T + t1map = np.nan * np.zeros_like(data[..., 0]) + + t1map[i, j, k] = estimate_LL_T1(d_masked, time_s) + + return np.minimum(t1map, t1_roof) + + +def estimate_LL_T1(data: np.ndarray, time_s: np.ndarray) -> np.ndarray: + """Estimates T1 values from normalized Look-Locker signal time series using voxel-wise curve fitting. + + Args: + data (np.ndarray): 2D array of shape (N_voxels, N_timepoints) containing normalized signal time series for each voxel. + time_s (np.ndarray): 1D array of trigger times in seconds. + + Returns: + np.ndarray: 1D array of estimated T1 values for each voxel in milliseconds. + + Notes: + - The function fits the theoretical Look-Locker model to each voxel's time series using Levenberg-Marquardt optimization. + - Initial parameter guesses are based on the location of the signal minimum. + - Voxels that fail to fit or produce non-physical parameters are assigned NaN. 
+ """ + assert isinstance(data, np.ndarray) and data.ndim == 2, ( + f"Data should be a 2D array of shape (N_voxels, N_timepoints), got {data.shape}" + ) + msg = f"Time dimension of data {data.shape[1]} does not match length of time_s {len(time_s)}" + assert data.shape[1] == len(time_s), msg + + logger.debug(f"Starting fitting for {len(data)} voxels.") + with tqdm.tqdm(total=len(data), desc="Fitting Look-Locker Voxels") as pbar: + voxel_fitter = partial(fit_voxel, time_s=time_s, pbar=pbar) vfunc = np.vectorize(voxel_fitter, signature="(n) -> (3)") - fitted_coefficients = vfunc(d_masked) + fitted_coefficients = vfunc(m=data) x2 = fitted_coefficients[:, 1] x3 = fitted_coefficients[:, 2] + # Calculate T1 in ms. Formula: T1 = (x2 / x3)^2 * 1000 + return (x2 / x3) ** 2 * 1000.0 - i, j, k = voxel_mask.T - t1map = np.nan * np.zeros_like(data[..., 0]) - # Calculate T1 in ms. Formula: T1 = (x2 / x3)^2 * 1000 - t1map[i, j, k] = (x2 / x3) ** 2 * 1000.0 +def voxel_fit_function(t: np.ndarray, x1: float, x2: float, x3: float) -> np.ndarray: + """ + Theoretical Look-Locker T1 recovery curve model. - return np.minimum(t1map, t1_roof) + Evaluates the function: f(t) = | x1 * (1 - (1 + x2^2) * exp(-x3^2 * t)) | + + Args: + t (np.ndarray): Time array in seconds. + x1 (float): Amplitude scaling factor (equivalent to A). + x2 (float): Inversion efficiency term (used to ensure (1+x2^2) > 1). + x3 (float): Relaxation rate, defined as 1 / sqrt(T1*). + + Returns: + np.ndarray: The theoretical signal magnitude at times `t`. 
+ """ + return np.abs(x1 * (1.0 - (1 + x2**2) * np.exp(-(x3**2) * t))) -def looklocker_t1map_postprocessing( - T1map: Path, - T1_low: float, - T1_high: float, - radius: int = 10, - erode_dilate_factor: float = 1.3, - mask: Optional[np.ndarray] = None, - output: Path | None = None, -) -> MRIData: +@np.errstate(divide="raise", invalid="raise", over="raise") +def curve_fit_wrapper(f, t: np.ndarray, y: np.ndarray, p0: np.ndarray): """ - Performs quality-control and post-processing on a raw Look-Locker T1 map. + A strict wrapper around scipy.optimize.curve_fit. + + Temporarily converts numpy warnings (like division by zero) and + scipy's OptimizeWarning into hard errors. This allows the calling + function to gracefully catch and handle poorly-fitting voxels + (e.g., by assigning them NaN) rather than silently returning bad fits. Args: - T1map (Path): Path to the raw, unmasked Look-Locker T1 map NIfTI file. - T1_low (float): Lower physiological limit for T1 values (in ms). - T1_high (float): Upper physiological limit for T1 values (in ms). - radius (int, optional): Base radius for morphological dilation when generating - the automatic mask. Defaults to 10. - erode_dilate_factor (float, optional): Multiplier for the erosion radius - relative to the dilation radius to ensure tight mask edges. Defaults to 1.3. - mask (Optional[np.ndarray], optional): Pre-computed 3D boolean mask. If None, - one is generated automatically. Defaults to None. - output (Path | None, optional): Path to save the cleaned T1 map. Defaults to None. + f (callable): The model function, e.g., voxel_fit_function. + t (np.ndarray): The independent variable (time). + y (np.ndarray): The dependent variable (signal). + p0 (np.ndarray): Initial guesses for the parameters. Returns: - MRIData: An MRIData object containing the cleaned, masked, and interpolated T1 map. + np.ndarray: Optimal values for the parameters so that the sum of + the squared residuals of :code:`f(xdata, *popt) - ydata` is minimized. 
+ """ + with warnings.catch_warnings(): + warnings.simplefilter("error", scipy.optimize.OptimizeWarning) + popt, _ = scipy.optimize.curve_fit(f, xdata=t, ydata=y, p0=p0, maxfev=1000) + return popt - Raises: - RuntimeError: If more than 99% of the voxels are removed during the outlier - filtering step, indicating a likely unit mismatch (e.g., T1 in seconds instead of ms). - Notes: - This function cleans up noisy T1 fits by applying a three-step pipeline: - 1. Masking: If no mask is provided, it automatically isolates the brain/head by - finding the largest contiguous tissue island and applying morphological smoothing. - 2. Outlier Removal: Voxels falling outside the provided physiological bounds - [T1_low, T1_high] are discarded (set to NaN). - 3. Interpolation: Internal "holes" (NaNs) created by poor fits or outlier - removal are iteratively filled using a specialized Gaussian filter that - interpolates from surrounding valid tissue without blurring the edges. +def fit_voxel(time_s: np.ndarray, m: np.ndarray, pbar=None) -> np.ndarray: """ - logger.info(f"Post-processing Look-Locker T1 map at {T1map} with T1 range [{T1_low}, {T1_high}] ms.") - t1map_mri = MRIData.from_file(T1map, dtype=np.single) - t1map_data = t1map_mri.data.copy() + Fits the Look-Locker relaxation curve for a single voxel's time series. - if mask is None: - logger.debug("No mask provided, generating automatic mask based on the largest contiguous tissue island.") - mask = create_largest_island_mask(t1map_data, radius, erode_dilate_factor) - else: - logger.debug("Using provided mask for post-processing.") + Provides initial parameter guesses based on the location of the signal minimum + and attempts to fit the voxel_fit_function using Levenberg-Marquardt optimization. + Returns NaNs if the optimization fails or hits evaluation limits. + + Args: + time_s (np.ndarray): 1D array of trigger times in seconds. + m (np.ndarray): 1D array of signal magnitudes over time for the voxel. 
+ pbar: A tqdm progress bar instance (or None) to update incrementally. + + Returns: + np.ndarray: A 3-element array containing the fitted parameters `[x1, x2, x3]`. + If the fit fails, returns an array of NaNs. + """ + if pbar is not None: + pbar.update(1) + x1 = 1.0 + x2 = np.sqrt(1.25) + T1 = time_s[np.argmin(m)] / np.log(1 + x2**2) + x3 = np.sqrt(1 / T1) + p0 = np.array((x1, x2, x3)) + if not np.all(np.isfinite(m)): + return np.nan * np.zeros_like(p0) + try: + popt = curve_fit_wrapper(voxel_fit_function, time_s, m, p0) + except (scipy.optimize.OptimizeWarning, FloatingPointError): + return np.nan * np.zeros_like(p0) + except RuntimeError as e: + if "maxfev" in str(e): + return np.nan * np.zeros_like(p0) + raise e + return popt - t1map_data = remove_outliers(t1map_data, mask, T1_low, T1_high) - if np.isfinite(t1map_data).sum() / t1map_data.size < 0.01: - raise RuntimeError("After outlier removal, less than 1% of the image is left. Check image units.") +@dataclass +class LookLockerT1: + """A class representing a Look-Locker T1 map with post-processing capabilities. - # Fill internal missing values iteratively using a Gaussian filter - fill_mask = np.isnan(t1map_data) & mask - logger.debug(f"Initial fill mask has {fill_mask.sum()} voxels.") - while fill_mask.sum() > 0: - logger.info(f"Filling in {fill_mask.sum()} voxels within the mask.") - t1map_data[fill_mask] = nan_filter_gaussian(t1map_data, 1.0)[fill_mask] - fill_mask = np.isnan(t1map_data) & mask + Args: + mri (MRIData): An MRIData object containing the raw T1 map data and affine transformation. 
+ """ - processed_T1map = MRIData(t1map_data, t1map_mri.affine) - if output is not None: - processed_T1map.save(output, dtype=np.single) - logger.info(f"Post-processed Look-Locker T1 map saved to {output}.") - else: - logger.info("No output path provided, returning post-processed Look-Locker T1 map as MRIData object.") + mri: MRIData + + @classmethod + def from_file(cls, t1map_path: Path) -> "LookLockerT1": + """Loads a Look-Locker T1 map from a NIfTI file. + + Args: + t1map_path (Path): The file path to the Look-Locker T1 map + NIfTI file. + Returns: + LookLockerT1: An instance of the LookLockerT1 class containing the loaded + T1 map data and affine transformation. + """ + + logger.info(f"Loading Look-Locker T1 map from {t1map_path}.") + mri = MRIData.from_file(t1map_path, dtype=np.single) + return cls(mri=mri) + + def postprocess( + self, + T1_low: float = 100, + T1_high: float = 6000, + radius: int = 10, + erode_dilate_factor: float = 1.3, + mask: np.ndarray | None = None, + ) -> MRIData: + """Performs quality-control and post-processing on a raw Look-Locker T1 map. + + Args: + T1_low (float): Lower physiological limit for T1 values (in ms). + T1_high (float): Upper physiological limit for T1 values (in ms). + radius (int, optional): Base radius for morphological dilation when generating + the automatic mask. Defaults to 10. + erode_dilate_factor (float, optional): Multiplier for the erosion radius + relative to the dilation radius to ensure tight mask edges. Defaults to 1.3. + mask (Optional[np.ndarray], optional): Pre-computed 3D boolean mask. If None, + one is generated automatically. Defaults to None. + output (Path | None, optional): Path to save the cleaned T1 map. Defaults to None. + + Returns: + MRIData: An MRIData object containing the cleaned, masked, and interpolated T1 map. + + Raises: + RuntimeError: If more than 99% of the voxels are removed during the outlier + filtering step, indicating a likely unit mismatch (e.g., T1 in seconds instead of ms). 
+ + Notes: + This function cleans up noisy T1 fits by applying a three-step pipeline: + 1. Masking: If no mask is provided, it automatically isolates the brain/head by + finding the largest contiguous tissue island and applying morphological smoothing. + 2. Outlier Removal: Voxels falling outside the provided physiological bounds + [T1_low, T1_high] are discarded (set to NaN). + 3. Interpolation: Internal "holes" (NaNs) created by poor fits or outlier + removal are iteratively filled using a specialized Gaussian filter that + interpolates from surrounding valid tissue without blurring the edges. + """ + logger.info(f"Post-processing Look-Locker T1 map with T1 range [{T1_low}, {T1_high}] ms.") + t1map_data = self.mri.data.copy() + + if mask is None: + logger.debug("No mask provided, generating automatic mask based on the largest contiguous tissue island.") + mask = create_largest_island_mask(t1map_data, radius, erode_dilate_factor) + else: + logger.debug("Using provided mask for post-processing.") + + t1map_data = remove_outliers(t1map_data, mask, T1_low, T1_high) + + if np.isfinite(t1map_data).sum() / t1map_data.size < 0.01: + raise RuntimeError("After outlier removal, less than 1% of the image is left. Check image units.") + + # Fill internal missing values iteratively using a Gaussian filter + fill_mask = np.isnan(t1map_data) & mask + logger.debug(f"Initial fill mask has {fill_mask.sum()} voxels.") + while fill_mask.sum() > 0: + logger.info(f"Filling in {fill_mask.sum()} voxels within the mask.") + t1map_data[fill_mask] = nan_filter_gaussian(t1map_data, 1.0)[fill_mask] + fill_mask = np.isnan(t1map_data) & mask - return processed_T1map + return MRIData(t1map_data, self.mri.affine) -def looklocker_t1map(looklocker_input: Path, timestamps: Path, output: Path | None = None) -> MRIData: +@dataclass +class LookLocker: """ - Generates a T1 map from a 4D Look-Locker inversion recovery dataset. - - This function acts as an I/O wrapper. 
It loads the 4D Look-Locker sequence - and the corresponding trigger times. It converts the timestamps from milliseconds - (standard DICOM/text output) to seconds, which is required by the underlying - exponential fitting math, and triggers the voxel-by-voxel T1 calculation. + A class encapsulating Look-Locker T1 map generation and post-processing. Args: - looklocker_input (Path): Path to the 4D Look-Locker NIfTI file. - timestamps (Path): Path to the text file containing the nominal trigger - delay times (in milliseconds) for each volume in the 4D series. - output (Path | None, optional): Path to save the resulting T1 map NIfTI file. Defaults to None. + mri (MRIData): An MRIData object containing the 4D Look-Locker sequence. + times (np.ndarray): A 1D array of trigger delay times in seconds corresponding to each volume in the 4D sequence. - Returns: - MRIData: An MRIData object containing the computed 3D T1 map (in milliseconds) - and the original affine transformation matrix. """ - logger.info(f"Generating T1 map from Look-Locker data at {looklocker_input} with trigger times from {timestamps}.") - ll_mri = MRIData.from_file(looklocker_input, dtype=np.single) - # Convert timestamps from milliseconds to seconds - time_s = np.loadtxt(timestamps) / 1000.0 - logger.debug(f"Loaded trigger times: {time_s}.") - t1map_array = compute_looklocker_t1_array(ll_mri.data, time_s) - t1map_mri = MRIData(t1map_array.astype(np.single), ll_mri.affine) - - if output is not None: - t1map_mri.save(output, dtype=np.single) - logger.info(f"Look-Locker T1 map saved to {output}.") - else: - logger.info("No output path provided, returning Look-Locker T1 map as MRIData object.") - return t1map_mri + mri: MRIData + times: np.ndarray + + def t1_map(self) -> LookLockerT1: + """ + Computes the T1 map from the Look-Locker data using the provided trigger times. 
+ + Returns: + LookLockerT1: A LookLockerT1 object containing the computed 3D T1 map (in milliseconds) + and the original affine transformation matrix. + """ + + logger.info("Generating T1 map from Look-Locker data") + t1map_array = compute_looklocker_t1_array(self.mri.data, self.times) + mri_data = MRIData(t1map_array.astype(np.single), self.mri.affine) + return LookLockerT1(mri=mri_data) + + @classmethod + def from_file(cls, looklocker_input: Path, timestamps: Path): + logger.info(f"Loading Look-Locker data from {looklocker_input} and trigger times from {timestamps}.") + ll_mri = MRIData.from_file(looklocker_input, dtype=np.single) + time_s = np.loadtxt(timestamps) / 1000.0 + logger.debug(f"Loaded trigger times: {time_s}.") + return cls(mri=ll_mri, times=time_s) def dicom_to_looklocker(dicomfile: Path, outpath: Path): @@ -343,15 +512,29 @@ def dispatch(args): if command == "dcm2ll": dicom_to_looklocker(args.pop("input"), args.pop("output")) elif command == "t1": - looklocker_t1map(args.pop("input"), args.pop("timestamps"), output=args.pop("output")) + ll = LookLocker.from_file(args.pop("input"), args.pop("timestamps")) + + t1_map = ll.t1_map() + + output = args.pop("output") + if output is not None: + t1_map.mri.save(output, dtype=np.single) + logger.info(f"Look-Locker T1 map saved to {output}.") + else: + logger.info("No output path provided, returning Look-Locker T1 map as MRIData object.") + elif command == "postprocess": - looklocker_t1map_postprocessing( - T1map=args.pop("input"), + t1_map_post = LookLockerT1.from_file(args.pop("input")).postprocess( T1_low=args.pop("t1_low"), T1_high=args.pop("t1_high"), radius=args.pop("radius"), erode_dilate_factor=args.pop("erode_dilate_factor"), - output=args.pop("output"), ) + output = args.pop("output") + if output is not None: + t1_map_post.save(output, dtype=np.single) + logger.info(f"Post-processed Look-Locker T1 map saved to {output}.") + else: + logger.info("No output path provided, returning Post-processed 
Look-Locker T1 map as MRIData object.") else: raise ValueError(f"Unknown Look-Locker command: {command}") diff --git a/src/mritk/mixed.py b/src/mritk/mixed.py index 186ee65..78aaa1f 100644 --- a/src/mritk/mixed.py +++ b/src/mritk/mixed.py @@ -7,6 +7,7 @@ import argparse import json import logging +import typing from collections.abc import Callable from pathlib import Path @@ -18,11 +19,130 @@ from .data import MRIData, change_of_coordinates_map, data_reorientation from .masks import create_csf_mask -from .utils import VOLUME_LABELS, T1_lookup_table, run_dcm2niix +from .utils import VOLUME_LABELS, run_dcm2niix logger = logging.getLogger(__name__) +T = typing.TypeVar("T", np.ndarray, float) + + +def estimate_se_free_relaxation_time(TRse: float, TE: float, ETL: int) -> float: + """ + Computes the estimated free relaxation time following a Spin Echo image. + + Corrects the standard Repetition Time (TR) by accounting for the Effective + Echo Time (TE), the Echo Train Length (ETL), and an adjustment for 20 + dummy preparation echoes. + + Args: + TRse (float): Repetition time of the spin echo sequence (in ms). + TE (float): Effective echo time (in ms). + ETL (int): Echo train length. + + Returns: + float: The corrected free relaxation time `TRfree`. + """ + return TRse - TE * (1 + 0.5 * (ETL - 1) / (0.5 * (ETL + 1) + 20)) + + +def T1_lookup_table(TRse: float, TI: float, TE: float, ETL: int, T1_low: float, T1_hi: float) -> tuple[np.ndarray, np.ndarray]: + """ + Generates a Fraction/T1 lookup table for mixed T1 mapping interpolations. + + Calculates the theoretical ratio of the Inversion Recovery signal (Sir) to + the Spin Echo signal (Sse) over a highly discretized grid of physiological + T1 relaxation times. + + Args: + TRse (float): Spin-echo repetition time (in ms). + TI (float): Inversion time (in ms). + TE (float): Effective echo time (in ms). + ETL (int): Echo train length. + T1_low (float): Lower bound of the T1 grid (in ms). 
+ T1_hi (float): Upper bound of the T1 grid (in ms). + + Returns: + tuple[np.ndarray, np.ndarray]: A tuple containing: + - fractionCurve (np.ndarray): The theoretical Sir/Sse signal ratios. + - T1_grid (np.ndarray): The corresponding T1 values (in ms). + """ + TRfree = estimate_se_free_relaxation_time(TRse, TE, ETL) + T1_grid = np.arange(int(T1_low), int(T1_hi + 1)) + S_SE, S_IR = T1_to_mixed_signals(T1_grid, TR=TRfree, TI=TI) + fractionCurve = S_IR / S_SE + return fractionCurve, T1_grid + + +def T1_to_mixed_signals(T1: T, TR: float, TI: float) -> tuple[np.ndarray, np.ndarray]: + """Computes the theoretical Spin-Echo and Inversion-Recovery signals for a given T1. + + Evaluates the standard signal equations for Spin-Echo and Inversion-Recovery + sequences based on the provided T1 relaxation time, repetition time (TR), + and inversion time (TI). + + Args: + T1: The T1 relaxation time (in ms) for which to compute the signals. + TR: The repetition time of the Spin-Echo sequence (in ms). + TI: The inversion time of the Inversion-Recovery sequence (in ms). + + Returns: + tuple[np.ndarray, np.ndarray]: A tuple containing: + - S_SE (np.ndarray): The theoretical Spin-Echo signal. + - S_IR (np.ndarray): The theoretical Inversion-Recovery signal. + + Notes: + The Spin-Echo signal is computed as: + + .. math:: + S_{SE} = 1 - e^{-TR / T1} + + The Inversion-Recovery signal is computed as: + + .. math:: + S_{IR} = 1 - (1 + S_{SE}) e^{-TI / T1} + + """ + S_SE = 1.0 * (1.0 - np.exp(-TR / T1)) + S_IR = 1.0 - (1.0 + S_SE) * np.exp(-TI / T1) + return S_SE, S_IR + + +def T1_to_noisy_T1_mixed( + T1_true: np.ndarray, TR: float, TI: float, f_grid: np.ndarray, t_grid: np.ndarray, sigma: float = 0.04 +) -> np.ndarray: + """Simulates noisy Mixed T1 estimation from true T1 values using lookup table interpolation. + + Args: + T1_true: The true T1 relaxation times (in ms) for which to simulate the noisy estimates. + TR: The repetition time of the Spin-Echo sequence (in ms). 
+ TI: The inversion time of the Inversion-Recovery sequence (in ms). + f_grid: The precomputed grid of theoretical Sir/Sse ratios corresponding to the T1 values in t_grid. + t_grid: The precomputed grid of T1 values (in ms) corresponding to the ratios in f_grid. + sigma: The standard deviation of the Gaussian noise to be added to the signals. Defaults to 0.04. + + Returns: + np.ndarray: The simulated noisy T1 estimates (in ms) obtained + by interpolating the noisy signal ratios onto the T1 grid. + + Notes: + This function first computes the theoretical Spin-Echo + and Inversion-Recovery signals for the given true T1 values, + adds Gaussian noise to the Inversion-Recovery signals and Rician + noise to the Spin-Echo signals to simulate measurement variability, + and then uses the precomputed lookup table (f_grid and t_grid) to + interpolate the noisy signal ratios back to T1 estimates. + """ + S_SE_t, S_IR_t = T1_to_mixed_signals(T1_true, TR, TI) + real_SE = S_SE_t + np.random.normal(0, sigma, S_SE_t.shape) + imag_SE = np.random.normal(0, sigma, S_SE_t.shape) + S_SE_noisy = np.sqrt(real_SE**2 + imag_SE**2) + S_IR_noisy = S_IR_t + np.random.normal(0, sigma, S_IR_t.shape) + + interpolator = create_interpolator(f_grid, t_grid) + return interpolator(S_IR_noisy / S_SE_noisy).astype(np.single) + + def dicom_standard_affine(frame_fg) -> np.ndarray: """ Generates the DICOM to LPS (Left-Posterior-Superior) affine transformation matrix. @@ -101,7 +221,12 @@ def extract_single_volume(D: np.ndarray, frame_fg) -> MRIData: def mixed_t1map( - SE_nii_path: Path, IR_nii_path: Path, meta_path: Path, T1_low: float, T1_high: float, output: Path | None = None + SE_nii_path: Path, + IR_nii_path: Path, + meta_path: Path, + T1_low: float = 500.0, + T1_high: float = 5000.0, + output: Path | None = None, ) -> nibabel.nifti1.Nifti1Image: """ Generates a T1 relaxation map by combining Spin-Echo (SE) and Inversion-Recovery (IR) acquisitions. 
@@ -116,8 +241,8 @@ def mixed_t1map( IR_nii_path (Path): Path to the Inversion-Recovery corrected real NIfTI file. meta_path (Path): Path to the JSON file containing the sequence parameters ('TR_SE', 'TI', 'TE', 'ETL'). - T1_low (float): Lower bound for the T1 interpolation grid (in ms). - T1_high (float): Upper bound for the T1 interpolation grid (in ms). + T1_low (float): Lower bound for the T1 interpolation grid (in ms). Defaults to 500 ms. + T1_high (float): Upper bound for the T1 interpolation grid (in ms). Defaults to 5000 ms. output (Path | None, optional): Path to save the resulting T1 map NIfTI file. Defaults to None. Returns: @@ -187,7 +312,9 @@ def mixed_t1map_postprocessing(SE_nii_path: Path, T1_path: Path, output: Path | return masked_t1map_nii -def compute_mixed_t1_array(se_data: np.ndarray, ir_data: np.ndarray, meta: dict, t1_low: float, t1_high: float) -> np.ndarray: +def compute_mixed_t1_array( + se_data: np.ndarray, ir_data: np.ndarray, meta: dict, t1_low: float = 500.0, t1_high: float = 5000.0 +) -> np.ndarray: """ Computes a Mixed T1 array from Spin-Echo and Inversion-Recovery volumes using a lookup table. @@ -195,8 +322,8 @@ def compute_mixed_t1_array(se_data: np.ndarray, ir_data: np.ndarray, meta: dict, se_data (np.ndarray): 3D numpy array of the Spin-Echo modulus data. ir_data (np.ndarray): 3D numpy array of the Inversion-Recovery corrected real data. meta (dict): Dictionary containing sequence parameters ('TR_SE', 'TI', 'TE', 'ETL'). - t1_low (float): Lower bound for T1 generation grid. - t1_high (float): Upper bound for T1 generation grid. + t1_low (float): Lower bound for T1 generation grid. Defaults to 500 ms. + t1_high (float): Upper bound for T1 generation grid. Defaults to 5000 ms. Returns: np.ndarray: Computed T1 map as a 3D float32 array. 
@@ -207,16 +334,26 @@ def compute_mixed_t1_array(se_data: np.ndarray, ir_data: np.ndarray, meta: dict, f_data[nonzero_mask] = ir_data[nonzero_mask] / se_data[nonzero_mask] tr_se, ti, te, etl = meta["TR_SE"], meta["TI"], meta["TE"], meta["ETL"] - f_curve, t1_grid = T1_lookup_table(tr_se, ti, te, etl, t1_low, t1_high) + tr_free = estimate_se_free_relaxation_time(tr_se, te, etl) + + t1_grid = np.arange(int(t1_low), int(t1_high + 1)) + S_SE, S_IR = T1_to_mixed_signals(t1_grid, TR=tr_free, TI=ti) + f_curve = S_IR / S_SE + logger.debug( f"Generated T1 lookup table with TR_SE={tr_se}, TI={ti}, TE={te}, " f"ETL={etl}, T1 range=({t1_low}, {t1_high}), table size={len(t1_grid)}" ) - interpolator = scipy.interpolate.interp1d(f_curve, t1_grid, kind="nearest", bounds_error=False, fill_value=np.nan) - logger.debug("Created interpolation function for T1 estimation based on the lookup table.") + interpolator = create_interpolator(f_curve, t1_grid) return interpolator(f_data).astype(np.single) +def create_interpolator(f_grid, t1_grid): + interpolator = scipy.interpolate.interp1d(f_grid, t1_grid, kind="nearest", bounds_error=False, fill_value=np.nan) + logger.debug("Created interpolation function for T1 estimation based on the lookup table.") + return interpolator + + def _extract_frame_metadata(frame_fg) -> dict: """ Extracts core physical parameters (TR, TE, TI, ETL) from a DICOM frame functional group. 
diff --git a/src/mritk/napari.py b/src/mritk/napari.py index 0aa334a..368b06e 100644 --- a/src/mritk/napari.py +++ b/src/mritk/napari.py @@ -52,6 +52,8 @@ def dispatch(args): mri_resource = MRIData.from_file(file_path) data = mri_resource.data - viewer.add_image(data, name=file_path.stem) + affine = mri_resource.affine + + viewer.add_image(data, affine=affine, name=file_path.stem) napari.run() diff --git a/src/mritk/utils.py b/src/mritk/utils.py index bd32b8c..7d10996 100644 --- a/src/mritk/utils.py +++ b/src/mritk/utils.py @@ -8,13 +8,11 @@ import shlex import shutil import subprocess -import warnings from pathlib import Path import numpy as np import scipy import skimage -from scipy.optimize import OptimizeWarning VOLUME_LABELS = [ "IR-modulus", @@ -51,88 +49,7 @@ def mri_facemask(vol: np.ndarray, smoothing_level: float = 5.0) -> np.ndarray: return binary -def voxel_fit_function(t: np.ndarray, x1: float, x2: float, x3: float) -> np.ndarray: - """ - Theoretical Look-Locker T1 recovery curve model. - - Evaluates the function: f(t) = | x1 * (1 - (1 + x2^2) * exp(-x3^2 * t)) | - - Args: - t (np.ndarray): Time array in seconds. - x1 (float): Amplitude scaling factor (equivalent to A). - x2 (float): Inversion efficiency term (used to ensure (1+x2^2) > 1). - x3 (float): Relaxation rate, defined as 1 / sqrt(T1*). - - Returns: - np.ndarray: The theoretical signal magnitude at times `t`. - """ - return np.abs(x1 * (1.0 - (1 + x2**2) * np.exp(-(x3**2) * t))) - - -@np.errstate(divide="raise", invalid="raise", over="raise") -def curve_fit_wrapper(f, t: np.ndarray, y: np.ndarray, p0: np.ndarray): - """ - A strict wrapper around scipy.optimize.curve_fit. - - Temporarily converts numpy warnings (like division by zero) and - scipy's OptimizeWarning into hard errors. This allows the calling - function to gracefully catch and handle poorly-fitting voxels - (e.g., by assigning them NaN) rather than silently returning bad fits. 
- - Args: - f (callable): The model function, e.g., voxel_fit_function. - t (np.ndarray): The independent variable (time). - y (np.ndarray): The dependent variable (signal). - p0 (np.ndarray): Initial guesses for the parameters. - - Returns: - np.ndarray: Optimal values for the parameters so that the sum of - the squared residuals of :code:`f(xdata, *popt) - ydata` is minimized. - """ - with warnings.catch_warnings(): - warnings.simplefilter("error", OptimizeWarning) - popt, _ = scipy.optimize.curve_fit(f, xdata=t, ydata=y, p0=p0, maxfev=1000) - return popt - - -def fit_voxel(time_s: np.ndarray, pbar, m: np.ndarray) -> np.ndarray: - """ - Fits the Look-Locker relaxation curve for a single voxel's time series. - - Provides initial parameter guesses based on the location of the signal minimum - and attempts to fit the voxel_fit_function using Levenberg-Marquardt optimization. - Returns NaNs if the optimization fails or hits evaluation limits. - - Args: - time_s (np.ndarray): 1D array of trigger times in seconds. - pbar: A tqdm progress bar instance (or None) to update incrementally. - m (np.ndarray): 1D array of signal magnitudes over time for the voxel. - - Returns: - np.ndarray: A 3-element array containing the fitted parameters `[x1, x2, x3]`. - If the fit fails, returns an array of NaNs. 
- """ - if pbar is not None: - pbar.update(1) - x1 = 1.0 - x2 = np.sqrt(1.25) - T1 = time_s[np.argmin(m)] / np.log(1 + x2**2) - x3 = np.sqrt(1 / T1) - p0 = np.array((x1, x2, x3)) - if not np.all(np.isfinite(m)): - return np.nan * np.zeros_like(p0) - try: - popt = curve_fit_wrapper(voxel_fit_function, time_s, m, p0) - except (OptimizeWarning, FloatingPointError): - return np.nan * np.zeros_like(p0) - except RuntimeError as e: - if "maxfev" in str(e): - return np.nan * np.zeros_like(p0) - raise e - return popt - - -def nan_filter_gaussian(U: np.ndarray, sigma: float, truncate: float = 4.0) -> np.ndarray: +def nan_filter_gaussian(U: np.ndarray, sigma: float, truncate: float = 4.0, mode: str = "constant") -> np.ndarray: """ Applies a Gaussian filter to an array containing NaNs, smoothly interpolating the missing values. @@ -146,71 +63,24 @@ def nan_filter_gaussian(U: np.ndarray, sigma: float, truncate: float = 4.0) -> n U (np.ndarray): Input array potentially containing NaN values. sigma (float): Standard deviation for the Gaussian kernel. truncate (float, optional): Truncate the filter at this many standard deviations. Defaults to 4.0. + mode (str, optional): Mode for handling edges. Defaults to "constant". Returns: np.ndarray: Filtered array where original NaN values have been interpolated. """ V = U.copy() V[np.isnan(U)] = 0 - VV = scipy.ndimage.gaussian_filter(V, sigma=sigma, truncate=truncate) + VV = scipy.ndimage.gaussian_filter(V, sigma=sigma, truncate=truncate, mode=mode) W = np.ones_like(U) W[np.isnan(U)] = 0 - WW = scipy.ndimage.gaussian_filter(W, sigma=sigma, truncate=truncate) + WW = scipy.ndimage.gaussian_filter(W, sigma=sigma, truncate=truncate, mode=mode) mask = ~((WW == 0) * (VV == 0)) out = np.nan * np.zeros_like(U) out[mask] = VV[mask] / WW[mask] return out -def estimate_se_free_relaxation_time(TRse: float, TE: float, ETL: int) -> float: - """ - Computes the estimated free relaxation time following a Spin Echo image. 
- - Corrects the standard Repetition Time (TR) by accounting for the Effective - Echo Time (TE), the Echo Train Length (ETL), and an adjustment for 20 - dummy preparation echoes. - - Args: - TRse (float): Repetition time of the spin echo sequence (in ms). - TE (float): Effective echo time (in ms). - ETL (int): Echo train length. - - Returns: - float: The corrected free relaxation time `TRfree`. - """ - return TRse - TE * (1 + 0.5 * (ETL - 1) / (0.5 * (ETL + 1) + 20)) - - -def T1_lookup_table(TRse: float, TI: float, TE: float, ETL: int, T1_low: float, T1_hi: float) -> tuple[np.ndarray, np.ndarray]: - """ - Generates a Fraction/T1 lookup table for mixed T1 mapping interpolations. - - Calculates the theoretical ratio of the Inversion Recovery signal (Sir) to - the Spin Echo signal (Sse) over a highly discretized grid of physiological - T1 relaxation times. - - Args: - TRse (float): Spin-echo repetition time (in ms). - TI (float): Inversion time (in ms). - TE (float): Effective echo time (in ms). - ETL (int): Echo train length. - T1_low (float): Lower bound of the T1 grid (in ms). - T1_hi (float): Upper bound of the T1 grid (in ms). - - Returns: - tuple[np.ndarray, np.ndarray]: A tuple containing: - - fractionCurve (np.ndarray): The theoretical Sir/Sse signal ratios. - - T1_grid (np.ndarray): The corresponding T1 values (in ms). - """ - TRfree = estimate_se_free_relaxation_time(TRse, TE, ETL) - T1_grid = np.arange(int(T1_low), int(T1_hi + 1)) - Sse = 1 - np.exp(-TRfree / T1_grid) - Sir = 1 - (1 + Sse) * np.exp(-TI / T1_grid) - fractionCurve = Sir / Sse - return fractionCurve, T1_grid - - def run_dcm2niix(input_path: Path, output_dir: Path, form: str, extra_args: str = "", check: bool = True): """ Utility wrapper to execute the dcm2niix command-line tool securely. 
diff --git a/tests/conftest.py b/tests/conftest.py index fc11ba0..be56825 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,11 @@ import os +import typing from pathlib import Path import numpy as np import pytest +import mritk from mritk.data import MRIData from mritk.segmentation import Segmentation @@ -36,3 +38,33 @@ def example_values() -> MRIData: ] ).T return MRIData(data, affine=np.eye(4)) + + +class GonzoRoi(typing.NamedTuple): + points: np.ndarray + affine: np.ndarray + shape: tuple + + def voxel_indices(self, affine: np.ndarray) -> np.ndarray: + return mritk.data.physical_to_voxel_indices(self.points, affine=affine) + + +@pytest.fixture +def gonzo_roi() -> GonzoRoi: + xs = np.linspace(0, 70, 80) + ys = np.linspace(0, 20, 20) + zs = np.linspace(-40, 90, 110) + + # Create a 3D grid of points as one long vector + grid_x, grid_y, grid_z = np.meshgrid(xs, ys, zs, indexing="ij") + grid_points = np.vstack([grid_x.ravel(), grid_y.ravel(), grid_z.ravel()]).T # Shape: (N, 3) + grid_shape = grid_x.shape + + new_affine = np.eye(4) + new_affine[0, 0] = xs[1] - xs[0] + new_affine[1, 1] = ys[1] - ys[0] + new_affine[2, 2] = zs[1] - zs[0] + new_affine[0, 3] = xs[0] + new_affine[1, 3] = ys[0] + new_affine[2, 3] = zs[0] + return GonzoRoi(points=grid_points, affine=new_affine, shape=grid_shape) diff --git a/tests/create_test_data.py b/tests/create_test_data.py index e507e32..6899566 100644 --- a/tests/create_test_data.py +++ b/tests/create_test_data.py @@ -7,6 +7,8 @@ def main(): inputdir = Path("gonzo") # Assumes you have the Gonzo dataset downloaded here files = [ "timetable/timetable.tsv", + "mri-dataset/mri_dataset/sub-01/ses-01/anat/sub-01_ses-01_acq-looklocker_IRT1.nii.gz", + "mri-dataset/mri_dataset/sub-01/ses-01/anat/sub-01_ses-01_acq-looklocker_IRT1_trigger_times.txt", "mri-dataset/mri_dataset/sub-01/ses-01/mixed/sub-01_ses-01_acq-mixed_SE-modulus.nii.gz", "mri-dataset/mri_dataset/sub-01/ses-01/mixed/sub-01_ses-01_acq-mixed_IR-corrected-real.nii.gz", 
"mri-dataset/mri_dataset/sub-01/ses-01/mixed/sub-01_ses-01_acq-mixed_meta.json", @@ -18,6 +20,7 @@ def main(): "mri-processed/mri_processed_data/sub-01/concentrations/sub-01_ses-02_concentration.nii.gz", "mri-processed/mri_processed_data/sub-01/segmentations/sub-01_seg-intracranial_binary.nii.gz", "mri-processed/mri_dataset/derivatives/sub-01/ses-01/sub-01_ses-01_acq-mixed_T1map.nii.gz", + "mri-processed/mri_dataset/derivatives/sub-01/ses-01/sub-01_ses-01_acq-looklocker_T1map.nii.gz", "mri-processed/mri_processed_data/sub-01/segmentations/sub-01_seg-aparc+aseg_refined.nii.gz", "mri-processed/mri_processed_data/sub-01/registered/sub-01_ses-01_acq-looklocker_T1map_registered.nii.gz", ] diff --git a/tests/test_concentration.py b/tests/test_concentration.py index 29a7435..a50909c 100644 --- a/tests/test_concentration.py +++ b/tests/test_concentration.py @@ -9,13 +9,8 @@ import numpy as np import mritk.cli -from mritk.concentration import ( - compute_concentration_from_R1_array, - compute_concentration_from_T1_array, - concentration_from_R1_expr, - concentration_from_T1, - concentration_from_T1_expr, -) +import mritk.concentration +import mritk.data from mritk.testing import compare_nifti_images @@ -36,7 +31,7 @@ def test_intracranial_concentration(tmp_path, mri_data_dir: Path): test_outputs = [tmp_path / f"output_ses-0{i}_concentration.nii.gz" for i in sessions] for i, s in enumerate(sessions): - concentration_from_T1( + mritk.concentration.concentration_from_T1( input_path=images_path[i], reference_path=baseline_path, output_path=test_outputs[i], @@ -46,13 +41,68 @@ def test_intracranial_concentration(tmp_path, mri_data_dir: Path): compare_nifti_images(test_outputs[i], ref_outputs[i], data_tolerance=1e-12) +def test_inverse_intracranial_concentration(tmp_path, mri_data_dir: Path): + """ + Tests that taking a generated concentration map and applying the inverse + T1 formula recovers the original post-contrast T1 map within the valid mask. 
+ """ + baseline_path = mri_data_dir / "mri-processed/mri_processed_data/sub-01/T1maps/sub-01_ses-01_T1map_hybrid.nii.gz" + sessions = [1, 2] + + # These are the original T1 maps we want to recover + original_t1_paths = [ + mri_data_dir / f"mri-processed/mri_processed_data/sub-01/T1maps/sub-01_ses-0{i}_T1map_hybrid.nii.gz" for i in sessions + ] + + # These are the concentration maps generated by the forward pass + concentration_paths = [ + mri_data_dir / f"mri-processed/mri_processed_data/sub-01/concentrations/sub-01_ses-0{i}_concentration.nii.gz" + for i in sessions + ] + + r1 = 0.0032 + + # Load the baseline T1 map (T1_0) once + t1_0_mri = mritk.data.MRIData.from_file(baseline_path, dtype=np.single) + t1_0_data = t1_0_mri.data + + for i, s in enumerate(sessions): + # 1. Load the pre-calculated concentration map + conc_mri = mritk.data.MRIData.from_file(concentration_paths[i], dtype=np.single) + conc_data = conc_mri.data + + # 2. Load the original post-contrast T1 map (our ground truth) + orig_t1_mri = mritk.data.MRIData.from_file(original_t1_paths[i], dtype=np.single) + orig_t1_data = orig_t1_mri.data + + # 3. Create a mask of valid voxels. + # The forward function sets unmasked/invalid voxels to NaN. + valid_mask = ~np.isnan(conc_data) + + # 4. Compute the inverse T1 only on valid voxels + recovered_t1_data = np.full_like(conc_data, np.nan) + + # Formula: T1 = 1 / (C * r1 + (1 / T1_0)) + recovered_t1_data[valid_mask] = 1.0 / (conc_data[valid_mask] * r1 + (1.0 / t1_0_data[valid_mask])) + + # 5. Assert that the recovered T1 matches the original T1 strictly within the mask. + # We use assert_allclose because floating point math (1/x) will create minor precision differences. 
+ np.testing.assert_allclose( + recovered_t1_data[valid_mask], + orig_t1_data[valid_mask], + rtol=1e-5, + atol=1e-5, + err_msg=f"Inverse T1 mismatch for session {s}", + ) + + def test_compute_concentration_array_no_mask(): """Test the array computation correctly defaults to keeping all valid positive T1s when no mask is provided.""" t1_data = np.array([1000.0, 1000.0]) t10_data = np.array([2000.0, 2000.0]) r1 = 0.005 - result = compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=None) + result = mritk.concentration.compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=None) assert np.isclose(result[0], 0.1) assert np.isclose(result[1], 0.1) @@ -69,7 +119,7 @@ def test_concentration_from_t1_expr(): # C = 200 * 0.0005 = 0.1 expected = np.array([0.1]) - result = concentration_from_T1_expr(t1, t1_0, r1) + result = mritk.concentration.concentration_from_T1_expr(t1, t1_0, r1) np.testing.assert_array_almost_equal(result, expected) @@ -83,7 +133,7 @@ def test_concentration_from_r1_expr(): # C = 200 * 1.0 = 200.0 expected = np.array([200.0]) - result = concentration_from_R1_expr(r1_map, r1_0_map, r1) + result = mritk.concentration.concentration_from_R1_expr(r1_map, r1_0_map, r1) np.testing.assert_array_almost_equal(result, expected) @@ -96,7 +146,7 @@ def test_compute_concentration_from_T1_array_masking(): mask = np.array([True, True, True, False]) r1 = 0.005 - result = compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=mask) + result = mritk.concentration.compute_concentration_from_T1_array(t1_data, t10_data, r1, mask=mask) # Expectations: # Voxel 0: Valid, should be 0.1 @@ -119,7 +169,7 @@ def test_compute_concentration_from_R1_array_masking(): mask = np.array([True, True, True, False]) r1 = 0.005 - result = compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=mask) + result = mritk.concentration.compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=mask) # Expectations: # Voxel 0: Valid, should be 200.0 @@ -139,12 
+189,54 @@ def test_compute_concentration_from_R1_array_no_mask(): r10_data = np.array([1.0, 1.0]) r1 = 0.005 - result = compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=None) + result = mritk.concentration.compute_concentration_from_R1_array(r1_data, r10_data, r1, mask=None) assert np.isclose(result[0], 200.0) assert np.isclose(result[1], 200.0) +def test_r1_concentration_roundtrip(): + """Test that Signal -> Concentration -> Signal roundtrips perfectly for R1.""" + np.random.seed(42) + + # Generate synthetic baseline R1 and post-contrast R1 + # Post-contrast R1 is typically higher than baseline R1 + shape = (10, 10, 10) + r1_0 = np.random.uniform(0.5, 1.5, size=shape) + r1_post = r1_0 + np.random.uniform(0.1, 2.0, size=shape) + r1_relaxivity = 0.0045 + + # 1. Forward: Calculate concentration + concentration = mritk.concentration.concentration_from_R1_expr(r1_post, r1_0, r1_relaxivity) + + # 2. Inverse: Calculate R1 back from concentration + r1_recovered = mritk.concentration.R1_from_concentration_expr(concentration, r1_0, r1_relaxivity) + + # 3. Assert they are effectively identical + np.testing.assert_allclose(r1_recovered, r1_post, rtol=1e-5, atol=1e-8, err_msg="R1 round-trip failed!") + + +def test_t1_concentration_roundtrip(): + """Test that Signal -> Concentration -> Signal roundtrips perfectly for T1.""" + np.random.seed(42) + + # Generate synthetic baseline T1 and post-contrast T1 + # Post-contrast T1 is typically lower than baseline T1 + shape = (10, 10, 10) + t1_0 = np.random.uniform(1000, 2000, size=shape) # milliseconds, for instance + t1_post = t1_0 - np.random.uniform(100, 500, size=shape) + r1_relaxivity = 0.0045 + + # 1. Forward: Calculate concentration + concentration = mritk.concentration.concentration_from_T1_expr(t1_post, t1_0, r1_relaxivity) + + # 2. Inverse: Calculate T1 back from concentration + t1_recovered = mritk.concentration.T1_from_concentration_expr(concentration, t1_0, r1_relaxivity) + + # 3. 
Assert they are effectively identical + np.testing.assert_allclose(t1_recovered, t1_post, rtol=1e-5, atol=1e-8, err_msg="T1 round-trip failed!") + + @patch("mritk.concentration.concentration_from_T1") def test_dispatch_concentration_t1_defaults(mock_conc_t1): """Test the T1 concentration command with minimum required arguments.""" diff --git a/tests/test_looklocker.py b/tests/test_looklocker.py index d3d7950..ddead8e 100644 --- a/tests/test_looklocker.py +++ b/tests/test_looklocker.py @@ -5,31 +5,71 @@ import pytest import mritk.cli -from mritk.looklocker import ( - create_largest_island_mask, - looklocker_t1map, - looklocker_t1map_postprocessing, - remove_outliers, -) -from mritk.testing import compare_nifti_images +from mritk.looklocker import create_largest_island_mask, remove_outliers, voxel_fit_function + + +def test_voxel_fit_function(): + """Test the theoretical Look-Locker recovery curve math.""" + t = np.array([0.0, 1.0, 2.0]) + x1, x2, x3 = 1.0, 1.0, 1.0 + + # Equation: abs(x1 * (1 - (1+x2^2)*exp(-x3^2 * t))) + # With all 1s: abs(1 - 2*exp(-t)) + # t=0 -> abs(1 - 2*1) = 1 + # t=1 -> abs(1 - 2/e) ≈ 0.2642 + # t=2 -> abs(1 - 2/e^2) ≈ 0.7293 + + expected = np.abs(1.0 - 2.0 * np.exp(-t)) + result = voxel_fit_function(t, x1, x2, x3) + np.testing.assert_array_almost_equal(result, expected) -@pytest.mark.skip(reason="Takes too long") -def test_looklocker_t1map(tmp_path, mri_data_dir: Path): + +@pytest.mark.xfail( + reason=( + "Generated T1 map does not match reference. " + "Need to investigate whether this is a bug in the code " + "or an issue with the test data." 
+ ) +) +def test_looklocker_t1map(tmp_path, mri_data_dir: Path, gonzo_roi): LL_path = mri_data_dir / "mri-dataset/mri_dataset/sub-01" / "ses-01/anat/sub-01_ses-01_acq-looklocker_IRT1.nii.gz" timestamps = ( mri_data_dir / "mri-dataset/mri_dataset/sub-01" / "ses-01/anat/sub-01_ses-01_acq-looklocker_IRT1_trigger_times.txt" ) T1_low = 100 T1_high = 6000 + ll_file = mritk.data.MRIData.from_file(LL_path, dtype=np.single) + vi = gonzo_roi.voxel_indices(affine=ll_file.affine) + v = ll_file.data[tuple(vi.T)].reshape((*gonzo_roi.shape, -1)) + piece_ll_data = mritk.data.MRIData(data=v, affine=gonzo_roi.affine) + ll_piece_path = Path("piece_ll.nii.gz") + piece_ll_data.save(ll_piece_path) + + ll_data = mritk.looklocker.LookLocker.from_file(ll_piece_path, timestamps) + t1_map = ll_data.t1_map() + t1_post = t1_map.postprocess(T1_high=T1_high, T1_low=T1_low) + + t1_arr = t1_post.data + + ref_output = mri_data_dir / "mri-processed/mri_dataset/derivatives/sub-01/ses-01/sub-01_ses-01_acq-looklocker_T1map.nii.gz" + ll_ref = mritk.data.MRIData.from_file(ref_output, dtype=np.single) + v_ref = ll_ref.data[tuple(vi.T)].reshape((*gonzo_roi.shape,)) + + arr1 = np.nan_to_num(v_ref, nan=0.0) + arr2 = np.nan_to_num(t1_arr, nan=0.0) + + worst_index = np.unravel_index(np.abs(arr1 - arr2).argmax(), arr1.shape) + print(f"Worst voxel index: {worst_index}") + print(f"Reference T1: {arr1[worst_index]}, Estimated T1: {arr2[worst_index]}") + print(f"Unmasked Reference T1: {v_ref[worst_index]}, Unmasked Estimated T1: {t1_arr[worst_index]}") + + n_differences = np.sum(np.abs(arr1 - arr2) > 1e-12) + print( + f"Number of voxels with differences > 1e-12: {n_differences} out of {arr1.size} ({n_differences / arr1.size * 100:.2f}%)" + ) - ref_output = mri_data_dir / "mri-dataset/mri_dataset/derivatives/sub-01" / "ses-01/sub-01_ses-01_acq-looklocker_T1map.nii.gz" - test_output_raw = tmp_path / "output_acq-looklocker_T1map_raw.nii.gz" - test_output = tmp_path / "output_acq-looklocker_T1map.nii.gz" - - 
looklocker_t1map(looklocker_input=LL_path, timestamps=timestamps, output=test_output_raw) - looklocker_t1map_postprocessing(T1map=test_output_raw, T1_low=T1_low, T1_high=T1_high, output=test_output) - compare_nifti_images(test_output, ref_output, data_tolerance=1e-12) + mritk.testing.compare_nifti_arrays(t1_arr, v_ref, data_tolerance=1e-12) def test_remove_outliers(): @@ -86,17 +126,18 @@ def test_dispatch_dcm2ll(mock_dicom_to_ll): mock_dicom_to_ll.assert_called_once_with(Path("dummy_in.dcm"), Path("dummy_out.nii.gz")) -@patch("mritk.looklocker.looklocker_t1map") -def test_dispatch_t1(mock_ll_t1map): +@patch("mritk.looklocker.LookLocker") +def test_dispatch_t1(mock_ll): """Test that dispatch correctly routes to looklocker_t1map.""" mritk.cli.main(["looklocker", "t1", "-i", "data.nii.gz", "-t", "times.txt", "-o", "t1map.nii.gz"]) - mock_ll_t1map.assert_called_once_with(Path("data.nii.gz"), Path("times.txt"), output=Path("t1map.nii.gz")) + mock_ll.from_file.assert_called_once_with(Path("data.nii.gz"), Path("times.txt")) + mock_ll.from_file.return_value.t1_map.assert_called_once() -@patch("mritk.looklocker.looklocker_t1map_postprocessing") -def test_dispatch_postprocess(mock_postprocessing): +@patch("mritk.looklocker.LookLockerT1") +def test_dispatch_postprocess(mock_ll_post): """Test that dispatch correctly routes to looklocker_t1map_postprocessing.""" mritk.cli.main( @@ -118,11 +159,12 @@ def test_dispatch_postprocess(mock_postprocessing): ] ) - mock_postprocessing.assert_called_once_with( - T1map=Path("raw_t1.nii.gz"), + mock_ll_post.from_file.assert_called_once_with(Path("raw_t1.nii.gz")) + inst = mock_ll_post.from_file.return_value + inst.postprocess.assert_called_once_with( T1_low=50.0, T1_high=5000.0, radius=5, erode_dilate_factor=1.5, - output=Path("clean_t1.nii.gz"), ) + inst.postprocess.return_value.save.assert_called_once_with(Path("clean_t1.nii.gz"), dtype=np.single) diff --git a/tests/test_mixed.py b/tests/test_mixed.py index 9b5f546..0ff3339 100644 --- 
a/tests/test_mixed.py +++ b/tests/test_mixed.py @@ -7,6 +7,7 @@ from mritk.mixed import ( _extract_frame_metadata, compute_mixed_t1_array, + estimate_se_free_relaxation_time, extract_mixed_dicom, mixed_t1map, mixed_t1map_postprocessing, @@ -71,6 +72,21 @@ def test_extract_frame_metadata(): assert meta["ETL"] == 5 +def test_estimate_se_free_relaxation_time(): + """Test the calculation for free relaxation time.""" + TRse = 1000.0 + TE = 10.0 + ETL = 5 + + # Formula check: TRse - TE * (1 + 0.5 * (ETL - 1) / (0.5 * (ETL + 1) + 20)) + # 1000 - 10 * (1 + 0.5 * 4 / (0.5 * 6 + 20)) + # 1000 - 10 * (1 + 2 / 23) + expected = 1000.0 - 10.0 * (1.0 + 2.0 / 23.0) + + result = estimate_se_free_relaxation_time(TRse, TE, ETL) + assert np.isclose(result, expected) + + @patch("mritk.mixed.extract_single_volume") @patch("pydicom.dcmread") def test_extract_mixed_dicom(mock_dcmread, mock_extract_single): diff --git a/tests/test_utils.py b/tests/test_utils.py index c5c54fc..4116466 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -11,31 +11,11 @@ import numpy as np from mritk.utils import ( - T1_lookup_table, - estimate_se_free_relaxation_time, nan_filter_gaussian, run_dcm2niix, - voxel_fit_function, ) -def test_voxel_fit_function(): - """Test the theoretical Look-Locker recovery curve math.""" - t = np.array([0.0, 1.0, 2.0]) - x1, x2, x3 = 1.0, 1.0, 1.0 - - # Equation: abs(x1 * (1 - (1+x2^2)*exp(-x3^2 * t))) - # With all 1s: abs(1 - 2*exp(-t)) - # t=0 -> abs(1 - 2*1) = 1 - # t=1 -> abs(1 - 2/e) ≈ 0.2642 - # t=2 -> abs(1 - 2/e^2) ≈ 0.7293 - - expected = np.abs(1.0 - 2.0 * np.exp(-t)) - result = voxel_fit_function(t, x1, x2, x3) - - np.testing.assert_array_almost_equal(result, expected) - - def test_nan_filter_gaussian(): """Test that NaNs are smoothly interpolated without pulling valid data to zero.""" # Create a 3x3 uniform array with a NaN hole in the center @@ -59,41 +39,6 @@ def test_nan_filter_gaussian_edges(): np.testing.assert_array_almost_equal(filtered, np.ones((3, 
3))) -def test_estimate_se_free_relaxation_time(): - """Test the calculation for free relaxation time.""" - TRse = 1000.0 - TE = 10.0 - ETL = 5 - - # Formula check: TRse - TE * (1 + 0.5 * (ETL - 1) / (0.5 * (ETL + 1) + 20)) - # 1000 - 10 * (1 + 0.5 * 4 / (0.5 * 6 + 20)) - # 1000 - 10 * (1 + 2 / 23) - expected = 1000.0 - 10.0 * (1.0 + 2.0 / 23.0) - - result = estimate_se_free_relaxation_time(TRse, TE, ETL) - assert np.isclose(result, expected) - - -def test_t1_lookup_table(): - """Test the fraction/T1 lookup table generation creates arrays of correct shape/bounds.""" - TRse, TI, TE, ETL = 1000.0, 100.0, 10.0, 5 - T1_low, T1_hi = 100.0, 500.0 - - fraction_curve, t1_grid = T1_lookup_table(TRse, TI, TE, ETL, T1_low, T1_hi) - - # Length should be exactly the integer steps from T1_low to T1_hi inclusive - expected_length = int(T1_hi) - int(T1_low) + 1 - - assert len(t1_grid) == expected_length - assert len(fraction_curve) == expected_length - assert t1_grid[0] == T1_low - assert t1_grid[-1] == T1_hi - - # Check that fraction curve monotonically DECREASES for standard physics ranges - # As T1 gets longer, the IR signal becomes more negative relative to the SE signal - assert np.all(np.diff(fraction_curve) < 0) - - @patch("subprocess.run") def test_run_dcm2niix(mock_run): """Test that the dcm2niix command constructor triggers properly."""