Skip to content
155 changes: 71 additions & 84 deletions pwnagotchi/plugins/default/net-pos.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,104 +9,96 @@


class NetPos(plugins.Plugin):
__author__ = 'zenzen san'
__version__ = '2.0.3'
__author__ = 'zenzen san, doki'
__version__ = '2.0.4'
__license__ = 'GPL3'
__description__ = """Saves a json file with the access points with more signal
whenever a handshake is captured.
When internet is available the files are converted in geo locations
using Mozilla LocationService """
When internet is available the files are converted to geo locations
using Google Geolocation API."""

API_URL = 'https://location.services.mozilla.com/v1/geolocate?key={api}'
API_URL = 'https://www.googleapis.com/geolocation/v1/geolocate?key={api}'

def __init__(self):
self.report = StatusFile('/root/.net_pos_saved', data_format='json')
self.skip = list()
self.ready = False
self.lock = threading.Lock()
self.report_path = '/root/.net_pos_saved'
self.report = StatusFile(self.report_path, data_format='json')

def on_loaded(self):
if 'api_key' not in self.options or ('api_key' in self.options and not self.options['api_key']):
logging.error("NET-POS: api_key isn't set. Can't use mozilla's api.")
if 'api_key' not in self.options or not self.options['api_key']:
logging.error("NET-POS: 'api_key' not set. Can't use Google's Geolocation API.")
return
if 'api_url' in self.options:
self.API_URL = self.options['api_url']
if 'status_file' in self.options:
self.report_path = self.options['status_file']
self.report = StatusFile(self.report_path, data_format='json')

self.ready = True
logging.info("net-pos plugin loaded.")
logging.debug(f"net-pos: use api_url: {self.API_URL}");

def _append_saved(self, path):
to_save = list()
if isinstance(path, str):
to_save.append(path)
elif isinstance(path, list):
to_save += path
else:
raise TypeError("Expected list or str, got %s" % type(path))

with open('/root/.net_pos_saved', 'a') as saved_file:
for x in to_save:
saved_file.write(x + "\n")
logging.info("NET-POS: Plugin loaded successfully.")
logging.debug(f"NET-POS: Using API URL: {self.API_URL}")

def on_internet_available(self, agent):
if self.lock.locked():
return
with self.lock:
if self.ready:
config = agent.config()
display = agent.view()
reported = self.report.data_field_or('reported', default=list())
handshake_dir = config['bettercap']['handshakes']

all_files = os.listdir(handshake_dir)
all_np_files = [os.path.join(handshake_dir, filename)
for filename in all_files
if filename.endswith('.net-pos.json')]
new_np_files = set(all_np_files) - set(reported) - set(self.skip)

if new_np_files:
logging.debug("NET-POS: Found %d new net-pos files. Fetching positions ...", len(new_np_files))
display.set('status', f"Found {len(new_np_files)} new net-pos files. Fetching positions ...")
display.update(force=True)
for idx, np_file in enumerate(new_np_files):

geo_file = np_file.replace('.net-pos.json', '.geo.json')
if os.path.exists(geo_file):
# got already the position
reported.append(np_file)
self.report.update(data={'reported': reported})
continue

try:
geo_data = self._get_geo_data(np_file) # returns json obj
except requests.exceptions.RequestException as req_e:
logging.error("NET-POS: %s - RequestException: %s", np_file, req_e)
self.skip += np_file
continue
except json.JSONDecodeError as js_e:
logging.error("NET-POS: %s - JSONDecodeError: %s, removing it...", np_file, js_e)
os.remove(np_file)
continue
except OSError as os_e:
logging.error("NET-POS: %s - OSError: %s", np_file, os_e)
self.skip += np_file
continue

with open(geo_file, 'w+t') as sf:
json.dump(geo_data, sf)

if not self.ready:
return

config = agent.config()
display = agent.view()
reported = self.report.data_field_or('reported', default=list())
handshake_dir = config['bettercap']['handshakes']

all_files = os.listdir(handshake_dir)
all_np_files = [os.path.join(handshake_dir, f)
for f in all_files if f.endswith('.net-pos.json')]
new_np_files = set(all_np_files) - set(reported) - set(self.skip)

if new_np_files:
logging.debug("NET-POS: Found %d new net-pos files. Fetching positions ...", len(new_np_files))
display.set('status', f"Found {len(new_np_files)} new net-pos files. Fetching positions ...")
display.update(force=True)

for idx, np_file in enumerate(new_np_files):
geo_file = np_file.replace('.net-pos.json', '.geo.json')
if os.path.exists(geo_file):
reported.append(np_file)
self.report.update(data={'reported': reported})

display.set('status', f"Fetching positions ({idx + 1}/{len(new_np_files)})")
display.update(force=True)
continue

try:
geo_data = self._get_geo_data(np_file)
except requests.exceptions.RequestException as req_e:
logging.error("NET-POS: %s - RequestException: %s", np_file, req_e)
self.skip.append(np_file)
continue
except json.JSONDecodeError as js_e:
logging.error("NET-POS: %s - JSONDecodeError: %s, removing it...", np_file, js_e)
os.remove(np_file)
continue
except OSError as os_e:
logging.error("NET-POS: %s - OSError: %s", np_file, os_e)
self.skip.append(np_file)
continue

with open(geo_file, 'w+t') as sf:
json.dump(geo_data, sf)

reported.append(np_file)
self.report.update(data={'reported': reported})

display.set('status', f"Fetching positions ({idx + 1}/{len(new_np_files)})")
display.update(force=True)

def on_handshake(self, agent, filename, access_point, client_station):
netpos = self._get_netpos(agent)
if not netpos['wifiAccessPoints']:
return

netpos["ts"] = int("%.0f" % time.time())
netpos["ts"] = int(time.time())
netpos_filename = filename.replace('.pcap', '.net-pos.json')
logging.debug("NET-POS: Saving net-location to %s", netpos_filename)

Expand All @@ -116,15 +108,14 @@ def on_handshake(self, agent, filename, access_point, client_station):
except OSError as os_e:
logging.error("NET-POS: %s", os_e)


def _get_netpos(self, agent):
aps = agent.get_access_points()
netpos = dict()
netpos['wifiAccessPoints'] = list()
# 6 seems a good number to save a wifi networks location
netpos = {'wifiAccessPoints': []}
for access_point in sorted(aps, key=lambda i: i['rssi'], reverse=True)[:6]:
netpos['wifiAccessPoints'].append({'macAddress': access_point['mac'],
'signalStrength': access_point['rssi']})
netpos['wifiAccessPoints'].append({
'macAddress': access_point['mac'],
'signalStrength': access_point['rssi']
})
return netpos

def _get_geo_data(self, path, timeout=30):
Expand All @@ -133,17 +124,13 @@ def _get_geo_data(self, path, timeout=30):
try:
with open(path, "r") as json_file:
data = json.load(json_file)
except json.JSONDecodeError as js_e:
raise js_e
except OSError as os_e:
raise os_e
except (json.JSONDecodeError, OSError) as e:
raise e

try:
result = requests.post(geourl,
json=data,
timeout=timeout)
result = requests.post(geourl, json=data, timeout=timeout)
return_geo = result.json()
if data["ts"]:
if data.get("ts"):
return_geo["ts"] = data["ts"]
return return_geo
except requests.exceptions.RequestException as req_e:
Expand Down
3 changes: 3 additions & 0 deletions pwnagotchi/ui/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def is_waveshare35lcd(self):

def is_spotpear24inch(self):
return self._implementation.name == 'spotpear24inch'

def is_displayhatmini(self):
return self._implementation.name == 'displayhatmini'

def is_waveshare_any(self):
return self.is_waveshare_v1() or self.is_waveshare_v2()
Expand Down
4 changes: 4 additions & 0 deletions pwnagotchi/ui/hw/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pwnagotchi.ui.hw.waveshare213bc import Waveshare213bc
from pwnagotchi.ui.hw.waveshare35lcd import Waveshare35lcd
from pwnagotchi.ui.hw.spotpear24inch import Spotpear24inch
from pwnagotchi.ui.hw.displayhatmini import DisplayHatMini

def display_for(config):
# config has been normalized already in utils.load_config
Expand Down Expand Up @@ -68,3 +69,6 @@ def display_for(config):

elif config['ui']['display']['type'] == 'spotpear24inch':
return Spotpear24inch(config)

elif config['ui']['display']['type'] == 'displayhatmini':
return DisplayHatMini(config)
44 changes: 44 additions & 0 deletions pwnagotchi/ui/hw/displayhatmini.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging

import pwnagotchi.ui.fonts as fonts
from pwnagotchi.ui.hw.base import DisplayImpl


class DisplayHatMini(DisplayImpl):
def __init__(self, config):
super(DisplayHatMini, self).__init__(config, 'displayhatmini')
self._display = None

def layout(self):
fonts.setup(12, 10, 12, 70, 25, 9)
self._layout['width'] = 320
self._layout['height'] = 240
self._layout['face'] = (35, 50)
self._layout['name'] = (5, 20)
self._layout['channel'] = (0, 0)
self._layout['aps'] = (40, 0)
self._layout['uptime'] = (240, 0)
self._layout['line1'] = [0, 14, 320, 14]
self._layout['line2'] = [0, 220, 320, 220]
self._layout['friend_face'] = (0, 130)
self._layout['friend_name'] = (40, 135)
self._layout['shakes'] = (0, 220)
self._layout['mode'] = (280, 220)
self._layout['status'] = {
'pos': (80, 160),
'font': fonts.status_font(fonts.Medium),
'max': 20
}

return self._layout

def initialize(self):
logging.info("initializing Display Hat Mini")
from pwnagotchi.ui.hw.libs.pimoroni.displayhatmini.ST7789 import ST7789
self._display = ST7789(0,1,9,13)

def render(self, canvas):
self._display.display(canvas)

def clear(self):
self._display.clear()
Loading