-
Notifications
You must be signed in to change notification settings - Fork 184
Fix: do not need storage management db #8515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: integration
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,15 +31,14 @@ | |
| from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername | ||
| from DIRAC.Core.Base.AgentModule import AgentModule | ||
| from DIRAC.Core.Utilities import TimeUtilities | ||
| from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader | ||
| from DIRAC.RequestManagementSystem.Client.File import File | ||
| from DIRAC.RequestManagementSystem.Client.Operation import Operation | ||
| from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient | ||
| from DIRAC.RequestManagementSystem.Client.Request import Request | ||
| from DIRAC.WorkloadManagementSystem.Client import JobStatus | ||
| from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient | ||
| from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB | ||
| from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters | ||
| from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB | ||
| from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs | ||
| from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE | ||
|
|
||
|
|
@@ -55,7 +54,10 @@ def __init__(self, *args, **kwargs): | |
|
|
||
| # clients | ||
| self.jobDB = None | ||
| self.taskQueueDB = None | ||
| self.pilotAgentsDB = None | ||
| self.sandboxDB = None | ||
| self.storageManagementDB = None | ||
|
|
||
| self.maxJobsAtOnce = 500 | ||
| self.prodTypes = [] | ||
|
|
@@ -67,8 +69,33 @@ def __init__(self, *args, **kwargs): | |
| def initialize(self): | ||
| """Sets defaults""" | ||
|
|
||
| self.jobDB = JobDB() | ||
| self.sandboxDB = SandboxMetadataDB() | ||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") | ||
| if not result["OK"]: | ||
| return result | ||
| self.jobDB = result["Value"]() | ||
|
|
||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") | ||
| if not result["OK"]: | ||
| return result | ||
| self.taskQueueDB = result["Value"]() | ||
|
|
||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") | ||
| if not result["OK"]: | ||
| return result | ||
| self.pilotAgentsDB = result["Value"]() | ||
|
|
||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB") | ||
| if not result["OK"]: | ||
| return result | ||
| self.sandboxDB = result["Value"]() | ||
|
|
||
| try: | ||
| result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") | ||
| if not result["OK"]: | ||
| return result | ||
| self.storageManagementDB = result["Value"]() | ||
| except RuntimeError: | ||
| pass | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment here |
||
|
|
||
| agentTSTypes = self.am_getOption("ProductionTypes", []) | ||
| if agentTSTypes: | ||
|
|
@@ -239,7 +266,14 @@ def _deleteRemoveJobs(self, jobList, remove=False): | |
| wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup) | ||
| result = wmsClient.removeJob(jobsList) | ||
| else: | ||
| result = kill_delete_jobs(RIGHT_DELETE, jobsList) | ||
| result = kill_delete_jobs( | ||
| RIGHT_DELETE, | ||
| jobsList, | ||
| jobdb=self.jobDB, | ||
| taskqueuedb=self.taskQueueDB, | ||
| pilotagentsdb=self.pilotAgentsDB, | ||
| storagemanagementdb=self.storageManagementDB, | ||
| ) | ||
| if not result["OK"]: | ||
| self.log.error( | ||
| f"Could not {'remove' if remove else 'delete'} jobs", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -73,10 +73,13 @@ def initialize(self): | |
| return result | ||
| self.pilotAgentsDB = result["Value"]() | ||
|
|
||
| result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") | ||
| if not result["OK"]: | ||
| return result | ||
| self.storageManagementDB = result["Value"]() | ||
| try: | ||
| result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") | ||
| if not result["OK"]: | ||
| return result | ||
| self.storageManagementDB = result["Value"]() | ||
| except RuntimeError: | ||
| pass | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment here |
||
|
|
||
| # getting parameters | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| """ Test class for Job Cleaning Agent | ||
| """ | ||
| """Test class for Job Cleaning Agent""" | ||
|
|
||
| from unittest.mock import MagicMock | ||
|
|
||
| import pytest | ||
|
|
@@ -16,6 +16,9 @@ | |
| mockNone = MagicMock() | ||
| mockNone.return_value = None | ||
| mockJMC = MagicMock() | ||
| mockJobDB = MagicMock() | ||
| mockJobDB.getDistinctJobAttributes = mockReply | ||
| mockJobDB.selectJobs = mockReply | ||
|
|
||
|
|
||
| @pytest.fixture | ||
|
|
@@ -27,16 +30,21 @@ def jca(mocker): | |
| create=True, | ||
| ) | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM) | ||
|
|
||
| def mock_load_object(module_path, class_name): | ||
| mocks = { | ||
| "JobDB": mockJobDB, | ||
| "TaskQueueDB": MagicMock(), | ||
| "PilotAgentsDB": MagicMock(), | ||
| "SandboxMetadataDB": MagicMock(), | ||
| "StorageManagementDB": MagicMock(), | ||
| } | ||
| return {"OK": True, "Value": lambda: mocks[class_name]} | ||
|
|
||
| mocker.patch( | ||
| "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply | ||
| ) | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply) | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone) | ||
| mocker.patch( | ||
| "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB.__init__", side_effect=mockNone | ||
| "DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.ObjectLoader.loadObject", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you patching the right module here? Shouldn't it be |
||
| side_effect=mock_load_object, | ||
| ) | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone) | ||
|
|
||
| jca = JobCleaningAgent() | ||
| jca.log = gLogger | ||
| jca.log.setLevel("DEBUG") | ||
|
|
@@ -128,15 +136,28 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected): | |
|
|
||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__") | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM) | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone) | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone) | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone) | ||
| mocker.patch( | ||
| "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"]) | ||
| ) | ||
| mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params) | ||
|
|
||
| def mock_load_object(module_path, class_name): | ||
| mocks = { | ||
| "JobDB": MagicMock(), | ||
| "TaskQueueDB": MagicMock(), | ||
| "PilotAgentsDB": MagicMock(), | ||
| "SandboxMetadataDB": MagicMock(), | ||
| "StorageManagementDB": MagicMock(), | ||
| } | ||
| return {"OK": True, "Value": lambda: mocks[class_name]} | ||
|
|
||
| mocker.patch( | ||
| "DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.ObjectLoader.loadObject", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here? |
||
| side_effect=mock_load_object, | ||
| ) | ||
| jobCleaningAgent = JobCleaningAgent() | ||
|
|
||
| jobCleaningAgent.log = gLogger | ||
| jobCleaningAgent.log.setLevel("DEBUG") | ||
| jobCleaningAgent._AgentModule__configDefaults = mockAM | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,6 @@ | ||
| from DIRAC import S_ERROR, S_OK, gLogger | ||
| from DIRAC.WorkloadManagementSystem.Client import JobStatus | ||
| from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader | ||
| from DIRAC.WorkloadManagementSystem.Client import JobStatus | ||
| from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL | ||
| from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition | ||
|
|
||
|
|
@@ -96,11 +96,6 @@ def kill_delete_jobs( | |
| if not result["OK"]: | ||
| return result | ||
| pilotagentsdb = result["Value"]() | ||
| if storagemanagementdb is None: | ||
| result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") | ||
| if not result["OK"]: | ||
| return result | ||
| storagemanagementdb = result["Value"]() | ||
|
|
||
| badIDs = [] | ||
|
|
||
|
|
@@ -133,6 +128,14 @@ def kill_delete_jobs( | |
| stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING] | ||
|
|
||
| if stagingJobList: | ||
| if storagemanagementdb is None: | ||
| result = ObjectLoader().loadObject( | ||
| "StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB" | ||
| ) | ||
| if not result["OK"]: | ||
| return result | ||
| storagemanagementdb = result["Value"]() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does happen here if an exception is raised? |
||
|
|
||
| gLogger.info("Going to send killing signal to stager as well!") | ||
| result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList) | ||
| if not result["OK"]: | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,14 +1,15 @@ | ||||||
| """ JobManagerHandler is the implementation of the JobManager service | ||||||
| in the DISET framework | ||||||
| """JobManagerHandler is the implementation of the JobManager service | ||||||
| in the DISET framework | ||||||
|
|
||||||
| The following methods are available in the Service interface | ||||||
| The following methods are available in the Service interface | ||||||
|
|
||||||
| submitJob() | ||||||
| rescheduleJob() | ||||||
| deleteJob() | ||||||
| killJob() | ||||||
| submitJob() | ||||||
| rescheduleJob() | ||||||
| deleteJob() | ||||||
| killJob() | ||||||
|
|
||||||
| """ | ||||||
|
|
||||||
| from pydantic import ValidationError | ||||||
|
|
||||||
| from DIRAC import S_ERROR, S_OK | ||||||
|
|
@@ -22,6 +23,7 @@ | |||||
| from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader | ||||||
| from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager | ||||||
| from DIRAC.WorkloadManagementSystem.Client import JobStatus | ||||||
| from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs | ||||||
| from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( | ||||||
| RIGHT_DELETE, | ||||||
| RIGHT_KILL, | ||||||
|
|
@@ -30,7 +32,6 @@ | |||||
| RIGHT_SUBMIT, | ||||||
| JobPolicy, | ||||||
| ) | ||||||
| from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs | ||||||
| from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel | ||||||
| from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength | ||||||
| from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs | ||||||
|
|
@@ -44,34 +45,33 @@ class JobManagerHandlerMixin: | |||||
| @classmethod | ||||||
| def initializeHandler(cls, serviceInfoDict): | ||||||
| """Initialization of DB objects and OptimizationMind""" | ||||||
| try: | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you remove the |
||||||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.jobDB = result["Value"](parentLogger=cls.log) | ||||||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.jobDB = result["Value"](parentLogger=cls.log) | ||||||
|
|
||||||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.jobLoggingDB = result["Value"](parentLogger=cls.log) | ||||||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.jobLoggingDB = result["Value"](parentLogger=cls.log) | ||||||
|
|
||||||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.taskQueueDB = result["Value"](parentLogger=cls.log) | ||||||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.taskQueueDB = result["Value"](parentLogger=cls.log) | ||||||
|
|
||||||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.pilotAgentsDB = result["Value"](parentLogger=cls.log) | ||||||
| result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.pilotAgentsDB = result["Value"](parentLogger=cls.log) | ||||||
|
|
||||||
| try: | ||||||
| result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") | ||||||
| if not result["OK"]: | ||||||
| return result | ||||||
| cls.storageManagementDB = result["Value"](parentLogger=cls.log) | ||||||
|
|
||||||
| except RuntimeError as excp: | ||||||
| return S_ERROR(f"Can't connect to DB: {excp!r}") | ||||||
| cls.storageManagementDB = result["Value"]() | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No more
Suggested change
|
||||||
| except RuntimeError: | ||||||
| cls.storageManagementDB = None | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And here too |
||||||
|
|
||||||
| cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind") | ||||||
| result = cls.msgClient.connect(JobManager=True) | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why
passand not awarninglog?Based on #8511 (comment)