diff --git a/components/package.json b/components/package.json index f652c2733ea..6fc51fe3ce9 100644 --- a/components/package.json +++ b/components/package.json @@ -1,6 +1,6 @@ { "name": "defectdojo", - "version": "2.54.3", + "version": "2.55.0-dev", "license" : "BSD-3-Clause", "private": true, "dependencies": { diff --git a/docs/content/en/open_source/upgrading/2.52.md b/docs/content/en/open_source/upgrading/2.52.md index 20eef3fb214..18aa85e15be 100644 --- a/docs/content/en/open_source/upgrading/2.52.md +++ b/docs/content/en/open_source/upgrading/2.52.md @@ -90,7 +90,7 @@ There are other instructions for upgrading to 2.52.x. Check the [Release Notes]( ## Merge of MobSF parsers -Mobsfscan Scan" has been merged into the "MobSF Scan" parser. The "Mobsfscan Scan" scan_type has been retained to keep deduplication working for existing Tests, but users are encouraged to move to the "MobSF Scan" scan_type. +"Mobsfscan Scan" has been merged into the "MobSF Scan" parser. The "Mobsfscan Scan" scan_type has been retained to keep deduplication working for existing Tests, but users are encouraged to move to the "MobSF Scan" scan_type. ## Release notes Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.52.0) for the contents of the release. diff --git a/dojo/api_v2/permissions.py b/dojo/api_v2/permissions.py index 905ddf99b58..86c8d93ad08 100644 --- a/dojo/api_v2/permissions.py +++ b/dojo/api_v2/permissions.py @@ -1,4 +1,3 @@ -import re from django.db.models import Model from django.shortcuts import get_object_or_404 @@ -20,6 +19,7 @@ from dojo.importers.auto_create_context import AutoCreateContextManager from dojo.models import ( Cred_Mapping, + Development_Environment, Dojo_Group, Endpoint, Engagement, @@ -27,6 +27,8 @@ Finding_Group, Product, Product_Type, + Regulation, + SLA_Configuration, Test, ) @@ -60,6 +62,72 @@ def check_object_permission( return False +class BaseRelatedObjectPermission(permissions.BasePermission): + + """ + An "abstract" base class for related object permissions (like notes, metadata, etc.) + that only need object permissions, not general permissions. This class will serve as + the base class for other more aptly named permission classes. + """ + + permission_map: dict[str, int] = { + "get_permission": None, + "put_permission": None, + "delete_permission": None, + "post_permission": None, + } + + def has_permission(self, request: Request, view): + # related object only need object permission + return True + + def has_object_permission(self, request: Request, view, obj): + return check_object_permission( + request, + obj, + **self.permission_map, + ) + + +class BaseDjangoModelPermission(permissions.BasePermission): + + """ + An "abstract" base class for Django model permissions. + This class will serve as the base class for other more aptly named permission classes. + """ + + django_model: Model = None + request_method_permission_map: dict[str, str] = { + "GET": "view", + "POST": "add", + "PUT": "change", + "PATCH": "change", + "DELETE": "delete", + } + + def _evaluate_permissions(self, request: Request, permissions: dict[str, str]) -> bool: + # Short circuit if the request method is not in the expected methods + if request.method not in permissions: + return True + # Evaluate the permissions as usual + for method, permission in permissions.items(): + if request.method == method: + return user_has_configuration_permission( + request.user, + f"{self.django_model._meta.app_label}.{permission}_{self.django_model._meta.model_name}", + ) + return False + + def has_permission(self, request: Request, view): + # First restrict the mapping got GET/POST only + expected_request_method_permission_map = {k: v for k, v in self.request_method_permission_map.items() if k in {"GET", "POST"}} + # Evaluate the permissions + return self._evaluate_permissions(request, expected_request_method_permission_map) + + def has_object_permission(self, request: Request, view, obj): + return self._evaluate_permissions(request, self.request_method_permission_map) + + class UserHasAppAnalysisPermission(permissions.BasePermission): def has_permission(self, request, view): return check_post_permission( @@ -279,132 +347,82 @@ def has_object_permission(self, request, view, obj): class UserHasEngagementPermission(permissions.BasePermission): - # Permission checks for related objects (like notes or metadata) can be moved - # into a seperate class, when the legacy authorization will be removed. - path_engagement_post = re.compile(r"^/api/v2/engagements/$") - path_engagement = re.compile(r"^/api/v2/engagements/\d+/$") - def has_permission(self, request, view): - if UserHasEngagementPermission.path_engagement_post.match( - request.path, - ) or UserHasEngagementPermission.path_engagement.match(request.path): - return check_post_permission( + return check_post_permission( request, Product, "product", Permissions.Engagement_Add, ) - # related object only need object permission - return True def has_object_permission(self, request, view, obj): - if UserHasEngagementPermission.path_engagement_post.match( - request.path, - ) or UserHasEngagementPermission.path_engagement.match(request.path): - return check_object_permission( - request, - obj, - Permissions.Engagement_View, - Permissions.Engagement_Edit, - Permissions.Engagement_Delete, - ) return check_object_permission( request, obj, Permissions.Engagement_View, Permissions.Engagement_Edit, - Permissions.Engagement_Edit, - Permissions.Engagement_Edit, + Permissions.Engagement_Delete, ) -class UserHasRiskAcceptancePermission(permissions.BasePermission): - # Permission checks for related objects (like notes or metadata) can be moved - # into a seperate class, when the legacy authorization will be removed. - path_risk_acceptance_post = re.compile(r"^/api/v2/risk_acceptances/$") - path_risk_acceptance = re.compile(r"^/api/v2/risk_acceptances/\d+/$") +class UserHasEngagementRelatedObjectPermission(BaseRelatedObjectPermission): + permission_map = { + "get_permission": Permissions.Engagement_View, + "put_permission": Permissions.Engagement_Edit, + "delete_permission": Permissions.Engagement_Edit, + "post_permission": Permissions.Engagement_Edit, + } + +class UserHasRiskAcceptancePermission(permissions.BasePermission): def has_permission(self, request, view): - if UserHasRiskAcceptancePermission.path_risk_acceptance_post.match( - request.path, - ) or UserHasRiskAcceptancePermission.path_risk_acceptance.match( - request.path, - ): - return check_post_permission( - request, Product, "product", Permissions.Risk_Acceptance, - ) - # related object only need object permission + # The previous implementation only checked for the object permission if the path was + # /api/v2/risk_acceptances/, but the path has always been /api/v2/risk_acceptance/ (notice the missing "s") + # So there really has not been a notion of a post permission check for risk acceptances. + # It would be best to leave as is to not break any existing implementations. return True def has_object_permission(self, request, view, obj): - if UserHasRiskAcceptancePermission.path_risk_acceptance_post.match( - request.path, - ) or UserHasRiskAcceptancePermission.path_risk_acceptance.match( - request.path, - ): - return check_object_permission( - request, - obj, - Permissions.Risk_Acceptance, - Permissions.Risk_Acceptance, - Permissions.Risk_Acceptance, - ) return check_object_permission( request, obj, Permissions.Risk_Acceptance, Permissions.Risk_Acceptance, Permissions.Risk_Acceptance, - Permissions.Risk_Acceptance, ) -class UserHasFindingPermission(permissions.BasePermission): - # Permission checks for related objects (like notes or metadata) can be moved - # into a seperate class, when the legacy authorization will be removed. - path_finding_post = re.compile(r"^/api/v2/findings/$") - path_finding = re.compile(r"^/api/v2/findings/\d+/$") - path_stub_finding_post = re.compile(r"^/api/v2/stub_findings/$") - path_stub_finding = re.compile(r"^/api/v2/stub_findings/\d+/$") +class UserHasRiskAcceptanceRelatedObjectPermission(BaseRelatedObjectPermission): + permission_map = { + "get_permission": Permissions.Risk_Acceptance, + "put_permission": Permissions.Risk_Acceptance, + "delete_permission": Permissions.Risk_Acceptance, + "post_permission": Permissions.Risk_Acceptance, + } + +class UserHasFindingPermission(permissions.BasePermission): def has_permission(self, request, view): - if ( - UserHasFindingPermission.path_finding_post.match(request.path) - or UserHasFindingPermission.path_finding.match(request.path) - or UserHasFindingPermission.path_stub_finding_post.match( - request.path, - ) - or UserHasFindingPermission.path_stub_finding.match(request.path) - ): - return check_post_permission( - request, Test, "test", Permissions.Finding_Add, - ) - # related object only need object permission - return True + return check_post_permission( + request, Test, "test", Permissions.Finding_Add, + ) def has_object_permission(self, request, view, obj): - if ( - UserHasFindingPermission.path_finding_post.match(request.path) - or UserHasFindingPermission.path_finding.match(request.path) - or UserHasFindingPermission.path_stub_finding_post.match( - request.path, - ) - or UserHasFindingPermission.path_stub_finding.match(request.path) - ): - return check_object_permission( - request, - obj, - Permissions.Finding_View, - Permissions.Finding_Edit, - Permissions.Finding_Delete, - ) return check_object_permission( request, obj, Permissions.Finding_View, Permissions.Finding_Edit, - Permissions.Finding_Edit, - Permissions.Finding_Edit, + Permissions.Finding_Delete, ) +class UserHasFindingRelatedObjectPermission(BaseRelatedObjectPermission): + permission_map = { + "get_permission": Permissions.Finding_View, + "put_permission": Permissions.Finding_Edit, + "delete_permission": Permissions.Finding_Edit, + "post_permission": Permissions.Finding_Edit, + } + + class UserHasImportPermission(permissions.BasePermission): def has_permission(self, request, view): # permission check takes place before validation, so we don't have access to serializer.validated_data() @@ -761,42 +779,30 @@ def has_permission(self, request, view): class UserHasTestPermission(permissions.BasePermission): - # Permission checks for related objects (like notes or metadata) can be moved - # into a seperate class, when the legacy authorization will be removed. - path_tests_post = re.compile(r"^/api/v2/tests/$") - path_tests = re.compile(r"^/api/v2/tests/\d+/$") - def has_permission(self, request, view): - if UserHasTestPermission.path_tests_post.match( - request.path, - ) or UserHasTestPermission.path_tests.match(request.path): - return check_post_permission( - request, Engagement, "engagement", Permissions.Test_Add, - ) - # related object only need object permission - return True + return check_post_permission( + request, Engagement, "engagement", Permissions.Test_Add, + ) def has_object_permission(self, request, view, obj): - if UserHasTestPermission.path_tests_post.match( - request.path, - ) or UserHasTestPermission.path_tests.match(request.path): - return check_object_permission( - request, - obj, - Permissions.Test_View, - Permissions.Test_Edit, - Permissions.Test_Delete, - ) return check_object_permission( request, obj, Permissions.Test_View, Permissions.Test_Edit, - Permissions.Test_Edit, - Permissions.Test_Edit, + Permissions.Test_Delete, ) +class UserHasTestRelatedObjectPermission(BaseRelatedObjectPermission): + permission_map = { + "get_permission": Permissions.Test_View, + "put_permission": Permissions.Test_Edit, + "delete_permission": Permissions.Test_Edit, + "post_permission": Permissions.Test_Edit, + } + + class UserHasTestImportPermission(permissions.BasePermission): def has_permission(self, request, view): return check_post_permission( @@ -1023,6 +1029,36 @@ def has_object_permission(self, request, view, obj): ) +class UserHasSLAPermission(BaseDjangoModelPermission): + django_model = SLA_Configuration + + +class UserHasDevelopmentEnvironmentPermission(BaseDjangoModelPermission): + django_model = Development_Environment + # https://github.com/DefectDojo/django-DefectDojo/blob/963d4a35bfd8f5138330f0d70595a755fa4999b0/dojo/user/utils.py#L93 + # It looks like view permission was explicitly not supported, so I assume + # reading these endpoints are not necessarily restricted (unless you're auth'd of course) + request_method_permission_map = { + "POST": "add", + "PUT": "change", + "PATCH": "change", + "DELETE": "delete", + } + + +class UserHasRegulationPermission(BaseDjangoModelPermission): + django_model = Regulation + # https://github.com/DefectDojo/django-DefectDojo/blob/963d4a35bfd8f5138330f0d70595a755fa4999b0/dojo/user/utils.py#L104 + # It looks like view permission was explicitly not supported, so I assume + # reading these endpoints are not necessarily restricted (unless you're auth'd of course) + request_method_permission_map = { + "POST": "add", + "PUT": "change", + "PATCH": "change", + "DELETE": "delete", + } + + def raise_no_auto_create_import_validation_error( test_title, scan_type, diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 9c27e1f7820..a083535ef42 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -503,7 +503,7 @@ def generate_report(self, request, pk=None): request=serializers.AddNewNoteOptionSerializer, responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission]) def notes(self, request, pk=None): engagement = self.get_object() if request.method == "POST": @@ -567,7 +567,7 @@ def notes(self, request, pk=None): responses={status.HTTP_201_CREATED: serializers.FileSerializer}, ) @action( - detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), + detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission], ) def files(self, request, pk=None): engagement = self.get_object() @@ -603,7 +603,7 @@ def files(self, request, pk=None): status.HTTP_201_CREATED: serializers.EngagementCheckListSerializer, }, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission]) def complete_checklist(self, request, pk=None): engagement = self.get_object() check_lists = Check_List.objects.filter(engagement=engagement) @@ -650,6 +650,7 @@ def complete_checklist(self, request, pk=None): detail=True, methods=["get"], url_path=r"files/download/(?P\d+)", + permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission], ) def download_file(self, request, file_id, pk=None): engagement = self.get_object() @@ -735,7 +736,7 @@ def get_queryset(self): status.HTTP_200_OK: serializers.RiskAcceptanceProofSerializer, }, ) - @action(detail=True, methods=["get"]) + @action(detail=True, methods=["get"], permission_classes=(IsAuthenticated, permissions.UserHasRiskAcceptanceRelatedObjectPermission)) def download_proof(self, request, pk=None): risk_acceptance = self.get_object() # Get the file object @@ -937,7 +938,7 @@ def get_serializer_class(self): request=serializers.FindingCloseSerializer, responses={status.HTTP_200_OK: serializers.FindingCloseSerializer}, ) - @action(detail=True, methods=["post"]) + @action(detail=True, methods=["post"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def close(self, request, pk=None): finding = self.get_object() @@ -978,7 +979,7 @@ def close(self, request, pk=None): request=serializers.TagSerializer, responses={status.HTTP_201_CREATED: serializers.TagSerializer}, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def tags(self, request, pk=None): finding = self.get_object() @@ -1019,7 +1020,7 @@ def tags(self, request, pk=None): status.HTTP_201_CREATED: serializers.BurpRawRequestResponseSerializer, }, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def request_response(self, request, pk=None): finding = self.get_object() @@ -1069,7 +1070,7 @@ def request_response(self, request, pk=None): request=serializers.AddNewNoteOptionSerializer, responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def notes(self, request, pk=None): finding = self.get_object() if request.method == "POST": @@ -1137,7 +1138,7 @@ def notes(self, request, pk=None): responses={status.HTTP_201_CREATED: serializers.FileSerializer}, ) @action( - detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), + detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def files(self, request, pk=None): finding = self.get_object() @@ -1175,7 +1176,7 @@ def files(self, request, pk=None): @action( detail=True, methods=["get"], - url_path=r"files/download/(?P\d+)", + url_path=r"files/download/(?P\d+)", permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def download_file(self, request, file_id, pk=None): finding = self.get_object() @@ -1196,7 +1197,7 @@ def download_file(self, request, file_id, pk=None): request=serializers.FindingNoteSerializer, responses={status.HTTP_204_NO_CONTENT: ""}, ) - @action(detail=True, methods=["patch"]) + @action(detail=True, methods=["patch"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def remove_note(self, request, pk=None): """Remove Note From Finding Note""" finding = self.get_object() @@ -1235,7 +1236,7 @@ def remove_note(self, request, pk=None): request=serializers.TagSerializer, responses={status.HTTP_204_NO_CONTENT: ""}, ) - @action(detail=True, methods=["put", "patch"]) + @action(detail=True, methods=["put", "patch"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def remove_tags(self, request, pk=None): """Remove Tag(s) from finding list of tags""" finding = self.get_object() @@ -1285,6 +1286,7 @@ def remove_tags(self, request, pk=None): url_path=r"duplicate", filter_backends=[], pagination_class=None, + permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def get_duplicate_cluster(self, request, pk): finding = self.get_object() @@ -1298,7 +1300,7 @@ def get_duplicate_cluster(self, request, pk): request=OpenApiTypes.NONE, responses={status.HTTP_204_NO_CONTENT: ""}, ) - @action(detail=True, methods=["post"], url_path=r"duplicate/reset") + @action(detail=True, methods=["post"], url_path=r"duplicate/reset", permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def reset_finding_duplicate_status(self, request, pk): checked_duplicate_id = reset_finding_duplicate_status_internal( request.user, pk, @@ -1317,7 +1319,7 @@ def reset_finding_duplicate_status(self, request, pk): responses={status.HTTP_204_NO_CONTENT: ""}, ) @action( - detail=True, methods=["post"], url_path=r"original/(?P\d+)", + detail=True, methods=["post"], url_path=r"original/(?P\d+)", permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def set_finding_as_original(self, request, pk, new_fid): success = set_finding_as_original_internal(request.user, pk, new_fid) @@ -1493,6 +1495,7 @@ def _remove_metadata(self, request, finding): methods=["post", "put", "delete", "get"], filter_backends=[], pagination_class=None, + permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def metadata(self, request, pk=None): finding = self.get_object() @@ -2036,7 +2039,7 @@ class DevelopmentEnvironmentViewSet( serializer_class = serializers.DevelopmentEnvironmentSerializer queryset = Development_Environment.objects.none() filter_backends = (DjangoFilterBackend,) - permission_classes = (IsAuthenticated, DjangoModelPermissions) + permission_classes = (IsAuthenticated, permissions.UserHasDevelopmentEnvironmentPermission) def get_queryset(self): return Development_Environment.objects.all().order_by("id") @@ -2128,7 +2131,7 @@ def generate_report(self, request, pk=None): request=serializers.AddNewNoteOptionSerializer, responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission)) def notes(self, request, pk=None): test = self.get_object() if request.method == "POST": @@ -2190,7 +2193,7 @@ def notes(self, request, pk=None): responses={status.HTTP_201_CREATED: serializers.FileSerializer}, ) @action( - detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), + detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission), ) def files(self, request, pk=None): test = self.get_object() @@ -2229,6 +2232,7 @@ def files(self, request, pk=None): detail=True, methods=["get"], url_path=r"files/download/(?P\d+)", + permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission), ) def download_file(self, request, file_id, pk=None): test = self.get_object() @@ -2388,7 +2392,7 @@ class RegulationsViewSet( queryset = Regulation.objects.none() filter_backends = (DjangoFilterBackend,) filterset_fields = ["id", "name", "description"] - permission_classes = (IsAuthenticated, DjangoModelPermissions) + permission_classes = (IsAuthenticated, permissions.UserHasRegulationPermission) def get_queryset(self): return Regulation.objects.all().order_by("id") @@ -2742,7 +2746,7 @@ class BurpRawRequestResponseViewSet( filterset_fields = ["finding"] permission_classes = ( IsAuthenticated, - permissions.UserHasFindingPermission, + permissions.UserHasFindingRelatedObjectPermission, ) def get_queryset(self): @@ -3129,7 +3133,7 @@ class SLAConfigurationViewset( serializer_class = serializers.SLAConfigurationSerializer queryset = SLA_Configuration.objects.none() filter_backends = (DjangoFilterBackend,) - permission_classes = (IsAuthenticated, DjangoModelPermissions) + permission_classes = (IsAuthenticated, permissions.UserHasSLAPermission) def get_queryset(self): return SLA_Configuration.objects.all().order_by("id") @@ -3143,7 +3147,7 @@ class QuestionnaireQuestionViewSet( queryset = Question.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) @@ -3159,7 +3163,7 @@ class QuestionnaireAnswerViewSet( queryset = Answer.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) @@ -3174,7 +3178,7 @@ class QuestionnaireGeneralSurveyViewSet( queryset = General_Survey.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) @@ -3189,7 +3193,7 @@ class QuestionnaireEngagementSurveyViewSet( queryset = Engagement_Survey.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) @@ -3230,7 +3234,7 @@ class QuestionnaireAnsweredSurveyViewSet( queryset = Answered_Survey.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) diff --git a/dojo/filters.py b/dojo/filters.py index 4ae5224dab6..f8538f0f53d 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -1889,6 +1889,8 @@ class FindingFilterHelper(FilterSet): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + if "test__test_type" in self.form.fields: + self.form.fields["test__test_type"].queryset = get_visible_scan_types() def set_date_fields(self, *args: list, **kwargs: dict): date_input_widget = forms.DateInput(attrs={"class": "datepicker", "placeholder": "YYYY-MM-DD"}, format="%Y-%m-%d") @@ -2023,9 +2025,6 @@ def __init__(self, *args, **kwargs): del self.form.fields["test__engagement__product__name_contains"] del self.form.fields["test__engagement__product__prod_type__name"] del self.form.fields["test__engagement__product__prod_type__name_contains"] - else: - del self.form.fields["test__name"] - del self.form.fields["test__name_contains"] class FindingFilter(FindingFilterHelper, FindingTagFilter): @@ -2075,9 +2074,6 @@ def __init__(self, *args, **kwargs): # Don't show the product filter on the product finding view self.set_related_object_fields(*args, **kwargs) - if "test__test_type" in self.form.fields: - self.form.fields["test__test_type"].queryset = get_visible_scan_types() - def set_related_object_fields(self, *args: list, **kwargs: dict): finding_group_query = Finding_Group.objects.all() if self.pid is not None: diff --git a/dojo/fixtures/dojo_testdata.json b/dojo/fixtures/dojo_testdata.json index 26148621eaf..9575d2aba3a 100644 --- a/dojo/fixtures/dojo_testdata.json +++ b/dojo/fixtures/dojo_testdata.json @@ -112,6 +112,24 @@ "date_joined": "2018-04-13T07:59:51.527Z" } }, + { + "pk": 7, + "model": "auth.user", + "fields": { + "username": "globalWriter", + "first_name": "Global", + "last_name": "Writer", + "is_active": true, + "is_superuser": false, + "is_staff": false, + "last_login": null, + "groups": [], + "user_permissions": [], + "password": "pbkdf2_sha256$36000$pe8Ff8HrBPac$Lb3ee6/R9z/aL9nM+D2AXWTpIt9Pa9kcLueXxYNy1ZY=", + "email": "global_writer@email.com", + "date_joined": "2018-04-13T07:59:51.527Z" + } + }, { "pk": "2dqr18yqu9mzb87abk0okid75w2clakl", "model": "sessions.session", @@ -2804,6 +2822,14 @@ "created": "2018-04-16T06:54:35.933Z" } }, + { + "pk": "184770c4c3256aba904297610fbb4da3fa15ba37", + "model": "authtoken.token", + "fields": { + "user": 7, + "created": "2018-04-16T06:54:35.933Z" + } + }, { "pk": "1", "model": "dojo.dojo_group", @@ -2871,6 +2897,14 @@ "role": 5 } }, + { + "pk": 4, + "model": "dojo.global_role", + "fields": { + "user": 7, + "role": 2 + } + }, { "model": "dojo.language_type", "pk": 1, diff --git a/dojo/jira_link/helper.py b/dojo/jira_link/helper.py index add39666c0d..5ad040fd003 100644 --- a/dojo/jira_link/helper.py +++ b/dojo/jira_link/helper.py @@ -906,13 +906,30 @@ def failure_to_add_message(message: str, exception: Exception, _: Any) -> bool: message = f"Object {obj.id} cannot be pushed to JIRA as the JIRA instance has been deleted or is not available." return failure_to_add_message(message, None, obj) - obj_can_be_pushed_to_jira, error_message, _error_code = can_be_pushed_to_jira(obj) + obj_can_be_pushed_to_jira, error_message, error_code = can_be_pushed_to_jira(obj) if not obj_can_be_pushed_to_jira: + # Expected validation failures (not verified, not active, below threshold) + # should not create alerts when auto-pushing via "push all issues" + # These are expected conditions that don't indicate a problem + expected_validation_errors = [ + "error_not_active_or_verified", + "error_below_minimum_threshold", + "error_empty", + "error_inactive", + ] + # not sure why this check is not part of can_be_pushed_to_jira, but afraid to change it if isinstance(obj, Finding) and obj.duplicate and not obj.active: - logger.warning("%s will not be pushed to JIRA as it's a duplicate finding", to_str_typed(obj)) - log_jira_cannot_be_pushed_reason(error_message + " and findis a duplicate", obj) + logger.info("%s will not be pushed to JIRA as it's a duplicate finding", to_str_typed(obj)) + # Duplicates are expected, don't create alerts + logger.info("%s cannot be pushed to JIRA: %s (expected - duplicate finding)", + to_str_typed(obj), error_message) + elif error_code in expected_validation_errors: + # These are expected when auto-pushing, only log, don't alert + logger.info("%s cannot be pushed to JIRA: %s (expected - finding not ready yet)", + to_str_typed(obj), error_message) else: + # Unexpected errors (configuration issues, etc.) should still alert log_jira_cannot_be_pushed_reason(error_message, obj) logger.warning("%s cannot be pushed to JIRA: %s.", to_str_typed(obj), error_message) logger.warning("The JIRA issue will NOT be created.") diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index a5141612fea..d70239b6829 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -1260,43 +1260,72 @@ def saml2_attrib_map_format(din): "task": "dojo.tasks.add_alerts", "schedule": timedelta(hours=1), "args": [timedelta(hours=1)], + "options": { + "expires": int(60 * 60 * 1 * 1.2), # If a task is not executed within 72 minutes, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "cleanup-alerts": { "task": "dojo.tasks.cleanup_alerts", "schedule": timedelta(hours=8), + "options": { + "expires": int(60 * 60 * 8 * 1.2), # If a task is not executed within 9.6 hours, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "dedupe-delete": { "task": "dojo.tasks.async_dupe_delete", "schedule": timedelta(minutes=1), - "args": [timedelta(minutes=1)], + "options": { + "expires": int(60 * 1 * 1.2), # If a task is not executed within 72 seconds, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "flush_auditlog": { "task": "dojo.tasks.flush_auditlog", "schedule": timedelta(hours=8), + "options": { + "expires": int(60 * 60 * 8 * 1.2), # If a task is not executed within 9.6 hours, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "update-findings-from-source-issues": { "task": "dojo.tools.tool_issue_updater.update_findings_from_source_issues", "schedule": timedelta(hours=3), + "options": { + "expires": int(60 * 60 * 3 * 1.2), # If a task is not executed within 9 hours, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "compute-sla-age-and-notify": { "task": "dojo.tasks.async_sla_compute_and_notify_task", "schedule": crontab(hour=7, minute=30), + "options": { + "expires": int(60 * 60 * 24 * 1.2), # If a task is not executed within 28.8 hours, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "risk_acceptance_expiration_handler": { "task": "dojo.risk_acceptance.helper.expiration_handler", - "schedule": crontab(minute=0, hour="*/3"), # every 3 hours + "schedule": crontab(minute=0, hour="*/3"), # every 72 minutes + "options": { + "expires": int(60 * 60 * 3 * 1.2), # If a task is not executed within 9 hours, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "notification_webhook_status_cleanup": { "task": "dojo.notifications.helper.webhook_status_cleanup", "schedule": timedelta(minutes=1), + "options": { + "expires": int(60 * 1 * 1.2), # If a task is not executed within 72 seconds, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "trigger_evaluate_pro_proposition": { "task": "dojo.tasks.evaluate_pro_proposition", "schedule": timedelta(hours=8), + "options": { + "expires": int(60 * 60 * 8 * 1.2), # If a task is not executed within 9.6 hours, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, "clear_sessions": { "task": "dojo.tasks.clear_sessions", "schedule": crontab(hour=0, minute=0, day_of_week=0), + "options": { + "expires": int(60 * 60 * 24 * 7 * 1.2), # If a task is not executed within 8.4 days, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. + }, }, # 'jira_status_reconciliation': { # 'task': 'dojo.tasks.jira_status_reconciliation_task', diff --git a/dojo/system_settings/views.py b/dojo/system_settings/views.py index b0bce9d52d3..67bad91dc0e 100644 --- a/dojo/system_settings/views.py +++ b/dojo/system_settings/views.py @@ -9,7 +9,7 @@ from dojo.forms import SystemSettingsForm from dojo.models import System_Settings -from dojo.utils import add_breadcrumb, get_celery_worker_status +from dojo.utils import add_breadcrumb, get_celery_queue_length, get_celery_worker_status logger = logging.getLogger(__name__) @@ -110,6 +110,15 @@ def get_celery_status( else: context["celery_msg"] = "Celery does not appear to be up and running. Please ensure celery is running." context["celery_status"] = "Not Running" + + q_len = get_celery_queue_length() + if q_len is None: + context["celery_q_len"] = " It is not possible to identify number of waiting tasks." + elif q_len: + context["celery_q_len"] = f"{q_len} tasks are waiting to be proccessed." + else: + context["celery_q_len"] = "No task is waiting to be proccessed." + else: context["celery_bool"] = False context["celery_msg"] = "Celery needs to have the setting CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite' set in settings.py." diff --git a/dojo/tasks.py b/dojo/tasks.py index d02040fa5b3..4b780b952cd 100644 --- a/dojo/tasks.py +++ b/dojo/tasks.py @@ -126,7 +126,7 @@ def _async_dupe_delete_impl(): # limit to settings.DUPE_DELETE_MAX_PER_RUN to prevent overlapping jobs results = Finding.objects \ .filter(duplicate=True) \ - .order_by() \ + .order_by("date") \ .values("duplicate_finding") \ .annotate(num_dupes=Count("id")) \ .filter(num_dupes__gt=dupe_max)[:total_duplicate_delete_count_max_per_run] diff --git a/dojo/templates/dojo/system_settings.html b/dojo/templates/dojo/system_settings.html index 02510452e16..4ea772bc7a3 100644 --- a/dojo/templates/dojo/system_settings.html +++ b/dojo/templates/dojo/system_settings.html @@ -26,6 +26,9 @@

Celery {{celery_status}}

{{celery_msg}}
+
+ {{celery_q_len}} +
diff --git a/dojo/utils.py b/dojo/utils.py index cad81d3ad23..3db3a437174 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -20,6 +20,7 @@ import crum import cvss import vobject +from amqp.exceptions import ChannelError from auditlog.models import LogEntry from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes @@ -42,6 +43,7 @@ from django.utils import timezone from django.utils.http import url_has_allowed_host_and_scheme from django.utils.translation import gettext as _ +from kombu import Connection from dojo.authorization.roles_permissions import Permissions from dojo.celery import app @@ -1322,6 +1324,18 @@ def get_celery_worker_status(): return False +def get_celery_queue_length(): + try: + with Connection(settings.CELERY_BROKER_URL) as conn, conn.SimpleQueue("celery") as queue: + return queue.qsize() + except ChannelError as e: + if "NOT_FOUND" in str(e): + return 0 + return None + except: + return None + + # Used to display the counts and enabled tabs in the product view # Uses @cached_property for lazy loading to avoid expensive queries on every page load # See: https://github.com/DefectDojo/django-DefectDojo/issues/10313 diff --git a/dojo/views.py b/dojo/views.py index d6b33f8c08b..3939abff987 100644 --- a/dojo/views.py +++ b/dojo/views.py @@ -15,6 +15,7 @@ from dojo.authorization.authorization import ( user_has_configuration_permission_or_403, + user_has_global_permission, user_has_permission, user_has_permission_or_403, ) @@ -115,10 +116,17 @@ def action_history(request, cid, oid): elif ct.model == "risk_acceptance": engagements = Engagement.objects.filter(risk_acceptance=obj) authorized = False - for engagement in engagements: - if user_has_permission(request.user, engagement, Permissions.Engagement_View): - authorized = True - break + fetched_engagements = list(engagements) + # Check the case that there are no engagements associated with the risk acceptance + if len(fetched_engagements) == 0: + # Determine if the user has risk acceptance view permission globally + authorized = user_has_global_permission(request.user, Permissions.Risk_Acceptance) + else: + # Iterate through engagements to see if the user has view permission on any of them + for engagement in fetched_engagements: + if user_has_permission(request.user, engagement, Permissions.Engagement_View): + authorized = True + break if not authorized: raise PermissionDenied elif ct.model == "user": diff --git a/helm/defectdojo/Chart.yaml b/helm/defectdojo/Chart.yaml index 4c740040829..9c74dfd5350 100644 --- a/helm/defectdojo/Chart.yaml +++ b/helm/defectdojo/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 -appVersion: "2.54.3" +appVersion: "2.55.0-dev" description: A Helm chart for Kubernetes to install DefectDojo name: defectdojo -version: 1.9.9 +version: 1.9.10-dev icon: https://defectdojo.com/hubfs/DefectDojo_favicon.png maintainers: - name: madchap @@ -33,5 +33,5 @@ dependencies: # - kind: security # description: Critical bug annotations: - artifacthub.io/prerelease: "false" - artifacthub.io/changes: "- kind: changed\n description: Bump DefectDojo to 2.54.3\n" + artifacthub.io/prerelease: "true" + artifacthub.io/changes: "" diff --git a/helm/defectdojo/README.md b/helm/defectdojo/README.md index eb755edc7a4..bab94c770b5 100644 --- a/helm/defectdojo/README.md +++ b/helm/defectdojo/README.md @@ -511,7 +511,7 @@ The HELM schema will be generated for you. # General information about chart values -![Version: 1.9.9](https://img.shields.io/badge/Version-1.9.9-informational?style=flat-square) ![AppVersion: 2.54.3](https://img.shields.io/badge/AppVersion-2.54.3-informational?style=flat-square) +![Version: 1.9.10-dev](https://img.shields.io/badge/Version-1.9.10--dev-informational?style=flat-square) ![AppVersion: 2.55.0-dev](https://img.shields.io/badge/AppVersion-2.55.0--dev-informational?style=flat-square) A Helm chart for Kubernetes to install DefectDojo diff --git a/requirements-dev.txt b/requirements-dev.txt index d2a367fca58..8f6bc1ada76 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -12,4 +12,4 @@ django-test-migrations==1.5.0 parameterized==0.9.0 # Development file watching (hot reload) -watchdog==6.0.0 +watchdog[watchmedo]==6.0.0 diff --git a/unittests/test_duplication_loops.py b/unittests/test_duplication_loops.py index 9a84024e560..79b3cf1b966 100644 --- a/unittests/test_duplication_loops.py +++ b/unittests/test_duplication_loops.py @@ -6,12 +6,14 @@ from dojo.finding.deduplication import set_duplicate from dojo.management.commands.fix_loop_duplicates import fix_loop_duplicates from dojo.models import Engagement, Finding, Product, User, copy_model_util +from dojo.tasks import async_dupe_delete from .dojo_test_case import DojoTestCase logger = logging.getLogger(__name__) +# tests here class TestDuplicationLoops(DojoTestCase): fixtures = ["dojo_testdata.json"] @@ -373,6 +375,26 @@ def test_list_relations_for_three_reverse(self): self.assertEqual(self.finding_c.duplicate_finding_set().count(), 2) self.assertEqual(self.finding_b.duplicate_finding_set().count(), 2) + # Test that Delete Duplicate Findings & Maximum Duplicate is correctly deleting olding finding first based off of finding date value + @override_settings(DELETE_DUPLICATE_FINDINGS=True) + @override_settings(MAXIMUM_DUPLICATES=1) + def test_delete_duplicate_order(self): + self.finding_b.date = "2023-01-01" + self.finding_c.date = "2024-01-01" + set_duplicate(self.finding_b, self.finding_a) + set_duplicate(self.finding_c, self.finding_a) + + async_dupe_delete() + + # Finding b should be deleted because it is the older finding + + # I think this is how you would access it from the database + Finding.objects.get(id=self.finding_b.id) + # Why am I still able to access finding b. It should be deleted??? + + self.assertEqual(self.finding_a.duplicate_finding_set().count(), 2) + self.assertEqual(self.finding_a.duplicate_finding_set().first().id, self.finding_b.id) + def test_delete_all_engagements(self): # make sure there is no exception when deleting all engagements for engagement in Engagement.objects.all().order_by("id"): diff --git a/unittests/test_notifications.py b/unittests/test_notifications.py index 7c5b289a211..151d22e5d58 100644 --- a/unittests/test_notifications.py +++ b/unittests/test_notifications.py @@ -215,7 +215,7 @@ def test_product_types(self, mock): with self.subTest("product_type_added"): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): prod_type = Product_Type.objects.create(name="notif prod type") - self.assertEqual(mock.call_count, last_count + 4) + self.assertEqual(mock.call_count, last_count + 5) self.assertEqual(mock.call_args_list[-1].args[0], "product_type_added") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/product/type/{prod_type.id}") @@ -236,7 +236,7 @@ def test_products(self, mock): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): prod_type = Product_Type.objects.first() prod, _ = Product.objects.get_or_create(prod_type=prod_type, name="prod name") - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "product_added") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/product/{prod.id}") @@ -257,7 +257,7 @@ def test_engagements(self, mock): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): prod = Product.objects.first() eng = Engagement.objects.create(product=prod, target_start=timezone.now(), target_end=timezone.now()) - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "engagement_added") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/engagement/{eng.id}") @@ -266,7 +266,7 @@ def test_engagements(self, mock): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): eng.status = "Completed" eng.save() - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "engagement_closed") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/engagement/{eng.id}/finding/all") @@ -275,7 +275,7 @@ def test_engagements(self, mock): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): eng.status = "In Progress" eng.save() - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "engagement_reopened") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/engagement/{eng.id}") @@ -376,7 +376,7 @@ def test_finding_groups(self, mock): with self.subTest("test_deleted itself"): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): fg2.delete() - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "finding_group_deleted") self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The finding group "fg test" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/test/{test2.id}") diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 7f2001dd193..7e9af36d82e 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -14,6 +14,7 @@ from django.conf import settings from django.contrib.auth.models import Permission +from django.core.files.uploadedfile import SimpleUploadedFile from django.test import tag as test_tag from django.test.utils import override_settings from django.urls import reverse @@ -21,6 +22,7 @@ from drf_spectacular.drainage import GENERATOR_STATS from drf_spectacular.settings import spectacular_settings from drf_spectacular.validation import validate_schema +from parameterized import parameterized from rest_framework import status from rest_framework.authtoken.models import Token from rest_framework.mixins import ( @@ -368,34 +370,36 @@ class TestType(Enum): class BaseClass: class RESTEndpointTest(DojoAPITestCase): + NOT_AUTHORIZED_USER_ID = 3 + GLOBAL_READER_USER_ID = 5 + GLOBAL_WRITER_USER_ID = 7 + GLOBAL_OWNER_USER_ID = 6 + def __init__(self, *args, **kwargs): DojoAPITestCase.__init__(self, *args, **kwargs) - def setUp(self): - testuser = User.objects.get(username="admin") + def _get_client(self, user_criteria: dict) -> None: + testuser = User.objects.get(**user_criteria) token = Token.objects.get(user=testuser) self.client = APIClient() self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + + def setUp(self): + self._get_client({"username": "admin"}) self.url = reverse(self.viewname + "-list") self.schema = get_open_api3_json_schema() def setUp_not_authorized(self): - testuser = User.objects.get(id=3) - token = Token.objects.get(user=testuser) - self.client = APIClient() - self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + self._get_client({"id": self.NOT_AUTHORIZED_USER_ID}) def setUp_global_reader(self): - testuser = User.objects.get(id=5) - token = Token.objects.get(user=testuser) - self.client = APIClient() - self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + self._get_client({"id": self.GLOBAL_READER_USER_ID}) + + def setUp_global_writer(self): + self._get_client({"id": self.GLOBAL_WRITER_USER_ID}) def setUp_global_owner(self): - testuser = User.objects.get(id=6) - token = Token.objects.get(user=testuser) - self.client = APIClient() - self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + self._get_client({"id": self.GLOBAL_OWNER_USER_ID}) def check_schema(self, schema, obj): schema_checker = SchemaChecker(self.schema["components"]) @@ -1179,6 +1183,42 @@ def __init__(self, *args, **kwargs): self.deleted_objects = 23 BaseClass.RESTEndpointTest.__init__(self, *args, **kwargs) + @parameterized.expand( + [ + ("files", {"title": "test", "file": b"empty"}), + ("notes", {"entry": "string"}), + ], + ) + def test_related_objects(self, related_object_path, payload): + """ + Tests that BaseRelatedObjectPermission enforces the permissions not associated + with the base object. For example, even though a request to add a note to an + engagement is a POST, we do not need engagement add permissions, but rather + engagement edit permissions since that is what is defined in the + UserHasEngagementRelatedObjectPermission class + """ + self.setUp_global_reader() + # Get an engagement + response = self.client.get(self.url, format="json") + self.assertEqual(200, response.status_code, response.content[:1000]) + engagement_id = response.data["results"][0]["id"] + # Attempt to add a related object + relative_url = f"{self.url}{engagement_id}/{related_object_path}/" + response = self.client.post(relative_url, payload) + self.assertEqual(403, response.status_code, response.content[:1000]) + # Now switch to a user with edit permissions (but not create) + self.setUp_global_writer() + # Retry adding the related object + if related_object_path == "files": + # Convert bytes to a mock uploaded file + payload["file"] = SimpleUploadedFile( + name="test_file.txt", + content=payload["file"], # the b"empty" + content_type="text/plain", + ) + response = self.client.post(relative_url, payload) + self.assertEqual(201, response.status_code, response.content[:1000]) + class RiskAcceptanceTest(BaseClass.BaseClassTest): fixtures = ["dojo_testdata.json"] @@ -3440,6 +3480,66 @@ def test_delete(self): response = self.client.delete(relative_url) self.assertEqual(409, response.status_code, response.content[:1000]) + def test_list_method_requires_no_authorization(self): + """ + Tests the use case of not supplying GET permissions for the BaseDjangoModelPermission + class used in the UserHasDevelopmentEnvironmentPermission class. + """ + self.setUp_not_authorized() + response = self.client.get(self.url, format="json") + self.assertEqual(200, response.status_code, response.content[:1000]) + + @parameterized.expand( + [ + ( + "add_development_environment", + "post", + 201, + { + "name": "Test_1", + }, + ), + ( + "change_development_environment", + "put", + 200, + {"name": "Test_2"}, + ), + ( + "change_development_environment", + "put", + 200, + {"name": "Test_3"}, + ), + ( + "delete_development_environment", + "delete", + 409, # Deletion is blocked because of existing references, but it is better than 403 for this test + None, + ), + ], + ) + def test_user_needs_configuration_permission(self, codename, method, expected_status, payload): + """ + Tests that BaseDjangoModelPermission enforces the django configuration permissions + through the class used in the UserHasDevelopmentEnvironmentPermission class. + """ + # Ensure we get a 403 first + self.setUp_not_authorized() + response = self.client.post(self.url, payload, format="json") + self.assertEqual(403, response.status_code, response.content[:1000]) + # Now Get the same user as self.client is using, add the permission, and try again + testuser = User.objects.get(id=self.NOT_AUTHORIZED_USER_ID) + permission = Permission.objects.get(codename=codename) + testuser.user_permissions.add(permission) + if method in {"put", "patch", "delete"}: + current_objects = self.client.get(self.url, format="json").data + relative_url = self.url + "{}/".format(current_objects["results"][-1]["id"]) + else: + relative_url = self.url + response = getattr(self.client, method)(relative_url, payload, format="json") + self.assertEqual(expected_status, response.status_code, response.content[:1000]) + class TestTypeTest(BaseClass.AuthenticatedViewTest): fixtures = ["dojo_testdata.json"] @@ -3635,7 +3735,7 @@ def __init__(self, *args, **kwargs): } self.update_fields = {"style": "warning"} self.test_type = TestType.CONFIGURATION_PERMISSIONS - self.deleted_objects = 7 + self.deleted_objects = 8 BaseClass.RESTEndpointTest.__init__(self, *args, **kwargs) def test_create(self):