-
Notifications
You must be signed in to change notification settings - Fork 202
Expand file tree
/
Copy pathblocking.py
More file actions
255 lines (228 loc) · 9.77 KB
/
blocking.py
File metadata and controls
255 lines (228 loc) · 9.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# SPDX-FileCopyrightText: 2021 Sebastian Garcia <sebastian.garcia@agents.fel.cvut.cz>
# SPDX-License-Identifier: GPL-2.0-only
import platform
import sys
import os
import shutil
import json
import subprocess
from typing import Dict
import time
from threading import Lock
from slips_files.common.abstracts.imodule import IModule
from slips_files.common.slips_utils import utils
from .exec_iptables_cmd import exec_iptables_command
from modules.blocking.unblocker import Unblocker
class Blocking(IModule):
"""Data should be passed to this module as a json encoded python dict,
by default this module flushes all slipsBlocking chains before it starts"""
# Name: short name of the module. Do not use spaces
name = "Blocking"
description = "Block malicious IPs connecting to this device"
authors = ["Sebastian Garcia, Alya Gomaa"]
def init(self):
self.c1 = self.db.subscribe("new_blocking")
self.c2 = self.db.subscribe("tw_closed")
self.channels = {
"new_blocking": self.c1,
"tw_closed": self.c2,
}
if platform.system() == "Darwin":
self.print("Mac OS blocking is not supported yet.")
sys.exit()
self.firewall = self._determine_linux_firewall()
self.sudo = utils.get_sudo_according_to_env()
self._init_chains_in_firewall()
self.blocking_log_path = os.path.join(self.output_dir, "blocking.log")
self.blocking_logfile_lock = Lock()
# clear it
try:
open(self.blocking_log_path, "w").close()
except FileNotFoundError:
pass
def log(self, text: str):
"""Logs the given text to the blocking log file"""
with self.blocking_logfile_lock:
with open(self.blocking_log_path, "a") as f:
now = time.time()
human_readable_datetime = utils.convert_ts_format(
now, utils.alerts_format
)
f.write(f"{human_readable_datetime} - {text}\n")
def _determine_linux_firewall(self):
"""Returns the currently installed firewall and installs iptables if
none was found"""
if shutil.which("iptables"):
# comes pre installed in docker
return "iptables"
else:
# no firewall installed
# user doesn't have a firewall
self.print(
"iptables is not installed. Blocking module is quitting."
)
sys.exit()
def _get_cmd_output(self, command):
"""Executes a command and returns the output"""
result = subprocess.run(command.split(), stdout=subprocess.PIPE)
return result.stdout.decode("utf-8")
def _init_chains_in_firewall(self):
"""For linux: Adds a chain to iptables or a table to nftables called
slipsBlocking where all the rules will reside"""
if self.firewall != "iptables":
return
# delete any pre existing slipsBlocking rules that may conflict before
# adding a new one
# self.delete_iptables_chain()
self.print('Executing "sudo iptables -N slipsBlocking"', 6, 0)
# Add a new chain to iptables
os.system(f"{self.sudo} iptables -N slipsBlocking >/dev/null 2>&1")
# Check if we're already redirecting to slipsBlocking chain
input_chain_rules = self._get_cmd_output(
f"{self.sudo} iptables -nvL INPUT"
)
output_chain_rules = self._get_cmd_output(
f"{self.sudo} iptables -nvL OUTPUT"
)
forward_chain_rules = self._get_cmd_output(
f"{self.sudo} iptables -nvL FORWARD"
)
# Redirect the traffic from all other chains to slipsBlocking so rules
# in any pre-existing chains dont override it
# -I to insert slipsBlocking at the top of the INPUT, OUTPUT and
# FORWARD chains
if "slipsBlocking" not in input_chain_rules:
os.system(
self.sudo
+ " iptables -I INPUT -j slipsBlocking >/dev/null 2>&1"
)
if "slipsBlocking" not in output_chain_rules:
os.system(
self.sudo
+ " iptables -I OUTPUT -j slipsBlocking >/dev/null 2>&1"
)
if "slipsBlocking" not in forward_chain_rules:
os.system(
self.sudo
+ " iptables -I FORWARD -j slipsBlocking >/dev/null 2>&1"
)
def _is_ip_already_blocked(self, ip) -> bool:
"""Checks if ip is already blocked or not using iptables"""
command = f"{self.sudo} iptables -L slipsBlocking -v -n"
# Execute command
result = subprocess.run(command.split(), stdout=subprocess.PIPE)
result = result.stdout.decode("utf-8")
return ip in result
def _block_ip(self, ip_to_block: str, flags: Dict[str, str]) -> bool:
"""
This function determines the user's platform and firewall and calls
the appropriate function to add the rules to the used firewall.
By default this function blocks all traffic from and to the given ip.
return strue if the ip is successfully blocked
"""
if self.firewall != "iptables":
return
if not isinstance(ip_to_block, str):
return False
# Make sure ip isn't already blocked before blocking
if self._is_ip_already_blocked(ip_to_block):
return False
from_ = flags.get("from_")
to = flags.get("to")
dport = flags.get("dport")
sport = flags.get("sport")
protocol = flags.get("protocol")
# Set the default behaviour to block all traffic from and to an ip
if from_ is None and to is None:
from_, to = True, True
# This dictionary will be used to construct the rule
options = {
"protocol": f" -p {protocol}" if protocol is not None else "",
"dport": f" --dport {str(dport)}" if dport is not None else "",
"sport": f" --sport {str(sport)}" if sport is not None else "",
}
blocked = False
if from_:
# Add rule to block traffic from source ip_to_block (-s)
blocked = exec_iptables_command(
self.sudo,
action="insert",
ip_to_block=ip_to_block,
flag="-s",
options=options,
)
if blocked:
txt = f"Blocked all traffic from: {ip_to_block}"
self.print(txt)
self.log(txt)
if to:
# Add rule to block traffic to ip_to_block (-d)
blocked = exec_iptables_command(
self.sudo,
action="insert",
ip_to_block=ip_to_block,
flag="-d",
options=options,
)
if blocked:
txt = f"Blocked all traffic to: {ip_to_block}"
self.print(txt)
self.log(f"Blocked all traffic to: {ip_to_block}")
self.db.set_blocked_ip(ip_to_block)
return blocked
def shutdown_gracefully(self):
self.unblocker.unblocker_thread.join(30)
if self.unblocker.unblocker_thread.is_alive():
self.print("Problem shutting down unblocker thread.")
def pre_main(self):
self.unblocker = Unblocker(
self.db, self.sudo, self.should_stop, self.logger, self.log
)
def main(self):
if msg := self.get_msg("new_blocking"):
# message['data'] in the new_blocking channel is a dictionary that contains
# the ip and the blocking options
# Example of the data dictionary to block or unblock an ip:
# (notice you have to specify from,to,dport,sport,protocol or at
# least 2 of them when unblocking)
# blocking_data = {
# "ip" : "0.0.0.0"
# "tw" : 1
# "block" : True to block - False to unblock
# "from" : True to block traffic from ip (default) - False does nothing
# "to" : True to block traffic to ip (default) - False does nothing
# "dport" : Optional destination port number
# "sport" : Optional source port number
# "protocol" : Optional protocol
# }
# Example of passing blocking_data to this module:
# blocking_data = json.dumps(blocking_data)
# self.db.publish('new_blocking', blocking_data )
data = json.loads(msg["data"])
ip = data.get("ip")
tw: int = data.get("tw")
block = data.get("block")
flags = {
"from_": data.get("from"),
"to": data.get("to"),
"dport": data.get("dport"),
"sport": data.get("sport"),
"protocol": data.get("protocol"),
}
if block:
self._block_ip(ip, flags)
# whether this ip is blocked now, or was already blocked, make an
# unblocking request to either extend its
# blocking period, or block it until the next timewindow is over.
self.unblocker.unblock_request(ip, tw, flags)
if msg := self.get_msg("tw_closed"):
# this channel receives requests for closed tws for every ip
# slips sees.
# if slips saw 3 ips, this channel will receive 3 msgs with tw1
# as closed. we're not interested in the ips, we just wanna
# know when slips advances to the next tw.
profileid_tw = msg["data"].split("_")
twid = profileid_tw[-1]
if self.last_closed_tw != twid:
self.last_closed_tw = twid
self.unblocker.update_requests()