-
Notifications
You must be signed in to change notification settings - Fork 61
Expand file tree
/
Copy pathdata_model_parameter_editor.py
More file actions
2223 lines (1849 loc) · 98.7 KB
/
data_model_parameter_editor.py
File metadata and controls
2223 lines (1849 loc) · 98.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
The business logic for the configuration (parameter editing and uploading).
Contains state information but no GUI code.
Aggregates flight controller and filesystem access in a single interface.
Uses exceptions for error handling, the GUI layer will catch and display them.
This file is part of ArduPilot Methodic Configurator. https://github.com/ArduPilot/MethodicConfigurator
SPDX-FileCopyrightText: 2024-2026 Amilcar do Carmo Lucas <amilcar.lucas@iav.de>
SPDX-License-Identifier: GPL-3.0-or-later
"""
import platform
import subprocess
from csv import writer as csv_writer
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from logging import error as logging_error
from logging import exception as logging_exception
from logging import info as logging_info
from logging import warning as logging_warning
from pathlib import Path
from time import time
from typing import Callable, Literal, Optional
from ardupilot_methodic_configurator import _
from ardupilot_methodic_configurator.backend_filesystem import LocalFilesystem
from ardupilot_methodic_configurator.backend_filesystem_configuration_steps import PhaseData
from ardupilot_methodic_configurator.backend_flightcontroller import FlightController
from ardupilot_methodic_configurator.backend_internet import download_file_from_url, webbrowser_open_url
from ardupilot_methodic_configurator.data_model_ardupilot_parameter import (
ArduPilotParameter,
ParameterOutOfRangeError,
ParameterUnchangedError,
)
from ardupilot_methodic_configurator.data_model_battery_monitor import BatteryMonitorDataModel
from ardupilot_methodic_configurator.data_model_configuration_step import ConfigurationStepProcessor
from ardupilot_methodic_configurator.data_model_motor_test import MotorTestDataModel
from ardupilot_methodic_configurator.data_model_par_dict import Par, ParamFileError, ParDict, is_within_tolerance
from ardupilot_methodic_configurator.plugin_constants import PLUGIN_BATTERY_MONITOR, PLUGIN_MOTOR_TEST
from ardupilot_methodic_configurator.tempcal_imu import IMUfit
# Type aliases for callback functions used in workflow methods
AskConfirmationCallback = Callable[[str, str], bool] # (title, message) -> bool
SelectFileCallback = Callable[[str, list[str]], Optional[str]] # (title, filetypes) -> Optional[filename]
ShowWarningCallback = Callable[[str, str], None] # (title, message) -> None
ShowErrorCallback = Callable[[str, str], None] # (title, message) -> None
ShowInfoCallback = Callable[[str, str], None] # (title, message) -> None
AskRetryCancelCallback = Callable[[str, str], bool] # (title, message) -> bool
ExperimentChoice = Literal["close", True, False]
ExperimentChoiceCallback = Callable[[str, str, list[str]], ExperimentChoice]
class OperationNotPossibleError(Exception):
"""Raised when an operation cannot be performed due to missing prerequisites or state."""
class InvalidParameterNameError(Exception):
"""Raised when a parameter name is invalid or already exists."""
class ParameterValueUpdateStatus(Enum):
"""Possible outcomes when updating a parameter value."""
UPDATED = "updated"
UNCHANGED = "unchanged"
ERROR = "error"
CONFIRM_OUT_OF_RANGE = "confirm_out_of_range"
@dataclass
class ParameterValueUpdateResult:
"""Presenter-friendly response describing the outcome of a parameter update attempt."""
status: ParameterValueUpdateStatus
title: Optional[str] = None
message: Optional[str] = None
# pylint: disable=too-many-lines
class ParameterEditor: # pylint: disable=too-many-public-methods, too-many-instance-attributes
"""
Manages configuration state, including flight controller and filesystem access.
This class aggregates the flight controller and filesystem access to provide a unified interface
for managing configuration state. It holds protected references to the flight controller and filesystem,
and provides methods to interact with them.
"""
# Maximum number of parameters for bulk add warning threshold.
# When more than these parameters are selected for bulk addition, the user will be warned.
# This threshold prevents users from being overwhelmed by accidentally adding
# too many parameters at once, leading to difficulty tracking changes and slowing the GUI.
# 15 is chosen as a balance between convenience
# and safety, allowing batch operations while maintaining user control.
MAX_BULK_ADD_SUGGESTIONS = 15
def __init__(
self,
current_file: str,
flight_controller: FlightController,
filesystem: LocalFilesystem,
export_fc_params_missing_or_different: bool = False,
) -> None:
self.current_file = current_file
self._flight_controller = flight_controller
self._local_filesystem = filesystem
self._config_step_processor = ConfigurationStepProcessor(self._local_filesystem)
self._should_export_fc_params_diff = export_fc_params_missing_or_different
# self.current_step_parameters is rebuilt on every repopulate(...) call and only contains the ArduPilotParameter
# objects needed for the current table view.
self.current_step_parameters: dict[str, ArduPilotParameter] = {}
# Track parameters added by user (not in original file) or renamed by the system in the current configuration step
self._added_parameters: set[str] = set()
# Track parameters deleted by user (were in original file) or renamed by the system in the current configuration step
self._deleted_parameters: set[str] = set()
self._at_least_one_changed = False
self._last_time_asked_to_save: float = 0
# frontend_tkinter_parameter_editor.py API start
@property
def connected_vehicle_type(self) -> str:
return (
getattr(self._flight_controller.info, "vehicle_type", "")
if hasattr(self._flight_controller, "info") and self._flight_controller.info is not None
else ""
)
@property
def is_fc_connected(self) -> bool:
return self._flight_controller.master is not None and bool(self._flight_controller.fc_parameters)
@property
def fc_parameters(self) -> dict[str, float]:
return (
self._flight_controller.fc_parameters
if hasattr(self._flight_controller, "fc_parameters") and self._flight_controller.fc_parameters is not None
else {}
)
@property
def is_mavftp_supported(self) -> bool:
return (
getattr(self._flight_controller.info, "is_mavftp_supported", False)
if hasattr(self._flight_controller, "info") and self._flight_controller.info is not None
else False
)
def handle_imu_temperature_calibration_workflow( # pylint: disable=too-many-arguments, too-many-positional-arguments
self,
selected_file: str,
ask_user_confirmation: AskConfirmationCallback,
select_file: SelectFileCallback,
show_warning: ShowWarningCallback,
show_error: ShowErrorCallback,
get_progress_callback: Optional[Callable[[], Optional[Callable]]] = None,
) -> bool:
"""
Complete IMU temperature calibration workflow with user interaction via callbacks.
This method orchestrates the entire IMU calibration workflow including user confirmation,
file selection, warnings, error handling, and the actual calibration through injected
callback functions. This allows the business logic to be separated from GUI implementation details.
Args:
selected_file: The current parameter file being processed.
ask_user_confirmation: Callback function for asking yes/no questions.
select_file: Callback function for file selection dialog.
show_warning: Callback function for showing warning messages.
show_error: Callback function for showing error messages.
get_progress_callback: Optional factory function that creates and returns a progress callback.
Returns:
bool: True if calibration was performed successfully, False otherwise.
"""
# Check if IMU temperature calibration should be offered for this file
tempcal_imu_result_param_filename, tempcal_imu_result_param_fullpath = (
self._local_filesystem.tempcal_imu_result_param_tuple()
)
if selected_file != tempcal_imu_result_param_filename:
return False
# Ask user for confirmation using injected callback
confirmation_msg = _(
"If you proceed the {tempcal_imu_result_param_filename}\n"
"will be overwritten with the new calibration results.\n"
"Do you want to provide a .bin log file and\n"
"run the IMU temperature calibration using it?"
).format(tempcal_imu_result_param_filename=tempcal_imu_result_param_filename)
if not ask_user_confirmation(_("IMU temperature calibration"), confirmation_msg):
return False
# Select log file using injected callback
log_file = select_file(_("Select ArduPilot binary log file"), ["*.bin", "*.BIN"])
if not log_file:
return False # User cancelled file selection
# Show warning using injected callback
show_warning(
_("IMU temperature calibration"),
_("Please wait, this can take a really long time and\nthe GUI will be unresponsive until it finishes."),
)
# Get progress callback from factory if provided
progress_callback = get_progress_callback() if get_progress_callback else None
# Perform the actual IMU temperature calibration
IMUfit(
logfile=log_file,
outfile=tempcal_imu_result_param_fullpath,
no_graph=False,
log_parm=False,
online=False,
tclr=False,
figpath=self._local_filesystem.vehicle_dir,
progress_callback=progress_callback,
)
try:
# Reload parameter files after calibration
self._local_filesystem.file_parameters = self._local_filesystem.read_params_from_files()
return True
except ParamFileError as exp:
show_error(_("Fatal error reading parameter files"), f"{exp}")
raise
def _should_copy_fc_values_to_file(self, selected_file: str) -> tuple[bool, Optional[dict], Optional[str]]:
"""
Check if flight controller values should be copied to the specified file.
Args:
selected_file: The file to check for copying requirements.
Returns:
tuple: (should_copy, relevant_fc_params, auto_changed_by) - should_copy indicates if copy is needed,
relevant_fc_params contains the parameters to copy if needed,
auto_changed_by contains the tool name that requires external changes.
"""
auto_changed_by = self._local_filesystem.auto_changed_by(selected_file)
if auto_changed_by and self._flight_controller.fc_parameters:
# Filter relevant FC parameters for this file
relevant_fc_params = {
key: value
for key, value in self._flight_controller.fc_parameters.items()
if key in self.current_step_parameters
}
return True, relevant_fc_params, auto_changed_by
return False, None, auto_changed_by
def _update_parameters_from_fc_values(self, relevant_fc_params: dict[str, float]) -> bool:
"""
Update in-memory parameter values from flight controller values.
This method updates the ArduPilotParameter objects in current_step_parameters
with values from the flight controller. The updated values are held in memory
and will be saved to the parameter file later when the user either:
- Uploads parameters to the FC (via upload button)
- Skips to the next parameter file (via skip button)
At that point, the user will be prompted to save changes to file if any
parameters have been modified.
Args:
relevant_fc_params: Dictionary of parameter names and FC values to copy.
Returns:
bool: True if at least one parameter was successfully updated in memory.
Note:
This method bypasses range checking since values came from the FC and were
already accepted there. The GUI table view immediately reflects the copied
FC values, and subsequent dirty-state tracking operates on the updated state.
"""
params_copied = 0
for param_name, value in relevant_fc_params.items():
param = self.current_step_parameters.get(param_name)
if param is None:
logging_error(_("Parameter %s not in current step parameters"), param_name)
continue
try:
param.set_new_value(str(value), ignore_out_of_range=True)
params_copied += 1
except ParameterUnchangedError:
continue # Expected, not an error
except ParameterOutOfRangeError:
# Log warning but accept FC value anyway since it came from FC
logging_warning(_("Parameter %s value %s is out of range but accepted from FC"), param_name, value)
params_copied += 1
except (ValueError, TypeError):
logging_exception(_("Failed to update in-memory value for %s after FC copy"), param_name)
continue
return bool(params_copied)
def handle_copy_fc_values_workflow(
self,
selected_file: str,
ask_user_choice: ExperimentChoiceCallback,
show_info: ShowInfoCallback,
) -> ExperimentChoice:
"""
Handle the complete workflow for copying FC values to file with user interaction.
Args:
selected_file: The configuration file to potentially update.
ask_user_choice: Callback to ask user for choice (Yes/No/Close).
show_info: Callback to show information messages.
Returns:
ExperimentChoice: "close" if user chose to close, True if copied, False if no copy.
"""
should_copy, relevant_fc_params, auto_changed_by = self._should_copy_fc_values_to_file(selected_file)
if should_copy and relevant_fc_params and auto_changed_by:
msg = _(
"This configuration step requires external changes by: {auto_changed_by}\n\n"
"The external tool experiment procedure is described in the tuning guide.\n\n"
"Choose an option:\n"
"* CLOSE - Close the application and go perform the experiment\n"
"* YES - Copy current FC values to {selected_file} (if you've already completed the experiment)\n"
"* NO - Continue without copying values (if you haven't performed the experiment yet,"
" but know what you are doing)"
).format(auto_changed_by=auto_changed_by, selected_file=selected_file)
user_choice = ask_user_choice(_("Update file with values from FC?"), msg, [_("Close"), _("Yes"), _("No")])
if user_choice is True: # Yes option
params_copied = self._update_parameters_from_fc_values(relevant_fc_params)
if params_copied:
show_info(
_("Parameters copied"),
_("FC values have been copied to {selected_file}").format(selected_file=selected_file),
)
return user_choice
return False
def _handle_file_jump_workflow(
self,
selected_file: str,
gui_complexity: str,
ask_user_confirmation: AskConfirmationCallback,
) -> str:
"""
Handle the complete workflow for file jumping with user interaction.
Args:
selected_file: The current configuration file.
gui_complexity: The GUI complexity setting ("simple" or other).
ask_user_confirmation: Callback to ask user for confirmation.
Returns:
str: The destination file to jump to, or the original file if no jump.
"""
jump_options = self._get_file_jump_options(selected_file)
for dest_file, msg in jump_options.items():
if gui_complexity == "simple" or ask_user_confirmation(
_("Skip some steps?"), _(msg) if msg else _("Skip to {dest_file}?").format(dest_file=dest_file)
):
return dest_file
return selected_file
def _get_file_jump_options(self, selected_file: str) -> dict[str, str]:
"""
Get available file jump options for the selected file.
Args:
selected_file: The current configuration file.
Returns:
dict: Dictionary mapping destination files to their messages.
"""
return self._local_filesystem.jump_possible(selected_file)
def handle_write_changes_workflow(
self,
annotate_params_into_files: bool,
ask_user_confirmation: AskConfirmationCallback,
) -> bool:
"""
Handle the workflow for writing changes to intermediate parameter file.
Args:
at_least_one_param_edited: Whether any parameters have been edited.
annotate_params_into_files: Whether to annotate documentation into files.
ask_user_confirmation: Callback to ask user for confirmation.
Returns:
bool: True if changes were written, False otherwise.
"""
elapsed_since_last_ask = time() - self._last_time_asked_to_save
# Avoid asking the user multiple times in quick succession (e.g., during file transitions)
# Always check elapsed time to prevent duplicate prompts within 1 second
# If annotate parameters into files is true, we always need to write to file, because
# the parameter metadata might have changed, or not be present in the file.
if (self._has_unsaved_changes() or annotate_params_into_files) and elapsed_since_last_ask > 1.0:
msg = _("Do you want to write the changes to the {current_filename} file?").format(
current_filename=self.current_file
)
should_save = ask_user_confirmation(_("One or more parameters have been edited"), msg)
if should_save:
self._export_current_file(annotate_doc=annotate_params_into_files)
# Update timestamp regardless of user's answer to prevent duplicate prompts
self._last_time_asked_to_save = time()
return should_save
return False
def handle_param_file_change_workflow( # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-locals # noqa: PLR0913
self,
selected_file: str,
forced: bool,
gui_complexity: str,
auto_open_documentation: bool,
handle_imu_temp_cal: Callable[[str], None],
handle_copy_fc_values: Callable[[str], ExperimentChoice],
handle_upload_file: Callable[[str], None],
ask_confirmation: AskConfirmationCallback,
show_error: ShowErrorCallback,
show_info: ShowInfoCallback,
) -> tuple[str, bool]:
"""
Handle the complete workflow when parameter file selection changes.
This method orchestrates all the steps that need to happen when switching
to a different parameter file, including:
- IMU temperature calibration check
- File jumping
- Documentation opening
- FC values copy check
- File download/upload
Args:
selected_file: The newly selected parameter file.
forced: Whether to force the workflow even if file hasn't changed.
gui_complexity: The GUI complexity setting ("simple" or other).
auto_open_documentation: Whether to automatically open documentation.
handle_imu_temp_cal: Callback to handle IMU temperature calibration.
handle_copy_fc_values: Callback to handle copying FC values to file.
handle_upload_file: Callback to handle file upload.
ask_confirmation: Callback to ask user for confirmation.
show_error: Callback to show error messages.
show_info: Callback to show information messages.
Returns:
tuple: (final_selected_file, should_continue) - The final file after any jumps,
and whether to continue with the workflow (False means user wants to close).
"""
# If file hasn't changed and not forced, skip the workflow
if self.current_file == selected_file and not forced:
return selected_file, True
# Handle IMU temperature calibration workflow
handle_imu_temp_cal(selected_file)
# Handle file jumping
self.current_file = self._handle_file_jump_workflow(selected_file, gui_complexity, ask_confirmation)
# Open documentation if configured
if auto_open_documentation or gui_complexity == "simple":
self.open_documentation_in_browser(self.current_file)
# Process configuration step and create domain model parameters
(ui_errors, ui_infos) = self._repopulate_configuration_step_parameters()
for title, msg in ui_errors:
show_error(title, msg)
for title, msg in ui_infos:
show_info(title, msg)
# Handle copying FC values to file, can only be done after repopulate
result = handle_copy_fc_values(self.current_file)
if result == "close":
# User wants to close application
if self.is_fc_connected:
self._flight_controller.disconnect()
return self.current_file, False
# Handle file download from URL
if self._should_download_file_from_url_workflow(self.current_file, ask_confirmation, show_error):
# Handle file upload to FC
handle_upload_file(self.current_file)
return self.current_file, True
def _should_download_file_from_url_workflow(
self,
selected_file: str,
ask_confirmation: AskConfirmationCallback,
show_error: ShowErrorCallback,
) -> bool:
"""
Handle file download workflow with injected GUI callbacks.
This method implements the business logic for downloading files while
allowing the GUI to handle user interactions through callbacks.
Args:
selected_file: The configuration file being processed.
ask_confirmation: Callback to ask user if they want to download the file.
show_error: Callback to show error messages to the user.
Returns:
bool: True if download was successful or not needed, False if download failed.
"""
url, local_filename = self._local_filesystem.get_download_url_and_local_filename(selected_file)
if not url or not local_filename:
return True # No download required
if self._local_filesystem.vehicle_configuration_file_exists(local_filename):
return True # File already exists in the vehicle directory, no need to download it
# Ask user for confirmation
msg = _("Should the {local_filename} file be downloaded from the URL\n{url}?")
if not ask_confirmation(_("Download file from URL"), msg.format(local_filename=local_filename, url=url)):
return False # User declined download
# Attempt download
if not download_file_from_url(url, local_filename):
error_msg = _("Failed to download {local_filename} from {url}, please download it manually")
show_error(_("Download failed"), error_msg.format(local_filename=local_filename, url=url))
return False
return True
def should_upload_file_to_fc_workflow( # pylint: disable=too-many-arguments, too-many-positional-arguments
self,
selected_file: str,
ask_confirmation: Callable[[str, str], bool],
show_error: Callable[[str, str], None],
show_warning: Callable[[str, str], None],
get_progress_callback: Callable[[], Optional[Callable]],
) -> bool:
"""
Handle file upload workflow with injected GUI callbacks.
This method implements the business logic for uploading files to flight controller
while allowing the GUI to handle user interactions through callbacks.
Args:
selected_file: The configuration file being processed.
ask_confirmation: Callback to ask user if they want to upload the file.
show_error: Callback to show error messages to the user.
show_warning: Callback to show warning messages to the user.
get_progress_callback: Factory callback that creates and returns a progress callback
only when actually needed (after all checks pass).
Returns:
bool: True if upload was successful or not needed, False if upload failed.
"""
local_filename, remote_filename = self._local_filesystem.get_upload_local_and_remote_filenames(selected_file)
if not local_filename or not remote_filename:
return True # No upload required
if not self._local_filesystem.vehicle_configuration_file_exists(local_filename):
error_msg = _("Local file {local_filename} does not exist")
show_error(_("Will not upload any file"), error_msg.format(local_filename=local_filename))
return False
if self._flight_controller.master is None:
show_warning(_("Will not upload any file"), _("No flight controller connection"))
return False
# Ask user for confirmation
msg = _("Should the {local_filename} file be uploaded to the flight controller as {remote_filename}?")
if not ask_confirmation(
_("Upload file to FC"), msg.format(local_filename=local_filename, remote_filename=remote_filename)
):
return True # User declined upload
# Get progress callback only after all checks passed
progress_callback = get_progress_callback()
# Attempt upload
if not self._flight_controller.upload_file(local_filename, remote_filename, progress_callback):
error_msg = _("Failed to upload {local_filename} to {remote_filename}, please upload it manually")
show_error(_("Upload failed"), error_msg.format(local_filename=local_filename, remote_filename=remote_filename))
return False
return True
def ensure_upload_preconditions(
self,
selected_params: dict[str, object],
show_warning: ShowWarningCallback,
) -> bool:
"""Validate prerequisites before attempting to upload selected parameters."""
if not selected_params:
logging_warning(_("No parameter was selected for upload, will not upload any parameter"))
show_warning(_("Will not upload any parameter"), _("No parameter was selected for upload"))
return False
if not self.fc_parameters:
logging_warning(_("No parameters were yet downloaded from the flight controller, will not upload any parameter"))
show_warning(_("Will not upload any parameter"), _("No flight controller connection"))
return False
return True
def download_flight_controller_parameters(
self, get_progress_callback: Optional[Callable[[], Optional[Callable]]] = None
) -> tuple[dict, dict]:
"""
Download parameters from the flight controller.
Args:
get_progress_callback: Optional factory function that creates and returns a progress callback.
Returns:
tuple: (fc_parameters, param_default_values) downloaded from the flight controller.
"""
# Get progress callback from factory if provided
progress_callback = get_progress_callback() if get_progress_callback else None
# Download all parameters from the flight controller
fc_parameters, param_default_values = self._flight_controller.download_params(
progress_callback,
Path(self._local_filesystem.vehicle_dir) / "complete.param",
Path(self._local_filesystem.vehicle_dir) / "00_default.param",
)
# Note: fc_parameters are already updated internally in the flight controller
# via params_manager.download_params()
if fc_parameters:
# Update FC values in all current step ArduPilotParameter objects
# Thread-safety: This assumes single-threaded execution during parameter upload.
# The parameter editor UI is not designed for concurrent uploads, and the upload
# workflow blocks the UI thread. If multi-threading is added in the future,
# this loop would need synchronization (e.g., threading.Lock) to prevent
# race conditions when modifying current_step_parameters during iteration.
for param_name, param_obj in self.current_step_parameters.items():
if param_name in fc_parameters:
param_obj.set_fc_value(fc_parameters[param_name])
# Write default values to file if available
if param_default_values:
self._local_filesystem.write_param_default_values_to_file(param_default_values)
return fc_parameters, param_default_values
def upload_parameters_that_require_reset_workflow( # pylint: disable=too-many-locals, too-many-arguments, too-many-positional-arguments
self,
selected_params: dict,
ask_confirmation: AskConfirmationCallback,
show_error: ShowErrorCallback,
reset_progress_callback: Optional[Callable] = None,
connection_progress_callback: Optional[Callable] = None,
) -> tuple[bool, set[str]]:
"""
Upload parameters that require reset to the flight controller.
Args:
selected_params: Dictionary of parameters to upload.
ask_confirmation: Callback to ask user for confirmation.
show_error: Callback to show error messages.
reset_progress_callback: Optional callback for reset progress updates.
connection_progress_callback: Optional callback for connection progress updates.
Returns:
tuple[bool, set[str]]: (reset_happened, uploaded_param_names) - reset_happened indicates if reset occurred,
uploaded_param_names contains names of parameters that were uploaded.
"""
reset_required = False
reset_unsure_params = []
uploaded_params = set()
error_messages = []
# Write each selected parameter to the flight controller
for param_name, param in selected_params.items():
try:
if param_name not in self._flight_controller.fc_parameters or not is_within_tolerance(
self._flight_controller.fc_parameters[param_name], param.value
):
param_metadata = self._local_filesystem.doc_dict.get(param_name, None)
if param_metadata and param_metadata.get("RebootRequired", False):
success, error_msg = self._flight_controller.set_param(param_name, float(param.value))
if not success:
logging_error(_("Failed to set parameter %s: %s"), param_name, error_msg)
continue
uploaded_params.add(param_name)
if param_name in self._flight_controller.fc_parameters:
logging_info(
_("Parameter %s changed from %f to %f, reset required"),
param_name,
self._flight_controller.fc_parameters[param_name],
param.value,
)
else:
logging_info(_("Parameter %s changed to %f, reset required"), param_name, param.value)
reset_required = True
# Check if any of the selected parameters have a _TYPE, _EN, or _ENABLE suffix
# SID_AXIS only requires a possible reset when changing from 0 to a non-zero value
elif param_name.endswith(("_TYPE", "_EN", "_ENABLE")) or (
param_name == "SID_AXIS"
and float(self._flight_controller.fc_parameters.get("SID_AXIS", 0.0)) == 0.0
and float(param.value) != 0.0
):
success, error_msg = self._flight_controller.set_param(param_name, float(param.value))
if not success:
logging_error(_("Failed to set parameter %s: %s"), param_name, error_msg)
continue
uploaded_params.add(param_name)
if param_name in self._flight_controller.fc_parameters:
logging_info(
_("Parameter %s changed from %f to %f, possible reset required"),
param_name,
self._flight_controller.fc_parameters[param_name],
param.value,
)
else:
logging_info(_("Parameter %s changed to %f, possible reset required"), param_name, param.value)
reset_unsure_params.append(param_name)
except ValueError as e:
error_msg = _("Failed to set parameter {param_name}: {e}").format(param_name=param_name, e=e)
logging_error(error_msg)
error_messages.append(error_msg)
# Handle any errors with GUI dialogs
for error_msg in error_messages:
show_error(_("ArduPilot methodic configurator"), error_msg)
self.reset_and_reconnect_workflow(
reset_required,
reset_unsure_params,
ask_confirmation,
show_error,
reset_progress_callback,
connection_progress_callback,
)
reset_happened = reset_required or bool(reset_unsure_params)
return reset_happened, uploaded_params
def _calculate_reset_time(self) -> int:
"""
Calculate the extra sleep time needed for reset based on boot delay parameters.
Returns:
int: Extra sleep time in seconds.
"""
param_boot_delay = (
self.current_step_parameters["BRD_BOOT_DELAY"].get_new_value()
if "BRD_BOOT_DELAY" in self.current_step_parameters
else 0.0
)
flightcontroller_boot_delay = self._flight_controller.fc_parameters.get("BRD_BOOT_DELAY", 0)
return int(max(param_boot_delay, flightcontroller_boot_delay) // 1000 + 1) # round up
def _reset_and_reconnect_flight_controller(
self,
reset_progress_callback: Optional[Callable] = None,
connection_progress_callback: Optional[Callable] = None,
sleep_time: Optional[int] = None,
) -> Optional[str]:
"""
Reset and reconnect to the flight controller.
Args:
reset_progress_callback: Optional callback function for progress updates.
connection_progress_callback: Optional callback function for connection progress updates.
sleep_time: Optional sleep time override. If None, calculates based on boot delay parameters.
Returns:
Optional[str]: Error message if reset failed, None if successful.
"""
if sleep_time is None:
sleep_time = self._calculate_reset_time()
# Call reset_and_reconnect with a callback to update the reset progress bar and the progress message
return self._flight_controller.reset_and_reconnect(
reset_progress_callback, connection_progress_callback, int(sleep_time)
)
def reset_and_reconnect_workflow( # pylint: disable=too-many-arguments, too-many-positional-arguments
self,
fc_reset_required: bool,
fc_reset_unsure: list[str],
ask_confirmation: AskConfirmationCallback,
show_error: ShowErrorCallback,
reset_progress_callback: Optional[Callable] = None,
connection_progress_callback: Optional[Callable] = None,
) -> bool:
"""
Complete workflow for resetting and reconnecting to flight controller with user interaction.
This method orchestrates the complete reset process including:
- Asking user confirmation for uncertain reset scenarios
- Performing the actual reset and reconnection
- Handling errors appropriately
Args:
fc_reset_required: Whether reset is definitively required
fc_reset_unsure: List of parameters that potentially require reset
ask_confirmation: Callback to ask user for confirmation
show_error: Callback to show error messages
reset_progress_callback: Optional callback for reset progress updates
connection_progress_callback: Optional callback for connection progress updates
Returns:
bool: True if reset was performed (or not needed), False if reset failed
"""
# Determine if reset is needed based on required flag and user confirmation for uncertain cases
should_reset = fc_reset_required
if not fc_reset_required and fc_reset_unsure:
# Ask the user if they want to reset the ArduPilot
param_list_str = ", ".join(fc_reset_unsure)
msg = _("{param_list_str} parameter(s) potentially require a reset\nDo you want to reset the ArduPilot?")
should_reset = ask_confirmation(_("Possible reset required"), msg.format(param_list_str=param_list_str))
if should_reset:
error_message = self._reset_and_reconnect_flight_controller(reset_progress_callback, connection_progress_callback)
if error_message:
show_error(_("ArduPilot methodic configurator"), error_message)
return False
return True
return True # No reset needed
def _upload_parameters_to_fc( # pylint: disable=too-many-locals
self,
selected_params: dict,
show_error: Callable[[str, str], None],
progress_callback: Optional[Callable] = None,
) -> int:
"""
Upload selected parameters to flight controller.
Args:
selected_params: Dictionary of parameters to upload.
show_error: Callback to show error messages to the user.
progress_callback: Optional callback for progress updates.
Returns:
int: Number of changed parameters.
"""
error_messages = []
nr_changed = 0
nr_unchanged = 0
total_params = len(selected_params)
for idx, (param_name, param) in enumerate(selected_params.items(), start=1):
if progress_callback:
progress_callback(idx, total_params)
try:
success, error_msg = self._flight_controller.set_param(param_name, param.value)
if not success:
error_messages.append(
_("Failed to set parameter %(name)s: %(error)s") % {"name": param_name, "error": error_msg}
)
continue
if param_name not in self._flight_controller.fc_parameters or not is_within_tolerance(
self._flight_controller.fc_parameters[param_name], param.value
):
if param_name in self._flight_controller.fc_parameters:
logging_info(
_("Parameter %s changed from %f to %f"),
param_name,
self._flight_controller.fc_parameters[param_name],
param.value,
)
else:
logging_info(
_("Parameter %s changed to %f"),
param_name,
param.value,
)
nr_changed += 1
else:
logging_info(_("Parameter %s unchanged from %f"), param_name, param.value)
nr_unchanged += 1
except ValueError as _e:
error_msg = _("Failed to set parameter {param_name}: {_e}").format(**locals())
logging_error(error_msg)
error_messages.append(error_msg)
# Handle any errors with GUI dialogs
for error_msg in error_messages:
show_error(_("ArduPilot methodic configurator"), error_msg)
changed_msg = _("%d FC parameter(s) changed value") % nr_changed if nr_changed else ""
unchanged_msg = (
_("%d FC parameter(s) already had the value defined in this configuration step") % nr_unchanged
if nr_unchanged
else ""
)
msg = changed_msg + (", " if nr_changed and nr_unchanged else "") + unchanged_msg
logging_info(msg)
self._update_tuning_report()
return nr_changed
def _update_tuning_report(self) -> None:
report_params = [
"ATC_ACCEL_P_MAX",
"ATC_ACCEL_R_MAX",
"ATC_ACCEL_Y_MAX",
"ATC_ANG_PIT_P",
"ATC_ANG_RLL_P",
"ATC_ANG_YAW_P",
"ATC_RAT_PIT_FLTD",
"ATC_RAT_PIT_FLTE",
"ATC_RAT_PIT_FLTT",
"ATC_RAT_RLL_FLTD",
"ATC_RAT_RLL_FLTE",
"ATC_RAT_RLL_FLTT",
"ATC_RAT_YAW_FLTD",
"ATC_RAT_YAW_FLTE",
"ATC_RAT_YAW_FLTT",
"ATC_RAT_PIT_D",
"ATC_RAT_PIT_I",
"ATC_RAT_PIT_P",
"ATC_RAT_RLL_D",
"ATC_RAT_RLL_I",
"ATC_RAT_RLL_P",
"ATC_RAT_YAW_D",
"ATC_RAT_YAW_I",
"ATC_RAT_YAW_P",
"INS_ACCEL_FILTER",
"INS_GYRO_FILTER",
]
report_files = [
"00_default.param",
"11_initial_atc.param",
"16_pid_adjustment.param",
"23_quick_tune_results.param",
"31_autotune_roll_results.param",
"33_autotune_pitch_results.param",
"35_autotune_yaw_results.param",
"37_autotune_yawd_results.param",
"39_autotune_roll_pitch_retune_results.param",
]
report_file_path = Path(getattr(self._local_filesystem, "vehicle_dir", ".")) / "tuning_report.csv"
# Write a CSV with a header ("param", <list of files>) and one row per parameter.
with open(report_file_path, "w", newline="", encoding="utf-8") as file:
writer = csv_writer(file)
writer.writerow(["param", *report_files])
for param_name in report_params:
row = [param_name]
for param_file in report_files:
try:
if param_file == "00_default.param":
value = str(self._local_filesystem.param_default_dict[param_name].value)
else:
value = str(self._local_filesystem.file_parameters[param_file][param_name].value)
except (KeyError, ValueError):
# On any unexpected structure, leave the value empty (don't crash)
value = ""
row.append(value)
writer.writerow(row)
def upload_selected_params_workflow( # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-locals
self,
selected_params: dict,
ask_confirmation: AskConfirmationCallback,
ask_retry_cancel: AskRetryCancelCallback,
show_error: ShowErrorCallback,
get_upload_progress_callback: Optional[Callable[[], Optional[Callable]]] = None,
get_reset_progress_callback: Optional[Callable[[], Optional[Callable]]] = None,
get_connection_progress_callback: Optional[Callable[[], Optional[Callable]]] = None,
get_download_progress_callback: Optional[Callable[[], Optional[Callable]]] = None,
) -> None:
"""
Complete workflow for uploading selected parameters, including reset, upload, validation, and retry.
Args:
selected_params: Dictionary of parameters to upload.
ask_confirmation: Callback to ask user for confirmation.
ask_retry_cancel: Callback to ask user to retry or cancel on upload error.
show_error: Callback to show error messages.
get_upload_progress_callback: Optional factory function that creates and returns an upload progress callback.
get_reset_progress_callback: Optional factory function that creates and returns a reset progress callback.
get_connection_progress_callback: Optional factory function that creates and returns a connection prog. callback.