|
14 | 14 | from os import path as os_path |
15 | 15 | from pathlib import Path |
16 | 16 | from time import sleep as time_sleep |
17 | | -from typing import Callable, Optional, Union |
| 17 | +from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast |
18 | 18 |
|
19 | 19 | import serial.tools.list_ports_common |
20 | 20 | from pymavlink import mavutil |
|
39 | 39 | from ardupilot_methodic_configurator.data_model_flightcontroller_info import FlightControllerInfo |
40 | 40 | from ardupilot_methodic_configurator.data_model_par_dict import ParDict |
41 | 41 |
|
| 42 | +if TYPE_CHECKING: |
| 43 | + from pymavlink.dialects.v20.ardupilotmega import MAVLink_autopilot_version_message |
| 44 | + |
42 | 45 | DEFAULT_REBOOT_TIME: int = 7 |
43 | 46 |
|
44 | 47 | # Re-export constants for backwards compatibility |
45 | | -# DEFAULT_BAUDRATE and SUPPORTED_BAUDRATES are from connection module |
46 | | -# DEFAULT_REBOOT_TIME is defined in this module |
47 | 48 | __all__ = [ |
48 | 49 | "DEFAULT_BAUDRATE", |
49 | 50 | "DEFAULT_REBOOT_TIME", |
@@ -139,9 +140,13 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen |
139 | 140 | fc_parameters=None, # Let params_manager create its own fc_parameters dict |
140 | 141 | ) |
141 | 142 |
|
142 | | - self._commands_manager: FlightControllerCommandsProtocol = commands_manager or FlightControllerCommands( |
143 | | - params_manager=self._params_manager, |
144 | | - connection_manager=self._connection_manager, |
| 143 | + self._commands_manager: FlightControllerCommandsProtocol = cast( |
| 144 | + "FlightControllerCommandsProtocol", |
| 145 | + commands_manager |
| 146 | + or FlightControllerCommands( |
| 147 | + params_manager=self._params_manager, |
| 148 | + connection_manager=self._connection_manager, |
| 149 | + ), |
145 | 150 | ) |
146 | 151 |
|
147 | 152 | self._files_manager: FlightControllerFilesProtocol = files_manager or FlightControllerFiles( |
@@ -208,6 +213,26 @@ def baudrate(self) -> int: |
208 | 213 | """Get the baudrate setting.""" |
209 | 214 | return self._baudrate |
210 | 215 |
|
| 216 | + @property |
| 217 | + def PARAM_FETCH_POLL_DELAY(self) -> float: # noqa: N802 # pylint: disable=invalid-name |
| 218 | + """Get parameter fetch poll delay - delegates to params manager.""" |
| 219 | + return self._params_manager.PARAM_FETCH_POLL_DELAY |
| 220 | + |
| 221 | + @property |
| 222 | + def BATTERY_STATUS_CACHE_TIME(self) -> float: # noqa: N802 # pylint: disable=invalid-name |
| 223 | + """Get battery status cache time - delegates to commands manager.""" |
| 224 | + return self._commands_manager.BATTERY_STATUS_CACHE_TIME |
| 225 | + |
| 226 | + @property |
| 227 | + def BATTERY_STATUS_TIMEOUT(self) -> float: # noqa: N802 # pylint: disable=invalid-name |
| 228 | + """Get battery status timeout - delegates to commands manager.""" |
| 229 | + return self._commands_manager.BATTERY_STATUS_TIMEOUT |
| 230 | + |
| 231 | + @property |
| 232 | + def COMMAND_ACK_TIMEOUT(self) -> float: # noqa: N802 # pylint: disable=invalid-name |
| 233 | + """Get command acknowledgment timeout - delegates to commands manager.""" |
| 234 | + return self._commands_manager.COMMAND_ACK_TIMEOUT |
| 235 | + |
211 | 236 | @property |
212 | 237 | def fc_parameters(self) -> dict[str, float]: |
213 | 238 | """Get flight controller parameters - delegates to params manager.""" |
@@ -281,6 +306,33 @@ def add_connection(self, connection_string: str) -> bool: |
281 | 306 | """Add a new connection to the list of available connections - delegates to connection manager.""" |
282 | 307 | return self._connection_manager.add_connection(connection_string) |
283 | 308 |
|
| 309 | + # Testing-only methods (protected methods exposed for SITL integration tests) |
| 310 | + def _detect_vehicles_from_heartbeats(self, timeout: int) -> dict[tuple[int, int], Any]: |
| 311 | + """Detect vehicles from heartbeats - delegates to connection manager (testing only).""" |
| 312 | + return self._connection_manager._detect_vehicles_from_heartbeats(timeout) # noqa: SLF001 # pylint: disable=protected-access |
| 313 | + |
| 314 | + def _extract_firmware_type_from_banner(self, banner_msgs: list[str], os_custom_version_index: Optional[int]) -> str: |
| 315 | + """Extract firmware type from banner - delegates to connection manager (testing only).""" |
| 316 | + return self._connection_manager._extract_firmware_type_from_banner( # noqa: SLF001 # pylint: disable=protected-access |
| 317 | + banner_msgs, os_custom_version_index |
| 318 | + ) |
| 319 | + |
| 320 | + def _extract_chibios_version_from_banner(self, banner_msgs: list[str]) -> tuple[str, Optional[int]]: |
| 321 | + """Extract ChibiOS version from banner - delegates to connection manager (testing only).""" |
| 322 | + return self._connection_manager._extract_chibios_version_from_banner(banner_msgs) # noqa: SLF001 # pylint: disable=protected-access |
| 323 | + |
| 324 | + def _select_supported_autopilot(self, detected_vehicles: dict[tuple[int, int], Any]) -> str: |
| 325 | + """Select supported autopilot from detected vehicles - delegates to connection manager (testing only).""" |
| 326 | + return self._connection_manager._select_supported_autopilot(detected_vehicles) # noqa: SLF001 # pylint: disable=protected-access |
| 327 | + |
| 328 | + def _populate_flight_controller_info(self, m: "MAVLink_autopilot_version_message") -> None: |
| 329 | + """Populate flight controller info from autopilot version - delegates to connection manager (testing only).""" |
| 330 | + self._connection_manager._populate_flight_controller_info(m) # noqa: SLF001 # pylint: disable=protected-access |
| 331 | + |
| 332 | + def _retrieve_autopilot_version_and_banner(self, timeout: int) -> str: |
| 333 | + """Retrieve autopilot version and banner - delegates to connection manager (testing only).""" |
| 334 | + return self._connection_manager._retrieve_autopilot_version_and_banner(timeout) # noqa: SLF001 # pylint: disable=protected-access |
| 335 | + |
284 | 336 | def register_and_try_connect( |
285 | 337 | self, |
286 | 338 | comport: Union[mavutil.SerialPort, serial.tools.list_ports_common.ListPortInfo], |
|
0 commit comments