diff --git a/scapy/contrib/automotive/uds_scan.py b/scapy/contrib/automotive/uds_scan.py index f30b9643ccb..6271b051868 100644 --- a/scapy/contrib/automotive/uds_scan.py +++ b/scapy/contrib/automotive/uds_scan.py @@ -41,7 +41,7 @@ _SocketUnion, _TransitionTuple, StateGenerator from scapy.contrib.automotive.uds import UDS, UDS_NR, UDS_DSC, UDS_TP, \ UDS_RDBI, UDS_WDBI, UDS_SA, UDS_RC, UDS_IOCBI, UDS_RMBA, UDS_ER, \ - UDS_TesterPresentSender, UDS_CC, UDS_RDBPI, UDS_RD, UDS_TD + UDS_TesterPresentSender, UDS_CC, UDS_RDBPI, UDS_RD, UDS_TD, UDS_DSCPR # TODO: Refactor this import from scapy.contrib.automotive.uds_ecu_states import * # noqa: F401, F403 from scapy.error import Scapy_Exception @@ -89,7 +89,8 @@ class UDS_DSCEnumerator(UDS_Enumerator, StateGeneratingServiceEnumerator): _supported_kwargs.update({ 'delay_state_change': (int, lambda x: x >= 0), 'overwrite_timeout': (bool, None), - 'close_socket_when_entering_session_2': (bool, None) + 'close_socket_when_entering_session_2': (bool, None), + 'support_suppress_positive_response': (bool, None) }) _supported_kwargs["scan_range"] = ( (list, tuple, range), lambda x: max(x) < 0x100 and min(x) >= 0) @@ -113,7 +114,13 @@ class UDS_DSCEnumerator(UDS_Enumerator, StateGeneratingServiceEnumerator): This enumerator will close the socket if session 2 (ProgrammingSession) was entered, if True. This will - force a reconnect by the executor.""" + force a reconnect by the executor. + :param bool support_suppress_positive_response: False by default. + If True, this enumerator will treat + no response for a DSC request with a + session type > 0x80 as a positive + response and will therefore create a + new state with a session value - 0x80.""" def _get_initial_requests(self, **kwargs): # type: (Any) -> Iterable[Packet] @@ -172,6 +179,29 @@ def get_new_edge(self, except KeyError: close_socket = False + try: + support_out_of_spec = config[UDS_DSCEnumerator.__name__]["support_suppress_positive_response"] # noqa: E501 + except KeyError: + support_out_of_spec = False + + if edge is None: + try: + state, req, resp, _, _ = cast(ServiceEnumerator, self).results[-1] # noqa: E501 + except IndexError: + return None + + if support_out_of_spec and resp is None and req.diagnosticSessionType > 0x80: # noqa: E501 + resp = UDS() / UDS_DSCPR(diagnosticSessionType=0x80 - req.diagnosticSessionType) # noqa: E501 + new_state = EcuState.get_modified_ecu_state(resp, req, state) + if new_state == state: + return None + else: + edge = (state, new_state) + self._edge_requests[edge] = req + return edge + else: + return None + if edge: state, new_state = edge # Force TesterPresent if session is changed @@ -1210,8 +1240,7 @@ def _random_memory_addr_pkt(addr_len=None): # noqa: E501 pkt.memorySizeLen = random.randint(1, 4) pkt.memoryAddressLen = addr_len or random.randint(1, 4) UDS_RMBARandomEnumerator.set_size(pkt, 0x10) - addr = random.randint(0, 2 ** (8 * pkt.memoryAddressLen) - 1) & \ - (0xffffffff << (4 * pkt.memoryAddressLen)) + addr = random.randint(0, 2 ** (8 * pkt.memoryAddressLen) - 1) & (0xffffffff << (4 * pkt.memoryAddressLen)) # noqa: E501 UDS_RMBARandomEnumerator.set_addr(pkt, addr) return pkt