diff --git a/README.md b/README.md index 3d08e91..a081195 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Sniffle has a number of useful features, including: * Easy to extend host-side software written in Python * PCAP export compatible with the Ubertooth * Wireshark compatible plugin +* ZMQ Publishing server ## Prerequisites @@ -279,6 +280,8 @@ options: -d, --decode Decode advertising data -o OUTPUT, --output OUTPUT PCAP output file name + -z, --zmq Enable ZMQ server + --zmqsetting Set ZMQ server ip and port (Default 127.0.0.1:4222) ``` The XDS110 debugger on the Launchpad boards creates two serial ports. On @@ -389,7 +392,7 @@ options: -s SERPORT, --serport SERPORT Sniffer serial port name -b BAUDRATE, --baudrate BAUDRATE - Sniffer serial port baud rate + Sniffer baudrate (2000000 or 921600) -c {37,38,39}, --advchan {37,38,39} Advertising channel to listen on -r RSSI, --rssi RSSI Filter packets by minimum RSSI @@ -397,7 +400,6 @@ options: -d, --decode Decode advertising data -o OUTPUT, --output OUTPUT PCAP output file name - ``` The scanner command line arguments work the same as the sniffer. The purpose of diff --git a/python_cli/__init__.py b/python_cli/__init__.py new file mode 100755 index 0000000..e69de29 diff --git a/python_cli/advertiser.py b/python_cli/advertiser.py index 89303c4..e9a937b 100755 --- a/python_cli/advertiser.py +++ b/python_cli/advertiser.py @@ -9,7 +9,8 @@ from sniffle.sniffle_hw import SniffleHW # global variable to access hardware -hw = None +HW = None + def main(): aparse = argparse.ArgumentParser(description="Advertiser test script for Sniffle BLE5 sniffer") @@ -17,7 +18,7 @@ def main(): aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baud rate") args = aparse.parse_args() - global hw + global HW hw = SniffleHW(args.serport, baudrate=args.baudrate) # set the advertising channel (and return to ad-sniffing mode) @@ -70,5 +71,6 @@ def main(): if msg is not None: print(msg, end='\n\n') + if __name__ == "__main__": main() diff --git a/python_cli/initiator.py b/python_cli/initiator.py index 41b5268..512fd78 100755 --- a/python_cli/initiator.py +++ b/python_cli/initiator.py @@ -12,7 +12,7 @@ ScanRspMessage, str_mac) # global variable to access hardware -hw = None +HW = None _aa = 0 def main(): @@ -31,7 +31,7 @@ def main(): help="Supplied MAC address is public") args = aparse.parse_args() - global hw + global HW hw = SniffleHW(args.serport, baudrate=args.baudrate) targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string) @@ -101,23 +101,23 @@ def main(): print_message(msg) def get_mac_from_irk(irk): - hw.cmd_irk(irk, False) - hw.mark_and_flush() + HW.cmd_irk(irk, False) + HW.mark_and_flush() print("Waiting for advertisement with suitable RPA...") while True: - msg = hw.recv_and_decode() + msg = HW.recv_and_decode() if isinstance(msg, (AdvaMessage, AdvDirectIndMessage, AdvExtIndMessage)) and msg.AdvA is not None: print("Found target MAC: %s" % str_mac(msg.AdvA)) return msg.AdvA def get_mac_from_string(s): - hw.cmd_mac() - hw.cmd_scan() - hw.mark_and_flush() + HW.cmd_mac() + HW.cmd_scan() + HW.mark_and_flush() print("Waiting for advertisement containing specified string...") while True: - msg = hw.recv_and_decode() + msg = HW.recv_and_decode() if isinstance(msg, (AdvaMessage, AdvDirectIndMessage, ScanRspMessage, AdvExtIndMessage)) and msg.AdvA is not None: if s in msg.body: @@ -132,7 +132,7 @@ def print_message(msg): elif isinstance(msg, StateMessage): print(msg) if msg.new_state == SnifferState.CENTRAL: - hw.decoder_state.cur_aa = _aa + HW.decoder_state.cur_aa = _aa print() msg_ctr = 0 @@ -143,7 +143,7 @@ def print_packet(dpkt): global msg_ctr MCMASK = 3 if (msg_ctr & MCMASK) == MCMASK: - hw.cmd_transmit(3, b'\x12') # LL_PING_REQ + HW.cmd_transmit(3, b'\x12') # LL_PING_REQ msg_ctr += 1 # also test sending LL_CONNECTION_UPDATE_IND @@ -154,7 +154,7 @@ def print_packet(dpkt): # Latency = 0x0003 # Timeout = 0x0080 # Instant = 0x0080 - hw.cmd_transmit(3, b'\x00\x04\x08\x00\x30\x00\x03\x00\x80\x00\x80\x00') + HW.cmd_transmit(3, b'\x00\x04\x08\x00\x30\x00\x03\x00\x80\x00\x80\x00') print("sent change!") if __name__ == "__main__": diff --git a/python_cli/relay_master.py b/python_cli/relay_master.py index c7a41c7..1b50ded 100755 --- a/python_cli/relay_master.py +++ b/python_cli/relay_master.py @@ -71,15 +71,15 @@ """ # global variable to access hardware -hw = None +HW = None # global variable for pcap writer pcwriter = None def sigint_handler(sig, frame): - hw.cancel_recv() - hw.cmd_chan_aa_phy() # stop scanning or connection - hw.cmd_rssi(0) + HW.cancel_recv() + HW.cmd_chan_aa_phy() # stop scanning or connection + HW.cmd_rssi(0) sys.exit(0) def main(): @@ -106,7 +106,7 @@ def main(): aparse.add_argument("-o", "--output", default=None, help="PCAP output file name") args = aparse.parse_args() - global hw + global HW hw = SniffleHW(args.serport) # put the hardware in a normal state (passive scanning) and configure it with an impossibly @@ -280,17 +280,17 @@ def sock_recv_print_forward(conn, quiet, filter_changes=False): # construct packet object for display and PCAP pkt = DPacketMessage.from_body(body, True) pkt.ts_epoch = time() - pkt.ts = pkt.ts_epoch - hw.decoder_state.first_epoch_time - pkt.aa = hw.decoder_state.cur_aa + pkt.ts = pkt.ts_epoch - HW.decoder_state.first_epoch_time + pkt.aa = HW.decoder_state.cur_aa pkt.event = event # Passing on PDUs with instants in the past would break the connection if not (filter_changes and has_instant(pkt)): - hw.cmd_transmit(llid, pdu, event) + HW.cmd_transmit(llid, pdu, event) print_message(pkt, quiet) def ser_recv_print_forward(conn, quiet, filter_changes=False): - msg = hw.recv_and_decode() + msg = HW.recv_and_decode() if isinstance(msg, PacketMessage): msg = DPacketMessage.decode(msg) @@ -302,7 +302,7 @@ def ser_recv_print_forward(conn, quiet, filter_changes=False): conn.send_msg(MessageType.PACKET, pack(' self.rssi_max: self.rssi_max = rssi - self.rssi_avg = (self.rssi_avg*self.hits + rssi) / (self.hits + 1) + self.rssi_avg = (self.rssi_avg * self.hits + rssi) / (self.hits + 1) self.hits += 1 + def main(): aparse = argparse.ArgumentParser(description="Scanner utility for Sniffle BLE5 sniffer") aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baud rate") aparse.add_argument("-c", "--advchan", default=37, choices=[37, 38, 39], type=int, - help="Advertising channel to listen on") + help="Advertising channel to listen on") aparse.add_argument("-r", "--rssi", default=-128, type=int, - help="Filter packets by minimum RSSI") + help="Filter packets by minimum RSSI") aparse.add_argument("-l", "--longrange", action="store_true", - help="Use long range (coded) PHY for primary advertising") + help="Use long range (coded) PHY for primary advertising") aparse.add_argument("-d", "--decode", action="store_true", - help="Decode advertising data") + help="Decode advertising data") aparse.add_argument("-o", "--output", default=None, help="PCAP output file name") args = aparse.parse_args() - global hw + global HW hw = make_sniffle_hw(args.serport, baudrate=args.baudrate) hw.setup_sniffer( - mode=SnifferMode.ACTIVE_SCAN, - chan=args.advchan, - ext_adv=True, - coded_phy=args.longrange, - rssi_min=args.rssi) + mode=SnifferMode.ACTIVE_SCAN, + chan=args.advchan, + ext_adv=True, + coded_phy=args.longrange, + rssi_min=args.rssi) # zero timestamps and flush old packets hw.mark_and_flush() @@ -97,10 +100,10 @@ def main(): print("\n\nScan Results:") for a in sorted(advertisers.keys(), key=lambda k: advertisers[k].rssi_avg, reverse=True): - print("="*80) + print("=" * 80) print("AdvA: %s Avg/Min/Max RSSI: %.1f/%i/%i Hits: %i" % ( - a, advertisers[a].rssi_avg, advertisers[a].rssi_min, advertisers[a].rssi_max, - advertisers[a].hits)) + a, advertisers[a].rssi_avg, advertisers[a].rssi_min, advertisers[a].rssi_max, + advertisers[a].hits)) if advertisers[a].adv: print("\nAdvertisement:") print(advertisers[a].adv.str_header()) @@ -121,7 +124,8 @@ def main(): print(advertisers[a].scan_rsp.hexdump()) else: print("\nScan Response: None") - print("="*80, end="\n\n") + print("=" * 80, end="\n\n") + def handle_packet(dpkt): # Ignore non-advertisements (shouldn't get any) @@ -152,5 +156,6 @@ def handle_packet(dpkt): else: advertisers[adva].adv = dpkt + if __name__ == "__main__": main() diff --git a/python_cli/sniff_receiver.py b/python_cli/sniff_receiver.py index be9e537..af12e6e 100755 --- a/python_cli/sniff_receiver.py +++ b/python_cli/sniff_receiver.py @@ -1,62 +1,58 @@ #!/usr/bin/env python3 # Written by Sultan Qasim Khan +# OpenDroneID mods Copyright (c) 2024 by B. Kerler # Copyright (c) 2018-2025, NCC Group plc # Released as open source under GPLv3 import argparse, sys +import json +import time from binascii import unhexlify -from sniffle.constants import BLE_ADV_AA from sniffle.pcap import PcapBleWriter from sniffle.sniffle_hw import (make_sniffle_hw, PacketMessage, DebugMessage, StateMessage, MeasurementMessage, SnifferMode, PhyMode) from sniffle.packet_decoder import (AdvaMessage, AdvDirectIndMessage, AdvExtIndMessage, - ScanRspMessage, DataMessage, str_mac) + ScanRspMessage, DataMessage, str_mac, AdvIndMessage) from sniffle.errors import UsageError, SourceDone from sniffle.advdata.decoder import decode_adv_data # global variable to access hardware -hw = None +HW = None # global variable for pcap writer pcwriter = None -def main(): - aparse = argparse.ArgumentParser(description="Host-side receiver for Sniffle BLE5 sniffer") - aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") - aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baud rate") - aparse.add_argument("-c", "--advchan", default=40, choices=[37, 38, 39], type=int, - help="Advertising channel to listen on") - aparse.add_argument("-p", "--pause", action="store_true", - help="Pause sniffer after disconnect") - aparse.add_argument("-r", "--rssi", default=-128, type=int, - help="Filter packets by minimum RSSI") - aparse.add_argument("-m", "--mac", default=None, help="Filter packets by advertiser MAC") - aparse.add_argument("-i", "--irk", default=None, help="Filter packets by advertiser IRK") - aparse.add_argument("-S", "--string", default=None, - help="Filter for advertisements containing the specified string") - aparse.add_argument("-a", "--advonly", action="store_true", - help="Passive scanning, don't follow connections") - aparse.add_argument("-A", "--scan", action="store_true", - help="Active scanning, don't follow connections") - aparse.add_argument("-e", "--extadv", action="store_true", - help="Capture BT5 extended (auxiliary) advertising") - aparse.add_argument("-H", "--hop", action="store_true", - help="Hop primary advertising channels in extended mode") - aparse.add_argument("-l", "--longrange", action="store_true", - help="Use long range (coded) PHY for primary advertising") - aparse.add_argument("-q", "--quiet", action="store_true", - help="Don't display empty packets") - aparse.add_argument("-Q", "--preload", default=None, help="Preload expected encrypted " - "connection parameter changes") - aparse.add_argument("-n", "--nophychange", action="store_true", - help="Ignore encrypted PHY mode changes") - aparse.add_argument("-C", "--crcerr", action="store_true", - help="Capture packets with CRC errors") - aparse.add_argument("-d", "--decode", action="store_true", - help="Decode advertising data") - aparse.add_argument("-o", "--output", default=None, help="PCAP output file name") - args = aparse.parse_args() +def main(args): + if args.zmq: + import zmq + + url = f"tcp://{args.zmqsetting}" + + context = zmq.Context() + socket = context.socket(zmq.XPUB) + socket.setsockopt(zmq.XPUB_VERBOSE, True) + socket.bind(url) + + def zmq_thread(socket): + try: + while True: + event = socket.recv() + # Event is one byte 0=unsub or 1=sub, followed by topic + if event[0] == 1: + log("new subscriber for", event[1:]) + elif event[0] == 0: + log("unsubscribed", event[1:]) + except zmq.error.ContextTerminated: + pass + + def log(*msg): + s = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + print("%s:" % s, *msg, end="\n", file=sys.stderr) + + from threading import Thread + zthread = Thread(target=zmq_thread, args=[socket], daemon=True, name='zmq') + zthread.start() # Sanity check argument combinations targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string) @@ -70,8 +66,8 @@ def main(): if args.advchan != 40 and args.hop: raise UsageError("Don't specify an advertising channel if you want advertising channel hopping!") - global hw - hw = make_sniffle_hw(args.serport, baudrate=args.baudrate) + global HW + hw = make_sniffle_hw(serport=args.serport, baudrate=args.baudrate) # if a channel was explicitly specified, don't hop hop3 = True if targ_specs else False @@ -138,10 +134,19 @@ def main(): while True: try: msg = hw.recv_and_decode() - print_message(msg, args.quiet, args.decode) + if args.zmq: + smsg = msg.to_dict() + smsg = json.dumps(smsg) + socket.send_string(smsg) + if args.verbose: + print_message(msg, args.quiet, args.decode) + else: + print_message(msg, args.quiet, args.decode) except SourceDone: break except KeyboardInterrupt: + if args.zmq: + socket.close() hw.cancel_recv() sys.stderr.write("\r") break @@ -170,14 +175,52 @@ def print_packet(dpkt, quiet, decode_ad): pcwriter.write_packet_message(dpkt) def get_mac_from_string(s, coded_phy=False): - hw.setup_sniffer(SnifferMode.ACTIVE_SCAN, ext_adv=True, coded_phy=coded_phy) - hw.mark_and_flush() + HW.setup_sniffer(SnifferMode.ACTIVE_SCAN, ext_adv=True, coded_phy=coded_phy) + HW.mark_and_flush() while True: - msg = hw.recv_and_decode() + msg = HW.recv_and_decode() if isinstance(msg, (AdvaMessage, AdvDirectIndMessage, ScanRspMessage, AdvExtIndMessage)) and msg.AdvA is not None: if s in msg.body: return msg.AdvA, not msg.TxAdd if __name__ == "__main__": - main() + aparse = argparse.ArgumentParser(description="Host-side receiver for Sniffle BLE5 sniffer") + aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name") + aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baud rate") + aparse.add_argument("-c", "--advchan", default=40, choices=[37, 38, 39], type=int, + help="Advertising channel to listen on") + aparse.add_argument("-p", "--pause", action="store_true", + help="Pause sniffer after disconnect") + aparse.add_argument("-r", "--rssi", default=-128, type=int, + help="Filter packets by minimum RSSI") + aparse.add_argument("-m", "--mac", default=None, help="Filter packets by advertiser MAC") + aparse.add_argument("-i", "--irk", default=None, help="Filter packets by advertiser IRK") + aparse.add_argument("-S", "--string", default=None, + help="Filter for advertisements containing the specified string") + aparse.add_argument("-a", "--advonly", action="store_true", + help="Passive scanning, don't follow connections") + aparse.add_argument("-A", "--scan", action="store_true", + help="Active scanning, don't follow connections") + aparse.add_argument("-e", "--extadv", action="store_true", + help="Capture BT5 extended (auxiliary) advertising") + aparse.add_argument("-H", "--hop", action="store_true", + help="Hop primary advertising channels in extended mode") + aparse.add_argument("-l", "--longrange", action="store_true", + help="Use long range (coded) PHY for primary advertising") + aparse.add_argument("-q", "--quiet", action="store_true", + help="Don't display empty packets") + aparse.add_argument("-Q", "--preload", default=None, help="Preload expected encrypted " + "connection parameter changes") + aparse.add_argument("-n", "--nophychange", action="store_true", + help="Ignore encrypted PHY mode changes") + aparse.add_argument("-C", "--crcerr", action="store_true", + help="Capture packets with CRC errors") + aparse.add_argument("-d", "--decode", action="store_true", + help="Decode advertising data") + aparse.add_argument("-o", "--output", default=None, help="PCAP output file name") + aparse.add_argument("-z", "--zmq", action="store_true", help="Enable zmq") + aparse.add_argument("--zmqsetting", default="127.0.0.1:4222", help="Define zmq server settings") + aparse.add_argument("-v", "--verbose", action="store_true", help="Print messages") + args = aparse.parse_args() + main(args) diff --git a/python_cli/sniffle/advdata/constants.py b/python_cli/sniffle/advdata/constants.py index 83cbe81..1a27142 100644 --- a/python_cli/sniffle/advdata/constants.py +++ b/python_cli/sniffle/advdata/constants.py @@ -3716,4 +3716,5 @@ 0x1857: 'Electronic Shelf Label', 0x1858: 'Gaming Audio', 0x1859: 'Mesh Proxy Solicitation', + 0xFFFA: 'ASTM Remote ID' } diff --git a/python_cli/sniffle/packet_decoder.py b/python_cli/sniffle/packet_decoder.py index 0de548d..8a2cd1e 100644 --- a/python_cli/sniffle/packet_decoder.py +++ b/python_cli/sniffle/packet_decoder.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 - +import json # Written by Sultan Qasim Khan # Copyright (c) 2019-2024, NCC Group plc # Released as open source under GPLv3 @@ -15,9 +15,11 @@ from .errors import SniffleHWPacketError from .hexdump import hexdump + def str_mac(mac): return ":".join(["%02X" % b for b in reversed(mac)]) + def _str_atype(addr, is_random): # Non-resolvable private address # Resolvable private address @@ -28,12 +30,15 @@ def _str_atype(addr, is_random): atype = addr[5] >> 6 return atypes[atype] + def str_mac2(mac, is_random): return "%s (%s)" % (str_mac(mac), _str_atype(mac, is_random)) + # radio time wraparound period in seconds TS_WRAP_PERIOD = 0x100000000 / 4E6 + class PacketMessage: def __init__(self, raw_msg, dstate: SniffleDecoderState, crc_rev=None): ts, l, event, rssi, chan = unpack("= 37: type_classes = [ - AdvIndMessage, # 0 - AdvDirectIndMessage, # 1 - AdvNonconnIndMessage, # 2 - ScanReqMessage, # 3 - ScanRspMessage, # 4 - ConnectIndMessage, # 5 - AdvScanIndMessage, # 6 - AdvExtIndMessage] # 7 + AdvIndMessage, # 0 + AdvDirectIndMessage, # 1 + AdvNonconnIndMessage, # 2 + ScanReqMessage, # 3 + ScanRspMessage, # 4 + ConnectIndMessage, # 5 + AdvScanIndMessage, # 6 + AdvExtIndMessage] # 7 if pdu_type < len(type_classes): tc = type_classes[pdu_type] else: @@ -228,6 +256,7 @@ def decode(pkt: PacketMessage, dstate=None): return tc(pkt) + class DataMessage(DPacketMessage): def __init__(self, pkt: PacketMessage): super().__init__(pkt) @@ -245,28 +274,70 @@ def str_datatype(self): dtstr += "Data Length: %i" % self.data_length return dtstr + def dict_datatype(self): + return {"LLID": self.pdutype, + "Dir": ("S->M" if self.data_dir else "M->S"), + "NESN": self.NESN, + "SN": self.SN, + "MD": self.MD, + "Data Length": self.data_length} + def str_header(self): return super().str_header() + " Event: %d" % self.event def _str_decode(self): return self.str_datatype() + def to_dict(self): + return self.dict_datatype() + @staticmethod def decode(pkt: PacketMessage, dstate=None): LLID = pkt.body[0] & 0x3 type_classes = [ - DataMessage, # 0 (RFU) - LlDataContMessage, # 1 - LlDataMessage, # 2 - LlControlMessage] # 3 + DataMessage, # 0 (RFU) + LlDataContMessage, # 1 + LlDataMessage, # 2 + LlControlMessage] # 3 return type_classes[LLID](pkt) + class LlDataMessage(DataMessage): pdutype = "LL DATA" + class LlDataContMessage(DataMessage): pdutype = "LL DATA CONT" +control_opcodes = [ + "LL_CONNECTION_UPDATE_IND", + "LL_CHANNEL_MAP_IND", + "LL_TERMINATE_IND", + "LL_ENC_REQ", + "LL_ENC_RSP", + "LL_START_ENC_REQ", + "LL_START_ENC_RSP", + "LL_UNKNOWN_RSP", + "LL_FEATURE_REQ", + "LL_FEATURE_RSP", + "LL_PAUSE_ENC_REQ", + "LL_PAUSE_ENC_RSP", + "LL_VERSION_IND", + "LL_REJECT_IND", + "LL_PERIPHERAL_FEATURE_REQ", + "LL_CONNECTION_PARAM_REQ", + "LL_CONNECTION_PARAM_RSP", + "LL_REJECT_EXT_IND", + "LL_PING_REQ", + "LL_PING_RSP", + "LL_LENGTH_REQ", + "LL_LENGTH_RSP", + "LL_PHY_REQ", + "LL_PHY_RSP", + "LL_PHY_UPDATE_IND", + "LL_MIN_USED_CHANNELS_IND" +] + class LlControlMessage(DataMessage): pdutype = "LL CONTROL" @@ -275,44 +346,26 @@ def __init__(self, pkt: PacketMessage): self.opcode = self.body[2] def str_opcode(self): - control_opcodes = [ - "LL_CONNECTION_UPDATE_IND", - "LL_CHANNEL_MAP_IND", - "LL_TERMINATE_IND", - "LL_ENC_REQ", - "LL_ENC_RSP", - "LL_START_ENC_REQ", - "LL_START_ENC_RSP", - "LL_UNKNOWN_RSP", - "LL_FEATURE_REQ", - "LL_FEATURE_RSP", - "LL_PAUSE_ENC_REQ", - "LL_PAUSE_ENC_RSP", - "LL_VERSION_IND", - "LL_REJECT_IND", - "LL_PERIPHERAL_FEATURE_REQ", - "LL_CONNECTION_PARAM_REQ", - "LL_CONNECTION_PARAM_RSP", - "LL_REJECT_EXT_IND", - "LL_PING_REQ", - "LL_PING_RSP", - "LL_LENGTH_REQ", - "LL_LENGTH_RSP", - "LL_PHY_REQ", - "LL_PHY_RSP", - "LL_PHY_UPDATE_IND", - "LL_MIN_USED_CHANNELS_IND" - ] if self.opcode < len(control_opcodes): return "Opcode: %s" % control_opcodes[self.opcode] else: return "Opcode: RFU (0x%02X)" % self.opcode + def dict_opcode(self): + if self.opcode < len(control_opcodes): + return {"Opcode": control_opcodes[self.opcode]} + else: + return {"Opcode RFU": self.opcode} + def _str_decode(self): return "\n".join([ self.str_datatype(), self.str_opcode()]) + def to_dict(self): + return {self.pdutype: self.pkt, "DataType": self.dict_datatype(), + "Opcode": self.dict_opcode()} + class AdvaMessage(AdvertMessage): def __init__(self, pkt: PacketMessage): super().__init__(pkt) @@ -327,18 +380,29 @@ def _str_decode(self): self.str_adtype(), self.str_adva()]) + def to_dict(self): + res = {self.pdutype: self.pkt} + res["AdvA"] = str_mac2(self.AdvA, self.TxAdd) + if len(self.adv_data) > 0: + res["AdvData"] = self.adv_data.hex() + return res + class AdvIndMessage(AdvaMessage): pdutype = "ADV_IND" + class AdvNonconnIndMessage(AdvaMessage): pdutype = "ADV_NONCONN_IND" + class ScanRspMessage(AdvaMessage): pdutype = "SCAN_RSP" + class AdvScanIndMessage(AdvaMessage): pdutype = "ADV_SCAN_IND" + class AdvDirectIndMessage(AdvertMessage): pdutype = "ADV_DIRECT_IND" @@ -351,11 +415,19 @@ def __init__(self, pkt: PacketMessage): def str_ata(self): return "AdvA: %s TargetA: %s" % (str_mac2(self.AdvA, self.TxAdd), str_mac2(self.TargetA, self.RxAdd)) + def dict_ata(self): + return {"AdvA": str_mac2(self.AdvA, self.TxAdd), "TargetA": str_mac2(self.TargetA, self.RxAdd), + "AdvData": self.adv_data.hex()} + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_ata()]) + def to_dict(self): + return {self.pdutype: self.pkt, "adtype": self.dict_adtype(), "ata": self.dict_ata(), + "AdvData": self.adv_data.hex()} + class ScanReqMessage(AdvertMessage): pdutype = "SCAN_REQ" @@ -367,14 +439,21 @@ def __init__(self, pkt: PacketMessage): def str_asa(self): return "ScanA: %s AdvA: %s" % (str_mac2(self.ScanA, self.TxAdd), str_mac2(self.AdvA, self.RxAdd)) + def dict_asa(self): + return {"ScanA": str_mac2(self.ScanA, self.TxAdd), "AdvA": str_mac2(self.AdvA, self.RxAdd)} + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_asa()]) + def to_dict(self): + return {self.pdutype: self.pkt, "adtype": self.dict_adtype(), "asa": self.str_asa()} + class AuxScanReqMessage(ScanReqMessage): pdutype = "AUX_SCAN_REQ" + class ConnectIndMessage(AdvertMessage): pdutype = "CONNECT_IND" @@ -386,19 +465,49 @@ def __init__(self, pkt: PacketMessage): self.CRCInit = self.body[18] | (self.body[19] << 8) | (self.body[20] << 16) self.WinSize = self.body[21] self.WinOffset, self.Interval, self.Latency, self.Timeout = unpack( - "> 5 def str_aia(self): return "InitA: %s AdvA: %s AA: 0x%08X CRCInit: 0x%06X" % ( - str_mac2(self.InitA, self.TxAdd), str_mac2(self.AdvA, self.RxAdd), self.aa_conn, self.CRCInit) + str_mac2(self.InitA, self.TxAdd), str_mac2(self.AdvA, self.RxAdd), self.aa_conn, self.CRCInit) + + def dict_aia(self): + return {"InitA": str_mac2(self.InitA, self.TxAdd), + "AdvA": str_mac2(self.AdvA, self.RxAdd), + "aa": self.aa_conn, + "CRCInit": self.CRCInit} + + def json_aia(self): + return {"InitA":str_mac2(self.InitA, self.TxAdd), + "AdvA":str_mac2(self.AdvA, self.RxAdd), + "AA": self.aa_conn, + "CRCInit": self.CRCInit} def str_conn_params(self): return "WinSize: %d WinOffset: %d Interval: %d Latency: %d Timeout: %d Hop: %d SCA: %d" % ( - self.WinSize, self.WinOffset, self.Interval, self.Latency, self.Timeout, - self.Hop, self.SCA) + self.WinSize, self.WinOffset, self.Interval, self.Latency, self.Timeout, + self.Hop, self.SCA) + + def dict_conn_params(self): + return {"WinSize": self.WinSize, + "WinOffset": self.WinOffset, + "Interval": self.Interval, + "Latency": self.Latency, + "Timeout": self.Timeout, + "Hop": self.Hop, + "SCA": self.SCA} + + def json_conn_params(self): + return {"WinSize": self.WinSize, + "WinOffset": self.WinOffset, + "Interval": self.Interval, + "Latency": self.Latency, + "Timeout": self.Timeout, + "Hop": self.Hop, + "SCA": self.SCA} def str_chm(self): if self.ChM == b'\xFF\xFF\xFF\xFF\x1F': @@ -413,6 +522,19 @@ def str_chm(self): chanstr = "%02X %02X %02X %02X %02X" % tuple(self.ChM) return "Channel Map: %s (%s)" % (chanstr, descstr) + def dict_chm(self): + if self.ChM == b'\xFF\xFF\xFF\xFF\x1F': + descstr = "all channels" + else: + has_chan = lambda chm, i: (chm[i // 8] & (1 << (i & 7))) != 0 + excludes = [] + for i in range(37): + if not has_chan(self.ChM, i): + excludes.append(i) + descstr = "excludes " + ", ".join([str(i) for i in excludes]) + chanstr = "%02X %02X %02X %02X %02X" % tuple(self.ChM) + return {"Channel Map": {"channel": chanstr, "desc": descstr}} + def _str_decode(self): return "\n".join([ self.str_adtype(), @@ -420,9 +542,17 @@ def _str_decode(self): self.str_conn_params(), self.str_chm()]) + def to_dict(self): + return {self.pdutype: self.pkt, "adtype": self.dict_adtype(), "aia": self.dict_aia(), + "conn_params": self.dict_conn_params(), "chm": self.dict_chm()} + class AuxConnectReqMessage(ConnectIndMessage): pdutype = "AUX_CONNECT_REQ" + +phy_names = ["1M", "2M", "Coded", "Invalid3", "Invalid4", + "Invalid5", "Invalid6", "Invalid7"] + class AuxPtr: def __init__(self, ptr): self.chan = ptr[0] & 0x3F @@ -432,11 +562,13 @@ def __init__(self, ptr): self.offsetUsec = auxOffset * offsetMult def __str__(self): - phy_names = ["1M", "2M", "Coded", "Invalid3", "Invalid4", - "Invalid5", "Invalid6", "Invalid7"] return "AuxPtr Chan: %d PHY: %s Delay: %d us" % ( self.chan, phy_names[self.phy], self.offsetUsec) + def to_dict(self): + return {"chan": self.chan, "PHY": phy_names[self.phy], + "Delay_us": self.offsetUsec} + class AdvDataInfo: def __init__(self, adi): self.did = adi[0] + ((adi[1] & 0x0F) << 8) @@ -450,6 +582,9 @@ def __eq__(self, other): return self.did == other.did and self.sid == other.sid return False + def to_dict(self): + return {"did": self.did, "sid": self.sid} + class AdvExtIndMessage(AdvertMessage): pdutype = "ADV_EXT_IND" @@ -466,7 +601,7 @@ def __init__(self, pkt: PacketMessage): if len(self.body) < 3: raise ValueError("Extended advertisement too short!") - self.AdvMode = self.body[2] >> 6 # Neither, Connectable, Scannable, or RFU + self.AdvMode = self.body[2] >> 6 # Neither, Connectable, Scannable, or RFU hdrBodyLen = self.body[2] & 0x3F if len(self.body) < hdrBodyLen + 1: @@ -476,37 +611,37 @@ def __init__(self, pkt: PacketMessage): hdrPos = 4 if hdrFlags & 0x01: - self.AdvA = self.body[hdrPos:hdrPos+6] + self.AdvA = self.body[hdrPos:hdrPos + 6] hdrPos += 6 if hdrFlags & 0x02: - self.TargetA = self.body[hdrPos:hdrPos+6] + self.TargetA = self.body[hdrPos:hdrPos + 6] hdrPos += 6 if hdrFlags & 0x04: self.CTEInfo = self.body[hdrPos] hdrPos += 1 if hdrFlags & 0x08: - self.AdvDataInfo = AdvDataInfo(self.body[hdrPos:hdrPos+2]) + self.AdvDataInfo = AdvDataInfo(self.body[hdrPos:hdrPos + 2]) hdrPos += 2 if hdrFlags & 0x10: - self.AuxPtr = AuxPtr(self.body[hdrPos:hdrPos+3]) + self.AuxPtr = AuxPtr(self.body[hdrPos:hdrPos + 3]) hdrPos += 3 if hdrFlags & 0x20: # TODO decode this nicely - self.SyncInfo = self.body[hdrPos:hdrPos+18] + self.SyncInfo = self.body[hdrPos:hdrPos + 18] hdrPos += 18 if hdrFlags & 0x40: - self.TxPower = unpack("b", self.body[hdrPos:hdrPos+1])[0] + self.TxPower = unpack("b", self.body[hdrPos:hdrPos + 1])[0] hdrPos += 1 if hdrPos - 3 < hdrBodyLen: ACADLen = hdrBodyLen - (hdrPos - 3) - self.ACAD = self.body[hdrPos:hdrPos+ACADLen] + self.ACAD = self.body[hdrPos:hdrPos + ACADLen] hdrPos += ACADLen self.adv_data = self.body[hdrPos:] def str_aext(self): amodes = ["Non-connectable, non-scannable", - "Connectable", "Scannable", "RFU"] + "Connectable", "Scannable", "RFU"] modemsg = "AdvMode: %s\n" % amodes[self.AdvMode] dispMsgs = [] @@ -533,27 +668,61 @@ def str_aext(self): else: return dmsg + def dict_aext(self): + ret = {} + + amodes = ["Non-connectable, non-scannable", + "Connectable", "Scannable", "RFU"] + ret["AdvMode"] = amodes[self.AdvMode] + if self.AdvA: + ret["AdvA"] = str_mac2(self.AdvA, self.TxAdd) + if self.TargetA: + ret["TargetA"] = str_mac2(self.TargetA, self.RxAdd) + if self.CTEInfo: + ret["CTEInfo"] = self.CTEInfo + if self.AdvDataInfo: + ret["AdvDataInfo"] = self.AdvDataInfo.to_dict() + if self.SyncInfo: + # TODO decode this nicely + ret["SyncInfo"] = self.SyncInfo.hex() + if self.TxPower: + ret["TxPower"] = self.TxPower + if self.ACAD: + ret["ACAD"] = self.ACAD.hex() + if self.AuxPtr: + ret["AuxPtr"] = self.AuxPtr.to_dict() + return ret + def _str_decode(self): return "\n".join([ self.str_adtype(), self.str_aext()]) + def to_dict(self): + return {self.pdutype: self.pkt, "adtype": self.dict_adtype(), "aext": self.dict_aext(), + "AdvData": self.adv_data.hex()} + def get_adi(pkt: PacketMessage): dpkt = AdvExtIndMessage(pkt) return dpkt.AdvDataInfo + class AuxAdvIndMessage(AdvExtIndMessage): pdutype = "AUX_ADV_IND" + class AuxScanRspMessage(AuxAdvIndMessage): pdutype = "AUX_SCAN_RSP" + class AuxChainIndMessage(AuxAdvIndMessage): pdutype = "AUX_CHAIN_IND" + class AuxConnectRspMessage(AdvExtIndMessage): pdutype = "AUX_CONNECT_RSP" + def update_state(pkt: DPacketMessage, dstate: SniffleDecoderState): if isinstance(pkt, ConnectIndMessage): if pkt.chan < 37 and dstate.last_state != SnifferState.ADVERTISING_EXT: @@ -569,15 +738,15 @@ def update_state(pkt: DPacketMessage, dstate: SniffleDecoderState): dstate.aux_pending_crci = None elif isinstance(pkt, AuxAdvIndMessage) and pkt.AuxPtr: dstate.aux_pending_chain = (pkt.AdvDataInfo, pkt.AuxPtr.chan, - pkt.ts + pkt.AuxPtr.offsetUsec*1E-6 + 0.0005) - elif isinstance(pkt, AuxAdvIndMessage) and pkt.AdvMode == 2: # scannable - overhead_bytes = 8 # 1 byte preamble, 4 byte AA, 3 byte CRC - if pkt.phy == 1: # 2M + pkt.ts + pkt.AuxPtr.offsetUsec * 1E-6 + 0.0005) + elif isinstance(pkt, AuxAdvIndMessage) and pkt.AdvMode == 2: # scannable + overhead_bytes = 8 # 1 byte preamble, 4 byte AA, 3 byte CRC + if pkt.phy == 1: # 2M time_per_byte = 4E-6 - elif pkt.phy == 2: # Coded S=8 + elif pkt.phy == 2: # Coded S=8 overhead_bytes = 10 time_per_byte = 64E-6 - elif pkt.phy == 3: # Coded S=2 + elif pkt.phy == 3: # Coded S=2 overhead_bytes = 27 time_per_byte = 16E-6 else: diff --git a/python_cli/sniffle/sniffle_hw.py b/python_cli/sniffle/sniffle_hw.py index 6c9c874..b0eb73e 100644 --- a/python_cli/sniffle/sniffle_hw.py +++ b/python_cli/sniffle/sniffle_hw.py @@ -7,7 +7,7 @@ from struct import pack, unpack from base64 import b64encode, b64decode from binascii import Error as BAError -from time import time +from time import time, sleep from random import randint, randrange from serial.tools.list_ports import comports from traceback import format_exception @@ -105,15 +105,22 @@ class SniffleHW: def __init__(self, serport=None, logger=None, timeout=None, baudrate=None): if baudrate is None: baudrate = 2000000 - + else: + baudrate = int(baudrate) while serport is None: serport = find_xds110_serport() if serport is not None: break serport = find_sonoff_serport() if serport is not None: break serport = find_catsniffer_v3_serport() - if serport is not None: break + if serport is not None: + baudrate = 921600 + break raise IOError("Sniffle device not found") + if is_cp2102(serport): + if baudrate is None: + baudrate = 921600 + self.timeout = timeout self.decoder_state = SniffleDecoderState() @@ -418,10 +425,14 @@ def mark_and_flush(self): marker_data = pack(' 1000: + break def probe_fw_version(self): self.cmd_version() diff --git a/python_cli/sniffle/sniffle_sdr.py b/python_cli/sniffle/sniffle_sdr.py index 093880a..c058ff1 100644 --- a/python_cli/sniffle/sniffle_sdr.py +++ b/python_cli/sniffle/sniffle_sdr.py @@ -394,10 +394,16 @@ def __init__(self, driver='rfnm', mode='single', logger=None): multi_chan = False gain = 10 chan = 37 - + if driver == 'rfnm': - self.sdr = SoapyDevice({'driver': driver}) - + self.sdr = None + results = SoapyDevice.enumerate() + for device in results: + if device["driver"] == driver: + self.sdr = SoapyDevice(device) + break + if self.sdr is None: + assert False, "No SoapySDR found" rates = self.sdr.listSampleRates(SOAPY_SDR_RX, self.sdr_chan) antennas = self.sdr.listAntennas(SOAPY_SDR_RX, self.sdr_chan) self.sdr.setAntenna(SOAPY_SDR_RX, self.sdr_chan, antennas[1]) diff --git a/python_cli/sniffle_extcap.py b/python_cli/sniffle_extcap.py index d971f3c..846403b 100755 --- a/python_cli/sniffle_extcap.py +++ b/python_cli/sniffle_extcap.py @@ -188,6 +188,7 @@ def loadArgs(self, args=None): help="Ignore encrypted PHY mode changes") argParser.add_argument("--crcerr", action="store_true", help="Capture packets with CRC errors") + argParser.add_argument("--baudrate", default=None, help="Sniffer serial port baudrate") self.args = argParser.parse_args(args=args)