Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions scapy/contrib/automotive/uds_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
_SocketUnion, _TransitionTuple, StateGenerator
from scapy.contrib.automotive.uds import UDS, UDS_NR, UDS_DSC, UDS_TP, \
UDS_RDBI, UDS_WDBI, UDS_SA, UDS_RC, UDS_IOCBI, UDS_RMBA, UDS_ER, \
UDS_TesterPresentSender, UDS_CC, UDS_RDBPI, UDS_RD, UDS_TD
UDS_TesterPresentSender, UDS_CC, UDS_RDBPI, UDS_RD, UDS_TD, UDS_DSCPR
# TODO: Refactor this import
from scapy.contrib.automotive.uds_ecu_states import * # noqa: F401, F403
from scapy.error import Scapy_Exception
Expand Down Expand Up @@ -89,7 +89,8 @@ class UDS_DSCEnumerator(UDS_Enumerator, StateGeneratingServiceEnumerator):
_supported_kwargs.update({
'delay_state_change': (int, lambda x: x >= 0),
'overwrite_timeout': (bool, None),
'close_socket_when_entering_session_2': (bool, None)
'close_socket_when_entering_session_2': (bool, None),
'support_suppress_positive_response': (bool, None)
})
_supported_kwargs["scan_range"] = (
(list, tuple, range), lambda x: max(x) < 0x100 and min(x) >= 0)
Expand All @@ -113,7 +114,13 @@ class UDS_DSCEnumerator(UDS_Enumerator, StateGeneratingServiceEnumerator):
This enumerator will close the socket
if session 2 (ProgrammingSession)
was entered, if True. This will
force a reconnect by the executor."""
force a reconnect by the executor.
:param bool support_suppress_positive_response: False by default.
If True, this enumerator will treat
no response for a DSC request with a
session type > 0x80 as a positive
response and will therefore create a
new state with a session value - 0x80."""

def _get_initial_requests(self, **kwargs):
# type: (Any) -> Iterable[Packet]
Expand Down Expand Up @@ -172,6 +179,29 @@ def get_new_edge(self,
except KeyError:
close_socket = False

try:
support_out_of_spec = config[UDS_DSCEnumerator.__name__]["support_suppress_positive_response"] # noqa: E501
except KeyError:
support_out_of_spec = False

if edge is None:
try:
state, req, resp, _, _ = cast(ServiceEnumerator, self).results[-1] # noqa: E501
except IndexError:
return None

if support_out_of_spec and resp is None and req.diagnosticSessionType > 0x80: # noqa: E501
resp = UDS() / UDS_DSCPR(diagnosticSessionType=0x80 - req.diagnosticSessionType) # noqa: E501
new_state = EcuState.get_modified_ecu_state(resp, req, state)
if new_state == state:
return None
else:
edge = (state, new_state)
self._edge_requests[edge] = req
return edge
else:
return None

if edge:
state, new_state = edge
# Force TesterPresent if session is changed
Expand Down Expand Up @@ -1210,8 +1240,7 @@ def _random_memory_addr_pkt(addr_len=None): # noqa: E501
pkt.memorySizeLen = random.randint(1, 4)
pkt.memoryAddressLen = addr_len or random.randint(1, 4)
UDS_RMBARandomEnumerator.set_size(pkt, 0x10)
addr = random.randint(0, 2 ** (8 * pkt.memoryAddressLen) - 1) & \
(0xffffffff << (4 * pkt.memoryAddressLen))
addr = random.randint(0, 2 ** (8 * pkt.memoryAddressLen) - 1) & (0xffffffff << (4 * pkt.memoryAddressLen)) # noqa: E501
UDS_RMBARandomEnumerator.set_addr(pkt, addr)
return pkt

Expand Down
Loading