diff --git a/.coveragerc b/.coveragerc index 2bdb30cbd..5ad85fd4c 100644 --- a/.coveragerc +++ b/.coveragerc @@ -4,4 +4,4 @@ branch = True [report] include = ardupilot_methodic_configurator/* -omit = ardupilot_methodic_configurator/backend_mavftp.py, ardupilot_methodic_configurator/mavftp_example.py, ardupilot_methodic_configurator/tempcal_imu.py, ardupilot_methodic_configurator/frontend_tkinter_motor_test.py, ardupilot_methodic_configurator/data_model_fc_ids.py, ardupilot_methodic_configurator/configuration_steps_strings.py, ardupilot_methodic_configurator/vehicle_components.py +omit = ardupilot_methodic_configurator/backend_mavftp.py, ardupilot_methodic_configurator/mavftp_example.py, ardupilot_methodic_configurator/tempcal_imu.py, ardupilot_methodic_configurator/data_model_fc_ids.py, ardupilot_methodic_configurator/configuration_steps_strings.py, ardupilot_methodic_configurator/vehicle_components.py diff --git a/ardupilot_methodic_configurator/data_model_configuration_step.py b/ardupilot_methodic_configurator/data_model_configuration_step.py index 8ecc88b93..6376d22cb 100644 --- a/ardupilot_methodic_configurator/data_model_configuration_step.py +++ b/ardupilot_methodic_configurator/data_model_configuration_step.py @@ -46,6 +46,9 @@ def __init__(self, local_filesystem: LocalFilesystem) -> None: # A dictionary that maps variable names to their values # These variables are used by the forced_parameters and derived_parameters in configuration_steps_*.json files self.variables = self.local_filesystem.get_eval_variables() + # Ensure transient helper variables are not persisted across steps + self.variables.pop("fc_parameters", None) + self.variables.pop("new_connection_prefix", None) def process_configuration_step( # pylint: disable=too-many-locals self, @@ -84,7 +87,7 @@ def process_configuration_step( # pylint: disable=too-many-locals # Process configuration step operations if configuration steps exist if self.local_filesystem.configuration_steps and selected_file in self.local_filesystem.configuration_steps: - variables = self.variables + variables = self.variables.copy() variables["fc_parameters"] = fc_parameters # Compute derived parameters (does NOT mutate filesystem.file_parameters) @@ -102,11 +105,13 @@ def process_configuration_step( # pylint: disable=too-many-locals if not fc_param_names or param_name in fc_param_names: derived_params_to_apply[param_name] = param - # Populate new_connection_prefix from rename_connection configuration step + # Populate new_connection_prefix from rename_connection configuration step (per-step scope) if "rename_connection" in self.local_filesystem.configuration_steps.get(selected_file, {}): variables["new_connection_prefix"] = self.local_filesystem.configuration_steps[selected_file][ "rename_connection" ] + else: + variables.pop("new_connection_prefix", None) # Calculate connection rename operations (does NOT mutate filesystem.file_parameters) rename_ui_infos, duplicates_to_remove, renames_to_apply = self._handle_connection_renaming( diff --git a/ardupilot_methodic_configurator/data_model_motor_test.py b/ardupilot_methodic_configurator/data_model_motor_test.py index f7df1facb..d82326043 100644 --- a/ardupilot_methodic_configurator/data_model_motor_test.py +++ b/ardupilot_methodic_configurator/data_model_motor_test.py @@ -8,6 +8,7 @@ SPDX-License-Identifier: GPL-3.0-or-later """ +from enum import Enum from logging import debug as logging_debug from logging import error as logging_error from logging import info as logging_info @@ -59,6 +60,16 @@ class ValidationError(MotorTestError): """Raised when validation of input parameters fails.""" +class MotorStatusEvent(str, Enum): + """Well-known status events published by model motor operations.""" + + COMMAND_SENT = "command_sent" + STOP_SENT = "stop_sent" + + +MotorStatusCallback = Callable[[int, MotorStatusEvent], None] + + class MotorTestDataModel: # pylint: disable=too-many-public-methods, too-many-instance-attributes """ Data model for motor test functionality. @@ -102,6 +113,7 @@ def __init__( self._test_throttle_pct = 0.0 self._test_duration_s = 0.0 + self._first_test_acknowledged = False self._got_battery_status = False @@ -496,6 +508,33 @@ def get_parameter(self, param_name: str) -> Optional[float]: return self.flight_controller.fc_parameters.get(param_name) + def set_motor_spin_arm_value( + self, + value: float, + reset_progress_callback: Union[None, Callable[[int, int], None]] = None, + connection_progress_callback: Union[None, Callable[[int, int], None]] = None, + ) -> None: + """Set MOT_SPIN_ARM ensuring a 0.02 margin relative to MOT_SPIN_MIN.""" + spin_min = self.get_parameter("MOT_SPIN_MIN") + if spin_min is not None and value > spin_min - 0.02: + raise ValidationError( + _("MOT_SPIN_ARM must stay at least 0.02 below MOT_SPIN_MIN (current %(min).2f).") % {"min": spin_min} + ) + + self.set_parameter("MOT_SPIN_ARM", value, reset_progress_callback, connection_progress_callback) + + def set_motor_spin_min_value(self, value: float) -> None: + """Set MOT_SPIN_MIN ensuring it keeps 0.02 margin above MOT_SPIN_ARM.""" + spin_arm = self.get_parameter("MOT_SPIN_ARM") + if spin_arm is None: + raise ParameterError(_("MOT_SPIN_ARM must be available before updating MOT_SPIN_MIN.")) + if value < spin_arm + 0.02: + raise ValidationError( + _("MOT_SPIN_MIN must be at least 0.02 higher than MOT_SPIN_ARM (current %(arm).2f).") % {"arm": spin_arm} + ) + + self.set_parameter("MOT_SPIN_MIN", value) + def test_motor(self, test_sequence_nr: int, motor_output_nr: int, throttle_percent: int, timeout_seconds: int) -> None: """ Test a specific motor. @@ -576,6 +615,28 @@ def test_motor(self, test_sequence_nr: int, motor_output_nr: int, throttle_perce if not success: raise MotorTestExecutionError(message) + def _emit_status_event( + self, + callback: Optional[MotorStatusCallback], + motor_number: int, + event: MotorStatusEvent, + ) -> None: + """Notify listeners about a status change.""" + if callback is not None: + callback(motor_number, event) + + def run_single_motor_test( + self, + test_sequence_nr: int, + motor_output_nr: int, + status_callback: Optional[MotorStatusCallback] = None, + ) -> None: + """Execute a single motor test using stored throttle/duration settings.""" + throttle_pct = self.get_test_throttle_pct() + duration = int(self.get_test_duration_s()) + self.test_motor(test_sequence_nr, motor_output_nr, throttle_pct, duration) + self._emit_status_event(status_callback, motor_output_nr, MotorStatusEvent.COMMAND_SENT) + def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> None: """ Test all motors simultaneously. @@ -622,6 +683,22 @@ def test_motors_in_sequence(self, throttle_percent: int, timeout_seconds: int) - if not success: raise MotorTestExecutionError(message) + def run_all_motors_test(self, status_callback: Optional[MotorStatusCallback] = None) -> None: + """Execute an all-motors test using stored settings and report events.""" + throttle_pct = self.get_test_throttle_pct() + duration = int(self.get_test_duration_s()) + self.test_all_motors(throttle_pct, duration) + for motor_number in range(1, self.motor_count + 1): + self._emit_status_event(status_callback, motor_number, MotorStatusEvent.COMMAND_SENT) + + def run_sequential_motor_test(self, status_callback: Optional[MotorStatusCallback] = None) -> None: + """Execute a sequential test using stored settings and report events.""" + throttle_pct = self.get_test_throttle_pct() + duration = int(self.get_test_duration_s()) + self.test_motors_in_sequence(throttle_pct, duration) + for motor_number in range(1, self.motor_count + 1): + self._emit_status_event(status_callback, motor_number, MotorStatusEvent.COMMAND_SENT) + def stop_all_motors(self) -> None: """ Emergency stop for all motors. @@ -634,6 +711,12 @@ def stop_all_motors(self) -> None: if not success: raise MotorTestExecutionError(message) + def emergency_stop_motors(self, status_callback: Optional[MotorStatusCallback] = None) -> None: + """Stop motors and emit status events for listeners.""" + self.stop_all_motors() + for motor_number in range(1, self.motor_count + 1): + self._emit_status_event(status_callback, motor_number, MotorStatusEvent.STOP_SENT) + def get_motor_diagram_path(self) -> tuple[str, str]: """ Get the filepath for the motor diagram SVG file for the current frame. @@ -1096,6 +1179,31 @@ def update_frame_type_from_selection( logging_error(error_msg) raise FrameConfigurationError(error_msg) from e + def update_frame_type_by_key( + self, + selected_key: str, + reset_progress_callback: Union[None, Callable[[int, int], None]] = None, + connection_progress_callback: Union[None, Callable[[int, int], None]] = None, + extra_sleep_time: Optional[int] = None, + ) -> bool: + """Update frame configuration using the combobox key directly.""" + try: + frame_type_code = int(selected_key) + except (TypeError, ValueError) as exc: + raise ValidationError(_("Invalid frame type selection")) from exc + + current_types = self.get_current_frame_class_types() + frame_type_name = current_types.get(frame_type_code) + if frame_type_name is None: + raise ValidationError(_("Frame type %(key)s is not available for current frame class") % {"key": selected_key}) + + return self.update_frame_type_from_selection( + frame_type_name, + reset_progress_callback, + connection_progress_callback, + extra_sleep_time, + ) + def get_battery_status_color(self) -> str: """ Get the appropriate color for battery voltage display. @@ -1133,15 +1241,12 @@ def get_battery_display_text(self) -> tuple[str, str]: return _("Voltage: N/A"), _("Current: N/A") def should_show_first_test_warning(self) -> bool: - """ - Check if first-time safety warning should be shown. + """Return True when first-time warning still needs acknowledgement.""" + return not self._first_test_acknowledged - Returns: - bool: True if warning should be shown - - """ - # Could be expanded to check user preferences/settings - return True + def acknowledge_first_test_warning(self) -> None: + """Record that the user has accepted the first-time warning.""" + self._first_test_acknowledged = True def get_safety_warning_message(self) -> str: """ diff --git a/ardupilot_methodic_configurator/frontend_tkinter_motor_test.py b/ardupilot_methodic_configurator/frontend_tkinter_motor_test.py index 058f6cd18..d2baf188d 100644 --- a/ardupilot_methodic_configurator/frontend_tkinter_motor_test.py +++ b/ardupilot_methodic_configurator/frontend_tkinter_motor_test.py @@ -46,6 +46,7 @@ THROTTLE_PCT_MAX, THROTTLE_PCT_MIN, FrameConfigurationError, + MotorStatusEvent, MotorTestDataModel, MotorTestExecutionError, MotorTestSafetyError, @@ -116,11 +117,11 @@ def __init__( self.batt_current_label: ttk.Label # Store image reference (PNG or other format) self._current_diagram_image: Optional[tk.PhotoImage] = None - self._first_motor_test = True # Track if this is the first motor test self._frame_options_loaded = False # Track if frame options have been loaded self._diagrams_path = "" # Cache diagram path for performance self._diagram_needs_update = True # Track if diagram needs to be updated self._content_frame: Optional[ttk.Frame] = None # Store reference to content frame for widget searches + self._motor_grid_frame: Optional[ttk.Frame] = None # Direct handle for motor grid frame self._create_widgets() @@ -211,6 +212,7 @@ def _create_widgets(self) -> None: # pylint: disable=too-many-statements # noqa motor_grid = ttk.Frame(testing_frame) motor_grid.pack(pady=10) + self._motor_grid_frame = motor_grid self._create_motor_buttons(motor_grid) # --- Test Controls --- @@ -315,55 +317,25 @@ def _update_spinbox_values(self) -> None: def _update_motor_buttons_layout(self) -> None: """Re-create motor buttons if motor count changes.""" + motor_grid = self._motor_grid_frame + if motor_grid is None: + logging_error(_("Could not find motor grid frame")) + return + current_count = len(self.motor_buttons) required_count = self.model.motor_count - if current_count != required_count: - # Clear existing widgets - for button in self.motor_buttons: - button.master.destroy() - for combo in self.detected_comboboxes: - combo.master.destroy() - self.motor_buttons.clear() - self.motor_status_labels.clear() - self.detected_comboboxes.clear() - - # Find the motor grid frame by searching for it - # This is more robust than hardcoded casting - def find_motor_grid(parent: Union[Frame, ttk.Frame]) -> Optional[Union[Frame, ttk.Frame]]: - """Find a suitable motor grid frame.""" - for child in parent.winfo_children(): - if isinstance(child, (Frame, ttk.Frame)): - # Check if this looks like our motor grid - grid_children = child.winfo_children() - if len(grid_children) == 0: # Empty frame ready for motor buttons - return child - # Recursively search - result = find_motor_grid(child) - if result: - return result - return None - - # Find the testing frame and create new motor grid - testing_frame = None - if self._content_frame: - for child in self._content_frame.winfo_children(): - try: - if hasattr(child, "cget") and "Motor Order/Direction Configuration" in str(child.cget("text")): - testing_frame = child - break - except tk.TclError: - # Widget doesn't have a "text" option, skip it - continue - - if testing_frame and isinstance(testing_frame, (Frame, ttk.Frame)): - motor_grid = find_motor_grid(testing_frame) - if motor_grid: - self._create_motor_buttons(motor_grid) - else: - logging_error(_("Could not find motor grid frame")) - else: - logging_error(_("Could not find testing frame")) + if current_count == required_count: + return + + for child in motor_grid.winfo_children(): + child.destroy() + + self.motor_buttons.clear() + self.motor_status_labels.clear() + self.detected_comboboxes.clear() + + self._create_motor_buttons(motor_grid) def _load_png_diagram(self, diagram_path: str) -> None: """Load and display a PNG motor diagram using BaseWindow.put_image_in_label().""" @@ -452,14 +424,6 @@ def _on_frame_type_change(self, _event: object) -> None: logging_warning(_("No frame type selected")) return - # Convert the selected key to the format expected by the data model - # The data model expects the display text (type name), not the code - current_frame_types = self.model.get_current_frame_class_types() - selected_type_code = int(selected_key) - selected_text = current_frame_types.get(selected_type_code, f"Type {selected_type_code}") - - logging_info(_("Frame type changed: %(code)s (%(name)s)"), {"code": selected_key, "name": selected_text}) - try: # Create delayed progress windows that only show if operation takes more than 1 second reset_progress_window = ProgressWindow( @@ -479,8 +443,8 @@ def _on_frame_type_change(self, _event: object) -> None: reset_callback = DelayedProgressCallback(reset_progress_window.update_progress_bar, 1.0) connection_callback = DelayedProgressCallback(connection_progress_window.update_progress_bar, 1.0) - self.model.update_frame_type_from_selection( - selected_text, + self.model.update_frame_type_by_key( + selected_key, reset_callback, connection_callback, extra_sleep_time=2, @@ -560,7 +524,7 @@ def _set_motor_spin_arm(self) -> None: reset_progress_window = ProgressWindow( self.root_window, _("Resetting Flight Controller"), _("Waiting for {} of {} seconds") ) - self.model.set_parameter("MOT_SPIN_ARM", new_val, reset_progress_window.update_progress_bar) + self.model.set_motor_spin_arm_value(new_val, reset_progress_window.update_progress_bar) reset_progress_window.destroy() # for the case that we are doing a test and there is no real FC connected except (ParameterError, ValidationError) as e: showerror(_("Error"), str(e)) @@ -577,7 +541,7 @@ def _set_motor_spin_min(self) -> None: ) if new_val is not None: try: - self.model.set_parameter("MOT_SPIN_MIN", new_val) + self.model.set_motor_spin_min_value(new_val) except (ParameterError, ValidationError) as e: showerror(_("Error"), str(e)) @@ -588,27 +552,11 @@ def _test_motor(self, test_sequence_nr: int, motor_output_nr: int) -> None: {"seq": self.model.motor_labels[test_sequence_nr], "num": motor_output_nr}, ) - # First-time safety confirmation - if self._first_motor_test and self.model.should_show_first_test_warning(): - if not askyesno(_("Safety Confirmation"), self.model.get_safety_warning_message()): - return - self._first_motor_test = False + if not self._ensure_first_test_confirmation(): + return try: - # Check if motor test is safe (includes voltage checks) - self.model.is_motor_test_safe() - - # Validate test parameters - throttle_pct = self.model.get_test_throttle_pct() - duration = int(self.model.get_test_duration_s()) - - # Execute motor test - self.model.test_motor(test_sequence_nr, motor_output_nr, throttle_pct, duration) - - self._update_motor_status(motor_output_nr, _("Command sent"), "green") - - # Reset status after a short delay - self.root_window.after(2000, partial(self._update_motor_status, motor_output_nr, _("Ready"), "blue")) + self.model.run_single_motor_test(test_sequence_nr, motor_output_nr, self._handle_status_event) except MotorTestSafetyError as e: # Check if it's a voltage issue and provide specific guidance @@ -631,31 +579,11 @@ def _test_all_motors(self) -> None: """Execute a test for all motors simultaneously.""" logging_debug(_("Testing all motors")) - # First-time safety confirmation - if self._first_motor_test and self.model.should_show_first_test_warning(): - if not askyesno(_("Safety Confirmation"), self.model.get_safety_warning_message()): - return - self._first_motor_test = False + if not self._ensure_first_test_confirmation(): + return try: - # Check if motor test is safe - self.model.is_motor_test_safe() - - # Validate test parameters - throttle_pct = self.model.get_test_throttle_pct() - duration = int(self.model.get_test_duration_s()) - - # Execute all motors test - self.model.test_all_motors(throttle_pct, duration) - - for motor_number in range(1, self.model.motor_count + 1): - self._update_motor_status(motor_number, _("Command sent"), "green") - - # Reset status after a short delay - self.root_window.after( - 2000, - partial(self._update_motor_status, motor_number, _("Ready"), "blue"), - ) + self.model.run_all_motors_test(self._handle_status_event) except MotorTestSafetyError as e: showwarning(_("Safety Check Failed"), str(e)) @@ -670,31 +598,11 @@ def _test_motors_in_sequence(self) -> None: """Execute a test for all motors in sequence.""" logging_debug(_("Testing motors in sequence")) - # First-time safety confirmation - if self._first_motor_test and self.model.should_show_first_test_warning(): - if not askyesno(_("Safety Confirmation"), self.model.get_safety_warning_message()): - return - self._first_motor_test = False + if not self._ensure_first_test_confirmation(): + return try: - # Check if motor test is safe - self.model.is_motor_test_safe() - - # Validate test parameters - throttle_pct = self.model.get_test_throttle_pct() - duration = int(self.model.get_test_duration_s()) - - # Execute sequential test - self.model.test_motors_in_sequence(throttle_pct, duration) - - for motor_number in range(1, self.model.motor_count + 1): - self._update_motor_status(motor_number, _("Command sent"), "green") - - # Reset status after a short delay - self.root_window.after( - 2000, - partial(self._update_motor_status, motor_number, _("Ready"), "blue"), - ) + self.model.run_sequential_motor_test(self._handle_status_event) except MotorTestSafetyError as e: showwarning(_("Safety Check Failed"), str(e)) @@ -710,10 +618,7 @@ def _stop_all_motors(self) -> None: logging_info(_("Stopping all motors")) try: - self.model.stop_all_motors() - for motor_number in range(1, self.model.motor_count + 1): - self._update_motor_status(motor_number, _("Stop sent"), "red") - self.root_window.after(2000, self._reset_all_motor_status) + self.model.emergency_stop_motors(self._handle_status_event) except MotorTestExecutionError as e: showerror(_("Error"), str(e)) except Exception as e: # pylint: disable=broad-exception-caught @@ -723,6 +628,29 @@ def _emergency_stop(self) -> None: """Emergency stop - alias for _stop_all_motors for test compatibility.""" self._stop_all_motors() + def _ensure_first_test_confirmation(self) -> bool: + """Guard that the user acknowledged the first-time warning.""" + if self.model.should_show_first_test_warning(): + if not askyesno(_("Safety Confirmation"), self.model.get_safety_warning_message()): + return False + self.model.acknowledge_first_test_warning() + return True + + def _handle_status_event(self, motor_number: int, event: MotorStatusEvent) -> None: + """Translate model status events into user-facing label updates.""" + if event is MotorStatusEvent.COMMAND_SENT: + self._update_motor_status(motor_number, _("Command sent"), "green") + elif event is MotorStatusEvent.STOP_SENT: + self._update_motor_status(motor_number, _("Stop sent"), "red") + self._schedule_ready_reset(motor_number) + + def _schedule_ready_reset(self, motor_number: int, delay_ms: int = 2000) -> None: + """Return a motor label to the Ready state after a delay.""" + self.root_window.after( + delay_ms, + partial(self._update_motor_status, motor_number, _("Ready"), "blue"), + ) + def _update_motor_status(self, motor_number: int, status: str, color: str = "black") -> None: """ Update visual status for a specific motor. diff --git a/tests/test_data_model_configuration_step.py b/tests/test_data_model_configuration_step.py index 80a337213..00d60e125 100755 --- a/tests/test_data_model_configuration_step.py +++ b/tests/test_data_model_configuration_step.py @@ -267,6 +267,48 @@ def test_user_can_process_configuration_step_without_connection_renaming(self, p assert ui_errors == [] assert ui_infos == [] # No connection renaming means no info messages + def test_connection_renaming_state_does_not_leak_between_steps(self, processor, fc_parameters) -> None: # pylint: disable=too-many-locals + """ + Connection renaming helper variables must not leak into later steps. + + GIVEN: A configuration step that performs connection renaming + WHEN: A later step without renaming is processed + THEN: No renaming operations should be triggered for the later step + """ + rename_step = "test_file.param" + later_step = "13_general_configuration.param" + processor.local_filesystem.file_parameters[later_step] = { + "SERIAL2_PROTOCOL": Par(value=10.0, comment="Telemetry protocol"), + "SERIAL2_BAUD": Par(value=57.0, comment="Telemetry baud rate"), + } + processor.local_filesystem.configuration_steps = { + rename_step: {"rename_connection": "selected_serial"}, + later_step: {}, + } + + # First step performs renaming to ensure helper variable is set + ( + _params_first, + _errors_first, + _infos_first, + _duplicates_first, + renames_first, + _derived_first, + ) = processor.process_configuration_step(rename_step, fc_parameters) + assert renames_first # Sanity check that renaming took place + + # Later step must not inherit previous rename configuration + ( + _params_second, + _errors_second, + ui_infos_second, + _duplicates_second, + renames_second, + _derived_second, + ) = processor.process_configuration_step(later_step, fc_parameters) + assert renames_second == [] + assert ui_infos_second == [] + class TestConfigurationStepProcessorDomainModel: """Test domain model creation and parameter filtering.""" diff --git a/tests/test_data_model_motor_test.py b/tests/test_data_model_motor_test.py index 8fa625a07..c37361065 100755 --- a/tests/test_data_model_motor_test.py +++ b/tests/test_data_model_motor_test.py @@ -10,18 +10,21 @@ SPDX-License-Identifier: GPL-3.0-or-later """ -from typing import Optional +from typing import Any, Optional from unittest.mock import MagicMock, patch import pytest +from ardupilot_methodic_configurator import _ from ardupilot_methodic_configurator.backend_filesystem import LocalFilesystem from ardupilot_methodic_configurator.backend_filesystem_program_settings import ProgramSettings from ardupilot_methodic_configurator.backend_flightcontroller import FlightController from ardupilot_methodic_configurator.data_model_motor_test import ( FlightControllerConnectionError, FrameConfigurationError, + MotorStatusEvent, MotorTestDataModel, + MotorTestExecutionError, MotorTestSafetyError, ParameterError, ValidationError, @@ -69,7 +72,10 @@ def set_param_side_effect(param_name: str, value: float) -> tuple[bool, str]: # Configure fetch_param to return values from fc_parameters def fetch_param_side_effect(param_name: str) -> Optional[float]: - return fc.fc_parameters.get(param_name) + value = fc.fc_parameters.get(param_name) + if isinstance(value, (int, float)): + return float(value) + return None fc.fetch_param.side_effect = fetch_param_side_effect @@ -109,7 +115,10 @@ def mock_filesystem() -> MagicMock: "4": "OCTAQUAD", "5": "Y6", "7": "TRI", - } + }, + "min": 1, + "max": 12, + "RebootRequired": True, }, "FRAME_TYPE": { "values": { @@ -119,13 +128,65 @@ def mock_filesystem() -> MagicMock: "3": "HEXA: X", "10": "OCTA: PLUS", "11": "OCTA: X", - } + }, + "min": 0, + "max": 14, + "RebootRequired": False, + }, + "MOT_SPIN_ARM": { + "min": 0.05, + "max": 0.40, + "RebootRequired": True, + }, + "MOT_SPIN_MIN": { + "min": 0.07, + "max": 0.50, + "RebootRequired": False, }, } return filesystem +@pytest.fixture(autouse=True) +def program_settings_store(monkeypatch) -> dict[str, Any]: + """Provide in-memory ProgramSettings storage for deterministic tests.""" + store: dict[str, Any] = {"motor_test": {"duration": 3.0, "throttle_pct": 12}} + + def _get_setting(setting: str) -> Optional[float]: + parts = setting.split("/") + current: Any = store + for part in parts: + if isinstance(current, dict) and part in current: + current = current[part] + else: + return None + if isinstance(current, (int, float)): + return float(current) + return None + + def _set_setting(setting: str, value: float) -> None: + parts = setting.split("/") + current: Any = store + for part in parts[:-1]: + next_part = current.setdefault(part, {}) + if not isinstance(next_part, dict): + next_part = {} + current[part] = next_part + current = next_part + current[parts[-1]] = float(value) + + monkeypatch.setattr( + "ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.get_setting", + _get_setting, + ) + monkeypatch.setattr( + "ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.set_setting", + _set_setting, + ) + return store + + @pytest.fixture def mock_settings() -> MagicMock: """Fixture providing mock program settings.""" @@ -421,6 +482,26 @@ def test_user_can_update_frame_configuration_and_motor_count(self, motor_test_mo assert motor_test_model.frame_type == 1 assert motor_test_model.motor_count == 6 # HEXA X has 6 motors + def test_frame_configuration_update_handles_missing_layouts(self, motor_test_model) -> None: + """Missing layout data leaves motor counts at zero without crashing.""" + motor_test_model._motor_data_loader.data = {"layouts": []} # pylint: disable=protected-access + motor_test_model._frame_class = 3 # pylint: disable=protected-access + motor_test_model._frame_type = 3 # pylint: disable=protected-access + + with patch.object(motor_test_model, "set_parameter", return_value=(True, "")): + motor_test_model.update_frame_configuration(1, 1) + + assert motor_test_model.motor_count == 0 + + def test_frame_configuration_update_skips_recalc_when_data_missing(self, motor_test_model) -> None: + """When the loader lacks data entirely the recalc block is skipped.""" + motor_test_model._motor_data_loader.data = None # pylint: disable=protected-access + + with patch.object(motor_test_model, "set_parameter", return_value=(True, "")): + motor_test_model.update_frame_configuration(1, 1) + + assert motor_test_model.motor_count == 0 + def test_frame_configuration_update_fails_with_disconnected_controller( self, disconnected_flight_controller, mock_filesystem ) -> None: @@ -741,6 +822,50 @@ def test_parameter_verification_detects_setting_failure(self, motor_test_model) with pytest.raises(ParameterError, match="verification failed"): motor_test_model.set_parameter("MOT_SPIN_ARM", 0.12) + def test_parameter_setting_triggers_reset_when_required(self, motor_test_model) -> None: + """Parameters flagged as reboot-required trigger a reconnect once applied.""" + reset_callback = MagicMock(name="reset_cb") + reconnect_callback = MagicMock(name="reconnect_cb") + motor_test_model.flight_controller.reset_and_reconnect = MagicMock() + + motor_test_model.set_parameter( + "MOT_SPIN_ARM", + 0.2, + reset_progress_callback=reset_callback, + connection_progress_callback=reconnect_callback, + extra_sleep_time=7, + ) + + motor_test_model.flight_controller.reset_and_reconnect.assert_called_once_with( + reset_callback, + reconnect_callback, + 7, + ) + + def test_parameter_setting_raises_error_when_verification_fails(self, motor_test_model) -> None: + """A mismatched readback raises ParameterError with helpful context.""" + motor_test_model.flight_controller.fetch_param = MagicMock(return_value=0.01) + + with pytest.raises(ParameterError, match="verification failed"): + motor_test_model.set_parameter("MOT_SPIN_MIN", 0.3) + + def test_parameter_setting_skips_doc_metadata_when_missing(self, motor_test_model) -> None: + """Parameters without documentation skip metadata validation gracefully.""" + motor_test_model.filesystem.doc_dict.pop("MOT_SPIN_ARM", None) + motor_test_model.flight_controller.fc_parameters["CUSTOM_PARAM"] = 0.05 + + motor_test_model.set_parameter("CUSTOM_PARAM", 0.07) + + motor_test_model.flight_controller.set_param.assert_any_call("CUSTOM_PARAM", 0.07) + + def test_parameter_setting_raises_error_when_controller_rejects_change(self, motor_test_model) -> None: + """Controller rejection surfaces a ParameterError with the FC message.""" + motor_test_model.flight_controller.set_param.side_effect = None + motor_test_model.flight_controller.set_param.return_value = (False, "DENIED") + + with pytest.raises(ParameterError, match="Failed to set parameter"): + motor_test_model.set_parameter("MOT_SPIN_ARM", 0.2) + def test_user_can_get_parameter_value(self, motor_test_model) -> None: """ User can retrieve current parameter values from flight controller. @@ -1824,6 +1949,101 @@ def test_frame_options_handles_invalid_frame_type_values(self, motor_test_model) assert 1 in frame_options["QUAD"] # Valid entry "X" # Invalid entries should be excluded without breaking the parsing + def test_frame_options_skip_layouts_missing_required_fields(self, motor_test_model, caplog) -> None: + """ + Layouts without names are skipped with a warning. + + GIVEN: A layout entry lacking a ClassName/TypeName pair + WHEN: The model reloads frame options + THEN: The entry is ignored and a warning is logged + """ + broken_layout = {"Class": 9, "Type": 9, "motors": []} + motor_test_model._motor_data_loader.data["layouts"].append(broken_layout) # pylint: disable=protected-access + motor_test_model._cached_frame_options = None # pylint: disable=protected-access + + with caplog.at_level("WARNING"): + frame_options = motor_test_model.get_frame_options() + + assert "Skipping motor layout" in caplog.text + assert "QUAD" in frame_options # Existing healthy entries remain + assert 9 not in frame_options.get("", {}) + + def test_frame_options_warn_when_no_sources_available(self, motor_test_model, caplog) -> None: + """ + The user sees a warning if neither motor data nor metadata provide options. + + GIVEN: Missing JSON data and empty metadata + WHEN: The UI requests frame options + THEN: The model returns an empty dict and logs a warning + """ + motor_test_model._motor_data_loader.data = None # pylint: disable=protected-access + motor_test_model._cached_frame_options = None # pylint: disable=protected-access + motor_test_model.filesystem.doc_dict = {} + + with caplog.at_level("WARNING"): + options = motor_test_model.get_frame_options() + + assert options == {} + assert "No frame options" in caplog.text + + def test_frame_options_use_metadata_when_json_entries_invalid(self, motor_test_model) -> None: + """ + Invalid JSON layouts fall back to the parameter metadata catalog. + + GIVEN: Layout entries missing required fields + WHEN: The model rebuilds the frame options + THEN: It loads values from doc_dict instead + """ + motor_test_model._motor_data_loader.data = {"layouts": [{"Class": 7}]} # pylint: disable=protected-access + motor_test_model._cached_frame_options = None # pylint: disable=protected-access + + options = motor_test_model.get_frame_options() + + assert "QUAD" in options + + def test_frame_options_fallback_uses_doc_dict_values(self, motor_test_model) -> None: + """When JSON layouts are empty the model rebuilds options from doc_dict.""" + motor_test_model._motor_data_loader.data = {"layouts": []} # pylint: disable=protected-access + motor_test_model._cached_frame_options = None # pylint: disable=protected-access + motor_test_model.filesystem.doc_dict = { + "FRAME_TYPE": { + "values": { + "42": "TEST: CUSTOM", + } + } + } + + options = motor_test_model.get_frame_options() + + assert options == {"TEST": {42: "CUSTOM"}} + + def test_frame_options_fallback_skips_invalid_doc_entries(self, motor_test_model) -> None: + """Fallback processing skips entries with missing codes, delimiters, or bad numbers.""" + motor_test_model._motor_data_loader.data = {"layouts": []} # pylint: disable=protected-access + motor_test_model._cached_frame_options = None # pylint: disable=protected-access + motor_test_model.filesystem.doc_dict = { + "FRAME_TYPE": { + "values": { + None: "IGNORED", + "abc": "BROKEN", + "7": "MISSINGDELIM", + "8": "VALID: FRAME", + } + } + } + + options = motor_test_model.get_frame_options() + + assert options == {"VALID": {8: "FRAME"}} + + def test_frame_options_fallback_handles_missing_frame_type_section(self, motor_test_model) -> None: + """If FRAME_TYPE metadata is absent the fallback returns an empty dict without crashing.""" + motor_test_model._motor_data_loader.data = {"layouts": []} # pylint: disable=protected-access + motor_test_model._cached_frame_options = None # pylint: disable=protected-access + motor_test_model.filesystem.doc_dict = {"FRAME_CLASS": {"values": {"1": "Quad"}}} + + assert motor_test_model.get_frame_options() == {} + class TestMotorTestDataModelConnectionRefresh: """Test connection status refresh with error handling.""" @@ -1882,6 +2102,227 @@ def test_set_test_duration_succeeds_with_valid_value(self, motor_test_model) -> # Assert: Setting saved successfully (no exception raised) mock_set.assert_called_once_with("motor_test/duration", duration) + +class TestMotorTestDataModelSettingsGuards: + """Test guard rails applied to stored settings.""" + + def test_user_cannot_save_duration_below_minimum(self, motor_test_model) -> None: + """ + Duration values below 1 second raise ValueError and log an error. + + GIVEN: A user enters 0 seconds + WHEN: They save the setting + THEN: ValidationError is raised and the error is logged + """ + with ( + patch("ardupilot_methodic_configurator.data_model_motor_test.logging_error") as mock_log, + pytest.raises(ValueError, match="at least"), + ): + motor_test_model.set_test_duration_s(0) + + assert mock_log.call_count == 1 + + def test_user_cannot_save_duration_above_maximum(self, motor_test_model) -> None: + """Duration values above 60 seconds are rejected with a helpful message.""" + with ( + patch("ardupilot_methodic_configurator.data_model_motor_test.logging_error") as mock_log, + pytest.raises(ValueError, match="must not exceed"), + ): + motor_test_model.set_test_duration_s(120) + + assert mock_log.call_count == 1 + + def test_user_cannot_save_throttle_below_minimum(self, motor_test_model) -> None: + """Throttle percentages below 1% are invalid.""" + with ( + patch("ardupilot_methodic_configurator.data_model_motor_test.logging_error") as mock_log, + pytest.raises(ValueError, match="at least"), + ): + motor_test_model.set_test_throttle_pct(0) + + assert mock_log.call_count == 1 + + def test_user_cannot_save_throttle_above_maximum(self, motor_test_model) -> None: + """Throttle percentages above 100% trigger an error message.""" + with ( + patch("ardupilot_methodic_configurator.data_model_motor_test.logging_error") as mock_log, + pytest.raises(ValueError, match="must not exceed"), + ): + motor_test_model.set_test_throttle_pct(150) + + assert mock_log.call_count == 1 + + +class TestMotorTestDataModelSettingsEdgeCases: + """Test initialization edge cases for persisted settings.""" + + @staticmethod + def _build_model( + mock_flight_controller: FlightController, + mock_filesystem: LocalFilesystem, + mock_motor_data_json: dict[str, object], + ) -> MotorTestDataModel: + with patch("ardupilot_methodic_configurator.data_model_motor_test.FilesystemJSONWithSchema") as loader_cls: + loader = MagicMock() + loader.load_json_data.return_value = mock_motor_data_json + loader.data = mock_motor_data_json + loader_cls.return_value = loader + return MotorTestDataModel(mock_flight_controller, mock_filesystem) + + def test_user_is_warned_when_duration_setting_missing( + self, + mock_flight_controller, + mock_filesystem, + mock_motor_data_json, + monkeypatch, + ) -> None: + """Missing duration settings raise a ReferenceError during initialization.""" + + def fake_get(setting: str) -> Optional[float]: + if setting == "motor_test/duration": + return None + if setting == "motor_test/throttle_pct": + return 12 + return 0 + + monkeypatch.setattr( + "ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.get_setting", + fake_get, + ) + + with ( + patch("ardupilot_methodic_configurator.data_model_motor_test.logging_error") as mock_log, + pytest.raises(ReferenceError, match="duration setting not found"), + ): + self._build_model(mock_flight_controller, mock_filesystem, mock_motor_data_json) + + assert mock_log.call_count == 1 + + def test_user_is_warned_when_duration_setting_out_of_range( + self, + mock_flight_controller, + mock_filesystem, + mock_motor_data_json, + monkeypatch, + ) -> None: + """Out-of-range duration values raise ValueError on load.""" + + def fake_get(setting: str) -> Optional[float]: + if setting == "motor_test/duration": + return 0 + if setting == "motor_test/throttle_pct": + return 12 + return 0 + + monkeypatch.setattr( + "ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.get_setting", + fake_get, + ) + + with pytest.raises(ValueError, match="at least"): + self._build_model(mock_flight_controller, mock_filesystem, mock_motor_data_json) + + def test_user_is_warned_when_throttle_setting_missing( + self, + mock_flight_controller, + mock_filesystem, + mock_motor_data_json, + monkeypatch, + ) -> None: + """Missing throttle percentages raise ReferenceError.""" + + def fake_get(setting: str) -> Optional[float]: + if setting == "motor_test/throttle_pct": + return None + if setting == "motor_test/duration": + return 5 + return 0 + + monkeypatch.setattr( + "ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.get_setting", + fake_get, + ) + + with ( + patch("ardupilot_methodic_configurator.data_model_motor_test.logging_error") as mock_log, + pytest.raises(ReferenceError, match="throttle percentage setting not found"), + ): + self._build_model(mock_flight_controller, mock_filesystem, mock_motor_data_json) + + assert mock_log.call_count == 1 + + def test_user_is_warned_when_throttle_setting_out_of_range( + self, + mock_flight_controller, + mock_filesystem, + mock_motor_data_json, + monkeypatch, + ) -> None: + """Throttle percentages outside 1-100 are rejected on load.""" + + def fake_get(setting: str) -> Optional[float]: + if setting == "motor_test/throttle_pct": + return 500 + if setting == "motor_test/duration": + return 5 + return 0 + + monkeypatch.setattr( + "ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.get_setting", + fake_get, + ) + + with pytest.raises(ValueError, match="must not exceed"): + self._build_model(mock_flight_controller, mock_filesystem, mock_motor_data_json) + + def test_user_is_warned_when_duration_setting_above_maximum( + self, + mock_flight_controller, + mock_filesystem, + mock_motor_data_json, + monkeypatch, + ) -> None: + """Duration settings above 60 seconds raise ValueError on load.""" + + def fake_get(setting: str) -> Optional[float]: + if setting == "motor_test/duration": + return 999 + if setting == "motor_test/throttle_pct": + return 12 + return 0 + + monkeypatch.setattr( + "ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.get_setting", + fake_get, + ) + + with pytest.raises(ValueError, match="must not exceed"): + self._build_model(mock_flight_controller, mock_filesystem, mock_motor_data_json) + + def test_user_is_warned_when_throttle_setting_below_minimum( + self, + mock_flight_controller, + mock_filesystem, + mock_motor_data_json, + monkeypatch, + ) -> None: + """Throttle settings below 1% are rejected.""" + + def fake_get(setting: str) -> Optional[float]: + if setting == "motor_test/throttle_pct": + return 0 + if setting == "motor_test/duration": + return 5 + return 0 + + monkeypatch.setattr( + "ardupilot_methodic_configurator.data_model_motor_test.ProgramSettings.get_setting", + fake_get, + ) + + with pytest.raises(ValueError, match="at least"): + self._build_model(mock_flight_controller, mock_filesystem, mock_motor_data_json) + def test_set_test_throttle_pct_succeeds_with_valid_value(self, motor_test_model) -> None: """ Test throttle percentage setting saves successfully with valid values. @@ -1967,3 +2408,694 @@ def test_refresh_from_flight_controller_succeeds_when_update_works(self, motor_t # Assert: Should return True assert result is True + + +class TestMotorTestDataModelFrameSelectionWorkflows: # pylint: disable=too-many-public-methods + """Test high-level frame selection and combo-box workflows.""" + + def test_user_refreshes_frame_configuration_when_layout_unchanged(self, motor_test_model) -> None: + """ + Re-reading the same frame configuration keeps the UI responsive. + + GIVEN: A connected controller that already shared its frame layout + WHEN: The user refreshes the configuration twice without making changes + THEN: The model should report success each time without recomputing the layout + """ + motor_test_model.flight_controller.get_frame_info.reset_mock() + + assert motor_test_model.refresh_from_flight_controller() is True + assert motor_test_model.refresh_from_flight_controller() is True + assert motor_test_model.flight_controller.get_frame_info.call_count >= 2 + + def test_user_is_informed_when_controller_reports_unknown_frame(self, motor_test_model, caplog) -> None: + """ + Unsupported frame combinations surface a clear warning to the user. + + GIVEN: The flight controller reports a frame class/type with no known layout + WHEN: The user refreshes the configuration summary + THEN: The model should return False so the UI can highlight the issue + """ + motor_test_model.flight_controller.get_frame_info.return_value = (9, 9) + + with caplog.at_level("ERROR"): + assert motor_test_model.refresh_from_flight_controller() is False + + assert "No motor configuration found" in caplog.text + assert motor_test_model.motor_count == 0 + + def test_user_updates_frame_type_from_dropdown_text(self, motor_test_model) -> None: + """ + Selecting a new entry from the frame type list uploads the matching parameters. + + GIVEN: A user browsing available QUAD layouts + WHEN: They pick the "PLUS" entry from the dropdown list + THEN: The data model uploads FRAME_TYPE and updates its cached layout + """ + original_method = motor_test_model.set_parameter + with patch.object(motor_test_model, "set_parameter", wraps=original_method) as spy: + assert motor_test_model.update_frame_type_from_selection("PLUS") is True + + spy.assert_called() + assert motor_test_model.frame_type == 0 + + def test_user_updates_frame_type_using_combobox_key(self, motor_test_model) -> None: + """ + Selecting by encoded key keeps the UI combobox and the model in sync. + + GIVEN: A user interacts with a PairTupleCombobox showing motor layouts + WHEN: They choose the option with key "0" + THEN: The model should map the key back to the proper layout and update itself + """ + motor_test_model._frame_type = 1 # pylint: disable=protected-access + motor_test_model.flight_controller.fc_parameters["FRAME_TYPE"] = 1 + + assert motor_test_model.update_frame_type_by_key("0") is True + assert motor_test_model.frame_type == 0 + + def test_frame_type_selection_skips_redundant_uploads(self, motor_test_model) -> None: + """Selecting the already-active layout avoids unnecessary parameter writes.""" + motor_test_model._frame_class = 1 # pylint: disable=protected-access + motor_test_model._frame_type = 0 # pylint: disable=protected-access + motor_test_model.flight_controller.fc_parameters["FRAME_CLASS"] = 1 + motor_test_model.flight_controller.fc_parameters["FRAME_TYPE"] = 0 + + with patch.object(motor_test_model, "set_parameter") as mock_set_param: + assert motor_test_model.update_frame_type_from_selection("PLUS") is True + + mock_set_param.assert_not_called() + + def test_frame_type_selection_handles_missing_layout_data(self, motor_test_model) -> None: + """When layout metadata is absent the selection still succeeds with zero motors.""" + motor_test_model._motor_data_loader.data = {} # pylint: disable=protected-access + motor_test_model._frame_class = 3 # pylint: disable=protected-access + motor_test_model._frame_type = 3 # pylint: disable=protected-access + + with patch.object(motor_test_model, "set_parameter", return_value=None): + assert motor_test_model.update_frame_type_from_selection("PLUS") is True + + assert motor_test_model.motor_count == 0 + + def test_frame_type_selection_wraps_unexpected_errors(self, motor_test_model) -> None: + """Unexpected errors while uploading parameters raise FrameConfigurationError.""" + with ( + patch.object(motor_test_model, "set_parameter", side_effect=RuntimeError("upload boom")), + pytest.raises(FrameConfigurationError, match="Failed to update frame configuration"), + ): + motor_test_model.update_frame_type_from_selection("PLUS") + + def test_frame_type_selection_without_matching_layout(self, motor_test_model) -> None: + """Non-matching layouts leave the motor count at zero after selection.""" + motor_test_model._motor_data_loader.data = {"layouts": [{"Class": 9, "Type": 9, "motors": []}]} # pylint: disable=protected-access + + with patch.object(motor_test_model, "set_parameter", return_value=None): + assert motor_test_model.update_frame_type_from_selection("PLUS") is True + + assert motor_test_model.motor_count == 0 + + def test_frame_type_selection_propagates_parameter_errors(self, motor_test_model) -> None: + """ParameterError raised during upload is re-raised unchanged.""" + with ( + patch.object(motor_test_model, "set_parameter", side_effect=ParameterError("upload failed")), + pytest.raises(ParameterError, match="upload failed"), + ): + motor_test_model.update_frame_type_from_selection("PLUS") + + def test_invalid_frame_type_key_is_rejected(self, motor_test_model) -> None: + """ + Invalid combobox keys raise user-friendly validation errors. + + GIVEN: A user types an invalid frame type identifier + WHEN: The combobox tries to convert the key into a frame update + THEN: The model raises ValidationError so the UI can show feedback + """ + with pytest.raises(ValidationError, match="Invalid frame type selection"): + motor_test_model.update_frame_type_by_key("not-a-number") + + def test_user_can_parse_frame_type_text_selection(self, motor_test_model) -> None: + """ + Parsing the dropdown text yields numeric class and type codes. + + GIVEN: The UI shows human-readable type names + WHEN: The user selects the QUAD PLUS entry + THEN: The model resolves it back to the ArduPilot class/type codes + """ + frame_class_code, frame_type_code = motor_test_model.parse_frame_type_selection("PLUS") + + assert frame_class_code == 1 + assert frame_type_code == 0 + + def test_user_gets_validation_error_for_unknown_frame_text(self, motor_test_model) -> None: + """ + Unknown frame descriptions raise validation errors instead of misconfiguring the FC. + + GIVEN: A mistyped frame description + WHEN: The model attempts to parse it + THEN: ValidationError explains that the selection does not exist + """ + with pytest.raises(ValidationError, match="Could not find frame type"): + motor_test_model.parse_frame_type_selection("Imaginary Frame") + + def test_user_inspects_current_frame_type_metadata(self, motor_test_model) -> None: + """ + The combo-box helper APIs expose current selections and available options. + + GIVEN: A quad X frame loaded from the controller + WHEN: The UI asks for selection text, key, and pair tuples + THEN: The model returns matching values for the widgets + """ + selection_text = motor_test_model.get_current_frame_selection_text() + selection_key = motor_test_model.get_current_frame_selection_key() + selection_pairs = motor_test_model.get_frame_type_pairs() + + assert selection_text == "X" + assert selection_key == "1" + assert ("1", "1: X") in selection_pairs + + def test_user_reads_available_frame_types_for_current_class(self, motor_test_model) -> None: + """ + Users can request only the frame types that belong to the currently connected frame class. + + GIVEN: A connected controller reporting frame class 1 (QUAD) + WHEN: The model fetches types for that class + THEN: It should return both PLUS and X entries + """ + types = motor_test_model.get_current_frame_class_types() + + assert types[0] == "PLUS" + assert types[1] == "X" + + motor_test_model.flight_controller.fc_parameters["FRAME_CLASS"] = 99 + assert motor_test_model.get_current_frame_class_types() == {} + + def test_user_reuses_cached_frame_options_after_first_load(self, motor_test_model) -> None: + """ + Frame option loading hits disk only once and relies on caching afterwards. + + GIVEN: The model already parsed the JSON layouts + WHEN: The user asks for the options again later + THEN: The cached result is returned even if the loader data changes + """ + first_result = motor_test_model.get_frame_options() + motor_test_model._motor_data_loader.data = {} # pylint: disable=protected-access + cached_result = motor_test_model.get_frame_options() + + assert cached_result == first_result + assert "QUAD" in cached_result + + def test_user_falls_back_to_parameter_metadata_when_layouts_missing(self, motor_test_model) -> None: + """ + Missing motor layout files still produce options via parameter metadata. + + GIVEN: The AP_Motors JSON data is unavailable + WHEN: The user opens the frame type picker + THEN: The model derives options from the filesystem doc_dict fallback + """ + motor_test_model._motor_data_loader.data = {} # pylint: disable=protected-access + motor_test_model._cached_frame_options = None # pylint: disable=protected-access + + options = motor_test_model.get_frame_options() + + assert "QUAD" in options + assert options["QUAD"][0] == "PLUS" + + def test_user_receives_error_when_requesting_invalid_motor_order(self, motor_test_model) -> None: + """ + Asking for test order of a non-existent motor returns a clear ValueError. + + GIVEN: A quad layout with four motors + WHEN: The UI queries the order for motor 99 + THEN: The model raises ValueError describing the invalid request + """ + with pytest.raises(ValueError, match="Invalid motor number"): + motor_test_model.test_order(99) + + def test_user_observes_double_letter_labels_for_large_layout(self, motor_test_model) -> None: + """ + Frame labels extend beyond Z for very large motor counts. + + GIVEN: A layout describing 27 motors + WHEN: The controller reports that layout + THEN: The model generates AA for the 27th motor label + """ + mega_layout = { + "Class": 2, + "ClassName": "MEGA", + "Type": 3, + "TypeName": "GRID", + "motors": [{"Number": idx, "TestOrder": idx, "Rotation": "CCW"} for idx in range(1, 28)], + } + motor_test_model._motor_data_loader.data["layouts"].append(mega_layout) # pylint: disable=protected-access + motor_test_model.flight_controller.get_frame_info.return_value = (2, 3) + motor_test_model.flight_controller.fc_parameters["FRAME_CLASS"] = 2 + motor_test_model.flight_controller.fc_parameters["FRAME_TYPE"] = 3 + + assert motor_test_model.refresh_from_flight_controller() is True + assert motor_test_model.motor_labels[0] == "A" + assert motor_test_model.motor_labels[26] == "AA" + assert motor_test_model.motor_numbers[26] == 27 + + def test_user_reads_motor_test_order_for_existing_motor(self, motor_test_model) -> None: + """ + Requesting the test order for a valid motor returns a zero-based index. + + GIVEN: The default QUAD X layout + WHEN: The UI asks for the order of motor 1 + THEN: The model reports zero because that motor is first in sequence + """ + assert motor_test_model.test_order(1) == 0 + + def test_user_is_warned_when_frame_types_requested_without_connection(self, motor_test_model, caplog) -> None: + """ + Connection loss prevents listing frame types. + + GIVEN: A disconnected controller + WHEN: The UI requests available types + THEN: An empty dict is returned and a warning is logged + """ + motor_test_model.flight_controller.master = None + + with caplog.at_level("WARNING"): + assert motor_test_model.get_current_frame_class_types() == {} + + assert "connection required" in caplog.text + + def test_user_is_warned_when_frame_class_parameter_missing(self, motor_test_model, caplog) -> None: + """ + Missing FRAME_CLASS metadata surfaces a warning. + + GIVEN: Connected hardware without FRAME_CLASS parameter + WHEN: The model tries to read available types + THEN: It returns an empty dict and logs the problem + """ + motor_test_model.flight_controller.fc_parameters.pop("FRAME_CLASS", None) + + with caplog.at_level("WARNING"): + assert motor_test_model.get_current_frame_class_types() == {} + + assert "FRAME_CLASS parameter not found" in caplog.text + + def test_user_cannot_parse_frame_type_when_controller_disconnected(self, motor_test_model) -> None: + """ + Parsing selections requires an active connection. + + GIVEN: The controller disconnects mid-session + WHEN: The UI tries to resolve a dropdown entry + THEN: ValidationError explains that the connection is required + """ + motor_test_model.flight_controller.master = None + + with pytest.raises(ValidationError, match="connection required"): + motor_test_model.parse_frame_type_selection("PLUS") + + def test_user_cannot_parse_frame_type_when_frame_class_missing(self, motor_test_model) -> None: + """ + FRAME_CLASS metadata must exist to parse selections. + + GIVEN: The FC parameters suddenly omit FRAME_CLASS + WHEN: The UI requests parsing + THEN: ValidationError references the missing parameter + """ + motor_test_model.flight_controller.fc_parameters.pop("FRAME_CLASS", None) + + with pytest.raises(ValidationError, match="FRAME_CLASS parameter not found"): + motor_test_model.parse_frame_type_selection("PLUS") + + def test_user_cannot_parse_frame_type_when_frame_class_non_numeric(self, motor_test_model) -> None: + """ + Garbage data in FRAME_CLASS triggers a parsing error. + + GIVEN: A FRAME_CLASS string that is not numeric + WHEN: The user parses a selection + THEN: ValidationError reports the conversion failure + """ + motor_test_model.flight_controller.fc_parameters["FRAME_CLASS"] = "abc" + + with pytest.raises(ValidationError, match="Error parsing frame type"): + motor_test_model.parse_frame_type_selection("PLUS") + + def test_user_synchronizes_frame_class_when_model_state_outdated(self, motor_test_model) -> None: + """ + The model uploads FRAME_CLASS when its cached value drifts from the controller. + + GIVEN: The cached state was stale after offline edits + WHEN: The user selects the current frame type again + THEN: FRAME_CLASS and FRAME_TYPE parameters are both updated if necessary + """ + motor_test_model._frame_class = 99 # pylint: disable=protected-access + motor_test_model.flight_controller.set_param.reset_mock() + + motor_test_model.update_frame_type_from_selection("PLUS") + + param_names = [call.args[0] for call in motor_test_model.flight_controller.set_param.call_args_list] + assert "FRAME_CLASS" in param_names + assert "FRAME_TYPE" in param_names + + def test_user_is_warned_when_combobox_key_missing_for_current_class(self, motor_test_model) -> None: + """ + Integer keys outside the available set raise ValidationError. + + GIVEN: Only PLUS and X layout keys exist + WHEN: The UI sends key 99 + THEN: The model raises ValidationError noting the missing option + """ + with pytest.raises(ValidationError, match="not available"): + motor_test_model.update_frame_type_by_key("99") + + def test_user_updates_frame_configuration_and_motor_count_recomputes(self, motor_test_model) -> None: + """ + Frame updates recompute the cached motor count and layout metadata. + + GIVEN: Stale frame class/type information + WHEN: The user requests a refresh using update_frame_configuration + THEN: The motor count matches the selected layout + """ + motor_test_model._frame_class = 2 # pylint: disable=protected-access + motor_test_model._frame_type = 99 # pylint: disable=protected-access + + motor_test_model.update_frame_configuration(1, 1) + + assert motor_test_model.motor_count == 4 + + def test_layout_entries_without_test_order_are_skipped(self, motor_test_model) -> None: + """ + Motors lacking TestOrder metadata leave their slots empty. + + GIVEN: A layout where the first motor has no TestOrder field + WHEN: The controller reports that layout + THEN: The corresponding entry in motor_numbers remains zero while valid entries populate + """ + sparse_layout = { + "Class": 4, + "ClassName": "DUAL", + "Type": 1, + "TypeName": "SKIP", + "motors": [ + {"Number": 1, "Rotation": "CCW"}, + {"Number": 2, "TestOrder": 1, "Rotation": "CW"}, + ], + } + motor_test_model._motor_data_loader.data["layouts"].append(sparse_layout) # pylint: disable=protected-access + motor_test_model.flight_controller.get_frame_info.return_value = (4, 1) + motor_test_model.flight_controller.fc_parameters["FRAME_CLASS"] = 4 + motor_test_model.flight_controller.fc_parameters["FRAME_TYPE"] = 1 + + assert motor_test_model.refresh_from_flight_controller() is True + assert motor_test_model.motor_numbers[1] == 0 + assert motor_test_model.motor_numbers[0] == 2 + + +class TestMotorTestDataModelBatteryFeedback: + """Test user-facing battery indicators and warning strings.""" + + def test_user_sees_cached_battery_status_after_initial_request(self, motor_test_model) -> None: + """ + Battery polling stops after the first successful response to avoid spamming the FC. + + GIVEN: Battery monitoring returns a "warming up" message before providing measurements + WHEN: The model polls multiple times + THEN: It should request streaming twice and reuse cached data afterwards + """ + motor_test_model.flight_controller.request_periodic_battery_status.reset_mock() + motor_test_model.flight_controller.get_battery_status.side_effect = [ + ((12.3, 2.0), "priming"), + ((12.5, 2.1), ""), + ((12.5, 2.1), ""), + ] + + motor_test_model.get_battery_status() + motor_test_model.get_battery_status() + motor_test_model.get_battery_status() + + assert motor_test_model.flight_controller.request_periodic_battery_status.call_count == 2 + + def test_user_gets_color_coded_voltage_feedback(self, motor_test_model) -> None: + """ + Voltage status is translated into colors for the battery indicator widget. + + GIVEN: Live voltage data from the controller + WHEN: The user checks the indicator + THEN: The model reports green, red, gray, or orange depending on the status + """ + assert motor_test_model.get_battery_status_color() == "green" + + motor_test_model.flight_controller.get_battery_status.return_value = ((10.0, 2.0), "") + assert motor_test_model.get_battery_status_color() == "red" + + motor_test_model.flight_controller.is_battery_monitoring_enabled.return_value = False + assert motor_test_model.get_battery_status_color() == "gray" + + motor_test_model.flight_controller.is_battery_monitoring_enabled.return_value = True + with patch.object(motor_test_model, "get_voltage_status", return_value="low"): + assert motor_test_model.get_battery_status_color() == "orange" + + def test_user_reads_battery_display_text_feedback(self, motor_test_model) -> None: + """ + The UI strings clearly communicate whether telemetry is disabled, unavailable, or healthy. + + GIVEN: Various monitoring states + WHEN: The UI asks for the formatted voltage/current strings + THEN: The model returns context-aware messages + """ + motor_test_model.flight_controller.is_battery_monitoring_enabled.return_value = False + assert motor_test_model.get_battery_display_text() == (_("Voltage: Disabled"), _("Current: Disabled")) + + motor_test_model.flight_controller.is_battery_monitoring_enabled.return_value = True + motor_test_model.flight_controller.get_battery_status.return_value = (None, "") + assert motor_test_model.get_battery_display_text() == (_("Voltage: N/A"), _("Current: N/A")) + + motor_test_model.flight_controller.get_battery_status.return_value = ((12.34, 1.98), "") + voltage_text, current_text = motor_test_model.get_battery_display_text() + + assert "12.34" in voltage_text + assert "1.98" in current_text + + def test_battery_safety_messaging_highlights_reason(self, motor_test_model) -> None: + """ + Safety dialogs provide actionable text and remember acknowledgement state. + + GIVEN: A user reviewing the first-test warning and a battery error reason + WHEN: They acknowledge the warning and inspect the helper text + THEN: The warning state sticks and the reason string is interpolated into the copy + """ + assert motor_test_model.should_show_first_test_warning() is True + warning_message = motor_test_model.get_safety_warning_message() + assert "IMPORTANT SAFETY WARNING" in warning_message + + battery_message = motor_test_model.get_battery_safety_message("Voltage too low") + assert "Voltage too low" in battery_message + + assert motor_test_model.is_battery_related_safety_issue("Voltage too low") is True + assert motor_test_model.is_battery_related_safety_issue("Motor stalled") is False + + motor_test_model.acknowledge_first_test_warning() + assert motor_test_model.should_show_first_test_warning() is False + + +class TestMotorTestDataModelParameterGuards: + """Test parameter guard rails and reboot requirements.""" + + def test_user_triggers_reboot_for_parameters_marked_as_reboot_required(self, motor_test_model) -> None: + """ + Parameters that require a reboot schedule one automatically after a successful upload. + + GIVEN: MOT_SPIN_ARM metadata stating that a reboot is required + WHEN: The user sets a new value within the allowed range + THEN: The model calls reset_and_reconnect so the FC applies the change + """ + motor_test_model.flight_controller.reset_and_reconnect = MagicMock() + + motor_test_model.set_parameter("MOT_SPIN_ARM", 0.08) + + motor_test_model.flight_controller.reset_and_reconnect.assert_called_once() + + def test_user_cannot_set_parameter_outside_documented_bounds(self, motor_test_model) -> None: + """ + Documentation-derived bounds guard against impossible parameter values. + + GIVEN: Metadata describing the valid MOT_SPIN_ARM range + WHEN: The user enters values outside that range + THEN: ValidationError explains why the update is rejected + """ + with pytest.raises(ValidationError, match="smaller than"): + motor_test_model.set_parameter("MOT_SPIN_ARM", 0.01) + + with pytest.raises(ValidationError, match="greater than"): + motor_test_model.set_parameter("MOT_SPIN_ARM", 0.9) + + def test_user_cannot_raise_spin_arm_above_spin_min_margin(self, motor_test_model) -> None: + """ + MOT_SPIN_ARM must stay at least 0.02 below MOT_SPIN_MIN. + + GIVEN: A configured spin minimum of 0.12 + WHEN: The user tries to set spin arm to 0.11 + THEN: ValidationError reminds them about the 0.02 guard band + """ + motor_test_model.flight_controller.fc_parameters["MOT_SPIN_MIN"] = 0.12 + + with pytest.raises(ValidationError, match=r"0\.02 below"): + motor_test_model.set_motor_spin_arm_value(0.11) + + def test_user_cannot_lower_spin_min_without_spin_arm_value(self, motor_test_model) -> None: + """ + MOT_SPIN_MIN updates require a known MOT_SPIN_ARM baseline. + + GIVEN: Missing MOT_SPIN_ARM telemetry + WHEN: The user attempts to set MOT_SPIN_MIN + THEN: ParameterError explains why the update is unsafe + """ + motor_test_model.flight_controller.fc_parameters.pop("MOT_SPIN_ARM", None) + + with pytest.raises(ParameterError, match="must be available"): + motor_test_model.set_motor_spin_min_value(0.20) + + def test_user_cannot_set_spin_min_too_close_to_spin_arm(self, motor_test_model) -> None: + """ + The 0.02 guard applies to MOT_SPIN_MIN as well. + + GIVEN: MOT_SPIN_ARM at 0.15 + WHEN: The user sets MOT_SPIN_MIN to 0.16 + THEN: ValidationError warns about the insufficient gap + """ + motor_test_model.flight_controller.fc_parameters["MOT_SPIN_ARM"] = 0.15 + + with pytest.raises(ValidationError, match=r"0\.02 higher"): + motor_test_model.set_motor_spin_min_value(0.16) + + def test_user_sets_spin_arm_value_within_safe_margin(self, motor_test_model) -> None: + """Valid spin arm updates delegate to the generic parameter setter.""" + motor_test_model.flight_controller.set_param.reset_mock() + + motor_test_model.set_motor_spin_arm_value(0.08) + + motor_test_model.flight_controller.set_param.assert_called_with("MOT_SPIN_ARM", 0.08) + + def test_user_sets_spin_min_value_within_safe_margin(self, motor_test_model) -> None: + """Valid spin minimum updates also leverage the shared setter path.""" + motor_test_model.flight_controller.set_param.reset_mock() + + motor_test_model.set_motor_spin_min_value(0.15) + + motor_test_model.flight_controller.set_param.assert_called_with("MOT_SPIN_MIN", 0.15) + + +class TestMotorTestDataModelMotorExecutionWorkflows: + """Exercise single, all, sequential, and emergency motor test workflows.""" + + def test_user_cannot_request_mismatched_motor_output(self, motor_test_model) -> None: + """ + The UI validates that the selected motor output matches the expected test order. + + GIVEN: A quad where motor 1 is first in the sequence + WHEN: The user selects sequence 0 but output 2 + THEN: ValidationError explains the mismatch + """ + with ( + patch.object(motor_test_model, "is_motor_test_safe", return_value=None), + pytest.raises(ValidationError, match="expected: 1"), + ): + motor_test_model.test_motor(0, 2, 10, 2) + + def test_user_cannot_request_duration_shorter_or_longer_than_limits(self, motor_test_model) -> None: + """ + Test duration bounds prevent dangerous commands. + + GIVEN: The duration range of 1-60 seconds + WHEN: The user enters 0 seconds or 120 seconds + THEN: ValidationError is raised in both cases + """ + with patch.object(motor_test_model, "is_motor_test_safe", return_value=None): + with pytest.raises(ValidationError, match=r"valid range:\s*1-60"): + motor_test_model.test_motor(0, 1, 10, 0) + with pytest.raises(ValidationError, match=r"valid range:\s*1-60"): + motor_test_model.test_motor(0, 1, 10, 120) + + def test_motor_command_failure_surfaces_execution_error(self, motor_test_model) -> None: + """ + Backend failures bubble up as MotorTestExecutionError for the UI to display. + + GIVEN: The FC rejects a motor command + WHEN: The user starts a motor test + THEN: MotorTestExecutionError carries the backend error string + """ + motor_test_model.flight_controller.test_motor.return_value = (False, "PWM timeout") + + with ( + patch.object(motor_test_model, "is_motor_test_safe", return_value=None), + pytest.raises(MotorTestExecutionError, match="PWM timeout"), + ): + motor_test_model.test_motor(0, 1, 10, 2) + + def test_status_callbacks_receive_events_for_each_workflow(self, motor_test_model) -> None: + """ + Status callbacks fire for single, all, sequential, and emergency stop workflows. + + GIVEN: A listener interested in motor status events + WHEN: The user runs the full workflow suite + THEN: The callback receives COMMAND_SENT/STOP_SENT events for every motor + """ + events: list[tuple[int, MotorStatusEvent]] = [] + + def _recorder(motor_number: int, event: MotorStatusEvent) -> None: + events.append((motor_number, event)) + + motor_test_model.run_single_motor_test(0, 1, _recorder) + motor_test_model.run_all_motors_test(_recorder) + motor_test_model.run_sequential_motor_test(_recorder) + motor_test_model.emergency_stop_motors(_recorder) + + assert any(event == MotorStatusEvent.COMMAND_SENT for _motor, event in events) + assert any(event == MotorStatusEvent.STOP_SENT for _motor, event in events) + assert len(events) >= motor_test_model.motor_count * 3 + + def test_stop_all_motors_failure_surfaces_error(self, motor_test_model) -> None: + """ + Emergency stop failures propagate as MotorTestExecutionError. + + GIVEN: A backend failure when stopping motors + WHEN: The user presses the stop button + THEN: MotorTestExecutionError exposes the backend reason + """ + motor_test_model.flight_controller.stop_all_motors.return_value = (False, "Stop failed") + + with pytest.raises(MotorTestExecutionError, match="Stop failed"): + motor_test_model.stop_all_motors() + + def test_all_motor_test_failure_surfaces_error(self, motor_test_model) -> None: + """ + Failed all-motor tests report the FC error code. + + GIVEN: test_all_motors returning False + WHEN: The user runs the all-motors workflow + THEN: MotorTestExecutionError is raised + """ + motor_test_model.flight_controller.test_all_motors.return_value = (False, "All motors failed") + + with pytest.raises(MotorTestExecutionError, match="All motors failed"): + motor_test_model.test_all_motors(20, 5) + + def test_sequential_motor_test_failure_surfaces_error(self, motor_test_model) -> None: + """ + Failed sequential tests also raise MotorTestExecutionError. + + GIVEN: test_motors_in_sequence returning False + WHEN: The user runs the sequential workflow + THEN: MotorTestExecutionError is raised + """ + motor_test_model.flight_controller.test_motors_in_sequence.return_value = (False, "Sequence failed") + + with pytest.raises(MotorTestExecutionError, match="Sequence failed"): + motor_test_model.test_motors_in_sequence(20, 5) + + def test_status_callbacks_are_optional_for_single_motor_runs(self, motor_test_model) -> None: + """ + Workflows run normally when no listener is registered. + + GIVEN: No status callback provided + WHEN: The user runs a single motor test + THEN: The motor command executes without emitting events + """ + motor_test_model.flight_controller.test_motor.reset_mock() + + motor_test_model.run_single_motor_test(0, 1) + + motor_test_model.flight_controller.test_motor.assert_called_once() diff --git a/tests/test_frontend_tkinter_motor_test.py b/tests/test_frontend_tkinter_motor_test.py new file mode 100755 index 000000000..3e1a01c24 --- /dev/null +++ b/tests/test_frontend_tkinter_motor_test.py @@ -0,0 +1,907 @@ +#!/usr/bin/env python3 + +""" +Behavior-driven tests for the motor test Tkinter frontend. + +This file is part of ArduPilot Methodic Configurator. https://github.com/ArduPilot/MethodicConfigurator + +SPDX-FileCopyrightText: 2024-2025 Amilcar do Carmo Lucas + +SPDX-License-Identifier: GPL-3.0-or-later +""" + +from __future__ import annotations + +from importlib import import_module +from tkinter import ttk +from types import SimpleNamespace +from typing import TYPE_CHECKING, Any, Callable, cast # pylint: disable=unused-import +from unittest.mock import MagicMock, patch + +import pytest + +import ardupilot_methodic_configurator.frontend_tkinter_motor_test as motor_test_module +from ardupilot_methodic_configurator.data_model_motor_test import ( # pylint: disable=unused-import + MotorTestDataModel, + MotorTestExecutionError, + MotorTestSafetyError, + ParameterError, + ValidationError, +) +from ardupilot_methodic_configurator.frontend_tkinter_motor_test import ( + DelayedProgressCallback, + MotorStatusEvent, + MotorTestView, + MotorTestWindow, + _create_motor_test_view, + argument_parser, + main, + register_motor_test_plugin, +) + +if TYPE_CHECKING: + from collections.abc import Generator + + +# pylint: disable=redefined-outer-name, too-few-public-methods, protected-access, raising-bad-type +class FakeMotorTestModel: # pylint: disable=too-many-instance-attributes, too-many-public-methods + """Minimal stand-in for MotorTestDataModel with controllable behavior.""" + + def __init__(self) -> None: + self.motor_count = 2 + self.motor_labels = ["A", "B"] + self.motor_numbers = [1, 2] + self.motor_directions = ["CW", "CCW"] + self._test_throttle_pct = 25 + self._test_duration_s = 3.0 + self.frame_pairs = [("1", "1: Quad"), ("2", "2: Hex")] + self.frame_types = {1: "Quad", 2: "Hex"} + self.diagram_available = False + self.diagram_path = "diagram.png" + self.diagram_error = "" + self.battery_text = ("Voltage: 12.5V", "Current: 3.2A") + self.battery_color = "green" + self.parameters = {"MOT_SPIN_ARM": 0.1, "MOT_SPIN_MIN": 0.2} + self._should_warn = True + self.refresh_returns = True + self.raise_frame_error: Exception | None = None + self.raise_throttle_error: Exception | None = None + self.raise_duration_error: Exception | None = None + self.raise_spin_arm_error: Exception | None = None + self.raise_spin_min_error: Exception | None = None + self.next_single_motor_error: Exception | None = None + self.next_all_motor_error: Exception | None = None + self.next_sequence_error: Exception | None = None + self.next_emergency_error: Exception | None = None + self.raise_stop_error: Exception | None = None + self.last_frame_type_key: str | None = None + self.single_motor_calls: list[tuple[int, int]] = [] + self.all_motor_runs = 0 + self.sequence_runs = 0 + self.emergency_runs = 0 + self.stop_calls = 0 + self.battery_issue = True + + def refresh_from_flight_controller(self) -> bool: + return self.refresh_returns + + def get_frame_type_pairs(self) -> list[tuple[str, str]]: + return self.frame_pairs + + def get_current_frame_selection_key(self) -> str: + return "1" + + def get_current_frame_class_types(self) -> dict[int, str]: + return self.frame_types + + def motor_diagram_exists(self) -> bool: + return self.diagram_available + + def get_motor_diagram_path(self) -> tuple[str, str]: + return self.diagram_path, self.diagram_error + + def get_battery_display_text(self) -> tuple[str, str]: + return self.battery_text + + def get_battery_status_color(self) -> str: + return self.battery_color + + def update_frame_type_by_key(self, key: str, *_callbacks, **_kwargs) -> None: + if self.raise_frame_error: + error = self.raise_frame_error + self.raise_frame_error = None + raise error + self.last_frame_type_key = key + + def get_test_throttle_pct(self) -> int: + return int(self._test_throttle_pct) + + def set_test_throttle_pct(self, throttle_pct: int) -> None: + if self.raise_throttle_error: + error = self.raise_throttle_error + self.raise_throttle_error = None + raise error + self._test_throttle_pct = throttle_pct + + def get_test_duration_s(self) -> float: + return float(self._test_duration_s) + + def set_test_duration_s(self, duration: float) -> None: + if self.raise_duration_error: + error = self.raise_duration_error + self.raise_duration_error = None + raise error + self._test_duration_s = duration + + def get_parameter(self, name: str) -> float | None: + return self.parameters.get(name) + + def set_motor_spin_arm_value(self, value: float, *_args) -> None: + if self.raise_spin_arm_error: + error = self.raise_spin_arm_error + self.raise_spin_arm_error = None + raise error + self.parameters["MOT_SPIN_ARM"] = value + + def set_motor_spin_min_value(self, value: float) -> None: + if self.raise_spin_min_error: + error = self.raise_spin_min_error + self.raise_spin_min_error = None + raise error + self.parameters["MOT_SPIN_MIN"] = value + + def should_show_first_test_warning(self) -> bool: + return self._should_warn + + def acknowledge_first_test_warning(self) -> None: + self._should_warn = False + + def get_safety_warning_message(self) -> str: + return "Proceed with caution" + + def is_battery_related_safety_issue(self, reason: str) -> bool: + return self.battery_issue and "voltage" in reason.lower() + + def run_single_motor_test( + self, + test_sequence_nr: int, + motor_output_nr: int, + status_callback: Callable[[int, MotorStatusEvent], None] | None = None, + ) -> None: + if self.next_single_motor_error: + error = self.next_single_motor_error + self.next_single_motor_error = None + raise error + self.single_motor_calls.append((test_sequence_nr, motor_output_nr)) + if status_callback: + status_callback(motor_output_nr, MotorStatusEvent.COMMAND_SENT) + + def run_all_motors_test( + self, + status_callback: Callable[[int, MotorStatusEvent], None] | None = None, + ) -> None: + if self.next_all_motor_error: + error = self.next_all_motor_error + self.next_all_motor_error = None + raise error + self.all_motor_runs += 1 + if status_callback: + for motor_number in range(1, self.motor_count + 1): + status_callback(motor_number, MotorStatusEvent.COMMAND_SENT) + + def run_sequential_motor_test( + self, + status_callback: Callable[[int, MotorStatusEvent], None] | None = None, + ) -> None: + if self.next_sequence_error: + error = self.next_sequence_error + self.next_sequence_error = None + raise error + self.sequence_runs += 1 + if status_callback: + for motor_number in range(1, self.motor_count + 1): + status_callback(motor_number, MotorStatusEvent.COMMAND_SENT) + + def emergency_stop_motors( + self, + status_callback: Callable[[int, MotorStatusEvent], None] | None = None, + ) -> None: + if self.next_emergency_error: + error = self.next_emergency_error + self.next_emergency_error = None + raise error + self.emergency_runs += 1 + if status_callback: + for motor_number in range(1, self.motor_count + 1): + status_callback(motor_number, MotorStatusEvent.STOP_SENT) + + def stop_all_motors(self) -> None: + if self.raise_stop_error: + error = self.raise_stop_error + self.raise_stop_error = None + raise error + self.stop_calls += 1 + + def get_battery_safety_message(self, reason: str) -> str: + return f"Unsafe battery: {reason}" + + def get_current_frame_selection_text(self) -> str: + return "Quad" + + def test_order(self, motor_number: int) -> int: + return motor_number - 1 + + +class DummyBaseWindow: + """Lightweight base window replacement for tests.""" + + def __init__(self, root) -> None: + self.root = root + + def put_image_in_label(self, parent, filepath: str, image_height: int, fallback_text: str) -> ttk.Label: + label = ttk.Label(parent, text=fallback_text) + cast("Any", label).image = f"image::{filepath}::{image_height}" + return label + + @staticmethod + def calculate_scaled_image_size(value: int) -> int: + return value + + +@pytest.fixture +def fake_model() -> FakeMotorTestModel: + return FakeMotorTestModel() + + +@pytest.fixture +def dummy_base_window(tk_root) -> DummyBaseWindow: + return DummyBaseWindow(tk_root) + + +@pytest.fixture(autouse=True) +def dialog_spies(mocker) -> SimpleNamespace: + """Prevent blocking GUI dialogs during tests and expose spies.""" + return SimpleNamespace( + showerror=mocker.patch.object(motor_test_module, "showerror"), + showwarning=mocker.patch.object(motor_test_module, "showwarning"), + askyesno=mocker.patch.object(motor_test_module, "askyesno", return_value=True), + askfloat=mocker.patch.object(motor_test_module, "askfloat", return_value=None), + ) + + +@pytest.fixture +def motor_view( + tk_root, + fake_model, + dummy_base_window, + mocker, +) -> Generator[MotorTestView, None, None]: + parent = ttk.Frame(tk_root) + view = MotorTestView(parent, fake_model, dummy_base_window) + mocker.patch.object(parent, "after") + mocker.patch.object(tk_root, "after") + yield view + parent.destroy() + + +class TestDelayedProgressCallback: + """Exercises the delayed progress helper used by long-running operations.""" + + def test_callback_triggers_after_delay(self) -> None: + """ + Delayed callbacks notify observers when time threshold is met. + + GIVEN: A wrapped callback with zero-second delay + WHEN: The callback is invoked + THEN: The original callback receives the progress values immediately + """ + recorded: list[tuple[int, int]] = [] + + def recorder(current: int, total: int) -> None: + recorded.append((current, total)) + + delayed = DelayedProgressCallback(recorder, delay_seconds=0.0) + delayed(1, 5) + + assert recorded == [(1, 5)] + + +class TestMotorTestView: + """Covers the Tkinter view widget behavior for the motor test UI.""" + + def test_initialization_populates_widgets(self, motor_view: MotorTestView, fake_model: FakeMotorTestModel) -> None: + """ + The view builds the UI using the model-provided configuration. + + GIVEN: A fake data model with two motors + WHEN: MotorTestView is instantiated + THEN: Motor buttons and controls reflect the model state + """ + assert len(motor_view.motor_buttons) == fake_model.motor_count + assert motor_view.throttle_spinbox.get() == str(fake_model.get_test_throttle_pct()) + assert motor_view.duration_spinbox.get() == str(fake_model.get_test_duration_s()) + + def test_user_adjusts_throttle_and_duration_values( + self, + motor_view: MotorTestView, + fake_model: FakeMotorTestModel, + dialog_spies: SimpleNamespace, + mocker, + ) -> None: + """ + User edits throttle and duration inputs while recovering from invalid entries. + + GIVEN: The operator tunes the timing and throttle for a test + WHEN: They enter valid numbers, typos, and values rejected by the model + THEN: The view stores valid data, reports errors, and syncs with backend changes + """ + motor_view.throttle_spinbox.delete(0, "end") + motor_view.throttle_spinbox.insert(0, "45") + motor_view._on_throttle_change() + assert fake_model.get_test_throttle_pct() == 45 + + dialog_spies.showerror.reset_mock() + motor_view.throttle_spinbox.delete(0, "end") + motor_view.throttle_spinbox.insert(0, "oops") + motor_view._on_throttle_change() + dialog_spies.showerror.assert_called_once() + + dialog_spies.showerror.reset_mock() + fake_model.raise_throttle_error = ValidationError("bad throttle") + motor_view.throttle_spinbox.delete(0, "end") + motor_view.throttle_spinbox.insert(0, "55") + motor_view._on_throttle_change() + dialog_spies.showerror.assert_called_once() + + dialog_spies.showerror.reset_mock() + motor_view.duration_spinbox.delete(0, "end") + motor_view.duration_spinbox.insert(0, "6") + motor_view._on_duration_change() + assert fake_model.get_test_duration_s() == 6 + + motor_view.duration_spinbox.delete(0, "end") + motor_view.duration_spinbox.insert(0, "oops") + motor_view._on_duration_change() + dialog_spies.showerror.assert_called_once() + + dialog_spies.showerror.reset_mock() + fake_model.raise_duration_error = ValidationError("bad duration") + motor_view.duration_spinbox.delete(0, "end") + motor_view.duration_spinbox.insert(0, "7") + motor_view._on_duration_change() + dialog_spies.showerror.assert_called_once() + + mocker.patch.object(motor_view.throttle_spinbox, "focus_get", return_value=None) + mocker.patch.object(motor_view.duration_spinbox, "focus_get", return_value=None) + fake_model._test_throttle_pct = 60 + fake_model._test_duration_s = 8 + motor_view._update_spinbox_values() + assert motor_view.throttle_spinbox.get() == "60" + assert float(motor_view.duration_spinbox.get()) == 8 + + with patch.object(fake_model, "get_test_duration_s", side_effect=KeyError("duration")): + motor_view._update_spinbox_values() + + def test_user_sets_spin_parameters_with_validation( + self, + motor_view: MotorTestView, + fake_model: FakeMotorTestModel, + dialog_spies: SimpleNamespace, + mocker, + ) -> None: + """ + User edits spin-arm and spin-min values, seeing confirmation and validation feedback. + + GIVEN: Prompted dialogs for spin parameters + WHEN: The operator cancels, accepts, and triggers validation failures + THEN: Values persist for valid input and dialogs summarize errors + """ + dialog_spies.askfloat.return_value = None + motor_view._set_motor_spin_arm() + + progress_stub = SimpleNamespace(update_progress_bar=lambda *_a: None, destroy=lambda: None) + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.ProgressWindow", + return_value=progress_stub, + ) + + dialog_spies.askfloat.return_value = 0.2 + motor_view._set_motor_spin_arm() + assert fake_model.parameters["MOT_SPIN_ARM"] == 0.2 + + dialog_spies.showerror.reset_mock() + dialog_spies.askfloat.return_value = 0.25 + fake_model.raise_spin_arm_error = ParameterError("spin arm invalid") + motor_view._set_motor_spin_arm() + dialog_spies.showerror.assert_called_once() + + dialog_spies.askfloat.return_value = 0.35 + motor_view._set_motor_spin_min() + assert fake_model.parameters["MOT_SPIN_MIN"] == 0.35 + + dialog_spies.showerror.reset_mock() + dialog_spies.askfloat.return_value = 0.4 + fake_model.raise_spin_min_error = ParameterError("spin min invalid") + motor_view._set_motor_spin_min() + dialog_spies.showerror.assert_called_once() + + def test_on_frame_type_change_success_and_error( + self, motor_view: MotorTestView, fake_model: FakeMotorTestModel, mocker + ) -> None: + """ + Frame type changes upload parameters or surface validation errors. + + GIVEN: A selected frame type key + WHEN: The change succeeds and later raises a validation error + THEN: The model receives the key and UI displays an error when needed + """ + motor_view.frame_type_combobox.current(0) + progress_stub = SimpleNamespace(update_progress_bar=lambda *_a: None, destroy=lambda: None) + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.ProgressWindow", + side_effect=[progress_stub, progress_stub, progress_stub, progress_stub], + ) + motor_view._on_frame_type_change(object()) + assert fake_model.last_frame_type_key == "1" + + fake_model.raise_frame_error = ValidationError("invalid selection") + motor_view._on_frame_type_change(object()) + + def test_throttle_change_paths( + self, motor_view: MotorTestView, fake_model: FakeMotorTestModel, dialog_spies: SimpleNamespace + ) -> None: + """ + Throttle changes accept valid input and handle invalid entries. + + GIVEN: Valid, non-numeric, and model-invalid throttle submissions + WHEN: The user edits the throttle spinbox + THEN: The model updates for valid data and dialogs show for errors + """ + motor_view.throttle_spinbox.delete(0, "end") + motor_view.throttle_spinbox.insert(0, "40") + motor_view._on_throttle_change() + assert fake_model.get_test_throttle_pct() == 40 + + motor_view.throttle_spinbox.delete(0, "end") + motor_view.throttle_spinbox.insert(0, "abc") + motor_view._on_throttle_change() + assert dialog_spies.showerror.called + + fake_model.raise_throttle_error = ValidationError("bad") + motor_view.throttle_spinbox.delete(0, "end") + motor_view.throttle_spinbox.insert(0, "50") + motor_view._on_throttle_change() + + def test_duration_change_paths( + self, motor_view: MotorTestView, fake_model: FakeMotorTestModel, dialog_spies: SimpleNamespace + ) -> None: + """ + Duration changes follow the same validation flow as throttle changes. + + GIVEN: Valid, non-numeric, and model-invalid durations + WHEN: The user edits the duration spinbox + THEN: The model updates or displays appropriate error dialogs + """ + motor_view.duration_spinbox.delete(0, "end") + motor_view.duration_spinbox.insert(0, "5") + motor_view._on_duration_change() + assert fake_model.get_test_duration_s() == 5 + + motor_view.duration_spinbox.delete(0, "end") + motor_view.duration_spinbox.insert(0, "oops") + motor_view._on_duration_change() + assert dialog_spies.showerror.called + + fake_model.raise_duration_error = ValidationError("bad") + motor_view.duration_spinbox.delete(0, "end") + motor_view.duration_spinbox.insert(0, "7") + motor_view._on_duration_change() + + def test_user_refreshes_layout_and_diagram_feedback( + self, motor_view: MotorTestView, fake_model: FakeMotorTestModel, mocker + ) -> None: + """ + User refreshes the view and sees layout plus diagram feedback update together. + + GIVEN: Changing motor counts and diagram availability + WHEN: The operator refreshes the panel + THEN: The motor grid rebuilds and diagram label reflects the current state + """ + # Prevent the periodic scheduler from re-queuing follow-up updates. + mocker.patch.object(motor_view.parent, "after") + mocker.patch.object(motor_view.throttle_spinbox, "focus_get", return_value=None) + mocker.patch.object(motor_view.duration_spinbox, "focus_get", return_value=None) + + # Layout rebuild when the motor count changes. + fake_model.motor_count = 1 + fake_model.diagram_available = True + fake_model.diagram_path = "diagram.png" + diagram_loader = mocker.patch.object( + motor_view, + "_load_png_diagram", + side_effect=lambda _path: motor_view.diagram_label.configure(text=""), + ) + + motor_view._diagram_needs_update = True + motor_view._update_view() + assert len(motor_view.motor_buttons) == 1 + diagram_loader.assert_called_once_with("diagram.png") + + # Non-PNG paths fall back to descriptive text. + fake_model.diagram_path = "diagram.svg" + fake_model.diagram_error = "Unsupported diagram format" + motor_view._diagrams_path = "" + motor_view._diagram_needs_update = True + motor_view._update_view() + assert "diagram" in motor_view.diagram_label.cget("text").lower() + assert diagram_loader.call_count == 1 + + # Unavailable diagrams show the standard notice. + fake_model.diagram_available = False + motor_view._diagram_needs_update = True + motor_view._update_view() + assert "not available" in motor_view.diagram_label.cget("text").lower() + + @pytest.mark.parametrize( + ("trigger_exception", "expected_call"), + [ + (MotorTestSafetyError("voltage low"), "showwarning"), + (ValidationError("bad param"), "showerror"), + (MotorTestExecutionError("fail"), "showerror"), + (RuntimeError("unexpected"), "showerror"), + ], + ) + def test_test_motor_exceptions( # pylint: disable=too-many-arguments, too-many-positional-arguments + self, + motor_view: MotorTestView, + fake_model: FakeMotorTestModel, + dialog_spies: SimpleNamespace, + trigger_exception: Exception, + expected_call: str, + ) -> None: + """ + Single-motor tests surface detailed error dialogs for each failure mode. + + GIVEN: A prepared view with acknowledged safety prompt + WHEN: The data model raises different exceptions + THEN: The view highlights the failure and shows the matching dialog + """ + fake_model._should_warn = False + fake_model.next_single_motor_error = trigger_exception + motor_view._test_motor(0, 1) + assert getattr(dialog_spies, expected_call).called + + def test_test_motor_success_flow(self, motor_view: MotorTestView, fake_model: FakeMotorTestModel) -> None: + """ + Successful motor tests update the status labels and reset readiness. + + GIVEN: The first-test confirmation already acknowledged + WHEN: A motor test runs without exceptions + THEN: The status label reflects the sent command + """ + fake_model._should_warn = False + motor_view._test_motor(0, 1) + assert fake_model.single_motor_calls == [(0, 1)] + + @pytest.mark.parametrize( + "attr", + ["next_all_motor_error", "next_sequence_error"], + ) + def test_batch_motor_tests_handle_exceptions( + self, + motor_view: MotorTestView, + fake_model: FakeMotorTestModel, + dialog_spies: SimpleNamespace, + attr: str, + ) -> None: + """ + Batch test buttons surface validation, safety, execution, and generic errors. + + GIVEN: A batch test invocation + WHEN: The underlying model raises exceptions + THEN: The user sees the relevant warning dialog + """ + fake_model._should_warn = False + setattr(fake_model, attr, MotorTestSafetyError("issue")) + if attr == "next_all_motor_error": + motor_view._test_all_motors() + else: + motor_view._test_motors_in_sequence() + assert dialog_spies.showwarning.called + + def test_stop_all_motors_and_emergency_alias( + self, + motor_view: MotorTestView, + fake_model: FakeMotorTestModel, + dialog_spies: SimpleNamespace, + ) -> None: + """ + Emergency stop commands propagate success and error states. + + GIVEN: A stop command invocation + WHEN: The model succeeds, raises MotorTestExecutionError, and raises RuntimeError + THEN: The dialogs reflect the outcome and the alias covers the same path + """ + motor_view._stop_all_motors() + fake_model.next_emergency_error = MotorTestExecutionError("bad") + motor_view._stop_all_motors() + assert dialog_spies.showerror.called + + fake_model.next_emergency_error = RuntimeError("boom") + motor_view._stop_all_motors() + motor_view._emergency_stop() + + def test_handle_status_events_and_ready_reset( + self, + motor_view: MotorTestView, + fake_model: FakeMotorTestModel, # pylint: disable=unused-argument + mocker, + ) -> None: + """ + Status events immediately update labels and schedule ready reset. + + GIVEN: A motor label displaying its initial status + WHEN: Command and stop events arrive + THEN: The UI colors and texts change accordingly + """ + after_spy = mocker.patch.object(motor_view.root_window, "after") + motor_view._handle_status_event(1, MotorStatusEvent.COMMAND_SENT) + motor_view._handle_status_event(1, MotorStatusEvent.STOP_SENT) + after_spy.assert_called() + + def test_update_and_reset_motor_status(self, motor_view: MotorTestView) -> None: + """ + Status updates respect motor bounds and reset helper restores readiness. + + GIVEN: Multiple motor labels + WHEN: Updates reference valid and invalid motor numbers + THEN: Only valid labels change and reset returns them to Ready + """ + motor_view._update_motor_status(1, "Running", "red") + motor_view._update_motor_status(5, "Ignore", "red") + motor_view._reset_all_motor_status() + texts = [label.cget("text") for label in motor_view.motor_status_labels] + assert all(text == "Ready" for text in texts) + + def test_keyboard_shortcuts_bind_actions(self, motor_view: MotorTestView, mocker) -> None: + """ + Keyboard shortcuts map to stop and test helpers. + + GIVEN: The root window used by the view + WHEN: Shortcuts are configured + THEN: The expected bindings and focus calls occur + """ + bind_spy = mocker.patch.object(motor_view.root_window, "bind") + focus_spy = mocker.patch.object(motor_view.root_window, "focus_set") + motor_view._setup_keyboard_shortcuts() + assert bind_spy.call_count == 4 + focus_spy.assert_called_once() + + def test_on_activate_and_on_deactivate_paths( + self, + motor_view: MotorTestView, + fake_model: FakeMotorTestModel, + mocker, + ) -> None: + """ + Activation refreshes state while deactivation handles stop failures. + + GIVEN: Refresh successes and failures during activation/deactivation + WHEN: The corresponding lifecycle hooks run + THEN: The view updates, logs warnings, and re-raises unexpected exceptions + """ + update_view_spy = mocker.patch.object(motor_view, "_update_view") + fake_model.refresh_returns = False + motor_view.on_activate() + assert update_view_spy.called + + fake_model.raise_stop_error = MotorTestExecutionError("stop") + motor_view.on_deactivate() + + fake_model.raise_stop_error = ParameterError("stop") + motor_view.on_deactivate() + + fake_model.raise_stop_error = RuntimeError("boom") + with pytest.raises(RuntimeError): + motor_view.on_deactivate() + + def test_first_test_confirmation_flow( + self, + motor_view: MotorTestView, + fake_model: FakeMotorTestModel, + dialog_spies: SimpleNamespace, + ) -> None: + """ + Confirmation prompts gate the first motor command. + + GIVEN: A pending confirmation dialog + WHEN: The user cancels and later accepts + THEN: The helper returns False then True and acknowledgement sticks + """ + dialog_spies.askyesno.return_value = False + assert motor_view._ensure_first_test_confirmation() is False + + dialog_spies.askyesno.return_value = True + assert motor_view._ensure_first_test_confirmation() is True + assert fake_model._should_warn is False + + +class TestMotorTestWindow: + """Validates the standalone window wrapper around the motor test view.""" + + def test_window_initialization_and_close_flow(self, fake_model: FakeMotorTestModel, mocker) -> None: + """ + The top-level window wires the view and propagates close events. + + GIVEN: Patched BaseWindow and view constructors + WHEN: MotorTestWindow initializes and closes with varying stop behaviors + THEN: Stop commands are attempted and the root is destroyed + """ + root_mock = MagicMock() + root_mock.after = MagicMock() + root_mock.bind = MagicMock() + + def fake_base_init(self) -> None: + self.root = root_mock + self.main_frame = MagicMock() + self.put_image_in_label = MagicMock() + self.calculate_scaled_image_size = lambda value: value + + mocker.patch("ardupilot_methodic_configurator.frontend_tkinter_motor_test.BaseWindow.__init__", fake_base_init) + view_mock = MagicMock() + view_mock.model = fake_model + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.MotorTestView", + return_value=view_mock, + ) + + window = MotorTestWindow(cast("MotorTestDataModel", fake_model)) + fake_model.raise_stop_error = MotorTestExecutionError("expected") + window.on_close() + + fake_model.raise_stop_error = RuntimeError("unexpected") + window.on_close() + + def test_motor_test_window_on_close_logical_paths( + self, + fake_model: FakeMotorTestModel, + mocker, + ) -> None: + """ + Unexpected close exceptions are logged but never re-raised. + + GIVEN: A window whose view stop call raises an arbitrary exception + WHEN: on_close executes + THEN: The root destroy call still executes + """ + root_mock = MagicMock() + + def fake_base_init(self) -> None: + self.root = root_mock + self.main_frame = MagicMock() + self.put_image_in_label = MagicMock() + self.calculate_scaled_image_size = lambda value: value + + mocker.patch("ardupilot_methodic_configurator.frontend_tkinter_motor_test.BaseWindow.__init__", fake_base_init) + view_mock = MagicMock() + view_mock.model = fake_model + mocker.patch.object(fake_model, "stop_all_motors", side_effect=RuntimeError("boom")) + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.MotorTestView", + return_value=view_mock, + ) + + window = MotorTestWindow(cast("MotorTestDataModel", fake_model)) + window.on_close() + + +class TestCommandLineWorkflow: + """Verifies CLI helpers, entry points, and plugin registration hooks.""" + + def test_argument_parser_wires_common_arguments(self, mocker) -> None: + """ + Argument parser delegates to backend helpers and parses args. + + GIVEN: Patched backend adders returning the provided parser + WHEN: argument_parser is invoked + THEN: Each backend helper receives the parser instance + """ + parser_mock = MagicMock() + fc_adder = mocker.patch( + "ardupilot_methodic_configurator.backend_flightcontroller.FlightController.add_argparse_arguments", + side_effect=lambda parser: parser, + ) + fs_adder = mocker.patch( + "ardupilot_methodic_configurator.backend_filesystem.LocalFilesystem.add_argparse_arguments", + side_effect=lambda parser: parser, + ) + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.add_common_arguments", + return_value=parser_mock, + ) + parser_mock.parse_args.return_value = SimpleNamespace() + + argument_parser() + assert fc_adder.called + assert fs_adder.called + fc_parser = fc_adder.call_args.args[0] + fs_parser = fs_adder.call_args.args[0] + assert fc_parser is fs_parser + parser_mock.parse_args.assert_called_once_with() + + def test_main_success_and_failure_paths(self, mocker) -> None: + """ + The standalone entry point starts the window and reports failures. + + GIVEN: Patched dependencies for ApplicationState, window, and dialogs + WHEN: main succeeds once and fails the next time + THEN: The window mainloop executes and failures trigger showerror + """ + args = SimpleNamespace() + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.argument_parser", + return_value=args, + ) + state = SimpleNamespace(flight_controller=MagicMock(), local_filesystem=MagicMock()) + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.ApplicationState", + return_value=state, + ) + mocker.patch("ardupilot_methodic_configurator.frontend_tkinter_motor_test.setup_logging") + mocker.patch("ardupilot_methodic_configurator.frontend_tkinter_motor_test.initialize_flight_controller_and_filesystem") + window_mock = MagicMock() + window_mock.root = MagicMock() + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.MotorTestWindow", + return_value=window_mock, + ) + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.MotorTestDataModel", return_value=MagicMock() + ) + + main() + window_mock.root.mainloop.assert_called_once() + + mocker.patch( + "ardupilot_methodic_configurator.frontend_tkinter_motor_test.MotorTestWindow", + side_effect=RuntimeError("boom"), + ) + main() + state.flight_controller.disconnect.assert_called() + + def test_plugin_registration_and_factory_creation( + self, + tk_root, + fake_model: FakeMotorTestModel, + dummy_base_window, + ) -> None: + """ + Plugin registration routes through plugin_factory with the view factory. + + GIVEN: The plugin factory is replaced with a spy + WHEN: register_motor_test_plugin executes + THEN: The factory receives the plugin ID and creator + """ + parent = ttk.Frame(tk_root) + view = _create_motor_test_view(parent, fake_model, dummy_base_window) + assert isinstance(view, MotorTestView) + + class DummyFactory: + """Minimal stand-in for plugin_factory to capture registration calls.""" + + def __init__(self) -> None: + self.calls: list[tuple[str, Callable]] = [] + + def register(self, name: str, creator: Callable) -> None: + self.calls.append((name, creator)) + + dummy_factory = DummyFactory() + module = import_module("ardupilot_methodic_configurator.frontend_tkinter_motor_test") + module_with_attr = cast("Any", module) + original_factory = module_with_attr.plugin_factory + module_with_attr.plugin_factory = dummy_factory + try: + register_motor_test_plugin() + assert dummy_factory.calls + finally: + module_with_attr.plugin_factory = original_factory