Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 176 additions & 1 deletion ardupilot_methodic_configurator/backend_flightcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
SPDX-License-Identifier: GPL-3.0-or-later
"""

import os
from argparse import ArgumentParser
from logging import debug as logging_debug
from logging import error as logging_error
Expand Down Expand Up @@ -87,7 +88,7 @@ def close(self) -> None:
]


class FlightController:
class FlightController: # pylint: disable=too-many-public-methods
"""
A class to manage the connection and parameters of a flight controller.

Expand Down Expand Up @@ -1163,6 +1164,180 @@ def put_progress_callback(completion: float) -> None:
ret.display_message()
return ret.error_code == 0

def download_last_flight_log(
self, local_filename: str, progress_callback: Union[None, Callable[[int, int], None]] = None
) -> bool:
"""Download the last flight log from the flight controller."""
if self.master is None:
error_msg = _("No flight controller connected")
logging_error(error_msg)
return False
if not self.info.is_mavftp_supported:
error_msg = _("MAVFTP is not supported by the flight controller")
logging_error(error_msg)
return False

mavftp = MAVFTP(self.master, target_system=self.master.target_system, target_component=self.master.target_component)

def get_progress_callback(completion: float) -> None:
if progress_callback is not None and completion is not None:
progress_callback(int(completion * 100), 100)

try:
# Try to get the last log number using different methods
remote_filenumber = self._get_last_log_number(mavftp)
if remote_filenumber is None:
return False

# We want the previous log, not the current one (which might be incomplete)
# remote_filenumber -= 1
# if remote_filenumber < 1:
# logging_error(_("No previous flight log available"))
# return False

return self._download_log_file(mavftp, remote_filenumber, local_filename, get_progress_callback)

except Exception as e: # pylint: disable=broad-exception-caught
logging_error(_("Error during flight log download: %s"), str(e))
return False

def _get_last_log_number(self, mavftp: MAVFTP) -> Union[int, None]:
"""Get the last log number using multiple fallback methods."""
# Method 1: Try to get LASTLOG.TXT
log_number = self._get_log_number_from_lastlog_txt(mavftp)
if log_number is not None:
return log_number

# Method 2: Try to list the logs directory and find the highest numbered log
log_number = self._get_log_number_from_directory_listing(mavftp)
if log_number is not None:
return log_number

# Method 3: Try common log numbers (scan backwards from a reasonable max)
log_number = self._get_log_number_by_scanning(mavftp)
if log_number is not None:
return log_number

logging_error(_("Could not determine the last log number using any method"))
return None

def _get_log_number_from_lastlog_txt(self, mavftp: MAVFTP) -> Union[int, None]:
"""Try to get the log number from LASTLOG.TXT file."""
logging_info(_("Trying to get log number from LASTLOG.TXT"))
try:
temp_lastlog_file = "temp_lastlog.txt"
mavftp.cmd_get(["/APM/LOGS/LASTLOG.TXT", temp_lastlog_file])
ret = mavftp.process_ftp_reply("OpenFileRO", timeout=10)
if ret.error_code != 0:
logging_warning(_("LASTLOG.TXT not available, trying alternative methods"))
return None

return self._extract_log_number_from_file(temp_lastlog_file)
except Exception as e: # pylint: disable=broad-exception-caught
logging_warning(_("Failed to get log number from LASTLOG.TXT: %s"), str(e))
return None

def _get_log_number_from_directory_listing(self, _mavftp: MAVFTP) -> Union[int, None]:
"""Try to get the highest log number by listing the logs directory using MAVFTP."""
logging_info(_("Trying to get log number from directory listing"))
try:
result = _mavftp.cmd_list(["/APM/LOGS/"])
if not hasattr(result, "directory_listing") or not isinstance(result.directory_listing, dict):
logging_error(_("No directory listing found in MAVFTPReturn"))
return None
highest = -1
for name in result.directory_listing:
# Typical log file names: 00000036.BIN, 00000037.BIN, etc.
if name.endswith(".BIN") and name[:8].isdigit():
try:
log_num = int(name[:8])
highest = max(highest, log_num)
except ValueError:
continue
if highest != -1:
logging_info(_("Highest log number found: %d"), highest)
return highest
logging_error(_("No log files found in directory listing"))
return None
except Exception as e: # pylint: disable=broad-exception-caught
logging_warning(_("Failed to get log number from directory listing: %s"), str(e))
return None

def _get_log_number_by_scanning(self, mavftp: MAVFTP) -> Union[int, None]:
"""Try to find the last log using binary search for efficiency."""
logging_info(_("Trying to find log number using binary search"))
try:
# Binary search to find the highest log number
low = 1
high = 9999 # Reasonable upper bound for log numbers

Copilot AI Sep 7, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The magic number 9999 should be defined as a named constant (e.g., MAX_LOG_NUMBER = 9999) to improve maintainability and make the upper bound configurable.

Copilot uses AI. Check for mistakes.
last_found = None

while low <= high:
mid = (low + high) // 2
remote_filename = f"/APM/LOGS/{mid:08}.BIN"

# Test if this log file exists
temp_test_file = f"temp_test_{mid}.tmp"
mavftp.cmd_get([remote_filename, temp_test_file])
ret = mavftp.process_ftp_reply("OpenFileRO", timeout=5) # Must be > idle_detection_time (3.7s)

# Clean up the temp file if it was created
if os.path.exists(temp_test_file):
os.remove(temp_test_file)

if ret.error_code == 0:
# File exists, search in upper half
last_found = mid
low = mid + 1
logging_debug(_("Log %d exists, searching higher"), mid)
else:
# File doesn't exist, search in lower half
high = mid - 1
logging_debug(_("Log %d doesn't exist, searching lower"), mid)

if last_found is not None:
logging_info(_("Found highest log number using binary search: %d"), last_found)
return last_found

logging_warning(_("No log files found using binary search"))
return None

except Exception as e: # pylint: disable=broad-exception-caught
logging_warning(_("Failed to scan for log numbers using binary search: %s"), str(e))
return None

def _download_log_file(
self, mavftp: MAVFTP, remote_filenumber: int, local_filename: str, get_progress_callback: Callable
) -> bool:
"""Download the actual log file from the flight controller."""
remote_filename = f"/APM/LOGS/{remote_filenumber:08}.BIN"
logging_info(_("Downloading flight log %s to %s"), remote_filename, local_filename)

# Download the actual log file
mavftp.cmd_get([remote_filename, local_filename], progress_callback=get_progress_callback)
ret = mavftp.process_ftp_reply("OpenFileRO", timeout=0) # No timeout for large log files

Copilot AI Sep 7, 2025

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using timeout=0 (infinite timeout) for large log files could cause the application to hang indefinitely if there are network issues. Consider using a reasonable large timeout value instead.

Copilot uses AI. Check for mistakes.
if ret.error_code != 0:
logging_error(_("Failed to download flight log %s"), remote_filename)
ret.display_message()
return False

logging_info(_("Successfully downloaded flight log to %s"), local_filename)
return True

def _extract_log_number_from_file(self, temp_lastlog_file: str) -> Union[int, None]:
"""Extract log number from LASTLOG.TXT file and clean up the temporary file."""
try:
with open(temp_lastlog_file, encoding="UTF-8") as file:
file_contents = file.readline()
return int(file_contents.strip())
except (FileNotFoundError, ValueError) as e:
logging_error(_("Could not extract last log file number from LASTLOG.TXT: %s"), e)
return None
finally:
# Clean up the temporary file
if os.path.exists(temp_lastlog_file):
os.remove(temp_lastlog_file)

@staticmethod
def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser:
parser.add_argument(
Expand Down
31 changes: 26 additions & 5 deletions ardupilot_methodic_configurator/backend_mavftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,17 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen
invalid_error_code: int = 0,
invalid_opcode: int = 0,
invalid_payload_size: int = 0,
directory_listing: Union[dict[str, int], None] = None,
) -> None:
self.operation_name = operation_name
self.error_code = error_code
self.system_error = system_error
self.invalid_error_code = invalid_error_code
self.invalid_opcode = invalid_opcode
self.invalid_payload_size = invalid_payload_size
self.directory_listing = directory_listing

def display_message(self) -> None: # pylint: disable=too-many-branches
def display_message(self) -> None: # pylint: disable=too-many-branches, too-many-statements # noqa: C901, PLR0912, PLR0915
if self.error_code == ERR_None:
logging.info("%s succeeded", self.operation_name)
elif self.error_code == ERR_Fail:
Expand Down Expand Up @@ -284,6 +286,16 @@ def display_message(self) -> None: # pylint: disable=too-many-branches
else:
logging.error("%s failed, unknown error %u in display_message()", self.operation_name, self.error_code)

if self.directory_listing is not None:
total_size = 0
for name, size in self.directory_listing.items():
if size == -1: # directories are defined by a size of -1
logging.info(" %s/", name)
else:
logging.info(" %s\t%u", name, size)
total_size += max(0, size)
logging.info("Total size %.2f kByte", total_size / 1024.0)

@property
def return_code(self) -> int:
return self.error_code
Expand Down Expand Up @@ -357,6 +369,7 @@ def __init__(
self.write_pending = 0
self.write_last_send: Union[None, float] = None
self.open_retries = 0
self.directory_listing: dict[str, int] = {}

self.master = master
self.target_system = target_system
Expand Down Expand Up @@ -464,6 +477,7 @@ def cmd_list(self, args) -> MAVFTPReturn:
self.dir_offset = 0
op = FTP_OP(self.seq, self.session, OP_ListDirectory, len(enc_dname), 0, 0, self.dir_offset, enc_dname)
self.__send(op)
self.directory_listing = {}
return self.process_ftp_reply("ListDirectory")

def __handle_list_reply(self, op, _m) -> MAVFTPReturn:
Expand All @@ -477,13 +491,20 @@ def __handle_list_reply(self, op, _m) -> MAVFTPReturn:
self.dir_offset += 1
try:
d = str(d, "ascii") # noqa: PLW2901
except Exception: # noqa: S112 pylint: disable=broad-exception-caught
except (TypeError, UnicodeDecodeError):
continue
if d[0] == "D":
logging.info(" D %s", d[1:])
name = d[1:]
self.directory_listing[name] = -1 # directories are defined by a size of -1
logging.info(" D %s", name)
elif d[0] == "F":
(name, size) = d[1:].split("\t")
size_int = int(size)
try:
size_int = int(size)
except (ValueError, TypeError, OverflowError):
logging.error("Invalid file size: %s", size)
size_int = 0
self.directory_listing[name] = size_int
self.total_size += size_int
logging.info(" %s\t%u", name, size_int)
else:
Expand All @@ -497,7 +518,7 @@ def __handle_list_reply(self, op, _m) -> MAVFTPReturn:
self.total_size = 0
else:
return self.__decode_ftp_ack_and_nack(op)
return MAVFTPReturn("ListDirectory", ERR_None)
return MAVFTPReturn("ListDirectory", ERR_None, directory_listing=self.directory_listing)

def cmd_get(self, args, callback=None, progress_callback=None) -> MAVFTPReturn:
"""Get file."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""

import sys
import threading
import time
import tkinter as tk
from argparse import ArgumentParser, Namespace
Expand Down Expand Up @@ -373,6 +374,28 @@ def __create_parameter_area_widgets(self) -> None:
else _("No flight controller connected, upload not available"),
)

# Create download last flight log button
download_log_button = ttk.Button(
buttons_frame,
text=_("Download last flight log"),
command=self.on_download_last_flight_log_click,
)
download_log_button.configure(
state=(
"normal" if (self.flight_controller.master and self.flight_controller.info.is_mavftp_supported) else "disabled"
)
)
download_log_button.pack(side=tk.LEFT, padx=(8, 8)) # Add padding on both sides of the download log button
show_tooltip(
download_log_button,
_(
"Download the last flight log from the flight controller\n"
"This will save the previous flight log to a file on your computer for analysis"
)
if (self.flight_controller.master and self.flight_controller.info.is_mavftp_supported)
else _("No flight controller connected or MAVFTP not supported"),
)

# Create skip button
self.skip_button = ttk.Button(buttons_frame, text=_("Skip parameter file"), command=self.on_skip_click)
self.skip_button.configure(
Expand Down Expand Up @@ -935,6 +958,61 @@ def _export_fc_params_missing_or_different_in_amc_files(self, fc_parameters: Par
else:
logging_info(_("No FC parameters are missing or different from AMC parameter files"))

def on_download_last_flight_log_click(self) -> None:
"""Handle the download last flight log button click."""
if not self.flight_controller.master:
messagebox.showerror(_("Error"), _("No flight controller connected"))
return

if not self.flight_controller.info.is_mavftp_supported:
messagebox.showerror(_("Error"), _("MAVFTP is not supported by the flight controller"))
return

# Show file dialog to select where to save the log file
filename = filedialog.asksaveasfilename(
title=_("Save flight log as"),
defaultextension=".bin",
filetypes=[
(_("Binary log files"), "*.bin"),
(_("All files"), "*.*"),
],
)

if not filename: # User cancelled the dialog
return

# Create a progress window for the download
progress_window = ProgressWindow(
self.root,
_("Downloading Flight Log"),
_("Downloaded {}% from {}%"),
)

# Start the download in a separate thread to avoid blocking the GUI
def download_thread() -> None:
try:
success = self.flight_controller.download_last_flight_log(filename, progress_window.update_progress_bar)
if success:
self.root.after(
0,
lambda: messagebox.showinfo(_("Success"), _("Flight log downloaded successfully to:\n%s") % filename),
)
else:
self.root.after(
0,
lambda: messagebox.showerror(
_("Error"), _("Failed to download flight log. Check the console for details.")
),
)
except Exception as e: # pylint: disable=broad-exception-caught
error_msg = str(e)
self.root.after(0, lambda: messagebox.showerror(_("Error"), _("Download error: %s") % error_msg))
finally:
self.root.after(0, progress_window.destroy)

download_thread_obj = threading.Thread(target=download_thread, daemon=True)
download_thread_obj.start()

def _configuration_step_is_optional(self, file_name: str, threshold_pct: int = 20) -> bool:
# Check if the configuration step for the given file is optional
mandatory_text, _mandatory_url = self.local_filesystem.get_documentation_text_and_url(file_name, "mandatory")
Expand Down
Loading