Skip to content

Commit 9191e1e

Browse files
committed
feat(pathlib): use pathlib in the filesystem backend
1 parent ea6439c commit 9191e1e

1 file changed

Lines changed: 65 additions & 63 deletions

File tree

ardupilot_methodic_configurator/backend_filesystem.py

Lines changed: 65 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,6 @@
1515
from logging import exception as logging_exception
1616
from logging import info as logging_info
1717
from logging import warning as logging_warning
18-
from os import getcwd as os_getcwd
19-
from os import listdir as os_listdir
20-
from os import path as os_path
21-
from os import remove as os_remove
22-
from os import rename as os_rename
23-
from os import rmdir as os_rmdir
2418
from pathlib import Path
2519
from platform import system as platform_system
2620
from re import compile as re_compile
@@ -129,7 +123,7 @@ def re_init(self, vehicle_dir: str, vehicle_type: str, blank_component_data: boo
129123
# Extend parameter documentation metadata if <parameter_file>.pdef.xml exists
130124
for filename in self.file_parameters:
131125
pdef_xml_file = filename.replace(".param", ".pdef.xml")
132-
if os_path.exists(os_path.join(xml_dir, pdef_xml_file)):
126+
if (Path(xml_dir) / pdef_xml_file).exists():
133127
doc_dict = parse_parameter_metadata("", xml_dir, pdef_xml_file, vehicle_type, TOOLTIP_MAX_LENGTH)
134128
self.doc_dict.update(doc_dict)
135129

@@ -168,9 +162,9 @@ def rename_parameter_files(self) -> None:
168162
new_filename,
169163
)
170164
continue
171-
new_filename_path = os_path.join(self.vehicle_dir, new_filename)
172-
old_filename_path = os_path.join(self.vehicle_dir, old_filename)
173-
os_rename(old_filename_path, new_filename_path)
165+
new_filename_path = Path(self.vehicle_dir) / new_filename
166+
old_filename_path = Path(self.vehicle_dir) / old_filename
167+
old_filename_path.rename(new_filename_path)
174168
logging_info("Renamed %s to %s", old_filename, new_filename)
175169

176170
def _format_columns_sorted_numerically( # pylint: disable=too-many-locals
@@ -338,15 +332,17 @@ def read_params_from_files(self) -> dict[str, ParDict]:
338332
339333
"""
340334
parameters: dict[str, ParDict] = {}
341-
if os_path.isdir(self.vehicle_dir):
335+
vehicle_path = Path(self.vehicle_dir)
336+
if vehicle_path.is_dir():
342337
# Regular expression pattern for filenames starting with two digits followed by an underscore and ending in .param
343338
pattern = re_compile(r"^\d{2}_.*\.param$")
344339

345-
for filename in sorted(os_listdir(self.vehicle_dir)):
340+
for file_path in sorted(vehicle_path.iterdir()):
341+
filename = file_path.name
346342
if pattern.match(filename):
347343
if filename in {"00_default.param", "01_ignore_readonly.param"}:
348344
continue
349-
parameters[filename] = ParDict.from_file(os_path.join(self.vehicle_dir, filename))
345+
parameters[filename] = ParDict.from_file(str(file_path))
350346
else:
351347
logging_error(_("Error: %s is not a directory."), self.vehicle_dir)
352348
return parameters
@@ -387,11 +383,10 @@ def export_to_param(self, params: ParDict, filename_out: str, annotate_doc: bool
387383
annotate_doc (bool, optional): Whether to update the parameter documentation. Default is True.
388384
389385
"""
390-
params.export_to_param(os_path.join(self.vehicle_dir, filename_out))
386+
output_path = Path(self.vehicle_dir) / filename_out
387+
params.export_to_param(str(output_path))
391388
if annotate_doc:
392-
update_parameter_documentation(
393-
self.doc_dict, os_path.join(self.vehicle_dir, filename_out), "missionplanner", self.param_default_dict
394-
)
389+
update_parameter_documentation(self.doc_dict, str(output_path), "missionplanner", self.param_default_dict)
395390

396391
def vehicle_configuration_file_exists(self, filename: str) -> bool:
397392
"""
@@ -404,8 +399,8 @@ def vehicle_configuration_file_exists(self, filename: str) -> bool:
404399
bool: True if the file exists and is a file (not a directory) and is not empty, False otherwise.
405400
406401
"""
407-
file_path = os_path.join(self.vehicle_dir, filename)
408-
return os_path.exists(file_path) and os_path.isfile(file_path) and os_path.getsize(file_path) > 0
402+
file_path = Path(self.vehicle_dir) / filename
403+
return file_path.exists() and file_path.is_file() and file_path.stat().st_size > 0
409404

410405
def __all_intermediate_parameter_file_comments(self) -> dict[str, str]:
411406
"""
@@ -463,27 +458,25 @@ def categorize_parameters(self, param: ParDict) -> tuple[ParDict, ParDict, ParDi
463458

464459
@staticmethod
465460
def get_directory_name_from_full_path(full_path: str) -> str:
466-
# Normalize the path to ensure it's in a standard format
467-
normalized_path = os_path.normpath(full_path)
468-
469-
# Split the path into head and tail, then get the basename of the tail
470-
return os_path.basename(os_path.split(normalized_path)[1])
461+
# Use pathlib for cleaner path operations
462+
return Path(full_path).name
471463

472464
# Extract the vehicle name from the directory path
473465
def get_vehicle_directory_name(self) -> str:
474466
return self.get_directory_name_from_full_path(self.vehicle_dir)
475467

476468
def zip_file_path(self) -> str:
477469
vehicle_name = self.get_vehicle_directory_name()
478-
return os_path.join(self.vehicle_dir, f"{vehicle_name}.zip")
470+
return str(Path(self.vehicle_dir) / f"{vehicle_name}.zip")
479471

480472
def zip_file_exists(self) -> bool:
481-
zip_file_path = self.zip_file_path()
482-
return os_path.exists(zip_file_path) and os_path.isfile(zip_file_path)
473+
zip_file_path = Path(self.zip_file_path())
474+
return zip_file_path.exists() and zip_file_path.is_file()
483475

484476
def add_configuration_file_to_zip(self, zipf: ZipFile, filename: str) -> None:
485477
if self.vehicle_configuration_file_exists(filename):
486-
zipf.write(os_path.join(self.vehicle_dir, filename), arcname=filename)
478+
file_path = Path(self.vehicle_dir) / filename
479+
zipf.write(str(file_path), arcname=filename)
487480

488481
def zip_files(self, files_to_zip: list[tuple[bool, str]]) -> None:
489482
"""
@@ -529,31 +522,36 @@ def zip_files(self, files_to_zip: list[tuple[bool, str]]) -> None:
529522
logging_info(_("Intermediate parameter files and summary files zipped to %s"), zip_file_path)
530523

531524
def vehicle_image_filepath(self) -> str:
532-
return os_path.join(self.vehicle_dir, "vehicle.jpg")
525+
return str(Path(self.vehicle_dir) / "vehicle.jpg")
533526

534527
def vehicle_image_exists(self) -> bool:
535-
return os_path.exists(self.vehicle_image_filepath()) and os_path.isfile(self.vehicle_image_filepath())
528+
image_path = Path(self.vehicle_image_filepath())
529+
return image_path.exists() and image_path.is_file()
536530

537531
@staticmethod
538532
def new_vehicle_dir(base_dir: str, new_dir: str) -> str:
539-
return os_path.join(base_dir, new_dir)
533+
return str(Path(base_dir) / new_dir)
540534

541535
@staticmethod
542536
def directory_exists(directory: str) -> bool:
543-
return os_path.exists(directory) and os_path.isdir(directory)
537+
dir_path = Path(directory)
538+
return dir_path.exists() and dir_path.is_dir()
544539

545540
def copy_template_files_to_new_vehicle_dir(
546541
self, template_dir: str, new_vehicle_dir: str, blank_change_reason: bool, copy_vehicle_image: bool
547542
) -> str:
548543
# Copy the template files to the new vehicle directory
549544
try:
550-
if not os_path.exists(template_dir):
545+
template_path = Path(template_dir)
546+
new_vehicle_path = Path(new_vehicle_dir)
547+
548+
if not template_path.exists():
551549
error_msg = _("Template directory does not exist: {template_dir}")
552550
error_msg = error_msg.format(**locals())
553551
logging_error(error_msg)
554552
return error_msg
555553

556-
if not os_path.exists(new_vehicle_dir):
554+
if not new_vehicle_path.exists():
557555
error_msg = _("New vehicle directory does not exist: {new_vehicle_dir}")
558556
error_msg = error_msg.format(**locals())
559557
logging_error(error_msg)
@@ -568,21 +566,21 @@ def copy_template_files_to_new_vehicle_dir(
568566
if not copy_vehicle_image:
569567
skip_files.add("vehicle.jpg")
570568

571-
for item in os_listdir(template_dir):
572-
if item in skip_files:
569+
for item_path in template_path.iterdir():
570+
item_name = item_path.name
571+
if item_name in skip_files:
573572
continue
574-
source = os_path.join(template_dir, item)
575-
dest = os_path.join(new_vehicle_dir, item)
576-
if blank_change_reason and item.endswith(".param"):
573+
dest = new_vehicle_path / item_name
574+
if blank_change_reason and item_name.endswith(".param"):
577575
# Blank the change reason in the template files, strip the comments that start with #
578-
with open(source, encoding="utf-8") as file:
576+
with open(item_path, encoding="utf-8") as file:
579577
lines = file.readlines()
580578
with open(dest, "w", encoding="utf-8") as file:
581579
file.writelines(line.split("#")[0].strip() + "\n" for line in lines)
582-
elif os_path.isdir(source):
583-
shutil_copytree(source, dest)
580+
elif item_path.is_dir():
581+
shutil_copytree(str(item_path), str(dest))
584582
else:
585-
shutil_copy2(source, dest)
583+
shutil_copy2(str(item_path), str(dest))
586584
except (OSError, shutil_Error) as _e:
587585
error_msg = _("Error copying template files to new vehicle directory: {_e}")
588586
return error_msg.format(**locals())
@@ -591,7 +589,8 @@ def copy_template_files_to_new_vehicle_dir(
591589
def remove_created_files_and_vehicle_dir(self) -> str:
592590
# Remove the created files in the new vehicle directory
593591
try:
594-
if not os_path.exists(self.vehicle_dir):
592+
vehicle_path = Path(self.vehicle_dir)
593+
if not vehicle_path.exists():
595594
# Use the actual vehicle_dir value to avoid KeyError from missing format keys
596595
error_msg = _("New vehicle directory does not exist: {vehicle_dir}")
597596
error_msg = error_msg.format(vehicle_dir=self.vehicle_dir)
@@ -600,23 +599,22 @@ def remove_created_files_and_vehicle_dir(self) -> str:
600599

601600
# delete all files in the vehicle directory and delete the vehicle directory
602601
errors: list[str] = []
603-
for item in os_listdir(self.vehicle_dir):
604-
item_path = os_path.join(self.vehicle_dir, item)
602+
for item_path in vehicle_path.iterdir():
605603
try:
606604
# If the entry is a symlink, remove the link instead of recursing into the target
607-
if os_path.islink(item_path):
608-
os_remove(item_path)
609-
elif os_path.isdir(item_path):
610-
shutil_rmtree(item_path)
605+
if item_path.is_symlink():
606+
item_path.unlink()
607+
elif item_path.is_dir():
608+
shutil_rmtree(str(item_path))
611609
else:
612-
os_remove(item_path)
610+
item_path.unlink()
613611
except OSError as e:
614612
logging_exception(_("Error removing %s"), item_path, e)
615613
errors.append(str(e))
616614

617615
# Try to remove the now-empty vehicle directory
618616
try:
619-
os_rmdir(self.vehicle_dir)
617+
vehicle_path.rmdir()
620618
except OSError as e:
621619
logging_exception(_("Error removing directory %s"), self.vehicle_dir, e)
622620
errors.append(str(e))
@@ -633,11 +631,11 @@ def remove_created_files_and_vehicle_dir(self) -> str:
633631

634632
@staticmethod
635633
def getcwd() -> str:
636-
return os_getcwd()
634+
return str(Path.cwd())
637635

638636
def tempcal_imu_result_param_tuple(self) -> tuple[str, str]:
639637
tempcal_imu_result_param_filename = "03_imu_temperature_calibration_results.param"
640-
return tempcal_imu_result_param_filename, os_path.join(self.vehicle_dir, tempcal_imu_result_param_filename)
638+
return tempcal_imu_result_param_filename, str(Path(self.vehicle_dir) / tempcal_imu_result_param_filename)
641639

642640
def copy_fc_values_to_file(self, selected_file: str, params: dict[str, float]) -> int:
643641
ret = 0
@@ -652,14 +650,16 @@ def copy_fc_values_to_file(self, selected_file: str, params: dict[str, float]) -
652650

653651
def write_last_uploaded_filename(self, current_file: str) -> None:
654652
try:
655-
with open(os_path.join(self.vehicle_dir, "last_uploaded_filename.txt"), "w", encoding="utf-8") as file:
653+
last_uploaded_path = Path(self.vehicle_dir) / "last_uploaded_filename.txt"
654+
with open(last_uploaded_path, "w", encoding="utf-8") as file:
656655
file.write(current_file)
657656
except Exception as e: # pylint: disable=broad-except
658657
logging_error(_("Error writing last uploaded filename: %s"), e)
659658

660659
def __read_last_uploaded_filename(self) -> str:
661660
try:
662-
with open(os_path.join(self.vehicle_dir, "last_uploaded_filename.txt"), encoding="utf-8") as file:
661+
last_uploaded_path = Path(self.vehicle_dir) / "last_uploaded_filename.txt"
662+
with open(last_uploaded_path, encoding="utf-8") as file:
663663
return file.read().strip()
664664
except FileNotFoundError as e:
665665
logging_debug(_("last_uploaded_filename.txt not found: %s"), e)
@@ -726,7 +726,8 @@ def backup_fc_parameters_to_file(
726726
overwrite_existing_file or not self.vehicle_configuration_file_exists(filename)
727727
):
728728
param_dict_as_par = ParDict({param: Par(float(value), "") for param, value in param_dict.items()})
729-
param_dict_as_par.export_to_param(os_path.join(self.vehicle_dir, filename))
729+
param_file_path = Path(self.vehicle_dir) / filename
730+
param_dict_as_par.export_to_param(str(param_file_path))
730731

731732
def get_eval_variables(self) -> dict[str, dict[str, Any]]:
732733
variables = {}
@@ -824,7 +825,8 @@ def write_param_default_values(self, param_default_values: ParDict) -> bool:
824825
def write_param_default_values_to_file(self, param_default_values: ParDict, filename: str = "00_default.param") -> None:
825826
if self.write_param_default_values(param_default_values):
826827
self.file_parameters[filename] = param_default_values
827-
self.param_default_dict.export_to_param(os_path.join(self.vehicle_dir, filename))
828+
param_file_path = Path(self.vehicle_dir) / filename
829+
self.param_default_dict.export_to_param(str(param_file_path))
828830

829831
def get_download_url_and_local_filename(self, selected_file: str) -> tuple[str, str]:
830832
if (
@@ -835,7 +837,7 @@ def get_download_url_and_local_filename(self, selected_file: str) -> tuple[str,
835837
src = self.configuration_steps[selected_file]["download_file"].get("source_url", "")
836838
dst = self.configuration_steps[selected_file]["download_file"].get("dest_local", "")
837839
if self.vehicle_dir and src and dst:
838-
return src, os_path.join(self.vehicle_dir, dst)
840+
return src, str(Path(self.vehicle_dir) / dst)
839841
return "", ""
840842

841843
def get_upload_local_and_remote_filenames(self, selected_file: str) -> tuple[str, str]:
@@ -847,7 +849,7 @@ def get_upload_local_and_remote_filenames(self, selected_file: str) -> tuple[str
847849
src = self.configuration_steps[selected_file]["upload_file"].get("source_local", "")
848850
dst = self.configuration_steps[selected_file]["upload_file"].get("dest_on_fc", "")
849851
if self.vehicle_dir and src and dst:
850-
return os_path.join(self.vehicle_dir, src), dst
852+
return str(Path(self.vehicle_dir) / src), dst
851853
return "", ""
852854

853855
@staticmethod
@@ -858,7 +860,7 @@ def get_git_commit_hash() -> str:
858860
["git", "rev-parse", "HEAD"], # noqa: S607
859861
capture_output=True,
860862
text=True,
861-
cwd=os_path.dirname(__file__),
863+
cwd=Path(__file__).parent,
862864
check=True,
863865
)
864866
if result.returncode == 0:
@@ -867,7 +869,7 @@ def get_git_commit_hash() -> str:
867869
# Log the specific error and fallback to reading the git_hash.txt file
868870
msg = f"Failed to get git hash via command: {e}"
869871
logging_debug(msg)
870-
git_hash_file = os_path.join(os_path.dirname(__file__), "git_hash.txt")
872+
git_hash_file = Path(__file__).parent / "git_hash.txt"
871873
try:
872874
with open(git_hash_file, encoding="utf-8") as file:
873875
return file.read().strip()
@@ -904,7 +906,7 @@ def add_argparse_arguments(parser: ArgumentParser) -> ArgumentParser:
904906
parser.add_argument( # type: ignore[attr-defined]
905907
"--vehicle-dir",
906908
type=str,
907-
default=os_getcwd(),
909+
default=str(Path.cwd()),
908910
help=_(
909911
"Directory containing vehicle-specific intermediate parameter files. Default is the current working directory"
910912
),

0 commit comments

Comments
 (0)