Skip to content

Commit fe64ee1

Browse files
Maffoochclaude
andcommitted
Fix accept_risks API endpoints to use RBAC instead of IsAdminUser
Replace DRF's IsAdminUser permission with DefectDojo's RBAC system on all accept_risks endpoints. IsAdminUser only checked is_staff, bypassing role-based access control entirely. - Use UserHasRiskAcceptanceRelatedObjectPermission for detail endpoints (engagement/test accept_risks) to enforce Permissions.Risk_Acceptance - Change mass endpoint to query engagements with Risk_Acceptance permission instead of Engagement_View - Enforce product-level enable_full_risk_acceptance setting on all accept_risks endpoints - Add 9 RBAC unit tests covering writer/reader roles and the enable_full_risk_acceptance product setting Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9f92409 commit fe64ee1

2 files changed

Lines changed: 200 additions & 6 deletions

File tree

dojo/risk_acceptance/api.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
from abc import ABC, abstractmethod
22
from typing import NamedTuple
33

4+
from django.core.exceptions import PermissionDenied
45
from django.db.models import QuerySet
56
from django.utils import timezone
67
from drf_spectacular.utils import extend_schema
78
from rest_framework import serializers, status
89
from rest_framework.decorators import action
9-
from rest_framework.permissions import IsAdminUser
10+
from rest_framework.permissions import IsAuthenticated
1011
from rest_framework.response import Response
1112

13+
from dojo.api_v2.permissions import UserHasRiskAcceptanceRelatedObjectPermission
1214
from dojo.api_v2.serializers import RiskAcceptanceSerializer
1315
from dojo.authorization.roles_permissions import Permissions
1416
from dojo.engagement.queries import get_authorized_engagements
15-
from dojo.models import Risk_Acceptance, User, Vulnerability_Id
17+
from dojo.models import Engagement, Risk_Acceptance, User, Vulnerability_Id
1618

1719
AcceptedRisk = NamedTuple("AcceptedRisk", (("vulnerability_id", str), ("justification", str), ("accepted_by", str)))
1820

@@ -40,10 +42,13 @@ def risk_application_model_class(self):
4042
request=AcceptedRiskSerializer(many=True),
4143
responses={status.HTTP_201_CREATED: RiskAcceptanceSerializer(many=True)},
4244
)
43-
@action(methods=["post"], detail=True, permission_classes=[IsAdminUser], serializer_class=AcceptedRiskSerializer,
44-
filter_backends=[], pagination_class=None)
45+
@action(methods=["post"], detail=True, permission_classes=[IsAuthenticated, UserHasRiskAcceptanceRelatedObjectPermission],
46+
serializer_class=AcceptedRiskSerializer, filter_backends=[], pagination_class=None)
4547
def accept_risks(self, request, pk=None):
4648
model = self.get_object()
49+
product = model.product if isinstance(model, Engagement) else model.engagement.product
50+
if not product.enable_full_risk_acceptance:
51+
raise PermissionDenied
4752
serializer = AcceptedRiskSerializer(data=request.data, many=True)
4853
if serializer.is_valid():
4954
accepted_risks = serializer.save()
@@ -63,7 +68,7 @@ class AcceptedFindingsMixin(ABC):
6368
request=AcceptedRiskSerializer(many=True),
6469
responses={status.HTTP_201_CREATED: RiskAcceptanceSerializer(many=True)},
6570
)
66-
@action(methods=["post"], detail=False, permission_classes=[IsAdminUser], serializer_class=AcceptedRiskSerializer)
71+
@action(methods=["post"], detail=False, permission_classes=[IsAuthenticated], serializer_class=AcceptedRiskSerializer)
6772
def accept_risks(self, request):
6873
serializer = AcceptedRiskSerializer(data=request.data, many=True)
6974
if serializer.is_valid():
@@ -72,7 +77,9 @@ def accept_risks(self, request):
7277
return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST)
7378
owner = request.user
7479
accepted_result = []
75-
for engagement in get_authorized_engagements(Permissions.Engagement_View):
80+
for engagement in get_authorized_engagements(Permissions.Risk_Acceptance):
81+
if not engagement.product.enable_full_risk_acceptance:
82+
continue
7683
base_findings = engagement.unaccepted_open_findings
7784
accepted = _accept_risks(accepted_risks, base_findings, owner)
7885
engagement.accept_risks(accepted)

unittests/test_bulk_risk_acceptance_api.py

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
Engagement,
1010
Finding,
1111
Product,
12+
Product_Member,
1213
Product_Type,
1314
Product_Type_Member,
1415
Role,
@@ -117,3 +118,189 @@ def test_finding_accept_risks(self):
117118
for ra in self.engagement_2b.risk_acceptance.all():
118119
for finding in ra.accepted_findings.all():
119120
self.assertEqual(self.engagement_2a.product, finding.test.engagement.product)
121+
122+
123+
class TestBulkRiskAcceptanceRbac(APITestCase):
124+
"""Tests that accept_risks endpoints use RBAC (Permissions.Risk_Acceptance) instead of is_staff."""
125+
126+
@classmethod
127+
def setUpTestData(cls):
128+
cls.product_type = Product_Type.objects.create(name="RBAC Test Type")
129+
cls.test_type = Test_Type.objects.create(name="RBAC Mock Scan", static_tool=True)
130+
131+
# Product with full risk acceptance enabled (default)
132+
cls.product_enabled = Product.objects.create(
133+
prod_type=cls.product_type, name="RBAC Enabled",
134+
description="Full risk acceptance enabled",
135+
enable_full_risk_acceptance=True,
136+
)
137+
# Product with full risk acceptance disabled
138+
cls.product_disabled = Product.objects.create(
139+
prod_type=cls.product_type, name="RBAC Disabled",
140+
description="Full risk acceptance disabled",
141+
enable_full_risk_acceptance=False,
142+
)
143+
144+
cls.engagement_enabled = Engagement.objects.create(
145+
product=cls.product_enabled,
146+
target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC),
147+
target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC),
148+
)
149+
cls.engagement_disabled = Engagement.objects.create(
150+
product=cls.product_disabled,
151+
target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC),
152+
target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC),
153+
)
154+
155+
cls.test_enabled = Test.objects.create(
156+
engagement=cls.engagement_enabled, test_type=cls.test_type,
157+
target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC),
158+
target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC),
159+
)
160+
cls.test_disabled = Test.objects.create(
161+
engagement=cls.engagement_disabled, test_type=cls.test_type,
162+
target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC),
163+
target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC),
164+
)
165+
166+
# Writer user: has Risk_Acceptance permission, NOT is_staff
167+
cls.writer = User.objects.create(username="rbac_writer", is_staff=False)
168+
cls.writer_token = Token.objects.create(user=cls.writer)
169+
Product_Member.objects.create(
170+
product=cls.product_enabled, user=cls.writer,
171+
role=Role.objects.get(id=Roles.Writer),
172+
)
173+
Product_Member.objects.create(
174+
product=cls.product_disabled, user=cls.writer,
175+
role=Role.objects.get(id=Roles.Writer),
176+
)
177+
178+
# Reader user: does NOT have Risk_Acceptance permission, NOT is_staff
179+
cls.reader = User.objects.create(username="rbac_reader", is_staff=False)
180+
cls.reader_token = Token.objects.create(user=cls.reader)
181+
Product_Member.objects.create(
182+
product=cls.product_enabled, user=cls.reader,
183+
role=Role.objects.get(id=Roles.Reader),
184+
)
185+
186+
def create_finding(test, reporter, cve):
187+
return Finding(
188+
test=test, title=f"Finding {cve}", cve=cve, severity="High",
189+
verified=True, description="Test", mitigation="Test",
190+
impact="Test", reporter=reporter, numerical_severity="S1",
191+
static_finding=True, dynamic_finding=False,
192+
)
193+
194+
# Findings on the enabled product
195+
Finding.objects.bulk_create(
196+
create_finding(cls.test_enabled, cls.writer, f"CVE-2024-{i}") for i in range(10))
197+
for f in Finding.objects.filter(test=cls.test_enabled):
198+
Vulnerability_Id.objects.get_or_create(finding=f, vulnerability_id=f.cve)
199+
200+
# Findings on the disabled product
201+
Finding.objects.bulk_create(
202+
create_finding(cls.test_disabled, cls.writer, f"CVE-2024-{i + 100}") for i in range(5))
203+
for f in Finding.objects.filter(test=cls.test_disabled):
204+
Vulnerability_Id.objects.get_or_create(finding=f, vulnerability_id=f.cve)
205+
206+
def _client_for(self, token):
207+
client = APIClient()
208+
client.credentials(HTTP_AUTHORIZATION="Token " + token.key)
209+
return client
210+
211+
def _accepted_risks(self, cve_ids):
212+
return [{"vulnerability_id": cve, "justification": "Test", "accepted_by": "Tester"} for cve in cve_ids]
213+
214+
# --- Writer (has Risk_Acceptance) succeeds on enabled product ---
215+
216+
def test_writer_can_accept_risks_on_engagement(self):
217+
client = self._client_for(self.writer_token)
218+
result = client.post(
219+
reverse("engagement-accept-risks", kwargs={"pk": self.engagement_enabled.id}),
220+
data=self._accepted_risks(["CVE-2024-0"]),
221+
format="json",
222+
)
223+
self.assertEqual(result.status_code, 201)
224+
225+
def test_writer_can_accept_risks_on_test(self):
226+
client = self._client_for(self.writer_token)
227+
result = client.post(
228+
reverse("test-accept-risks", kwargs={"pk": self.test_enabled.id}),
229+
data=self._accepted_risks(["CVE-2024-1"]),
230+
format="json",
231+
)
232+
self.assertEqual(result.status_code, 201)
233+
234+
def test_writer_can_accept_risks_on_findings(self):
235+
client = self._client_for(self.writer_token)
236+
result = client.post(
237+
reverse("finding-accept-risks"),
238+
data=self._accepted_risks(["CVE-2024-2"]),
239+
format="json",
240+
)
241+
self.assertEqual(result.status_code, 201)
242+
243+
# --- Reader (no Risk_Acceptance) is forbidden ---
244+
245+
def test_reader_forbidden_on_engagement(self):
246+
client = self._client_for(self.reader_token)
247+
result = client.post(
248+
reverse("engagement-accept-risks", kwargs={"pk": self.engagement_enabled.id}),
249+
data=self._accepted_risks(["CVE-2024-3"]),
250+
format="json",
251+
)
252+
self.assertEqual(result.status_code, 403)
253+
254+
def test_reader_forbidden_on_test(self):
255+
client = self._client_for(self.reader_token)
256+
result = client.post(
257+
reverse("test-accept-risks", kwargs={"pk": self.test_enabled.id}),
258+
data=self._accepted_risks(["CVE-2024-4"]),
259+
format="json",
260+
)
261+
self.assertEqual(result.status_code, 403)
262+
263+
def test_reader_gets_empty_result_on_findings(self):
264+
client = self._client_for(self.reader_token)
265+
result = client.post(
266+
reverse("finding-accept-risks"),
267+
data=self._accepted_risks(["CVE-2024-5"]),
268+
format="json",
269+
)
270+
# Mass endpoint returns 201 with empty results (no authorized engagements)
271+
self.assertEqual(result.status_code, 201)
272+
self.assertEqual(len(result.json()), 0)
273+
274+
# --- enable_full_risk_acceptance=False blocks risk acceptance ---
275+
276+
def test_engagement_blocked_when_full_risk_acceptance_disabled(self):
277+
client = self._client_for(self.writer_token)
278+
result = client.post(
279+
reverse("engagement-accept-risks", kwargs={"pk": self.engagement_disabled.id}),
280+
data=self._accepted_risks(["CVE-2024-100"]),
281+
format="json",
282+
)
283+
self.assertEqual(result.status_code, 403)
284+
285+
def test_test_blocked_when_full_risk_acceptance_disabled(self):
286+
client = self._client_for(self.writer_token)
287+
result = client.post(
288+
reverse("test-accept-risks", kwargs={"pk": self.test_disabled.id}),
289+
data=self._accepted_risks(["CVE-2024-101"]),
290+
format="json",
291+
)
292+
self.assertEqual(result.status_code, 403)
293+
294+
def test_mass_endpoint_skips_disabled_products(self):
295+
client = self._client_for(self.writer_token)
296+
# Use a CVE that exists only on the disabled product
297+
result = client.post(
298+
reverse("finding-accept-risks"),
299+
data=self._accepted_risks(["CVE-2024-102"]),
300+
format="json",
301+
)
302+
self.assertEqual(result.status_code, 201)
303+
# No risk acceptances created because the matching engagement's product has it disabled
304+
self.assertEqual(len(result.json()), 0)
305+
# Findings on disabled product remain unaccepted
306+
self.assertEqual(self.engagement_disabled.unaccepted_open_findings.count(), 5)

0 commit comments

Comments
 (0)