Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions netbox_config_backup/jobs/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def fail_job(cls, job: BackupJob, status: str, error: str = ''):
job.refresh_from_db()

@classmethod
def clean_stale_jobs(cls):
def clean_stale_jobs(cls, backup=None):
logger.info('Starting stale job cleanup')
results = {'stale': 0, 'scheduled': 0}

Expand All @@ -54,6 +54,8 @@ def clean_stale_jobs(cls):
)
.prefetch_related('backup', 'backup__device')
)
if backup:
jobs = jobs.filter(backup=backup)

stale = jobs.filter(scheduled__lt=timezone.now() - timedelta(minutes=30))
for job in stale:
Expand Down Expand Up @@ -83,20 +85,16 @@ def schedule_jobs(cls, runner, backup=None, device=None):
logging.debug('Scheduling all backups for')
backups = Backup.objects.filter(status=StatusChoices.STATUS_ACTIVE, device__isnull=False)

frequency = timedelta(seconds=job_frequency)

for backup in backups:
if can_backup(backup):
logger.debug(f'Checking jobs for backup for {backup.device}' f'+')
jobs = BackupJob.objects.filter(backup=backup)
if jobs.filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES).count() == 0:
jobs = backup.jobs.filter(status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES)
if jobs.filter(status=JobStatusChoices.STATUS_SCHEDULED, scheduled__gte=timezone.now()).exists():
continue
else:
logger.debug(f'Queuing device {backup.device} for backup')
if jobs.last() is not None and jobs.last().scheduled + frequency < timezone.now():
scheduled = timezone.now()
elif jobs.last() is not None:
scheduled = jobs.last().scheduled + frequency
else:
scheduled = timezone.now()
scheduled = timezone.now()
logger.info(f'Scheduling {backup} for {scheduled} (Now: {timezone.now()}')
job = BackupJob(
runner=None,
backup=backup,
Expand All @@ -115,17 +113,25 @@ def schedule_jobs(cls, runner, backup=None, device=None):

return scheduled_status


def get_scheduled_jobs(self):
    """Return the queryset of backup jobs that are due to run.

    A job is considered due when it has not yet been claimed by a runner,
    is still in the SCHEDULED state, and its scheduled time is now or in
    the past.
    """
    cutoff = timezone.now()
    due_filter = {
        'runner': None,
        'status': JobStatusChoices.STATUS_SCHEDULED,
        'scheduled__lte': cutoff,
    }
    return BackupJob.objects.filter(**due_filter)

def run_processes(self):
def run_processes(self, backup=None):
logger.info('Starting processes')
if not self.running:
logger.info('Not running')
self.handle_main_exit(signal.SIGTERM, None)
jobs = self.get_scheduled_jobs()

if backup:
jobs = jobs.filter(backup=backup)
logger.info(f'Backup Job Count: {jobs.count()}')

for job in jobs:
job.runner = self.job
job.status = JobStatusChoices.STATUS_PENDING
Expand All @@ -140,9 +146,11 @@ def run_processes(self):

for job in jobs:
try:
logger.info(f'Forking {job} ({job.backup.name})')
process = self.fork_process(job)
process.join(1)
except Exception as e:
logger.warning(f'Exception forking: {e}')
try:
import sentry_sdk

Expand All @@ -165,6 +173,7 @@ def run_backup(self, job_id):

def fork_process(self, job):
if not self.running:
logger.info('Not running')
return
close_db()
process = self.ctx.Process(
Expand Down Expand Up @@ -278,15 +287,14 @@ def run(self, backup=None, device=None, *args, **kwargs):
self.job.save()

try:
status = self.clean_stale_jobs()
status = self.clean_stale_jobs(backup=backup)
self.job.data.update({'status': status})

status = self.schedule_jobs(runner=self.job, backup=backup, device=device)
self.job.data.update({'status': {'scheduled': status}})

self.job.save()

self.run_processes()
self.run_processes(backup=backup)

self.handle_processes()
self.handle_stuck_jobs()
Expand Down