diff --git a/src/canmatrix/Frame.py b/src/canmatrix/Frame.py index 0159c721..fde15872 100644 --- a/src/canmatrix/Frame.py +++ b/src/canmatrix/Frame.py @@ -82,6 +82,10 @@ class Frame(object): signalGroups = attr.ib(factory=list) # type: typing.MutableSequence[SignalGroup] cycle_time = attr.ib(default=0) # type: int + event_controlled_time = attr.ib(default=0) # type: int + debounce_time_range = attr.ib(default=0) # type: int + final_repetitions = attr.ib(default=0) # type: int + repeating_time_range = attr.ib(default=0) # type: int is_j1939 = attr.ib(default=False) # type: bool # ('cycleTime', '_cycleTime', int, None), @@ -172,7 +176,7 @@ def source(self, value): # type: (int) -> None @property def effective_cycle_time(self): - """Calculate effective cycle time for frame, depending on singal cycle times""" + """Calculate effective cycle time for frame, depending on signal cycle times""" min_cycle_time_list = [y for y in [x.cycle_time for x in self.signals] + [self.cycle_time] if y != 0] if len(min_cycle_time_list) == 0: return 0 diff --git a/src/canmatrix/formats/fibex.py b/src/canmatrix/formats/fibex.py index 11a85e9d..27adc670 100644 --- a/src/canmatrix/formats/fibex.py +++ b/src/canmatrix/formats/fibex.py @@ -45,11 +45,13 @@ logger = logging.getLogger(__name__) fx = "http://www.asam.net/xml/fbx" +te = "http://www.technica-engineering.com/xml/fbx" ho = "http://www.asam.net/xml" can = "http://www.asam.net/xml/fbx/can" xsi = "http://www.w3.org/2001/XMLSchema-instance" ns_ho = "{%s}" % ho ns_fx = "{%s}" % fx +ns_te = "{%s}" % te ns_can = "{%s}" % can ns_xsi = "{%s}" % xsi @@ -74,6 +76,13 @@ def create_sub_element_fx(parent, element_name, element_text=None): new.text = element_text return new +def create_sub_element_te(parent, element_name, element_text=None): + # type: (_Element, str, typing.Optional[str]) -> _Element + new = lxml.etree.SubElement(parent, ns_te + element_name) + if element_text is not None: + new.text = element_text + return new + def create_sub_element_ho(parent, element_name, element_text=None): # type: (_Element, str, typing.Optional[str]) -> _Element @@ -133,6 +142,121 @@ def get_multiplexing_parts_infos(signals, frame_name, start_pos=-1, end_pos=-1, return start_pos, end_pos, seg_big_endian +def create_frame_element(parent, frame, prefix=""): + """Helper function to create a frame element with PDU instances.""" + frame_element = create_sub_element_fx(parent, "FRAME") + frame_element.set("ID", f"{prefix}FRAME_{frame.name}") + + create_short_name_desc(frame_element, frame.name, frame.comment) + create_sub_element_fx(frame_element, "BYTE-LENGTH", str(frame.size)) + if frame.attribute("NmAsrMessage") and frame.attribute("NmAsrMessage").lower() == "yes": + create_sub_element_fx(frame_element, "FRAME-TYPE", "NM") + else: + create_sub_element_fx(frame_element, "FRAME-TYPE", "APPLICATION") + + # PDU instances + pdu_instances = create_sub_element_fx(frame_element, "PDU-INSTANCES") + pdu_instance = create_sub_element_fx(pdu_instances, "PDU-INSTANCE") + pdu_instance.set("ID", f"PDUINSTANCE_{frame.name}") # Note: No prefix for ID + + pdu_ref = create_sub_element_fx(pdu_instance, "PDU-REF") + pdu_ref.set("ID-REF", f"{prefix}PDU_{frame.name}") + + create_sub_element_fx(pdu_instance, "BIT-POSITION", "0") + create_sub_element_fx(pdu_instance, "IS-HIGH-LOW-BYTE-ORDER", "false") + + return frame_element + +def create_frame_triggering(parent, frame, prefix=""): + """Helper function to create a frame triggering element.""" + frame_triggering = create_sub_element_fx(parent, "FRAME-TRIGGERING") + frame_triggering.set("ID", f"{prefix}FT_{frame.name}") + + # Identifier + identifier = create_sub_element_fx(frame_triggering, "IDENTIFIER") + create_sub_element_fx(identifier, "IDENTIFIER-VALUE", str(frame.arbitration_id.id)) + + # Frame reference + frame_ref = create_sub_element_fx(frame_triggering, "FRAME-REF") + frame_ref.set("ID-REF", f"{prefix}FRAME_{frame.name}") + + # CAN-FD behavior (if applicable) + if frame.is_fd: + create_sub_element_fx(frame_triggering, "CAN-FRAME-TX-BEHAVIOR", "CAN-FD") + create_sub_element_fx(frame_triggering, "CAN-FRAME-RX-BEHAVIOR", "CAN-FD") + + return frame_triggering + +def create_pdu_triggering(parent, pdu, gen_msg_type_def, msg_nr_of_repetition_def, msg_delay_time_def, msg_cycle_time_def, prefix=""): + """Helper function to create a PDU triggering element with timings.""" + pdu_triggering = create_sub_element_fx(parent, "PDU-TRIGGERING") + pdu_triggering.set("ID", f"{prefix}PDU_{pdu.name}") + + # Timings + pdu_timings = create_sub_element_fx(pdu_triggering, "TIMINGS") + if pdu.cycle_time > 0: + cyclic_timing = create_sub_element_fx(pdu_timings, "CYCLIC-TIMING") + repeating_time_range = create_sub_element_fx(cyclic_timing, "REPEATING-TIME-RANGE") + time_value = f"PT{pdu.cycle_time / 1000.0}S" + create_sub_element_fx(repeating_time_range, "VALUE", time_value) + + send_type = pdu.attributes["GenMsgSendType"] if "GenMsgSendType" in pdu.attributes else gen_msg_type_def + if send_type and ("event" in send_type.lower() or "spontaneous" in send_type.lower()): + event_controlled_timing = create_sub_element_fx(pdu_timings, "EVENT-CONTROLLED-TIMING") + pdu.debounce_time_range = int(float(pdu.attributes.get("GenMsgDelayTime", msg_delay_time_def))) + debounce_time_range = create_sub_element_fx(event_controlled_timing, "DEBOUNCE-TIME-RANGE") + debounce_time_value = f"PT{pdu.debounce_time_range / 1000.0}S" + create_sub_element_fx(debounce_time_range, "VALUE", debounce_time_value) + pdu.final_repetitions = pdu.attributes.get("GenMsgNrOfRepetition", msg_nr_of_repetition_def) + create_sub_element_fx(event_controlled_timing, "FINAL-REPETITIONS", pdu.final_repetitions) + pdu.repeating_time_range = int(float(pdu.attributes.get("GenMsgCycleTime", msg_cycle_time_def))) if pdu.cycle_time == 0 else None + if pdu.repeating_time_range is not None and pdu.repeating_time_range > 0: + repeating_time_range = create_sub_element_fx(event_controlled_timing, "REPEATING-TIME-RANGE") + repeating_time_range_value = f"PT{pdu.repeating_time_range / 1000.0}S" + create_sub_element_fx(repeating_time_range, "VALUE", repeating_time_range_value) + + # PDU reference + pdu_ref = create_sub_element_fx(pdu_triggering, "PDU-REF") + pdu_ref.set("ID-REF", f"{prefix}PDU_{pdu.name}") + + return pdu_triggering + + +def create_output_port(parent, frame, ecu_name, prefix=""): + """Helper function to create output port with frame and PDU references.""" + output_port = create_sub_element_fx(parent, "OUTPUT-PORT") + output_port.set('ID', 'Output_Port_' + ecu_name +'_'+ frame.name) + # Frame triggering reference + frame_triggering_ref = create_sub_element_fx(output_port, "FRAME-TRIGGERING-REF") + frame_triggering_ref.set("ID-REF", f"{prefix}FT_{frame.name}") + + # PDU references + included_pdus = create_sub_element_fx(output_port, "INCLUDED-PDUS") + included_pdu = create_sub_element_fx(included_pdus, "INCLUDED-PDU") + included_pdu.set('ID', f'{prefix.lower()}output_included_pdu_{frame.name}') + + pdu_triggering_ref = create_sub_element_fx(included_pdu, "PDU-TRIGGERING-REF") + pdu_triggering_ref.set("ID-REF", f"{prefix}PDU_{frame.name}") + + return output_port + +def create_secoc_configuration(frame, auth_info_tx_length_def, freshness_value_tx_length_def, data_id_def, freshness_value_length_def, spdu): + auth_info_tx_length = int(frame.attribute("SCP_AuthInfoTxLength")) if frame.attribute("SCP_AuthInfoTxLength") is not None else int(auth_info_tx_length_def) + freshness_value_tx_length = int(frame.attribute("SCP_FreshnessValueTxLength")) if frame.attribute("SCP_FreshnessValueTxLength") is not None else int(freshness_value_tx_length_def) + secoc_pdu_length = frame.size + (auth_info_tx_length // 8) + (freshness_value_tx_length // 8) + create_sub_element_fx(spdu, "BYTE-LENGTH", str(secoc_pdu_length)) + create_sub_element_fx(spdu, "PDU-TYPE", "OTHER") + manufac_extansion = create_sub_element_te(spdu, "MANUFACTURER-EXTENSION") + sec_props = create_sub_element_te(manufac_extansion, "SECURITY-PROPERTIES") + data_id = frame.attribute("SCP_DataId") if frame.attribute("SCP_DataId") else data_id_def + create_sub_element_te(sec_props, "DATA-ID", data_id) + create_sub_element_te(sec_props, "AUTH-INFO-TX-LENGTH", str(auth_info_tx_length)) + freshness_value_length = frame.attribute("SCP_FreshnessValueLength") if frame.attribute("SCP_FreshnessValueLength") else freshness_value_length_def + create_sub_element_te(sec_props, "FRESHNESS-VALUE-LENGTH", str(freshness_value_length)) + create_sub_element_te(sec_props, "FRESHNESS-VALUE-TX-LENGTH", str(freshness_value_tx_length)) + payload_pdu = create_sub_element_te(sec_props, "PAYLOAD-REF") + payload_pdu.set("ID-REF", "PDU_" + frame.name) + def get_base_data_type(signal): # type: (Signal) -> str if signal.is_float: @@ -423,7 +547,7 @@ def load(f, **_options): def dump(db, f, **options): # type: (canmatrix.CanMatrix, typing.IO, **typing.Any) -> None - ns_map = {"fx": fx, "ho": ho, "can": can, "xsi": xsi} + ns_map = {"fx": fx, "ho": ho, "te": te, "can": can, "xsi": xsi} can_channel = 'CANCHANNEL01' db_name= db.attribute("DBName") if db_name: @@ -433,6 +557,10 @@ def dump(db, f, **options): '{{{pre}}}schemaLocation'.format( pre=xsi)] = 'http://www.asam.net/xml/fbx ..\\..\\xml_schema\\fibex.xsd http://www.asam.net/xml/fbx/can ..\\..\\xml_schema\\fibex4can.xsd' + gen_msg_type_def = getattr(db.frame_defines.get("GenMsgSendType"), 'defaultValue', None) + msg_delay_time_def = getattr(db.frame_defines.get("GenMsgDelayTime"), 'defaultValue', None) + msg_nr_of_repetition_def = getattr(db.frame_defines.get("GenMsgNrOfRepetition"), 'defaultValue', None) + msg_cycle_time_def = getattr(db.frame_defines.get("GenMsgCycleTime"), 'defaultValue', None) # # Make sure that we can even write to FIBEX # @@ -512,30 +640,22 @@ def dump(db, f, **options): # for pdu triggerings pdu_triggerings = create_sub_element_fx(channel, "PDU-TRIGGERINGS") for pdu in db.frames: - pdu_triggering = create_sub_element_fx( - pdu_triggerings, "PDU-TRIGGERING") - pdu_triggering.set("ID", "PDU_" + pdu.name) - pdu_timings = create_sub_element_fx(pdu_triggering, "TIMINGS") - if pdu.cycle_time > 0: - cyclic_timing = create_sub_element_fx(pdu_timings, "CYCLIC-TIMING") - repeating_time_range = create_sub_element_fx(cyclic_timing, "REPEATING-TIME-RANGE") - create_sub_element_fx(repeating_time_range, "VALUE", "PT" + str(pdu.cycle_time/1000.0) + "S") - - pdu_ref = create_sub_element_fx(pdu_triggering, "PDU-REF") - pdu_ref.set("ID-REF", "PDU_" + pdu.name) + # Create regular PDU triggering + create_pdu_triggering(pdu_triggerings, pdu, gen_msg_type_def, msg_nr_of_repetition_def, msg_delay_time_def, msg_cycle_time_def) + + # Create secured PDU triggering if applicable + if pdu.attribute("SC_Message") and pdu.attribute("SC_Message").lower() == "yes": + create_pdu_triggering(pdu_triggerings, pdu, gen_msg_type_def, msg_nr_of_repetition_def, msg_delay_time_def, msg_cycle_time_def, prefix="S") + frame_triggerings = create_sub_element_fx(channel, "FRAME-TRIGGERINGS") for frame in db.frames: - frame_triggering = create_sub_element_fx( - frame_triggerings, "FRAME-TRIGGERING") - frame_triggering.set("ID", "FT_" + frame.name) - identifier = create_sub_element_fx(frame_triggering, "IDENTIFIER") - create_sub_element_fx(identifier, "IDENTIFIER-VALUE", str(frame.arbitration_id.id)) - frame_ref = create_sub_element_fx(frame_triggering, "FRAME-REF") - frame_ref.set("ID-REF", "FRAME_" + frame.name) - if (frame.is_fd): - create_sub_element_fx(frame_triggering, "CAN-FRAME-TX-BEHAVIOR","CAN-FD") - create_sub_element_fx(frame_triggering, "CAN-FRAME-RX-BEHAVIOR","CAN-FD") + # Create regular frame triggering + create_frame_triggering(frame_triggerings, frame) + + # Create secured frame triggering if applicable + if frame.attribute("SC_Message") and frame.attribute("SC_Message").lower() == "yes": + create_frame_triggering(frame_triggerings, frame, prefix="S") # # ECUS @@ -578,16 +698,11 @@ def dump(db, f, **options): outputs = create_sub_element_fx(connector, "OUTPUTS") for frame in db.frames: if bu.name in frame.transmitters: - output_port = create_sub_element_fx(outputs, "OUTPUT-PORT") - output_port.set('ID', 'Output_Port_' + bu.name +'_'+ frame.name) - frame_triggering_ref = create_sub_element_fx(output_port, "FRAME-TRIGGERING-REF") - frame_triggering_ref.set("ID-REF", "FT_" + frame.name) - # Reference to PDUs - included_pdus = create_sub_element_fx(output_port, "INCLUDED-PDUS") - included_pdu = create_sub_element_fx(included_pdus, "INCLUDED-PDU") - included_pdu.set('ID', 'output_included_pdu_' + frame.name) - pdu_triggering_ref = create_sub_element_fx(included_pdu, "PDU-TRIGGERING-REF") - pdu_triggering_ref.set("ID-REF", "PDU_" + frame.name) + # Create regular output port + create_output_port(outputs, frame, bu.name) + # Create secured output port if applicable + if frame.attribute("SC_Message") and frame.attribute("SC_Message").lower() == "yes": + create_output_port(outputs, frame, bu.name, prefix="S") # ignore CONTROLLERS/CONTROLLER @@ -595,12 +710,19 @@ def dump(db, f, **options): # PDUS # pdus = create_sub_element_fx(elements, "PDUS") + auth_info_tx_length_def = getattr(db.frame_defines.get("SCP_AuthInfoTxLength"), 'defaultValue', None) + freshness_value_tx_length_def = getattr(db.frame_defines.get("SCP_FreshnessValueTxLength"), 'defaultValue', None) + data_id_def = getattr(db.frame_defines.get("SCP_DataId"), 'defaultValue', None) + freshness_value_length_def = getattr(db.frame_defines.get("SCP_FreshnessValueLength"), 'defaultValue', None) for frame in db.frames: pdu = create_sub_element_fx(pdus, "PDU") pdu.set("ID", "PDU_" + frame.name) create_short_name_desc(pdu, "PDU_" + frame.name, frame.comment) create_sub_element_fx(pdu, "BYTE-LENGTH", str(frame.size)) # DLC - create_sub_element_fx(pdu, "PDU-TYPE", "APPLICATION") + if frame.attribute("NmAsrMessage") and frame.attribute("NmAsrMessage").lower() == "yes": + create_sub_element_fx(pdu, "PDU-TYPE", "NM") + else: + create_sub_element_fx(pdu, "PDU-TYPE", "APPLICATION") if frame.is_multiplexed: mux = create_sub_element_fx(pdu, "MULTIPLEXER") @@ -725,23 +847,22 @@ def dump(db, f, **options): signal_id = create_signal_id(frame, signal) signal_instance = create_signal_instance(signal_instances, signal, signal_id) create_signal_ref(signal_instance, signal_id) + if frame.attribute("SC_Message") and frame.attribute("SC_Message").lower() == "yes": + spdu = create_sub_element_fx(pdus, "PDU") + spdu.set("ID", "SPDU_" + frame.name) + create_short_name_desc(spdu, "SPDU_" + frame.name, frame.comment) + create_secoc_configuration(frame, auth_info_tx_length_def, freshness_value_tx_length_def, data_id_def, freshness_value_length_def, spdu) # FRAMES # frames = create_sub_element_fx(elements, "FRAMES") for frame in db.frames: - frame_element = create_sub_element_fx(frames, "FRAME") - frame_element.set("ID", "FRAME_" + frame.name) - create_short_name_desc(frame_element, frame.name, frame.comment) - create_sub_element_fx(frame_element, "BYTE-LENGTH", str(frame.size)) # DLC - create_sub_element_fx(frame_element, "PDU-TYPE", "APPLICATION") - pdu_instances = create_sub_element_fx(frame_element, "PDU-INSTANCES") - pdu_instance = create_sub_element_fx(pdu_instances, "PDU-INSTANCE") - pdu_instance.set("ID", "PDUINSTANCE_" + frame.name) - pdu_ref = create_sub_element_fx(pdu_instance, "PDU-REF") - pdu_ref.set("ID-REF", "PDU_" + frame.name) - create_sub_element_fx(pdu_instance, "BIT-POSITION", "0") - create_sub_element_fx(pdu_instance, "IS-HIGH-LOW-BYTE-ORDER", "false") + # Create regular frame + create_frame_element(frames, frame) + + # Create secured frame if applicable + if frame.attribute("SC_Message") and frame.attribute("SC_Message").lower() == "yes": + create_frame_element(frames, frame, prefix="S") # # FUNCTIONS