1818from os import readlink as os_readlink
1919from time import sleep as time_sleep
2020from time import time as time_time
21- from typing import Any , Callable , ClassVar , NoReturn , Optional , Union
21+ from typing import TYPE_CHECKING , Any , Callable , ClassVar , NoReturn , Optional , Union
2222
2323import serial .tools .list_ports
2424import serial .tools .list_ports_common
2929from ardupilot_methodic_configurator import _
3030from ardupilot_methodic_configurator .data_model_flightcontroller_info import FlightControllerInfo
3131
32+ if TYPE_CHECKING :
33+ from ardupilot_methodic_configurator .backend_flightcontroller_protocols import MavlinkConnection
34+
3235
3336class FakeSerialForTests :
3437 """
@@ -121,7 +124,7 @@ def __init__(
121124
122125 """
123126 self .info = info
124- self .master : Optional [mavutil . mavlink_connection ] = None
127+ self .master : Optional [MavlinkConnection ] = None
125128 self .comport : Union [mavutil .SerialPort , serial .tools .list_ports_common .ListPortInfo , None ] = None
126129 self ._baudrate = baudrate
127130 self ._network_ports = network_ports or self .DEFAULT_NETWORK_PORTS
@@ -148,7 +151,7 @@ def disconnect(self) -> None:
148151 """Close the connection to the flight controller."""
149152 if self .master is not None :
150153 with contextlib .suppress (Exception ):
151- self .master .close ()
154+ self .master .close () # type: ignore[union-attr] # pyright: ignore[reportAttributeAccessIssue]
152155 self .master = None
153156
154157 def add_connection (self , connection_string : str ) -> bool :
@@ -294,7 +297,7 @@ def _create_mavlink_connection( # pylint: disable=too-many-arguments, too-many-
294297 timeout : int = 5 ,
295298 retries : int = 3 ,
296299 progress_callback : Union [None , Callable [[int , int ], None ]] = None ,
297- ) -> mavutil .mavlink_connection :
300+ ) -> mavutil .mavlink_connection : # pyright: ignore[reportGeneralTypeIssues]
298301 """
299302 Factory method for creating MAVLink connections.
300303
@@ -334,7 +337,13 @@ def _detect_vehicles_from_heartbeats(self, timeout: int) -> dict[tuple[int, int]
334337 detected_vehicles : dict [tuple [int , int ], Any ] = {}
335338
336339 while time_time () - start_time < timeout :
337- m = self .master .recv_match (type = "HEARTBEAT" , blocking = False ) if self .master else None
340+ m = (
341+ self .master .recv_match ( # type: ignore[union-attr] # pyright: ignore[reportAttributeAccessIssue]
342+ type = "HEARTBEAT" , blocking = False
343+ )
344+ if self .master
345+ else None
346+ )
338347 if m is None :
339348 time_sleep (self .HEARTBEAT_POLL_DELAY )
340349 continue
@@ -394,7 +403,13 @@ def _retrieve_autopilot_version_and_banner(self, timeout: int) -> str:
394403
395404 # Request AUTOPILOT_VERSION message
396405 self ._request_message (mavutil .mavlink .MAVLINK_MSG_ID_AUTOPILOT_VERSION )
397- m = self .master .recv_match (type = "AUTOPILOT_VERSION" , blocking = True , timeout = timeout ) if self .master else None
406+ m = (
407+ self .master .recv_match ( # type: ignore[union-attr] # pyright: ignore[reportAttributeAccessIssue]
408+ type = "AUTOPILOT_VERSION" , blocking = True , timeout = timeout
409+ )
410+ if self .master
411+ else None
412+ )
398413
399414 return self ._process_autopilot_version (m , banner_msgs )
400415
@@ -404,9 +419,9 @@ def _request_banner(self) -> None:
404419 if self .master is not None :
405420 # Note: Don't wait for ACK here as banner requests are fire-and-forget
406421 # and we handle the response via STATUS_TEXT messages
407- self .master .mav .command_long_send (
408- self .master .target_system ,
409- self .master .target_component ,
422+ self .master .mav .command_long_send ( # type: ignore[union-attr] # pyright: ignore[reportAttributeAccessIssue]
423+ self .master .target_system , # type: ignore[union-attr] # pyright: ignore[reportAttributeAccessIssue]
424+ self .master .target_component , # type: ignore[union-attr] # pyright: ignore[reportAttributeAccessIssue]
410425 mavutil .mavlink .MAV_CMD_DO_SEND_BANNER ,
411426 0 ,
412427 0 ,
@@ -429,7 +444,9 @@ def _receive_banner_text(self) -> list[str]:
429444 start_time = time_time ()
430445 banner_msgs : list [str ] = []
431446 while self .master :
432- msg = self .master .recv_match (type = "STATUSTEXT" , blocking = False )
447+ msg = self .master .recv_match ( # type: ignore[union-attr] # pyright: ignore[reportAttributeAccessIssue]
448+ type = "STATUSTEXT" , blocking = False
449+ )
433450 if msg :
434451 if banner_msgs :
435452 banner_msgs .append (msg .text )
@@ -455,7 +472,7 @@ def _request_message(self, message_id: int) -> None:
455472 system_id = int (self .info .system_id ) if self .info .system_id else 0
456473 component_id = int (self .info .component_id ) if self .info .component_id else 0
457474
458- self .master .mav .command_long_send (
475+ self .master .mav .command_long_send ( # type: ignore[union-attr] # pyright: ignore[reportAttributeAccessIssue]
459476 system_id ,
460477 component_id ,
461478 mavutil .mavlink .MAV_CMD_REQUEST_MESSAGE ,
@@ -627,27 +644,29 @@ def _extract_firmware_type_from_banner(self, banner_msgs: list[str], os_custom_v
627644
628645 return firmware_type
629646
630- def _populate_flight_controller_info (self , m : MAVLink_autopilot_version_message ) -> None :
647+ def _populate_flight_controller_info (self , m : Optional [ MAVLink_autopilot_version_message ] ) -> None :
631648 """
632649 Populate flight controller info from AUTOPILOT_VERSION message.
633650
634651 Args:
635- m: The AUTOPILOT_VERSION MAVLink message
652+ m: The AUTOPILOT_VERSION MAVLink message, or None
636653
637654 """
655+ if m is None :
656+ return
638657 self .info .set_capabilities (m .capabilities )
639658 self .info .set_flight_sw_version (m .flight_sw_version )
640659 self .info .set_usb_vendor_and_product_ids (m .vendor_id , m .product_id ) # must be done before set_board_version()
641660 self .info .set_board_version (m .board_version )
642661 self .info .set_flight_custom_version (m .flight_custom_version )
643662 self .info .set_os_custom_version (m .os_custom_version )
644663
645- def _process_autopilot_version (self , m : MAVLink_autopilot_version_message , banner_msgs : list [str ]) -> str :
664+ def _process_autopilot_version (self , m : Optional [ MAVLink_autopilot_version_message ] , banner_msgs : list [str ]) -> str :
646665 """
647666 Process AUTOPILOT_VERSION message and banner messages to extract flight controller info.
648667
649668 Args:
650- m: The AUTOPILOT_VERSION MAVLink message
669+ m: The AUTOPILOT_VERSION MAVLink message, or None if not received
651670 banner_msgs: List of banner messages received from flight controller
652671
653672 Returns:
@@ -921,7 +940,10 @@ def comport_device(self) -> str:
921940 return ""
922941 return str (self .comport .device )
923942
924- def set_master_for_testing (self , master : Optional [mavutil .mavlink_connection ]) -> None :
943+ def set_master_for_testing (
944+ self ,
945+ master : Optional [mavutil .mavlink_connection ], # pyright: ignore[reportGeneralTypeIssues]
946+ ) -> None :
925947 """
926948 Set the MAVLink connection for testing purposes.
927949
0 commit comments