diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py index b8f7b5bf1ce..7e77e0d55ea 100644 --- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -7,6 +7,7 @@ :caption: TransformationCleaningAgent options """ + # # imports import ast import errno @@ -144,10 +145,13 @@ def initialize(self): return result self.taskQueueDB = result["Value"]() - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - self.storageManagementDB = result["Value"]() + try: + result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") + if not result["OK"]: + return result + self.storageManagementDB = result["Value"]() + except RuntimeError: + pass return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index de342224c05..346c4e5458e 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -31,15 +31,14 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities import TimeUtilities +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.RequestManagementSystem.Client.File import File from DIRAC.RequestManagementSystem.Client.Operation import Operation from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient -from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters -from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE @@ -55,7 +54,10 @@ def __init__(self, *args, **kwargs): # clients self.jobDB = None + self.taskQueueDB = None + self.pilotAgentsDB = None self.sandboxDB = None + self.storageManagementDB = None self.maxJobsAtOnce = 500 self.prodTypes = [] @@ -67,8 +69,33 @@ def __init__(self, *args, **kwargs): def initialize(self): """Sets defaults""" - self.jobDB = JobDB() - self.sandboxDB = SandboxMetadataDB() + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") + if not result["OK"]: + return result + self.jobDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") + if not result["OK"]: + return result + self.taskQueueDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") + if not result["OK"]: + return result + self.pilotAgentsDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB") + if not result["OK"]: + return result + self.sandboxDB = result["Value"]() + + try: + result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") + if not result["OK"]: + return result + self.storageManagementDB = result["Value"]() + except RuntimeError: + pass agentTSTypes = self.am_getOption("ProductionTypes", []) if agentTSTypes: @@ -239,7 +266,14 @@ def _deleteRemoveJobs(self, jobList, remove=False): wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup) result = wmsClient.removeJob(jobsList) else: - result = kill_delete_jobs(RIGHT_DELETE, jobsList) + result = kill_delete_jobs( + RIGHT_DELETE, + jobsList, + jobdb=self.jobDB, + taskqueuedb=self.taskQueueDB, + pilotagentsdb=self.pilotAgentsDB, + storagemanagementdb=self.storageManagementDB, + ) if not result["OK"]: self.log.error( f"Could not {'remove' if remove else 'delete'} jobs", diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 23092ef200c..ff04dd72806 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -73,10 +73,13 @@ def initialize(self): return result self.pilotAgentsDB = result["Value"]() - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - self.storageManagementDB = result["Value"]() + try: + result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") + if not result["OK"]: + return result + self.storageManagementDB = result["Value"]() + except RuntimeError: + pass # getting parameters diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py index ae098c9f756..5632d66692e 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py @@ -1,5 +1,5 @@ -""" Test class for Job Cleaning Agent -""" +"""Test class for Job Cleaning Agent""" + from unittest.mock import MagicMock import pytest @@ -16,6 +16,9 @@ mockNone = MagicMock() mockNone.return_value = None mockJMC = MagicMock() +mockJobDB = MagicMock() +mockJobDB.getDistinctJobAttributes = mockReply +mockJobDB.selectJobs = mockReply @pytest.fixture @@ -27,16 +30,21 @@ def jca(mocker): create=True, ) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM) + + def mock_load_object(module_path, class_name): + mocks = { + "JobDB": mockJobDB, + "TaskQueueDB": MagicMock(), + "PilotAgentsDB": MagicMock(), + "SandboxMetadataDB": MagicMock(), + "StorageManagementDB": MagicMock(), + } + return {"OK": True, "Value": lambda: mocks[class_name]} + mocker.patch( - "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply - ) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone) - mocker.patch( - "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB.__init__", side_effect=mockNone + "DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.ObjectLoader.loadObject", + side_effect=mock_load_object, ) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone) - jca = JobCleaningAgent() jca.log = gLogger jca.log.setLevel("DEBUG") @@ -128,15 +136,28 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected): mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__") mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone) mocker.patch( "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"]) ) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params) + def mock_load_object(module_path, class_name): + mocks = { + "JobDB": MagicMock(), + "TaskQueueDB": MagicMock(), + "PilotAgentsDB": MagicMock(), + "SandboxMetadataDB": MagicMock(), + "StorageManagementDB": MagicMock(), + } + return {"OK": True, "Value": lambda: mocks[class_name]} + + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.ObjectLoader.loadObject", + side_effect=mock_load_object, + ) jobCleaningAgent = JobCleaningAgent() + jobCleaningAgent.log = gLogger jobCleaningAgent.log.setLevel("DEBUG") jobCleaningAgent._AgentModule__configDefaults = mockAM diff --git a/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py b/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py index 8af19416d1f..30e695e5334 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py @@ -1,6 +1,6 @@ from DIRAC import S_ERROR, S_OK, gLogger -from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition @@ -96,11 +96,6 @@ def kill_delete_jobs( if not result["OK"]: return result pilotagentsdb = result["Value"]() - if storagemanagementdb is None: - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - storagemanagementdb = result["Value"]() badIDs = [] @@ -133,6 +128,14 @@ def kill_delete_jobs( stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING] if stagingJobList: + if storagemanagementdb is None: + result = ObjectLoader().loadObject( + "StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB" + ) + if not result["OK"]: + return result + storagemanagementdb = result["Value"]() + gLogger.info("Going to send killing signal to stager as well!") result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList) if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index 0f1d82d4ee1..dab370c4eab 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -1,14 +1,15 @@ -""" JobManagerHandler is the implementation of the JobManager service - in the DISET framework +"""JobManagerHandler is the implementation of the JobManager service +in the DISET framework - The following methods are available in the Service interface +The following methods are available in the Service interface - submitJob() - rescheduleJob() - deleteJob() - killJob() +submitJob() +rescheduleJob() +deleteJob() +killJob() """ + from pydantic import ValidationError from DIRAC import S_ERROR, S_OK @@ -22,6 +23,7 @@ from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( RIGHT_DELETE, RIGHT_KILL, @@ -30,7 +32,6 @@ RIGHT_SUBMIT, JobPolicy, ) -from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs @@ -44,34 +45,33 @@ class JobManagerHandlerMixin: @classmethod def initializeHandler(cls, serviceInfoDict): """Initialization of DB objects and OptimizationMind""" - try: - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") - if not result["OK"]: - return result - cls.jobDB = result["Value"](parentLogger=cls.log) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") + if not result["OK"]: + return result + cls.jobDB = result["Value"](parentLogger=cls.log) - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB") - if not result["OK"]: - return result - cls.jobLoggingDB = result["Value"](parentLogger=cls.log) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB") + if not result["OK"]: + return result + cls.jobLoggingDB = result["Value"](parentLogger=cls.log) - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") - if not result["OK"]: - return result - cls.taskQueueDB = result["Value"](parentLogger=cls.log) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") + if not result["OK"]: + return result + cls.taskQueueDB = result["Value"](parentLogger=cls.log) - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") - if not result["OK"]: - return result - cls.pilotAgentsDB = result["Value"](parentLogger=cls.log) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") + if not result["OK"]: + return result + cls.pilotAgentsDB = result["Value"](parentLogger=cls.log) + try: result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") if not result["OK"]: return result - cls.storageManagementDB = result["Value"](parentLogger=cls.log) - - except RuntimeError as excp: - return S_ERROR(f"Can't connect to DB: {excp!r}") + cls.storageManagementDB = result["Value"]() + except RuntimeError: + cls.storageManagementDB = None cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind") result = cls.msgClient.connect(JobManager=True)