Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ branch = True

[report]
include = ardupilot_methodic_configurator/*
omit = ardupilot_methodic_configurator/backend_mavftp.py, ardupilot_methodic_configurator/mavftp_example.py, ardupilot_methodic_configurator/tempcal_imu.py, ardupilot_methodic_configurator/frontend_tkinter_motor_test.py, ardupilot_methodic_configurator/data_model_fc_ids.py, ardupilot_methodic_configurator/configuration_steps_strings.py, ardupilot_methodic_configurator/vehicle_components.py
omit = ardupilot_methodic_configurator/backend_mavftp.py, ardupilot_methodic_configurator/mavftp_example.py, ardupilot_methodic_configurator/tempcal_imu.py, ardupilot_methodic_configurator/data_model_fc_ids.py, ardupilot_methodic_configurator/configuration_steps_strings.py, ardupilot_methodic_configurator/vehicle_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def __init__(self, local_filesystem: LocalFilesystem) -> None:
# A dictionary that maps variable names to their values
# These variables are used by the forced_parameters and derived_parameters in configuration_steps_*.json files
self.variables = self.local_filesystem.get_eval_variables()
# Ensure transient helper variables are not persisted across steps
self.variables.pop("fc_parameters", None)
self.variables.pop("new_connection_prefix", None)

def process_configuration_step( # pylint: disable=too-many-locals
self,
Expand Down Expand Up @@ -84,7 +87,7 @@ def process_configuration_step( # pylint: disable=too-many-locals

# Process configuration step operations if configuration steps exist
if self.local_filesystem.configuration_steps and selected_file in self.local_filesystem.configuration_steps:
variables = self.variables
variables = self.variables.copy()
variables["fc_parameters"] = fc_parameters

# Compute derived parameters (does NOT mutate filesystem.file_parameters)
Expand All @@ -102,11 +105,13 @@ def process_configuration_step( # pylint: disable=too-many-locals
if not fc_param_names or param_name in fc_param_names:
derived_params_to_apply[param_name] = param

# Populate new_connection_prefix from rename_connection configuration step
# Populate new_connection_prefix from rename_connection configuration step (per-step scope)
if "rename_connection" in self.local_filesystem.configuration_steps.get(selected_file, {}):
variables["new_connection_prefix"] = self.local_filesystem.configuration_steps[selected_file][
"rename_connection"
]
else:
variables.pop("new_connection_prefix", None)

# Calculate connection rename operations (does NOT mutate filesystem.file_parameters)
rename_ui_infos, duplicates_to_remove, renames_to_apply = self._handle_connection_renaming(
Expand Down
121 changes: 113 additions & 8 deletions ardupilot_methodic_configurator/data_model_motor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
SPDX-License-Identifier: GPL-3.0-or-later
"""

from enum import Enum
from logging import debug as logging_debug
from logging import error as logging_error
from logging import info as logging_info
Expand Down Expand Up @@ -59,6 +60,16 @@ class ValidationError(MotorTestError):
"""Raised when validation of input parameters fails."""


class MotorStatusEvent(str, Enum):
"""Well-known status events published by model motor operations."""

COMMAND_SENT = "command_sent"
STOP_SENT = "stop_sent"


MotorStatusCallback = Callable[[int, MotorStatusEvent], None]


class MotorTestDataModel: # pylint: disable=too-many-public-methods, too-many-instance-attributes
"""
Data model for motor test functionality.
Expand Down Expand Up @@ -102,6 +113,7 @@ def __init__(

self._test_throttle_pct = 0.0
self._test_duration_s = 0.0
self._first_test_acknowledged = False

self._got_battery_status = False

Expand Down Expand Up @@ -496,6 +508,33 @@ def get_parameter(self, param_name: str) -> Optional[float]:

return self.flight_controller.fc_parameters.get(param_name)

def set_motor_spin_arm_value(
self,
value: float,
reset_progress_callback: Union[None, Callable[[int, int], None]] = None,
connection_progress_callback: Union[None, Callable[[int, int], None]] = None,
) -> None:
"""Set MOT_SPIN_ARM ensuring a 0.02 margin relative to MOT_SPIN_MIN."""
spin_min = self.get_parameter("MOT_SPIN_MIN")
if spin_min is not None and value > spin_min - 0.02:
raise ValidationError(
_("MOT_SPIN_ARM must stay at least 0.02 below MOT_SPIN_MIN (current %(min).2f).") % {"min": spin_min}
)

self.set_parameter("MOT_SPIN_ARM", value, reset_progress_callback, connection_progress_callback)

def set_motor_spin_min_value(self, value: float) -> None:
"""Set MOT_SPIN_MIN ensuring it keeps 0.02 margin above MOT_SPIN_ARM."""
spin_arm = self.get_parameter("MOT_SPIN_ARM")
if spin_arm is None:
raise ParameterError(_("MOT_SPIN_ARM must be available before updating MOT_SPIN_MIN."))
if value < spin_arm + 0.02:
raise ValidationError(
_("MOT_SPIN_MIN must be at least 0.02 higher than MOT_SPIN_ARM (current %(arm).2f).") % {"arm": spin_arm}
)

self.set_parameter("MOT_SPIN_MIN", value)

def test_motor(self, test_sequence_nr: int, motor_output_nr: int, throttle_percent: int, timeout_seconds: int) -> None:
"""
Test a specific motor.
Expand Down Expand Up @@ -576,6 +615,28 @@ def test_motor(self, test_sequence_nr: int, motor_output_nr: int, throttle_perce
if not success:
raise MotorTestExecutionError(message)

def _emit_status_event(
self,
callback: Optional[MotorStatusCallback],
motor_number: int,
event: MotorStatusEvent,
) -> None:
"""Notify listeners about a status change."""
if callback is not None:
callback(motor_number, event)

def run_single_motor_test(
self,
test_sequence_nr: int,
motor_output_nr: int,
status_callback: Optional[MotorStatusCallback] = None,
) -> None:
"""Execute a single motor test using stored throttle/duration settings."""
throttle_pct = self.get_test_throttle_pct()
duration = int(self.get_test_duration_s())
self.test_motor(test_sequence_nr, motor_output_nr, throttle_pct, duration)
self._emit_status_event(status_callback, motor_output_nr, MotorStatusEvent.COMMAND_SENT)

def test_all_motors(self, throttle_percent: int, timeout_seconds: int) -> None:
"""
Test all motors simultaneously.
Expand Down Expand Up @@ -622,6 +683,22 @@ def test_motors_in_sequence(self, throttle_percent: int, timeout_seconds: int) -
if not success:
raise MotorTestExecutionError(message)

def run_all_motors_test(self, status_callback: Optional[MotorStatusCallback] = None) -> None:
"""Execute an all-motors test using stored settings and report events."""
throttle_pct = self.get_test_throttle_pct()
duration = int(self.get_test_duration_s())
self.test_all_motors(throttle_pct, duration)
for motor_number in range(1, self.motor_count + 1):
self._emit_status_event(status_callback, motor_number, MotorStatusEvent.COMMAND_SENT)

def run_sequential_motor_test(self, status_callback: Optional[MotorStatusCallback] = None) -> None:
"""Execute a sequential test using stored settings and report events."""
throttle_pct = self.get_test_throttle_pct()
duration = int(self.get_test_duration_s())
self.test_motors_in_sequence(throttle_pct, duration)
for motor_number in range(1, self.motor_count + 1):
self._emit_status_event(status_callback, motor_number, MotorStatusEvent.COMMAND_SENT)

def stop_all_motors(self) -> None:
"""
Emergency stop for all motors.
Expand All @@ -634,6 +711,12 @@ def stop_all_motors(self) -> None:
if not success:
raise MotorTestExecutionError(message)

def emergency_stop_motors(self, status_callback: Optional[MotorStatusCallback] = None) -> None:
"""Stop motors and emit status events for listeners."""
self.stop_all_motors()
for motor_number in range(1, self.motor_count + 1):
self._emit_status_event(status_callback, motor_number, MotorStatusEvent.STOP_SENT)

def get_motor_diagram_path(self) -> tuple[str, str]:
"""
Get the filepath for the motor diagram SVG file for the current frame.
Expand Down Expand Up @@ -1096,6 +1179,31 @@ def update_frame_type_from_selection(
logging_error(error_msg)
raise FrameConfigurationError(error_msg) from e

def update_frame_type_by_key(
self,
selected_key: str,
reset_progress_callback: Union[None, Callable[[int, int], None]] = None,
connection_progress_callback: Union[None, Callable[[int, int], None]] = None,
extra_sleep_time: Optional[int] = None,
) -> bool:
"""Update frame configuration using the combobox key directly."""
try:
frame_type_code = int(selected_key)
except (TypeError, ValueError) as exc:
raise ValidationError(_("Invalid frame type selection")) from exc

current_types = self.get_current_frame_class_types()
frame_type_name = current_types.get(frame_type_code)
if frame_type_name is None:
raise ValidationError(_("Frame type %(key)s is not available for current frame class") % {"key": selected_key})

return self.update_frame_type_from_selection(
frame_type_name,
reset_progress_callback,
connection_progress_callback,
extra_sleep_time,
)

def get_battery_status_color(self) -> str:
"""
Get the appropriate color for battery voltage display.
Expand Down Expand Up @@ -1133,15 +1241,12 @@ def get_battery_display_text(self) -> tuple[str, str]:
return _("Voltage: N/A"), _("Current: N/A")

def should_show_first_test_warning(self) -> bool:
"""
Check if first-time safety warning should be shown.
"""Return True when first-time warning still needs acknowledgement."""
return not self._first_test_acknowledged

Returns:
bool: True if warning should be shown

"""
# Could be expanded to check user preferences/settings
return True
def acknowledge_first_test_warning(self) -> None:
"""Record that the user has accepted the first-time warning."""
self._first_test_acknowledged = True

def get_safety_warning_message(self) -> str:
"""
Expand Down
Loading
Loading