Skip to content

Commit c6316b5

Browse files
authored
Merge pull request #8515 from fstagni/fix_doNotNeedStorageManagementDB2
Fix: do not need storage management db
2 parents 5dab8ef + df38096 commit c6316b5

6 files changed

Lines changed: 124 additions & 59 deletions

File tree

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
:caption: TransformationCleaningAgent options
88
99
"""
10+
1011
# # imports
1112
import ast
1213
import errno
@@ -144,10 +145,13 @@ def initialize(self):
144145
return result
145146
self.taskQueueDB = result["Value"]()
146147

147-
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
148-
if not result["OK"]:
149-
return result
150-
self.storageManagementDB = result["Value"]()
148+
try:
149+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
150+
if not result["OK"]:
151+
return result
152+
self.storageManagementDB = result["Value"]()
153+
except RuntimeError:
154+
pass
151155

152156
return S_OK()
153157

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,14 @@
3131
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
3232
from DIRAC.Core.Base.AgentModule import AgentModule
3333
from DIRAC.Core.Utilities import TimeUtilities
34+
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
3435
from DIRAC.RequestManagementSystem.Client.File import File
3536
from DIRAC.RequestManagementSystem.Client.Operation import Operation
3637
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
3738
from DIRAC.RequestManagementSystem.Client.Request import Request
3839
from DIRAC.WorkloadManagementSystem.Client import JobStatus
3940
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
40-
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
4141
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters
42-
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
4342
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
4443
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE
4544

@@ -55,7 +54,10 @@ def __init__(self, *args, **kwargs):
5554

5655
# clients
5756
self.jobDB = None
57+
self.taskQueueDB = None
58+
self.pilotAgentsDB = None
5859
self.sandboxDB = None
60+
self.storageManagementDB = None
5961

6062
self.maxJobsAtOnce = 500
6163
self.prodTypes = []
@@ -67,8 +69,33 @@ def __init__(self, *args, **kwargs):
6769
def initialize(self):
6870
"""Sets defaults"""
6971

70-
self.jobDB = JobDB()
71-
self.sandboxDB = SandboxMetadataDB()
72+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
73+
if not result["OK"]:
74+
return result
75+
self.jobDB = result["Value"]()
76+
77+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
78+
if not result["OK"]:
79+
return result
80+
self.taskQueueDB = result["Value"]()
81+
82+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
83+
if not result["OK"]:
84+
return result
85+
self.pilotAgentsDB = result["Value"]()
86+
87+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB")
88+
if not result["OK"]:
89+
return result
90+
self.sandboxDB = result["Value"]()
91+
92+
try:
93+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
94+
if not result["OK"]:
95+
return result
96+
self.storageManagementDB = result["Value"]()
97+
except RuntimeError:
98+
pass
7299

73100
agentTSTypes = self.am_getOption("ProductionTypes", [])
74101
if agentTSTypes:
@@ -239,7 +266,14 @@ def _deleteRemoveJobs(self, jobList, remove=False):
239266
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
240267
result = wmsClient.removeJob(jobsList)
241268
else:
242-
result = kill_delete_jobs(RIGHT_DELETE, jobsList)
269+
result = kill_delete_jobs(
270+
RIGHT_DELETE,
271+
jobsList,
272+
jobdb=self.jobDB,
273+
taskqueuedb=self.taskQueueDB,
274+
pilotagentsdb=self.pilotAgentsDB,
275+
storagemanagementdb=self.storageManagementDB,
276+
)
243277
if not result["OK"]:
244278
self.log.error(
245279
f"Could not {'remove' if remove else 'delete'} jobs",

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,13 @@ def initialize(self):
7373
return result
7474
self.pilotAgentsDB = result["Value"]()
7575

76-
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
77-
if not result["OK"]:
78-
return result
79-
self.storageManagementDB = result["Value"]()
76+
try:
77+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
78+
if not result["OK"]:
79+
return result
80+
self.storageManagementDB = result["Value"]()
81+
except RuntimeError:
82+
pass
8083

8184
# getting parameters
8285

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
""" Test class for Job Cleaning Agent
2-
"""
1+
"""Test class for Job Cleaning Agent"""
2+
33
from unittest.mock import MagicMock
44

55
import pytest
@@ -16,6 +16,9 @@
1616
mockNone = MagicMock()
1717
mockNone.return_value = None
1818
mockJMC = MagicMock()
19+
mockJobDB = MagicMock()
20+
mockJobDB.getDistinctJobAttributes = mockReply
21+
mockJobDB.selectJobs = mockReply
1922

2023

2124
@pytest.fixture
@@ -27,16 +30,21 @@ def jca(mocker):
2730
create=True,
2831
)
2932
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
33+
34+
def mock_load_object(module_path, class_name):
35+
mocks = {
36+
"JobDB": mockJobDB,
37+
"TaskQueueDB": MagicMock(),
38+
"PilotAgentsDB": MagicMock(),
39+
"SandboxMetadataDB": MagicMock(),
40+
"StorageManagementDB": MagicMock(),
41+
}
42+
return {"OK": True, "Value": lambda: mocks[class_name]}
43+
3044
mocker.patch(
31-
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply
32-
)
33-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
34-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
35-
mocker.patch(
36-
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB.__init__", side_effect=mockNone
45+
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject",
46+
side_effect=mock_load_object,
3747
)
38-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
39-
4048
jca = JobCleaningAgent()
4149
jca.log = gLogger
4250
jca.log.setLevel("DEBUG")
@@ -128,15 +136,28 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):
128136

129137
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__")
130138
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
131-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone)
132-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone)
133139
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
134140
mocker.patch(
135141
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"])
136142
)
137143
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params)
138144

145+
def mock_load_object(module_path, class_name):
146+
mocks = {
147+
"JobDB": MagicMock(),
148+
"TaskQueueDB": MagicMock(),
149+
"PilotAgentsDB": MagicMock(),
150+
"SandboxMetadataDB": MagicMock(),
151+
"StorageManagementDB": MagicMock(),
152+
}
153+
return {"OK": True, "Value": lambda: mocks[class_name]}
154+
155+
mocker.patch(
156+
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject",
157+
side_effect=mock_load_object,
158+
)
139159
jobCleaningAgent = JobCleaningAgent()
160+
140161
jobCleaningAgent.log = gLogger
141162
jobCleaningAgent.log.setLevel("DEBUG")
142163
jobCleaningAgent._AgentModule__configDefaults = mockAM

src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from DIRAC import S_ERROR, S_OK, gLogger
2-
from DIRAC.WorkloadManagementSystem.Client import JobStatus
32
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
3+
from DIRAC.WorkloadManagementSystem.Client import JobStatus
44
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL
55
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition
66

@@ -96,11 +96,6 @@ def kill_delete_jobs(
9696
if not result["OK"]:
9797
return result
9898
pilotagentsdb = result["Value"]()
99-
if storagemanagementdb is None:
100-
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
101-
if not result["OK"]:
102-
return result
103-
storagemanagementdb = result["Value"]()
10499

105100
badIDs = []
106101

@@ -133,6 +128,14 @@ def kill_delete_jobs(
133128
stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING]
134129

135130
if stagingJobList:
131+
if storagemanagementdb is None:
132+
result = ObjectLoader().loadObject(
133+
"StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB"
134+
)
135+
if not result["OK"]:
136+
return result
137+
storagemanagementdb = result["Value"]()
138+
136139
gLogger.info("Going to send killing signal to stager as well!")
137140
result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList)
138141
if not result["OK"]:

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
""" JobManagerHandler is the implementation of the JobManager service
2-
in the DISET framework
1+
"""JobManagerHandler is the implementation of the JobManager service
2+
in the DISET framework
33
4-
The following methods are available in the Service interface
4+
The following methods are available in the Service interface
55
6-
submitJob()
7-
rescheduleJob()
8-
deleteJob()
9-
killJob()
6+
submitJob()
7+
rescheduleJob()
8+
deleteJob()
9+
killJob()
1010
1111
"""
12+
1213
from pydantic import ValidationError
1314

1415
from DIRAC import S_ERROR, S_OK
@@ -22,6 +23,7 @@
2223
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
2324
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
2425
from DIRAC.WorkloadManagementSystem.Client import JobStatus
26+
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
2527
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
2628
RIGHT_DELETE,
2729
RIGHT_KILL,
@@ -30,7 +32,6 @@
3032
RIGHT_SUBMIT,
3133
JobPolicy,
3234
)
33-
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
3435
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
3536
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
3637
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
@@ -44,34 +45,33 @@ class JobManagerHandlerMixin:
4445
@classmethod
4546
def initializeHandler(cls, serviceInfoDict):
4647
"""Initialization of DB objects and OptimizationMind"""
47-
try:
48-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
49-
if not result["OK"]:
50-
return result
51-
cls.jobDB = result["Value"](parentLogger=cls.log)
48+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
49+
if not result["OK"]:
50+
return result
51+
cls.jobDB = result["Value"](parentLogger=cls.log)
5252

53-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
54-
if not result["OK"]:
55-
return result
56-
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
53+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
54+
if not result["OK"]:
55+
return result
56+
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
5757

58-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
59-
if not result["OK"]:
60-
return result
61-
cls.taskQueueDB = result["Value"](parentLogger=cls.log)
58+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
59+
if not result["OK"]:
60+
return result
61+
cls.taskQueueDB = result["Value"](parentLogger=cls.log)
6262

63-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
64-
if not result["OK"]:
65-
return result
66-
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
63+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
64+
if not result["OK"]:
65+
return result
66+
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
6767

68+
try:
6869
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
6970
if not result["OK"]:
7071
return result
7172
cls.storageManagementDB = result["Value"](parentLogger=cls.log)
72-
73-
except RuntimeError as excp:
74-
return S_ERROR(f"Can't connect to DB: {excp!r}")
73+
except RuntimeError:
74+
cls.storageManagementDB = None
7575

7676
cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind")
7777
result = cls.msgClient.connect(JobManager=True)

0 commit comments

Comments
 (0)