Skip to content
This repository was archived by the owner on May 4, 2026. It is now read-only.

Commit ce372b7

Browse files
committed
fixed XSS in file download, refactored testcase blueprints
1 parent 0cf2b21 commit ce372b7

3 files changed

Lines changed: 269 additions & 263 deletions

File tree

blueprints/testcase.py

Lines changed: 129 additions & 203 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,27 @@
1212
@auth_required()
1313
@roles_accepted('Admin', 'Red')
1414
@user_assigned_assessment
15-
def newtestcase(id):
16-
newcase = TestCase()
17-
newcase.assessmentid = id
18-
newcase = applyFormData(newcase, request.form, ["name", "mitreid", "tactic"])
19-
newcase.save()
20-
return jsonify(newcase.to_json()), 200
15+
def new_testcase(id):
16+
testcase = TestCase()
17+
testcase.assessmentid = id
18+
testcase = applyFormData(testcase, request.form, ["name", "mitreid", "tactic"])
19+
testcase.save()
20+
return jsonify(testcase.to_json()), 200
21+
2122

2223
@blueprint_testcase.route('/testcase/<id>',methods = ['GET'])
2324
@auth_required()
2425
@user_assigned_assessment
25-
def runtestcasepost(id):
26-
testcase = TestCase.objects(id=id).first()
27-
assessment = Assessment.objects(id=testcase.assessmentid).first()
28-
history_entries = TestCaseHistory.objects(testcaseid=id).order_by('-timestamp')
29-
30-
if testcase.deleted:
31-
return ("", 403)
26+
def render_testcase(id):
27+
testcase = get_testcase_by_id(id)
28+
if not testcase or testcase.deleted:
29+
return "Test case not found", 404
3230

3331
if not testcase.visible and current_user.has_role("Blue"):
34-
return ("", 403)
32+
return "", 403
33+
34+
assessment = Assessment.objects(id=testcase.assessmentid).first()
35+
history_entries = TestCaseHistory.objects(testcaseid=id).order_by('-timestamp')
3536

3637
return render_template('testcase.html',
3738
testcase = testcase,
@@ -55,230 +56,155 @@ def runtestcasepost(id):
5556
}
5657
)
5758

58-
def format_time_difference(d1, d2):
59-
60-
if not d1 or not d2:
61-
return None
62-
63-
if d2 > d1:
64-
return None
65-
66-
delta = abs(d1-d2)
67-
68-
days = delta.days
69-
hours, remainder = divmod(delta.seconds, 3600)
70-
minutes, _ = divmod(remainder, 60)
71-
72-
parts = []
73-
if days > 0:
74-
parts.append(f"{days}d")
75-
if hours > 0 or days > 0:
76-
parts.append(f"{hours}h")
77-
parts.append(f"{minutes}m")
78-
79-
return " ".join(parts)
80-
81-
def calculate_severity_score(severity, expected_severity):
82-
severity_levels = {
83-
"Critical": ["Critical"],
84-
"High": ["Critical", "High"],
85-
"Medium": ["Critical", "High", "Medium"],
86-
"Low": ["Critical", "High", "Medium", "Low"],
87-
"Informational": ["Critical", "High", "Medium", "Low", "Informational"]
88-
}
89-
if severity and expected_severity:
90-
accepted_severity_list = severity_levels.get(expected_severity, [])
91-
return 100 if severity in accepted_severity_list else 0
92-
return None
93-
59+
9460
@blueprint_testcase.route('/testcase/<id>',methods = ['POST'])
9561
@auth_required()
9662
@roles_accepted('Admin', 'Red', 'Blue')
9763
@user_assigned_assessment
98-
def testcasesave(id):
99-
testcase = TestCase.objects(id=id).first()
100-
isBlue = current_user.has_role("Blue")
101-
102-
if testcase.deleted:
103-
return ("", 403)
104-
105-
if not testcase.visible and isBlue:
106-
return ("", 403)
107-
108-
directFields = ["name", "objective", "actions", "rednotes", "bluenotes", "uuid", "mitreid", "tactic", "state", "preventedrating", "alertseverity", "logged", "detectionrating", "priority", "priorityurgency", "expectedseverity", "incidentseverity", "requirements"] if not isBlue else ["bluenotes", "prevented", "alerted", "alertseverity","state", "incidentcreated", "incidentseverity"]
109-
listFields = ["sources", "targets", "tools", "controls", "tags", "preventionsources", "detectionsources"] if not isBlue else ["tags" , "preventionsources", "detectionsources"]
110-
boolFields = ["alerted", "logged", "visible", "incidentcreated", "prevented", "expectedincidentcreation", "expectedprevention", "expectedalertcreation"] if not isBlue else ["prevented", "alerted", "logged","incidentcreated"]
111-
timeFields = ["starttime", "endtime", "alerttime", "preventtime", "incidenttime"] if not isBlue else ["alerttime", "preventtime", "incidenttime"]
112-
fileFields = ["redfiles", "bluefiles"] if not isBlue else ["bluefiles"]
113-
114-
# only allow state update from blue if correct state is sent and testcase is in changable state
115-
if isBlue:
116-
if request.form.get("state") != 'Waiting Blue' and request.form.get("state") != 'Waiting Red' and request.form.get("state"):
117-
return ("Not allowed state value", 403)
118-
if testcase.state != 'Waiting Blue' and testcase.state != 'Waiting Red':
119-
return ("State cannot be changed at the moment", 403)
120-
121-
# do not update testcase if it was modified in the meantime
122-
if request.form.get("modifytime"):
123-
requestmodifytime = request.form.get("modifytime")
124-
# ugly string compare of date
125-
if requestmodifytime != str(testcase.modifytime):
126-
return ("Testcase has been modified in the meantime.", 409)
127-
128-
testcase = applyFormData(testcase, request.form, directFields)
129-
testcase = applyFormListData(testcase, request.form, listFields)
130-
testcase = applyFormBoolData(testcase, request.form, boolFields)
131-
testcase = applyFormTimeData(testcase, request.form, timeFields)
132-
133-
if not os.path.exists(f"files/{testcase.assessmentid}/{str(testcase.id)}"):
134-
os.makedirs(f"files/{testcase.assessmentid}/{str(testcase.id)}")
135-
136-
for field in fileFields:
137-
files = []
138-
for file in request.files.getlist(field):
139-
if request.files.getlist(field)[0].filename:
140-
filename = secure_filename(file.filename)
141-
path = f"files/{testcase.assessmentid}/{str(testcase.id)}/{filename}"
142-
143-
# Check if file already exists
144-
if os.path.exists(path):
145-
return (f"File '{filename}' already exists.", 409)
146-
147-
file.save(path)
148-
files.append({"name": filename, "path": path, "caption": ""})
149-
for file in testcase[field]:
150-
if file.name.lower().split(".")[-1] in ["png", "jpg", "jpeg"]:
151-
caption = request.form[field.replace("files", "").upper() + file.name]
152-
else:
153-
caption = ""
154-
files.append({
155-
"name": secure_filename(file.name),
156-
"path": file.path,
157-
"caption": caption
158-
})
159-
if field == "redfiles":
160-
testcase.update(set__redfiles=files)
161-
else:
162-
testcase.update(set__bluefiles=files)
64+
def save_testcase(id):
65+
testcase = get_testcase_by_id(id)
66+
if not testcase or testcase.deleted:
67+
return "Test case not found", 404
68+
69+
is_blue = current_user.has_role("Blue")
70+
if not testcase.visible and is_blue:
71+
return "", 403
72+
73+
# Access control for Blue team
74+
if is_blue:
75+
if request.form.get("state") not in ["Waiting Blue", "Waiting Red", None]:
76+
return "Not allowed state value", 403
77+
if testcase.state not in ["Waiting Blue", "Waiting Red"]:
78+
return "State cannot be changed at the moment", 403
79+
80+
# Prevent race condition on concurrent update
81+
if request.form.get("modifytime") and request.form.get("modifytime") != str(testcase.modifytime):
82+
return "Testcase has been modified in the meantime.", 409
83+
84+
# Field categories based on role
85+
direct_fields = ["bluenotes", "prevented", "alerted", "alertseverity", "state", "incidentcreated", "incidentseverity"] if is_blue else \
86+
["name", "objective", "actions", "rednotes", "bluenotes", "uuid", "mitreid", "tactic", "state", "preventedrating", "alertseverity", "logged", "detectionrating", "priority", "priorityurgency", "expectedseverity", "incidentseverity", "requirements"]
87+
88+
list_fields = ["tags", "preventionsources", "detectionsources"] if is_blue else \
89+
["sources", "targets", "tools", "controls", "tags", "preventionsources", "detectionsources"]
90+
91+
bool_fields = ["prevented", "alerted", "logged", "incidentcreated"] if is_blue else \
92+
["alerted", "logged", "visible", "incidentcreated", "prevented", "expectedincidentcreation", "expectedprevention", "expectedalertcreation"]
93+
94+
time_fields = ["alerttime", "preventtime", "incidenttime"] if is_blue else \
95+
["starttime", "endtime", "alerttime", "preventtime", "incidenttime"]
96+
97+
file_fields = ["bluefiles"] if is_blue else ["redfiles", "bluefiles"]
98+
99+
# Apply updates
100+
testcase = applyFormData(testcase, request.form, direct_fields)
101+
testcase = applyFormListData(testcase, request.form, list_fields)
102+
testcase = applyFormBoolData(testcase, request.form, bool_fields)
103+
testcase = applyFormTimeData(testcase, request.form, time_fields)
104+
105+
# File handling
106+
file_dir = os.path.join("files", str(testcase.assessmentid), str(testcase.id))
107+
os.makedirs(file_dir, exist_ok=True)
108+
109+
for field in file_fields:
110+
updated_files = []
111+
for uploaded_file in request.files.getlist(field):
112+
if uploaded_file.filename:
113+
filename = secure_filename(uploaded_file.filename)
114+
file_path = os.path.join(file_dir, filename)
115+
if os.path.exists(file_path):
116+
return f"File '{filename}' already exists.", 409
117+
uploaded_file.save(file_path)
118+
updated_files.append({"name": filename, "path": file_path, "caption": ""})
119+
120+
# Preserve existing files
121+
for existing in getattr(testcase, field):
122+
caption = request.form.get(field.replace("files", "").upper() + existing.name, "")
123+
updated_files.append({"name": secure_filename(existing.name), "path": existing.path, "caption": caption})
124+
125+
testcase.update(**{f"set__{field}": updated_files})
163126

164127
testcase.modifytime = datetime.utcnow()
165-
166-
# replace last three digits in the string with "000". Required for comparing utcnow vs. mongodb timestamp
167-
mongomodifytime = str(testcase.modifytime)[:-3] + "000"
168-
169-
if "logged" in request.form and request.form["logged"] == "Yes" and not testcase.detecttime:
128+
if request.form.get("logged") == "Yes" and not testcase.detecttime:
170129
testcase.detecttime = datetime.utcnow()
171130

172-
# Calculate alert severity score
131+
# Scoring and outcome logic
173132
testcase.alertseverityscore = calculate_severity_score(testcase.alertseverity, testcase.expectedseverity)
174-
175-
# Calculate incident severity score
176133
testcase.incidentseverityscore = calculate_severity_score(testcase.incidentseverity, testcase.expectedseverity)
177134

178-
# Calculate testcase outcome (Note that Prevented or alerted but not Logged is not catched and will be "missed")
179135
if not testcase.logged:
180136
testcase.outcome = "Missed"
137+
elif testcase.prevented and testcase.alerted:
138+
testcase.outcome = "Prevented and Alerted"
139+
elif testcase.prevented:
140+
testcase.outcome = "Prevented"
141+
elif testcase.alerted:
142+
testcase.outcome = "Alerted"
181143
else:
182-
if testcase.prevented and testcase.alerted:
183-
testcase.outcome = "Prevented and Alerted"
184-
elif testcase.prevented:
185-
testcase.outcome = "Prevented"
186-
elif testcase.alerted:
187-
testcase.outcome = "Alerted"
188-
else:
189-
testcase.outcome = "Logged"
190-
191-
# Calculate testcase score
192-
criteriacounter = 1
193-
if testcase.expectedalertcreation:
194-
criteriacounter = criteriacounter + 1
195-
if testcase.expectedprevention:
196-
criteriacounter = criteriacounter + 1
197-
score = 0
198-
199-
criteriavalue = 100 / criteriacounter
200-
201-
if testcase.logged:
202-
score = score + criteriavalue
203-
if testcase.expectedalertcreation and testcase.alerted:
204-
score = score + criteriavalue
205-
if testcase.expectedprevention and testcase.prevented:
206-
score = score + criteriavalue
207-
208-
testcase.testcasescore = score
209-
210-
# Calculate event start time to alert time
211-
diff_result = format_time_difference(testcase.alerttime, testcase.starttime)
212-
if diff_result is not None:
213-
testcase.eventtoalert = diff_result
214-
else:
215-
testcase.eventtoalert = ""
216-
217-
# Calculate alert to incident
218-
diff_result = format_time_difference(testcase.incidenttime, testcase.alerttime)
219-
if diff_result is not None:
220-
testcase.alerttoincident = diff_result
221-
else:
222-
testcase.alerttoincident = ""
223-
224-
# This is some sanity check code where we check if some of the UI elements are out of sync with the backend. This is trggered by the horrible tabs bug
225-
# Does not fix user not saving test case before navigating away
226-
# Todo: Turns this BS code into a single mongoengine query against the subdocument list
144+
testcase.outcome = "Logged"
145+
146+
expected_criteria = sum([
147+
testcase.expectedalertcreation,
148+
testcase.expectedprevention,
149+
True # always check logged
150+
])
151+
score_unit = 100 / expected_criteria
152+
testcase.testcasescore = sum([
153+
score_unit if testcase.logged else 0,
154+
score_unit if testcase.expectedalertcreation and testcase.alerted else 0,
155+
score_unit if testcase.expectedprevention and testcase.prevented else 0
156+
])
157+
158+
testcase.eventtoalert = format_time_difference(testcase.alerttime, testcase.starttime) or ""
159+
testcase.alerttoincident = format_time_difference(testcase.incidenttime, testcase.alerttime) or ""
160+
161+
# Sanity check list fields with assessment items
227162
assessment = Assessment.objects(id=testcase.assessmentid).first()
228-
for field in listFields:
229-
ids = []
230-
valid_ids = []
231-
for t in assessment[field]:
232-
ids.append(str(t.id))
233-
for field_id in testcase[field]:
234-
if field_id in ids:
235-
valid_ids.append(field_id)
236-
testcase[field] = valid_ids
163+
for field in list_fields:
164+
valid_ids = {str(item.id) for item in assessment[field]}
165+
testcase[field] = [id for id in testcase[field] if id in valid_ids]
166+
237167
testcase.save()
238168

239-
# Save SnapShot to test_case_history collection
240-
latest_version = TestCaseHistory.objects(testcaseid=testcase.id).count()
241169
TestCaseHistory(
242170
testcaseid=testcase.id,
243171
testcase_name=testcase.name,
244172
snapshot=testcase.to_mongo().to_dict(),
245-
version=latest_version + 1,
246-
modified_by = f"{current_user.username} ({current_user.id})"
173+
version=TestCaseHistory.objects(testcaseid=testcase.id).count() + 1,
174+
modified_by=f"{current_user.username} ({current_user.id})"
247175
).save()
248176

249-
return (str(mongomodifytime), 200)
177+
return str(testcase.modifytime)[:-3] + "000", 200
250178

251179
@blueprint_testcase.route('/testcase/<id>/history/<int:version>', methods=['GET'])
252180
@auth_required()
253181
@user_assigned_assessment
254182
def view_testcase_history_version(id, version):
255-
# Check if testcase is delted
256-
testcase = TestCase.objects(id=id).first()
257-
if testcase.deleted:
258-
return("", 404)
183+
testcase = get_testcase_by_id(id)
184+
if not testcase or testcase.deleted:
185+
return "Test case not found", 404
186+
187+
is_blue_or_spectator = current_user.has_role("Blue") or current_user.has_role("Spectator")
188+
if not testcase.visible and is_blue_or_spectator:
189+
return "", 403
259190

260-
# Get the historical version
261191
history = TestCaseHistory.objects(testcaseid=id, version=version).first()
262192
if not history:
263-
return("", 404)
264-
snapshot_data = history.snapshot
193+
return "History version not found", 404
265194

266-
# Convert snapshot into a mock TestCase-like object
267-
testcase = TestCase._from_son(snapshot_data)
268-
269-
# Get related data
270-
assessment = Assessment.objects(id=testcase.assessmentid).first()
195+
snapshot = TestCase._from_son(history.snapshot)
196+
assessment = Assessment.objects(id=snapshot.assessmentid).first()
271197

272198
return render_template('testcase.html',
273-
testcase=testcase,
199+
testcase=snapshot,
274200
testcases=TestCase.objects(assessmentid=str(assessment.id)).all(),
275201
tactics=Tactic.objects().all(),
276202
assessment=assessment,
277-
kb=KnowlegeBase.objects(mitreid=testcase.mitreid).first(),
278-
testcasekb=TestcaseKnowlegeBase.objects(mitreid=testcase.mitreid).first(),
279-
templates=TestCaseTemplate.objects(mitreid=testcase.mitreid),
203+
kb=KnowlegeBase.objects(mitreid=snapshot.mitreid).first(),
204+
testcasekb=TestcaseKnowlegeBase.objects(mitreid=snapshot.mitreid).first(),
205+
templates=TestCaseTemplate.objects(mitreid=snapshot.mitreid),
280206
mitres=[[m["mitreid"], m["name"]] for m in Technique.objects()],
281-
sigmas=Sigma.objects(mitreid=testcase.mitreid),
207+
sigmas=Sigma.objects(mitreid=snapshot.mitreid),
282208
multi={
283209
"sources": assessment.sources,
284210
"targets": assessment.targets,
@@ -289,7 +215,7 @@ def view_testcase_history_version(id, version):
289215
"detectionsources": assessment.detectionsources
290216
},
291217
history_read_only=True,
292-
history_version = history.version,
293-
history_modified_by = history.modified_by,
294-
history_timestamp = history.timestamp
218+
history_version=history.version,
219+
history_modified_by=history.modified_by,
220+
history_timestamp=history.timestamp
295221
)

0 commit comments

Comments
 (0)