Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions install/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ black==24.10.0
ruff==0.11.7
pre-commit==4.0.1
coverage==7.8.0
netifaces==0.11.0
pyyaml
pytest-asyncio
git+https://github.com/SECEF/python-idmefv2.git
2 changes: 1 addition & 1 deletion modules/flowalerts/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def init(self):
self.dns_analyzer = DNS(self.db, flowalerts=self)
self.is_running_non_stop: bool = self.db.is_running_non_stop()
self.classifier = FlowClassifier()
self.our_ips = utils.get_own_ips()
self.our_ips: List[str] = utils.get_own_ips(ret=List)
self.input_type: str = self.db.get_input_type()
self.multiple_reconnection_attempts_threshold = 5
# we use this to try to detect if there's dns server that has a
Expand Down
2 changes: 1 addition & 1 deletion modules/flowalerts/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def init(self):
self.arpa_scan_threshold = 10
self.is_running_non_stop: bool = self.db.is_running_non_stop()
self.classifier = FlowClassifier()
self.our_ips = utils.get_own_ips()
self.our_ips: List[str] = utils.get_own_ips(ret=List)
# In mins
self.dns_without_conn_interface_wait_time = 30
# to store dns queries that we should check later. the purpose of
Expand Down
29 changes: 7 additions & 22 deletions modules/ip_info/ip_info.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# SPDX-FileCopyrightText: 2021 Sebastian Garcia <sebastian.garcia@agents.fel.cvut.cz>
# SPDX-License-Identifier: GPL-2.0-only
import platform
from asyncio import Task
from typing import (
Union,
Expand All @@ -16,7 +15,7 @@
import json
from contextlib import redirect_stdout, redirect_stderr
import subprocess
import re
import netifaces
import time
import asyncio
import multiprocessing
Expand Down Expand Up @@ -338,26 +337,12 @@ def get_gateway_ip_if_interface(self):
# only works if running on an interface
return False

gw_ip = False
if platform.system() == "Darwin":
route_default_result = subprocess.check_output(
["route", "get", "default"]
).decode()
try:
gw_ip = re.search(
r"\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}",
route_default_result,
).group(0)
except AttributeError:
pass

elif platform.system() == "Linux":
route_default_result = re.findall(
r"([\w.][\w.]*'?\w?)",
subprocess.check_output(["ip", "route"]).decode(),
)
gw_ip = route_default_result[2]
return gw_ip
gws = netifaces.gateways()
try:
return gws["default"][netifaces.AF_INET][0]
except (KeyError, IndexError):
# no default gateway found
return False

def get_gateway_mac(self, gw_ip: str) -> Optional[str]:
"""
Expand Down
69 changes: 44 additions & 25 deletions slips_files/common/slips_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from re import findall
from threading import Thread

import netifaces
from uuid import UUID
import tldextract
import validators
Expand All @@ -19,12 +20,7 @@
import sys
import ipaddress
import aid_hash
from typing import (
Any,
Optional,
Union,
List,
)
from typing import Any, Optional, Union, List, Dict
from ipaddress import IPv4Network, IPv6Network, IPv4Address, IPv6Address
from dataclasses import is_dataclass, asdict
from enum import Enum
Expand Down Expand Up @@ -405,26 +401,41 @@ def to_delta(self, time_in_seconds):
def get_human_readable_datetime(self) -> str:
return utils.convert_format(datetime.now(), self.alerts_format)

def get_own_ips(self) -> list:
def get_own_ips(self, ret=Dict) -> Union[Dict[str, List[str]], List[str]]:
"""
Returns a list of our local and public IPs
Returns a dict of our private and public IPs
return a dict by default
e.g. { "ipv4": [..], "ipv6": [..] }
and returns a list of all the ips combined if ret=List is given
"""
if "-i" not in sys.argv:
if "-i" not in sys.argv and "-g" not in sys.argv:
# this method is only valid when running on an interface
return []

IPs = []
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("10.255.255.255", 1))
IPs.append(s.getsockname()[0])
except Exception:
IPs.append("127.0.0.1")
finally:
s.close()
ips = {"ipv4": [], "ipv6": []}

# get public ip
interfaces = netifaces.interfaces()

for interface in interfaces:
try:
addrs = netifaces.ifaddresses(interface)

# get IPv4 addresses
if netifaces.AF_INET in addrs:
for addr in addrs[netifaces.AF_INET]:
ips["ipv4"].append(addr["addr"])

# get IPv6 addresses
if netifaces.AF_INET6 in addrs:
for addr in addrs[netifaces.AF_INET6]:
# remove interface suffix
ip = addr["addr"].split("%")[0]
ips["ipv6"].append(ip)

except Exception as e:
print(f"Error processing interface {interface}: {e}")

# get public ip
try:
response = requests.get(
"http://ipinfo.io/json",
Expand All @@ -435,19 +446,27 @@ def get_own_ips(self) -> list:
requests.exceptions.ChunkedEncodingError,
requests.exceptions.ReadTimeout,
):
return IPs
return ips

if response.status_code != 200:
return IPs
return ips
if "Connection timed out" in response.text:
return IPs
return ips
try:
response = json.loads(response.text)
except json.decoder.JSONDecodeError:
return IPs
return ips

public_ip = response["ip"]
IPs.append(public_ip)
return IPs
if validators.ipv4(public_ip):
ips["ipv4"].append(public_ip)
elif validators.ipv6(public_ip):
ips["ipv6"].append(public_ip)

if ret == Dict:
return ips
elif ret == List:
return [ip for sublist in ips.values() for ip in sublist]

def convert_to_mb(self, bytes):
return int(bytes) / (10**6)
Expand Down
2 changes: 1 addition & 1 deletion slips_files/core/database/redis_db/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class RedisDB(IoCHandler, AlertHandler, ProfileHandler, P2PHandler):
# flag to know if we found the gateway MAC using the most seen MAC method
_gateway_MAC_found = False
_conf_file = "config/redis.conf"
our_ips = utils.get_own_ips()
our_ips: List[str] = utils.get_own_ips(ret=List)
# flag to know which flow is the start of the pcap/file
first_flow = True
# to make sure we only detect and store the user's localnet once
Expand Down
2 changes: 1 addition & 1 deletion slips_files/core/evidence_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def init(self):
self.jsonfile = self.clean_file(self.output_dir, "alerts.json")
utils.change_logfiles_ownership(self.jsonfile.name, self.UID, self.GID)
# this list will have our local and public ips when using -i
self.our_ips = utils.get_own_ips()
self.our_ips: List[str] = utils.get_own_ips(ret=List)
self.formatter = EvidenceFormatter(self.db)
# thats just a tmp value, this variable will be set and used when
# the
Expand Down
31 changes: 17 additions & 14 deletions tests/test_ip_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import asyncio

import netifaces

from tests.module_factory import ModuleFactory
import maxminddb
import pytest
Expand Down Expand Up @@ -405,27 +407,28 @@ async def test_shutdown_gracefully(


@pytest.mark.parametrize(
"platform_system, subprocess_output, expected_ip",
"is_running_non_stop, netifaces_ret, expected_return_value",
[
# Testcase 1: MacOS (Darwin) with valid output
("Darwin", b"gateway: 192.168.1.1", "192.168.1.1"),
# Testcase 2: Linux with valid output
("Linux", b"default via 10.0.0.1 dev eth0", "10.0.0.1"),
# Testcase 3: MacOS with invalid output
("Darwin", b"No default gateway", False),
# Testcase 4: Unsupported OS
("Windows", b"", False),
(
True,
{"default": {netifaces.AF_INET: ["192.168.1.1"]}},
"192.168.1.1",
),
(True, {}, False),
(True, {"default": {}}, False),
(True, {"default": {netifaces.AF_INET: []}}, False),
(False, "192.168.1.1", False),
],
)
def test_get_gateway_ip(
mocker, platform_system, subprocess_output, expected_ip
mocker, is_running_non_stop, netifaces_ret, expected_return_value
):
ip_info = ModuleFactory().create_ip_info_obj()
mocker.patch("platform.system", return_value=platform_system)
mocker.patch("subprocess.check_output", return_value=subprocess_output)
mocker.patch("sys.argv", ["-i", "eth0"])
ip_info.is_running_non_stop = is_running_non_stop
mocker.patch("netifaces.gateways", return_value=netifaces_ret)

result = ip_info.get_gateway_ip_if_interface()
assert result == expected_ip
assert result == expected_return_value


@pytest.mark.parametrize(
Expand Down
7 changes: 5 additions & 2 deletions tests/test_slips_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytz
import json
from collections import namedtuple
from typing import List


def test_get_sha256_hash():
Expand Down Expand Up @@ -289,13 +290,15 @@ def _check_ip_presence(utils, expected_ip):
Helper function to check if the given IP is present
in the list of own IPs.
"""
return expected_ip in utils.get_own_ips() or not utils.get_own_ips()
return (
expected_ip in utils.get_own_ips(ret=List) or not utils.get_own_ips()
)


def test_get_own_ips_success():
"""Test that the function returns a list when successful."""
utils = ModuleFactory().create_utils_obj()
ips = utils.get_own_ips()
ips = utils.get_own_ips(ret=List)
assert isinstance(ips, list), "Should return a list of IPs"


Expand Down
Loading