55from datetime import timedelta
66from multiprocessing import Process
77
8- from django .db .models import Q
98from django .utils import timezone
10- from rq .job import JobStatus
119
12- from core .choices import JobStatusChoices
13- from netbox .jobs import JobRunner
10+ from core .choices import JobStatusChoices , JobIntervalChoices
11+ from netbox import settings
12+ from netbox .jobs import JobRunner , system_job
1413from netbox_config_backup .backup .processing import run_backup
1514from netbox_config_backup .choices import StatusChoices
1615from netbox_config_backup .models import Backup , BackupJob
2019logger = logging .getLogger (f"netbox_config_backup" )
2120
2221
23- class SchedulerRunner (JobRunner ):
24- class Meta :
25- name = "The scheduler"
26-
27-
28-
29-
22+ @system_job (interval = JobIntervalChoices .INTERVAL_MINUTELY )
3023class BackupRunner (JobRunner ):
3124 processes = {}
3225
3326 class Meta :
3427 name = 'The Backup Job Runner'
3528
36- def clean_stale_jobs (self ):
29+ @classmethod
30+ def fail_job (cls , job : BackupJob , status : str , error : str = '' ):
31+ job .status = status
32+ if not job .data :
33+ job .data = {}
34+ job .data .update ({'error' : 'Process terminated' })
35+ job .save ()
36+ job .refresh_from_db ()
37+
38+
39+ @classmethod
40+ def clean_stale_jobs (cls ):
41+ results = {
42+ 'stale' : 0 ,
43+ 'scheduled' : 0
44+ }
45+
3746 jobs = BackupJob .objects .order_by ('created' ).filter (
38- status = JobStatusChoices .ENQUEUED_STATE_CHOICES ,
39- ).prefetch_related ('device' )
40- scheduled = jobs .filter (status = JobStatusChoices .STATUS_SCHEDULED )
41- stale = jobs .filter (scheduled__lt = timezone .now () - timedelta (minutes = 30 ))
47+ status__in = JobStatusChoices .ENQUEUED_STATE_CHOICES ,
48+ ).prefetch_related ('backup' , 'backup__device' )
4249
50+ stale = jobs .filter (scheduled__lt = timezone .now () - timedelta (minutes = 30 ))
4351 for job in stale :
44- if job .pid :
45- pass
46- job .status = JobStatusChoices .STATUS_ERRORED
47- if not job .data :
48- job .data = {}
49- job .data .update ({'error' : 'Job hung' })
50- job .save ()
51- job .refresh_from_db ()
52+ results ['stale' ] += 1
53+ cls .fail_job (job , JobStatusChoices .STATUS_FAILED , 'Job hung' )
5254 logger .warning (f'Job { job .backup } appears stuck, deleting' )
5355
56+ scheduled = jobs .filter (status = JobStatusChoices .STATUS_SCHEDULED )
5457 for job in scheduled :
5558 if job != scheduled .filter (backup = job .backup ).last ():
56- job .status = JobStatusChoices .STATUS_FAILED
57- if not job .data :
58- job .data = {}
59- job .data .update ({'error' : 'Process terminated' })
60- job .save ()
59+ results ['scheduled' ] += 1
60+ cls .fail_job (job , JobStatusChoices .STATUS_ERRORED , 'Job missed' )
61+ logger .warning (f'Job { job .backup } appears to have been missed, deleting' )
6162
62- def schedule_jobs (self , backup = None , device = None ):
63+ return results
64+
65+ @classmethod
66+ def schedule_jobs (cls , runner , backup = None , device = None ):
67+ scheduled_status = 0
6368 if backup :
6469 logging .debug (f'Scheduling backup for backup: { backup } ' )
6570 backups = Backup .objects .filter (pk = backup .pk , status = StatusChoices .STATUS_ACTIVE , device__isnull = False )
@@ -70,38 +75,47 @@ def schedule_jobs(self, backup=None, device=None):
7075 logging .debug (f'Scheduling all backups' )
7176 backups = Backup .objects .filter (status = StatusChoices .STATUS_ACTIVE , device__isnull = False )
7277
78+ frequency = timedelta (seconds = settings .PLUGINS_CONFIG .get ('netbox_config_backup' , {}).get ('frequency' , 3600 ))
79+
7380 for backup in backups :
7481 if can_backup (backup ):
7582 logger .debug (f'Queuing device { backup .device } for backup' )
76- jobs = BackupJob .objects .filter (backup = backup , status__in = JobStatusChoices .ENQUEUED_STATE_CHOICES )
77- job = jobs .last ()
78- if job is not None :
79- job .runner = self .job
80- job .status = JobStatusChoices .STATUS_SCHEDULED
81- job .scheduled = timezone .now ()
82- job .save ()
83- else :
83+ jobs = BackupJob .objects .filter (backup = backup )
84+ if jobs .filter (status__in = JobStatusChoices .ENQUEUED_STATE_CHOICES ).count () == 0 :
85+ scheduled = timezone .now ()
8486 job = BackupJob (
85- runner = self . job ,
87+ runner = None ,
8688 backup = backup ,
8789 status = JobStatusChoices .STATUS_SCHEDULED ,
88- scheduled = timezone . now () ,
90+ scheduled = scheduled ,
8991 job_id = uuid .uuid4 (),
9092 data = {},
9193 )
92- job .full_clean ()
93- job .save ()
94+ job .full_clean ()
95+ job .save ()
96+ scheduled_status += 1
9497 else :
9598 jobs = BackupJob .objects .filter (backup = backup , status__in = JobStatusChoices .ENQUEUED_STATE_CHOICES )
9699 for job in jobs :
97- job .status = JobStatusChoices .STATUS_FAILED
98- if not job .data :
99- job .data = {}
100- job .data .update ({'error' : f'Cannot queue job' })
101- job .save ()
100+ cls .fail_job (job , JobStatusChoices .STATUS_FAILED , f'Cannot queue job' )
101+
102+ return scheduled_status
102103
103104 def run_processes (self ):
104- for job in BackupJob .objects .filter (runner = self .job , status = JobStatusChoices .STATUS_SCHEDULED ):
105+ jobs = BackupJob .objects .filter (
106+ runner = None ,
107+ status = JobStatusChoices .STATUS_SCHEDULED ,
108+ scheduled__lte = timezone .now ()
109+ )
110+ for job in jobs :
111+ job .runner = self .job
112+ job .status = JobStatusChoices .STATUS_PENDING
113+ job .save ()
114+
115+ self .job .data .update ({'status' : {'pending' : jobs .count ()}})
116+ self .job .save ()
117+
118+ for job in jobs :
105119 try :
106120 process = self .fork_process (job )
107121 process .join (1 )
@@ -128,6 +142,9 @@ def fork_process(self, job):
128142 def handle_processes (self ):
129143 close_db ()
130144 for pk in list (self .processes .keys ()):
145+ terminated = self .job .data .get ('status' , {}).get ('terminated' , 0 )
146+ completed = self .job .data .get ('status' , {}).get ('completed' , 0 )
147+
131148 process = self .processes .get (pk , {}).get ('process' )
132149 job_pk = self .processes .get (pk , {}).get ('job' )
133150 backup = self .processes .get (pk , {}).get ('backup' )
@@ -137,16 +154,32 @@ def handle_processes(self):
137154 del self .processes [pk ]
138155 job = BackupJob .objects .filter (pk = job_pk ).first ()
139156 if job and job .status != JobStatusChoices .STATUS_COMPLETED :
157+ self .job .data .update ({'status' : {'terminated' : terminated }})
140158 job .status = JobStatusChoices .STATUS_ERRORED
141159 if not job .data :
142160 job .data = {}
143161 job .data .update ({'error' : 'Process terminated' })
144162 job .save ()
163+ else :
164+ self .job .data .update ({'status' : {'completed' : completed }})
165+ self .job .save ()
166+ self .job .refresh_from_db ()
145167
146168 def run (self , backup = None , device = None , * args , ** kwargs ):
169+
170+ if not self .job .data :
171+ self .job .data = {}
172+ self .job .save ()
173+
147174 try :
148- self .clean_stale_jobs ()
149- self .schedule_jobs (backup = backup , device = device )
175+ status = self .clean_stale_jobs ()
176+ self .job .data .update ({'status' : status })
177+
178+ status = self .schedule_jobs (runner = self .job , backup = backup , device = device )
179+ self .job .data .update ({'status' : {'scheduled' : status }})
180+
181+ self .job .save ()
182+
150183 self .run_processes ()
151184 while (True ):
152185 self .handle_processes ()
@@ -156,3 +189,4 @@ def run(self, backup=None, device=None, *args, **kwargs):
156189 except Exception as e :
157190 logger .warning (f'{ traceback .format_exc ()} ' )
158191 logger .error (f'{ e } ' )
192+ raise e
# (web-scrape residue "0 commit comments" removed — not part of the source)