|
9 | 9 | Engagement, |
10 | 10 | Finding, |
11 | 11 | Product, |
| 12 | + Product_Member, |
12 | 13 | Product_Type, |
13 | 14 | Product_Type_Member, |
14 | 15 | Role, |
@@ -117,3 +118,190 @@ def test_finding_accept_risks(self): |
117 | 118 | for ra in self.engagement_2b.risk_acceptance.all(): |
118 | 119 | for finding in ra.accepted_findings.all(): |
119 | 120 | self.assertEqual(self.engagement_2a.product, finding.test.engagement.product) |
| 121 | + |
| 122 | + |
| 123 | +class TestBulkRiskAcceptanceRbac(APITestCase): |
| 124 | + |
| 125 | + """Tests that accept_risks endpoints use RBAC (Permissions.Risk_Acceptance) instead of is_staff.""" |
| 126 | + |
| 127 | + @classmethod |
| 128 | + def setUpTestData(cls): |
| 129 | + cls.product_type = Product_Type.objects.create(name="RBAC Test Type") |
| 130 | + cls.test_type = Test_Type.objects.create(name="RBAC Mock Scan", static_tool=True) |
| 131 | + |
| 132 | + # Product with full risk acceptance enabled (default) |
| 133 | + cls.product_enabled = Product.objects.create( |
| 134 | + prod_type=cls.product_type, name="RBAC Enabled", |
| 135 | + description="Full risk acceptance enabled", |
| 136 | + enable_full_risk_acceptance=True, |
| 137 | + ) |
| 138 | + # Product with full risk acceptance disabled |
| 139 | + cls.product_disabled = Product.objects.create( |
| 140 | + prod_type=cls.product_type, name="RBAC Disabled", |
| 141 | + description="Full risk acceptance disabled", |
| 142 | + enable_full_risk_acceptance=False, |
| 143 | + ) |
| 144 | + |
| 145 | + cls.engagement_enabled = Engagement.objects.create( |
| 146 | + product=cls.product_enabled, |
| 147 | + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), |
| 148 | + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), |
| 149 | + ) |
| 150 | + cls.engagement_disabled = Engagement.objects.create( |
| 151 | + product=cls.product_disabled, |
| 152 | + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), |
| 153 | + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), |
| 154 | + ) |
| 155 | + |
| 156 | + cls.test_enabled = Test.objects.create( |
| 157 | + engagement=cls.engagement_enabled, test_type=cls.test_type, |
| 158 | + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), |
| 159 | + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), |
| 160 | + ) |
| 161 | + cls.test_disabled = Test.objects.create( |
| 162 | + engagement=cls.engagement_disabled, test_type=cls.test_type, |
| 163 | + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), |
| 164 | + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), |
| 165 | + ) |
| 166 | + |
| 167 | + # Writer user: has Risk_Acceptance permission, NOT is_staff |
| 168 | + cls.writer = User.objects.create(username="rbac_writer", is_staff=False) |
| 169 | + cls.writer_token = Token.objects.create(user=cls.writer) |
| 170 | + Product_Member.objects.create( |
| 171 | + product=cls.product_enabled, user=cls.writer, |
| 172 | + role=Role.objects.get(id=Roles.Writer), |
| 173 | + ) |
| 174 | + Product_Member.objects.create( |
| 175 | + product=cls.product_disabled, user=cls.writer, |
| 176 | + role=Role.objects.get(id=Roles.Writer), |
| 177 | + ) |
| 178 | + |
| 179 | + # Reader user: does NOT have Risk_Acceptance permission, NOT is_staff |
| 180 | + cls.reader = User.objects.create(username="rbac_reader", is_staff=False) |
| 181 | + cls.reader_token = Token.objects.create(user=cls.reader) |
| 182 | + Product_Member.objects.create( |
| 183 | + product=cls.product_enabled, user=cls.reader, |
| 184 | + role=Role.objects.get(id=Roles.Reader), |
| 185 | + ) |
| 186 | + |
| 187 | + def create_finding(test, reporter, cve): |
| 188 | + return Finding( |
| 189 | + test=test, title=f"Finding {cve}", cve=cve, severity="High", |
| 190 | + verified=True, description="Test", mitigation="Test", |
| 191 | + impact="Test", reporter=reporter, numerical_severity="S1", |
| 192 | + static_finding=True, dynamic_finding=False, |
| 193 | + ) |
| 194 | + |
| 195 | + # Findings on the enabled product |
| 196 | + Finding.objects.bulk_create( |
| 197 | + create_finding(cls.test_enabled, cls.writer, f"CVE-2024-{i}") for i in range(10)) |
| 198 | + for f in Finding.objects.filter(test=cls.test_enabled): |
| 199 | + Vulnerability_Id.objects.get_or_create(finding=f, vulnerability_id=f.cve) |
| 200 | + |
| 201 | + # Findings on the disabled product |
| 202 | + Finding.objects.bulk_create( |
| 203 | + create_finding(cls.test_disabled, cls.writer, f"CVE-2024-{i + 100}") for i in range(5)) |
| 204 | + for f in Finding.objects.filter(test=cls.test_disabled): |
| 205 | + Vulnerability_Id.objects.get_or_create(finding=f, vulnerability_id=f.cve) |
| 206 | + |
| 207 | + def _client_for(self, token): |
| 208 | + client = APIClient() |
| 209 | + client.credentials(HTTP_AUTHORIZATION="Token " + token.key) |
| 210 | + return client |
| 211 | + |
| 212 | + def _accepted_risks(self, cve_ids): |
| 213 | + return [{"vulnerability_id": cve, "justification": "Test", "accepted_by": "Tester"} for cve in cve_ids] |
| 214 | + |
| 215 | + # --- Writer (has Risk_Acceptance) succeeds on enabled product --- |
| 216 | + |
| 217 | + def test_writer_can_accept_risks_on_engagement(self): |
| 218 | + client = self._client_for(self.writer_token) |
| 219 | + result = client.post( |
| 220 | + reverse("engagement-accept-risks", kwargs={"pk": self.engagement_enabled.id}), |
| 221 | + data=self._accepted_risks(["CVE-2024-0"]), |
| 222 | + format="json", |
| 223 | + ) |
| 224 | + self.assertEqual(result.status_code, 201) |
| 225 | + |
| 226 | + def test_writer_can_accept_risks_on_test(self): |
| 227 | + client = self._client_for(self.writer_token) |
| 228 | + result = client.post( |
| 229 | + reverse("test-accept-risks", kwargs={"pk": self.test_enabled.id}), |
| 230 | + data=self._accepted_risks(["CVE-2024-1"]), |
| 231 | + format="json", |
| 232 | + ) |
| 233 | + self.assertEqual(result.status_code, 201) |
| 234 | + |
| 235 | + def test_writer_can_accept_risks_on_findings(self): |
| 236 | + client = self._client_for(self.writer_token) |
| 237 | + result = client.post( |
| 238 | + reverse("finding-accept-risks"), |
| 239 | + data=self._accepted_risks(["CVE-2024-2"]), |
| 240 | + format="json", |
| 241 | + ) |
| 242 | + self.assertEqual(result.status_code, 201) |
| 243 | + |
| 244 | + # --- Reader (no Risk_Acceptance) is forbidden --- |
| 245 | + |
| 246 | + def test_reader_forbidden_on_engagement(self): |
| 247 | + client = self._client_for(self.reader_token) |
| 248 | + result = client.post( |
| 249 | + reverse("engagement-accept-risks", kwargs={"pk": self.engagement_enabled.id}), |
| 250 | + data=self._accepted_risks(["CVE-2024-3"]), |
| 251 | + format="json", |
| 252 | + ) |
| 253 | + self.assertEqual(result.status_code, 403) |
| 254 | + |
| 255 | + def test_reader_forbidden_on_test(self): |
| 256 | + client = self._client_for(self.reader_token) |
| 257 | + result = client.post( |
| 258 | + reverse("test-accept-risks", kwargs={"pk": self.test_enabled.id}), |
| 259 | + data=self._accepted_risks(["CVE-2024-4"]), |
| 260 | + format="json", |
| 261 | + ) |
| 262 | + self.assertEqual(result.status_code, 403) |
| 263 | + |
| 264 | + def test_reader_gets_empty_result_on_findings(self): |
| 265 | + client = self._client_for(self.reader_token) |
| 266 | + result = client.post( |
| 267 | + reverse("finding-accept-risks"), |
| 268 | + data=self._accepted_risks(["CVE-2024-5"]), |
| 269 | + format="json", |
| 270 | + ) |
| 271 | + # Mass endpoint returns 201 with empty results (no authorized engagements) |
| 272 | + self.assertEqual(result.status_code, 201) |
| 273 | + self.assertEqual(len(result.json()), 0) |
| 274 | + |
| 275 | + # --- enable_full_risk_acceptance=False blocks risk acceptance --- |
| 276 | + |
| 277 | + def test_engagement_blocked_when_full_risk_acceptance_disabled(self): |
| 278 | + client = self._client_for(self.writer_token) |
| 279 | + result = client.post( |
| 280 | + reverse("engagement-accept-risks", kwargs={"pk": self.engagement_disabled.id}), |
| 281 | + data=self._accepted_risks(["CVE-2024-100"]), |
| 282 | + format="json", |
| 283 | + ) |
| 284 | + self.assertEqual(result.status_code, 403) |
| 285 | + |
| 286 | + def test_test_blocked_when_full_risk_acceptance_disabled(self): |
| 287 | + client = self._client_for(self.writer_token) |
| 288 | + result = client.post( |
| 289 | + reverse("test-accept-risks", kwargs={"pk": self.test_disabled.id}), |
| 290 | + data=self._accepted_risks(["CVE-2024-101"]), |
| 291 | + format="json", |
| 292 | + ) |
| 293 | + self.assertEqual(result.status_code, 403) |
| 294 | + |
| 295 | + def test_mass_endpoint_skips_disabled_products(self): |
| 296 | + client = self._client_for(self.writer_token) |
| 297 | + # Use a CVE that exists only on the disabled product |
| 298 | + result = client.post( |
| 299 | + reverse("finding-accept-risks"), |
| 300 | + data=self._accepted_risks(["CVE-2024-102"]), |
| 301 | + format="json", |
| 302 | + ) |
| 303 | + self.assertEqual(result.status_code, 201) |
| 304 | + # No risk acceptances created because the matching engagement's product has it disabled |
| 305 | + self.assertEqual(len(result.json()), 0) |
| 306 | + # Findings on disabled product remain unaccepted |
| 307 | + self.assertEqual(self.engagement_disabled.unaccepted_open_findings.count(), 5) |
0 commit comments