2020from DIRAC .Core .Utilities .ClassAd .ClassAdLight import ClassAd
2121from DIRAC .Core .Utilities .TimeUtilities import fromString , second , toEpoch
2222from DIRAC .WorkloadManagementSystem .Client import JobMinorStatus , JobStatus
23- from DIRAC .WorkloadManagementSystem .Client .JobManagerClient import JobManagerClient
24- from DIRAC .WorkloadManagementSystem .Client .JobMonitoringClient import JobMonitoringClient
2523from DIRAC .WorkloadManagementSystem .Client .WMSClient import WMSClient
2624from DIRAC .WorkloadManagementSystem .DB .JobDB import JobDB
2725from DIRAC .WorkloadManagementSystem .DB .JobLoggingDB import JobLoggingDB
2826from DIRAC .WorkloadManagementSystem .DB .PilotAgentsDB import PilotAgentsDB
27+ from DIRAC .WorkloadManagementSystem .Utilities .JobParameters import getJobParameters
28+ from DIRAC .WorkloadManagementSystem .Utilities .JobStatusUtility import rescheduleJobs
2929
3030
3131class StalledJobAgent (AgentModule ):
@@ -254,11 +254,11 @@ def _failStalledJobs(self, jobID):
254254
255255 def _getJobPilotStatus (self , jobID ):
256256 """Get the job pilot status."""
257- result = JobMonitoringClient (). getJobParameter (jobID , "Pilot_Reference" )
257+ result = getJobParameters (jobID , "Pilot_Reference" )
258258 if not result ["OK" ]:
259259 return result
260- pilotReference = result ["Value" ].get ("Pilot_Reference" , "Unknown" )
261- if pilotReference == "Unknown" :
260+ pilotReference = result ["Value" ].get ("Pilot_Reference" )
261+ if not pilotReference :
262262 # There is no pilot reference, hence its status is unknown
263263 return S_OK ("NoPilot" )
264264
@@ -389,7 +389,7 @@ def _sendAccounting(self, jobID):
389389 if lastHeartBeatTime is not None and lastHeartBeatTime > endTime :
390390 endTime = lastHeartBeatTime
391391
392- result = JobMonitoringClient (). getJobParameter (jobID , "CPUNormalizationFactor" )
392+ result = getJobParameters (jobID , "CPUNormalizationFactor" )
393393 if not result ["OK" ] or not result ["Value" ]:
394394 self .log .error (
395395 "Error getting Job Parameter CPUNormalizationFactor, setting 0" ,
@@ -518,8 +518,7 @@ def _checkLoggingInfo(self, jobID, jobDict):
518518 return startTime , endTime
519519
520520 def _kickStuckJobs (self ):
521- """Reschedule jobs stuck in initialization status Rescheduled,
522- Matched."""
521+ """Reschedule jobs stuck in initialization status Rescheduled, Matched."""
523522
524523 message = ""
525524
@@ -530,17 +529,12 @@ def _kickStuckJobs(self):
530529 return result
531530
532531 jobIDs = result ["Value" ]
533- jobManagerClient = JobManagerClient ()
534532 if jobIDs :
535533 self .log .info (f"Rescheduling { len (jobIDs )} jobs stuck in { JobStatus .MATCHED } status" )
536- result = jobManagerClient . rescheduleJob (jobIDs )
534+ result = rescheduleJobs (jobIDs )
537535 if not result ["OK" ]:
538536 message = f"Failed to reschedule jobs stuck in { JobStatus .MATCHED } status"
539537 message += "\n " + result ["Message" ]
540- if "InvalidJobIDs" in result :
541- message += "\n " + "\t Invalid job IDs: " + str (result ["InvalidJobIDs" ])
542- if "NonauthorizedJobIDs" in result :
543- message += "\n " + "\t Non authorized job IDs: " + str (result ["NonauthorizedJobIDs" ])
544538
545539 checkTime = datetime .datetime .utcnow () - self .rescheduledTime * second
546540 result = self .jobDB .selectJobs ({"Status" : JobStatus .RESCHEDULED }, older = checkTime )
@@ -550,18 +544,14 @@ def _kickStuckJobs(self):
550544
551545 jobIDs = result ["Value" ]
552546 if jobIDs :
553- self .log .info (f"Rescheduling { len (jobIDs )} jobs stuck in Rescheduled status" )
554- result = jobManagerClient . rescheduleJob (jobIDs )
547+ self .log .info (f"Rescheduling { len (jobIDs )} jobs stuck in { JobStatus . RESCHEDULED } status" )
548+ result = rescheduleJobs (jobIDs )
555549 if not result ["OK" ]:
556550 message = f"Failed to reschedule jobs stuck in { JobStatus .RESCHEDULED } status"
557551 message += "\n " + result ["Message" ]
558- if "InvalidJobIDs" in result :
559- message += "\n " + "\t Invalid job IDs: " + str (result ["InvalidJobIDs" ])
560- if "NonauthorizedJobIDs" in result :
561- message += "\n " + "\t Non authorized job IDs: " + str (result ["NonauthorizedJobIDs" ])
562552
563553 if message :
564- return S_ERROR (message )
554+ self . log . error (message )
565555 return S_OK ()
566556
567557 def _failSubmittingJobs (self ):
0 commit comments