Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions dojo/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,8 +1093,8 @@ def save(self, *args, **kwargs):
product.async_updating = True
super(Product, product).save()
# launch the async task to update all finding sla expiration dates
from dojo.sla_config.helpers import update_sla_expiration_dates_sla_config_async # noqa: I001, PLC0415 circular import
update_sla_expiration_dates_sla_config_async(self, products, tuple(severities))
from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import
async_update_sla_expiration_dates_sla_config_sync(self, products, severities=severities)

def clean(self):
sla_days = [self.critical, self.high, self.medium, self.low]
Expand Down Expand Up @@ -1255,8 +1255,8 @@ def save(self, *args, **kwargs):
sla_config.async_updating = True
super(SLA_Configuration, sla_config).save()
# launch the async task to update all finding sla expiration dates
from dojo.sla_config.helpers import update_sla_expiration_dates_product_async # noqa: I001, PLC0415 circular import
update_sla_expiration_dates_product_async(self, sla_config)
from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import
async_update_sla_expiration_dates_sla_config_sync(sla_config, Product.objects.filter(id=self.id))

def get_absolute_url(self):
return reverse("view_product", args=[str(self.id)])
Expand Down Expand Up @@ -3146,16 +3146,25 @@ def get_sla_configuration(self):
return self.test.engagement.product.sla_configuration

def get_sla_period(self):
# Determine which method to use to calculate the SLA
from dojo.utils import get_custom_method # noqa: PLC0415 circular import
if method := get_custom_method("FINDING_SLA_PERIOD_METHOD"):
return method(self)
# Run the default method
sla_configuration = self.get_sla_configuration()
sla_period = getattr(sla_configuration, self.severity.lower(), None)
enforce_period = getattr(sla_configuration, str("enforce_" + self.severity.lower()), None)
return sla_period, enforce_period

def set_sla_expiration_date(self):
# First check if SLA is enabled globally
system_settings = System_Settings.objects.get()
if not system_settings.enable_finding_sla:
return
# Call the internal method to set the sla expiration date
self._set_sla_expiration_date()

def _set_sla_expiration_date(self):
# some parsers provide date as a `str` instead of a `date` in which case we need to parse it #12299 on GitHub
sla_start_date = self.get_sla_start_date()
if sla_start_date and isinstance(sla_start_date, str):
Expand Down
39 changes: 20 additions & 19 deletions dojo/sla_config/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,50 @@

from dojo.celery import app
from dojo.decorators import dojo_async_task
from dojo.models import Finding, Product, SLA_Configuration
from dojo.utils import calculate_grade, mass_model_updater
from dojo.models import Finding, Product, SLA_Configuration, System_Settings
from dojo.utils import get_custom_method, mass_model_updater

logger = logging.getLogger(__name__)


@dojo_async_task
@app.task
def update_sla_expiration_dates_sla_config_async(sla_config, products, severities, *args, **kwargs):
update_sla_expiration_dates_sla_config_sync(sla_config, products, severities)
def async_update_sla_expiration_dates_sla_config_sync(sla_config: SLA_Configuration, products: list[Product], *args, severities: list[str] | None = None, **kwargs):
if method := get_custom_method("FINDING_SLA_EXPIRATION_CALCULATION_METHOD"):
method(sla_config, products, severities=severities)
else:
update_sla_expiration_dates_sla_config_sync(sla_config, products, severities=severities)


@dojo_async_task
@app.task
def update_sla_expiration_dates_product_async(product, sla_config, *args, **kwargs):
update_sla_expiration_dates_sla_config_sync(sla_config, [product])


def update_sla_expiration_dates_sla_config_sync(sla_config, products, severities=None):
def update_sla_expiration_dates_sla_config_sync(sla_config: SLA_Configuration, products: list[Product], severities: list[str] | None = None):
logger.info("Updating finding SLA expiration dates within the %s SLA configuration", sla_config)
# First check if SLA is enabled globally
system_settings = System_Settings.objects.get()
if not system_settings.enable_finding_sla:
return
# update each finding that is within the SLA configuration that was saved
findings = Finding.objects.filter(test__engagement__product__sla_configuration_id=sla_config.id)
if products:
findings = findings.filter(test__engagement__product__in=products)
if severities:
findings = findings.filter(severity__in=severities)

findings = findings.prefetch_related(
findings = (
findings.prefetch_related(
"test",
"test__engagement",
"test__engagement__product",
"test__engagement__product__sla_configuration",
)
.order_by("id")
.only("id", "sla_start_date", "date", "severity", "test")
)

findings = findings.order_by("id").only("id", "sla_start_date", "date", "severity", "test")

# Call the internal method so that we are not checking system settings for each finding
mass_model_updater(Finding, findings, lambda f: f.set_sla_expiration_date(), fields=["sla_expiration_date"])

# reset the async updating flag to false for all products using this sla config
for product in products:
product.async_updating = False
super(Product, product).save()
calculate_grade(product)
# use update as we don't want save() and signals to be triggered
products.update(async_updating=False)

# reset the async updating flag to false for this sla config
sla_config.async_updating = False
Expand Down