-
Notifications
You must be signed in to change notification settings - Fork 62
Expand file tree
/
Copy pathfrontend_tkinter_connection_selection.py
More file actions
executable file
·472 lines (414 loc) · 22.5 KB
/
frontend_tkinter_connection_selection.py
File metadata and controls
executable file
·472 lines (414 loc) · 22.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
#!/usr/bin/env python3
"""
GUI to select the connection to the FC.
This file is part of ArduPilot Methodic Configurator. https://github.com/ArduPilot/MethodicConfigurator
SPDX-FileCopyrightText: 2024-2026 Amilcar do Carmo Lucas <amilcar.lucas@iav.de>
SPDX-License-Identifier: GPL-3.0-or-later
"""
import contextlib
import tkinter as tk
from argparse import ArgumentParser, Namespace
from logging import basicConfig as logging_basicConfig
from logging import debug as logging_debug
from logging import error as logging_error
from logging import getLevelName as logging_getLevelName
from logging import warning as logging_warning
from sys import exit as sys_exit
from tkinter import messagebox, simpledialog, ttk
from typing import Union
from ardupilot_methodic_configurator import _, __version__
from ardupilot_methodic_configurator.backend_filesystem_program_settings import ProgramSettings
from ardupilot_methodic_configurator.backend_flightcontroller import SUPPORTED_BAUDRATES, FlightController
from ardupilot_methodic_configurator.common_arguments import add_common_arguments
from ardupilot_methodic_configurator.frontend_tkinter_base_window import BaseWindow
from ardupilot_methodic_configurator.frontend_tkinter_pair_tuple_combobox import PairTupleCombobox
from ardupilot_methodic_configurator.frontend_tkinter_progress_window import ProgressWindow
from ardupilot_methodic_configurator.frontend_tkinter_show import show_no_connection_error, show_tooltip
class ConnectionSelectionWidgets: # pylint: disable=too-many-instance-attributes
"""
A class for managing the selection of flight controller connections in the GUI.
This class provides functionality for displaying available flight controller connections,
allowing the user to select a connection, and handling the connection process.
"""
def __init__( # pylint: disable=too-many-arguments, too-many-positional-arguments
self,
parent, # noqa: ANN001 ConnectionSelectionWindow can not add it here, otherwise a dependency loop will be created
parent_frame: ttk.Labelframe,
flight_controller: FlightController,
destroy_parent_on_connect: bool,
download_params_on_connect: bool,
default_baudrate: int = 115200,
) -> None:
self.parent = parent
self.flight_controller = flight_controller
self.destroy_parent_on_connect = destroy_parent_on_connect
self.download_params_on_connect = download_params_on_connect
self.default_baudrate = default_baudrate
self.previous_selection: Union[None, str] = (
flight_controller.comport.device
if flight_controller.comport and hasattr(flight_controller.comport, "device")
else None
)
self.connection_progress_window: ProgressWindow
self._refresh_timer_id: Union[None, str] = None
self._is_refreshing = False
# Create a new frame for the flight controller connection selection label and combobox
self.container_frame = ttk.Frame(parent_frame)
# Create a description label for the flight controller connection selection
conn_selection_label = ttk.Label(self.container_frame, text=_("flight controller connection:"))
conn_selection_label.pack(side=tk.TOP) # Add the label to the top of the conn_selection_frame
# Create a frame for connection and baudrate selection
selection_frame = ttk.Frame(self.container_frame)
selection_frame.pack(side=tk.TOP, pady=(4, 0))
# Create a label for port
port_label = ttk.Label(selection_frame, text=_("Port:"))
port_label.pack(side=tk.LEFT, padx=(0, 5))
# Load saved connection history from ProgramSettings and cache it to avoid
# repeated disk reads on every periodic port-refresh cycle.
self._connection_history_cache: list[str] = ProgramSettings.get_connection_history()
# Perform an initial connection discovery using the cached history so the
# combobox starts from the same source-of-truth list that refresh uses.
self.flight_controller.discover_connections(preserved_connections=self._connection_history_cache)
# Create a read-only combobox for flight controller connection selection
self.conn_selection_combobox = PairTupleCombobox(
selection_frame,
self.flight_controller.get_connection_tuples(),
self.previous_selection,
"FC connection",
state="readonly",
)
self.conn_selection_combobox.config(width=20)
self.conn_selection_combobox.bind("<<ComboboxSelected>>", self.on_select_connection_combobox_change, "+")
self.conn_selection_combobox.pack(side=tk.LEFT, padx=(0, 5))
show_tooltip(
self.conn_selection_combobox,
_("Select the flight controller connection\nYou can add a custom connection to the existing ones"),
)
# Create a label for baudrate
baudrate_label = ttk.Label(selection_frame, text=_("Baudrate:"))
baudrate_label.pack(side=tk.LEFT, padx=(10, 5))
# Create a combobox for baudrate selection
self.baudrate_var = tk.StringVar(value=str(self.default_baudrate))
self.baudrate_combobox = ttk.Combobox(
selection_frame,
textvariable=self.baudrate_var,
values=SUPPORTED_BAUDRATES,
state="normal",
width=10,
)
self.baudrate_combobox.pack(side=tk.LEFT, padx=(0, 5))
self.baudrate_combobox.bind("<<ComboboxSelected>>", self.on_baudrate_combobox_change, "+")
show_tooltip(
self.baudrate_combobox,
_("Select the baudrate for the connection\nMost flight controllers use 115200"),
)
# Start periodic port refresh
self.start_periodic_refresh()
def _persist_and_cache_connection(self, connection_string: str) -> str:
"""
Persist a connection string to settings and update the in-memory history cache.
Returns the normalized connection string that was stored, or the original if
normalization failed (e.g. the string was invalid and rejected by the store).
Callers should use the returned value so they stay in sync with what is cached.
"""
normalized = ProgramSettings.store_connection(connection_string)
if normalized is None:
return connection_string
# Update cache in-memory (most-recent-first, deduplicated) using the normalized value
# to avoid a disk re-read and to keep the cache consistent with what was persisted.
self._connection_history_cache = [normalized] + [c for c in self._connection_history_cache if c != normalized]
return normalized
def start_periodic_refresh(self) -> None:
"""Start periodic refresh of available ports every 3 seconds."""
if self._refresh_timer_id is None:
self._refresh_ports()
def _refresh_ports(self) -> None:
"""Refresh the list of available ports and update the combobox."""
if self._is_refreshing:
return
self._is_refreshing = True
try:
# Scan for new ports and get current data
current_selection = self.conn_selection_combobox.get_selected_key()
self.flight_controller.discover_connections(
progress_callback=None,
preserved_connections=self._connection_history_cache,
)
new_connection_tuples = self.flight_controller.get_connection_tuples()
# update the UI
if new_connection_tuples != self.conn_selection_combobox.get_entries_tuple():
logging_debug("Port list changed, updating UI")
self.conn_selection_combobox.set_entries_tuple(new_connection_tuples, current_selection)
except (OSError, AttributeError, RuntimeError) as e:
logging_debug("Error refreshing ports: %s", e)
finally:
self._is_refreshing = False
try:
if self.parent.root.winfo_exists():
self._refresh_timer_id = self.parent.root.after(3000, self._refresh_ports)
except tk.TclError:
self.stop_periodic_refresh()
def stop_periodic_refresh(self) -> None:
"""Stop the periodic port refresh."""
if self._refresh_timer_id is not None:
with contextlib.suppress(tk.TclError, AttributeError):
self.parent.root.after_cancel(self._refresh_timer_id)
self._refresh_timer_id = None
def on_select_connection_combobox_change(self, _event: tk.Event) -> None:
selected_connection = self.conn_selection_combobox.get_selected_key()
error_msg = _("Connection combobox changed to: {selected_connection}")
logging_debug(error_msg.format(**locals()))
comport_device = (
self.flight_controller.comport.device
if self.flight_controller.comport and hasattr(self.flight_controller.comport, "device")
else None
)
if self.flight_controller.master is None or selected_connection != comport_device:
if selected_connection == "Add another":
if not self.add_connection() and self.previous_selection:
# nothing got selected revert to the current connection
self.conn_selection_combobox.set(self.previous_selection)
return
self.reconnect(selected_connection) # type: ignore[arg-type] # workaround for mypy issue
def on_baudrate_combobox_change(self, _event: tk.Event) -> None:
"""Handle baudrate combobox selection changes."""
_selected_baudrate = self.baudrate_var.get()
error_msg = _("Baudrate combobox changed to: {_selected_baudrate}")
logging_debug(error_msg.format(**locals()))
# Get the current connection
current_connection = (
self.flight_controller.comport.device
if self.flight_controller.comport and hasattr(self.flight_controller.comport, "device")
else None
)
# If we have an active connection, reconnect with the new baudrate
if self.flight_controller.master is not None and current_connection:
self.reconnect(current_connection)
def add_connection(self) -> str:
# Open the connection selection dialog
selected_connection = simpledialog.askstring(
_("Flight Controller Connection"),
_(
"Enter the connection string to the flight controller. "
"Examples are:\n\nCOM4 (on Windows)\n"
"/dev/serial/by-id/usb-xxx (on Linux)\n"
"tcp:127.0.0.1:5761\n"
"udp:127.0.0.1:14551"
),
)
if selected_connection:
selected_connection = self._persist_and_cache_connection(selected_connection)
error_msg = _("Will add new connection: {selected_connection} if not duplicated")
logging_debug(error_msg.format(**locals()))
self.flight_controller.add_connection(selected_connection)
connection_tuples = self.flight_controller.get_connection_tuples()
error_msg = _("Updated connection tuples: {connection_tuples} with selected connection: {selected_connection}")
logging_debug(error_msg.format(**locals()))
self.conn_selection_combobox.set_entries_tuple(connection_tuples, selected_connection)
self.reconnect(selected_connection)
else:
error_msg = _("Add connection canceled or string empty {selected_connection}")
logging_debug(error_msg.format(**locals()))
selected_connection = ""
return selected_connection
def reconnect(self, selected_connection: str = "") -> bool: # Default is auto-connect
self.connection_progress_window = ProgressWindow(
self.parent.root, _("Connecting with the FC"), _("Connection step {} of {}")
)
# Get the current baudrate from the combobox
selected_baudrate = "unknown" # Initialize with default value for error handling
try:
selected_baudrate = self.baudrate_var.get()
current_baudrate = int(selected_baudrate)
if str(current_baudrate) not in SUPPORTED_BAUDRATES:
warning_msg = _("Unsupported baudrate selected: {current_baudrate}")
logging_warning(warning_msg.format(**locals()))
if current_baudrate < 1200:
raise ValueError
except (ValueError, AttributeError):
error_msg = _("Invalid baudrate selected: {selected_baudrate}, reverting to default: {self.default_baudrate}")
error_msg = error_msg.format(**locals())
# Display error popup
messagebox.showerror(_("Invalid Baudrate"), error_msg)
logging_error(error_msg)
# Reset the combobox to the default value
self.baudrate_var.set(str(self.default_baudrate))
current_baudrate = self.default_baudrate
error_message = self.flight_controller.connect(
selected_connection, self.connection_progress_window.update_progress_bar, baudrate=current_baudrate
)
if error_message:
show_no_connection_error(error_message)
return True
self.connection_progress_window.destroy()
# Stop periodic refresh when connection is established
self.stop_periodic_refresh()
# Store the current connection as the previous selection
if self.flight_controller.comport and hasattr(self.flight_controller.comport, "device"):
device: str = self.flight_controller.comport.device
# Prefer the user-selected connection string when available to avoid persisting
# a possibly resolved and unstable device path (e.g. /dev/ttyACM0 instead of
# the original /dev/serial/by-id/* symlink). For auto-connect (empty
# selected_connection), fall back to the detected device path.
connection_identifier: str = selected_connection or device
# Persist the connection so it is available next time the program opens
# and so it survives periodic auto-discovery refreshes during the current session.
# Use the normalized return value so previous_selection and the flight controller
# registry stay in sync with the cached/stored key (e.g. whitespace stripped).
connection_identifier = self._persist_and_cache_connection(connection_identifier)
self.previous_selection = connection_identifier
self.flight_controller.add_connection(connection_identifier)
if self.destroy_parent_on_connect:
self.parent.root.destroy()
if self.download_params_on_connect and hasattr(self.parent, "download_flight_controller_parameters"):
self.parent.download_flight_controller_parameters(redownload=False)
return False
class ConnectionSelectionWindow(BaseWindow):
"""
A window for selecting a flight controller connection.
This class provides a graphical user interface for selecting a connection to a flight controller.
It inherits from the BaseWindow class and uses the ConnectionSelectionWidgets class to handle
the UI elements related to connection selection.
"""
def __init__(
self,
flight_controller: FlightController,
connection_result_string: str,
default_baudrate: int = 115200,
) -> None:
super().__init__()
self.root.title(_("AMC {version} - Flight controller connection").format(version=__version__))
self.root.geometry(self.calculate_scaled_geometry(520, 380))
self.center_window_on_screen(self.root)
self.default_baudrate = default_baudrate
# Explain why we are here
if flight_controller.comport is None:
introduction_text = _("No ArduPilot flight controller was auto-detected detected yet.")
elif ":" in connection_result_string:
introduction_text = connection_result_string.replace(":", ":\n")
else:
introduction_text = connection_result_string
self.introduction_label = ttk.Label(
self.main_frame,
anchor=tk.CENTER,
justify=tk.CENTER,
text=introduction_text,
)
self.introduction_label.pack(expand=False, fill=tk.X, padx=6, pady=6)
# Option 1 - Auto-connect
option1_label = ttk.Label(text=_("Auto connection"), style="Bold.TLabel")
option1_label_frame = ttk.LabelFrame(self.main_frame, labelwidget=option1_label)
option1_label_frame.pack(expand=False, fill=tk.X, padx=6, pady=6)
option1_label = ttk.Label(
option1_label_frame,
anchor=tk.CENTER,
justify=tk.CENTER,
text=_(
"Connect a flight controller to the PC,\n"
"wait 7 seconds for it to fully boot and\n"
"press the Auto-connect button below to connect to it"
),
)
option1_label.pack(expand=False, fill=tk.X, padx=6)
autoconnect_button = ttk.Button(option1_label_frame, text=_("Auto-connect"), command=self.fc_autoconnect)
autoconnect_button.pack(expand=False, fill=tk.X, padx=100, pady=6)
show_tooltip(autoconnect_button, _("Auto-connect to a 'Mavlink'-talking serial device"))
# Option 2 - Manually select the flight controller connection or add a new one
option2_label = ttk.Label(text=_("Manual connection"), style="Bold.TLabel")
option2_label_frame = ttk.LabelFrame(self.main_frame, labelwidget=option2_label)
option2_label_frame.pack(expand=False, fill=tk.X, padx=6, pady=6)
# pylint: disable=duplicate-code
option2_label = ttk.Label(
option2_label_frame,
anchor=tk.CENTER,
justify=tk.CENTER,
text=_(
"Connect a flight controller to the PC,\n"
"wait 7 seconds for it to fully boot and\n"
"manually select the fight controller connection or add a new one"
),
)
# pylint: enable=duplicate-code
option2_label.pack(expand=False, fill=tk.X, padx=6)
self.connection_selection_widgets = ConnectionSelectionWidgets(
self,
option2_label_frame,
flight_controller,
destroy_parent_on_connect=True,
download_params_on_connect=False,
default_baudrate=self.default_baudrate,
)
self.connection_selection_widgets.container_frame.pack(expand=False, fill=tk.X, padx=5, pady=6)
# Option 3 - Skip FC connection, just edit the .param files on disk
option3_label = ttk.Label(text=_("No connection"), style="Bold.TLabel")
option3_label_frame = ttk.LabelFrame(self.main_frame, labelwidget=option3_label)
option3_label_frame.pack(expand=False, fill=tk.X, padx=6, pady=6)
# option3_label = ttk.Label(option3_label_frame, anchor=tk.CENTER, justify=tk.CENTER,
# text=_("Skip the flight controller connection,\n")
# "no default parameter values will be fetched from the FC,\n"
# "default parameter values from disk will be used instead\n"
# "(if '00_default.param' file is present)\n"
# "and just edit the intermediate '.param' files on disk")
# option3_label.pack(expand=False, fill=tk.X, padx=6)
skip_fc_connection_button = ttk.Button(
option3_label_frame,
text=_("Skip FC connection, just edit the .param files on disk"),
command=lambda fc=flight_controller: self.skip_fc_connection(fc), # type: ignore[misc]
)
skip_fc_connection_button.pack(expand=False, fill=tk.X, padx=15, pady=6)
show_tooltip(
skip_fc_connection_button,
_(
"No parameter values will be fetched from the FC, default parameter values from disk will be used\n"
"instead (if '00_default.param' file is present) and just edit the intermediate '.param' files on disk"
),
)
# Bind the close_connection_and_quit function to the window close event
self.root.protocol("WM_DELETE_WINDOW", self.close_and_quit)
def close_and_quit(self) -> None:
# Stop periodic refresh when window is closing
if hasattr(self, "connection_selection_widgets"):
self.connection_selection_widgets.stop_periodic_refresh()
sys_exit(0)
def fc_autoconnect(self) -> None:
self.connection_selection_widgets.reconnect()
def skip_fc_connection(self, flight_controller: FlightController) -> None:
logging_warning(_("Will proceed without FC connection. FC parameters will not be downloaded nor uploaded"))
logging_warning(_("Only the intermediate '.param' files on the PC disk will be edited"))
# Stop periodic refresh when skipping connection
if hasattr(self, "connection_selection_widgets"):
self.connection_selection_widgets.stop_periodic_refresh()
flight_controller.disconnect()
self.root.destroy()
def argument_parser() -> Namespace: # pragma: no cover
"""
Parses command-line arguments for the script.
This function sets up an argument parser to handle the command-line arguments for the script.
Returns:
argparse.Namespace: An object containing the parsed arguments.
"""
parser = ArgumentParser(
description=_(
"This main is for testing and development only. "
"Usually, the ConnectionSelectionWidgets is called from another script"
)
)
parser = FlightController.add_argparse_arguments(parser)
return add_common_arguments(parser).parse_args()
# pylint: disable=duplicate-code
def main() -> None: # pragma: no cover
args = argument_parser()
logging_basicConfig(level=logging_getLevelName(args.loglevel), format="%(asctime)s - %(levelname)s - %(message)s")
logging_warning(
_("This main is for testing and development only, usually the ConnectionSelectionWindow is called from another script")
)
# pylint: enable=duplicate-code
flight_controller = FlightController(reboot_time=args.reboot_time, baudrate=args.baudrate)
result = flight_controller.connect(device=args.device)
if result:
logging_warning(result)
window = ConnectionSelectionWindow(flight_controller, result, default_baudrate=args.baudrate)
window.root.mainloop()
flight_controller.disconnect() # Disconnect from the flight controller
if __name__ == "__main__": # pragma: no cover
main()