Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/scratch-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ version = "0.0.1"
description = "Scratch core libraries"
authors = [
{ name = "Netherlands Forensic Institute - CFS R&D Data", email = "cfs_rd_data@nfi.nl" },
{ name = "Sharlon N. Regales", email = "s.regales@nfi.nl" },
{ name = "Raymond Houwing", email = "r.houwing@nfi.nl" },
]
readme = "README.md"
Expand All @@ -20,6 +19,7 @@ dependencies = [
"scikit-image>=0.25.2",
"scipy>=1.16.3",
"surfalize~=0.16.6",
"torch>=2.11.0",
"x3p @ git+https://github.com/giacomomarchioro/pyx3p.git#81c0f764cf321e56dc41e9e3c71d14e97d5bc3ae",
]

Expand Down
2 changes: 1 addition & 1 deletion packages/scratch-core/src/container_models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,4 @@ def _clear_cached_properties(instance: BaseModel):
for name in dir(type(instance)):
attr = getattr(type(instance), name, None)
if isinstance(attr, cached_property):
instance.__dict__.pop(name, None)
vars(instance).pop(name, None)
4 changes: 2 additions & 2 deletions packages/scratch-core/src/conversion/export/mark.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def load_mark_from_mat_file(path: Path) -> Mark:
mark = Mark(
scan_image=ScanImage(
data=np.asarray(container["depth_data"], dtype=np.float64),
scale_x=float(container["xdim"][0]),
scale_y=float(container["ydim"][0]),
scale_x=float(container["xdim"].flat[0]),
scale_y=float(container["ydim"].flat[0]),
),
mark_type=_parse_mark_type(str(container["mark_type"][0]).lower()),
# TODO: Parse `center` and `meta_data` from data struct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def _detect_striation_angle(

# Calculate gradient and extract tilt angles
fy, fx = np.gradient(smoothed)
angles = _compute_tilt_angles_from_gradient(fx, fy)
angles = _compute_tilt_angles_from_gradient(np.asarray(fx), np.asarray(fy))

# Filter outliers: keep only angles within expected range for fine alignment
angles = angles[np.abs(angles) < np.radians(_MAX_GRADIENT_ANGLE_DEG)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
from container_models.base import BinaryMask, FloatArray2D, FloatArray1D
import numpy as np

from container_models.scan_image import ScanImage
from conversion.surface_comparison.cell_registration.utils import (
convert_grid_cell_to_cell,
pad_image_array,
_batched_match,
_rotate_image,
)
from conversion.surface_comparison.models import (
ComparisonParams,
Cell,
GridCell,
)
from conversion.surface_comparison.utils import rotate_points
from conversion.surface_comparison.cell_registration.utils import (
convert_grid_cell_to_cell,
pad_image_array,
)

from concurrent.futures import ThreadPoolExecutor
from functools import partial
from os import cpu_count
from threading import Lock

import cv2
import numpy as np
from skimage.transform import rotate

N_THREADS = cpu_count() or 1


def match_cells(
Expand Down Expand Up @@ -49,118 +41,59 @@ def match_cells(
return []

fill_value_comparison = float(np.nanmean(comparison_image.data))
pixel_size = comparison_image.scale_x # Assumes isotropic image
pixel_size = comparison_image.scale_x
cell_width, cell_height = grid_cells[0].width, grid_cells[0].height
pad_width, pad_height = cell_width, cell_height # Set pad size to cell size
pad_width, pad_height = cell_width, cell_height

comparison_data = pad_image_array(
comparison_image.data, pad_width=pad_width, pad_height=pad_height
)
padded_center_x, padded_center_y = (
padded_center = (
(comparison_data.shape[1] - 1) / 2,
(comparison_data.shape[0] - 1) / 2,
)

# Prepare the arguments for parallel processing
angles = np.arange(
params.search_angle_min,
params.search_angle_max + params.search_angle_step,
params.search_angle_step,
)
chunks = np.array_split(angles, N_THREADS)
locks = [Lock() for _ in grid_cells]
_process_chunk = partial(
_find_best_match,
grid_cells=grid_cells,
locks=locks,
comparison_data=comparison_data,
cell_size=(cell_width, cell_height),
minimum_fill_fraction=params.minimum_fill_fraction,
fill_value=fill_value_comparison,
padded_center=(padded_center_x, padded_center_y),
pad_size=(pad_width, pad_height),

templates = [gc.cell_data_filled for gc in grid_cells]
results = _batched_match(
comparison_data,
templates,
angles,
params.minimum_fill_fraction,
fill_value_comparison,
)
# Execute parallel search
with ThreadPoolExecutor(max_workers=N_THREADS) as executor:
list(executor.map(_process_chunk, chunks))
for grid_cell, (score, x, y, angle_idx) in zip(grid_cells, results):
angle = float(angles[angle_idx])
rotated = _rotate_image(comparison_data, angle, fill_value=np.nan)
rot_h, rot_w = rotated.shape

cell_center = (x + cell_width / 2, y + cell_height / 2)
rotated_center = ((rot_w - 1) / 2, (rot_h - 1) / 2)

orig_x, orig_y = _unrotate_point(
rotated_point=cell_center,
original_image_center=padded_center,
rotated_image_center=rotated_center,
angle_deg=angle,
)
grid_cell.grid_search_params.update(
score=score,
angle=angle,
center_x=orig_x - pad_width,
center_y=orig_y - pad_height,
)

return [
convert_grid_cell_to_cell(grid_cell=grid_cell, pixel_size=pixel_size)
for grid_cell in grid_cells
]


def _find_best_match(
angles: FloatArray1D,
grid_cells: list[GridCell],
locks: list[Lock],
comparison_data: FloatArray2D,
cell_size: tuple[int, int],
minimum_fill_fraction: float,
fill_value: float,
padded_center: tuple[float, float],
pad_size: tuple[int, int],
) -> list[GridCell]:
"""Find the best-matching position and angle for each grid cell in the comparison image."""
cell_width, cell_height = cell_size
pad_width, pad_height = pad_size
for angle in angles:
angle = float(angle)
# Rotate the comparison image by `-angle` degrees.
# This is equivalent to rotating the reference patch by `angle` degrees.
rotated = rotate(
image=comparison_data,
angle=-angle,
cval=np.nan, # type: ignore
order=0,
resize=True,
)
# Get the mask of valid pixels for the rotated image
valid_mask = ~np.isnan(rotated)
# Compute the fill fraction mask based on the valid pixels mask
fill_fraction_map = _get_fill_fraction_map(
valid_pixel_mask=valid_mask,
cell_width=cell_width,
cell_height=cell_height,
)
fill_fraction_mask = fill_fraction_map >= minimum_fill_fraction
# Now that we computed the fill fraction mask, we can safely replace NaN values in the rotated image
rotated[~valid_mask] = fill_value

for grid_cell, lock in zip(grid_cells, locks):
score_map = _get_score_map(
comparison_array=rotated,
template=grid_cell.cell_data_filled,
)
score, x, y = _compute_best_score_from_maps(
score_map=score_map, fill_fraction_mask=fill_fraction_mask
)
if score > grid_cell.grid_search_params.score:
# Compute the center coordinates of the cell on the (original) unrotated image
cell_center = (x + cell_width / 2, y + cell_height / 2)
rotated_center = (
(rotated.shape[1] - 1) / 2, # type: ignore
(rotated.shape[0] - 1) / 2,
)
original_center_x, original_center_y = _unrotate_point(
rotated_point=cell_center,
original_image_center=padded_center,
rotated_image_center=rotated_center,
angle_deg=angle,
)
with lock:
# Guard against race conditions
if score > grid_cell.grid_search_params.score:
# Update the parameters
grid_cell.grid_search_params.update(
score=score,
angle=angle,
center_x=original_center_x - pad_width, # Undo the padding
center_y=original_center_y - pad_height, # Undo the padding
)
return grid_cells


def _unrotate_point(
rotated_point: tuple[float, float],
original_image_center: tuple[float, float],
Expand All @@ -186,74 +119,3 @@ def _unrotate_point(
)[0]
# Shift the coordinates relative to the top-left of the original image.
return x_center + x, y_center + y


def _get_fill_fraction_map(
valid_pixel_mask: BinaryMask,
cell_height: int,
cell_width: int,
) -> FloatArray2D:
"""
Compute a 2D map where each entry [y, x] is the fill fraction of a cell-sized window with its
**top-left corner** at pixel (x, y), matching the indexing convention of ``cv2.matchTemplate``.

:param valid_pixel_mask: Boolean array (H, W); True where image data is valid.
:param cell_height: Height of the cell window in pixels.
:param cell_width: Width of the cell window in pixels.
:returns: Float64 array (H, W) with fill fractions in [0, 1], top-left indexed.
Entries near the bottom-right boundary are underestimates and will be rejected by the fill-fraction gate.
Since the image is padded with NaNs before calling this function, this does not matter.
"""
kernel = np.ones((cell_height, cell_width), dtype=np.float32) / (
cell_height * cell_width
)
filtered = cv2.filter2D(
valid_pixel_mask.astype(np.float32),
ddepth=-1,
kernel=kernel,
anchor=(0, 0),
borderType=cv2.BORDER_CONSTANT,
)
return np.asarray(filtered, dtype=np.float64)


def _compute_best_score_from_maps(
score_map: FloatArray2D, fill_fraction_mask: BinaryMask
) -> tuple[float, int, int]:
"""
Compute the highest correlation score and the corresponding x, y coordinates
from the score and fill fraction maps.
"""
# Make sure the shape of `score_map` and the `fill_fraction_mask` match, and
# discard irrelevant fill fraction mask positions at the bottom right.
valid_positions = fill_fraction_mask[: score_map.shape[0], : score_map.shape[1]]
# Replace non-valid values (where fill fraction is below threshold) with -inf
masked_scores = np.where(valid_positions, score_map, -np.inf)
# Compute the best score and x, y position from the score map
best_flat_index = np.argmax(masked_scores)
score = masked_scores.flat[best_flat_index]
y, x = np.unravel_index(best_flat_index, masked_scores.shape)
return float(score), int(x), int(y)


def _get_score_map(
comparison_array: FloatArray2D, template: FloatArray2D
) -> FloatArray2D:
"""
Compute a normalized cross-correlation score map for one reference cell.

Slides the cell template over the comparison array using ``cv2.TM_CCOEFF_NORMED``, which computes
the Pearson correlation coefficient between the template and every same-sized patch. NaN values must
have been replaced in both arrays before calling this function.

:param comparison_array: NaN-free float32-compatible comparison image, padded by a full cell on each side.
:param template: Reference grid cell whose ``cell_data`` is used as the template; must contain no NaN values.
:returns: Float64 score map of shape ``(H - cell_height + 1, W - cell_width + 1)`` with values in ``[-1, 1]``.
"""

score_map = cv2.matchTemplate(
image=comparison_array.astype(np.float32),
templ=template.astype(np.float32),
method=cv2.TM_CCOEFF_NORMED,
)
return np.asarray(score_map, dtype=np.float64)
Loading
Loading