-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathflowmeter.py
More file actions
256 lines (215 loc) · 13.2 KB
/
flowmeter.py
File metadata and controls
256 lines (215 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
'''
This module implements the flow meter, which computes the per-flow statistics that the IDS uses as features.
Author: Noor Muhammad Malik
Date: April 15, 2018
License: None
'''
# the structures for flow detection
import networking
# the functions for raw socket data processing
import sniffer
# other standard imports
import socket
import os
import time
class FlowMeter():
    """Capture raw frames and aggregate them into bidirectional TCP/UDP flows.

    Every sniffed packet is attributed to a flow object stored in
    ``flow_buffer`` (keyed by the flow id of the flow's first packet). When a
    TCP FIN or RST is seen in the backward direction, the flow's features
    (duration, backward packets per second, PSH flag count and initial forward
    window size) are pushed to the GUI, log and DNN queues.
    """

    # IP protocol numbers handled by the meter; any other protocol is skipped
    TCP_PROTOCOL = 0x6
    UDP_PROTOCOL = 0x11
    # localhost traffic is ignored, it's unimportant for an IDS
    LOCALHOST = "127.0.0.1"
    # since max PDU size for ethernet is 1522 bytes, 2048 is a safe value to read from socket
    RECV_BUFFER_SIZE = 2048
    # flow durations are stored in microseconds
    MICROSECONDS_PER_SECOND = 1000000

    def flow_exists(self, flow, buffer):
        """Tell whether ``flow`` is already tracked in ``buffer`` and in which direction.

        The buffer is searched with the flow's own id first and with the
        reversed id second.

        Returns a ``(exists, is_forward)`` tuple:
            True, True   -> exists, and ``flow`` is in the fwd direction
            True, False  -> exists, and ``flow`` is in the bwd direction
            False, False -> doesn't exist
        """
        if flow.get_flow_id() in buffer:
            return True, True
        if networking.Flow.make_reverse_flow(flow) in buffer:
            return True, False
        return False, False

    def __init__(self, scan_event, gui_queue, gui_event, log_queue, run_log_event, engine_dnn_queue, dnn_ready_event):
        """Store the synchronization objects and open the sniffing socket.

        Args:
            scan_event: event object for signalling start and stop of scan.
            gui_queue: queue that receives the feature tuple of each finished flow.
            gui_event: event object for signalling the GUI to refresh itself.
            log_queue: queue to sync with the log_getter thread; receives flow ids.
            run_log_event: event object for signalling the log buffer to append data.
            engine_dnn_queue: queue to sync the engine with the DNN.
            dnn_ready_event: event object to know when the DNN is ready.

        Side effects: clears the terminal and opens a raw packet socket.
        NOTE(review): PF_PACKET sockets are Linux-specific and normally need
        elevated privileges - confirm the deployment environment.
        """
        # clear screen to begin
        os.system("clear")

        self.gui_queue = gui_queue
        self.log_queue = log_queue
        self.scan_event = scan_event
        self.gui_event = gui_event
        self.run_log_event = run_log_event
        self.engine_dnn_queue = engine_dnn_queue
        self.dnn_ready_event = dnn_ready_event

        # create sniffer socket and set it to receive packets of every
        # protocol (0x0003 is ETH_P_ALL)
        self.sniffer_socket = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.htons(0x0003))

        # dict() objects to store header information; they are handed back to
        # the sniffer helpers on every packet
        self.ip_header = dict()
        self.tcp_header = dict()
        self.udp_header = dict()

        # buffer to store flows for faster in-memory processing
        self.flow_buffer = dict()

        # current_flow is just a placeholder describing the packet being
        # processed; once it is stored in the buffer as a new flow it is
        # replaced by a fresh Flow() object
        self.current_flow = networking.Flow()

        # start of capture, for the timestamp feature of the software
        self.start_time = time.time()

        # number of unique flows currently in the buffer
        self.flow_count = 0

        # cached bwd-id and fwd-id to save function calls for faster processing
        self.bwd_id = ""
        self.fwd_id = ""

    def run_flow_meter(self):
        """Run the capture loop; this method never returns.

        Waits for the DNN to be ready, then repeatedly waits for
        ``scan_event`` and processes packets for as long as it stays set.
        """
        # wait to let the DNN get loaded
        self.dnn_ready_event.wait()

        while True:
            # wait until the event is signalled by the main thread
            print("[DEBUG-sniffer] waiting for event")
            self.scan_event.wait()
            print("[DEBUG-sniffer] event happened!")

            # while the scan event is set keep scanning; once it is cleared
            # the outer loop makes the thread wait for the next signal
            while self.scan_event.is_set():
                self.flow_count = len(self.flow_buffer)
                # this is a blocking call, so a stop request only takes
                # effect after the next packet is sniffed on the socket
                self.recv_data = self.sniffer_socket.recv(self.RECV_BUFFER_SIZE)
                self._process_packet(self.recv_data)

    def _process_packet(self, raw_frame):
        """Parse one raw frame and account it to its flow (TCP/UDP over IP only)."""
        # process the ethernet header and extract ethernet payload
        self.eth_payload, self.is_ip = sniffer.check_eth_data(raw_frame)
        if not self.is_ip:
            # never consult the (stale) IP header of a previous packet
            return

        # process the ip header and extract the payload
        self.ip_payload, self.ip_header = sniffer.check_ip_data(self.eth_payload, self.ip_header)
        self.current_flow.src_ip = self.ip_header['src_ip']
        self.current_flow.dest_ip = self.ip_header['dest_ip']
        self.current_flow.protocol = self.ip_header['protocol']

        # ignore localhost traffic, it's unimportant for an IDS
        if self.ip_header['src_ip'] == self.LOCALHOST or self.ip_header['dest_ip'] == self.LOCALHOST:
            return

        protocol = self.ip_header["protocol"]
        is_tcp = protocol == self.TCP_PROTOCOL
        if is_tcp:
            # process the TCP header and extract the TCP payload
            self.tcp_payload, self.tcp_header = sniffer.check_tcp_data(self.ip_payload, self.tcp_header)
            self.current_flow.src_port = self.tcp_header['src_port']
            self.current_flow.dest_port = self.tcp_header['dest_port']
        elif protocol == self.UDP_PROTOCOL:
            # process the UDP header and extract the UDP payload
            self.udp_payload, self.udp_header = sniffer.check_udp_data(self.ip_payload, self.udp_header)
            self.current_flow.src_port = self.udp_header['src_port']
            self.current_flow.dest_port = self.udp_header['dest_port']
        else:
            # not concerned with any other type of data, save processing time
            return

        self.exist_tuple = self.flow_exists(self.current_flow, self.flow_buffer)
        self.fwd_id = self.current_flow.get_flow_id()
        exists, is_forward = self.exist_tuple

        if not exists:
            self._start_flow(is_tcp)
        elif is_forward:
            self._record_forward_packet(is_tcp)
        else:
            self._record_backward_packet(is_tcp)

    def _start_flow(self, is_tcp):
        """Store ``current_flow`` in the buffer as a new flow and count its first packet."""
        new_flow = self.current_flow
        # time stamp used later to determine the flow duration
        new_flow.start_time = time.time()
        self.flow_buffer[self.fwd_id] = new_flow

        # first packet of a flow is always in the fwd direction
        new_flow.packet_count += 1
        new_flow.fwd_packet_count += 1

        # set the init_win_bytes_forward feature value (TCP only; other flows
        # keep whatever value networking.Flow() starts with)
        if is_tcp:
            new_flow.init_win_bytes_forward = self.tcp_header['win_size']

        # the stored object now belongs to the buffer, so create a new flow
        # object to handle the next packet
        self.current_flow = networking.Flow()

    def _record_forward_packet(self, is_tcp):
        """Count a packet travelling in the forward direction of a known flow."""
        tracked = self.flow_buffer[self.fwd_id]
        tracked.fwd_packet_count += 1
        tracked.packet_count += 1
        # increment the psh_flag_count if applicable
        if is_tcp and self.tcp_header['psh_flag'] == 1:
            tracked.psh_flag_count += 1

    def _record_backward_packet(self, is_tcp):
        """Count a backward packet and publish the flow if this packet ends it."""
        # only compute the bwd-id in case of a bwd packet for faster processing
        self.bwd_id = networking.Flow.make_reverse_flow(self.current_flow)
        tracked = self.flow_buffer[self.bwd_id]

        # increment the psh_flag_count if applicable
        if is_tcp and self.tcp_header['psh_flag'] == 1:
            tracked.psh_flag_count += 1

        # teardown is checked only for bwd packets, since the bwd packet is
        # the final FIN of a tcp 4-way teardown; a flow might also end
        # because of an rst-flag. UDP headers carry no such flags, hence the
        # is_tcp guard comes first.
        if is_tcp and (self.tcp_header['fin_flag'] == 1 or self.tcp_header['rst_flag'] == 1):
            elapsed = time.time() - tracked.start_time
            tracked.duration = int(elapsed * self.MICROSECONDS_PER_SECOND)
            # NOTE: the rate is computed before the closing packet itself is
            # counted below; kept as-is so the feature values do not change
            tracked.bwd_packets_per_second = self._rate_per_second(
                tracked.bwd_packet_count, tracked.duration)
            # only report while a scan is still requested
            if self.scan_event.is_set():
                self._publish_finished_flow(tracked)

        # NOTE(review): finished flows are not removed from flow_buffer, so
        # later packets with the same addresses keep updating this object
        tracked.bwd_packet_count += 1
        tracked.packet_count += 1

    @staticmethod
    def _rate_per_second(count, duration_us):
        """Return ``count`` events per second for a duration given in microseconds.

        A non-positive duration (flow shorter than one microsecond, or a wall
        clock that stepped backwards) yields 0.0 instead of raising
        ZeroDivisionError or producing a negative rate.
        """
        if duration_us <= 0:
            return 0.0
        return (count / duration_us) * FlowMeter.MICROSECONDS_PER_SECOND

    def _publish_finished_flow(self, tracked):
        """Send the features of a finished flow to the GUI, the log and the DNN."""
        # id of the packet that ended the flow; fwd_id was computed from
        # current_flow for this very packet, so it is reused
        flow_id = self.fwd_id

        # put the finished flow into the queue and signal the GUI
        self.gui_queue.put((
            flow_id,
            tracked.duration,
            tracked.bwd_packets_per_second,
            tracked.psh_flag_count,
            tracked.init_win_bytes_forward))
        self.gui_event.set()

        self.log_queue.put(flow_id)
        self.run_log_event.set()

        # send a dictionary of feature vectors to the DNN, keyed by attack type
        self.engine_dnn_queue.put({
            "portscan": [
                flow_id,
                tracked.bwd_packets_per_second,
                tracked.psh_flag_count,
                tracked.init_win_bytes_forward],
            "other": [tracked.duration]
        })