From ce489cceafb36497665efbdb544dc50b4878735d Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Tue, 18 Mar 2025 09:18:31 +0100 Subject: [PATCH 1/6] :recycle: Deprecate async import --- .../installation/running-in-production.md | 19 ----- dojo/importers/base_importer.py | 41 ----------- dojo/importers/default_importer.py | 38 +--------- dojo/importers/default_reimporter.py | 60 +--------------- dojo/importers/endpoint_manager.py | 70 ++----------------- dojo/settings/settings.dist.py | 10 --- 6 files changed, 6 insertions(+), 232 deletions(-) diff --git a/docs/content/en/open_source/installation/running-in-production.md b/docs/content/en/open_source/installation/running-in-production.md index 628c1cc7a12..7e6e90b9ef7 100644 --- a/docs/content/en/open_source/installation/running-in-production.md +++ b/docs/content/en/open_source/installation/running-in-production.md @@ -75,22 +75,3 @@ You can execute the following command to see the configuration: `docker compose exec celerybeat bash -c "celery -A dojo inspect stats"` and see what is in effect. - -### Asynchronous Import - -This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent. - -Import and Re-Import can also be configured to handle uploads asynchronously to aid in -processing especially large scans. It works by batching Findings and Endpoints by a -configurable amount. Each batch will be be processed in separate celery tasks. - -The following variables impact async imports. - -- `DD_ASYNC_FINDING_IMPORT` defaults to False -- `DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE` defaults to 100 - -When using asynchronous imports with dynamic scanners, Endpoints will continue to "trickle" in -even after the import has returned a successful response. This is because processing continues -to occur after the Findings have already been imported. 
- -To determine if an import has been fully completed, please see the progress bar in the appropriate test. diff --git a/dojo/importers/base_importer.py b/dojo/importers/base_importer.py index c74de49da14..1a5f6cf49a0 100644 --- a/dojo/importers/base_importer.py +++ b/dojo/importers/base_importer.py @@ -1,6 +1,5 @@ import base64 import logging -from warnings import warn from django.conf import settings from django.core.exceptions import ValidationError @@ -233,33 +232,11 @@ def sync_process_findings( """ return self.process_findings(parsed_findings, sync=True, **kwargs) - def async_process_findings( - self, - parsed_findings: list[Finding], - **kwargs: dict, - ) -> list[Finding]: - """ - Processes findings in chunks within N number of processes. The - ASYNC_FINDING_IMPORT_CHUNK_SIZE setting will determine how many - findings will be processed in a given worker/process/thread - """ - warn("This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.") - return self.process_findings(parsed_findings, sync=False, **kwargs) - def determine_process_method( self, parsed_findings: list[Finding], **kwargs: dict, ) -> list[Finding]: - """ - Determines whether to process the scan iteratively, or in chunks, - based upon the ASYNC_FINDING_IMPORT setting - """ - if settings.ASYNC_FINDING_IMPORT: - return self.async_process_findings( - parsed_findings, - **kwargs, - ) return self.sync_process_findings( parsed_findings, **kwargs, @@ -491,24 +468,6 @@ def construct_imported_message( return message - def chunk_findings( - self, - finding_list: list[Finding], - chunk_size: int = settings.ASYNC_FINDING_IMPORT_CHUNK_SIZE, - ) -> list[list[Finding]]: - """ - Split a single large list into a list of lists of size `chunk_size`. 
- For Example - ``` - >>> chunk_findings([A, B, C, D, E], 2) - >>> [[A, B], [B, C], [E]] - ``` - """ - # Break the list of parsed findings into "chunk_size" lists - chunk_list = [finding_list[i:i + chunk_size] for i in range(0, len(finding_list), chunk_size)] - logger.debug(f"Split endpoints/findings into {len(chunk_list)} chunks of {chunk_size}") - return chunk_list - def update_test_progress( self, percentage_value: int = 100, diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index fad8650e925..dfea572aaf5 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -1,8 +1,7 @@ import logging -from warnings import warn from django.core.files.uploadedfile import TemporaryUploadedFile -from django.core.serializers import deserialize, serialize +from django.core.serializers import serialize from django.db.models.query_utils import Q from django.urls import reverse @@ -398,38 +397,3 @@ def parse_findings_dynamic_test_type( logger.debug("IMPORT_SCAN parser v2: Parse findings (aggregate)") # Aggregate all the findings and return them with the newly created test return self.parse_dynamic_test_type_findings_from_tests(tests) - - def async_process_findings( - self, - parsed_findings: list[Finding], - **kwargs: dict, - ) -> list[Finding]: - """ - Processes findings in chunks within N number of processes. The - ASYNC_FINDING_IMPORT_CHUNK_SIZE setting will determine how many - findings will be processed in a given worker/process/thread - """ - warn("This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). 
Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.") - chunk_list = self.chunk_findings(parsed_findings) - results_list = [] - new_findings = [] - # First kick off all the workers - for findings_list in chunk_list: - result = self.process_findings( - findings_list, - sync=False, - **kwargs, - ) - # Since I dont want to wait until the task is done right now, save the id - # So I can check on the task later - results_list += [result] - # After all tasks have been started, time to pull the results - logger.info("IMPORT_SCAN: Collecting Findings") - for results in results_list: - serial_new_findings = results - new_findings += [next(deserialize("json", finding)).object for finding in serial_new_findings] - logger.info("IMPORT_SCAN: All Findings Collected") - # Indicate that the test is not complete yet as endpoints will still be rolling in. - self.test.percent_complete = 50 - self.test.save() - return new_findings diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index ff29e416d8d..f5f623ac9c5 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -1,8 +1,7 @@ import logging -from warnings import warn from django.core.files.uploadedfile import TemporaryUploadedFile -from django.core.serializers import deserialize, serialize +from django.core.serializers import serialize from django.db.models.query_utils import Q import dojo.finding.helper as finding_helper @@ -323,63 +322,6 @@ def parse_findings_dynamic_test_type( logger.debug("REIMPORT_SCAN parser v2: Create parse findings") return super().parse_findings_dynamic_test_type(scan, parser) - def async_process_findings( - self, - parsed_findings: list[Finding], - **kwargs: dict, - ) -> tuple[list[Finding], list[Finding], list[Finding], list[Finding]]: - """ - Processes findings in chunks within N number of processes. 
The - ASYNC_FINDING_IMPORT_CHUNK_SIZE setting will determine how many - findings will be processed in a given worker/process/thread - """ - warn("This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.") - # Indicate that the test is not complete yet as endpoints will still be rolling in. - self.update_test_progress(percentage_value=50) - chunk_list = self.chunk_findings(parsed_findings) - results_list = [] - new_findings = [] - reactivated_findings = [] - findings_to_mitigate = [] - untouched_findings = [] - # First kick off all the workers - for findings_list in chunk_list: - result = self.process_findings( - findings_list, - sync=False, - **kwargs, - ) - # Since I dont want to wait until the task is done right now, save the id - # So I can check on the task later - results_list += [result] - # After all tasks have been started, time to pull the results - logger.debug("REIMPORT_SCAN: Collecting Findings") - for results in results_list: - ( - serial_new_findings, - serial_reactivated_findings, - serial_findings_to_mitigate, - serial_untouched_findings, - ) = results - new_findings += [ - next(deserialize("json", finding)).object - for finding in serial_new_findings - ] - reactivated_findings += [ - next(deserialize("json", finding)).object - for finding in serial_reactivated_findings - ] - findings_to_mitigate += [ - next(deserialize("json", finding)).object - for finding in serial_findings_to_mitigate - ] - untouched_findings += [ - next(deserialize("json", finding)).object - for finding in serial_untouched_findings - ] - logger.debug("REIMPORT_SCAN: All Findings Collected") - return new_findings, reactivated_findings, findings_to_mitigate, untouched_findings - def match_new_finding_to_existing_finding( self, unsaved_finding: Finding, diff --git a/dojo/importers/endpoint_manager.py b/dojo/importers/endpoint_manager.py 
index 625e3cb8073..255f392d37e 100644 --- a/dojo/importers/endpoint_manager.py +++ b/dojo/importers/endpoint_manager.py @@ -1,6 +1,5 @@ import logging -from django.conf import settings from django.core.exceptions import MultipleObjectsReturned, ValidationError from django.urls import reverse from django.utils import timezone @@ -95,24 +94,6 @@ def reactivate_endpoint_status( endpoint_status.save() return - def chunk_endpoints( - self, - endpoint_list: list[Endpoint], - chunk_size: int = settings.ASYNC_FINDING_IMPORT_CHUNK_SIZE, - ) -> list[list[Endpoint]]: - """ - Split a single large list into a list of lists of size `chunk_size`. - For Example - ``` - >>> chunk_endpoints([A, B, C, D, E], 2) - >>> [[A, B], [B, C], [E]] - ``` - """ - # Break the list of parsed findings into "chunk_size" lists - chunk_list = [endpoint_list[i:i + chunk_size] for i in range(0, len(endpoint_list), chunk_size)] - logger.debug(f"Split endpoints into {len(chunk_list)} chunks of {chunk_size}") - return chunk_list - def chunk_endpoints_and_disperse( self, finding: Finding, @@ -124,19 +105,8 @@ def chunk_endpoints_and_disperse( chunk up the findings to be dispersed into individual celery workers. 
Otherwise, only use one worker """ - if settings.ASYNC_FINDING_IMPORT: - chunked_list = self.chunk_endpoints(endpoints) - # If there is only one chunk, then do not bother with async - if len(chunked_list) < 2: - self.add_endpoints_to_unsaved_finding(finding, endpoints, sync=True) - return [] - # First kick off all the workers - for endpoints_list in chunked_list: - self.add_endpoints_to_unsaved_finding(finding, endpoints_list, sync=False) - else: - # Do not run this asynchronously or chunk the endpoints - self.add_endpoints_to_unsaved_finding(finding, endpoints, sync=True) - return None + self.add_endpoints_to_unsaved_finding(finding, endpoints, sync=True) + return def clean_unsaved_endpoints( self, @@ -158,23 +128,7 @@ def chunk_endpoints_and_reactivate( endpoint_status_list: list[Endpoint_Status], **kwargs: dict, ) -> None: - """ - Reactivates all endpoint status objects. Whether this function will asynchronous or not is dependent - on the ASYNC_FINDING_IMPORT setting. If it is set to true, endpoint statuses will be chunked, - and dispersed over celery workers. - """ - # Determine if this can be run async - if settings.ASYNC_FINDING_IMPORT: - chunked_list = self.chunk_endpoints(endpoint_status_list) - # If there is only one chunk, then do not bother with async - if len(chunked_list) < 2: - self.reactivate_endpoint_status(endpoint_status_list, sync=True) - logger.debug(f"Split endpoints into {len(chunked_list)} chunks of {len(chunked_list[0])}") - # First kick off all the workers - for endpoint_status_list in chunked_list: - self.reactivate_endpoint_status(endpoint_status_list, sync=False) - else: - self.reactivate_endpoint_status(endpoint_status_list, sync=True) + self.reactivate_endpoint_status(endpoint_status_list, sync=True) return def chunk_endpoints_and_mitigate( @@ -183,23 +137,7 @@ def chunk_endpoints_and_mitigate( user: Dojo_User, **kwargs: dict, ) -> None: - """ - Mitigates all endpoint status objects. 
Whether this function will asynchronous or not is dependent - on the ASYNC_FINDING_IMPORT setting. If it is set to true, endpoint statuses will be chunked, - and dispersed over celery workers. - """ - # Determine if this can be run async - if settings.ASYNC_FINDING_IMPORT: - chunked_list = self.chunk_endpoints(endpoint_status_list) - # If there is only one chunk, then do not bother with async - if len(chunked_list) < 2: - self.mitigate_endpoint_status(endpoint_status_list, user, sync=True) - logger.debug(f"Split endpoints into {len(chunked_list)} chunks of {len(chunked_list[0])}") - # First kick off all the workers - for endpoint_status_list in chunked_list: - self.mitigate_endpoint_status(endpoint_status_list, user, sync=False) - else: - self.mitigate_endpoint_status(endpoint_status_list, user, sync=True) + self.mitigate_endpoint_status(endpoint_status_list, user, sync=True) return def update_endpoint_status( diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index b0271be1ba9..36dc34d5135 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -270,12 +270,6 @@ DD_RATE_LIMITER_ACCOUNT_LOCKOUT=(bool, False), # when enabled SonarQube API parser will download the security hotspots DD_SONARQUBE_API_PARSER_HOTSPOTS=(bool, True), - # when enabled, finding importing will occur asynchronously, default False - # This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent. - DD_ASYNC_FINDING_IMPORT=(bool, False), - # The number of findings to be processed per celeryworker - # This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent. 
- DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE=(int, 100), # When enabled, deleting objects will be occur from the bottom up. In the example of deleting an engagement # The objects will be deleted as follows Endpoints -> Findings -> Tests -> Engagement DD_ASYNC_OBJECT_DELETE=(bool, False), @@ -1782,10 +1776,6 @@ def saml2_attrib_map_format(din): # Deside if SonarQube API parser should download the security hotspots SONARQUBE_API_PARSER_HOTSPOTS = env("DD_SONARQUBE_API_PARSER_HOTSPOTS") -# when enabled, finding importing will occur asynchronously, default False -ASYNC_FINDING_IMPORT = env("DD_ASYNC_FINDING_IMPORT") -# The number of findings to be processed per celeryworker -ASYNC_FINDING_IMPORT_CHUNK_SIZE = env("DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE") # When enabled, deleting objects will be occur from the bottom up. In the example of deleting an engagement # The objects will be deleted as follows Endpoints -> Findings -> Tests -> Engagement ASYNC_OBJECT_DELETE = env("DD_ASYNC_OBJECT_DELETE") From e29727b099ad9fd5bafd2ab59f0c994243f90ebe Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Tue, 18 Mar 2025 10:28:24 +0100 Subject: [PATCH 2/6] simplify --- dojo/importers/endpoint_manager.py | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/dojo/importers/endpoint_manager.py b/dojo/importers/endpoint_manager.py index 255f392d37e..709169bf2c1 100644 --- a/dojo/importers/endpoint_manager.py +++ b/dojo/importers/endpoint_manager.py @@ -123,23 +123,6 @@ def clean_unsaved_endpoints( logger.warning(f"DefectDojo is storing broken endpoint because cleaning wasn't successful: {e}") return - def chunk_endpoints_and_reactivate( - self, - endpoint_status_list: list[Endpoint_Status], - **kwargs: dict, - ) -> None: - self.reactivate_endpoint_status(endpoint_status_list, sync=True) - return - - def chunk_endpoints_and_mitigate( - self, - endpoint_status_list: list[Endpoint_Status], - user: Dojo_User, - **kwargs: dict, - ) -> None: - 
self.mitigate_endpoint_status(endpoint_status_list, user, sync=True) - return - def update_endpoint_status( self, existing_finding: Finding, @@ -169,6 +152,6 @@ def update_endpoint_status( lambda existing_finding_endpoint_status: existing_finding_endpoint_status.endpoint in new_finding_endpoints_list, existing_finding_endpoint_status_list), ) - self.chunk_endpoints_and_reactivate(endpoint_status_to_reactivate) - self.chunk_endpoints_and_mitigate(endpoint_status_to_mitigate, user) + self.reactivate_endpoint_status(endpoint_status_to_reactivate, sync=True) + self.mitigate_endpoint_status(endpoint_status_to_mitigate, user, sync=True) return From a3c0126ef165b532939937f1580fd7eff4ef8d4e Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Tue, 18 Mar 2025 10:30:50 +0100 Subject: [PATCH 3/6] update --- dojo/importers/endpoint_manager.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dojo/importers/endpoint_manager.py b/dojo/importers/endpoint_manager.py index 709169bf2c1..0faba079f88 100644 --- a/dojo/importers/endpoint_manager.py +++ b/dojo/importers/endpoint_manager.py @@ -100,11 +100,6 @@ def chunk_endpoints_and_disperse( endpoints: list[Endpoint], **kwargs: dict, ) -> None: - """ - Determines whether to asynchronously process endpoints on a finding or not. if so, - chunk up the findings to be dispersed into individual celery workers. 
Otherwise, - only use one worker - """ self.add_endpoints_to_unsaved_finding(finding, endpoints, sync=True) return From 1cc9fe630ce6a2f1f2b96b3d091c8b4fd653bc30 Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Tue, 18 Mar 2025 12:55:32 +0100 Subject: [PATCH 4/6] fix unittest --- dojo/importers/endpoint_manager.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/dojo/importers/endpoint_manager.py b/dojo/importers/endpoint_manager.py index 0faba079f88..59bcb11d286 100644 --- a/dojo/importers/endpoint_manager.py +++ b/dojo/importers/endpoint_manager.py @@ -118,6 +118,23 @@ def clean_unsaved_endpoints( logger.warning(f"DefectDojo is storing broken endpoint because cleaning wasn't successful: {e}") return + def chunk_endpoints_and_reactivate( + self, + endpoint_status_list: list[Endpoint_Status], + **kwargs: dict, + ) -> None: + self.reactivate_endpoint_status(endpoint_status_list, sync=True) + return + + def chunk_endpoints_and_mitigate( + self, + endpoint_status_list: list[Endpoint_Status], + user: Dojo_User, + **kwargs: dict, + ) -> None: + self.mitigate_endpoint_status(endpoint_status_list, user, sync=True) + return + def update_endpoint_status( self, existing_finding: Finding, @@ -147,6 +164,6 @@ def update_endpoint_status( lambda existing_finding_endpoint_status: existing_finding_endpoint_status.endpoint in new_finding_endpoints_list, existing_finding_endpoint_status_list), ) - self.reactivate_endpoint_status(endpoint_status_to_reactivate, sync=True) - self.mitigate_endpoint_status(endpoint_status_to_mitigate, user, sync=True) + self.chunk_endpoints_and_reactivate(endpoint_status_to_reactivate) + self.chunk_endpoints_and_mitigate(endpoint_status_to_mitigate, user) return From 9b97d8bb815931c93711f04fa96973c37fb6ab13 Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Thu, 8 May 2025 08:47:27 +0200 Subject: [PATCH 5/6] add docs --- docs/content/en/open_source/upgrading/2.47.md | 4 ++++ 1 file changed, 4 insertions(+) diff 
--git a/docs/content/en/open_source/upgrading/2.47.md b/docs/content/en/open_source/upgrading/2.47.md index 5cbab5dcd56..f3fbd1bd7da 100644 --- a/docs/content/en/open_source/upgrading/2.47.md +++ b/docs/content/en/open_source/upgrading/2.47.md @@ -5,3 +5,7 @@ weight: -20250505 description: No special instructions. --- There are no special instructions for upgrading to 2.47.x. Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.47.0) for the contents of the release. + +## Removal of Asynchronous Import + +Please note that asynchronous import has been removed, as announced in 2.46. If you haven't migrated away from this feature yet, we recommend doing so before upgrading to 2.47.0. \ No newline at end of file From 307643aeb9f4d805b0d0d247b81ece7ecaaaa31a Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Thu, 15 May 2025 12:03:05 +0200 Subject: [PATCH 6/6] update --- .../en/open_source/installation/running-in-production.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/content/en/open_source/installation/running-in-production.md b/docs/content/en/open_source/installation/running-in-production.md index e7b3d20990b..bee0830b204 100644 --- a/docs/content/en/open_source/installation/running-in-production.md +++ b/docs/content/en/open_source/installation/running-in-production.md @@ -87,3 +87,6 @@ You can execute the following command to see the configuration: `docker compose exec celerybeat bash -c "celery -A dojo inspect stats"` and see what is in effect. + +### Asynchronous Import: Deprecated +This feature was removed in DefectDojo 2.47.0. \ No newline at end of file