-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathDDoS_detector.py
More file actions
143 lines (85 loc) · 3.93 KB
/
DDoS_detector.py
File metadata and controls
143 lines (85 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
## James Quintero
## https://github.com/JamesQuintero
## Created: 5/2019
## Modified: 4/2021
##
## Ties the data handler and the neural network together to train on and predict from packet data
import sys
import os
from data_handler import DataHandler
from ANN import ANN
class DDoSDetector:
    """Drive training and prediction by pairing a DataHandler with an ANN.

    The data handler supplies packet information and labels and turns them
    into normalized input; the neural network is trained on that input or
    asked to predict from it.
    """

    #DataHandler class object
    data_handler = None

    #ANN class object
    neural_network = None

    def __init__(self):
        """Create the data handler and neural network used by every method."""
        self.data_handler = DataHandler()
        self.neural_network = ANN()

    def _load_normalized_dataset(self, dataset_index, pcap_index=None):
        """Return ``(normalized_input, normalized_output)`` for a dataset.

        Shared by train() and predict(), which run the same
        load -> compress -> generate -> normalize pipeline.
        """
        packets = self.data_handler.get_packet_information(dataset_index, pcap_index)
        labels = self.data_handler.get_labels(dataset_index, pcap_index)

        #turns each packet data from dictionaries into a flat 1d list.
        compressed_packets = self.data_handler.compress_packets(packets)

        #takes compressed packet data and returns input variables values for neural network
        input_data = self.data_handler.generate_input_data(compressed_packets)

        #takes input variables and labels, and normalizes them
        return self.data_handler.normalize_compressed_packets(input_data, labels, dataset_index)

    def train(self, dataset_index, pcap_index=None):
        """Train the neural network on the given dataset.

        Args:
            dataset_index: Dataset identifier, passed to the data handler and
                to the neural network's train_model().
            pcap_index: Optional pcap identifier within the dataset; passed
                through to the data handler unchanged.
        """
        print("Dataset: "+str(self.data_handler.get_dataset_path(dataset_index)))
        if pcap_index is not None:
            print("PCAP: "+str(self.data_handler.get_pcap_path(dataset_index, pcap_index)))

        normalized_input, normalized_output = self._load_normalized_dataset(dataset_index, pcap_index)

        print("Num packets: "+str(len(normalized_input)))
        print("Num labels: "+str(len(normalized_output)))
        print("These should match")

        num_true_labels = sum(normalized_output)
        print("Num true labels: {}".format(num_true_labels))
        print("Num false labels: {}".format(len(normalized_output) - num_true_labels))

        #feeds input data and output data into the neural network
        self.neural_network.train_model(normalized_input, normalized_output, dataset_index)

    def predict(self, dataset_index=None, pcap_index=None):
        """Predict labels for the packets of a stored dataset.

        Args:
            dataset_index: Dataset to predict on. Required; when None a
                message is printed and nothing is predicted.
            pcap_index: Optional pcap identifier within the dataset.

        Returns:
            Whatever the neural network's predict() returns, or None when
            dataset_index is None.
        """
        if dataset_index is None:
            print("Dataset unspecified when calling predict()")
            return None

        normalized_input, normalized_output = self._load_normalized_dataset(dataset_index, pcap_index)

        print("Num packets: "+str(len(normalized_input)))
        print("Num labels: "+str(len(normalized_output)))

        #feeds input data into the neural network
        # NOTE(review): predict_live() calls neural_network.predict with
        # (dataset_index, input); this call previously passed only the input.
        # It now uses the same argument order -- confirm against ANN.predict.
        predicted_labels = self.neural_network.predict(dataset_index, normalized_input)

        # self.data_handler.save_prediction(dataset_index, pcap_index)
        return predicted_labels

    def predict_live(self, dataset_index=None):
        """Predict the label of the last packet in the latest live pcap.

        Args:
            dataset_index: Dataset identifier forwarded to the neural
                network's predict(). Required; when None a message is
                printed and nothing is predicted.

        Returns:
            The predicted label for the last packet, or None when there is
            no dataset index, no pcap file, no packets, or no prediction.
        """
        if dataset_index is None:
            print("Dataset unspecified when calling predict_live()")
            return None

        latest_pcap_path = self.data_handler.get_latest_live_pcap()
        if not latest_pcap_path:
            print("There is no pcap file to predict from")
            return None

        print("Latest pcap path: "+str(latest_pcap_path))

        #returns normalized input data from the specified pcap path
        normalized_input = self.data_handler.get_live_input_data(latest_pcap_path)
        print("Num packets: "+str(len(normalized_input)))

        # len() rather than truthiness: the container type returned by the
        # data handler is not visible here and may not define a boolean value.
        if len(normalized_input) == 0:
            # Without this guard the [-1] lookup below raises IndexError.
            print("No predictions for live data")
            print()
            return None

        latest_packet = [normalized_input[-1]]

        #feeds input data into the neural network
        predicted_label = self.neural_network.predict(dataset_index, latest_packet)

        result = None
        if len(predicted_label) > 0:
            result = predicted_label[-1][0]
            print("Predicted label: "+str(result))
        else:
            print("No predictions for live data")

        print()
        return result
if __name__ == "__main__":
    # Script entry point: build a detector and train it on dataset 1
    # without selecting a specific pcap.
    detector = DDoSDetector()
    detector.train(dataset_index=1, pcap_index=None)

    # detector.predict()