diff --git a/dojo/models.py b/dojo/models.py index 396e851f9b4..a06d2bde864 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -1093,8 +1093,8 @@ def save(self, *args, **kwargs): product.async_updating = True super(Product, product).save() # launch the async task to update all finding sla expiration dates - from dojo.sla_config.helpers import update_sla_expiration_dates_sla_config_async # noqa: I001, PLC0415 circular import - update_sla_expiration_dates_sla_config_async(self, products, tuple(severities)) + from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import + async_update_sla_expiration_dates_sla_config_sync(self, products, severities=severities) def clean(self): sla_days = [self.critical, self.high, self.medium, self.low] @@ -1255,8 +1255,8 @@ def save(self, *args, **kwargs): sla_config.async_updating = True super(SLA_Configuration, sla_config).save() # launch the async task to update all finding sla expiration dates - from dojo.sla_config.helpers import update_sla_expiration_dates_product_async # noqa: I001, PLC0415 circular import - update_sla_expiration_dates_product_async(self, sla_config) + from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import + async_update_sla_expiration_dates_sla_config_sync(sla_config, Product.objects.filter(id=self.id)) def get_absolute_url(self): return reverse("view_product", args=[str(self.id)]) @@ -3146,16 +3146,25 @@ def get_sla_configuration(self): return self.test.engagement.product.sla_configuration def get_sla_period(self): + # Determine which method to use to calculate the SLA + from dojo.utils import get_custom_method # noqa: PLC0415 circular import + if method := get_custom_method("FINDING_SLA_PERIOD_METHOD"): + return method(self) + # Run the default method sla_configuration = self.get_sla_configuration() sla_period = getattr(sla_configuration, self.severity.lower(), None) enforce_period = getattr(sla_configuration, str("enforce_" + self.severity.lower()), None) return sla_period, enforce_period def set_sla_expiration_date(self): + # First check if SLA is enabled globally system_settings = System_Settings.objects.get() if not system_settings.enable_finding_sla: return + # Call the internal method to set the sla expiration date + self._set_sla_expiration_date() + def _set_sla_expiration_date(self): # some parsers provide date as a `str` instead of a `date` in which case we need to parse it #12299 on GitHub sla_start_date = self.get_sla_start_date() if sla_start_date and isinstance(sla_start_date, str): diff --git a/dojo/sla_config/helpers.py b/dojo/sla_config/helpers.py index 57633d0c2ec..da5899a85b0 100644 --- a/dojo/sla_config/helpers.py +++ b/dojo/sla_config/helpers.py @@ -2,26 +2,27 @@ from dojo.celery import app from dojo.decorators import dojo_async_task -from dojo.models import Finding, Product, SLA_Configuration -from dojo.utils import calculate_grade, mass_model_updater +from dojo.models import Finding, Product, SLA_Configuration, System_Settings +from dojo.utils import get_custom_method, mass_model_updater logger = logging.getLogger(__name__) @dojo_async_task @app.task -def update_sla_expiration_dates_sla_config_async(sla_config, products, severities, *args, **kwargs): - update_sla_expiration_dates_sla_config_sync(sla_config, products, severities) +def async_update_sla_expiration_dates_sla_config_sync(sla_config: SLA_Configuration, products: list[Product], *args, severities: list[str] | None = None, **kwargs): + if method := get_custom_method("FINDING_SLA_EXPIRATION_CALCULATION_METHOD"): + method(sla_config, products, severities=severities) + else: + update_sla_expiration_dates_sla_config_sync(sla_config, products, severities=severities) -@dojo_async_task -@app.task -def update_sla_expiration_dates_product_async(product, sla_config, *args, **kwargs): - update_sla_expiration_dates_sla_config_sync(sla_config, [product]) - - -def update_sla_expiration_dates_sla_config_sync(sla_config, products, severities=None): +def update_sla_expiration_dates_sla_config_sync(sla_config: SLA_Configuration, products: list[Product], severities: list[str] | None = None): logger.info("Updating finding SLA expiration dates within the %s SLA configuration", sla_config) + # First check if SLA is enabled globally + system_settings = System_Settings.objects.get() + if not system_settings.enable_finding_sla: + return # update each finding that is within the SLA configuration that was saved findings = Finding.objects.filter(test__engagement__product__sla_configuration_id=sla_config.id) if products: @@ -29,22 +30,22 @@ def update_sla_expiration_dates_sla_config_sync(sla_config, products, severities if severities: findings = findings.filter(severity__in=severities) - findings = findings.prefetch_related( + findings = ( + findings.prefetch_related( "test", "test__engagement", "test__engagement__product", "test__engagement__product__sla_configuration", + ) + .order_by("id") + .only("id", "sla_start_date", "date", "severity", "test") ) - - findings = findings.order_by("id").only("id", "sla_start_date", "date", "severity", "test") - + # Call the internal method so that we are not checking system settings for each finding mass_model_updater(Finding, findings, lambda f: f.set_sla_expiration_date(), fields=["sla_expiration_date"]) # reset the async updating flag to false for all products using this sla config - for product in products: - product.async_updating = False - super(Product, product).save() - calculate_grade(product) + # use update as we don't want save() and signals to be triggered + products.update(async_updating=False) # reset the async updating flag to false for this sla config sla_config.async_updating = False