diff --git a/src/odemis/acq/align/goffset.py b/src/odemis/acq/align/goffset.py new file mode 100644 index 0000000000..97b1ab6cf2 --- /dev/null +++ b/src/odemis/acq/align/goffset.py @@ -0,0 +1,793 @@ +import logging +import numpy +import threading +import time + +from collections.abc import Iterable +from concurrent.futures import CancelledError +from concurrent.futures._base import CANCELLED, FINISHED, RUNNING +from odemis.acq.align import light +from odemis import model +from odemis.acq.align.autofocus import _mapDetectorToSelector +from odemis.model import InstantaneousFuture +from odemis.util import executeAsyncTask +from odemis.util.peak import Smooth, Detect +from scipy.optimize import curve_fit +from typing import Any, Dict, List, Optional, Tuple, Union + +MOVE_TIME_GRATING = 20 # s +MOVE_TIME_DETECTOR = 5 # s +EST_ALIGN_TIME = 30 # s + +def gaussian(x: numpy.ndarray, amplitude: float, x0: float, width: float) -> numpy.ndarray: + """ + Gaussian function (for curve fitting). + + :param x: input coordinates + :param amplitude: peak intensity + :param x0 = peak's center + :param width = standard deviation + :return: Gaussian function evaluated at x (numpy array) + """ + + intensity = amplitude * numpy.exp(-0.5 * ((x - x0) / width) ** 2) + return intensity + + +def find_peak_position(data: numpy.ndarray, window_radius: int = 15, snr_threshold: float = 10.0) -> float: + """ + Finds the peak position in the given spectrum data. + It can handle both 1D and 2D data (in which case it uses Maximum Intensity Projection). + + The function first uses geometric shape-checking (Smooth and Detect) to accurately + find the peak index while ignoring high-amplitude 1-pixel noise spikes. + It then calculates a weighted average of the positions within a window, using the + intensity values as weights. For improved accuracy, it attempts to fit a Gaussian curve. + + :param data: 1D or 2D array containing the spectrum + :param window_radius: number of pixels on either side of the peak to include in the window for fitting (default: 15) + :return: estimated peak position in pixels (float) + :raises RuntimeError: if no significant peak is detected (SNR too low) + """ + + raw_data = numpy.asarray(data, dtype=float) + spectrum = numpy.squeeze(raw_data) + + # maximum intensity projection + if spectrum.ndim > 1: + spectrum = spectrum.max(axis=0) + + # geometric smoothing and noise calculation + smoothed_data = Smooth(spectrum, window_len=11) + clean_data = smoothed_data - numpy.median(smoothed_data) + noise_std = numpy.std(clean_data) + + # detect peak + maxtab, _ = Detect(smoothed_data, lookahead=10, delta=(noise_std + 1e-6) * snr_threshold) + + if not maxtab: + raise RuntimeError("No peak detected (SNR too low or peak geometry invalid)") + + # sort peaks by intensity and grab the highest valid one + maxtab.sort(key=lambda x: x[1], reverse=True) + peak_idx = int(maxtab[0][0]) + + # create a window around the peak + start = max(0, peak_idx - window_radius) + end = min(len(spectrum), peak_idx + window_radius + 1) + window_data = spectrum[start:end] + window_idx = numpy.arange(start, end) + + # calculate the weighted averages + weights = window_data.clip(min=0) + + # Corner case: window has no positive signal. Use the maximum index itself + # as the best estimate of the peak, since weighted average would be undefined + if weights.sum() == 0: + weighted_avg = float(peak_idx) + logging.info("Weighted average fallback: all window data <= 0, using peak_idx=%d as estimate", peak_idx) + else: + weighted_avg = float(numpy.sum(window_idx * window_data) / numpy.sum(weights)) + + # try Gaussian fit for better accuracy, but fallback to weighted average if it fails or is out of bounds + try: + p0 = [window_data.max(), weighted_avg, 2.5] # initial guess: [amplitude, center, width] + popt, pcov = curve_fit(gaussian, window_idx, window_data, p0=p0) + peak = popt[1] + + if start <= peak <= end: + return float(peak) + + except RuntimeError: + logging.debug("Gaussian peak fit did not converge, falling back to weighted average") + + except ValueError: + logging.exception("Gaussian peak fit failed due to invalid input") + + return weighted_avg + +def peak_is_present(spectrum: numpy.ndarray, + snr_threshold: float = 10.0, + width_range: Tuple[float, float] = (0.5, 12.0)) -> bool: + """ + Test to decide whether spectral peak is present. + + The test uses: + - The proven Odemis Detect() algorithm for geometric shape verification. + - A dynamic SNR threshold to reject background noise. + - A local width estimate computed from a small window around the peak. + + :param spectrum: 1D or 2D array containing the spectrum + :param snr_threshold: minimum required signal-to-noise ratio for a peak (default: 10.0) + :param width_range: acceptable range of estimated peak widths in pixels (default: (0.5, 12.0)) + :return: True if a peak meeting the criteria is present, False otherwise + """ + + raw_data = numpy.asarray(spectrum, dtype=float) # convert to float + spectrum_1d = numpy.squeeze(raw_data) # remove any dummy dimensions + + # convert 2D data to 1D via MIP if it wasn't done by the caller + if spectrum_1d.ndim > 1: + spectrum_1d = spectrum_1d.max(axis=0) + + smoothed_data = Smooth(spectrum_1d, window_len=11) # remove any hot pixels + clean_data = smoothed_data - numpy.median(smoothed_data) + noise_std = numpy.std(clean_data) + + # use Detect to reject high-amplitude noise spikes that lack Gaussian geometry + maxtab, _ = Detect(smoothed_data, lookahead=10, delta=(noise_std + 1e-6) * snr_threshold) + + if not maxtab: + return False + + # get the best peak found + maxtab.sort(key=lambda x: x[1], reverse=True) + peak_idx = int(maxtab[0][0]) + peak_value = maxtab[0][1] + + if peak_idx < 1 or peak_idx > len(spectrum_1d) - 2: + logging.debug("Peak too close to edge idx=%d len=%d", peak_idx, len(spectrum_1d)) + return False + + # estimate width around the verified peak + window = spectrum_1d[peak_idx - 2: peak_idx + 3] # create 5-value window (2 on the left, 2 on the right of the peak) + x = numpy.arange(len(window)) + w = window - window.min() + if w.sum() == 0: + return False + + mean = numpy.sum(x * w) / numpy.sum(w) + var = numpy.sum(w * (x - mean) ** 2) / numpy.sum(w) + width = numpy.sqrt(var) + + snr = (peak_value - numpy.median(smoothed_data)) / (noise_std + 1e-6) + present = width_range[0] <= width <= width_range[1] + + logging.debug("snr=%.2f width=%.2f present=%s", snr, width, present) + + return present + +def coarse_scan_goffset_for_peak(spgr, detector, future: model.ProgressiveFuture, + step: int = 2000) -> Tuple[float, float]: + """" + Coarse scan across the goffset axis until a real peak becomes visible. + + The function performs absolute moves across the configured goffset axis + from min to max in steps of `step`. After each move it reads the detector, + computes a 1D spectrum (mean across the first axis), and applies + `peak_is_present` to decide whether a real peak is on the detector. + When a peak is found, `find_peak_position` is used to estimate its pixel + position and the function returns the actual goffset (as reported by the + actuator) and the peak pixel. + + :param spgr: spectrograph + :param detector: detector + :param step: step size in goffset units for the coarse scan (default: 2000) + :return: tuple (actual_goffset, peak_pixel) + :raises RuntimeError: if no peak is found across the full goffset range + """ + + current = float(spgr.position.value["goffset"]) + step = abs(step) if step != 0 else 2000.0 + max_span = 200000.0 # limit how far we wander from the current valid position + + logging.debug( + "Coarse local scan around goffset %.1f with step %.1f and max span %.1f", + current, step, max_span) + + # build a sequence of positions: current, +step, -step, +2*step, -2*step, ... + positions = [current] + k = 1 + while k * step <= max_span: + positions.append(current + k * step) + positions.append(current - k * step) + k += 1 + + tried = set() + + for g in positions: + _checkCancelled(future) + + # avoid duplicate moves due to symmetry or float rounding + key = round(g, 3) + if key in tried: + continue + tried.add(key) + + logging.debug("Coarse scan move attempt to goffset %.3f", g) + try: + spgr.moveAbsSync({"goffset": g}) + time.sleep(2) # give hardware time to stabilize + except ValueError: + logging.warning("Skipping invalid goffset %.3f (%s)", g, ValueError) + continue + + data = detector.data.get(asap=False) + spectrum = data.max(axis=0) if data.ndim == 2 else data + + if peak_is_present(spectrum): + peak = find_peak_position(data) + return g, peak + + raise RuntimeError("Peak not found in local goffset scan around current position") + +def estimate_goffset_scale(spgr: model.Actuator, + detector: model.Detector, + delta: float = 5.0, + retries: int = 1) -> Tuple[float, float, float]: + """ + Estimate the scale factor between a change in the grating offset ('goffset') + and the resulting shift of the spectral peak on the detector. + + The function moves the actuator by a small step (delta) and measures the peak position before and after the move. + It calculates the ratio of pixel shift per unit of goffset. The actuator is returned to its original position + after measurement. + + If the measured scale is unreasonably small or large, the function retries recursively + and falls back to a default value of 0.5 if necessary. + + :param spgr: spectrograph + :param detector: detector + :param delta: The relative goffset step size to apply when measuring the scale (default: 5.0). + The actual step may be negated to avoid exceeding hardware limits. + :param retries: number of retries allowed if the estimated scale is unreliable (default: 1). + + :return: Tuple (scale, p0, p1) + scale: estimated pixels per unit of goffset + p0: peak position at the initial goffset + p1: peak position after applying the test delta + """ + + # get initial state + data0 = detector.data.get(asap=False) + p0 = find_peak_position(data0) + + # check limits before moving + current_pos = spgr.position.value["goffset"] + goffset_max = spgr.axes["goffset"].range[1] + + # ensure that max limit isn't violated + move_direction = 1 if (current_pos + delta < goffset_max) else -1 + actual_delta = delta * move_direction + + # move and measure + spgr.moveRelSync({"goffset": actual_delta}) + data1 = detector.data.get(asap=False) + p1 = find_peak_position(data1) + + # return back to start + spgr.moveRelSync({"goffset": -actual_delta}) + + # calculate goffset scale + scale = (p1 - p0) / actual_delta + + logging.info( + "SCALE TRACKING | p0: %.1f | p1: %.1f | Delta: %.1f | Shift: %.1f | Result Scale: %.4f", + p0, p1, actual_delta, (p1 - p0), scale) + + # If the estimated scale is extremely small, the measurement is likely unreliable + # (e.g., due to noise or a flat spectrum). + # In that case we retry the estimation once by calling the function recursively. + # If the retry still fails (raising a RuntimeError), we fall back to a default + # scale value to ensure the algorithm can continue and avoid infinite recursion. + + if abs(scale) < 1e-3 or abs(scale) > 2.0: + logging.warning( + "Unreliable scale estimate (%.4f). Retries left: %d", + scale, retries) + + if retries > 0: + return estimate_goffset_scale(spgr, detector, delta, retries - 1) + + logging.warning("Scale estimation failed after retries, using default 0.5") + scale = 0.5 + + return scale, p0, p1 + +def log_detector_state(caller: str, stage: str, detector: model.Detector, data: numpy.ndarray): + """ + Log a compact summary of the detector data: min, max, mean, background, noise, peak and SNR. + + :param caller: origin of the log message + :param stage: stage of the log message + :param detector: detector + :param data: data + :return: None + """ + + shape = data.shape + + # convert 2D data to 1D if necessary + spectrum = data.max(axis=0) if data.ndim == 2 else data + + max_val = float(spectrum.max()) + min_val = float(spectrum.min()) + mean_val = float(spectrum.mean()) + + background = float(numpy.median(spectrum)) + signal = spectrum - background + peak_height = float(signal.max()) + + # robust noise estimate + noise_region = signal[signal < 0] + if len(noise_region) > 10: + noise_std = float(numpy.std(noise_region)) + else: + noise_std = float(numpy.std(signal)) + + snr = peak_height / (noise_std + 1e-6) + + logging.warning( + "%s: goffset: [%s] Detector=%s | shape=%s | min=%.2f max=%.2f mean=%.2f | bg=%.2f | noise=%.2f | snr=%.2f", + caller, stage, detector.name, shape, min_val, max_val, mean_val, background, noise_std, snr) + +def sparc_auto_grating_offset(spgr: model.Actuator, + detector: model.Detector, + tolerance_px: float = 0.4, + max_it: int = 60, + gain: float = 0.4) -> model.ProgressiveFuture: + """ + Start an asynchronous task that centers the spectral peak by adjusting the + grating offset (goffset). + + :param spgr: spectrograph + :param detector: detector + :param tolerance_px: the acceptable displacement of the peak from the center in pixels (default: 0.4) + :param max_it: maximum number of iterations to attempt (default: 20) + :param gain: proportional gain factor for adjusting the goffset (default: 0.4) + :return: A ``ProgressiveFuture`` representing the asynchronous alignment + task. The future can be used to monitor progress, retrieve the + result, or cancel the alignment. + """ + + est_start = time.time() + 0.05 + est_time = max_it * 0.5 # rough estimated time + + f = model.ProgressiveFuture(start=est_start, end=est_start + est_time) + + f._task_lock = threading.Lock() + f._task_state = RUNNING + f.task_canceller = _cancel_sparc_auto_grating_offset + + executeAsyncTask(f, _do_sparc_auto_grating_offset, args=(f, spgr, detector, tolerance_px, max_it, gain)) + + return f + +def _do_sparc_auto_grating_offset(future: model.ProgressiveFuture, + spgr: model.Actuator, + detector: model.Detector, + tolerance_px: float, + max_it: int, + gain: float) -> bool: + """ + Core alignment routine that iteratively adjusts the grating offset to center the peak. + + Behavior: + - Attempts to acquire a peak on the detector (coarse scan) if none is present. + - Measures the local goffset-to-pixel scale only when a peak is present and not centered. + - Runs a centering loop until the peak is within `tolerance_px` or the max #iterations is reached. + - Respects cancellation via the provided ProgressiveFuture. + + :param future: model.ProgressiveFuture + :param spgr: spectrograph + :param detector: detector + :param tolerance_px: pixel tolerance for successful alignment + :param max_it: maximum number of centering iterations + :param gain: proportional gain for converting pixel error to goffset correction + :return: True if alignment succeeded (peak within tolerance), False otherwise (bool) + :raises CancelledError: if the future was cancelled during execution + """ + + logging.info("Running alignment | detector=%s |", detector.name) + + try: + center_target = (detector.resolution.value[0] - 1) / 2 + data0 = detector.data.get(asap=False) + spectrum0 = data0.max(axis=0) if data0.ndim == 2 else data0 + + if peak_is_present(spectrum0): + peak0 = find_peak_position(data0) + logging.debug("Initial read: peak0=%.2f px", peak0) + peak_present = True + else: + logging.debug("Initial read: no significant peak detected, scanning the axes") + peak_present = False + peak0 = None + + # if peak present and already centered, do nothing + if peak_present: + initial_error_px = peak0 - center_target + logging.debug("Initial error_px=%.3f px (tolerance=%.3f)", initial_error_px, tolerance_px) + if abs(initial_error_px) <= tolerance_px: + logging.info("Peak already centered | peak=%.2f | center=%.2f | error=%.3f", + peak0, center_target, initial_error_px) + return True + + # if no peak present, run acquisition (this will move the grating) + else: + try: + g_acq, p_acq = coarse_scan_goffset_for_peak(spgr, detector, future, step=2000) + logging.info("Peak acquired at goffset=%d pixel=%.2f", g_acq, p_acq) + peak0 = p_acq + except RuntimeError: + logging.error("Peak acquisition failed — aborting alignment") + return False + + # after acquisition, check if acquisition already placed peak within tolerance + post_acq_error_px = peak0 - center_target + logging.debug("Post-acquisition error_px=%.3f px", post_acq_error_px) + if abs(post_acq_error_px) <= tolerance_px: + logging.info("Peak centered by acquisition | peak=%.2f | center=%.2f | error=%.3f", + peak0, center_target, post_acq_error_px) + return True + + # peak is present and not centered -> estimate scale and center + try: + scale, p0, p1 = estimate_goffset_scale(spgr, detector) + logging.info("Scale estimated: %.4f px/goffset | p0=%.2f p1=%.2f", scale, p0, p1) + except RuntimeError: + logging.warning("Peak lost during scale estimation. Forcing re-acquisition.") + try: + g_acq, p_acq = coarse_scan_goffset_for_peak(spgr, detector, future, step=2000) + scale, p0, p1 = estimate_goffset_scale(spgr, detector) # try one more time + except RuntimeError: + logging.error("Peak re-acquisition failed — aborting alignment") + return False + + # prefer p1 (after probe move) if available, else p0 + # p1 is preferred because it reflects the peak position after a controlled probe move, + # making it more up‑to‑date and reliable than the initial p0 measurement. + start_peak = p1 if p1 is not None else p0 + + total_goffset_displacement = 0.0 + axis = spgr.axes["goffset"] + minv, maxv = axis.range + + for i in range(max_it): + _checkCancelled(future) + + # if the peak is lost during the iteration, retry twice before aborting. + if i == 0: + peak_px = start_peak + else: + max_retries = 2 + peak_found = False + + for attempt in range(max_retries + 1): + data = detector.data.get(asap=False) + try: + peak_px = find_peak_position(data) + peak_found = True + break + except RuntimeError: + if attempt < max_retries: + logging.warning("Peak lost in iteration %d. Settle and retry (%d/%d)...", + i, attempt + 1, max_retries) + time.sleep(2) + else: + pass + + if not peak_found: + logging.error(f"Peak re-acquisition permanently failed during iteration {i} — aborting") + return False + + error_px = peak_px - center_target + + if abs(error_px) <= tolerance_px: + return True + + # calculate required adjustment based on pixel error and scaling factor + delta_goffset = -gain * (error_px / scale) + current = spgr.position.value["goffset"] + + # clamp move to stay within the allowed goffset range + # ensures that we don't command a step that would exceed axis limits. + delta_goffset = max(minv - current, min(maxv - current, delta_goffset)) + + # accumulate the actual displacement for total movement tracking + total_goffset_displacement += delta_goffset + + logging.debug( + "Iter: %d | Peak: %.2f | Error: %.2f | Move: %.6f | Total Change: %.6f", + i, peak_px, error_px, delta_goffset, total_goffset_displacement) + + try: + spgr.moveRelSync({"goffset": delta_goffset}) + time.sleep(0.2) # settle time for hardware to stabilize + except ValueError: + logging.warning("Hardware offset limit reached: %s. Keeping max allowed move.", ValueError) + break + + future.set_progress(end=time.time() + (max_it - i - 1) * 0.5) + + logging.warning("SparcAutoGratingOffset did not converge within max iterations") + return False + + except CancelledError: + logging.debug("SparcAutoGratingOffset cancelled") + raise + except Exception as e: + logging.error("Alignment error: %s", e) + raise + +def _cancel_sparc_auto_grating_offset(future: model.ProgressiveFuture) -> bool: + """ + Canceller of _do_sparc_auto_grating_offset task. + """ + with future._task_lock: + future._task_state = CANCELLED + + +def _checkCancelled(future: "model.ProgressiveFuture") -> None: + """ + Check if the future has been cancelled, and if so raise CancelledError. + """ + + with future._task_lock: + if future._task_state == CANCELLED: + raise CancelledError() + + +def _total_alignment_time(n_gratings: int, + n_detectors: int) -> float: + """ + Estimate total time for aligning all grating-detector combinations. + + :param n_gratings: number of gratings to align + :param n_detectors: number of detectors to align + :return: estimated total time in seconds + """ + + runs = n_detectors + max(0, n_gratings - 1) + move_time = ((n_gratings - 1) * MOVE_TIME_GRATING + (n_detectors - 1) * MOVE_TIME_DETECTOR) + + # total time = time spent running alignment algorithms + time spent moving hardware + return runs * EST_ALIGN_TIME + move_time + +def auto_align_grating_detector_offsets(spectrograph: model.Actuator, + detectors: Union[model.Detector, List[model.Detector]], + opm: 'OpticalPathManager', + align_mode: str, + bl: model.Emitter, + selector: Optional[model.Actuator] = None, + streams: Optional[List['Stream']] = None) -> model.ProgressiveFuture: + """ + Automatically align grating-detector offsets for all combinations of gratings and detectors. + - If a selector is provided, it will be used to switch between detectors for the first grating, then the first detector + will be used for all subsequent gratings. + - For multiple detectors, the grating alignment will only be adjusted for the first detector; subsequent detectors will + be aligned by adjusting the detector offset with the grating alignment fixed. + + :param spectrograph: spectrograph + :param detectors: list of detectors + :param opm: OpticalPathManager + :param align_mode: optical alignment mode used to close slit/path + :param bl: brightlight emitter + :param selector: optional selector to switch between detectors + :param streams: optional list of streams to update with progress + :return: ProgressiveFuture that will resolve to a dict mapping (grating, detector) + :raises ValueError: if no detectors provided, or if multiple detectors provided without a selector + :raises CancelledError: if the operation is cancelled + """ + + if not isinstance(detectors, Iterable): + detectors = [detectors] + if not detectors: + raise ValueError("At least one detector must be provided") + if len(detectors) > 1 and selector is None: + raise ValueError("No selector provided, but multiple detectors") + + if streams is None: + streams = [] + + est_start = time.time() + 0.1 + n_gratings = len(spectrograph.axes["grating"].choices) + n_detectors = len(detectors) + a_time = (_total_alignment_time(n_gratings, n_detectors) + 10)# estimated time to turn on light and close slit + f = model.ProgressiveFuture(start=est_start, end=est_start + a_time) + f._progress = 0.0 + f.task_canceller = _cancel_auto_align_grating_detector_offsets + + f._task_lock = threading.Lock() + f._task_state = RUNNING + f._subfuture = InstantaneousFuture() + executeAsyncTask(f, _do_auto_align_grating_detector_offsets, args=(f, spectrograph, detectors, opm, align_mode, bl, selector, streams)) + return f + +def _do_auto_align_grating_detector_offsets(future: model.ProgressiveFuture, + spectrograph: model.Actuator, + detectors: List[model.Detector], + opm: 'OpticalPathManager', + align_mode: str, + bl: model.Emitter, + selector: Optional[model.Actuator], + streams: List['Stream'], + stabilization_time: float = 10.0) -> Optional[Dict[Any, Any]]: + """ + Iterate through each grating and detector combination, adjusting the selector if provided, and run the auto-alignment algorithm. + - If a selector is provided, it will be used to switch between detectors for the first grating, then the first detector + will be used for all subsequent gratings. + - For multiple detectors, the grating alignment will only be adjusted for the first detector; subsequent detectors will + be aligned by adjusting the detector offset with the grating alignment fixed. + - Before alignment, turn on the brightlight and closes the slit. + + :param future: ProgressiveFuture to update with progress and results + :param spectrograph: spectrograph + :param detectors: list of detectors + :param opm: OpticalPathManager + :param align_mode: optical alignment mode used to close slit/path + :param bl: brightlight emitter + :param selector: optional selector to switch between detectors + :param streams: optional list of streams to update with progress + :param stabilization_time: time to wait after moving hardware before starting alignment (default: 15s) + + :return: dict mapping (grating, detector) to alignment success boolean + :raises CancelledError: if the operation is cancelled + """ + + results: dict[tuple, bool] = {} + original_pos = {k: v for k, v in spectrograph.position.value.items() + if k in ("wavelength", "grating")} + + gratings = sorted(list(spectrograph.axes["grating"].choices.keys())) + logging.info(f"Available gratings: {list(spectrograph.axes['grating'].choices.keys())}") + + first_detector = detectors[0] + + if selector: + original_selector = selector.position.value + selector_axes, detector_to_selector = _mapDetectorToSelector(selector, detectors) + logging.debug("selector_axes=%s detector_to_selector keys=%s", + selector_axes, list(detector_to_selector.keys())) + + def is_current_detector(d): + if selector is None: + return True + return detector_to_selector[d] == selector.position.value[selector_axes] + + # calculate total steps for progress bar + total_steps = len(detectors) + (len(gratings) - 1) + current_step = 0 + future._progress = 0.0 + + # start alignment for the first grating + try: + _checkCancelled(future) + logging.info("Preparing optical path and turning on light") + + # make sure it's in 0th order (ie, show the image as-is), this also ensures the spectrograph + # is done with any previous actions. + spectrograph.moveAbsSync({"wavelength": 0}) + + # logging.info("Forcing brightlight ON") # bypassing sensor check for simulation + # bl.power.value = bl.power.range[1] + # time.sleep(1) + + logging.info("Turning on brightlight") + + future._subfuture = light.turnOnLight(bl, first_detector) + + try: + future._subfuture.result(timeout=60) + + except TimeoutError: + future._subfuture.cancel() + logging.warning("Brightlight did not confirm ON within 60s; continuing anyway") + + _checkCancelled(future) + + logging.info("Setting optical path to alignment mode: %s",align_mode) + future._subfuture = opm.setPath(align_mode,detector=first_detector) + future._subfuture.result() + + _checkCancelled(future) + future._subfuture = InstantaneousFuture() + + g0 = gratings[0] + logging.info("Starting alignment for initial grating: %s", g0) + + spectrograph.moveAbsSync({"grating": g0, "wavelength": 0}) + time.sleep(stabilization_time) + + detectors_sorted = sorted(detectors, key=is_current_detector, reverse=True) + + # align each detector for the first grating + for d in detectors_sorted: + _checkCancelled(future) + logging.info("Starting alignment | Detector: %s | Grating: %s", d.name, g0) + + if selector: + selector.moveAbsSync({selector_axes: detector_to_selector[d]}) + time.sleep(stabilization_time) + + gui_data = d.data.get(asap=False) + log_detector_state("GUI", "INITIAL", d, gui_data) + + future._subfuture = sparc_auto_grating_offset(spectrograph, d) + success = future._subfuture.result() + results[(g0, d.name)] = success + + # update progress + current_step += 1 + future._progress = current_step / total_steps + + logging.info("Finished alignment | Detector: %s | Grating: %s", d.name, g0) + + if selector: + selector.moveAbsSync({selector_axes: detector_to_selector[first_detector]}) + time.sleep(stabilization_time) + + # align remaining gratings using the first detector + for g in gratings[1:]: + _checkCancelled(future) + logging.info("Switching to grating: %s", g) + + spectrograph.moveAbsSync({"grating": g, "wavelength": 0}) + time.sleep(stabilization_time) + logging.info("Starting alignment | Detector: %s | Grating: %s", first_detector.name, g) + + future._subfuture = sparc_auto_grating_offset(spectrograph, first_detector) + success = future._subfuture.result() + results[(g, first_detector.name)] = success + + # update progress + current_step += 1 + future._progress = current_step / total_steps + + logging.info("Finished alignment | Detector: %s | Grating: %s", first_detector.name, g) + + future._progress = 1.0 + return results + + except CancelledError: + logging.info("Auto-alignment cancelled") + raise + + finally: + logging.info("Turning off brightlight") + try: + bl.power.value = (bl.power.range[0]) + except Exception: + logging.exception("Failed to turn off the light during alignment cleanup") + + spectrograph.moveAbsSync(original_pos) + if selector: + selector.moveAbsSync(original_selector) + + with future._task_lock: + future._task_state = FINISHED + +def _cancel_auto_align_grating_detector_offsets(future: model.ProgressiveFuture) -> bool: + """ + Canceller for _do_auto_align_grating_detector_offsets task. + """ + logging.debug("Cancelling autoalignment...") + + with future._task_lock: + if future._task_state == FINISHED: + return False + future._task_state = CANCELLED + future._subfuture.cancel() + logging.debug("Auto-alignment cancellation requested") + + return True diff --git a/src/odemis/acq/align/goffset_alignment.py b/src/odemis/acq/align/goffset_alignment.py new file mode 100644 index 0000000000..ee841cd0cc --- /dev/null +++ b/src/odemis/acq/align/goffset_alignment.py @@ -0,0 +1,200 @@ +import logging +import threading +import time +from collections.abc import Iterable +from concurrent.futures import TimeoutError, CancelledError +from concurrent.futures._base import CANCELLED, FINISHED, RUNNING +from typing import Any, Dict, List, Optional, Tuple, Union + +from odemis import model +from odemis.model import InstantaneousFuture +from odemis.util import executeAsyncTask + +from odemis.acq.align.autofocus import _mapDetectorToSelector +from odemis.acq.align.goffset import sparc_auto_grating_offset + + +def _checkCancelled(future: "model.ProgressiveFuture"): + + """ + Check if the future has been cancelled, and if so raise CancelledError. + """ + + with future._function_lock: + if future._function_state == CANCELLED: + raise CancelledError() + +def _total_alignment_time(n_gratings: int, + n_detectors: int) -> float: + + """ + Estimate total time for aligning all grating-detector combinations. + + :param n_gratings: number of gratings to align + :param n_detectors: number of detectors to align + :return: estimated total time in seconds + """ + + runs = n_detectors + max(0, n_gratings - 1) + move_time = ((n_gratings-1) * MOVE_TIME_GRATING + (n_detectors - 1) * MOVE_TIME_DETECTOR) + + # total time = time spent running alignment algorithms + time spent moving hardware + return runs * EST_ALIGN_TIME + move_time + + +def auto_align_grating_detector_offsets(spectrograph: model.Actuator, + detectors: Union[model.Detector, List[model.Detector]], + selector: Optional[model.Actuator] = None, + streams: Optional[List['Stream']] = None) -> model.ProgressiveFuture: + + """ + Automatically align grating-detector offsets for all combinations of gratings and detectors. + - If a selector is provided, it will be used to switch between detectors for the first grating, then the first detector + will be used for all subsequent gratings. + - For multiple detectors, the grating alignment will only be adjusted for the first detector; subsequent detectors will + be aligned by adjusting the detector offset with the grating alignment fixed. + + :param spectrograph: spectrograph + :param detectors: list of detectors + :param selector: optional selector to switch between detectors + :param streams: optional list of streams to update with progress + :return: ProgressiveFuture that will resolve to a dict mapping (grating, detector) + :raises ValueError: if no detectors provided, or if multiple detectors provided without a selector + :raises CancelledError: if the operation is cancelled + """ + + if not isinstance(detectors, Iterable): + detectors = [detectors] + if not detectors: + raise ValueError("At least one detector must be provided") + if len(detectors) > 1 and selector is None: + raise ValueError("No selector provided, but multiple detectors") + + if streams is None: + streams = [] + + est_start = time.time() + 0.1 + n_gratings = len(spectrograph.axes["grating"].choices) + n_detectors = len(detectors) + a_time = _total_alignment_time(n_gratings, n_detectors) + f = model.ProgressiveFuture(start=est_start, end=est_start + a_time) + f.task_canceller = _cancel_auto_align_grating_detector_offsets + + f._task_lock = threading.Lock() + f._task_state = RUNNING + f._subfuture = InstantaneousFuture() + executeAsyncTask(f, _do_auto_align_grating_detector_offsets, args=(f, spectrograph, detectors, selector, streams)) + return f + +MOVE_TIME_GRATING = 20 #s +MOVE_TIME_DETECTOR = 5 #s +EST_ALIGN_TIME = 30 #s + +def _do_auto_align_grating_detector_offsets(future: model.ProgressiveFuture, + spectrograph: model.Actuator, + detectors: List[model.Detector], + selector: Optional[model.Actuator], + streams: List['Stream'], + stabilization_time: float = 15.0) -> Optional[Dict[Any, Any]]: + + """ + Iterate through each grating and detector combination, adjusting the selector if provided, and run the auto-alignment algorithm. + - If a selector is provided, it will be used to switch between detectors for the first grating, then the first detector + will be used for all subsequent gratings. + - For multiple detectors, the grating alignment will only be adjusted for the first detector; subsequent detectors will + be aligned by adjusting the detector offset with the grating alignment fixed. + + :param future: ProgressiveFuture to update with progress and results + :param spectrograph: spectrograph + :param detectors: list of detectors + :param selector: optional selector to switch between detectors + :param streams: optional list of streams to update with progress + :param stabilization_time: time to wait after moving hardware before starting alignment (default: 15s) + :return: dict mapping (grating, detector) to alignment success boolean + :raises CancelledError: if the operation is cancelled + """ + + results: dict[tuple, bool] = {} + original_pos = {k: v for k, v in spectrograph.position.value.items() + if k in ("wavelength", "grating")} + + gratings = sorted(list(spectrograph.axes["grating"].choices.keys())) + logging.info(f"Available gratings: {list(spectrograph.axes['grating'].choices.keys())}") + + first_detector = detectors[0] + + if selector: + original_selector = selector.position.value + selector_axes, detector_to_selector = _mapDetectorToSelector(selector, detectors) + + def is_current_detector(d): + if selector is None: + return True + return detector_to_selector[d] == selector.position.value[selector_axes] + + try: + g0 = gratings[0] + logging.info(f"Starting alignment for initial grating: {g0}") + + spectrograph.moveAbsSync({"grating": g0, "wavelength": 0}) + time.sleep(stabilization_time) + + detectors_sorted = sorted(detectors, key=is_current_detector, reverse=True) + + # align each detector for the first grating + for d in detectors_sorted: + _checkCancelled(future) + logging.info(f"Starting alignment | Detector: {d.name} | Grating: {g0}") + + if selector: + selector.moveAbsSync({selector_axes: detector_to_selector[d]}) + future._subfuture = sparc_auto_grating_offset(spectrograph, d) + success = future._subfuture.result() + results[(g0, d.name)] = success + + logging.info(f"Finished alignment | Detector: {d.name} | Grating: {g0} | Success: {success}") + + if selector: + selector.moveAbsSync({selector_axes: detector_to_selector[first_detector]}) + + # align remaining gratings using the first detector + for g in gratings[1:]: + _checkCancelled(future) + logging.info(f"Switching to grating: {g}") + + spectrograph.moveAbsSync({"grating": g, "wavelength": 0}) + time.sleep(stabilization_time) + logging.info(f"Starting alignment | Detector: {first_detector.name} | Grating: {g}") + + future._subfuture = sparc_auto_grating_offset(spectrograph, first_detector) + success = future._subfuture.result() + results[(g, first_detector.name)] = success + + logging.info(f"Finished alignment | Detector: {first_detector.name} | Grating: {g} | Success: {success}") + + return results + + except CancelledError: + logging.info("Auto-alignment cancelled") + raise + + finally: + spectrograph.moveAbsSync(original_pos) + if selector: + selector.moveAbsSync(original_selector) + + with future._task_lock: + future._task_state = FINISHED + + +def _cancel_auto_align_grating_detector_offsets(future: model.ProgressiveFuture) -> bool: + logging.debug("Cancelling autoalignment...") + + with future._task_lock: + if future._task_state == FINISHED: + return False + future._task_state = CANCELLED + future._subfuture.cancel() + logging.debug("Auto-alignment cancellation requested") + + return True diff --git a/src/odemis/acq/align/test/goffset_alignment_test.py b/src/odemis/acq/align/test/goffset_alignment_test.py new file mode 100644 index 0000000000..cf32fc3233 --- /dev/null +++ b/src/odemis/acq/align/test/goffset_alignment_test.py @@ -0,0 +1,131 @@ +import logging +import os +import time +import unittest +import numpy +from collections.abc import Iterable +from concurrent.futures import CancelledError +from scipy import ndimage + +from odemis import model, acq + +#from odemis.acq.align.goffset import auto_align_grating_detector_offsets + +from odemis.util import testing, timeout, img +import odemis.util.focus + +from odemis.acq.align.goffset import( + find_peak_position, + acquire_peak, + estimate_goffset_scale, + sparc_auto_grating_offset, + auto_align_grating_detector_offsets +) + + + +CONFIG_PATH = os.path.dirname(odemis.__file__) + "/../../install/linux/usr/share/odemis/" +SPARC_CONFIG = CONFIG_PATH + "sim/sparc2-focus-test.odm.yaml" + +logging.getLogger().setLevel(logging.DEBUG) + +class TestAutoAlignGratingDetectorOffsets(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.spgr = model.getComponent(role="spectrograph") + cls.ccd = model.getComponent(role="ccd") + #cls.spccd = model.getComponent(role="sp-ccd") + cls.selector = model.getComponent(role="spec-det-selector") + + def setUp(self): + # Speed up acquisition + self.ccd.exposureTime.value = self.ccd.exposureTime.range[0] + + # @timeout(100) + # def test_cancel(self): + # f = auto_align_grating_detector_offsets(spectrograph=self.spgr, detectors=[self.ccd],) + # time.sleep(1) + # + # cancelled = f.cancel() + # self.assertTrue(cancelled) + # self.assertTrue(f.cancelled()) + # + # with self.assertRaises(CancelledError): + # f.result(timeout=900) + + + @timeout(1000) + def test_single_detector_iteration(self): + f = auto_align_grating_detector_offsets(spectrograph=self.spgr, detectors=[self.ccd], selector=self.selector) + res = f.result(timeout=900) + + n_gratings = len(self.spgr.axes["grating"].choices) + n_detectors = 1 + expected = n_detectors + (n_gratings - 1) + + self.assertEqual(len(res), expected) + + first_grating = list(self.spgr.axes["grating"].choices.keys())[0] + dets_first = [d for (g, d) in res.keys() if g == first_grating] + + self.assertEqual(len(dets_first), n_detectors) + + @timeout(100) + def test_cancel(self): + f = auto_align_grating_detector_offsets(spectrograph=self.spgr, detectors=[self.ccd],) + + time.sleep(1) + + cancelled = f.cancel() + self.assertTrue(cancelled) + self.assertTrue(f.cancelled()) + + with self.assertRaises(CancelledError): + f.result(timeout=900) + + def test_multi_detector_iteration(self): + spccd = model.getComponent(role="sp-ccd") + spccd.exposureTime.value = spccd.exposureTime.range[0] + + detectors = [self.ccd, spccd] + + # run alignment + f = auto_align_grating_detector_offsets(spectrograph=self.spgr, detectors=detectors, selector=self.selector) + res = f.result(timeout=900) + + # calculate expected results + n_gratings = len(self.spgr.axes["grating"].choices) + n_detectors = len(detectors) + expected_count = n_detectors + (n_gratings - 1) + + self.assertEqual(len(res), expected_count, f"Expected {expected_count} results, got {len(res)}") + + # verify that every detector was used for the first grating + gratings_list = list(self.spgr.axes["grating"].choices.keys()) + first_grating = gratings_list[0] + + dets_for_first_grating = [d for (g, d) in res.keys() if g == first_grating] + self.assertEqual(len(dets_for_first_grating), n_detectors) + self.assertIn(self.ccd.name, dets_for_first_grating) + self.assertIn(spccd.name, dets_for_first_grating) + + # verify that only first detector is used for remaining gratings + for g in gratings_list[1:]: + # Ensure only the first detector (index 0) is present for these gratings + dets_for_this_grating = [d for (grating, d) in res.keys() if grating == g] + self.assertEqual(len(dets_for_this_grating), 1) + self.assertEqual(dets_for_this_grating[0], detectors[0].name) + + # move to spectral camera + self.selector.moveAbsSync({"rx": 1.5707963267948966}) + data = spccd.data.get(asap=False) + + # check data is not flat + if data.max() == data.min(): + print("WARNING: sp-ccd is returning a flat image!") + else: + print(f"sp-ccd signal range: {data.min()} to {data.max()}") + +if __name__ == '__main__': + unittest.main() diff --git a/src/odemis/acq/align/test/goffset_test.py b/src/odemis/acq/align/test/goffset_test.py new file mode 100644 index 0000000000..6f3b545feb --- /dev/null +++ b/src/odemis/acq/align/test/goffset_test.py @@ -0,0 +1,561 @@ +# -*- coding: utf-8 -*- +""" +Test Sparc auto grating offset alignment +""" + +import logging +import numpy as np +import os +import threading +import unittest + +from concurrent.futures import Future +from concurrent.futures._base import RUNNING +from odemis import model +from odemis.util import timeout +from odemis.acq.align.goffset import(find_peak_position, peak_is_present, estimate_goffset_scale, + sparc_auto_grating_offset, auto_align_grating_detector_offsets, + _do_auto_align_grating_detector_offsets, log_detector_state) +from odemis.dataio import hdf5 +from odemis.model import ProgressiveFuture +from unittest.mock import patch, MagicMock + +logging.getLogger().setLevel(logging.DEBUG) + +HOME_PATH = os.path.expanduser("~") + "/" +H5_FILE_2d_NO_PEAK = HOME_PATH + "development/odemis/grating 2 1024x256/" + +class TestSparcAutoGratingOffset(unittest.TestCase): + """ + Test automatic grating offset alignment. + """ + + @classmethod + def setUpClass(cls): + + cls.detector = model.getComponent(role="ccd") + cls.spgr = model.getComponent(role="spectrograph") + # cls.spccd = model.getComponent(role="sp-ccd") + cls.selector = model.getComponent(role="spec-det-selector") + + cls._original_goffset = cls.spgr.goffset + cls._original_position = cls.spgr.position.value.copy() + + # @classmethod + # def tearDownClass(cls): + # # restore original position + # try: + # cls.spgr.moveAbsSync(cls._original_position) + # except Exception: + # logging.exception("Failed restoring spectrograph position") + + def setUp(self): + # speed up detector + self.detector.exposureTime.value = self.detector.exposureTime.range[0] + + def test_find_peak_position_synthetic(self): + """ + Test peak detection on synthetic Gaussian data. + """ + + x = np.arange(200) + true_center = 83.4 + spectrum = np.exp(-0.5*((x-true_center)/3.0)**2) + + peak = find_peak_position(spectrum) + self.assertAlmostEqual(peak, true_center, places=1) + + def test_find_peak_position_2d(self): + """ + Test peak detection on 2D data (mean over axis). + """ + + x = np.arange(200) + true_center = 120.0 + line = np.exp(-0.5*((x-true_center)/4.0)**2) + image = np.tile(line, (50, 1)) + + peak = find_peak_position(image) + self.assertAlmostEqual(peak, true_center, places=1) + + @timeout(100) + def test_estimate_goffset_scale(self): + """ + Test that goffset scale is non-zero and finite. + """ + scale = estimate_goffset_scale(self.spgr, self.detector) + + self.assertIsInstance(scale, float) + self.assertNotEqual(scale, 0.0) + self.assertTrue(np.isfinite(scale)) + + @timeout(800) + def test_scale_not_misaligned(self): + """ + Verify scale estimation only happens when the peak is misaligned. + This is inferred from the probe move performed by estimate_goffset_scale(). + """ + # reset spectrograph to known aligned position + self.spgr.moveAbsSync(self._original_position) + + start_goffset = self.spgr.position.value["goffset"] + + f = sparc_auto_grating_offset(self.spgr, self.detector, max_it=20) + result = f.result(timeout=300) + + end_goffset = self.spgr.position.value["goffset"] + + self.assertTrue(result) + + # If peak is already centered, the algorithm exits immediately + # so goffset should not change. + self.assertAlmostEqual(start_goffset, end_goffset, places=6, + msg="goffset changed even though peak was already centered (scale estimation likely ran)") + + def test_scale_estimation_misaligned(self): + """ + Verify that scale estimation runs when the peak is misaligned, which is inferred + from the probe move performed by estimate_goffset_scale(). + """ + + delta = 500 + current = self.spgr.position.value["goffset"] + maxv = self.spgr.axes["goffset"].range[1] + direction = 1 if (current + delta < maxv) else -1 + + self.spgr.moveRelSync({"goffset": delta * direction}) + + start_goffset = self.spgr.position.value["goffset"] + + f = sparc_auto_grating_offset(self.spgr, self.detector, max_it=50) + result = f.result(timeout=600) + + end_goffset = self.spgr.position.value["goffset"] + + self.assertTrue(result) + + # If misaligned, centering should move the grating + self.assertNotAlmostEqual(start_goffset, end_goffset, places=3, + msg="goffset did not change during alignment when peak was misaligned") + + @timeout(800) + def test_auto_grating_offset(self): + """ + Test automatic centering of spectral peak (one grating/detector only). + """ + + delta = 0 # intentionally misalign + current = self.spgr.position.value["goffset"] + goffset_max = self.spgr.axes["goffset"].range[1] + direction = 1 if (current + delta < goffset_max) else -1 + + test_data = self.detector.data.get(asap=False) + log_detector_state("TEST", "INITIAL", self.detector, test_data) + + self.spgr.moveRelSync({"goffset": delta * direction}) + logging.info("Test: after misalign move, spgr.position.gooffset = %s", self.spgr.position.value["goffset"]) + f = sparc_auto_grating_offset(self.spgr, self.detector, max_it=100) + + result = f.result(timeout=800) + self.assertTrue(result) + + @timeout(600) + def test_single_detector_updates_grating(self): + """ + Single-detector mode: aligning a detector on the secondary port must update + the grating offset and not the detector offset. + """ + + spccd = model.getComponent(role="sp-ccd") + spccd.exposureTime.value = spccd.exposureTime.range[0] + + # try to move selector to secondary port; ignore if not available + try: + self.selector.moveAbsSync({"rx": 1.5707963267948966}) + except Exception: + logging.debug("Selector move to secondary failed or not present; continuing") + + start_goffset = self.spgr.position.value["goffset"] + + f = sparc_auto_grating_offset(self.spgr, spccd, max_it=50) + result = f.result(timeout=300) + self.assertTrue(result, "Single-detector alignment failed") + + after_goffset = self.spgr.position.value["goffset"] + self.assertNotAlmostEqual(start_goffset, after_goffset, places=3, + msg="Grating goffset did not change for single-detector alignment on secondary port") + + @timeout(900) + def test_multi_detector_does_not_change_grating(self): + """ + Multi-detector mode: first detector sets the grating; aligning the second + detector should not change the grating offset (it should adjust detector offset). + """ + + spccd = model.getComponent(role="sp-ccd") + spccd.exposureTime.value = spccd.exposureTime.range[0] + + # ensure selector points to primary detector for initial grating-setting alignment + try: + self.selector.moveAbsSync({"rx": 0.0}) + except Exception: + logging.debug("Selector move to primary failed or not present; continuing") + + # align first detector to set grating offset + f_first = sparc_auto_grating_offset(self.spgr, self.detector, max_it=50) + self.assertTrue(f_first.result(timeout=300), "First-detector alignment failed") + grating_after_first = self.spgr.position.value["goffset"] + + # switch to secondary detector + try: + self.selector.moveAbsSync({"rx": 1.5707963267948966}) + except Exception: + logging.debug("Selector move to secondary failed or not present; continuing") + + # align second detector in multi-detector mode (should not change grating) + f_second = sparc_auto_grating_offset(self.spgr, spccd, max_it=50) + self.assertTrue(f_second.result(timeout=300), "Second-detector alignment failed") + + grating_after_second = self.spgr.position.value["goffset"] + self.assertAlmostEqual(grating_after_first, grating_after_second, places=3, + msg="Grating goffset changed when aligning second detector in multi-detector mode") + + @timeout(900) + def test_multi_detector_detector_offset_changes(self): + """ + Multi-detector mode: after the first detector sets the grating, aligning the + second detector should NOT change the grating goffset but should change the + detector-specific response (verified by the peak moving closer to center). + """ + spccd = model.getComponent(role="sp-ccd") + spccd.exposureTime.value = spccd.exposureTime.range[0] + + # ensure primary detector sets the grating first + try: + self.selector.moveAbsSync({"rx": 0.0}) + except Exception: + logging.debug("Selector move to primary failed or not present; continuing") + + f_first = sparc_auto_grating_offset(self.spgr, self.detector, max_it=50) + self.assertTrue(f_first.result(timeout=300), "First-detector alignment failed") + grating_after_first = self.spgr.position.value["goffset"] + + # switch to secondary detector + try: + self.selector.moveAbsSync({"rx": 1.5707963267948966}) + except Exception: + logging.debug("Selector move to secondary failed or not present; continuing") + + # measure peak before alignment on secondary detector + data_before = spccd.data.get(asap=False) + before_peak = float(find_peak_position(data_before)) + + # Run alignment for second detector in multi-detector mode + f_second = sparc_auto_grating_offset(self.spgr, spccd, max_it=50) + self.assertTrue(f_second.result(timeout=600), "Second-detector alignment failed") + + # measure peak after alignment + data_after = spccd.data.get(asap=False) + after_peak = float(find_peak_position(data_after)) + + # assert grating did not change + grating_after_second = self.spgr.position.value["goffset"] + self.assertAlmostEqual(grating_after_first, grating_after_second, places=3, + msg="Grating goffset changed when aligning second detector in multi-detector mode") + + # assert peak moved closer to center on the secondary detector + center = spccd.resolution.value[0] / 2 + before_dist = abs(before_peak - center) + after_dist = abs(after_peak - center) + self.assertLess(after_dist, before_dist, + msg="Peak did not move closer to center on second detector after alignment") + + def test_single_detector_iteration(self): + """ + Verifies that the auto-alignment algorithm generates the minimum required set + of offsets when operating with a single detector and multiple gratings. + """ + + f = auto_align_grating_detector_offsets(spectrograph=self.spgr, detectors=self.detector, selector=self.selector) + res = f.result(timeout=900) + + n_gratings = len(self.spgr.axes["grating"].choices) + n_detectors = 1 + expected = n_detectors + (n_gratings - 1) + + self.assertEqual(len(res), expected) + + first_grating = list(self.spgr.axes["grating"].choices.keys())[0] + dets_first = [d for (g, d) in res.keys() if g == first_grating] + + self.assertEqual(len(dets_first), n_detectors) + + def test_single_detector_alignment_algorithm(self): + + """ + Test automatic alignment of the gratings using a single detector. + """ + + f = auto_align_grating_detector_offsets(spectrograph=self.spgr, detectors=self.detector, selector=self.selector) + result = f.result(timeout=800) + + self.assertTrue(result) + + def test_multi_detector_iteration(self): + """ + Tests the auto-alignment sequence for a multi-detector setup. + + Verifies that: + - Grating 1 aligns ALL detectors (calibrating the detector offsets). + - Subsequent gratings align ONLY the primary detector (calibrating the grating offsets). + - The total number of alignment runs is exactly as expected. + - The secondary detector receives valid optical data post-alignment. + """ + + spccd = model.getComponent(role="sp-ccd") + spccd.exposureTime.value = spccd.exposureTime.range[0] + + detectors = [self.detector, spccd] + + # run alignment + f = auto_align_grating_detector_offsets(spectrograph=self.spgr, detectors=detectors, selector=self.selector) + res = f.result(timeout=900) + + # calculate expected results + n_gratings = len(self.spgr.axes["grating"].choices) + n_detectors = len(detectors) + expected_count = n_detectors + (n_gratings - 1) + + self.assertEqual(len(res), expected_count, f"Expected {expected_count} results, got {len(res)}") + + # verify that every detector was used for the first grating + gratings_list = list(self.spgr.axes["grating"].choices.keys()) + first_grating = gratings_list[0] + + dets_for_first_grating = [d for (g, d) in res.keys() if g == first_grating] + self.assertEqual(len(dets_for_first_grating), n_detectors) + self.assertIn(self.ccd.name, dets_for_first_grating) + self.assertIn(spccd.name, dets_for_first_grating) + + # verify that only first detector is used for remaining gratings + for g in gratings_list[1:]: + # Ensure only the first detector (index 0) is present for these gratings + dets_for_this_grating = [d for (grating, d) in res.keys() if grating == g] + self.assertEqual(len(dets_for_this_grating), 1) + self.assertEqual(dets_for_this_grating[0], detectors[0].name) + + # move to spectral camera + self.selector.moveAbsSync({"rx": 1.5707963267948966}) + data = spccd.data.get(asap=False) + + # check data is not flat + self.assertNotEqual(data.max(), data.min()) + + def test_driver_raises_valueerror_on_hardware_limit(self): + """ + Verifies the driver successfully raises a ValueError when computing + an offset outside of the hardware limits. + """ + # out of bounds value + out_of_bounds_target = 9999999.0 + + # test method directly to ensure the new bounds-checking logic works + with self.assertRaises(ValueError, msg="Driver should raise ValueError for out-of-bounds offset"): + self.spgr._doSetGoffsetAbs(out_of_bounds_target) + + @timeout(150) + def test_hardware_limit_valueerror_handled(self): + """ + Verifies that when a goffset move exceeds hardware limits and throws a ValueError, + the alignment algorithm catches it gracefully instead of crashing. + """ + # slightly misalign the peak so the algorithm wants to move + self.spgr.moveRelSync({"goffset": 50}) + + # temporarily bypass the software axis range clamp to ensure the raw command + # reaches the hardware driver to trigger the ValueError + original_range = self.spgr.axes["goffset"].range + + try: + self.spgr.axes["goffset"].range = (-9999999.0, 9999999.0) + + # execute auto-alignment procedure + f = sparc_auto_grating_offset(self.spgr, self.detector, max_it=10, gain=99999.0) + result = f.result(timeout=100) + + # driver should raise ValueError for out-of-bounds move, which should be caught by the alignment code + self.assertFalse(result, "Alignment should gracefully fail (return False) when blocked by hardware limits.") + + finally: + # restore original software limits and position + self.spgr.axes["goffset"].range = original_range + self.spgr.moveAbsSync({"goffset": self._original_position["goffset"]}) + + def test_find_peak_position_realistic_2d(self): + """ + Test peak detection on realistic 2D detector data (like GUI), + including noise and baseline offset. + """ + np.random.seed(0) + + x = np.arange(1024) + true_center = 512.3 + + # base gaussian peak + peak = 50 * np.exp(-0.5 * ((x - true_center) / 6.0) ** 2) + + # simulate camera baseline + baseline = 300 + + # build 2D image with row variations + rows = 256 + image = [] + + for i in range(rows): + row_noise = np.random.normal(0, 3, size=x.shape) # noise + row_gain = 1 + np.random.normal(0, 0.02) # slight variation + row = baseline + row_gain * peak + row_noise + image.append(row) + + image = np.array(image) + + # run function + peak_pos = find_peak_position(image) + + self.assertAlmostEqual(peak_pos, true_center, places=1) + + def test_no_peak_present_image(self): + """ + Test peak detection on not-peak detector data. + """ + + im_no_peak = hdf5.read_data(H5_FILE_2d_NO_PEAK + "no-peak-2d-1.h5") + data = (im_no_peak[0].data) + + spectrum = data.mean(axis=0) if data.ndim == 2 else data + + present = peak_is_present(spectrum, snr_threshold=10.0, width_range=(0.5, 12.0)) + + self.assertFalse(present, f"Peak was not detected. Check SNR and width limits.") + + def test_peak_present_image(self): + im_peak = hdf5.read_data(H5_FILE_2d_NO_PEAK + "with-peak-2d-1.h5") + raw_data = np.asarray(im_peak[0].data) + + spectrum = np.squeeze(raw_data) + if spectrum.ndim > 1: + # convert 2D array into 1D by averaging + spectrum = spectrum.mean(axis=0) + + clean_spec = spectrum - np.median(spectrum) + noise_std = np.std(clean_spec) + peak_val = clean_spec.max() + snr = peak_val / (noise_std + 1e-6) + peak_idx = np.argmax(clean_spec) + + # simple width estimation + window = clean_spec[max(0, peak_idx - 2): min(len(clean_spec), peak_idx + 3)] + x = np.arange(len(window)) + w = window - window.min() + width = 0.0 + if w.sum() > 0: + mean = np.sum(x * w) / np.sum(w) + var = np.sum(w * (x - mean) ** 2) / np.sum(w) + width = np.sqrt(var) + + # debug logs + logging.info("=" * 40) + logging.info(f"IMAGE DEBUG SCORECARD") + logging.info(f"Final Spectrum Shape: {spectrum.shape}") + logging.info(f"Peak Location: Index {peak_idx}") + logging.info(f"Calculated SNR: {snr:.2f} (Threshold: 10.0)") + logging.info(f"Calculated Width: {width:.2f} (Range: 0.5 - 12.0)") + logging.info("=" * 40) + + logging.info(f"Cleaned Spectrum Shape for Function: {spectrum.shape}") + + # pass the cleaned 1D array to the existing function + present = peak_is_present(spectrum, snr_threshold=10.0, width_range=(0.5, 12.0)) + + self.assertTrue(present, "Peak should have been detected in the cleaned spectrum.") + + @patch('odemis.acq.align.goffset.sparc_auto_grating_offset') + @patch('odemis.acq.align.goffset.light.turnOnLight') + @patch('odemis.acq.align.goffset.time.sleep') + + def test_do_auto_align_mocked(self, mock_sleep, mock_turnOnLight, mock_sparc_offset): + """ + Test the full auto-calibration sequence using mocked hardware to bypass + timeouts, sleep delays, and physical hardware requirements. + """ + + # set up mocked return values using standard python futures + fake_light_future = Future() + fake_light_future.set_result(True) + mock_turnOnLight.return_value = fake_light_future + + fake_align_future = Future() + fake_align_future.set_result(True) + mock_sparc_offset.return_value = fake_align_future + + # set up fake hardware + fake_spectrograph = MagicMock() + + # Mock the initial position and the grating choices + fake_spectrograph.position.value = {"wavelength": 0, "grating": "grating1"} + fake_spectrograph.axes = {"grating": MagicMock(choices={"grating1": 0, "grating2": 1})} + + fake_detector = MagicMock() + fake_detector.name = "fake_ccd" + fake_detector.data.get.return_value = np.zeros((1024,)) + + fake_opm = MagicMock() + fake_path_future = Future() + fake_path_future.set_result(True) + fake_opm.setPath.return_value = fake_path_future + + fake_bl = MagicMock() + fake_bl.power.range = [0, 100] + + main_future = ProgressiveFuture(start=0, end=100) + main_future._task_lock = threading.Lock() + main_future._task_state = RUNNING + main_future._subfuture = Future() + + # run function + results = _do_auto_align_grating_detector_offsets(future=main_future, spectrograph=fake_spectrograph, + detectors=[fake_detector], opm=fake_opm, + align_mode="spec-focus", bl=fake_bl, selector=None, + streams=[]) + + # Assertions + # did it try to turn on the light? + mock_turnOnLight.assert_called_once_with(fake_bl, fake_detector) + + # did it set the optical path? + fake_opm.setPath.assert_called_once_with("spec-focus", detector=fake_detector) + + # did it try to align both gratings we mocked? + self.assertEqual(mock_sparc_offset.call_count, 2) + + # did it clean up and turn the light off in the finally block? + self.assertEqual(fake_bl.power.value, 0) + + # check the final returned results dict + self.assertEqual(results, {("grating1", "fake_ccd"): True, ("grating2", "fake_ccd"): True}) + + @timeout(100) + def test_cancel(self): + """ + Test cancelling alignment. + """ + f = sparc_auto_grating_offset(self.spgr, self.detector) + # wait for the result or a timeout + try: + f.result(timeout=5) + except: + pass + self.assertTrue(f.done()) + +if __name__ == "__main__": + unittest.main() diff --git a/src/odemis/gui/cont/tabs/sparc2_align_tab.py b/src/odemis/gui/cont/tabs/sparc2_align_tab.py index 17af22d75c..95e8e87a6b 100644 --- a/src/odemis/gui/cont/tabs/sparc2_align_tab.py +++ b/src/odemis/gui/cont/tabs/sparc2_align_tab.py @@ -45,6 +45,7 @@ Sparc2AutoFocus, Sparc2ManualFocus, ) +from odemis.acq.align.goffset import auto_align_grating_detector_offsets from odemis.acq.stream_settings import StreamSettingsConfig from odemis.gui.comp import popup from odemis.gui.conf.data import get_hw_config, get_local_vas @@ -864,6 +865,16 @@ def add_axis(axisname, comp, label=None): # TODO: Auto remove the background when the image shape changes? # TODO: Use a toggle button to show the background is in use or not? + # Auto-calibration button + self.panel.btn_auto_calibrate.Bind(wx.EVT_BUTTON, self._on_btn_auto_calibrate) + + # Auto-calibration state + self._auto_calibrate_future = model.InstantaneousFuture() + + # Create progress timer ONCE + self._progress_timer = wx.Timer(self.panel) + self.panel.Bind(wx.EVT_TIMER, self._on_progress_timer, self._progress_timer) + def _on_btn_auto_align(self, evt): """ Handle the "Auto alignment" button click. @@ -1537,6 +1548,130 @@ def _on_align_mode_done(self, f): else: logging.debug("Optical path was updated.") + + # Auto-Calibration + def _on_btn_auto_calibrate(self, evt): + + # Check if there's a process running, if so, cancel and reset + if hasattr(self, "_auto_calibrate_future") and not self._auto_calibrate_future.done(): + self._auto_calibrate_future.cancel() + + if self._progress_timer.IsRunning(): + self._progress_timer.Stop() + + self.panel.btn_auto_calibrate.SetLabel("Auto calibrate") + self.panel.gauge_auto_calibrate.SetValue(0) + return + + # Reset progress bar + self.panel.gauge_auto_calibrate.SetValue(0) + self.panel.btn_auto_calibrate.SetLabel("Cancel") + + wx.CallAfter(self._start_auto_calibration) + + if not self._progress_timer.IsRunning(): + self._progress_timer.Start(200) + + def _start_auto_calibration(self): + main = self.tab_data_model.main + align_mode = self.tab_data_model.align_mode.value + opm = main.opm + + # Set the optical path according to the align mode + if align_mode == "streak-align": + if (main.streak_ccd + and main.spectrograph_ded + and main.streak_ccd.name in main.spectrograph_ded.affects.value): + opath = "streak-focus-ext" + else: + opath = "streak-focus" + elif align_mode == "tunnel-lens-align": + opath = "spec-focus-ext" + elif align_mode in ("lens-align", "lens2-align", "light-in-align"): + opath = "spec-focus" + else: + logging.warning("Auto calibration requested not compatible with requested alignment mode %s. Do nothing.", + align_mode) + return + + # Pick the right hardware based on whether opath is external or internal + if opath in ("spec-focus-ext", "streak-focus-ext"): + bl = main.brightlight_ext + spectrograph = main.spectrograph_ded + selector = getattr(main, "spec_ded_det_selector", None) + else: + bl = main.brightlight + spectrograph = main.spectrograph + selector = getattr(main, "spec_det_selector", None) + + if not bl or not spectrograph: + logging.error("Missing brightlight or spectrograph. Cannot start auto-calibration.") + return + + # Combine all detectors + detectors = getattr(main, "ccds", []) + getattr(main, "sp_ccds", []) + if not detectors: + detectors = [main.ccd] if main.ccd else [] + + # If multiple detectors but no selector, only use the first detector + if len(detectors) > 1 and selector is None: + logging.warning("Multiple detectors detected but no selector; aligning only the first detector") + detectors = [detectors[0]] + + logging.info( + "Starting auto-calibration: detectors=%s selector=%s path=%s", + [d.name for d in detectors], selector.position.value if selector else None, opath) + + # Start alignment procedure + self._auto_calibrate_future = auto_align_grating_detector_offsets( + spectrograph, detectors, opm, opath, bl, selector=selector) + + # Bind progress & done callbacks + self._auto_calibrate_future.add_done_callback(self._on_auto_calibrate_done) + self.panel.btn_auto_calibrate.SetLabel("Cancel") + + def _on_progress_timer(self, evt): + if not hasattr(self, "_auto_calibrate_future"): + return + + f = self._auto_calibrate_future + + if f.done(): + self._progress_timer.Stop() + self.panel.gauge_auto_calibrate.SetValue(100) + return + + # Fetch where the backend logic says we are + target_val = int(getattr(f, "_progress", 0.0) * 100) + current_val = self.panel.gauge_auto_calibrate.GetValue() + + # Smooth UI trick: + # If we are behind the target, catch up quickly. + if current_val < target_val: + self.panel.gauge_auto_calibrate.SetValue(current_val + 1) + + # If we are caught up, STILL allow the bar to creep up to 5% ahead of the target. + # This ensures the bar instantly starts moving to ~4% while waiting + # for the hardware's 10-second stabilization time. + elif current_val < (target_val + 4) and current_val < 95: + self.panel.gauge_auto_calibrate.SetValue(current_val + 1) + + def _on_auto_calibrate_done(self, f): + try: + result = f.result() + logging.info("Auto calibration finished: %s", result) + except CancelledError: + logging.info("Auto calibration cancelled") + except Exception: + logging.exception("Auto calibration failed") + finally: + self._auto_calibrate_future = model.InstantaneousFuture() + + wx.CallAfter(self._progress_timer.Stop) + + wx.CallAfter(self.panel.btn_auto_calibrate.SetLabel, "Auto calibrate") + wx.CallAfter(self.panel.gauge_auto_calibrate.SetValue, 0) + @call_in_wx_main def _on_lens_align_done(self, f): # Has no effect now, as OPM future are not cancellable (but it makes the