diff --git a/ardupilot_methodic_configurator/backend_filesystem.py b/ardupilot_methodic_configurator/backend_filesystem.py index e6f18ad65..aeedd3546 100644 --- a/ardupilot_methodic_configurator/backend_filesystem.py +++ b/ardupilot_methodic_configurator/backend_filesystem.py @@ -15,12 +15,6 @@ from logging import exception as logging_exception from logging import info as logging_info from logging import warning as logging_warning -from os import getcwd as os_getcwd -from os import listdir as os_listdir -from os import path as os_path -from os import remove as os_remove -from os import rename as os_rename -from os import rmdir as os_rmdir from pathlib import Path from platform import system as platform_system from re import compile as re_compile @@ -129,7 +123,7 @@ def re_init(self, vehicle_dir: str, vehicle_type: str, blank_component_data: boo # Extend parameter documentation metadata if .pdef.xml exists for filename in self.file_parameters: pdef_xml_file = filename.replace(".param", ".pdef.xml") - if os_path.exists(os_path.join(xml_dir, pdef_xml_file)): + if (Path(xml_dir) / pdef_xml_file).exists(): doc_dict = parse_parameter_metadata("", xml_dir, pdef_xml_file, vehicle_type, TOOLTIP_MAX_LENGTH) self.doc_dict.update(doc_dict) @@ -168,9 +162,9 @@ def rename_parameter_files(self) -> None: new_filename, ) continue - new_filename_path = os_path.join(self.vehicle_dir, new_filename) - old_filename_path = os_path.join(self.vehicle_dir, old_filename) - os_rename(old_filename_path, new_filename_path) + new_filename_path = Path(self.vehicle_dir) / new_filename + old_filename_path = Path(self.vehicle_dir) / old_filename + old_filename_path.rename(new_filename_path) logging_info("Renamed %s to %s", old_filename, new_filename) def _format_columns_sorted_numerically( # pylint: disable=too-many-locals @@ -338,15 +332,17 @@ def read_params_from_files(self) -> dict[str, ParDict]: """ parameters: dict[str, ParDict] = {} - if os_path.isdir(self.vehicle_dir): + vehicle_path = Path(self.vehicle_dir) + if vehicle_path.is_dir(): # Regular expression pattern for filenames starting with two digits followed by an underscore and ending in .param pattern = re_compile(r"^\d{2}_.*\.param$") - for filename in sorted(os_listdir(self.vehicle_dir)): + for file_path in sorted(vehicle_path.iterdir()): + filename = file_path.name if pattern.match(filename): if filename in {"00_default.param", "01_ignore_readonly.param"}: continue - parameters[filename] = ParDict.from_file(os_path.join(self.vehicle_dir, filename)) + parameters[filename] = ParDict.from_file(str(file_path)) else: logging_error(_("Error: %s is not a directory."), self.vehicle_dir) return parameters @@ -387,11 +383,10 @@ def export_to_param(self, params: ParDict, filename_out: str, annotate_doc: bool annotate_doc (bool, optional): Whether to update the parameter documentation. Default is True. """ - params.export_to_param(os_path.join(self.vehicle_dir, filename_out)) + output_path = Path(self.vehicle_dir) / filename_out + params.export_to_param(str(output_path)) if annotate_doc: - update_parameter_documentation( - self.doc_dict, os_path.join(self.vehicle_dir, filename_out), "missionplanner", self.param_default_dict - ) + update_parameter_documentation(self.doc_dict, str(output_path), "missionplanner", self.param_default_dict) def vehicle_configuration_file_exists(self, filename: str) -> bool: """ @@ -404,8 +399,8 @@ def vehicle_configuration_file_exists(self, filename: str) -> bool: bool: True if the file exists and is a file (not a directory) and is not empty, False otherwise. """ - file_path = os_path.join(self.vehicle_dir, filename) - return os_path.exists(file_path) and os_path.isfile(file_path) and os_path.getsize(file_path) > 0 + file_path = Path(self.vehicle_dir) / filename + return file_path.exists() and file_path.is_file() and file_path.stat().st_size > 0 def __all_intermediate_parameter_file_comments(self) -> dict[str, str]: """ @@ -463,11 +458,8 @@ def categorize_parameters(self, param: ParDict) -> tuple[ParDict, ParDict, ParDi @staticmethod def get_directory_name_from_full_path(full_path: str) -> str: - # Normalize the path to ensure it's in a standard format - normalized_path = os_path.normpath(full_path) - - # Split the path into head and tail, then get the basename of the tail - return os_path.basename(os_path.split(normalized_path)[1]) + # Use pathlib for cleaner path operations + return Path(full_path).name # Extract the vehicle name from the directory path def get_vehicle_directory_name(self) -> str: @@ -475,15 +467,16 @@ def get_vehicle_directory_name(self) -> str: def zip_file_path(self) -> str: vehicle_name = self.get_vehicle_directory_name() - return os_path.join(self.vehicle_dir, f"{vehicle_name}.zip") + return str(Path(self.vehicle_dir) / f"{vehicle_name}.zip") def zip_file_exists(self) -> bool: - zip_file_path = self.zip_file_path() - return os_path.exists(zip_file_path) and os_path.isfile(zip_file_path) + zip_file_path = Path(self.zip_file_path()) + return zip_file_path.exists() and zip_file_path.is_file() def add_configuration_file_to_zip(self, zipf: ZipFile, filename: str) -> None: if self.vehicle_configuration_file_exists(filename): - zipf.write(os_path.join(self.vehicle_dir, filename), arcname=filename) + file_path = Path(self.vehicle_dir) / filename + zipf.write(str(file_path), arcname=filename) def zip_files(self, files_to_zip: list[tuple[bool, str]]) -> None: """ @@ -529,31 +522,36 @@ def zip_files(self, files_to_zip: list[tuple[bool, str]]) -> None: logging_info(_("Intermediate parameter files and summary files zipped to %s"), zip_file_path) def vehicle_image_filepath(self) -> str: - return os_path.join(self.vehicle_dir, "vehicle.jpg") + return str(Path(self.vehicle_dir) / "vehicle.jpg") def vehicle_image_exists(self) -> bool: - return os_path.exists(self.vehicle_image_filepath()) and os_path.isfile(self.vehicle_image_filepath()) + image_path = Path(self.vehicle_image_filepath()) + return image_path.exists() and image_path.is_file() @staticmethod def new_vehicle_dir(base_dir: str, new_dir: str) -> str: - return os_path.join(base_dir, new_dir) + return str(Path(base_dir) / new_dir) @staticmethod def directory_exists(directory: str) -> bool: - return os_path.exists(directory) and os_path.isdir(directory) + dir_path = Path(directory) + return dir_path.exists() and dir_path.is_dir() def copy_template_files_to_new_vehicle_dir( self, template_dir: str, new_vehicle_dir: str, blank_change_reason: bool, copy_vehicle_image: bool ) -> str: # Copy the template files to the new vehicle directory try: - if not os_path.exists(template_dir): + template_path = Path(template_dir) + new_vehicle_path = Path(new_vehicle_dir) + + if not template_path.exists(): error_msg = _("Template directory does not exist: {template_dir}") error_msg = error_msg.format(**locals()) logging_error(error_msg) return error_msg - if not os_path.exists(new_vehicle_dir): + if not new_vehicle_path.exists(): error_msg = _("New vehicle directory does not exist: {new_vehicle_dir}") error_msg = error_msg.format(**locals()) logging_error(error_msg) @@ -568,21 +566,21 @@ def copy_template_files_to_new_vehicle_dir( if not copy_vehicle_image: skip_files.add("vehicle.jpg") - for item in os_listdir(template_dir): - if item in skip_files: + for item_path in template_path.iterdir(): + item_name = item_path.name + if item_name in skip_files: continue - source = os_path.join(template_dir, item) - dest = os_path.join(new_vehicle_dir, item) - if blank_change_reason and item.endswith(".param"): + dest = new_vehicle_path / item_name + if blank_change_reason and item_name.endswith(".param"): # Blank the change reason in the template files, strip the comments that start with # - with open(source, encoding="utf-8") as file: + with open(item_path, encoding="utf-8") as file: lines = file.readlines() with open(dest, "w", encoding="utf-8") as file: file.writelines(line.split("#")[0].strip() + "\n" for line in lines) - elif os_path.isdir(source): - shutil_copytree(source, dest) + elif item_path.is_dir(): + shutil_copytree(str(item_path), str(dest)) else: - shutil_copy2(source, dest) + shutil_copy2(str(item_path), str(dest)) except (OSError, shutil_Error) as _e: error_msg = _("Error copying template files to new vehicle directory: {_e}") return error_msg.format(**locals()) @@ -591,7 +589,8 @@ def copy_template_files_to_new_vehicle_dir( def remove_created_files_and_vehicle_dir(self) -> str: # Remove the created files in the new vehicle directory try: - if not os_path.exists(self.vehicle_dir): + vehicle_path = Path(self.vehicle_dir) + if not vehicle_path.exists(): # Use the actual vehicle_dir value to avoid KeyError from missing format keys error_msg = _("New vehicle directory does not exist: {vehicle_dir}") error_msg = error_msg.format(vehicle_dir=self.vehicle_dir) @@ -600,23 +599,31 @@ def remove_created_files_and_vehicle_dir(self) -> str: # delete all files in the vehicle directory and delete the vehicle directory errors: list[str] = [] - for item in os_listdir(self.vehicle_dir): - item_path = os_path.join(self.vehicle_dir, item) + + def safe_remove_item(item_path: Path) -> Optional[str]: + """Safely remove a file or directory, returning error message if failed.""" try: # If the entry is a symlink, remove the link instead of recursing into the target - if os_path.islink(item_path): - os_remove(item_path) - elif os_path.isdir(item_path): - shutil_rmtree(item_path) + if item_path.is_symlink(): + item_path.unlink() + elif item_path.is_dir(): + shutil_rmtree(str(item_path)) else: - os_remove(item_path) + item_path.unlink() + return None except OSError as e: logging_exception(_("Error removing %s"), item_path, e) - errors.append(str(e)) + return str(e) + + # Remove all items and collect errors + for item_path in vehicle_path.iterdir(): + error = safe_remove_item(item_path) + if error: + errors.append(error) # Try to remove the now-empty vehicle directory try: - os_rmdir(self.vehicle_dir) + vehicle_path.rmdir() except OSError as e: logging_exception(_("Error removing directory %s"), self.vehicle_dir, e) errors.append(str(e)) @@ -633,11 +640,11 @@ def remove_created_files_and_vehicle_dir(self) -> str: @staticmethod def getcwd() -> str: - return os_getcwd() + return str(Path.cwd()) def tempcal_imu_result_param_tuple(self) -> tuple[str, str]: tempcal_imu_result_param_filename = "03_imu_temperature_calibration_results.param" - return tempcal_imu_result_param_filename, os_path.join(self.vehicle_dir, tempcal_imu_result_param_filename) + return tempcal_imu_result_param_filename, str(Path(self.vehicle_dir) / tempcal_imu_result_param_filename) def copy_fc_values_to_file(self, selected_file: str, params: dict[str, float]) -> int: ret = 0 @@ -652,14 +659,16 @@ def copy_fc_values_to_file(self, selected_file: str, params: dict[str, float]) - def write_last_uploaded_filename(self, current_file: str) -> None: try: - with open(os_path.join(self.vehicle_dir, "last_uploaded_filename.txt"), "w", encoding="utf-8") as file: + last_uploaded_path = Path(self.vehicle_dir) / "last_uploaded_filename.txt" + with open(last_uploaded_path, "w", encoding="utf-8") as file: file.write(current_file) except Exception as e: # pylint: disable=broad-except logging_error(_("Error writing last uploaded filename: %s"), e) def __read_last_uploaded_filename(self) -> str: try: - with open(os_path.join(self.vehicle_dir, "last_uploaded_filename.txt"), encoding="utf-8") as file: + last_uploaded_path = Path(self.vehicle_dir) / "last_uploaded_filename.txt" + with open(last_uploaded_path, encoding="utf-8") as file: return file.read().strip() except FileNotFoundError as e: logging_debug(_("last_uploaded_filename.txt not found: %s"), e) @@ -726,7 +735,8 @@ def backup_fc_parameters_to_file( overwrite_existing_file or not self.vehicle_configuration_file_exists(filename) ): param_dict_as_par = ParDict({param: Par(float(value), "") for param, value in param_dict.items()}) - param_dict_as_par.export_to_param(os_path.join(self.vehicle_dir, filename)) + param_file_path = Path(self.vehicle_dir) / filename + param_dict_as_par.export_to_param(str(param_file_path)) def get_eval_variables(self) -> dict[str, dict[str, Any]]: variables = {} @@ -824,7 +834,8 @@ def write_param_default_values(self, param_default_values: ParDict) -> bool: def write_param_default_values_to_file(self, param_default_values: ParDict, filename: str = "00_default.param") -> None: if self.write_param_default_values(param_default_values): self.file_parameters[filename] = param_default_values - self.param_default_dict.export_to_param(os_path.join(self.vehicle_dir, filename)) + param_file_path = Path(self.vehicle_dir) / filename + self.param_default_dict.export_to_param(str(param_file_path)) def get_download_url_and_local_filename(self, selected_file: str) -> tuple[str, str]: if ( @@ -835,7 +846,7 @@ def get_download_url_and_local_filename(self, selected_file: str) -> tuple[str, src = self.configuration_steps[selected_file]["download_file"].get("source_url", "") dst = self.configuration_steps[selected_file]["download_file"].get("dest_local", "") if self.vehicle_dir and src and dst: - return src, os_path.join(self.vehicle_dir, dst) + return src, str(Path(self.vehicle_dir) / dst) return "", "" def get_upload_local_and_remote_filenames(self, selected_file: str) -> tuple[str, str]: @@ -847,7 +858,7 @@ def get_upload_local_and_remote_filenames(self, selected_file: str) -> tuple[str src = self.configuration_steps[selected_file]["upload_file"].get("source_local", "") dst = self.configuration_steps[selected_file]["upload_file"].get("dest_on_fc", "") if self.vehicle_dir and src and dst: - return os_path.join(self.vehicle_dir, src), dst + return str(Path(self.vehicle_dir) / src), dst return "", "" @staticmethod @@ -858,7 +869,7 @@ def get_git_commit_hash() -> str: ["git", "rev-parse", "HEAD"], # noqa: S607 capture_output=True, text=True, - cwd=os_path.dirname(__file__), + cwd=Path(__file__).parent, check=True, ) if result.returncode == 0: @@ -867,7 +878,7 @@ def get_git_commit_hash() -> str: # Log the specific error and fallback to reading the git_hash.txt file msg = f"Failed to get git hash via command: {e}" logging_debug(msg) - git_hash_file = os_path.join(os_path.dirname(__file__), "git_hash.txt") + git_hash_file = Path(__file__).parent / "git_hash.txt" try: with open(git_hash_file, encoding="utf-8") as file: return file.read().strip() @@ -904,7 +915,7 @@ def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser: parser.add_argument( # type: ignore[attr-defined] "--vehicle-dir", type=str, - default=os_getcwd(), + default=str(Path.cwd()), help=_( "Directory containing vehicle-specific intermediate parameter files. Default is the current working directory" ),