Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Sniffle has a number of useful features, including:
* Easy to extend host-side software written in Python
* PCAP export compatible with the Ubertooth
* Wireshark compatible plugin
* ZMQ Publishing server

## Prerequisites

Expand Down Expand Up @@ -279,6 +280,8 @@ options:
-d, --decode Decode advertising data
-o OUTPUT, --output OUTPUT
PCAP output file name
-z, --zmq Enable ZMQ server
--zmqsetting Set ZMQ server ip and port (Default 127.0.0.1:4222)
```

The XDS110 debugger on the Launchpad boards creates two serial ports. On
Expand Down Expand Up @@ -389,15 +392,14 @@ options:
-s SERPORT, --serport SERPORT
Sniffer serial port name
-b BAUDRATE, --baudrate BAUDRATE
Sniffer serial port baud rate
Sniffer baudrate (2000000 or 921600)
-c {37,38,39}, --advchan {37,38,39}
Advertising channel to listen on
-r RSSI, --rssi RSSI Filter packets by minimum RSSI
-l, --longrange Use long range (coded) PHY for primary advertising
-d, --decode Decode advertising data
-o OUTPUT, --output OUTPUT
PCAP output file name

```

The scanner command line arguments work the same as the sniffer. The purpose of
Expand Down
Empty file added python_cli/__init__.py
Empty file.
6 changes: 4 additions & 2 deletions python_cli/advertiser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@
from sniffle.sniffle_hw import SniffleHW

# global variable to access hardware
hw = None
HW = None


def main():
aparse = argparse.ArgumentParser(description="Advertiser test script for Sniffle BLE5 sniffer")
aparse.add_argument("-s", "--serport", default=None, help="Sniffer serial port name")
aparse.add_argument("-b", "--baudrate", default=None, help="Sniffer serial port baud rate")
args = aparse.parse_args()

global hw
global HW
hw = SniffleHW(args.serport, baudrate=args.baudrate)

# set the advertising channel (and return to ad-sniffing mode)
Expand Down Expand Up @@ -70,5 +71,6 @@ def main():
if msg is not None:
print(msg, end='\n\n')


if __name__ == "__main__":
main()
24 changes: 12 additions & 12 deletions python_cli/initiator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
ScanRspMessage, str_mac)

# global variable to access hardware
hw = None
HW = None
_aa = 0

def main():
Expand All @@ -31,7 +31,7 @@ def main():
help="Supplied MAC address is public")
args = aparse.parse_args()

global hw
global HW
hw = SniffleHW(args.serport, baudrate=args.baudrate)

targ_specs = bool(args.mac) + bool(args.irk) + bool(args.string)
Expand Down Expand Up @@ -101,23 +101,23 @@ def main():
print_message(msg)

def get_mac_from_irk(irk):
hw.cmd_irk(irk, False)
hw.mark_and_flush()
HW.cmd_irk(irk, False)
HW.mark_and_flush()
print("Waiting for advertisement with suitable RPA...")
while True:
msg = hw.recv_and_decode()
msg = HW.recv_and_decode()
if isinstance(msg, (AdvaMessage, AdvDirectIndMessage,
AdvExtIndMessage)) and msg.AdvA is not None:
print("Found target MAC: %s" % str_mac(msg.AdvA))
return msg.AdvA

def get_mac_from_string(s):
hw.cmd_mac()
hw.cmd_scan()
hw.mark_and_flush()
HW.cmd_mac()
HW.cmd_scan()
HW.mark_and_flush()
print("Waiting for advertisement containing specified string...")
while True:
msg = hw.recv_and_decode()
msg = HW.recv_and_decode()
if isinstance(msg, (AdvaMessage, AdvDirectIndMessage, ScanRspMessage,
AdvExtIndMessage)) and msg.AdvA is not None:
if s in msg.body:
Expand All @@ -132,7 +132,7 @@ def print_message(msg):
elif isinstance(msg, StateMessage):
print(msg)
if msg.new_state == SnifferState.CENTRAL:
hw.decoder_state.cur_aa = _aa
HW.decoder_state.cur_aa = _aa
print()

msg_ctr = 0
Expand All @@ -143,7 +143,7 @@ def print_packet(dpkt):
global msg_ctr
MCMASK = 3
if (msg_ctr & MCMASK) == MCMASK:
hw.cmd_transmit(3, b'\x12') # LL_PING_REQ
HW.cmd_transmit(3, b'\x12') # LL_PING_REQ
msg_ctr += 1

# also test sending LL_CONNECTION_UPDATE_IND
Expand All @@ -154,7 +154,7 @@ def print_packet(dpkt):
# Latency = 0x0003
# Timeout = 0x0080
# Instant = 0x0080
hw.cmd_transmit(3, b'\x00\x04\x08\x00\x30\x00\x03\x00\x80\x00\x80\x00')
HW.cmd_transmit(3, b'\x00\x04\x08\x00\x30\x00\x03\x00\x80\x00\x80\x00')
print("sent change!")

if __name__ == "__main__":
Expand Down
100 changes: 50 additions & 50 deletions python_cli/relay_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@
"""

# global variable to access hardware
hw = None
HW = None

# global variable for pcap writer
pcwriter = None

def sigint_handler(sig, frame):
hw.cancel_recv()
hw.cmd_chan_aa_phy() # stop scanning or connection
hw.cmd_rssi(0)
HW.cancel_recv()
HW.cmd_chan_aa_phy() # stop scanning or connection
HW.cmd_rssi(0)
sys.exit(0)

def main():
Expand All @@ -106,7 +106,7 @@ def main():
aparse.add_argument("-o", "--output", default=None, help="PCAP output file name")
args = aparse.parse_args()

global hw
global HW
hw = SniffleHW(args.serport)

# put the hardware in a normal state (passive scanning) and configure it with an impossibly
Expand Down Expand Up @@ -280,17 +280,17 @@ def sock_recv_print_forward(conn, quiet, filter_changes=False):
# construct packet object for display and PCAP
pkt = DPacketMessage.from_body(body, True)
pkt.ts_epoch = time()
pkt.ts = pkt.ts_epoch - hw.decoder_state.first_epoch_time
pkt.aa = hw.decoder_state.cur_aa
pkt.ts = pkt.ts_epoch - HW.decoder_state.first_epoch_time
pkt.aa = HW.decoder_state.cur_aa
pkt.event = event

# Passing on PDUs with instants in the past would break the connection
if not (filter_changes and has_instant(pkt)):
hw.cmd_transmit(llid, pdu, event)
HW.cmd_transmit(llid, pdu, event)
print_message(pkt, quiet)

def ser_recv_print_forward(conn, quiet, filter_changes=False):
msg = hw.recv_and_decode()
msg = HW.recv_and_decode()

if isinstance(msg, PacketMessage):
msg = DPacketMessage.decode(msg)
Expand All @@ -302,7 +302,7 @@ def ser_recv_print_forward(conn, quiet, filter_changes=False):
conn.send_msg(MessageType.PACKET, pack('<H', msg.event) + msg.body)
if block_req:
# LL_REJECT_EXT_IND, unacceptable connection parameters
hw.cmd_transmit(3, b'\x11\x0F\x3B')
HW.cmd_transmit(3, b'\x11\x0F\x3B')

print_message(msg, quiet)

Expand All @@ -315,17 +315,17 @@ def print_message(msg, quiet=False):
print(msg, end='\n\n')

def get_mac_from_irk(irk, chan=37):
hw.cmd_chan_aa_phy(chan, BLE_ADV_AA, 0)
hw.cmd_pause_done(True)
hw.cmd_follow(False) # capture advertisements only
hw.cmd_rssi(-128)
hw.cmd_irk(irk, False)
hw.cmd_auxadv(False)
hw.mark_and_flush()
HW.cmd_chan_aa_phy(chan, BLE_ADV_AA, 0)
HW.cmd_pause_done(True)
HW.cmd_follow(False) # capture advertisements only
HW.cmd_rssi(-128)
HW.cmd_irk(irk, False)
HW.cmd_auxadv(False)
HW.mark_and_flush()

print("Waiting for advertisement with suitable RPA...")
while True:
msg = hw.recv_and_decode()
msg = HW.recv_and_decode()
if not isinstance(msg, PacketMessage):
continue
dpkt = DPacketMessage.decode(msg)
Expand All @@ -334,19 +334,19 @@ def get_mac_from_irk(irk, chan=37):
return dpkt.AdvA

def get_mac_from_string(s, chan=37):
hw.cmd_chan_aa_phy(chan, BLE_ADV_AA, 0)
hw.cmd_pause_done(True)
hw.cmd_follow(False) # capture advertisements only
hw.cmd_rssi(-128)
hw.cmd_mac()
hw.cmd_auxadv(False)
hw.random_addr()
hw.cmd_scan()
hw.mark_and_flush()
HW.cmd_chan_aa_phy(chan, BLE_ADV_AA, 0)
HW.cmd_pause_done(True)
HW.cmd_follow(False) # capture advertisements only
HW.cmd_rssi(-128)
HW.cmd_mac()
HW.cmd_auxadv(False)
HW.random_addr()
HW.cmd_scan()
HW.mark_and_flush()

print("Waiting for advertisement containing specified string...")
while True:
msg = hw.recv_and_decode()
msg = HW.recv_and_decode()
if not isinstance(msg, PacketMessage):
continue
dpkt = DPacketMessage.decode(msg)
Expand All @@ -359,18 +359,18 @@ def scan_target(mac):
advPkt = None
scanRspPkt = None

hw.cmd_chan_aa_phy(37, BLE_ADV_AA, 0)
hw.cmd_pause_done(True)
hw.cmd_follow(False)
hw.cmd_rssi(-128)
hw.cmd_mac(mac, False)
hw.cmd_auxadv(False) # we only support impersonating legacy advertisers for now
hw.random_addr()
hw.cmd_scan()
hw.mark_and_flush()
HW.cmd_chan_aa_phy(37, BLE_ADV_AA, 0)
HW.cmd_pause_done(True)
HW.cmd_follow(False)
HW.cmd_rssi(-128)
HW.cmd_mac(mac, False)
HW.cmd_auxadv(False) # we only support impersonating legacy advertisers for now
HW.random_addr()
HW.cmd_scan()
HW.mark_and_flush()

while (advPkt is None) or (scanRspPkt is None):
msg = hw.recv_and_decode()
msg = HW.recv_and_decode()
if not isinstance(msg, PacketMessage):
continue
dpkt = DPacketMessage.decode(msg)
Expand All @@ -397,22 +397,22 @@ def scan_target(mac):

def connect_target(targ_mac, chan=37, targ_random=True, initiator_mac=None, initiator_random=True,
interval=24, latency=1, preloads=[]):
hw.cmd_chan_aa_phy(chan, BLE_ADV_AA, 0)
hw.cmd_pause_done(True)
hw.cmd_follow(False)
hw.cmd_rssi(-128)
hw.cmd_mac(targ_mac, False)
hw.cmd_auxadv(False)
hw.cmd_interval_preload(preloads)
hw.cmd_phy_preload()
HW.cmd_chan_aa_phy(chan, BLE_ADV_AA, 0)
HW.cmd_pause_done(True)
HW.cmd_follow(False)
HW.cmd_rssi(-128)
HW.cmd_mac(targ_mac, False)
HW.cmd_auxadv(False)
HW.cmd_interval_preload(preloads)
HW.cmd_phy_preload()
if initiator_mac is None:
hw.random_addr()
HW.random_addr()
else:
hw.cmd_setaddr(initiator_mac, initiator_random)
hw.mark_and_flush()
HW.cmd_setaddr(initiator_mac, initiator_random)
HW.mark_and_flush()

# now enter initiator mode
return hw.initiate_conn(targ_mac, targ_random, interval, latency)
return HW.initiate_conn(targ_mac, targ_random, interval, latency)

def print_packet(pkt, quiet=False):
is_not_empty = not (isinstance(pkt, LlDataContMessage) and pkt.data_length == 0)
Expand Down
16 changes: 8 additions & 8 deletions python_cli/relay_slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
from sniffle.relay_protocol import connect_relay, MessageType

# global variable to access hardware
hw = None
HW = None
_aa = 0

def sigint_handler(sig, frame):
hw.cancel_recv()
hw.cmd_chan_aa_phy() # stop advertising or connection
hw.cmd_rssi(0)
HW.cancel_recv()
HW.cmd_chan_aa_phy() # stop advertising or connection
HW.cmd_rssi(0)
sys.exit(0)

def main():
Expand All @@ -33,7 +33,7 @@ def main():
help="Don't show empty packets")
args = aparse.parse_args()

global hw
global HW
hw = SniffleHW(args.serport)

# put the hardware in a normal state (passive scanning) and configure it with an impossibly
Expand Down Expand Up @@ -131,15 +131,15 @@ def sock_recv_print_forward(conn):
body = body[2:]
llid = body[0] & 3
pdu = body[2:]
hw.cmd_transmit(llid, pdu, event)
HW.cmd_transmit(llid, pdu, event)
pkt = DPacketMessage.from_body(body, True, True)
pkt.ts_epoch = time()
pkt.ts = pkt.ts_epoch - hw.decoder_state.first_epoch_time
pkt.ts = pkt.ts_epoch - HW.decoder_state.first_epoch_time
pkt.event = event
print(pkt, end='\n\n')

def ser_recv_print_forward(conn, quiet):
msg = hw.recv_and_decode()
msg = HW.recv_and_decode()
print_message(msg, quiet)

# only forward packets
Expand Down
4 changes: 4 additions & 0 deletions python_cli/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pyserial
zmq
numpy
scipy
Loading