Skip to content

Commit bc6f88b

Browse files
committed
tests(backend_flightcontroller): Improved test coverage
1 parent a094e6d commit bc6f88b

14 files changed

Lines changed: 4174 additions & 24 deletions

.codespell-exclude-file

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
"Version": "LAF"
55
datas= [],
66
ardupilot_methodic_configuratorAny.datas,
7+
critical_param_prefixes = ["FRAME", "BATT", "COMPASS", "SERVO", "MOT"]

ardupilot_methodic_configurator/backend_flightcontroller_connection.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@
2727
from serial.serialutil import SerialException
2828

2929
from ardupilot_methodic_configurator import _
30+
from ardupilot_methodic_configurator.backend_flightcontroller_mavlink_factory import (
31+
MavlinkConnectionFactory,
32+
SystemMavlinkConnectionFactory,
33+
)
34+
from ardupilot_methodic_configurator.backend_flightcontroller_serial import (
35+
SerialPortDiscovery,
36+
SystemSerialPortDiscovery,
37+
)
3038
from ardupilot_methodic_configurator.data_model_flightcontroller_info import FlightControllerInfo
3139

3240
if TYPE_CHECKING:
@@ -83,7 +91,7 @@ def close(self) -> None:
8391
]
8492

8593

86-
class FlightControllerConnection:
94+
class FlightControllerConnection: # pylint: disable=too-many-instance-attributes
8795
"""
8896
Manages flight controller connection establishment and lifecycle.
8997
@@ -108,11 +116,13 @@ class FlightControllerConnection:
108116
"udp:127.0.0.1:14550",
109117
]
110118

111-
def __init__(
119+
def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
112120
self,
113121
info: FlightControllerInfo,
114122
baudrate: int = DEFAULT_BAUDRATE,
115123
network_ports: Optional[list[str]] = None,
124+
serial_port_discovery: Optional[SerialPortDiscovery] = None,
125+
mavlink_connection_factory: Optional[MavlinkConnectionFactory] = None,
116126
) -> None:
117127
"""
118128
Initialize the connection manager.
@@ -121,6 +131,8 @@ def __init__(
121131
info: Flight controller information object to populate
122132
baudrate: Default baud rate for serial connections
123133
network_ports: Optional list of network ports to try (overrides defaults)
134+
serial_port_discovery: Optional serial port discovery service
135+
mavlink_connection_factory: Optional MAVLink connection factory service
124136
125137
"""
126138
self.info = info
@@ -129,6 +141,10 @@ def __init__(
129141
self._baudrate = baudrate
130142
self._network_ports = network_ports or self.DEFAULT_NETWORK_PORTS
131143
self._connection_tuples: list[tuple[str, str]] = []
144+
self._serial_port_discovery: SerialPortDiscovery = serial_port_discovery or SystemSerialPortDiscovery()
145+
self._mavlink_connection_factory: MavlinkConnectionFactory = (
146+
mavlink_connection_factory or SystemMavlinkConnectionFactory()
147+
)
132148

133149
def discover_connections(self) -> None:
134150
"""
@@ -137,7 +153,7 @@ def discover_connections(self) -> None:
137153
Populates the list of available serial ports and network ports
138154
that can be used to connect to a flight controller.
139155
"""
140-
comports = self.get_serial_ports()
156+
comports = self._serial_port_discovery.get_available_ports()
141157
netports = self.get_network_ports()
142158
# list of tuples with the first element being the port name and the second element being the port description
143159
self._connection_tuples = [(port.device, port.description) for port in comports] + [(port, port) for port in netports]
@@ -314,9 +330,9 @@ def _create_mavlink_connection( # pylint: disable=too-many-arguments, too-many-
314330
mavutil.mavlink_connection: The MAVLink connection object
315331
316332
"""
317-
return mavutil.mavlink_connection(
333+
return self._mavlink_connection_factory.create(
318334
device=device,
319-
baud=baudrate,
335+
baudrate=baudrate,
320336
timeout=timeout,
321337
retries=retries,
322338
progress_callback=progress_callback,
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
"""
2+
MAVLink connection factory service for flight controller connections.
3+
4+
This file is part of ArduPilot Methodic Configurator. https://github.com/ArduPilot/MethodicConfigurator
5+
6+
SPDX-FileCopyrightText: 2024-2025 Amilcar do Carmo Lucas <amilcar.lucas@iav.de>
7+
8+
SPDX-License-Identifier: GPL-3.0-or-later
9+
"""
10+
11+
from typing import Optional, Protocol
12+
13+
from pymavlink import mavutil
14+
15+
16+
class MavlinkConnectionFactory(Protocol): # pylint: disable=too-few-public-methods
17+
"""Protocol for creating MAVLink connections."""
18+
19+
def create( # pylint: disable=too-many-arguments, too-many-positional-arguments
20+
self,
21+
device: str,
22+
baudrate: int,
23+
timeout: float = 5.0,
24+
retries: int = 3,
25+
progress_callback: Optional[object] = None,
26+
) -> Optional[mavutil.mavlink_connection]: # pyright: ignore[reportGeneralTypeIssues]
27+
"""Create a MAVLink connection."""
28+
... # pylint: disable=unnecessary-ellipsis
29+
30+
31+
class SystemMavlinkConnectionFactory: # pylint: disable=too-few-public-methods
32+
"""Real implementation using PyMAVLink library."""
33+
34+
def create( # pylint: disable=too-many-arguments, too-many-positional-arguments
35+
self,
36+
device: str,
37+
baudrate: int,
38+
timeout: float = 5.0,
39+
retries: int = 3,
40+
progress_callback: Optional[object] = None,
41+
) -> Optional[mavutil.mavlink_connection]: # pyright: ignore[reportGeneralTypeIssues]
42+
"""Create connection using actual PyMAVLink library."""
43+
try:
44+
return mavutil.mavlink_connection(
45+
device=device,
46+
baud=baudrate,
47+
timeout=timeout,
48+
retries=retries,
49+
progress_callback=progress_callback,
50+
)
51+
except (OSError, TimeoutError, ValueError):
52+
# OSError: connection failures (file not found, permission denied, etc.)
53+
# TimeoutError: connection timeout
54+
# ValueError: invalid parameter values
55+
return None
56+
57+
58+
class FakeMavlinkConnectionFactory:
59+
"""Mock implementation for testing without actual hardware."""
60+
61+
def __init__(self) -> None:
62+
"""Initialize mock factory."""
63+
self._connections: dict[str, FakeMavlinkConnection] = {}
64+
65+
def create( # pylint: disable=too-many-arguments, too-many-positional-arguments
66+
self,
67+
device: str,
68+
baudrate: int,
69+
timeout: float = 5.0, # noqa: ARG002 # pylint: disable=unused-argument
70+
retries: int = 3,
71+
progress_callback: Optional[object] = None,
72+
) -> Optional["FakeMavlinkConnection"]:
73+
"""Create a fake MAVLink connection for testing."""
74+
conn = FakeMavlinkConnection(device, baudrate)
75+
conn.retries = retries
76+
conn.progress_callback = progress_callback
77+
return conn
78+
79+
def get_connection(self, device: str) -> Optional["FakeMavlinkConnection"]:
80+
"""Get a previously created fake connection."""
81+
return self._connections.get(device)
82+
83+
84+
class FakeMavlinkConnection:
85+
"""Fake MAVLink connection for testing."""
86+
87+
retries: int
88+
progress_callback: Optional[object]
89+
90+
def __init__(self, device: str, baudrate: int) -> None:
91+
"""Initialize fake connection."""
92+
self.device = device
93+
self.baudrate = baudrate
94+
self.connected = True
95+
self._message_queue: list[object] = []
96+
97+
def recv_match(
98+
self,
99+
blocking: bool = True, # noqa: ARG002 # pylint: disable=unused-argument
100+
timeout: Optional[float] = None, # noqa: ARG002 # pylint: disable=unused-argument
101+
) -> Optional[object]:
102+
"""Receive a matched message from queue."""
103+
# Note: blocking and timeout parameters are accepted for API compatibility
104+
# but not used in fake implementation
105+
if self._message_queue:
106+
return self._message_queue.pop(0)
107+
return None
108+
109+
def mav_send(self, msg: object) -> None:
110+
"""Send a MAVLink message (no-op for fake)."""
111+
# Note: msg parameter is accepted for API compatibility but not used in fake
112+
113+
def close(self) -> None:
114+
"""Close connection."""
115+
self.connected = False
116+
117+
def add_message(self, msg: object) -> None:
118+
"""Add a message to the queue for testing."""
119+
self._message_queue.append(msg)
120+
121+
def clear_messages(self) -> None:
122+
"""Clear all queued messages."""
123+
self._message_queue.clear()

ardupilot_methodic_configurator/backend_flightcontroller_params.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from ardupilot_methodic_configurator import _
2424
from ardupilot_methodic_configurator.backend_flightcontroller_mavftp_utils import create_mavftp
2525
from ardupilot_methodic_configurator.data_model_flightcontroller_info import FlightControllerInfo
26-
from ardupilot_methodic_configurator.data_model_par_dict import ParDict
26+
from ardupilot_methodic_configurator.data_model_par_dict import ParDict, validate_param_name
2727

2828
# Type hint for connection manager to avoid circular imports
2929
if TYPE_CHECKING:
@@ -242,12 +242,24 @@ def set_param(self, param_name: str, param_value: float) -> tuple[bool, str]:
242242
243243
Returns:
244244
tuple[bool, str]: (True, "") if command sent successfully,
245-
(False, error_message) if no connection available
245+
(False, error_message) if no connection available or invalid parameters
246246
247247
"""
248248
if self.master is None:
249249
return False, _("No flight controller connection available")
250250

251+
# Validate parameter name using ArduPilot standards
252+
is_valid_name, name_error = validate_param_name(param_name)
253+
if not is_valid_name:
254+
logging_error(name_error)
255+
return False, name_error
256+
257+
# Validate parameter value
258+
if not isinstance(param_value, (int, float)):
259+
error_msg = _("Invalid parameter value type: %s (expected numeric)") % type(param_value).__name__
260+
logging_error(error_msg)
261+
return False, error_msg
262+
251263
self.master.param_set_send(param_name, param_value)
252264
# Update local cache optimistically
253265
self.fc_parameters[param_name] = param_value
@@ -276,12 +288,18 @@ def fetch_param(self, param_name: str, timeout: int = 5) -> Optional[float]:
276288
timeout: Timeout in seconds to wait for the response. Default is 5
277289
278290
Returns:
279-
float: The value of the parameter, or None if not found or timeout occurred
291+
float: The value of the parameter
280292
281293
"""
282294
if self.master is None:
283295
return None
284296

297+
# Validate parameter name using ArduPilot standards
298+
is_valid_name, name_error = validate_param_name(param_name)
299+
if not is_valid_name:
300+
logging_error(name_error)
301+
raise ValueError(name_error)
302+
285303
# Send PARAM_REQUEST_READ message
286304
self.master.mav.param_request_read_send(
287305
self.master.target_system,
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""
2+
Serial port discovery service for flight controller connections.
3+
4+
This file is part of ArduPilot Methodic Configurator. https://github.com/ArduPilot/MethodicConfigurator
5+
6+
SPDX-FileCopyrightText: 2024-2025 Amilcar do Carmo Lucas <amilcar.lucas@iav.de>
7+
8+
SPDX-License-Identifier: GPL-3.0-or-later
9+
"""
10+
11+
from typing import Protocol
12+
13+
import serial.tools.list_ports
14+
import serial.tools.list_ports_common
15+
16+
17+
class SerialPortDiscovery(Protocol):
18+
"""Protocol for discovering and managing serial ports."""
19+
20+
def get_available_ports(
21+
self,
22+
) -> list[serial.tools.list_ports_common.ListPortInfo]:
23+
"""Get list of available serial ports."""
24+
... # pylint: disable=unnecessary-ellipsis
25+
26+
def get_port_description(self, device: str) -> str:
27+
"""Get description for a serial port device."""
28+
... # pylint: disable=unnecessary-ellipsis
29+
30+
31+
class SystemSerialPortDiscovery:
32+
"""Real implementation using PySerial for hardware discovery."""
33+
34+
def get_available_ports(
35+
self,
36+
) -> list[serial.tools.list_ports_common.ListPortInfo]:
37+
"""Get actual serial ports from system."""
38+
return list(serial.tools.list_ports.comports())
39+
40+
def get_port_description(self, device: str) -> str:
41+
"""Get port description from system."""
42+
for port in serial.tools.list_ports.comports():
43+
if port.device == device:
44+
return str(port.description)
45+
return device
46+
47+
48+
class FakeSerialPortDiscovery:
49+
"""Mock implementation for testing without physical hardware."""
50+
51+
def __init__(self) -> None:
52+
"""Initialize with empty port list."""
53+
self._ports: list[serial.tools.list_ports_common.ListPortInfo] = []
54+
55+
def get_available_ports(
56+
self,
57+
) -> list[serial.tools.list_ports_common.ListPortInfo]:
58+
"""Return configured fake ports."""
59+
return self._ports.copy()
60+
61+
def get_port_description(self, device: str) -> str:
62+
"""Return fake port description."""
63+
for port in self._ports:
64+
if port.device == device:
65+
return str(port.description)
66+
return f"Test Port {device}"
67+
68+
def add_port(
69+
self,
70+
device: str,
71+
description: str = "Test Serial Port",
72+
manufacturer: str = "Test Manufacturer",
73+
) -> None:
74+
"""Add a fake port for testing."""
75+
port_info = _create_mock_port_info(device, description, manufacturer)
76+
self._ports.append(port_info)
77+
78+
def clear_ports(self) -> None:
79+
"""Clear all configured fake ports."""
80+
self._ports.clear()
81+
82+
83+
def _create_mock_port_info(device: str, description: str, manufacturer: str) -> serial.tools.list_ports_common.ListPortInfo:
84+
"""Create a mock ListPortInfo object for testing."""
85+
port_info = serial.tools.list_ports_common.ListPortInfo(device)
86+
port_info.description = description
87+
port_info.manufacturer = manufacturer
88+
return port_info

ardupilot_methodic_configurator/data_model_par_dict.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,36 @@
2424
PARAM_NAME_MAX_LEN = 16
2525

2626

27+
def validate_param_name(param_name: str) -> tuple[bool, str]:
28+
"""
29+
Validate parameter name according to ArduPilot standards.
30+
31+
Args:
32+
param_name: The parameter name to validate
33+
34+
Returns:
35+
tuple[bool, str]: (is_valid, error_message)
36+
is_valid: True if valid, False otherwise
37+
error_message: Description of validation error, empty if valid
38+
39+
"""
40+
# Check if parameter name is provided and is a string
41+
if not param_name or not isinstance(param_name, str):
42+
return False, _("Parameter name cannot be empty")
43+
44+
# Check if parameter name exceeds maximum length
45+
if len(param_name) > PARAM_NAME_MAX_LEN:
46+
msg = _("Parameter name too long (max %d characters): %s") % (PARAM_NAME_MAX_LEN, param_name)
47+
return False, msg
48+
49+
# Check if parameter name matches the required format
50+
if not re.match(PARAM_NAME_REGEX, param_name):
51+
msg = _("Invalid parameter name format (must start with capital letter, contain only A-Z, 0-9, _): %s") % param_name
52+
return False, msg
53+
54+
return True, ""
55+
56+
2757
def is_within_tolerance(x: float, y: float, atol: float = 1e-08, rtol: float = 1e-04) -> bool:
2858
"""
2959
Determines if the absolute difference between two numbers is within a specified tolerance.

0 commit comments

Comments
 (0)