Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .config/codespell_ignore.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ ba
browseable
byteorder
cace
cantact
cas
ciph
componet
Expand Down
330 changes: 329 additions & 1 deletion doc/scapy/layers/automotive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ function to get all information about one specific protocol.
| +----------------------+--------------------------------------------------------+
| | GMLAN | GMLAN, GMLAN_*, GMLAN_[Utilities] |
| +----------------------+--------------------------------------------------------+
| | SAE J1939 | j1939_scan, j1939_scan_passive, j1939_scan_addr_claim, |
| | | j1939_scan_ecu_id, j1939_scan_unicast, |
| | | j1939_scan_rts_probe |
| +----------------------+--------------------------------------------------------+
| | SOME/IP | SOMEIP, SD |
| +----------------------+--------------------------------------------------------+
| | BMW HSFZ | HSFZ, HSFZSocket, UDS_HSFZSocket |
Expand Down Expand Up @@ -1364,7 +1368,331 @@ Request the Vehicle Identification Number (VIN)::
.. image:: ../graphics/animations/animation-scapy-obd.svg


Message Authentication (AUTOSAR SecOC)
SAE J1939
=========

SAE J1939 is a higher-layer protocol built on top of CAN that is widely used in heavy-duty
vehicles, agricultural machinery, construction equipment, and any application that adopts the
SAE J1939 standard family. The central concept of J1939 is the *Controller Application*
(CA): every logical function on the network claims a unique *Source Address* (SA, 0x00–0xFD)
and communicates with other CAs using *Parameter Group Numbers* (PGNs).

Scapy provides six complementary active techniques for enumerating active CAs on a J1939
bus, as well as a passive sniffing helper. All active scanners automatically perform a
passive pre-scan to detect pre-existing bus traffic ("background noise") so that already-known
CAs are not re-probed or re-reported by default.

.. note::
The J1939 scanner requires the ``automotive.j1939`` contrib to be loaded and a raw CAN
socket. On Linux, a ``CANSocket`` on a SocketCAN interface is the recommended choice.

Setup::

>>> load_contrib('automotive.j1939')
>>> from scapy.contrib.cansocket import CANSocket
>>> from scapy.contrib.automotive.j1939.j1939_scanner import (
... j1939_scan,
... j1939_scan_passive,
... j1939_scan_addr_claim,
... j1939_scan_ecu_id,
... j1939_scan_unicast,
... j1939_scan_rts_probe,
... j1939_scan_uds,
... j1939_scan_xcp,
... )
>>> sock = CANSocket("can0")


J1939SoftSocket — multi-packet reassembly
-------------------------------------------

``J1939SoftSocket`` wraps a raw ``CANSocket`` and provides J1939 Transport
Protocol reassembly (BAM and CMDT). Use it to sniff complete J1939 messages
including multi-packet payloads larger than 8 bytes.

The ``pgn`` parameter controls which PGNs are accepted. Setting ``pgn=0``
(the default) accepts **all** PGNs — each received packet carries the actual
PGN, source address, and destination address from the CAN frame::

>>> from scapy.contrib.automotive.j1939.j1939_soft_socket import (
... J1939SoftSocket,
... )
>>> with J1939SoftSocket(CANSocket("can0"), src_addr=0xFE,
... dst_addr=0xFF, pgn=0) as s:
... pkts = s.sniff(timeout=2)
>>> for p in pkts:
... print("PGN=0x{:05X} SA=0x{:02X}".format(p.pgn, p.src_addr))

To filter for a specific PGN, pass it as ``pgn``::

>>> with J1939SoftSocket(CANSocket("can0"), src_addr=0xFE,
... dst_addr=0xFF, pgn=0xFECA) as s:
... pkts = s.sniff(timeout=2)


j1939_scan_passive — background-noise detection
-------------------------------------------------

``j1939_scan_passive()`` listens on the bus without sending any frame and returns the
``set`` of source addresses observed. The result can be used as the ``noise_ids`` argument
to all active scanners to suppress already-known CAs.

::

>>> noise = j1939_scan_passive(sock, listen_time=2.0)
>>> print("Pre-existing SAs:", {hex(sa) for sa in noise})
Pre-existing SAs: {'0x10', '0x27', '0x49'}


Active scanning techniques
--------------------------

The six active techniques complement each other and are all noise-aware by default.

**Technique 1 — Global Address Claim Request** (``j1939_scan_addr_claim``)

Broadcasts a single Request (PGN 59904) for the Address Claimed PGN (60928).
Every J1939-81-compliant CA **must** respond. Best for networks where all nodes implement
the J1939-81 address-claiming procedure::

>>> found = j1939_scan_addr_claim(sock, listen_time=1.0)
>>> for sa, pkt in sorted(found.items()):
... print("SA=0x{:02X}".format(sa))
SA=0x10
SA=0x27

Suppress pre-existing CAs by passing a noise baseline::

>>> noise = j1939_scan_passive(sock, listen_time=1.0)
>>> found = j1939_scan_addr_claim(sock, listen_time=1.0, noise_ids=noise)

**Technique 2 — Global ECU Identification Request** (``j1939_scan_ecu_id``)

Broadcasts a Request for the ECU Identification Information PGN (64965).
Responding nodes announce their ECU identification string via a BAM multi-packet
transfer. Identifies CAs that publish an ECU ID::

>>> found = j1939_scan_ecu_id(sock, listen_time=1.0)

**Technique 3 — Unicast Ping Sweep** (``j1939_scan_unicast``)

Iterates through destination addresses 0x00–0xFD, sending a unicast Request for
Address Claimed to each. Any response whose source address equals the probed DA
is treated as a positive reply. Detects nodes even if they do not respond to
global broadcasts::

>>> found = j1939_scan_unicast(sock, scan_range=range(0x00, 0xFE), sniff_time=0.05)
>>> for sa, pkt in sorted(found.items()):
... print("SA=0x{:02X}".format(sa))

**Technique 4 — TP.CM RTS Probing** (``j1939_scan_rts_probe``)

Sends a minimal TP.CM_RTS (Transport Protocol Connection Management – Request to
Send) frame to each destination address. An active node replies with either
TP.CM_CTS (Clear to Send), TP.Conn_Abort (Connection Abort), or a NACK
Acknowledgment (PGN 0xE800). All three responses confirm the node is present::

>>> found = j1939_scan_rts_probe(sock, scan_range=range(0x00, 0xFE), sniff_time=0.05)

**Technique 5 — UDS TesterPresent** (``j1939_scan_uds``)

Sends UDS TesterPresent (SID 0x3E) requests over Diagnostic Message A
(PF=0xDA, peer-to-peer) and Diagnostic Message B (PF=0xDB, functional broadcast).
A node that implements UDS responds with a positive response (SID 0x7E) or a
negative response (SID 0x7F). Both subfunctions 0x00 and 0x01 are probed::

>>> found = j1939_scan_uds(sock, scan_range=range(0x00, 0xFE), sniff_time=0.05)
>>> for sa, pkts in sorted(found.items()):
... print("SA=0x{:02X} responded to UDS".format(sa))

Skip the functional (broadcast) scan to avoid waking every ECU::

>>> found = j1939_scan_uds(sock, scan_range=[0x00], sniff_time=0.1,
... skip_functional=True)

**Technique 6 — XCP Connect Probe** (``j1939_scan_xcp``)

Sends an XCP CONNECT command (command byte 0xFF, mode 0x00) over Proprietary A
(PF=0xEF) to each destination address. A node that implements XCP responds with
a positive response whose first byte is ``0xFF``::

>>> found = j1939_scan_xcp(sock, scan_range=range(0x00, 0xFE), sniff_time=0.05)
>>> for sa, pkts in sorted(found.items()):
... print("SA=0x{:02X} responded to XCP".format(sa))


j1939_scan — combined scanner with automatic noise filtering
-------------------------------------------------------------

``j1939_scan()`` runs any subset of the six techniques and merges results. Before
starting any active probe it performs a passive pre-scan (``j1939_scan_passive``) to
collect the set of source addresses already present on the bus. Those addresses are
excluded from probing and from the returned results.

The returned dictionary maps each **newly-discovered** source address to a record with
keys ``"methods"`` (a list of **all** techniques that detected the CA, in order of
first detection), ``"packets"`` (a parallel list of lists of CAN frames), and
``"src_addrs"`` (the scanner source addresses that elicited the response).

Quick scan using all six techniques::

>>> found = j1939_scan(sock)
>>> for sa, info in sorted(found.items()):
... print("SA=0x{:02X} found_by={}".format(sa, info["methods"]))
SA=0x10 found_by=['addr_claim', 'unicast', 'rts_probe']
SA=0x49 found_by=['unicast']

Select specific techniques and tune timing::

>>> found = j1939_scan(
... sock,
... methods=["addr_claim", "unicast"],
... broadcast_listen_time=2.0,
... sniff_time=0.05,
... )

Adjust the passive pre-scan duration::

>>> found = j1939_scan(sock, noise_listen_time=2.0)

Provide an explicit noise baseline to skip the automatic passive pre-scan::

>>> noise = j1939_scan_passive(sock, listen_time=2.0)
>>> found = j1939_scan(sock, noise_ids=noise)

Format results as human-readable text or JSON::

>>> text_output = j1939_scan(sock, output_format="text")
>>> print(text_output)
>>> json_output = j1939_scan(sock, output_format="json")


Bus-load pacing
---------------

All per-address probe functions (``j1939_scan_unicast``, ``j1939_scan_rts_probe``,
``j1939_scan_uds``, ``j1939_scan_xcp``) and the DM scanner (``j1939_scan_dm``,
``j1939_scan_dm_pgn``) automatically pace their probe rate so that the scanner's
contribution to the bus stays within a configurable fraction of the total bitrate.
The broadcast methods (``j1939_scan_addr_claim``, ``j1939_scan_ecu_id``) accept
the same parameters for API uniformity, but do not apply pacing because they send
only a single probe frame.

``bitrate`` (``int``, default ``250000``)
CAN bus bitrate in bit/s. SAE J1939 mandates 250 kbit/s; adjust when testing
on 500 kbit/s or 1 Mbit/s buses.

``busload`` (``float``, default ``0.05``)
Maximum fraction of bus capacity (0 < *busload* ≤ 1.0) the scanner may
consume, counting both the outgoing probe frame and the expected response
frame. The default of ``0.05`` (5 %) is conservative and safe for live
production buses. Increase it for faster scans on quiet or virtual
interfaces. A value of ``1.0`` means no artificial pacing delay is added.

The pacing formula counts the fixed-field bits of a CAN extended frame
(67 bits of overhead plus 8 bits per data byte — no bit-stuffing overhead),
computes the minimum cycle time for the configured budget, and sleeps for any
remaining time after the per-probe sniff window::

>>> # Conservative 2 % bus load scan
>>> found = j1939_scan_unicast(sock, sniff_time=0.05,
... bitrate=250000, busload=0.02)

>>> # Fast scan on a test bench – allow 50 % bus load
>>> found = j1939_scan_unicast(sock, sniff_time=0.05,
... bitrate=250000, busload=0.50)

>>> # 500 kbit/s bus at 10 % load
>>> found = j1939_scan(sock, sniff_time=0.05,
... bitrate=500000, busload=0.10)


J1939 Diagnostic Message (DM) Scanner
---------------------------------------

The ``j1939_scan_dm`` function probes a single ECU (identified by its
Destination Address) to discover which SAE J1939-73 Diagnostic Messages it
supports. For each PGN in the built-in table the scanner sends a unicast
Request (PGN 59904) and classifies the reply:

- **Positive response** — the ECU replies with the requested PGN.
- **NACK** — the ECU replies with an Acknowledgment (PGN 0xE800), control
byte 0x01 (Negative Acknowledgment).
- **Timeout** — the ECU does not reply within *sniff_time* seconds.

Probe a target ECU for all eight standard DMs::

>>> from scapy.contrib.automotive.j1939.j1939_dm_scanner import (
... j1939_scan_dm,
... )
>>> sock = CANSocket("can0")
>>> results = j1939_scan_dm(sock, target_da=0x00)
>>> for name, res in sorted(results.items()):
... status = "supported" if res.supported else res.error
... print("{} (PGN 0x{:04X}): {}".format(name, res.pgn, status))
DM1 (PGN 0xFECA): supported
DM2 (PGN 0xFECB): NACK
DM3 (PGN 0xFECC): Timeout

Scan only a subset of DMs::

>>> results = j1939_scan_dm(sock, target_da=0x00, dms=["DM1", "DM5"])

Probe a single PGN directly::

>>> from scapy.contrib.automotive.j1939.j1939_dm_scanner import (
... j1939_scan_dm_pgn, J1939_DM_PGNS,
... )
>>> res = j1939_scan_dm_pgn(sock, target_da=0x00,
... pgn=J1939_DM_PGNS["DM1"], dm_name="DM1")
>>> print(res)
<DmScanResult dm=DM1 pgn=0xFECA supported=True error=None>

Both ``j1939_scan_dm`` and ``j1939_scan_dm_pgn`` accept ``bitrate`` and
``busload`` to pace the probe rate (see `Bus-load pacing`_ above)::

>>> # Scan at most 10 % of a 250 kbit/s bus
>>> results = j1939_scan_dm(sock, target_da=0x00,
... bitrate=250000, busload=0.10)


Reset and reconnect handlers
-----------------------------

``j1939_scan_dm`` mirrors the interface of
:class:`~scapy.contrib.automotive.uds_scan.UDS_Scanner` and accepts two
optional handler callbacks to manage ECU state between DM probes:

``reset_handler`` (``Optional[Callable[[], None]]``, default ``None``)
Called between each pair of DM PGN probes to reset the target ECU to a
known state (e.g. toggle an ignition line, trigger a power cycle, or send a
hardware-reset signal). Called *N − 1* times when scanning *N* PGNs.

``reconnect_handler`` (``Optional[Callable[[], SuperSocket]]``, default ``None``)
Called immediately after *reset_handler* (when provided) to re-establish the
CAN connection. Must return a new
:class:`~scapy.supersocket.SuperSocket`; all subsequent probes use the
returned socket. Can also be provided without *reset_handler* when the ECU
spontaneously drops the connection after each diagnostic exchange.
On failure, *reconnect_handler* is retried up to ``reconnect_retries``
times (default 5) with a 1-second backoff between attempts.

Example — reset and reconnect between every DM probe::

>>> def reset():
... pass # e.g., toggle GPIO reset pin
>>> def reconnect():
... return CANSocket("can0")
>>> results = j1939_scan_dm(
... reconnect(), target_da=0x00,
... reset_handler=reset,
... reconnect_handler=reconnect,
... )
>>> for name, res in sorted(results.items()):
... status = "supported" if res.supported else res.error
... print("{}: {}".format(name, status))


======================================

AutoSAR SecOC is a security architecture protecting communication between ECUs in a vehicle from cyber-attacks.
Expand Down
Loading
Loading