Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 57 additions & 11 deletions src/odemis/driver/simcam.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
from odemis import dataio, model, util
from odemis.model import oneway
from odemis.util.synthetic import ParabolicMirrorRayTracer
from odemis.util.synthetic import simulate_peak

ERROR_STATE_FILE = "simcam-hw.error"

GOFFSET_TO_PIXEL = 0.25 # Conversion factor for grating offset to image pixels.
PEAK_WIDTH = 2.5 # Width of the simulated spectrograph peak in pixels (before binning).

class Camera(model.DigitalCamera):
'''
Expand Down Expand Up @@ -179,6 +181,22 @@ def clip_max_res(img_res):
self._img_simulator = None
self._mirror = None

try:
spectrograph = dependencies["spectrograph"]
if not (
isinstance(spectrograph, model.ComponentBase)
and hasattr(spectrograph, "axes")
and isinstance(spectrograph.axes, dict)
and "goffset" in spectrograph.axes):
raise ValueError((f"spectrograph {spectrograph} must have a 'goffset' attribute"))

self._spectrograph = spectrograph
logging.debug("Will simulate spectral peaks using spectrograph %s", spectrograph.name)

except (TypeError, KeyError):
logging.info("Will not simulate spectrograph peaks")
self._spectrograph = None

# Simple implementation of the flow: we keep generating images and if
# there are subscribers, they'll receive it.
self.data = SimpleDataFlow(self)
Expand All @@ -195,6 +213,8 @@ def clip_max_res(img_res):
self._error_creation_thread.daemon = True
self._error_creation_thread.start()

self._min_val = self._img.min()

def _setBinning(self, value):
"""
value (2-tuple int)
Expand Down Expand Up @@ -374,6 +394,11 @@ def _simulate(self):
center = self._img_res[0] / 2, self._img_res[1] / 2
pixel_size = self._metadata.get(model.MD_PIXEL_SIZE, self.pixelSize.value)

# Extra translation to simulate stage movement
pos = self._metadata.get(model.MD_POS, (0, 0))
pxs = [p / b for p, b in zip(pixel_size, self.binning.value)]
stage_shift = pos[0] / pxs[0], -pos[1] / pxs[1] # Y goes opposite

if self._mirror:
eff_pixel_size = [p * b for p, b in zip(pixel_size, binning)]
self._img_simulator.resolution.value = res
Expand All @@ -382,11 +407,6 @@ def _simulate(self):
self._img = model.DataArray(sim_img, self._img.metadata)
sim_img = self._img.copy()
else:
# Extra translation to simulate stage movement
pos = self._metadata.get(model.MD_POS, (0, 0))
pxs = [p / b for p, b in zip(pixel_size, self.binning.value)]
stage_shift = pos[0] / pxs[0], -pos[1] / pxs[1] # Y goes opposite

# First and last index (eg, 0 -> 255)
ltrb = [center[0] + trans[0] + stage_shift[0] - (res[0] / 2) * binning[0],
center[1] + trans[1] + stage_shift[1] - (res[1] / 2) * binning[1],
Expand All @@ -413,12 +433,38 @@ def _simulate(self):
[int(round(ltrb[1] + i * binning[1])) for i in range(res[1])])
sim_img = self._img[numpy.ix_(coord[1], coord[0])] # copy

# Add some noise
# spectrograph peak simulation
if self._spectrograph:
current_offset = self._spectrograph.position.value["goffset"]

ccd_center_x = self._img_res[0] / 2 # find the x-coordinate of the center of the ccd
x0_px = ccd_center_x + current_offset * GOFFSET_TO_PIXEL
roi_left = center[0] + trans[0] + stage_shift[0] - (res[0] / 2) * binning[0]

bin_x = binning[0] # binning factor along x-axis
peak_center_binned = (x0_px - roi_left) / bin_x # express the peak position in the ROI's coordinate system

logging.debug("Peak center: x0_px=%s, ROI left: ltrb0=%s, Peak center (binned): %s",
x0_px, roi_left, peak_center_binned)

width_binned = PEAK_WIDTH / bin_x

peak = simulate_peak(amplitude=20000, x0=peak_center_binned, width=width_binned,
Comment thread
yxkdejong marked this conversation as resolved.
shape=sim_img.shape, dtype=sim_img.dtype)

# simulation routine
sim_img = (peak + self._min_val).astype(self._img.dtype, copy=False)

# define max noise amplitude
mx = self._img.max()
sim_img += numpy.random.randint(0, max(mx // 100, 10), sim_img.shape, dtype=sim_img.dtype)
# Clip, but faster than clip() on big array.
# There can still be some overflow, but let's just consider this "strong noise"
sim_img[sim_img > mx] = mx
noise_max = max(mx // 100, 10)

# generate noise and add directly
noise = numpy.random.randint(0, noise_max, sim_img.shape, dtype=self._img.dtype)

# final clamp to prevent overflow
dt_info = numpy.iinfo(sim_img.dtype) if numpy.issubdtype(sim_img.dtype, numpy.integer) else numpy.finfo(sim_img.dtype)
sim_img += numpy.minimum(dt_info.max - sim_img, noise)

return sim_img

Expand Down
75 changes: 75 additions & 0 deletions src/odemis/driver/test/simcam_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,5 +457,80 @@ def test_acquire_ar_pol(self):
testing.assert_array_not_equal(im_horizontal, im_vertical)


class TestSimCamSpectrograph(unittest.TestCase):

@classmethod
def setUpClass(cls) -> None:
"""
Validate spectrograph goffset peak simulation in SimCam.
"""

class MockSpectrograph(model.Component):
def __init__(self, name="mock_spectrograph"):
super().__init__(name)
self.axes = {"goffset": None}
self.position = type('obj', (object,), {'value': {"goffset": 0.0}})()

cls.focus = simulated.Stage(**KWARGS_FOCUS)
cls.mock_spectrograph = MockSpectrograph()

cls.camera = CLASS(dependencies={"focus": cls.focus, "spectrograph": cls.mock_spectrograph},
**KWARGS_MOVE)

@classmethod
def tearDownClass(cls) -> None:
cls.camera.terminate()

def setUp(self) -> None:
self.camera.binning.value = (1, 1)
self.camera.translation.value = (0, 0)
self.camera.resolution.value = self.camera.resolution.range[1]
self.camera.exposureTime.value = 0.05
self.mock_spectrograph.position.value["goffset"] = 0.0
time.sleep(0.1)

def _find_peak(self, image: numpy.ndarray) -> tuple[float, float, float]:
"""
Helper function to find the peak
Returns weighted peak position, max profile value and mean profile value.
"""
data = image.astype(float)
profile = data.mean(axis=0)

p_min, p_max = profile.min(), profile.max()
threshold = p_min + (p_max - p_min) * 0.8

mask = profile > threshold
indices = numpy.arange(len(profile))

if numpy.sum(mask) == 0:
return float(profile.argmax()), float(p_max), float(profile.mean())

Comment thread
yxkdejong marked this conversation as resolved.
# find the peak
weighted_average = numpy.sum(indices[mask] * profile[mask]) / numpy.sum(profile[mask])

return weighted_average, p_max, profile.mean()

def test_spectrograph_with_goffset(self):
Comment thread
yxkdejong marked this conversation as resolved.
move = 300
expected_ratio = 0.25

self.mock_spectrograph.position.value["goffset"] = 0.0
image_0 = self.camera.data.get()
date_0 = image_0.metadata[model.MD_ACQ_DATE]

# change offset
self.mock_spectrograph.position.value["goffset"] = move

# obtain new image
image_new = self.camera.data.get(asap=False)

x0, _, _ = self._find_peak(image_0)
x1, _, _ = self._find_peak(image_new)
shift = x1 - x0

expected_shift = move * expected_ratio
self.assertAlmostEqual(shift, expected_shift, delta=0.5)

if __name__ == '__main__':
unittest.main()
37 changes: 37 additions & 0 deletions src/odemis/util/synthetic.py
Comment thread
pieleric marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,43 @@ def psf_gaussian(
numpy.rint(UINT16_MAX * out, out=out)
return out.astype(numpy.uint16)

def simulate_peak(amplitude: float,
x0: float,
width: float,
shape: Union[int, Tuple[int, int]],
dtype: numpy.dtype = numpy.uint16) -> numpy.ndarray:
"""
Simulate a 1D Gaussian peak across the x-dimension of an image, with optional 2D image input for direct use when y > 1.
:param amplitude: the maximum intensity of the peak
:param x0: the center position of the peak along the x-axis
:param width: the standard deviation (width) of the Gaussian peak
:param shape: the shape of the output image (can be an int for 1D or a tuple for 2D)
:param dtype: the data type of the output array (default is numpy.uint16)
:return: a numpy array containing the simulated peak, either as a 1D Gaussian or directly from the provided image
"""

# Standardize shape to a tuple, so we can index it
if isinstance(shape, (int, numpy.integer)):
shape = (shape,)

# Generate the 1D Gaussian peak across the x-dimension
if width <= 0:
raise ValueError(f"width ({width}) should be positive")

x = numpy.arange(shape[-1], dtype=numpy.float64)
intensity = amplitude * (numpy.exp(-0.5 * ((x - x0) / width) ** 2))

# Clip based on the actual dtype provided
info = numpy.iinfo(dtype) if numpy.issubdtype(dtype, numpy.integer) else numpy.finfo(dtype)
intensity_clipped = numpy.clip(intensity, info.min, info.max)
peak_1d = intensity_clipped.astype(dtype)

if len(shape) == 1:
return peak_1d
else:
# Duplicated along the y-dimension (shape[0])
return numpy.tile(peak_1d, (shape[0], 1))


class ParabolicMirrorRayTracer:
"""
Expand Down
80 changes: 79 additions & 1 deletion src/odemis/util/test/synthetic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import numpy

from odemis.util.synthetic import ParabolicMirrorRayTracer
from odemis.util.synthetic import ParabolicMirrorRayTracer, simulate_peak


class TestParabolicMirrorRayTracer(unittest.TestCase):
Expand Down Expand Up @@ -125,6 +125,84 @@ def test_constructor_raises_value_error_on_missing_keys(self):
with self.assertRaises(ValueError):
ParabolicMirrorRayTracer(good_pos=bad_pos_missing_all)

class TestPeakSimulation(unittest.TestCase):

def test_simulate_peak_1d(self):
"""
Verify simulate_peak generates a correct 1D Gaussian peak.
"""

amplitude = 1000
x0 = 50
width = 5
shape = 100

peak = simulate_peak(amplitude, x0, width, shape)

self.assertEqual(peak.shape, (shape,))
self.assertEqual(peak.dtype, numpy.uint16)

# peak maximum should occur near x0
peak_idx = numpy.argmax(peak)
self.assertAlmostEqual(peak_idx, x0, delta=1)

# peak value should be close to amplitude (clipped if necessary)
self.assertAlmostEqual(peak[peak_idx], amplitude, delta=5)

def test_simulate_peak_2d(self):
"""
Verify simulate_peak correctly expands the 1D peak to 2D.
"""

amplitude = 500
x0 = 40
width = 4
shape = (20, 100)

peak = simulate_peak(amplitude, x0, width, shape)

self.assertEqual(peak.shape, shape)

# every row should be identical
for row in peak:
self.assertTrue(numpy.array_equal(row, peak[0]))

# verify peak location
peak_idx = numpy.argmax(peak[0])
self.assertAlmostEqual(peak_idx, x0, delta=1)

def test_simulate_peak_dtype_clipping(self):
"""
Verify values are clipped to dtype limits.
"""

amplitude = 100000 # larger than uint16 max
x0 = 30
width = 3
shape = 80

peak = simulate_peak(amplitude, x0, width, shape)

dtype_max = numpy.iinfo(numpy.uint16).max

self.assertEqual(peak.max(), dtype_max)
Comment thread
yxkdejong marked this conversation as resolved.

def test_simulate_peak_dtype(self):
"""
Verify simulate_peak respects the dtype argument.
"""

amplitude = 200
x0 = 25
width = 3
shape = 60

peak = simulate_peak(amplitude, x0, width, shape, dtype=numpy.uint8)

self.assertEqual(peak.dtype, numpy.uint8)

dtype_max = numpy.iinfo(numpy.uint8).max
self.assertLessEqual(peak.max(), dtype_max)

if __name__ == "__main__":
unittest.main()
Loading