-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlowimpactplus.py
More file actions
410 lines (356 loc) · 12.1 KB
/
lowimpactplus.py
File metadata and controls
410 lines (356 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
###############################################################
# Get RADIUS Authentication report From ISE for
# some length of time.
#
# Will parse this information to identify which devices are still
# hitting a Low Impact/Monitor mode policy so they
# can be remediated.
#
# You can customize your low impact policies you want to search for
# by modifying global list li_policy_list with the full names
# of your Low Impact policies.
#
# Takes raw output from the RADIUS Authentications reports
# including all authentications. No filters.
#
# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-==-=-=-=-=-
# Author: Garrett Munson
# 8/19/2019
###############################################################
import csv
import pandas as pd
import datetime
import os
src_report = 'LongReportAAA.csv'
src_dc_auth_count = 0 # Count of authenticated endpoints.
mab_ep = []
d1x_ep = []
csv_header = ''
output_csv = [] # Array that creates the output file
# li_policy_list is a list of your low impact policies you wish to search against
# Devices hitting these policies likely need to be remediated
# NOTE! 30 day raw reports require you escape a single apostrophe for while you search
# IE: '\'MONITOR_MODE-Main Office\''
# Shortened reports require no apostrophes
# Delete the following values and add your own
li_policy_list = ['\'Low_Impact_All_Sites\'',
'\'Default\'',
'\'Location_Specific_Low_Impact\'',
'\'MONITOR_MODE\'',
'\'MONITOR - By Location\'',
'Low_Impact_All_Sites',
'MONITOR_MODE',
'MONITOR - By Location',
'Default',
'Low_Impact_All_Sites',
'Location_Specific_Low_Impact'
]
def initialize():
global src_report
print('\n' * 2)
print('~' * 20 + 'Low Impact Report' + '~' * 20)
freport = filter_report(src_report)
m = get_low_impact(freport)
d = get_authenticated(freport)
l = create_auth_list(m)
a = create_auth_list(d)
compare_auths(l, a)
print_output_file(output_csv)
remove_file(m)
remove_file(d)
return
def filter_report(file):
"""
Filters out the necessary fields of the low impact report and exports it
to a Pandas dataframe
:param file: source file - 30 day radius authentication report .csv
:return: filtered_report - Pandas DataFrame
"""
try:
df = pd.read_csv(file)
# Pull the relevant fields
filtered_report = df[['\'CALLING_STATION_ID\'',
'\'LOCATION\'',
'\'LOGGED AT\'',
'\'POLICY_SET_NAME\'',
'\'ENDPOINTMATCHEDPROFILE\'',
'\'IDENTITY_GROUP\'',
'\'NAS_IP_ADDRESS\'',
'\'NETWORK_DEVICE_NAME\'',
'\'NAS_PORT_ID\'',
'\'USER_NAME\'',
'\'AUTHORIZATION_RULE\'']]
header = ',Calling Station ID,' \
'Location,Logged At,' \
'Policy Set,' \
'Endpoint Profile,' \
'Identity Group,'\
'NAS IP Address,'\
'Network Device Name,'\
'Port ID,'\
'User Name,'\
'Authorization Rule\n'
create_csv_header(header)
return filtered_report
except KeyError:
sr_filtered_report = filter_short_report(file)
return sr_filtered_report
#print('File headers incorrect\n')
quit()
return
except FileNotFoundError:
print('Source file not found: ' + src_report)
quit()
return
def filter_short_report(file):
"""
Filtered non-30 day reports export with different headers. This function will allow the application
to ingest and analyze these types of reports.
:param file: Source csv
:return: filtered_report Pandas Dataframe
"""
try:
df = pd.read_csv(file)
# Pull the relevant fields
filtered_report = df[['Endpoint ID',
'Location',
'Logged At',
'Endpoint Profile',
'Identity',
'Network Device IP',
'Network Device',
'Device Port',
'Identity',
'Authorization Rule']]
header = ',Calling Station ID,' \
'Location,Logged At,' \
'Endpoint Profile,' \
'Identity Group,'\
'NAS IP Address,'\
'Network Device Name,'\
'Port ID,'\
'User Name,'\
'Authorization Rule\n'
create_csv_header(header)
return filtered_report
except KeyError:
print('File headers incorrect\n'
'Please fix or obtain a new report')
quit()
return
except FileNotFoundError:
print('Source file not found: ' + src_report)
quit()
return
def get_low_impact(df):
"""
Update this Array with the Authorization Policies you want to check
It must be a full match! Add or remove as needed
:param df: pandas dataframe, returned from filtered_report()
:return: fn - Output filename.
"""
global li_policy_list
li = li_policy_list
try:
# Isolate the low impact authentications
print('Finding Low Impact Authorizations ...')
li_df = df.loc[df['\'AUTHORIZATION_RULE\''].isin(li)]
fn = get_date() + 'LowImpact.csv'
# deduplicate the dataframe
li_df = li_df.drop_duplicates(subset='\'CALLING_STATION_ID\'', keep='first')
# Create formatted and deduped csv file
li_df.to_csv(fn)
# Testing - print number of hits
print('Low Impact count should match kill count to indicate no unsuccessful authentications found.')
print(len(li_df.index))
return fn
except KeyError:
print('Key error: get_low_impact()')
fn_sr = get_low_impact_short_report(df)
return fn_sr
quit()
except FileNotFoundError:
print('get_low_impact(): File Not Found')
quit()
def get_low_impact_short_report(df):
"""
Short report. Uses short report headers.
:param df: pandas dataframe, returned from filtered_report()
:return: fn - Output filename.
"""
global li_policy_list
li = li_policy_list
try:
# Isolate the low impact authentications
print('Finding Low Impact Authorizations ...')
li_df = df.loc[df['Authorization Rule'].isin(li)]
fn = get_date() + 'LowImpact.csv'
# deduplicate the dataframe
li_df = li_df.drop_duplicates(subset='Endpoint ID', keep='first')
# Create formatted and deduped csv file
li_df.to_csv(fn)
# Testing - print number of hits
print('Low Impact count should match kill count to indicate no unsuccessful authentications found.')
print(len(li_df.index))
return fn
except KeyError:
print('Key error: get_low_impact_short_report()')
quit()
except FileNotFoundError:
print('get_low_impact(): File Not Found')
quit()
def get_authenticated(df):
"""
Get Authentications to be compared
:param df: Pandas Dataframe
:return: Filename
"""
global src_dc_auth_count
global li_policy_list
li = li_policy_list
try:
print('Finding Authenticated Devices ...')
a_df = df.loc[~df['\'AUTHORIZATION_RULE\''].isin(li)]
a_df = a_df.drop_duplicates(subset='\'CALLING_STATION_ID\'', keep='first')
# Set the count of authenticated endpoints
src_dc_auth_count = len(a_df.index)
fn = get_date() + 'authenticated.csv'
# print(a_df.loc[1])
a_df.to_csv(fn)
return fn
except KeyError:
fn = get_authenticated_short_report(df)
return fn
quit()
except FileNotFoundError:
print('Source file not found')
quit()
def get_authenticated_short_report(df):
"""
Uses Short Report Headers
:param df: Pandas Dataframe
:return: fn - Filename for csv of authentications
"""
global src_dc_auth_count
global li_policy_list
li = li_policy_list
try:
print('Finding Authenticated Devices ...')
a_df = df.loc[~df['Authorization Rule'].isin(li)]
a_df = a_df.drop_duplicates(subset='Endpoint ID', keep='first')
# Set the count of authenticated endpoints
src_dc_auth_count = len(a_df.index)
fn = get_date() + 'authenticated.csv'
# print(a_df.loc[1])
a_df.to_csv(fn)
return fn
except KeyError:
print('Authenticated df key error. Handle me')
quit()
except FileNotFoundError:
print('Source file not found')
quit()
def create_auth_list(src_file):
try:
with open(src_file) as sfile:
readfile = csv.reader(sfile, delimiter=',')
next(readfile)
endpoints = []
for row in readfile:
endpoints.append(row)
return endpoints
except:
print("Unknown Error - create_auth_list()")
quit()
def compare_auths(mab, d1x):
"""
Compares Low Impact authentications
:param mab: temp file that tracks low impact authentications
:param d1x: temp file that tracks endpoints that hit intended authorization policies
:return: Nothing
"""
count = 1
global output_csv
global src_dc_auth_count
killcount = 0
try:
for m in mab:
for d in d1x:
# upper constraint. m matched no d
# and the end of the file was reached, so append to list
# the count is reset to 1 and a new m is compared
if m[1] != d[1] and count >= src_dc_auth_count:
output_csv.append(m)
# print(m)
count = 1
continue
# we found a match. reset count to 1 to compare
# a new m and continue
elif m[1] == d[1] and count < src_dc_auth_count:
count = 1
killcount += 1
continue
# m does not match d, keep looking
elif m[1] != d[1] and count < src_dc_auth_count:
count += 1
# print(count)
count = 1
kc = str(killcount)
print('Kill count = ' + kc)
return
except FileNotFoundError:
print(mab + ' is not found')
quit()
def print_output_file(le):
"""
Creates the final Low Impact Report .csv file
:param le: List that contains all endpoints that need remediation.
:return: Nothing
"""
global csv_header
fname = get_date() + 'LowImpactReport.csv'
print('#' * 50)
print('Creating Output File: ' + fname + '...')
headers = csv_header
global output_csv
f = open(fname, 'w+')
f.write(headers)
f.close()
f =open(fname, 'a+')
for i in output_csv:
str_i = str(i)
str_i = str_i[1:-1]
str_i = str_i.replace('\'', '')
str_i = str_i.replace('\"', '')
# print(str_i)
f.write(str_i)
f.write('\n')
# print(i)
f.close()
print('\nCompleted Report!' + '\n' * 2)
return
def get_date():
now = datetime.datetime.now()
year = now.year
month = now.month
day = now.day
y = str(year)
m = str(month)
d = str(day)
d = y + '_' + m + '_' + d + '_'
return d
def create_csv_header(header):
global csv_header
csv_header = header
return
def remove_file(fname):
try:
os.remove(fname)
except Exception:
pass
if __name__ == '__main__':
try:
initialize()
except KeyboardInterrupt:
print('You have cancelled the operation')
print('\n' * 10)