Skip to content

Commit 71fdf2b

Browse files
committed
More WIP
1 parent fcba551 commit 71fdf2b

3 files changed

Lines changed: 87 additions & 53 deletions

File tree

ardupilot_methodic_configurator/data_model_vehicle_components_import.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ def _set_esc_type_from_fc_parameters(self, fc_parameters: dict[str, float], doc:
334334
else:
335335
self.set_component_value(("ESC", "FC->ESC Connection", "Type"), "AIO")
336336

337+
protocol = ""
337338
if "MOT_PWM_TYPE" in doc and "values" in doc["MOT_PWM_TYPE"]:
338339
protocol = doc["MOT_PWM_TYPE"]["values"].get(str(mot_pwm_type), "")
339340
if protocol:
@@ -343,6 +344,15 @@ def _set_esc_type_from_fc_parameters(self, fc_parameters: dict[str, float], doc:
343344
protocol = str(MOT_PWM_TYPE_DICT[str(mot_pwm_type)]["protocol"])
344345
self.set_component_value(("ESC", "FC->ESC Connection", "Protocol"), protocol)
345346

347+
# Set ESC->FC Telemetry: DShot protocols support BDShot telemetry on the same PWM wire
348+
esc_conn_type = self.get_component_value(("ESC", "FC->ESC Connection", "Type"))
349+
if protocol and protocol.startswith("DShot"):
350+
self.set_component_value(("ESC", "ESC->FC Telemetry", "Type"), esc_conn_type)
351+
self.set_component_value(("ESC", "ESC->FC Telemetry", "Protocol"), "BDShot")
352+
else:
353+
self.set_component_value(("ESC", "ESC->FC Telemetry", "Type"), "None")
354+
self.set_component_value(("ESC", "ESC->FC Telemetry", "Protocol"), "None")
355+
346356
def _set_battery_type_from_fc_parameters(self, fc_parameters: dict[str, float]) -> None: # pylint: disable=too-many-branches
347357
"""Process battery monitor parameters and update the data model."""
348358
if "BATT_MONITOR" in fc_parameters: # pylint: disable=too-many-nested-blocks

ardupilot_methodic_configurator/data_model_vehicle_components_validation.py

Lines changed: 76 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,8 @@ def get_connection_type_tuples_with_labels(connection_types: tuple[str, ...]) ->
264264
"0": {"type": ["None"], "protocol": "None"},
265265
"1": {"type": PWM_OUT_PORTS, "protocol": "BDShot"},
266266
"2": {"type": CAN_PORTS, "protocol": "DroneCAN"},
267-
"3": {"type": SERIAL_PORTS, "protocol": "DShot"},
268-
"4": {"type": SERIAL_PORTS, "protocol": "FETtec OneWire"},
267+
"3": {"type": SERIAL_PORTS, "protocol": "DShotTelemetry"},
268+
"4": {"type": SERIAL_PORTS, "protocol": "FETtecOneWire"},
269269
}
270270

271271

@@ -371,6 +371,10 @@ def get_connection_types(conn_dict: dict) -> tuple[str, ...]:
371371
("Battery Monitor", "FC Connection", "Protocol"): get_combobox_values("BATT_MONITOR"),
372372
("ESC", "FC->ESC Connection", "Type"): (*PWM_OUT_PORTS, *SERIAL_PORTS, *CAN_PORTS),
373373
("ESC", "FC->ESC Connection", "Protocol"): self._mot_pwm_types,
374+
("ESC", "ESC->FC Telemetry", "Type"): ("None", *PWM_OUT_PORTS, *SERIAL_PORTS, *CAN_PORTS),
375+
("ESC", "ESC->FC Telemetry", "Protocol"): tuple(
376+
dict.fromkeys(str(v["protocol"]) for v in ESC_TELEMETRY_DICT.values())
377+
),
374378
("GNSS Receiver", "FC Connection", "Type"): ("None", *SERIAL_PORTS, *CAN_PORTS),
375379
("GNSS Receiver", "FC Connection", "Protocol"): get_all_protocols(GNSS_RECEIVER_CONNECTION),
376380
("Battery", "Specifications", "Chemistry"): BatteryCell.chemistries(),
@@ -398,47 +402,61 @@ def _update_possible_choices_for_path( # pylint: disable=too-many-branches
398402
self, path: ComponentPath, value: Union[ComponentData, ComponentValue, None]
399403
) -> None:
400404
"""Update _possible_choices when connection type values that affect protocol choices are changed."""
401-
# Only update if this is a connection type change that affects protocol choices
402-
if len(path) >= 3 and path[1] == "FC Connection" and path[2] == "Type" and isinstance(value, str):
403-
component_name = path[0]
404-
protocol_path: ComponentPath = (component_name, "FC Connection", "Protocol")
405-
406-
# Calculate the new possible choices for the corresponding protocol field
407-
if component_name == "RC Receiver":
408-
# Filter RC protocols based on the selected connection type
409-
if value == "None":
410-
new_choices: tuple[str, ...] = ("None",)
411-
else:
412-
# For any connection type, find protocols that support it
413-
new_choices = tuple(str(v["protocol"]) for v in RC_PROTOCOLS_DICT.values() if value in v["type"])
414-
self._possible_choices[protocol_path] = new_choices
405+
if len(path) < 3 or path[2] != "Type" or not isinstance(value, str):
406+
return
415407

416-
elif component_name == "Telemetry":
417-
if value == "None":
418-
self._possible_choices[protocol_path] = ("None",)
419-
else:
420-
# For non-None telemetry connections, use the standard telemetry protocols
421-
self._possible_choices[protocol_path] = tuple(
422-
str(v["protocol"]) for v in SERIAL_PROTOCOLS_DICT.values() if v["component"] == "Telemetry"
423-
)
408+
component_name = path[0]
409+
section = path[1]
424410

425-
elif component_name == "Battery Monitor":
426-
if value == "None":
427-
self._possible_choices[protocol_path] = ("None",)
428-
return
411+
if section not in ("FC Connection", "FC->ESC Connection", "ESC->FC Telemetry"):
412+
return
429413

430-
# Find protocols available for the selected connection type
431-
batt_available_protocols: list[str] = []
432-
for conn_dict in BATT_MONITOR_CONNECTION.values():
433-
conn_type = conn_dict["type"]
434-
if isinstance(conn_type, list) and value in conn_type:
435-
batt_available_protocols.append(str(conn_dict["protocol"]))
414+
protocol_path: ComponentPath = (component_name, section, "Protocol")
436415

437-
self._possible_choices[protocol_path] = (
438-
tuple(batt_available_protocols) if batt_available_protocols else ("None",)
416+
# Calculate the new possible choices for the corresponding protocol field
417+
if component_name == "RC Receiver":
418+
# Filter RC protocols based on the selected connection type
419+
if value == "None":
420+
new_choices: tuple[str, ...] = ("None",)
421+
else:
422+
# For any connection type, find protocols that support it
423+
new_choices = tuple(str(v["protocol"]) for v in RC_PROTOCOLS_DICT.values() if value in v["type"])
424+
self._possible_choices[protocol_path] = new_choices
425+
426+
elif component_name == "Telemetry":
427+
if value == "None":
428+
self._possible_choices[protocol_path] = ("None",)
429+
else:
430+
# For non-None telemetry connections, use the standard telemetry protocols
431+
self._possible_choices[protocol_path] = tuple(
432+
str(v["protocol"]) for v in SERIAL_PROTOCOLS_DICT.values() if v["component"] == "Telemetry"
439433
)
440434

441-
elif component_name == "ESC":
435+
elif component_name == "Battery Monitor":
436+
if value == "None":
437+
self._possible_choices[protocol_path] = ("None",)
438+
return
439+
440+
# Find protocols available for the selected connection type
441+
batt_available_protocols: list[str] = []
442+
for conn_dict in BATT_MONITOR_CONNECTION.values():
443+
conn_type = conn_dict["type"]
444+
if isinstance(conn_type, list) and value in conn_type:
445+
batt_available_protocols.append(str(conn_dict["protocol"]))
446+
447+
self._possible_choices[protocol_path] = tuple(batt_available_protocols) if batt_available_protocols else ("None",)
448+
449+
elif component_name == "ESC":
450+
if section == "ESC->FC Telemetry":
451+
if value == "None":
452+
self._possible_choices[protocol_path] = ("None",)
453+
else:
454+
self._possible_choices[protocol_path] = tuple(
455+
str(v["protocol"])
456+
for v in ESC_TELEMETRY_DICT.values()
457+
if isinstance(v["type"], list) and value in v["type"]
458+
) or ("None",)
459+
else: # "FC->ESC Connection"
442460
if value == "None":
443461
self._possible_choices[protocol_path] = ("None",)
444462
elif value in CAN_PORTS:
@@ -451,21 +469,19 @@ def _update_possible_choices_for_path( # pylint: disable=too-many-branches
451469
# For PWM outputs, use motor PWM types
452470
self._possible_choices[protocol_path] = self._mot_pwm_types
453471

454-
elif component_name == "GNSS Receiver":
455-
if value == "None":
456-
self._possible_choices[protocol_path] = ("None",)
457-
return
472+
elif component_name == "GNSS Receiver":
473+
if value == "None":
474+
self._possible_choices[protocol_path] = ("None",)
475+
return
458476

459-
# Find protocols available for the selected connection type
460-
gnss_available_protocols: list[str] = []
461-
for conn_dict in GNSS_RECEIVER_CONNECTION.values():
462-
conn_type = conn_dict["type"]
463-
if isinstance(conn_type, list) and value in conn_type:
464-
gnss_available_protocols.append(str(conn_dict["protocol"]))
477+
# Find protocols available for the selected connection type
478+
gnss_available_protocols: list[str] = []
479+
for conn_dict in GNSS_RECEIVER_CONNECTION.values():
480+
conn_type = conn_dict["type"]
481+
if isinstance(conn_type, list) and value in conn_type:
482+
gnss_available_protocols.append(str(conn_dict["protocol"]))
465483

466-
self._possible_choices[protocol_path] = (
467-
tuple(gnss_available_protocols) if gnss_available_protocols else ("None",)
468-
)
484+
self._possible_choices[protocol_path] = tuple(gnss_available_protocols) if gnss_available_protocols else ("None",)
469485

470486
def _validate_tow_limits(self, value: str, path: ComponentPath) -> tuple[str, Optional[float]]:
471487
"""Validate takeoff weight min/max cross-constraints."""
@@ -630,16 +646,24 @@ def validate_all_data(self, entry_values: dict[ComponentPath, str]) -> tuple[boo
630646
# Keep protocol choices in sync with connection type changes in this batch.
631647
# This ensures dependent fields like Battery Monitor protocol are validated correctly
632648
# when both Type and Protocol are present in entry_values.
633-
if len(path) >= 3 and path[1] == "FC Connection" and path[2] == "Type" and isinstance(value, str):
649+
if len(path) >= 3 and path[2] == "Type" and isinstance(value, str):
634650
self._update_possible_choices_for_path(path, value)
635651

636652
# Check for duplicate connections
637-
if len(path) >= 3 and path[1] == "FC Connection" and path[2] == "Type":
653+
esc_conn_sections = {"FC->ESC Connection", "ESC->FC Telemetry"}
654+
is_fc_conn_type = len(path) >= 3 and path[2] == "Type" and (
655+
path[1] == "FC Connection" or path[1] in esc_conn_sections
656+
)
657+
if is_fc_conn_type:
638658
if value in fc_serial_connection and value not in {"CAN1", "CAN2", "I2C1", "I2C2", "I2C3", "I2C4", "None"}:
639659
# Allow certain combinations
640660
if path[0] in {"Telemetry", "RC Receiver"} and fc_serial_connection[value] in {"Telemetry", "RC Receiver"}:
641661
continue
642662

663+
# Allow ESC->FC Telemetry to share the same port as FC->ESC Connection (bidirectional serial)
664+
if path[0] == "ESC" and path[1] in esc_conn_sections and fc_serial_connection[value] == "ESC":
665+
continue
666+
643667
error_msg = _("Duplicate FC connection type '{value}' for {paths_str}")
644668
errors.append(error_msg.format(value=value, paths_str=paths_str))
645669
continue

tests/test_data_model_vehicle_components_import.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2093,7 +2093,7 @@ def test_system_uses_mot_pwm_type_dict_fallback_when_doc_has_no_mot_pwm_type(sel
20932093

20942094
basic_model._set_esc_type_from_fc_parameters(fc_parameters, doc)
20952095

2096-
result = basic_model.get_component_value(("ESC", "FC Connection", "Protocol"))
2096+
result = basic_model.get_component_value(("ESC", "FC->ESC Connection", "Protocol"))
20972097
assert result == "Normal"
20982098

20992099
# ------------------------------------------------------------------

0 commit comments

Comments
 (0)