|
8 | 8 | SPDX-License-Identifier: GPL-3.0-or-later |
9 | 9 | """ |
10 | 10 |
|
| 11 | +import os |
11 | 12 | from argparse import ArgumentParser |
12 | 13 | from logging import debug as logging_debug |
13 | 14 | from logging import error as logging_error |
@@ -87,7 +88,7 @@ def close(self) -> None: |
87 | 88 | ] |
88 | 89 |
|
89 | 90 |
|
90 | | -class FlightController: |
| 91 | +class FlightController: # pylint: disable=too-many-public-methods |
91 | 92 | """ |
92 | 93 | A class to manage the connection and parameters of a flight controller. |
93 | 94 |
|
@@ -1163,6 +1164,180 @@ def put_progress_callback(completion: float) -> None: |
1163 | 1164 | ret.display_message() |
1164 | 1165 | return ret.error_code == 0 |
1165 | 1166 |
|
| 1167 | + def download_last_flight_log( |
| 1168 | + self, local_filename: str, progress_callback: Union[None, Callable[[int, int], None]] = None |
| 1169 | + ) -> bool: |
| 1170 | + """Download the last flight log from the flight controller.""" |
| 1171 | + if self.master is None: |
| 1172 | + error_msg = _("No flight controller connected") |
| 1173 | + logging_error(error_msg) |
| 1174 | + return False |
| 1175 | + if not self.info.is_mavftp_supported: |
| 1176 | + error_msg = _("MAVFTP is not supported by the flight controller") |
| 1177 | + logging_error(error_msg) |
| 1178 | + return False |
| 1179 | + |
| 1180 | + mavftp = MAVFTP(self.master, target_system=self.master.target_system, target_component=self.master.target_component) |
| 1181 | + |
| 1182 | + def get_progress_callback(completion: float) -> None: |
| 1183 | + if progress_callback is not None and completion is not None: |
| 1184 | + progress_callback(int(completion * 100), 100) |
| 1185 | + |
| 1186 | + try: |
| 1187 | + # Try to get the last log number using different methods |
| 1188 | + remote_filenumber = self._get_last_log_number(mavftp) |
| 1189 | + if remote_filenumber is None: |
| 1190 | + return False |
| 1191 | + |
| 1192 | + # We want the previous log, not the current one (which might be incomplete) |
| 1193 | + # remote_filenumber -= 1 |
| 1194 | + # if remote_filenumber < 1: |
| 1195 | + # logging_error(_("No previous flight log available")) |
| 1196 | + # return False |
| 1197 | + |
| 1198 | + return self._download_log_file(mavftp, remote_filenumber, local_filename, get_progress_callback) |
| 1199 | + |
| 1200 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 1201 | + logging_error(_("Error during flight log download: %s"), str(e)) |
| 1202 | + return False |
| 1203 | + |
| 1204 | + def _get_last_log_number(self, mavftp: MAVFTP) -> Union[int, None]: |
| 1205 | + """Get the last log number using multiple fallback methods.""" |
| 1206 | + # Method 1: Try to get LASTLOG.TXT |
| 1207 | + log_number = self._get_log_number_from_lastlog_txt(mavftp) |
| 1208 | + if log_number is not None: |
| 1209 | + return log_number |
| 1210 | + |
| 1211 | + # Method 2: Try to list the logs directory and find the highest numbered log |
| 1212 | + log_number = self._get_log_number_from_directory_listing(mavftp) |
| 1213 | + if log_number is not None: |
| 1214 | + return log_number |
| 1215 | + |
| 1216 | + # Method 3: Try common log numbers (scan backwards from a reasonable max) |
| 1217 | + log_number = self._get_log_number_by_scanning(mavftp) |
| 1218 | + if log_number is not None: |
| 1219 | + return log_number |
| 1220 | + |
| 1221 | + logging_error(_("Could not determine the last log number using any method")) |
| 1222 | + return None |
| 1223 | + |
| 1224 | + def _get_log_number_from_lastlog_txt(self, mavftp: MAVFTP) -> Union[int, None]: |
| 1225 | + """Try to get the log number from LASTLOG.TXT file.""" |
| 1226 | + logging_info(_("Trying to get log number from LASTLOG.TXT")) |
| 1227 | + try: |
| 1228 | + temp_lastlog_file = "temp_lastlog.txt" |
| 1229 | + mavftp.cmd_get(["/APM/LOGS/LASTLOG.TXT", temp_lastlog_file]) |
| 1230 | + ret = mavftp.process_ftp_reply("OpenFileRO", timeout=10) |
| 1231 | + if ret.error_code != 0: |
| 1232 | + logging_warning(_("LASTLOG.TXT not available, trying alternative methods")) |
| 1233 | + return None |
| 1234 | + |
| 1235 | + return self._extract_log_number_from_file(temp_lastlog_file) |
| 1236 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 1237 | + logging_warning(_("Failed to get log number from LASTLOG.TXT: %s"), str(e)) |
| 1238 | + return None |
| 1239 | + |
| 1240 | + def _get_log_number_from_directory_listing(self, _mavftp: MAVFTP) -> Union[int, None]: |
| 1241 | + """Try to get the highest log number by listing the logs directory using MAVFTP.""" |
| 1242 | + logging_info(_("Trying to get log number from directory listing")) |
| 1243 | + try: |
| 1244 | + result = _mavftp.cmd_list(["/APM/LOGS/"]) |
| 1245 | + if not hasattr(result, "directory_listing") or not isinstance(result.directory_listing, dict): |
| 1246 | + logging_error(_("No directory listing found in MAVFTPReturn")) |
| 1247 | + return None |
| 1248 | + highest = -1 |
| 1249 | + for name in result.directory_listing: |
| 1250 | + # Typical log file names: 00000036.BIN, 00000037.BIN, etc. |
| 1251 | + if name.endswith(".BIN") and name[:8].isdigit(): |
| 1252 | + try: |
| 1253 | + log_num = int(name[:8]) |
| 1254 | + highest = max(highest, log_num) |
| 1255 | + except ValueError: |
| 1256 | + continue |
| 1257 | + if highest != -1: |
| 1258 | + logging_info(_("Highest log number found: %d"), highest) |
| 1259 | + return highest |
| 1260 | + logging_error(_("No log files found in directory listing")) |
| 1261 | + return None |
| 1262 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 1263 | + logging_warning(_("Failed to get log number from directory listing: %s"), str(e)) |
| 1264 | + return None |
| 1265 | + |
| 1266 | + def _get_log_number_by_scanning(self, mavftp: MAVFTP) -> Union[int, None]: |
| 1267 | + """Try to find the last log using binary search for efficiency.""" |
| 1268 | + logging_info(_("Trying to find log number using binary search")) |
| 1269 | + try: |
| 1270 | + # Binary search to find the highest log number |
| 1271 | + low = 1 |
| 1272 | + high = 9999 # Reasonable upper bound for log numbers |
| 1273 | + last_found = None |
| 1274 | + |
| 1275 | + while low <= high: |
| 1276 | + mid = (low + high) // 2 |
| 1277 | + remote_filename = f"/APM/LOGS/{mid:08}.BIN" |
| 1278 | + |
| 1279 | + # Test if this log file exists |
| 1280 | + temp_test_file = f"temp_test_{mid}.tmp" |
| 1281 | + mavftp.cmd_get([remote_filename, temp_test_file]) |
| 1282 | + ret = mavftp.process_ftp_reply("OpenFileRO", timeout=5) # Must be > idle_detection_time (3.7s) |
| 1283 | + |
| 1284 | + # Clean up the temp file if it was created |
| 1285 | + if os.path.exists(temp_test_file): |
| 1286 | + os.remove(temp_test_file) |
| 1287 | + |
| 1288 | + if ret.error_code == 0: |
| 1289 | + # File exists, search in upper half |
| 1290 | + last_found = mid |
| 1291 | + low = mid + 1 |
| 1292 | + logging_debug(_("Log %d exists, searching higher"), mid) |
| 1293 | + else: |
| 1294 | + # File doesn't exist, search in lower half |
| 1295 | + high = mid - 1 |
| 1296 | + logging_debug(_("Log %d doesn't exist, searching lower"), mid) |
| 1297 | + |
| 1298 | + if last_found is not None: |
| 1299 | + logging_info(_("Found highest log number using binary search: %d"), last_found) |
| 1300 | + return last_found |
| 1301 | + |
| 1302 | + logging_warning(_("No log files found using binary search")) |
| 1303 | + return None |
| 1304 | + |
| 1305 | + except Exception as e: # pylint: disable=broad-exception-caught |
| 1306 | + logging_warning(_("Failed to scan for log numbers using binary search: %s"), str(e)) |
| 1307 | + return None |
| 1308 | + |
| 1309 | + def _download_log_file( |
| 1310 | + self, mavftp: MAVFTP, remote_filenumber: int, local_filename: str, get_progress_callback: Callable |
| 1311 | + ) -> bool: |
| 1312 | + """Download the actual log file from the flight controller.""" |
| 1313 | + remote_filename = f"/APM/LOGS/{remote_filenumber:08}.BIN" |
| 1314 | + logging_info(_("Downloading flight log %s to %s"), remote_filename, local_filename) |
| 1315 | + |
| 1316 | + # Download the actual log file |
| 1317 | + mavftp.cmd_get([remote_filename, local_filename], progress_callback=get_progress_callback) |
| 1318 | + ret = mavftp.process_ftp_reply("OpenFileRO", timeout=0) # No timeout for large log files |
| 1319 | + if ret.error_code != 0: |
| 1320 | + logging_error(_("Failed to download flight log %s"), remote_filename) |
| 1321 | + ret.display_message() |
| 1322 | + return False |
| 1323 | + |
| 1324 | + logging_info(_("Successfully downloaded flight log to %s"), local_filename) |
| 1325 | + return True |
| 1326 | + |
| 1327 | + def _extract_log_number_from_file(self, temp_lastlog_file: str) -> Union[int, None]: |
| 1328 | + """Extract log number from LASTLOG.TXT file and clean up the temporary file.""" |
| 1329 | + try: |
| 1330 | + with open(temp_lastlog_file, encoding="UTF-8") as file: |
| 1331 | + file_contents = file.readline() |
| 1332 | + return int(file_contents.strip()) |
| 1333 | + except (FileNotFoundError, ValueError) as e: |
| 1334 | + logging_error(_("Could not extract last log file number from LASTLOG.TXT: %s"), e) |
| 1335 | + return None |
| 1336 | + finally: |
| 1337 | + # Clean up the temporary file |
| 1338 | + if os.path.exists(temp_lastlog_file): |
| 1339 | + os.remove(temp_lastlog_file) |
| 1340 | + |
1166 | 1341 | @staticmethod |
1167 | 1342 | def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser: |
1168 | 1343 | parser.add_argument( |
|
0 commit comments