Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions scapy/contrib/automotive/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,12 @@ class DoIP(Packet):
0x8003: "Diagnostic message NACK"}
name = 'DoIP'
fields_desc = [
XByteField("protocol_version", 0x02),
XByteField("inverse_version", 0xFD),
XByteEnumField("protocol_version", 0x02, {
0x01: "ISO13400_2010", 0x02: "ISO13400_2012",
0x03: "ISO13400_2019", 0x04: "ISO13400_2019_AMD1"}),
XByteEnumField("inverse_version", 0xFD, {
0xFE: "ISO13400_2010", 0xFD: "ISO13400_2012",
0xFC: "ISO13400_2019", 0xFB: "ISO13400_2019_AMD1"}),
XShortEnumField("payload_type", 0, payload_types),
IntField("payload_length", None),
ConditionalField(ByteEnumField("nack", 0, {
Expand Down Expand Up @@ -239,7 +243,26 @@ def answers(self, other):

def hashret(self):
# type: () -> bytes
return bytes(self)[:3]
payload_type_mapping = {
0x0000: b"\x01",
0x0001: b"\x01",
0x0002: b"\x01",
0x0003: b"\x01",
0x0004: b"\x01",
0x0005: b"\x02",
0x0006: b"\x02",
0x0007: b"\x03",
0x0008: b"\x03",
0x4001: b"\x04",
0x4002: b"\x04",
0x4003: b"\x05",
0x4004: b"\x05",
0x8001: b"\x06",
0x8002: b"\x06",
0x8003: b"\x06",
}

return payload_type_mapping.get(self.payload_type, b"\xff")

def post_build(self, pkt, pay):
# type: (bytes, bytes) -> bytes
Expand Down Expand Up @@ -328,6 +351,10 @@ class DoIPSocket(DoIPSSLStreamSocket):
connect via SSL/TLS
:param context: Optional ssl.SSLContext object for initialization of ssl socket
connections.
:param doip_version: DoIP protocol version to use, default is 2 (ISO 13400-2012)
:param enforce_doip_version: If true, the protocol_version field in each DoIP
packet to be sent, is always set to the value of
doip_version.

Example:
>>> socket = DoIPSocket("169.254.0.131")
Expand All @@ -345,7 +372,9 @@ def __init__(self,
activation_type=0, # type: int
reserved_oem=b"", # type: bytes
force_tls=False, # type: bool
context=None # type: Optional[ssl.SSLContext]
context=None, # type: Optional[ssl.SSLContext]
doip_version=2, # type: int
enforce_doip_version=False, # type: bool
): # type: (...) -> None
self.ip = ip
self.port = port
Expand All @@ -357,6 +386,8 @@ def __init__(self,
self.reserved_oem = reserved_oem
self.force_tls = force_tls
self.context = context
self.doip_version = doip_version
self.enforce_doip_version = enforce_doip_version
try:
self._init_socket()
except Exception:
Expand Down Expand Up @@ -448,6 +479,12 @@ def _activate_routing(self): # type: (...) -> int
else:
return -1

def send(self, x): # type: (Packet) -> int
if self.enforce_doip_version and isinstance(x, DoIP):
x[DoIP].protocol_version = self.doip_version
x[DoIP].inverse_version = 0xFF - self.doip_version
return super().send(x)


class UDS_DoIPSocket(DoIPSocket):
"""
Expand Down
121 changes: 115 additions & 6 deletions test/contrib/automotive/doip.uts
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,14 @@ def server():
sock.bind(('127.0.0.1', 13400))
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
for i in range(len(buffer)):
connection.send(buffer[i:i+1])
time.sleep(0.01)
connection.close()
try:
connection, address = sock.accept()
sniff_up.wait(timeout=1)
for i in range(len(buffer)):
connection.send(buffer[i:i+1])
time.sleep(0.01)
finally:
connection.close()
finally:
sock.close()

Expand All @@ -503,6 +505,49 @@ pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2

= Test DoIPSocket 2 enforce protocol_version
~ linux

server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 13400))
sock.listen(1)
server_up.set()
connection, address = sock.accept()
try:
sniff_up.wait(timeout=1)
connection.send(buffer)
doip_sock = DoIPSSLStreamSocket(connection)
pkts = doip_sock.sniff(timeout=2, count=1)
doip_sock.send(pkts[0])
finally:
connection.close()
finally:
sock.close()


server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(activate_routing=False, doip_version=3, enforce_doip_version=True)

pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
sock.send(DoIP(payload_type=0x8001, source_address=0xe80, target_address=0xe400) / UDS() / UDS_TP())
pkts2 = sock.sniff(timeout=1, count=1)
server_thread.join(timeout=1)
assert len(pkts) == 2
assert len(pkts2) == 1
assert pkts2[0].protocol_version == 0x03
assert pkts2[0].inverse_version == 0xfc
assert pkts2[0].payload_type == 0x8001
assert pkts2[0].service == 0x3E

= Test DoIPSocket 3

server_up = threading.Event()
Expand Down Expand Up @@ -813,3 +858,67 @@ pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_tcp_thread.join(timeout=1)
server_tls_thread.join(timeout=1)
assert len(pkts) == 2

= Test UDS_DualDoIPSslSocket6 force version 3
~ broken_windows not_pypy

server_tcp_up = threading.Event()
server_tls_up = threading.Event()
sniff_up = threading.Event()
def server_tls():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
buffer = bytes.fromhex("03fc0006000000090e8011061000000000")
buffer += b'\x03\xfc\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x03\xfc\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock = context.wrap_socket(sock)
try:
ssock.bind(('::1', 3496))
ssock.listen(1)
server_tls_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
ssock.close()

def server_tcp():
buffer = bytes.fromhex("03fc0006000000090e8011060700000000")
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('::1', 13400))
sock.listen(1)
server_tcp_up.set()
connection, address = sock.accept()
connection.send(buffer)
connection.shutdown(socket.SHUT_RDWR)
connection.close()
finally:
sock.close()


server_tcp_thread = threading.Thread(target=server_tcp)
server_tcp_thread.start()
server_tcp_up.wait(timeout=1)
server_tls_thread = threading.Thread(target=server_tls)
server_tls_thread.start()
server_tls_up.wait(timeout=1)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

conf.debug_dissector = True

sock = UDS_DoIPSocket(ip="::1", context=context, doip_version=3, enforce_doip_version=True)

pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_tcp_thread.join(timeout=1)
server_tls_thread.join(timeout=1)
assert len(pkts) == 2
Loading