Skip to content
This repository was archived by the owner on May 4, 2026. It is now read-only.

Commit 851619a

Browse files
authored
Merge pull request #34 from CompassSecurity/dev
Fixes after internal Pentest
2 parents 23b39b2 + 6e39b72 commit 851619a

9 files changed

Lines changed: 294 additions & 270 deletions

File tree

blueprints/assessment_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def assessmenthexagons(id):
197197
cumulatedscore += testcase.testcasescore
198198
count += 1
199199

200-
if count is not 0:
200+
if count != 0:
201201
score = cumulatedscore / count
202202

203203
if score == 100:

blueprints/testcase.py

Lines changed: 129 additions & 203 deletions
Large diffs are not rendered by default.

blueprints/testcase_utils.py

Lines changed: 84 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import os
22
import shutil
33
from model import *
4-
from utils import user_assigned_assessment
4+
from utils import *
55
from werkzeug.utils import secure_filename
6-
from flask import Blueprint, request, send_from_directory, jsonify
6+
from flask import Blueprint, request, send_from_directory, jsonify, make_response
77
from flask_security import auth_required, roles_accepted, current_user
88

99
blueprint_testcase_utils = Blueprint('blueprint_testcase_utils', __name__)
@@ -12,88 +12,117 @@
1212
@auth_required()
1313
@roles_accepted('Admin', 'Red')
1414
@user_assigned_assessment
15-
def testcasevisibility(id):
16-
newcase = TestCase.objects(id=id).first()
17-
newcase.visible = not newcase.visible
18-
newcase.save()
19-
20-
return jsonify(newcase.to_json()), 200
15+
def toggle_visibility_flag(id):
16+
testcase = get_testcase_by_id(id)
17+
if not testcase:
18+
return "Test case not found", 404
19+
20+
testcase.visible = not testcase.visible
21+
testcase.save()
22+
return jsonify(testcase.to_json()), 200
23+
2124

2225
@blueprint_testcase_utils.route('/testcase/<id>/clone', methods = ['POST'])
2326
@auth_required()
2427
@roles_accepted('Admin', 'Red')
2528
@user_assigned_assessment
26-
def testcaseclone(id):
27-
orig = TestCase.objects(id=id).first()
28-
newcase = TestCase()
29-
copy = ["name", "assessmentid", "objective", "requirements", "actions", "rednotes", "mitreid", "tactic", "tools", "tags", "expectedprevention", "expectedalertcreation", "expectedincidentcreation", "expectedseverity" , "priorityurgency"]
30-
for field in copy:
31-
newcase[field] = orig[field]
32-
newcase.name = orig["name"] + " (Copy)"
33-
newcase.save()
29+
def clone_testcase(id):
30+
orig = get_testcase_by_id(id)
31+
if not orig:
32+
return "Test case not found", 404
3433

34+
fields_to_copy = [
35+
"name", "assessmentid", "objective", "requirements", "actions", "rednotes",
36+
"mitreid", "tactic", "tools", "tags", "expectedprevention",
37+
"expectedalertcreation", "expectedincidentcreation", "expectedseverity", "priorityurgency"
38+
]
39+
newcase = TestCase(**{field: orig[field] for field in fields_to_copy})
40+
newcase.name = f"{orig['name']} (Copy)"
41+
newcase.save()
3542
return jsonify(newcase.to_json()), 200
3643

44+
3745
@blueprint_testcase_utils.route('/testcase/<id>/toggle-delete', methods = ['POST'])
3846
@auth_required()
3947
@roles_accepted('Admin', 'Red')
4048
@user_assigned_assessment
41-
def testcasedeleted(id):
42-
newcase = TestCase.objects(id=id).first()
43-
newcase.deleted = not newcase.deleted
44-
newcase.save()
45-
49+
def toggle_delete_flag(id):
50+
testcase = get_testcase_by_id(id)
51+
if not testcase:
52+
return "Test case not found", 404
53+
54+
testcase.deleted = not testcase.deleted
55+
testcase.save()
4656
return "", 200
4757

58+
4859
@blueprint_testcase_utils.route('/testcase/<id>/delete', methods = ['POST'])
4960
@auth_required()
5061
@roles_accepted('Admin')
5162
@user_assigned_assessment
52-
def testcasedelete(id):
53-
testcase = TestCase.objects(id=id).first()
63+
def delete_testcase(id):
64+
testcase = get_testcase_by_id(id)
65+
if not testcase:
66+
return "Test case not found", 404
67+
5468
assessment = Assessment.objects(id=testcase.assessmentid).first()
5569
if os.path.exists(f"files/{str(assessment.id)}/{str(testcase.id)}"):
5670
shutil.rmtree(f"files/{str(assessment.id)}/{str(testcase.id)}")
5771
testcase.delete()
5872

59-
return "", 200
73+
return "Test case deleted", 200
74+
6075

6176
@blueprint_testcase_utils.route('/testcase/<id>/evidence/<colour>/<file>', methods = ['DELETE'])
6277
@auth_required()
6378
@roles_accepted('Admin', 'Red', 'Blue')
6479
@user_assigned_assessment
65-
def deletefile(id, colour, file):
66-
if colour not in ["red", "blue"]:
67-
return 401
80+
def delete_file(id, colour, file):
81+
VALID_COLOURS = {'red', 'blue'}
82+
83+
if colour not in VALID_COLOURS:
84+
return "Invalid colour", 400
6885
if colour == "red" and current_user.has_role("Blue"):
69-
return 403
86+
return "Invalid colour", 400
7087

71-
testcase = TestCase.objects(id=id).first()
72-
# Sanity check to prevent death if the image has already been removed
73-
path = f"files/{testcase.assessmentid}/{testcase.id}/{secure_filename(file)}"
74-
if os.path.isfile(path):
75-
os.remove(path)
76-
77-
files = []
78-
for f in testcase["redfiles" if colour == "red" else "bluefiles"]:
79-
if f.name != file:
80-
files.append(f)
81-
82-
if colour == "red":
83-
testcase.update(set__redfiles=files)
84-
else:
85-
testcase.update(set__bluefiles=files)
86-
87-
return '', 204
88-
89-
@blueprint_testcase_utils.route('/testcase/<id>/evidence/<file>', methods = ['GET'])
88+
testcase = get_testcase_by_id(id)
89+
if not testcase:
90+
return "Test case not found", 404
91+
92+
filepath = f"files/{testcase.assessmentid}/{testcase.id}/{secure_filename(file)}"
93+
if os.path.isfile(filepath):
94+
os.remove(filepath)
95+
96+
filelist = testcase.redfiles if colour == "red" else testcase.bluefiles
97+
updated_files = [f for f in filelist if f.name != file]
98+
99+
update_field = 'redfiles' if colour == "red" else 'bluefiles'
100+
testcase.update(**{f"set__{update_field}": updated_files})
101+
102+
return "", 204
103+
104+
105+
@blueprint_testcase_utils.route('/testcase/<id>/evidence/<file>', methods=['GET'])
90106
@auth_required()
91107
@user_assigned_assessment
92-
def fetchFile(id, file):
93-
testcase = TestCase.objects(id=id).first()
94-
95-
return send_from_directory(
96-
'files',
97-
f"{testcase.assessmentid}/{str(testcase.id)}/{secure_filename(file)}",
98-
as_attachment = True if "download" in request.args else False
99-
)
108+
def fetch_file(id, file):
109+
ALLOWED_INLINE_EXTENSIONS = {'.png', '.jpg', '.jpeg'}
110+
111+
testcase = get_testcase_by_id(id)
112+
if not testcase:
113+
return "Test case not found", 404
114+
115+
filename = secure_filename(file)
116+
folder_path = os.path.join('files', str(testcase.assessmentid), str(testcase.id))
117+
file_path = os.path.join(folder_path, filename)
118+
119+
if not os.path.isfile(file_path):
120+
return "File not found", 404
121+
122+
_, ext = os.path.splitext(filename)
123+
ext = ext.lower()
124+
as_attachment = not (ext in ALLOWED_INLINE_EXTENSIONS and "download" not in request.args)
125+
126+
response = make_response(send_from_directory(folder_path, filename, as_attachment=as_attachment))
127+
response.headers["X-Content-Type-Options"] = "nosniff"
128+
return response

flask.cfg

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ SECURITY_TWO_FACTOR_SETUP_TEMPLATE = "mfa_register.html"
2525
SECURITY_TWO_FACTOR_VERIFY_CODE_TEMPLATE = "mfa_verify.html"
2626
SECURITY_TWO_FACTOR_ALWAYS_VALIDATE = False
2727
SECURITY_TWO_FACTOR_LOGIN_VALIDITY = "1 weeks"
28+
SECURITY_TWO_FACTOR_RESCUE_EMAIL = False
29+
2830
SECURITY_TOTP_ISSUER = f"PurpleOps - {getenv('NAME')}"
2931
SECURITY_TOTP_SECRETS = {"1": getenv("FLASK_SECURITY_TOTP_SECRETS")}
3032
SECURITY_CHANGEABLE = True
@@ -36,7 +38,6 @@ SECURITY_SEND_PASSWORD_CHANGE_EMAIL = False
3638
SECURITY_SEND_PASSWORD_RESET_EMAIL = False
3739
SECURITY_SEND_PASSWORD_RESET_NOTICE_EMAIL = False
3840
SECURITY_PASSWORD_LENGTH_MIN = 12
39-
SECURITY_TWO_FACTOR_RESCUE_EMAIL = False
4041
SECURITY_EMAIL_VALIDATOR_ARGS = {"check_deliverability": False}
4142

4243
SECURITY_MSG_INVALID_PASSWORD = ('Invalid username or password.', 'error')
@@ -49,4 +50,7 @@ REMEMBER_COOKIE_SECURE = True
4950
REMEMBER_COOKIE_SAMESITE = "Strict"
5051

5152
WTF_CSRF_TIME_LIMIT = None
52-
WTF_CSRF_SSL_STRICT = False
53+
WTF_CSRF_SSL_STRICT = False
54+
55+
SESSION_TYPE = 'Mongodb'
56+
SESSION_PERMANENT = False

purpleops.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import os
22
from model import *
33
from dotenv import load_dotenv
4-
from flask import Flask, render_template, redirect, request
4+
from flask import Flask, render_template, redirect, request, session
55
from flask_security import Security, auth_required, current_user
6-
6+
from flask_session import Session
77
from flask_wtf.csrf import CSRFProtect
8+
from flask_login import user_logged_in
89

910
from blueprints import access, assessment, assessment_utils, assessment_import, assessment_export, testcase, testcase_utils
1011

@@ -27,8 +28,13 @@
2728

2829
me.connect(**app.config["MONGODB_SETTINGS"])
2930

31+
# Get the MongoClient instance and assign it to SESSION_MONGODB which is used by Security
32+
mongo_client = me.get_connection()
33+
app.config['SESSION_MONGODB'] = mongo_client
34+
3035
security = Security(app, user_datastore)
3136
csrf = CSRFProtect(app)
37+
session_interface = Session(app)
3238

3339
@app.route('/')
3440
@app.route('/index')
@@ -48,5 +54,10 @@ def inject_theme():
4854
theme = 'light'
4955
return dict(theme=theme)
5056

57+
# Session Fixation Prevention Logic
58+
@user_logged_in.connect_via(app)
59+
def on_user_logged_in(sender, user, **extra):
60+
app.session_interface.regenerate(session)
61+
5162
if __name__ == "__main__":
5263
app.run(host=os.getenv('HOST'), port=int(os.getenv('PORT')))

requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ argon2-cffi-bindings==21.2.0
33
babel==2.17.0
44
bleach==6.2.0
55
blinker==1.9.0
6+
cachelib==0.13.0
67
certifi==2025.7.14
78
cffi==1.17.1
89
charset-normalizer==3.4.2
@@ -17,6 +18,7 @@ Flask==3.1.1
1718
Flask-Login==0.6.3
1819
Flask-Principal==0.4.0
1920
Flask-Security-Too==5.6.2
21+
Flask-Session==0.8.0
2022
Flask-WTF==1.2.2
2123
gitdb==4.0.12
2224
GitPython==3.1.44
@@ -30,6 +32,7 @@ lxml==6.0.0
3032
Markdown==3.8.2
3133
MarkupSafe==3.0.2
3234
mongoengine==0.29.1
35+
msgspec==0.19.0
3336
openpyxl==3.1.5
3437
packaging==25.0
3538
passlib==1.7.4

static/scripts/access.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,14 @@ $("#userDetailForm").submit(function(e) {
7676
body.assessments = "-";
7777
}
7878

79+
const safeAssessmentNames = $("<span>").text(body.assessments).html();
7980
newRow = {
8081
id: body.id,
8182
username: body.username,
8283
email: body.email,
8384
roles: body.roles.length ? body.roles.join(", ") : "-",
8485
"last-login": body["last-login"] || "-", // Support dash notation
85-
assessments: body.assessments,
86+
assessments: safeAssessmentNames,
8687
actions: body.username
8788
};
8889

static/scripts/testcase.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,6 @@ $("#ttpform").submit(function(e) {
229229
showToast(`Conflict (409): ${error.responseText}`, "error");
230230
} else {
231231
showToast(`Testcase save error - ${error.message}`, "error");
232-
console.error(error);
233232
}
234233
});
235234
});

utils.py

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from model import TestCase
33
from flask_security import current_user
44
from functools import wraps
5+
from mongoengine.errors import ValidationError
56

67
def applyFormData (obj, form, fields):
78
for field in fields:
@@ -32,18 +33,68 @@ def applyFormTimeData (obj, form, fields):
3233
obj[field] = None
3334
return obj
3435

36+
def get_testcase_by_id(id):
37+
try:
38+
return TestCase.objects(id=id).first()
39+
except ValidationError:
40+
return None
41+
42+
def format_time_difference(d1, d2):
43+
44+
if not d1 or not d2:
45+
return None
46+
47+
if d2 > d1:
48+
return None
49+
50+
delta = abs(d1-d2)
51+
52+
days = delta.days
53+
hours, remainder = divmod(delta.seconds, 3600)
54+
minutes, _ = divmod(remainder, 60)
55+
56+
parts = []
57+
if days > 0:
58+
parts.append(f"{days}d")
59+
if hours > 0 or days > 0:
60+
parts.append(f"{hours}h")
61+
parts.append(f"{minutes}m")
62+
63+
return " ".join(parts)
64+
65+
def calculate_severity_score(severity, expected_severity):
66+
severity_levels = {
67+
"Critical": ["Critical"],
68+
"High": ["Critical", "High"],
69+
"Medium": ["Critical", "High", "Medium"],
70+
"Low": ["Critical", "High", "Medium", "Low"],
71+
"Informational": ["Critical", "High", "Medium", "Low", "Informational"]
72+
}
73+
if severity and expected_severity:
74+
accepted_severity_list = severity_levels.get(expected_severity, [])
75+
return 100 if severity in accepted_severity_list else 0
76+
return None
77+
3578
def user_assigned_assessment(f):
3679
@wraps(f)
3780
def inner(*args, **kwargs):
3881
if current_user.has_role("Admin"):
3982
return f(*args, **kwargs)
40-
id = kwargs.get("id")
83+
84+
id = kwargs.get("id") or (args[0] if args else None)
4185
if not id:
42-
id = args[0]
43-
if TestCase.objects(id=id).count():
44-
id = TestCase.objects(id=id).first().assessmentid
45-
if (id in [str(a.id) for a in current_user.assessments]):
86+
return ("Missing ID", 400)
87+
88+
testcase = get_testcase_by_id(id)
89+
if testcase:
90+
assessment_id = str(testcase.assessmentid)
91+
else:
92+
assessment_id = id # fallback to assuming it's already an assessment ID
93+
94+
if assessment_id in [str(a.id) for a in current_user.assessments]:
4695
return f(*args, **kwargs)
4796
else:
97+
current_app.logger.warning(f"Access denied for user {current_user.id} on assessment {assessment_id}")
4898
return ("", 403)
99+
49100
return inner

0 commit comments

Comments
 (0)