diff --git a/ardupilot_methodic_configurator/backend_flightcontroller.py b/ardupilot_methodic_configurator/backend_flightcontroller.py index 5cbeb5267..6fe9968b0 100644 --- a/ardupilot_methodic_configurator/backend_flightcontroller.py +++ b/ardupilot_methodic_configurator/backend_flightcontroller.py @@ -8,6 +8,7 @@ SPDX-License-Identifier: GPL-3.0-or-later """ +import os from argparse import ArgumentParser from logging import debug as logging_debug from logging import error as logging_error @@ -87,7 +88,7 @@ def close(self) -> None: ] -class FlightController: +class FlightController: # pylint: disable=too-many-public-methods """ A class to manage the connection and parameters of a flight controller. @@ -1163,6 +1164,180 @@ def put_progress_callback(completion: float) -> None: ret.display_message() return ret.error_code == 0 + def download_last_flight_log( + self, local_filename: str, progress_callback: Union[None, Callable[[int, int], None]] = None + ) -> bool: + """Download the last flight log from the flight controller.""" + if self.master is None: + error_msg = _("No flight controller connected") + logging_error(error_msg) + return False + if not self.info.is_mavftp_supported: + error_msg = _("MAVFTP is not supported by the flight controller") + logging_error(error_msg) + return False + + mavftp = MAVFTP(self.master, target_system=self.master.target_system, target_component=self.master.target_component) + + def get_progress_callback(completion: float) -> None: + if progress_callback is not None and completion is not None: + progress_callback(int(completion * 100), 100) + + try: + # Try to get the last log number using different methods + remote_filenumber = self._get_last_log_number(mavftp) + if remote_filenumber is None: + return False + + # We want the previous log, not the current one (which might be incomplete) + # remote_filenumber -= 1 + # if remote_filenumber < 1: + # logging_error(_("No previous flight log available")) + # return False + + return self._download_log_file(mavftp, remote_filenumber, local_filename, get_progress_callback) + + except Exception as e: # pylint: disable=broad-exception-caught + logging_error(_("Error during flight log download: %s"), str(e)) + return False + + def _get_last_log_number(self, mavftp: MAVFTP) -> Union[int, None]: + """Get the last log number using multiple fallback methods.""" + # Method 1: Try to get LASTLOG.TXT + log_number = self._get_log_number_from_lastlog_txt(mavftp) + if log_number is not None: + return log_number + + # Method 2: Try to list the logs directory and find the highest numbered log + log_number = self._get_log_number_from_directory_listing(mavftp) + if log_number is not None: + return log_number + + # Method 3: Try common log numbers (scan backwards from a reasonable max) + log_number = self._get_log_number_by_scanning(mavftp) + if log_number is not None: + return log_number + + logging_error(_("Could not determine the last log number using any method")) + return None + + def _get_log_number_from_lastlog_txt(self, mavftp: MAVFTP) -> Union[int, None]: + """Try to get the log number from LASTLOG.TXT file.""" + logging_info(_("Trying to get log number from LASTLOG.TXT")) + try: + temp_lastlog_file = "temp_lastlog.txt" + mavftp.cmd_get(["/APM/LOGS/LASTLOG.TXT", temp_lastlog_file]) + ret = mavftp.process_ftp_reply("OpenFileRO", timeout=10) + if ret.error_code != 0: + logging_warning(_("LASTLOG.TXT not available, trying alternative methods")) + return None + + return self._extract_log_number_from_file(temp_lastlog_file) + except Exception as e: # pylint: disable=broad-exception-caught + logging_warning(_("Failed to get log number from LASTLOG.TXT: %s"), str(e)) + return None + + def _get_log_number_from_directory_listing(self, _mavftp: MAVFTP) -> Union[int, None]: + """Try to get the highest log number by listing the logs directory using MAVFTP.""" + logging_info(_("Trying to get log number from directory listing")) + try: + result = _mavftp.cmd_list(["/APM/LOGS/"]) + if not hasattr(result, "directory_listing") or not isinstance(result.directory_listing, dict): + logging_error(_("No directory listing found in MAVFTPReturn")) + return None + highest = -1 + for name in result.directory_listing: + # Typical log file names: 00000036.BIN, 00000037.BIN, etc. + if name.endswith(".BIN") and name[:8].isdigit(): + try: + log_num = int(name[:8]) + highest = max(highest, log_num) + except ValueError: + continue + if highest != -1: + logging_info(_("Highest log number found: %d"), highest) + return highest + logging_error(_("No log files found in directory listing")) + return None + except Exception as e: # pylint: disable=broad-exception-caught + logging_warning(_("Failed to get log number from directory listing: %s"), str(e)) + return None + + def _get_log_number_by_scanning(self, mavftp: MAVFTP) -> Union[int, None]: + """Try to find the last log using binary search for efficiency.""" + logging_info(_("Trying to find log number using binary search")) + try: + # Binary search to find the highest log number + low = 1 + high = 9999 # Reasonable upper bound for log numbers + last_found = None + + while low <= high: + mid = (low + high) // 2 + remote_filename = f"/APM/LOGS/{mid:08}.BIN" + + # Test if this log file exists + temp_test_file = f"temp_test_{mid}.tmp" + mavftp.cmd_get([remote_filename, temp_test_file]) + ret = mavftp.process_ftp_reply("OpenFileRO", timeout=5) # Must be > idle_detection_time (3.7s) + + # Clean up the temp file if it was created + if os.path.exists(temp_test_file): + os.remove(temp_test_file) + + if ret.error_code == 0: + # File exists, search in upper half + last_found = mid + low = mid + 1 + logging_debug(_("Log %d exists, searching higher"), mid) + else: + # File doesn't exist, search in lower half + high = mid - 1 + logging_debug(_("Log %d doesn't exist, searching lower"), mid) + + if last_found is not None: + logging_info(_("Found highest log number using binary search: %d"), last_found) + return last_found + + logging_warning(_("No log files found using binary search")) + return None + + except Exception as e: # pylint: disable=broad-exception-caught + logging_warning(_("Failed to scan for log numbers using binary search: %s"), str(e)) + return None + + def _download_log_file( + self, mavftp: MAVFTP, remote_filenumber: int, local_filename: str, get_progress_callback: Callable + ) -> bool: + """Download the actual log file from the flight controller.""" + remote_filename = f"/APM/LOGS/{remote_filenumber:08}.BIN" + logging_info(_("Downloading flight log %s to %s"), remote_filename, local_filename) + + # Download the actual log file + mavftp.cmd_get([remote_filename, local_filename], progress_callback=get_progress_callback) + ret = mavftp.process_ftp_reply("OpenFileRO", timeout=0) # No timeout for large log files + if ret.error_code != 0: + logging_error(_("Failed to download flight log %s"), remote_filename) + ret.display_message() + return False + + logging_info(_("Successfully downloaded flight log to %s"), local_filename) + return True + + def _extract_log_number_from_file(self, temp_lastlog_file: str) -> Union[int, None]: + """Extract log number from LASTLOG.TXT file and clean up the temporary file.""" + try: + with open(temp_lastlog_file, encoding="UTF-8") as file: + file_contents = file.readline() + return int(file_contents.strip()) + except (FileNotFoundError, ValueError) as e: + logging_error(_("Could not extract last log file number from LASTLOG.TXT: %s"), e) + return None + finally: + # Clean up the temporary file + if os.path.exists(temp_lastlog_file): + os.remove(temp_lastlog_file) + @staticmethod def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser: parser.add_argument( diff --git a/ardupilot_methodic_configurator/backend_mavftp.py b/ardupilot_methodic_configurator/backend_mavftp.py index 2883c0a92..ffa15f1b9 100755 --- a/ardupilot_methodic_configurator/backend_mavftp.py +++ b/ardupilot_methodic_configurator/backend_mavftp.py @@ -229,6 +229,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen invalid_error_code: int = 0, invalid_opcode: int = 0, invalid_payload_size: int = 0, + directory_listing: Union[dict[str, int], None] = None, ) -> None: self.operation_name = operation_name self.error_code = error_code @@ -236,8 +237,9 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen self.invalid_error_code = invalid_error_code self.invalid_opcode = invalid_opcode self.invalid_payload_size = invalid_payload_size + self.directory_listing = directory_listing - def display_message(self) -> None: # pylint: disable=too-many-branches + def display_message(self) -> None: # pylint: disable=too-many-branches, too-many-statements # noqa: C901, PLR0912, PLR0915 if self.error_code == ERR_None: logging.info("%s succeeded", self.operation_name) elif self.error_code == ERR_Fail: @@ -284,6 +286,16 @@ def display_message(self) -> None: # pylint: disable=too-many-branches else: logging.error("%s failed, unknown error %u in display_message()", self.operation_name, self.error_code) + if self.directory_listing is not None: + total_size = 0 + for name, size in self.directory_listing.items(): + if size == -1: # directories are defined by a size of -1 + logging.info(" %s/", name) + else: + logging.info(" %s\t%u", name, size) + total_size += max(0, size) + logging.info("Total size %.2f kByte", total_size / 1024.0) + @property def return_code(self) -> int: return self.error_code @@ -357,6 +369,7 @@ def __init__( self.write_pending = 0 self.write_last_send: Union[None, float] = None self.open_retries = 0 + self.directory_listing: dict[str, int] = {} self.master = master self.target_system = target_system @@ -464,6 +477,7 @@ def cmd_list(self, args) -> MAVFTPReturn: self.dir_offset = 0 op = FTP_OP(self.seq, self.session, OP_ListDirectory, len(enc_dname), 0, 0, self.dir_offset, enc_dname) self.__send(op) + self.directory_listing = {} return self.process_ftp_reply("ListDirectory") def __handle_list_reply(self, op, _m) -> MAVFTPReturn: @@ -477,13 +491,20 @@ def __handle_list_reply(self, op, _m) -> MAVFTPReturn: self.dir_offset += 1 try: d = str(d, "ascii") # noqa: PLW2901 - except Exception: # noqa: S112 pylint: disable=broad-exception-caught + except (TypeError, UnicodeDecodeError): continue if d[0] == "D": - logging.info(" D %s", d[1:]) + name = d[1:] + self.directory_listing[name] = -1 # directories are defined by a size of -1 + logging.info(" D %s", name) elif d[0] == "F": (name, size) = d[1:].split("\t") - size_int = int(size) + try: + size_int = int(size) + except (ValueError, TypeError, OverflowError): + logging.error("Invalid file size: %s", size) + size_int = 0 + self.directory_listing[name] = size_int self.total_size += size_int logging.info(" %s\t%u", name, size_int) else: @@ -497,7 +518,7 @@ def __handle_list_reply(self, op, _m) -> MAVFTPReturn: self.total_size = 0 else: return self.__decode_ftp_ack_and_nack(op) - return MAVFTPReturn("ListDirectory", ERR_None) + return MAVFTPReturn("ListDirectory", ERR_None, directory_listing=self.directory_listing) def cmd_get(self, args, callback=None, progress_callback=None) -> MAVFTPReturn: """Get file.""" diff --git a/ardupilot_methodic_configurator/frontend_tkinter_parameter_editor.py b/ardupilot_methodic_configurator/frontend_tkinter_parameter_editor.py index 315d2abc0..e7a71b626 100755 --- a/ardupilot_methodic_configurator/frontend_tkinter_parameter_editor.py +++ b/ardupilot_methodic_configurator/frontend_tkinter_parameter_editor.py @@ -11,6 +11,7 @@ """ import sys +import threading import time import tkinter as tk from argparse import ArgumentParser, Namespace @@ -373,6 +374,28 @@ def __create_parameter_area_widgets(self) -> None: else _("No flight controller connected, upload not available"), ) + # Create download last flight log button + download_log_button = ttk.Button( + buttons_frame, + text=_("Download last flight log"), + command=self.on_download_last_flight_log_click, + ) + download_log_button.configure( + state=( + "normal" if (self.flight_controller.master and self.flight_controller.info.is_mavftp_supported) else "disabled" + ) + ) + download_log_button.pack(side=tk.LEFT, padx=(8, 8)) # Add padding on both sides of the download log button + show_tooltip( + download_log_button, + _( + "Download the last flight log from the flight controller\n" + "This will save the previous flight log to a file on your computer for analysis" + ) + if (self.flight_controller.master and self.flight_controller.info.is_mavftp_supported) + else _("No flight controller connected or MAVFTP not supported"), + ) + # Create skip button self.skip_button = ttk.Button(buttons_frame, text=_("Skip parameter file"), command=self.on_skip_click) self.skip_button.configure( @@ -935,6 +958,61 @@ def _export_fc_params_missing_or_different_in_amc_files(self, fc_parameters: Par else: logging_info(_("No FC parameters are missing or different from AMC parameter files")) + def on_download_last_flight_log_click(self) -> None: + """Handle the download last flight log button click.""" + if not self.flight_controller.master: + messagebox.showerror(_("Error"), _("No flight controller connected")) + return + + if not self.flight_controller.info.is_mavftp_supported: + messagebox.showerror(_("Error"), _("MAVFTP is not supported by the flight controller")) + return + + # Show file dialog to select where to save the log file + filename = filedialog.asksaveasfilename( + title=_("Save flight log as"), + defaultextension=".bin", + filetypes=[ + (_("Binary log files"), "*.bin"), + (_("All files"), "*.*"), + ], + ) + + if not filename: # User cancelled the dialog + return + + # Create a progress window for the download + progress_window = ProgressWindow( + self.root, + _("Downloading Flight Log"), + _("Downloaded {}% from {}%"), + ) + + # Start the download in a separate thread to avoid blocking the GUI + def download_thread() -> None: + try: + success = self.flight_controller.download_last_flight_log(filename, progress_window.update_progress_bar) + if success: + self.root.after( + 0, + lambda: messagebox.showinfo(_("Success"), _("Flight log downloaded successfully to:\n%s") % filename), + ) + else: + self.root.after( + 0, + lambda: messagebox.showerror( + _("Error"), _("Failed to download flight log. Check the console for details.") + ), + ) + except Exception as e: # pylint: disable=broad-exception-caught + error_msg = str(e) + self.root.after(0, lambda: messagebox.showerror(_("Error"), _("Download error: %s") % error_msg)) + finally: + self.root.after(0, progress_window.destroy) + + download_thread_obj = threading.Thread(target=download_thread, daemon=True) + download_thread_obj.start() + def _configuration_step_is_optional(self, file_name: str, threshold_pct: int = 20) -> bool: # Check if the configuration step for the given file is optional mandatory_text, _mandatory_url = self.local_filesystem.get_documentation_text_and_url(file_name, "mandatory")