diff --git a/core/pioreactor/calibrations/pooling.py b/core/pioreactor/calibrations/pooling.py new file mode 100644 index 000000000..5fc5c0df4 --- /dev/null +++ b/core/pioreactor/calibrations/pooling.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + +import typing as t +from collections.abc import Callable + +from pioreactor import structs +from pioreactor.utils.timing import current_utc_datestamp +from pioreactor.utils.timing import current_utc_datetime + + +def pool_od_calibrations( + calibrations: list[structs.ODCalibration | structs.OD600Calibration], + fit: t.Literal["spline", "poly", "akima"] = "spline", +) -> structs.OD600Calibration | structs.ODCalibration: + """ + Merge raw recorded_data from multiple OD calibrations and refit a new curve. + """ + if not calibrations: + raise ValueError("No calibrations provided for pooling.") + + if len(calibrations) == 1: + # Just copy and rename + cal = calibrations[0] + base_class = type(cal) + new_name = f"pooled-od{cal.angle}-from-1-unit-{current_utc_datestamp()}" + + kwargs = {f: getattr(cal, f) for f in cal.__struct_fields__} + kwargs["calibration_name"] = new_name + kwargs["calibrated_on_pioreactor_unit"] = "$cluster" + kwargs["created_at"] = current_utc_datetime() + return base_class(**kwargs) + + # Validation: must share angle and pd_channel + first_cal = calibrations[0] + angle = first_cal.angle + pd_channel = first_cal.pd_channel + ir_led_intensity = first_cal.ir_led_intensity + + # Check compatibility + for cal in calibrations[1:]: + if cal.angle != angle: + raise ValueError(f"Incompatible angles: {angle} != {cal.angle}") + if cal.pd_channel != pd_channel: + raise ValueError(f"Incompatible pd_channels: {pd_channel} != {cal.pd_channel}") + + # ir_led_intensity must be within 5% + if ir_led_intensity == 0: + if cal.ir_led_intensity != 0: + raise ValueError("Incompatible ir_led_intensity: 0 vs non-zero") + elif abs(cal.ir_led_intensity - ir_led_intensity) / ir_led_intensity > 0.05: + raise ValueError( + f"Incompatible ir_led_intensity: {ir_led_intensity} and {cal.ir_led_intensity} differ by > 5%" + ) + + # Merging + merged_x: list[float] = [] + merged_y: list[float] = [] + weights: list[float] = [] + + for cal in calibrations: + x_data = cal.recorded_data["x"] + y_data = cal.recorded_data["y"] + count = len(x_data) + if count == 0: + continue + + merged_x.extend(x_data) + merged_y.extend(y_data) + + # Equal weight for each point + weights.extend([1.0] * count) + + if not merged_x: + raise ValueError("No recorded data found in any provided calibrations.") + + # Refit + if fit == "poly": + from pioreactor.calibrations.utils import calculate_poly_curve_of_best_fit + + curve_data = calculate_poly_curve_of_best_fit(merged_x, merged_y, degree=2, weights=weights) + elif fit == "spline": + from pioreactor.utils.splines import spline_fit + + knots_count = min(4, len(set(merged_x))) + curve_data = spline_fit(merged_x, merged_y, knots=max(2, knots_count), weights=weights) # type: ignore + elif fit == "akima": + from pioreactor.utils.akimas import akima_fit + + curve_data = akima_fit(merged_x, merged_y) # type: ignore + else: + raise ValueError(f"Unsupported fit type: {fit}") + + new_name = f"pooled-od{angle}-{current_utc_datestamp()}" + + kwargs = {f: getattr(first_cal, f) for f in first_cal.__struct_fields__} + kwargs["calibration_name"] = new_name + kwargs["calibrated_on_pioreactor_unit"] = "$cluster" + kwargs["created_at"] = current_utc_datetime() + kwargs["curve_data_"] = curve_data + kwargs["recorded_data"] = {"x": merged_x, "y": merged_y} + + base_class = type(first_cal) + return base_class(**kwargs) + + +_POOLING_HANDLERS: dict[str, Callable] = { + "od": pool_od_calibrations, + "od600": pool_od_calibrations, +} diff --git a/core/pioreactor/web/api.py b/core/pioreactor/web/api.py index dfdaf0947..ed75de55f 100644 --- a/core/pioreactor/web/api.py +++ b/core/pioreactor/web/api.py @@ -1753,11 +1753,136 @@ def get_all_active_estimators(pioreactor_unit: str) -> DelayedResponseReturnValu def get_all_estimators(pioreactor_unit: str) -> DelayedResponseReturnValue: if pioreactor_unit == UNIVERSAL_IDENTIFIER: task = cache.cached_multicast_get(cache.ESTIMATORS, get_all_workers()) + return create_task_response(task) + + +@api_bp.route("/cluster/calibrations//pool", methods=["POST"]) +def pool_calibrations(device: str) -> ResponseReturnValue: + payload = request.get_json(silent=True) or {} + donor_units = payload.get("donor_units") + + if donor_units: + task = tasks.multicast_get(f"/unit_api/calibrations/{device}/active", donor_units, return_raw=True) else: - task = cache.cached_multicast_get(cache.ESTIMATORS, [pioreactor_unit]) + task = fanout.broadcast_get_across_workers(f"/unit_api/calibrations/{device}/active", return_raw=True) + + try: + results = task.get(blocking=True, timeout=15) + except (HueyException, TaskException): + abort_with(500, "Timed out fetching active calibrations from workers") + + donors = [] + skipped = [] + calibrations = [] + + from pioreactor.structs import AllCalibrations + from pioreactor.utils import yaml_decode, yaml_encode + + for worker, result in results.items(): + if result is None: + skipped.append(worker) + continue + try: + cal = yaml_decode(result, type=AllCalibrations) + calibrations.append(cal) + donors.append(worker) + except Exception: + skipped.append(worker) + + from pioreactor.calibrations.pooling import _POOLING_HANDLERS + + if device.startswith("od"): + handler = _POOLING_HANDLERS.get("od") + else: + handler = _POOLING_HANDLERS.get(device) + + if not handler: + abort_with(400, f"Pooling not supported for device {device}") + + try: + pooled = handler(calibrations) + except Exception as e: + abort_with(400, f"Failed to pool calibrations: {e}") + + return jsonify({ + "calibration_data": yaml_encode(pooled).decode("utf-8"), + "calibration_name": pooled.calibration_name, + "donors": donors, + "skipped": skipped + }) + + +@api_bp.route("/workers//calibrations//apply", methods=["POST"]) +def apply_calibration(pioreactor_unit: str, device: str) -> DelayedResponseReturnValue: + payload = request.get_json(silent=True) or {} + if "calibration_data" not in payload: + abort_with(400, "Missing calibration_data in payload") + + payload["set_as_active"] = True + + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + task = tasks.multicast_post(f"/unit_api/calibrations/{device}", get_all_workers(), json=payload) + else: + task = tasks.multicast_post(f"/unit_api/calibrations/{device}", [pioreactor_unit], json=payload) + + # Invalidate cache + if pioreactor_unit == UNIVERSAL_IDENTIFIER: + cache.cache.delete_memoized(cache.get_all_calibrations) + else: + cache.cache.delete_memoized(cache.get_all_calibrations, pioreactor_unit) + return create_task_response(task) +@api_bp.route("/workers//calibrations//copy_to/", methods=["POST"]) +def copy_calibration(source_unit: str, device: str, target_unit: str) -> DelayedResponseReturnValue: + payload = request.get_json(silent=True) or {} + calibration_name = payload.get("calibration_name") + + if calibration_name: + task = tasks.multicast_get(f"/unit_api/calibrations/{device}/{calibration_name}", [source_unit], return_raw=True) + else: + task = tasks.multicast_get(f"/unit_api/calibrations/{device}/active", [source_unit], return_raw=True) + + try: + result = task.get(blocking=True, timeout=10) + except (HueyException, TaskException): + abort_with(500, "Timed out fetching calibration from source unit") + + source_result = result.get(source_unit) + if source_result is None: + abort_with(404, "Source unit did not return a calibration") + + from pioreactor.structs import AllCalibrations + from pioreactor.utils import yaml_decode, yaml_encode + from pioreactor.utils.timing import current_utc_datestamp, current_utc_datetime + + try: + cal = yaml_decode(source_result, type=AllCalibrations) + except Exception as e: + abort_with(500, f"Failed to decode calibration from source unit: {e}") + + kwargs = {f: getattr(cal, f) for f in cal.__struct_fields__} + kwargs["calibration_name"] = f"copy-from-{source_unit}-{current_utc_datestamp()}" + kwargs["calibrated_on_pioreactor_unit"] = "$cluster" + kwargs["created_at"] = current_utc_datetime() + new_cal = type(cal)(**kwargs) # type: ignore + + apply_payload = { + "calibration_data": yaml_encode(new_cal).decode("utf-8"), + "set_as_active": True + } + + if target_unit == UNIVERSAL_IDENTIFIER: + post_task = tasks.multicast_post(f"/unit_api/calibrations/{device}", get_all_workers(), json=apply_payload) + cache.cache.delete_memoized(cache.get_all_calibrations) + else: + post_task = tasks.multicast_post(f"/unit_api/calibrations/{device}", [target_unit], json=apply_payload) + cache.cache.delete_memoized(cache.get_all_calibrations, target_unit) + + return create_task_response(post_task) + + @api_bp.route("/workers//zipped_calibrations", methods=["GET"]) def get_zipped_calibrations(pioreactor_unit: str) -> ResponseReturnValue: if pioreactor_unit == UNIVERSAL_IDENTIFIER: diff --git a/core/pioreactor/web/unit_api.py b/core/pioreactor/web/unit_api.py index 2be024298..f082efcad 100644 --- a/core/pioreactor/web/unit_api.py +++ b/core/pioreactor/web/unit_api.py @@ -1314,12 +1314,15 @@ def create_calibration(device: str) -> ResponseReturnValue: remediation="Check file permissions and server logs.", ) + activated = False if set_as_active: with local_persistent_storage("active_calibrations") as c: - c[device] = calibration_name + if device not in c: + c[device] = calibration_name + activated = True # Respond with success and the created calibration details - response = jsonify({"msg": "Calibration created successfully.", "path": str(path)}) + response = jsonify({"msg": "Calibration created successfully.", "path": str(path), "activated": activated}) response.status_code = 201 return response @@ -1372,6 +1375,39 @@ def delete_calibration(device: str, calibration_name: str) -> ResponseReturnValu ) +@unit_api_bp.route("/calibrations//active", methods=["GET"]) +def get_active_calibration(device: str) -> ResponseReturnValue: + with local_persistent_storage("active_calibrations") as c: + if device not in c: + abort_with(404, description=f"No active calibration for {device}.") + calibration_name = str(c[device]) + + calibration_path = CALIBRATION_PATH / device / f"{calibration_name}.yaml" + if not calibration_path.exists(): + abort_with(404, description=f"Active calibration file for {device} missing.") + + try: + raw_yaml = calibration_path.read_text() + return attach_cache_control(Response(response=raw_yaml, status=200, mimetype="application/yaml")) + except Exception as e: + publish_to_error_log(f"Error reading active calibration: {e}", "get_active_calibration") + abort_with(500, description="Failed to read active calibration.") + + +@unit_api_bp.route("/calibrations//", methods=["GET"]) +def get_calibration(device: str, calibration_name: str) -> ResponseReturnValue: + calibration_path = CALIBRATION_PATH / device / f"{calibration_name}.yaml" + if not calibration_path.exists(): + abort_with(404, description=f"Calibration file for {device} missing.") + + try: + raw_yaml = calibration_path.read_text() + return attach_cache_control(Response(response=raw_yaml, status=200, mimetype="application/yaml")) + except Exception as e: + publish_to_error_log(f"Error reading calibration: {e}", "get_calibration") + abort_with(500, description="Failed to read calibration.") + + @unit_api_bp.route("/calibrations", methods=["GET"]) def get_all_calibrations() -> ResponseReturnValue: calibration_dir = CALIBRATION_PATH diff --git a/core/tests/test_calibration_pooling.py b/core/tests/test_calibration_pooling.py new file mode 100644 index 000000000..623939bb5 --- /dev/null +++ b/core/tests/test_calibration_pooling.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + +import pytest +from pioreactor.calibrations.pooling import pool_od_calibrations +from pioreactor.calibrations.pooling import _POOLING_HANDLERS +from pioreactor.structs import OD600Calibration +from pioreactor.utils.timing import current_utc_datetime + + +def make_od_calibration( + name: str, + x: list[float], + y: list[float], + angle: str = "90", + pd_channel: str = "1", + ir_led_intensity: float = 50.0 +) -> OD600Calibration: + return OD600Calibration( + created_at=current_utc_datetime(), + calibrated_on_pioreactor_unit="worker1", + calibration_name=name, + angle=angle, + pd_channel=pd_channel, + ir_led_intensity=ir_led_intensity, + curve_data_={"type": "spline", "knots": [], "coefficients": []}, # type: ignore + recorded_data={"x": x, "y": y} + ) + + +def test_pool_two_od_calibrations_merges_recorded_data(): + cal1 = make_od_calibration("cal1", [1.0, 2.0], [10.0, 20.0]) + cal2 = make_od_calibration("cal2", [3.0, 4.0], [30.0, 40.0]) + + pooled = pool_od_calibrations([cal1, cal2], fit="poly") + assert pooled.calibrated_on_pioreactor_unit == "$cluster" + assert pooled.calibration_name.startswith("pooled-od90") + + # Check that x and y points from both donors are present + assert len(pooled.recorded_data["x"]) == 4 + assert set(pooled.recorded_data["x"]) == {1.0, 2.0, 3.0, 4.0} + assert set(pooled.recorded_data["y"]) == {10.0, 20.0, 30.0, 40.0} + + # Check the curve_data_ has been refit (using poly for simpler test) + from pioreactor.structs import PolyFitCoefficients + assert isinstance(pooled.curve_data_, PolyFitCoefficients) + + +def test_pool_calibrations_rejects_mismatched_angles(): + cal1 = make_od_calibration("cal1", [1.0], [10.0], angle="90") + cal2 = make_od_calibration("cal2", [2.0], [20.0], angle="135") + + with pytest.raises(ValueError, match="Incompatible angles"): + pool_od_calibrations([cal1, cal2]) + + +def test_pool_calibrations_rejects_mismatched_pd_channels(): + cal1 = make_od_calibration("cal1", [1.0], [10.0], pd_channel="1") + cal2 = make_od_calibration("cal2", [2.0], [20.0], pd_channel="2") + + with pytest.raises(ValueError, match="Incompatible pd_channels"): + pool_od_calibrations([cal1, cal2]) + + +def test_pool_calibrations_rejects_incompatible_ir_intensity(): + cal1 = make_od_calibration("cal1", [1.0], [10.0], ir_led_intensity=50.0) + cal2 = make_od_calibration("cal2", [2.0], [20.0], ir_led_intensity=55.0) # > 5% difference + + with pytest.raises(ValueError, match="Incompatible ir_led_intensity"): + pool_od_calibrations([cal1, cal2]) + + +def test_pool_calibrations_accepts_compatible_ir_intensity(): + cal1 = make_od_calibration("cal1", [1.0], [10.0], ir_led_intensity=50.0) + cal2 = make_od_calibration("cal2", [2.0], [20.0], ir_led_intensity=51.0) # <= 5% difference + + pooled = pool_od_calibrations([cal1, cal2], fit="poly") + assert len(pooled.recorded_data["x"]) == 2 + + +def test_pool_single_calibration(): + cal1 = make_od_calibration("cal1", [1.0], [10.0]) + + pooled = pool_od_calibrations([cal1]) + assert "from-1-unit" in pooled.calibration_name + assert pooled.calibrated_on_pioreactor_unit == "$cluster" + assert len(pooled.recorded_data["x"]) == 1 + + +def test_extension_point_registry_is_accessible(): + assert "od" in _POOLING_HANDLERS + assert "od600" in _POOLING_HANDLERS + assert _POOLING_HANDLERS["od"] == pool_od_calibrations diff --git a/scripts/dev_services_status.sh b/scripts/dev_services_status.sh index bec478c9f..b48f940fa 100755 --- a/scripts/dev_services_status.sh +++ b/scripts/dev_services_status.sh @@ -2,6 +2,7 @@ set -euo pipefail exec 2>/dev/null +echo "Checking dev services status..." # Helper to check whether common dev services are already running. running_services=() @@ -14,6 +15,7 @@ add_running() { } add_missing() { + local name=$1 local hint=$2 missing_services+=("${name} (${hint})") @@ -24,7 +26,7 @@ check_port() { local port=$2 local details # Skip header (NR>1) and capture the first listener to keep output compact. - details=$(lsof -nPiTCP:${port} -sTCP:LISTEN 2>/dev/null | awk 'NR>1 {printf "%s (pid %s)", $1, $2; exit}') + details=$(lsof -nPiTCP:${port} -sTCP:LISTEN 2>/dev/null | awk 'NR>1 {printf "%s (pid %s)", $1, $2; exit}' || true) if [[ -n "${details}" ]]; then add_running "${name}" "port ${port} – ${details}" else @@ -48,6 +50,7 @@ check_huey() { fi } + check_port "Flask web API" 4999 check_port "Frontend dev server" 3000 check_huey