-
Notifications
You must be signed in to change notification settings - Fork 64
Add button to download the last .bin file from the FC #774
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
| SPDX-License-Identifier: GPL-3.0-or-later | ||
| """ | ||
|
|
||
| import os | ||
| from argparse import ArgumentParser | ||
| from logging import debug as logging_debug | ||
| from logging import error as logging_error | ||
|
|
@@ -87,7 +88,7 @@ def close(self) -> None: | |
| ] | ||
|
|
||
|
|
||
| class FlightController: | ||
| class FlightController: # pylint: disable=too-many-public-methods | ||
| """ | ||
| A class to manage the connection and parameters of a flight controller. | ||
|
|
||
|
|
@@ -1163,6 +1164,180 @@ def put_progress_callback(completion: float) -> None: | |
| ret.display_message() | ||
| return ret.error_code == 0 | ||
|
|
||
| def download_last_flight_log( | ||
| self, local_filename: str, progress_callback: Union[None, Callable[[int, int], None]] = None | ||
| ) -> bool: | ||
| """Download the last flight log from the flight controller.""" | ||
| if self.master is None: | ||
| error_msg = _("No flight controller connected") | ||
| logging_error(error_msg) | ||
| return False | ||
| if not self.info.is_mavftp_supported: | ||
| error_msg = _("MAVFTP is not supported by the flight controller") | ||
| logging_error(error_msg) | ||
| return False | ||
|
|
||
| mavftp = MAVFTP(self.master, target_system=self.master.target_system, target_component=self.master.target_component) | ||
|
|
||
| def get_progress_callback(completion: float) -> None: | ||
| if progress_callback is not None and completion is not None: | ||
| progress_callback(int(completion * 100), 100) | ||
|
|
||
| try: | ||
| # Try to get the last log number using different methods | ||
| remote_filenumber = self._get_last_log_number(mavftp) | ||
| if remote_filenumber is None: | ||
| return False | ||
|
|
||
| # We want the previous log, not the current one (which might be incomplete) | ||
| # remote_filenumber -= 1 | ||
| # if remote_filenumber < 1: | ||
| # logging_error(_("No previous flight log available")) | ||
| # return False | ||
|
|
||
| return self._download_log_file(mavftp, remote_filenumber, local_filename, get_progress_callback) | ||
|
|
||
| except Exception as e: # pylint: disable=broad-exception-caught | ||
| logging_error(_("Error during flight log download: %s"), str(e)) | ||
| return False | ||
|
|
||
| def _get_last_log_number(self, mavftp: MAVFTP) -> Union[int, None]: | ||
| """Get the last log number using multiple fallback methods.""" | ||
| # Method 1: Try to get LASTLOG.TXT | ||
| log_number = self._get_log_number_from_lastlog_txt(mavftp) | ||
| if log_number is not None: | ||
| return log_number | ||
|
|
||
| # Method 2: Try to list the logs directory and find the highest numbered log | ||
| log_number = self._get_log_number_from_directory_listing(mavftp) | ||
| if log_number is not None: | ||
| return log_number | ||
|
|
||
| # Method 3: Try common log numbers (scan backwards from a reasonable max) | ||
| log_number = self._get_log_number_by_scanning(mavftp) | ||
| if log_number is not None: | ||
| return log_number | ||
|
|
||
| logging_error(_("Could not determine the last log number using any method")) | ||
| return None | ||
|
|
||
| def _get_log_number_from_lastlog_txt(self, mavftp: MAVFTP) -> Union[int, None]: | ||
| """Try to get the log number from LASTLOG.TXT file.""" | ||
| logging_info(_("Trying to get log number from LASTLOG.TXT")) | ||
| try: | ||
| temp_lastlog_file = "temp_lastlog.txt" | ||
| mavftp.cmd_get(["/APM/LOGS/LASTLOG.TXT", temp_lastlog_file]) | ||
| ret = mavftp.process_ftp_reply("OpenFileRO", timeout=10) | ||
| if ret.error_code != 0: | ||
| logging_warning(_("LASTLOG.TXT not available, trying alternative methods")) | ||
| return None | ||
|
|
||
| return self._extract_log_number_from_file(temp_lastlog_file) | ||
| except Exception as e: # pylint: disable=broad-exception-caught | ||
| logging_warning(_("Failed to get log number from LASTLOG.TXT: %s"), str(e)) | ||
| return None | ||
|
|
||
| def _get_log_number_from_directory_listing(self, _mavftp: MAVFTP) -> Union[int, None]: | ||
| """Try to get the highest log number by listing the logs directory using MAVFTP.""" | ||
| logging_info(_("Trying to get log number from directory listing")) | ||
| try: | ||
| result = _mavftp.cmd_list(["/APM/LOGS/"]) | ||
| if not hasattr(result, "directory_listing") or not isinstance(result.directory_listing, dict): | ||
| logging_error(_("No directory listing found in MAVFTPReturn")) | ||
| return None | ||
| highest = -1 | ||
| for name in result.directory_listing: | ||
| # Typical log file names: 00000036.BIN, 00000037.BIN, etc. | ||
| if name.endswith(".BIN") and name[:8].isdigit(): | ||
| try: | ||
| log_num = int(name[:8]) | ||
| highest = max(highest, log_num) | ||
| except ValueError: | ||
| continue | ||
| if highest != -1: | ||
| logging_info(_("Highest log number found: %d"), highest) | ||
| return highest | ||
| logging_error(_("No log files found in directory listing")) | ||
| return None | ||
| except Exception as e: # pylint: disable=broad-exception-caught | ||
| logging_warning(_("Failed to get log number from directory listing: %s"), str(e)) | ||
| return None | ||
|
|
||
| def _get_log_number_by_scanning(self, mavftp: MAVFTP) -> Union[int, None]: | ||
| """Try to find the last log using binary search for efficiency.""" | ||
| logging_info(_("Trying to find log number using binary search")) | ||
| try: | ||
| # Binary search to find the highest log number | ||
| low = 1 | ||
| high = 9999 # Reasonable upper bound for log numbers | ||
| last_found = None | ||
|
|
||
| while low <= high: | ||
| mid = (low + high) // 2 | ||
| remote_filename = f"/APM/LOGS/{mid:08}.BIN" | ||
|
|
||
| # Test if this log file exists | ||
| temp_test_file = f"temp_test_{mid}.tmp" | ||
| mavftp.cmd_get([remote_filename, temp_test_file]) | ||
| ret = mavftp.process_ftp_reply("OpenFileRO", timeout=5) # Must be > idle_detection_time (3.7s) | ||
|
|
||
| # Clean up the temp file if it was created | ||
| if os.path.exists(temp_test_file): | ||
| os.remove(temp_test_file) | ||
|
|
||
| if ret.error_code == 0: | ||
| # File exists, search in upper half | ||
| last_found = mid | ||
| low = mid + 1 | ||
| logging_debug(_("Log %d exists, searching higher"), mid) | ||
| else: | ||
| # File doesn't exist, search in lower half | ||
| high = mid - 1 | ||
| logging_debug(_("Log %d doesn't exist, searching lower"), mid) | ||
|
|
||
| if last_found is not None: | ||
| logging_info(_("Found highest log number using binary search: %d"), last_found) | ||
| return last_found | ||
|
|
||
| logging_warning(_("No log files found using binary search")) | ||
| return None | ||
|
|
||
| except Exception as e: # pylint: disable=broad-exception-caught | ||
| logging_warning(_("Failed to scan for log numbers using binary search: %s"), str(e)) | ||
| return None | ||
|
|
||
| def _download_log_file( | ||
| self, mavftp: MAVFTP, remote_filenumber: int, local_filename: str, get_progress_callback: Callable | ||
| ) -> bool: | ||
| """Download the actual log file from the flight controller.""" | ||
| remote_filename = f"/APM/LOGS/{remote_filenumber:08}.BIN" | ||
| logging_info(_("Downloading flight log %s to %s"), remote_filename, local_filename) | ||
|
|
||
| # Download the actual log file | ||
| mavftp.cmd_get([remote_filename, local_filename], progress_callback=get_progress_callback) | ||
| ret = mavftp.process_ftp_reply("OpenFileRO", timeout=0) # No timeout for large log files | ||
|
||
| if ret.error_code != 0: | ||
| logging_error(_("Failed to download flight log %s"), remote_filename) | ||
| ret.display_message() | ||
| return False | ||
|
|
||
| logging_info(_("Successfully downloaded flight log to %s"), local_filename) | ||
| return True | ||
|
|
||
| def _extract_log_number_from_file(self, temp_lastlog_file: str) -> Union[int, None]: | ||
| """Extract log number from LASTLOG.TXT file and clean up the temporary file.""" | ||
| try: | ||
| with open(temp_lastlog_file, encoding="UTF-8") as file: | ||
| file_contents = file.readline() | ||
| return int(file_contents.strip()) | ||
| except (FileNotFoundError, ValueError) as e: | ||
| logging_error(_("Could not extract last log file number from LASTLOG.TXT: %s"), e) | ||
| return None | ||
| finally: | ||
| # Clean up the temporary file | ||
| if os.path.exists(temp_lastlog_file): | ||
| os.remove(temp_lastlog_file) | ||
|
|
||
| @staticmethod | ||
| def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser: | ||
| parser.add_argument( | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The magic number 9999 should be defined as a named constant (e.g., MAX_LOG_NUMBER = 9999) to improve maintainability and make the upper bound configurable.