Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/canmatrix/Frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ class Frame(object):
signalGroups = attr.ib(factory=list) # type: typing.MutableSequence[SignalGroup]

cycle_time = attr.ib(default=0) # type: int
event_controlled_time = attr.ib(default=0) # type: int
debounce_time_range = attr.ib(default=0) # type: int
final_repetitions = attr.ib(default=0) # type: int
repeating_time_range = attr.ib(default=0) # type: int

is_j1939 = attr.ib(default=False) # type: bool
# ('cycleTime', '_cycleTime', int, None),
Expand Down Expand Up @@ -172,7 +176,7 @@ def source(self, value): # type: (int) -> None

@property
def effective_cycle_time(self):
"""Calculate effective cycle time for frame, depending on singal cycle times"""
"""Calculate effective cycle time for frame, depending on signal cycle times"""
min_cycle_time_list = [y for y in [x.cycle_time for x in self.signals] + [self.cycle_time] if y != 0]
if len(min_cycle_time_list) == 0:
return 0
Expand Down
211 changes: 166 additions & 45 deletions src/canmatrix/formats/fibex.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
logger = logging.getLogger(__name__)

fx = "http://www.asam.net/xml/fbx"
te = "http://www.technica-engineering.com/xml/fbx"
ho = "http://www.asam.net/xml"
can = "http://www.asam.net/xml/fbx/can"
xsi = "http://www.w3.org/2001/XMLSchema-instance"
ns_ho = "{%s}" % ho
ns_fx = "{%s}" % fx
ns_te = "{%s}" % te
ns_can = "{%s}" % can
ns_xsi = "{%s}" % xsi

Expand All @@ -74,6 +76,13 @@ def create_sub_element_fx(parent, element_name, element_text=None):
new.text = element_text
return new

def create_sub_element_te(parent, element_name, element_text=None):
# type: (_Element, str, typing.Optional[str]) -> _Element
new = lxml.etree.SubElement(parent, ns_te + element_name)
if element_text is not None:
new.text = element_text
return new


def create_sub_element_ho(parent, element_name, element_text=None):
# type: (_Element, str, typing.Optional[str]) -> _Element
Expand Down Expand Up @@ -133,6 +142,121 @@ def get_multiplexing_parts_infos(signals, frame_name, start_pos=-1, end_pos=-1,

return start_pos, end_pos, seg_big_endian

def create_frame_element(parent, frame, prefix=""):
"""Helper function to create a frame element with PDU instances."""
frame_element = create_sub_element_fx(parent, "FRAME")
frame_element.set("ID", f"{prefix}FRAME_{frame.name}")

create_short_name_desc(frame_element, frame.name, frame.comment)
create_sub_element_fx(frame_element, "BYTE-LENGTH", str(frame.size))
if frame.attribute("NmAsrMessage") and frame.attribute("NmAsrMessage").lower() == "yes":
create_sub_element_fx(frame_element, "FRAME-TYPE", "NM")
else:
create_sub_element_fx(frame_element, "FRAME-TYPE", "APPLICATION")

# PDU instances
pdu_instances = create_sub_element_fx(frame_element, "PDU-INSTANCES")
pdu_instance = create_sub_element_fx(pdu_instances, "PDU-INSTANCE")
pdu_instance.set("ID", f"PDUINSTANCE_{frame.name}") # Note: No prefix for ID

pdu_ref = create_sub_element_fx(pdu_instance, "PDU-REF")
pdu_ref.set("ID-REF", f"{prefix}PDU_{frame.name}")

create_sub_element_fx(pdu_instance, "BIT-POSITION", "0")
create_sub_element_fx(pdu_instance, "IS-HIGH-LOW-BYTE-ORDER", "false")

return frame_element

def create_frame_triggering(parent, frame, prefix=""):
"""Helper function to create a frame triggering element."""
frame_triggering = create_sub_element_fx(parent, "FRAME-TRIGGERING")
frame_triggering.set("ID", f"{prefix}FT_{frame.name}")

# Identifier
identifier = create_sub_element_fx(frame_triggering, "IDENTIFIER")
create_sub_element_fx(identifier, "IDENTIFIER-VALUE", str(frame.arbitration_id.id))

# Frame reference
frame_ref = create_sub_element_fx(frame_triggering, "FRAME-REF")
frame_ref.set("ID-REF", f"{prefix}FRAME_{frame.name}")

# CAN-FD behavior (if applicable)
if frame.is_fd:
create_sub_element_fx(frame_triggering, "CAN-FRAME-TX-BEHAVIOR", "CAN-FD")
create_sub_element_fx(frame_triggering, "CAN-FRAME-RX-BEHAVIOR", "CAN-FD")

return frame_triggering

def create_pdu_triggering(parent, pdu, gen_msg_type_def, msg_nr_of_repetition_def, msg_delay_time_def, msg_cycle_time_def, prefix=""):
"""Helper function to create a PDU triggering element with timings."""
pdu_triggering = create_sub_element_fx(parent, "PDU-TRIGGERING")
pdu_triggering.set("ID", f"{prefix}PDU_{pdu.name}")

# Timings
pdu_timings = create_sub_element_fx(pdu_triggering, "TIMINGS")
if pdu.cycle_time > 0:
cyclic_timing = create_sub_element_fx(pdu_timings, "CYCLIC-TIMING")
repeating_time_range = create_sub_element_fx(cyclic_timing, "REPEATING-TIME-RANGE")
time_value = f"PT{pdu.cycle_time / 1000.0}S"
create_sub_element_fx(repeating_time_range, "VALUE", time_value)

send_type = pdu.attributes["GenMsgSendType"] if "GenMsgSendType" in pdu.attributes else gen_msg_type_def
if send_type and ("event" in send_type.lower() or "spontaneous" in send_type.lower()):
event_controlled_timing = create_sub_element_fx(pdu_timings, "EVENT-CONTROLLED-TIMING")
pdu.debounce_time_range = int(float(pdu.attributes.get("GenMsgDelayTime", msg_delay_time_def)))
debounce_time_range = create_sub_element_fx(event_controlled_timing, "DEBOUNCE-TIME-RANGE")
debounce_time_value = f"PT{pdu.debounce_time_range / 1000.0}S"
create_sub_element_fx(debounce_time_range, "VALUE", debounce_time_value)
pdu.final_repetitions = pdu.attributes.get("GenMsgNrOfRepetition", msg_nr_of_repetition_def)
create_sub_element_fx(event_controlled_timing, "FINAL-REPETITIONS", pdu.final_repetitions)
pdu.repeating_time_range = int(float(pdu.attributes.get("GenMsgCycleTime", msg_cycle_time_def))) if pdu.cycle_time == 0 else None
if pdu.repeating_time_range is not None and pdu.repeating_time_range > 0:
repeating_time_range = create_sub_element_fx(event_controlled_timing, "REPEATING-TIME-RANGE")
repeating_time_range_value = f"PT{pdu.repeating_time_range / 1000.0}S"
create_sub_element_fx(repeating_time_range, "VALUE", repeating_time_range_value)

# PDU reference
pdu_ref = create_sub_element_fx(pdu_triggering, "PDU-REF")
pdu_ref.set("ID-REF", f"{prefix}PDU_{pdu.name}")

return pdu_triggering


def create_output_port(parent, frame, ecu_name, prefix=""):
"""Helper function to create output port with frame and PDU references."""
output_port = create_sub_element_fx(parent, "OUTPUT-PORT")
output_port.set('ID', 'Output_Port_' + ecu_name +'_'+ frame.name)
# Frame triggering reference
frame_triggering_ref = create_sub_element_fx(output_port, "FRAME-TRIGGERING-REF")
frame_triggering_ref.set("ID-REF", f"{prefix}FT_{frame.name}")

# PDU references
included_pdus = create_sub_element_fx(output_port, "INCLUDED-PDUS")
included_pdu = create_sub_element_fx(included_pdus, "INCLUDED-PDU")
included_pdu.set('ID', f'{prefix.lower()}output_included_pdu_{frame.name}')

pdu_triggering_ref = create_sub_element_fx(included_pdu, "PDU-TRIGGERING-REF")
pdu_triggering_ref.set("ID-REF", f"{prefix}PDU_{frame.name}")

return output_port

def create_secoc_configuration(frame, auth_info_tx_length_def, freshness_value_tx_length_def, data_id_def, freshness_value_length_def, spdu):
auth_info_tx_length = int(frame.attribute("SCP_AuthInfoTxLength")) if frame.attribute("SCP_AuthInfoTxLength") is not None else int(auth_info_tx_length_def)
freshness_value_tx_length = int(frame.attribute("SCP_FreshnessValueTxLength")) if frame.attribute("SCP_FreshnessValueTxLength") is not None else int(freshness_value_tx_length_def)
secoc_pdu_length = frame.size + (auth_info_tx_length // 8) + (freshness_value_tx_length // 8)
create_sub_element_fx(spdu, "BYTE-LENGTH", str(secoc_pdu_length))
create_sub_element_fx(spdu, "PDU-TYPE", "OTHER")
manufac_extansion = create_sub_element_te(spdu, "MANUFACTURER-EXTENSION")
sec_props = create_sub_element_te(manufac_extansion, "SECURITY-PROPERTIES")
data_id = frame.attribute("SCP_DataId") if frame.attribute("SCP_DataId") else data_id_def
create_sub_element_te(sec_props, "DATA-ID", data_id)
create_sub_element_te(sec_props, "AUTH-INFO-TX-LENGTH", str(auth_info_tx_length))
freshness_value_length = frame.attribute("SCP_FreshnessValueLength") if frame.attribute("SCP_FreshnessValueLength") else freshness_value_length_def
create_sub_element_te(sec_props, "FRESHNESS-VALUE-LENGTH", str(freshness_value_length))
create_sub_element_te(sec_props, "FRESHNESS-VALUE-TX-LENGTH", str(freshness_value_tx_length))
payload_pdu = create_sub_element_te(sec_props, "PAYLOAD-REF")
payload_pdu.set("ID-REF", "PDU_" + frame.name)

def get_base_data_type(signal):
# type: (Signal) -> str
if signal.is_float:
Expand Down Expand Up @@ -423,7 +547,7 @@ def load(f, **_options):

def dump(db, f, **options):
# type: (canmatrix.CanMatrix, typing.IO, **typing.Any) -> None
ns_map = {"fx": fx, "ho": ho, "can": can, "xsi": xsi}
ns_map = {"fx": fx, "ho": ho, "te": te, "can": can, "xsi": xsi}
can_channel = 'CANCHANNEL01'
db_name= db.attribute("DBName")
if db_name:
Expand All @@ -433,6 +557,10 @@ def dump(db, f, **options):
'{{{pre}}}schemaLocation'.format(
pre=xsi)] = 'http://www.asam.net/xml/fbx ..\\..\\xml_schema\\fibex.xsd http://www.asam.net/xml/fbx/can ..\\..\\xml_schema\\fibex4can.xsd'

gen_msg_type_def = getattr(db.frame_defines.get("GenMsgSendType"), 'defaultValue', None)
msg_delay_time_def = getattr(db.frame_defines.get("GenMsgDelayTime"), 'defaultValue', None)
msg_nr_of_repetition_def = getattr(db.frame_defines.get("GenMsgNrOfRepetition"), 'defaultValue', None)
msg_cycle_time_def = getattr(db.frame_defines.get("GenMsgCycleTime"), 'defaultValue', None)
#
# Make sure that we can even write to FIBEX
#
Expand Down Expand Up @@ -512,30 +640,22 @@ def dump(db, f, **options):
# for pdu triggerings
pdu_triggerings = create_sub_element_fx(channel, "PDU-TRIGGERINGS")
for pdu in db.frames:
pdu_triggering = create_sub_element_fx(
pdu_triggerings, "PDU-TRIGGERING")
pdu_triggering.set("ID", "PDU_" + pdu.name)
pdu_timings = create_sub_element_fx(pdu_triggering, "TIMINGS")
if pdu.cycle_time > 0:
cyclic_timing = create_sub_element_fx(pdu_timings, "CYCLIC-TIMING")
repeating_time_range = create_sub_element_fx(cyclic_timing, "REPEATING-TIME-RANGE")
create_sub_element_fx(repeating_time_range, "VALUE", "PT" + str(pdu.cycle_time/1000.0) + "S")

pdu_ref = create_sub_element_fx(pdu_triggering, "PDU-REF")
pdu_ref.set("ID-REF", "PDU_" + pdu.name)
# Create regular PDU triggering
create_pdu_triggering(pdu_triggerings, pdu, gen_msg_type_def, msg_nr_of_repetition_def, msg_delay_time_def, msg_cycle_time_def)

# Create secured PDU triggering if applicable
if pdu.attribute("SC_Message") and pdu.attribute("SC_Message").lower() == "yes":
create_pdu_triggering(pdu_triggerings, pdu, gen_msg_type_def, msg_nr_of_repetition_def, msg_delay_time_def, msg_cycle_time_def, prefix="S")


frame_triggerings = create_sub_element_fx(channel, "FRAME-TRIGGERINGS")
for frame in db.frames:
frame_triggering = create_sub_element_fx(
frame_triggerings, "FRAME-TRIGGERING")
frame_triggering.set("ID", "FT_" + frame.name)
identifier = create_sub_element_fx(frame_triggering, "IDENTIFIER")
create_sub_element_fx(identifier, "IDENTIFIER-VALUE", str(frame.arbitration_id.id))
frame_ref = create_sub_element_fx(frame_triggering, "FRAME-REF")
frame_ref.set("ID-REF", "FRAME_" + frame.name)
if (frame.is_fd):
create_sub_element_fx(frame_triggering, "CAN-FRAME-TX-BEHAVIOR","CAN-FD")
create_sub_element_fx(frame_triggering, "CAN-FRAME-RX-BEHAVIOR","CAN-FD")
# Create regular frame triggering
create_frame_triggering(frame_triggerings, frame)

# Create secured frame triggering if applicable
if frame.attribute("SC_Message") and frame.attribute("SC_Message").lower() == "yes":
create_frame_triggering(frame_triggerings, frame, prefix="S")

#
# ECUS
Expand Down Expand Up @@ -578,29 +698,31 @@ def dump(db, f, **options):
outputs = create_sub_element_fx(connector, "OUTPUTS")
for frame in db.frames:
if bu.name in frame.transmitters:
output_port = create_sub_element_fx(outputs, "OUTPUT-PORT")
output_port.set('ID', 'Output_Port_' + bu.name +'_'+ frame.name)
frame_triggering_ref = create_sub_element_fx(output_port, "FRAME-TRIGGERING-REF")
frame_triggering_ref.set("ID-REF", "FT_" + frame.name)
# Reference to PDUs
included_pdus = create_sub_element_fx(output_port, "INCLUDED-PDUS")
included_pdu = create_sub_element_fx(included_pdus, "INCLUDED-PDU")
included_pdu.set('ID', 'output_included_pdu_' + frame.name)
pdu_triggering_ref = create_sub_element_fx(included_pdu, "PDU-TRIGGERING-REF")
pdu_triggering_ref.set("ID-REF", "PDU_" + frame.name)
# Create regular output port
create_output_port(outputs, frame, bu.name)
# Create secured output port if applicable
if frame.attribute("SC_Message") and frame.attribute("SC_Message").lower() == "yes":
create_output_port(outputs, frame, bu.name, prefix="S")

# ignore CONTROLLERS/CONTROLLER

#
# PDUS
#
pdus = create_sub_element_fx(elements, "PDUS")
auth_info_tx_length_def = getattr(db.frame_defines.get("SCP_AuthInfoTxLength"), 'defaultValue', None)
freshness_value_tx_length_def = getattr(db.frame_defines.get("SCP_FreshnessValueTxLength"), 'defaultValue', None)
data_id_def = getattr(db.frame_defines.get("SCP_DataId"), 'defaultValue', None)
freshness_value_length_def = getattr(db.frame_defines.get("SCP_FreshnessValueLength"), 'defaultValue', None)
for frame in db.frames:
pdu = create_sub_element_fx(pdus, "PDU")
pdu.set("ID", "PDU_" + frame.name)
create_short_name_desc(pdu, "PDU_" + frame.name, frame.comment)
create_sub_element_fx(pdu, "BYTE-LENGTH", str(frame.size)) # DLC
create_sub_element_fx(pdu, "PDU-TYPE", "APPLICATION")
if frame.attribute("NmAsrMessage") and frame.attribute("NmAsrMessage").lower() == "yes":
create_sub_element_fx(pdu, "PDU-TYPE", "NM")
else:
create_sub_element_fx(pdu, "PDU-TYPE", "APPLICATION")

if frame.is_multiplexed:
mux = create_sub_element_fx(pdu, "MULTIPLEXER")
Expand Down Expand Up @@ -725,23 +847,22 @@ def dump(db, f, **options):
signal_id = create_signal_id(frame, signal)
signal_instance = create_signal_instance(signal_instances, signal, signal_id)
create_signal_ref(signal_instance, signal_id)
if frame.attribute("SC_Message") and frame.attribute("SC_Message").lower() == "yes":
spdu = create_sub_element_fx(pdus, "PDU")
spdu.set("ID", "SPDU_" + frame.name)
create_short_name_desc(spdu, "SPDU_" + frame.name, frame.comment)
create_secoc_configuration(frame, auth_info_tx_length_def, freshness_value_tx_length_def, data_id_def, freshness_value_length_def, spdu)

# FRAMES
#
frames = create_sub_element_fx(elements, "FRAMES")
for frame in db.frames:
frame_element = create_sub_element_fx(frames, "FRAME")
frame_element.set("ID", "FRAME_" + frame.name)
create_short_name_desc(frame_element, frame.name, frame.comment)
create_sub_element_fx(frame_element, "BYTE-LENGTH", str(frame.size)) # DLC
create_sub_element_fx(frame_element, "PDU-TYPE", "APPLICATION")
pdu_instances = create_sub_element_fx(frame_element, "PDU-INSTANCES")
pdu_instance = create_sub_element_fx(pdu_instances, "PDU-INSTANCE")
pdu_instance.set("ID", "PDUINSTANCE_" + frame.name)
pdu_ref = create_sub_element_fx(pdu_instance, "PDU-REF")
pdu_ref.set("ID-REF", "PDU_" + frame.name)
create_sub_element_fx(pdu_instance, "BIT-POSITION", "0")
create_sub_element_fx(pdu_instance, "IS-HIGH-LOW-BYTE-ORDER", "false")
# Create regular frame
create_frame_element(frames, frame)

# Create secured frame if applicable
if frame.attribute("SC_Message") and frame.attribute("SC_Message").lower() == "yes":
create_frame_element(frames, frame, prefix="S")

#
# FUNCTIONS
Expand Down