Skip to content

Commit 24a40e5

Browse files
authored
Merge pull request #1863 from stratosphereips/alya/immune/DoS_protection
DoS protection
2 parents 6508a2b + 4fd59ef commit 24a40e5

9 files changed

Lines changed: 371 additions & 45 deletions

File tree

slips_files/common/abstracts/icore.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from slips_files.common.abstracts.imodule import IModule
88

99
# in seconds
10-
FIVE_MINS = 300
10+
ONE_MIN = 60
1111

1212

1313
class ICore(IModule, Process):
@@ -32,21 +32,22 @@ def __init__(self, *args, **kwargs):
3232
# used to keep track of the FPs of this module
3333
now = time.monotonic()
3434
self.last_fps_check_time = now
35-
# 300s = 5 mins
36-
self.next_fps_check_time = now + FIVE_MINS
35+
self.next_fps_check_time = now + ONE_MIN
3736

3837
def pre_main(self): ...
3938

4039
def store_flows_read_per_second(self):
4140
"""
42-
updates the db about the flows read per second
41+
updates the db about the flows read per second.
42+
this func estimates the flows processed per second by observing flows
43+
over 1 min span.
4344
"""
4445
if not hasattr(self, "next_fps_check_time"):
4546
# Defensive init for cases where ICore.__init__ wasn't invoked.
4647
now = time.monotonic()
4748
self.last_flows_count = getattr(self, "last_flows_count", 0)
4849
self.last_fps_check_time = now
49-
self.next_fps_check_time = now + FIVE_MINS
50+
self.next_fps_check_time = now + ONE_MIN
5051

5152
now = time.monotonic()
5253
if now < self.next_fps_check_time:
@@ -59,11 +60,10 @@ def store_flows_read_per_second(self):
5960
time_delta = now - self.last_fps_check_time
6061

6162
flows_per_sec = int(flows_delta / time_delta)
62-
63-
self.db.store_module_flows_per_second(self.name, flows_per_sec)
63+
self.db.store_core_module_flows_per_second(self.name, flows_per_sec)
6464

6565
self.last_fps_check_time = now
66-
self.next_fps_check_time = now + FIVE_MINS
66+
self.next_fps_check_time = now + ONE_MIN
6767
self.last_flows_count = flows_now
6868

6969
def run(self):

slips_files/core/database/database_manager.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,11 +300,11 @@ def get_output_dir(self, *args, **kwargs):
300300
def get_input_file(self, *args, **kwargs):
301301
return self.rdb.get_input_file(*args, **kwargs)
302302

303-
def store_module_flows_per_second(self, *args, **kwargs):
304-
return self.rdb.store_module_flows_per_second(*args, **kwargs)
303+
def store_core_module_flows_per_second(self, *args, **kwargs):
304+
return self.rdb.store_core_module_flows_per_second(*args, **kwargs)
305305

306-
def get_module_flows_per_second(self, *args, **kwargs):
307-
return self.rdb.get_module_flows_per_second(*args, **kwargs)
306+
def get_core_module_flows_per_second(self, *args, **kwargs):
307+
return self.rdb.get_core_module_flows_per_second(*args, **kwargs)
308308

309309
def record_flow_per_minute(self, module: str, now: Optional[float] = None):
310310
if not self.conf.generate_performance_plots():

slips_files/core/database/redis_db/constants.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,12 @@ class Constants:
7070
KNOWN_FP_MD5_HASHES = "known_fps"
7171
WILL_SLIPS_HAVE_MORE_FLOWS = "will_slips_have_more_flows"
7272
SUBS_WHO_PROCESSED_MSG = "number_of_subscribers_who_processed_this_msg"
73-
FLOWS_ANALYZED_BY_ALL_MODULES_PER_MIN = "flows_analyzed_per_minute"
74-
MODULES_FLOWS_PER_SECOND = "modules_processed_flows_per_second"
73+
FLOWS_ANALYZED_BY_ALL_MODULES_PER_MIN = (
74+
"flows_analyzed_by_all_modules_per_minute"
75+
)
76+
CORE_MODULE_NUMBER_OF_PROCESSED_FLOWS_PER_SECOND = (
77+
"core_module_number_of_processed_flows_per_second"
78+
)
7579
LINE_PROCESSORS = "line_processors"
7680
ALERTS = "alerts"
7781
MAX_THREAT_LEVEL = "max_threat_level"

slips_files/core/database/redis_db/database.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,11 +1494,18 @@ def get_pid_of(self, module_name: str):
14941494
pid = self.r.hget(self.constants.PIDS, module_name)
14951495
return int(pid) if pid else None
14961496

1497-
def store_module_flows_per_second(self, module, fps):
1498-
self.r.hset(self.constants.MODULES_FLOWS_PER_SECOND, module, fps)
1497+
def store_core_module_flows_per_second(self, module, fps):
1498+
self.r.hset(
1499+
self.constants.CORE_MODULE_NUMBER_OF_PROCESSED_FLOWS_PER_SECOND,
1500+
module,
1501+
fps,
1502+
)
14991503

1500-
def get_module_flows_per_second(self, module):
1501-
return self.r.hget(self.constants.MODULES_FLOWS_PER_SECOND, module)
1504+
def get_core_module_flows_per_second(self, module):
1505+
return self.r.hget(
1506+
self.constants.CORE_MODULE_NUMBER_OF_PROCESSED_FLOWS_PER_SECOND,
1507+
module,
1508+
)
15021509

15031510
def increment_flows_per_minute(self, module: str, minute_ts: int) -> int:
15041511
key = f"{self.constants.FLOWS_PER_MINUTE}:{module}"
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import time
2+
3+
from slips_files.common.slips_utils import utils
4+
from slips_files.common.style import green
5+
6+
7+
class DoSProtector:
8+
def __init__(self, input):
9+
self.input = input
10+
self.db = self.input.db
11+
self.is_running_non_stop: bool = self.db.is_running_non_stop()
12+
# is slips (input.py) is given > this number of flow per min,
13+
# this protector runs
14+
self.flows_per_min_threshold = 2000
15+
self.flow_sampling_stop_time = 0
16+
# number of seconds slips is going to be skipping flows for before
17+
# returning to normal (aka before going back to reading all flows)
18+
self.sampling_time_window = 60
19+
self._is_now_sampling = False
20+
21+
def _get_input_flows_per_min(self) -> int:
22+
input_flows_per_s = (
23+
self.db.get_core_module_flows_per_second("Input") or 0
24+
)
25+
input_flows_per_min = int(input_flows_per_s) * 60
26+
return input_flows_per_min
27+
28+
def _get_sampling_ratio(self) -> int:
29+
"""
30+
sr = flow_per_min² / 20000
31+
this sr is the number of flows we're gonna skip to protect slips
32+
from DoS (or high traffic in general)
33+
"""
34+
input_flows_per_min = self._get_input_flows_per_min()
35+
if not input_flows_per_min:
36+
return 1
37+
38+
return input_flows_per_min**2 / 20000
39+
40+
def _should_run(self) -> bool:
41+
"""
42+
Returns true if slips is under high traffic and the DoS protector
43+
should run.
44+
Runs only when analysing an interface or a growing zeek dir.
45+
return True if:
46+
1. if high traffic is detected
47+
2. we're in the 1 min window after slips has detected a high
48+
traffic. this is the 1 min of skipping flows before rechecking if
49+
the read number of flows has decreased.
50+
"""
51+
if not self.is_running_non_stop:
52+
return False
53+
54+
if time.time() < self.flow_sampling_stop_time:
55+
# we should still be sampling.
56+
return True
57+
58+
input_flows_per_min = self._get_input_flows_per_min()
59+
should_skip_flows = input_flows_per_min > self.flows_per_min_threshold
60+
61+
if self._is_now_sampling and input_flows_per_min == 0:
62+
# this means we justtt stopped sampling, now we want slips to
63+
# keep thinking thta it's in a sampling state until we get a
64+
# input_flows_per_min = something, once we have a number we can
65+
# decide whether to stop sampling or not, but until then we want to keep the sampling state
66+
pass
67+
elif (
68+
not should_skip_flows
69+
and self._is_now_sampling
70+
and input_flows_per_min
71+
):
72+
# slips was sampling and now stopped officially stopped,
73+
# we have a input_flows_per_min that's less than the threshold.
74+
self._is_now_sampling = False
75+
self.input.print(
76+
f"Throughput is back to normal. Input "
77+
f"flows/min = {green(input_flows_per_min)}. "
78+
f"Slips stopped skipping flows."
79+
)
80+
81+
return should_skip_flows
82+
83+
def _update_flow_sampling_stop_time_if_needed(self) -> bool:
84+
"""
85+
sets the next stop time to
86+
now + sampling_time_window
87+
if the time now exceeded the last registered flow_sampling_stop_time
88+
"""
89+
if time.time() > self.flow_sampling_stop_time:
90+
# flow sampling is going to take place for the next 1 min
91+
self.flow_sampling_stop_time = (
92+
time.time() + self.sampling_time_window
93+
)
94+
return True
95+
return False
96+
97+
def get_number_of_flows_to_skip_and_time_to_stop_sampling(self) -> int:
98+
if not self._should_run():
99+
return 0
100+
101+
sampling_time_updated = (
102+
self._update_flow_sampling_stop_time_if_needed()
103+
)
104+
105+
# -1 means read 1 flow every sampling_ratio flows.
106+
# at 2000 flows/min → sr = 200, read 1 flow every 200 flows
107+
# at 3000 flows/min → sr = 450, read 1 flow every 450 flows
108+
# at 4000 flows/min → sr = 800, etc.
109+
sampling_ratio = int(self._get_sampling_ratio() - 1)
110+
self.print_skipping_flows_warning(
111+
sampling_ratio, sampling_time_updated
112+
)
113+
114+
return sampling_ratio
115+
116+
def print_skipping_flows_warning(
117+
self, sampling_ratio: int, sampling_time_updated: bool
118+
):
119+
"""Prints a warning every time slips decides to start sampling
120+
again"""
121+
if sampling_time_updated and sampling_ratio:
122+
sr = green(f"1/{sampling_ratio}")
123+
human_readable_time_to_stop_sampling = utils.convert_ts_format(
124+
self.flow_sampling_stop_time, utils.alerts_format
125+
)
126+
green_time_to_stop_sampling = green(
127+
human_readable_time_to_stop_sampling
128+
)
129+
if self._is_now_sampling:
130+
# slips decided to extend the sampling period
131+
self.input.print(
132+
f"Slips is still under high "
133+
f"traffic. The time to stop sampling has been extended to "
134+
f"{green_time_to_stop_sampling} "
135+
)
136+
else:
137+
# reaching here means slips decided again to start sampling flows
138+
self.input.print(
139+
f"Slips started skipping flows due to high "
140+
f"traffic for DoS protection. "
141+
f"Sampling ratio: {sr} flows. "
142+
f"Time to stop sampling: {green_time_to_stop_sampling} "
143+
)
144+
self._is_now_sampling = True

slips_files/core/input/zeek/utils/zeek_input_utils.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from re import split
1313

1414
from slips_files.common.slips_utils import utils
15+
from slips_files.core.input.zeek.utils.dos_protector import DoSProtector
1516
from slips_files.core.zeek_cmd_builder import ZeekCommandBuilder
1617

1718

@@ -27,6 +28,7 @@ def __init__(self, input_process):
2728
self.zeek_files = {}
2829
self.zeek_threads = []
2930
self.zeek_pids = []
31+
self.dos_protector = DoSProtector(self.input)
3032

3133
def check_if_time_to_del_rotated_files(self):
3234
"""
@@ -127,7 +129,16 @@ def cache_nxt_line_in_file(self, filename: str, interface: str):
127129

128130
# We don't have any waiting line for this file, so proceed
129131
try:
132+
flows_to_skip_reading_if_under_heavy_load: int = (
133+
self.dos_protector.get_number_of_flows_to_skip_and_time_to_stop_sampling()
134+
)
135+
136+
# skips flows
137+
for _ in range(flows_to_skip_reading_if_under_heavy_load):
138+
file_handle.readline()
139+
130140
zeek_line = file_handle.readline()
141+
131142
except ValueError:
132143
# remover thread just finished closing all old handles.
133144
# comes here if I/O operation failed due to a closed file.
@@ -232,6 +243,10 @@ def read_zeek_files(self) -> int:
232243

233244
# Go to all the files generated by Zeek and read 1
234245
# line from each of them
246+
247+
# PS: self.zeek_files ties each zeek file to its interface (
248+
# beacause slips supports reading multiple interfaces)
249+
235250
for filename, interface in self.zeek_files.items():
236251
if utils.is_ignored_zeek_log_file(filename):
237252
continue

slips_files/core/profiler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,8 @@ def _check_if_high_throughput_and_add_workers(self):
330330
if not self.did_5min_pass_since_last_throughput_check():
331331
return
332332

333-
profiler_fps = self.db.get_module_flows_per_second(self.name) or 0
334-
input_fps = self.db.get_module_flows_per_second("Input") or 0
333+
profiler_fps = self.db.get_core_module_flows_per_second(self.name) or 0
334+
input_fps = self.db.get_core_module_flows_per_second("Input") or 0
335335
if float(input_fps) > (
336336
float(profiler_fps) * 1.1
337337
): # 10% more input fps than profiler fps

0 commit comments

Comments
 (0)