Skip to content

Commit df38096

Browse files
committed
refactor: load StorageManagementDB if present
1 parent ea34091 commit df38096

5 files changed

Lines changed: 59 additions & 21 deletions

File tree

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def __init__(self, *args, **kwargs):
7070
self.jobDB = None
7171
self.pilotAgentsDB = None
7272
self.taskQueueDB = None
73+
self.storageManagementDB = None
7374

7475
# # transformations types
7576
self.transformationTypes = None
@@ -144,6 +145,14 @@ def initialize(self):
144145
return result
145146
self.taskQueueDB = result["Value"]()
146147

148+
try:
149+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
150+
if not result["OK"]:
151+
return result
152+
self.storageManagementDB = result["Value"]()
153+
except RuntimeError:
154+
pass
155+
147156
return S_OK()
148157

149158
#############################################################################
@@ -635,6 +644,7 @@ def __removeWMSTasks(self, transJobIDs):
635644
jobdb=self.jobDB,
636645
taskqueuedb=self.taskQueueDB,
637646
pilotagentsdb=self.pilotAgentsDB,
647+
storagemanagementdb=self.storageManagementDB,
638648
)
639649
# Prevent 0 job IDs
640650
jobIDs = [int(j) for j in transJobIDs if int(j)]

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def __init__(self, *args, **kwargs):
5757
self.taskQueueDB = None
5858
self.pilotAgentsDB = None
5959
self.sandboxDB = None
60+
self.storageManagementDB = None
6061

6162
self.maxJobsAtOnce = 500
6263
self.prodTypes = []
@@ -88,6 +89,14 @@ def initialize(self):
8889
return result
8990
self.sandboxDB = result["Value"]()
9091

92+
try:
93+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
94+
if not result["OK"]:
95+
return result
96+
self.storageManagementDB = result["Value"]()
97+
except RuntimeError:
98+
pass
99+
91100
agentTSTypes = self.am_getOption("ProductionTypes", [])
92101
if agentTSTypes:
93102
self.prodTypes = agentTSTypes
@@ -263,6 +272,7 @@ def _deleteRemoveJobs(self, jobList, remove=False):
263272
jobdb=self.jobDB,
264273
taskqueuedb=self.taskQueueDB,
265274
pilotagentsdb=self.pilotAgentsDB,
275+
storagemanagementdb=self.storageManagementDB,
266276
)
267277
if not result["OK"]:
268278
self.log.error(

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def __init__(self, *args, **kwargs):
4141
self.logDB = None
4242
self.taskQueueDB = None
4343
self.pilotAgentsDB = None
44+
self.storageManagementDB = None
4445
self.matchedTime = 7200
4546
self.rescheduledTime = 600
4647
self.submittingTime = 300
@@ -72,6 +73,14 @@ def initialize(self):
7273
return result
7374
self.pilotAgentsDB = result["Value"]()
7475

76+
try:
77+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
78+
if not result["OK"]:
79+
return result
80+
self.storageManagementDB = result["Value"]()
81+
except RuntimeError:
82+
pass
83+
7584
# getting parameters
7685

7786
if not self.am_getOption("Enable", True):
@@ -261,6 +270,7 @@ def _failStalledJobs(self, jobID):
261270
jobdb=self.jobDB,
262271
taskqueuedb=self.taskQueueDB,
263272
pilotagentsdb=self.pilotAgentsDB,
273+
storagemanagementdb=self.storageManagementDB,
264274
)
265275
if not res["OK"]:
266276
self.log.error("Failed to kill job", jobID)

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,12 @@ def mock_load_object(module_path, class_name):
3737
"TaskQueueDB": MagicMock(),
3838
"PilotAgentsDB": MagicMock(),
3939
"SandboxMetadataDB": MagicMock(),
40+
"StorageManagementDB": MagicMock(),
4041
}
4142
return {"OK": True, "Value": lambda: mocks[class_name]}
4243

4344
mocker.patch(
44-
"DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.ObjectLoader.loadObject",
45+
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject",
4546
side_effect=mock_load_object,
4647
)
4748
jca = JobCleaningAgent()
@@ -147,11 +148,12 @@ def mock_load_object(module_path, class_name):
147148
"TaskQueueDB": MagicMock(),
148149
"PilotAgentsDB": MagicMock(),
149150
"SandboxMetadataDB": MagicMock(),
151+
"StorageManagementDB": MagicMock(),
150152
}
151153
return {"OK": True, "Value": lambda: mocks[class_name]}
152154

153155
mocker.patch(
154-
"DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.ObjectLoader.loadObject",
156+
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject",
155157
side_effect=mock_load_object,
156158
)
157159
jobCleaningAgent = JobCleaningAgent()

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
2424
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
2525
from DIRAC.WorkloadManagementSystem.Client import JobStatus
26+
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
2627
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
2728
RIGHT_DELETE,
2829
RIGHT_KILL,
@@ -31,7 +32,6 @@
3132
RIGHT_SUBMIT,
3233
JobPolicy,
3334
)
34-
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
3535
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
3636
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
3737
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
@@ -45,29 +45,33 @@ class JobManagerHandlerMixin:
4545
@classmethod
4646
def initializeHandler(cls, serviceInfoDict):
4747
"""Initialization of DB objects and OptimizationMind"""
48-
try:
49-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
50-
if not result["OK"]:
51-
return result
52-
cls.jobDB = result["Value"](parentLogger=cls.log)
48+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
49+
if not result["OK"]:
50+
return result
51+
cls.jobDB = result["Value"](parentLogger=cls.log)
5352

54-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
55-
if not result["OK"]:
56-
return result
57-
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
53+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
54+
if not result["OK"]:
55+
return result
56+
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
5857

59-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
60-
if not result["OK"]:
61-
return result
62-
cls.taskQueueDB = result["Value"](parentLogger=cls.log)
58+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
59+
if not result["OK"]:
60+
return result
61+
cls.taskQueueDB = result["Value"](parentLogger=cls.log)
6362

64-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
63+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
64+
if not result["OK"]:
65+
return result
66+
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
67+
68+
try:
69+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
6570
if not result["OK"]:
6671
return result
67-
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
68-
69-
except RuntimeError as excp:
70-
return S_ERROR(f"Can't connect to DB: {excp!r}")
72+
cls.storageManagementDB = result["Value"](parentLogger=cls.log)
73+
except RuntimeError:
74+
cls.storageManagementDB = None
7175

7276
cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind")
7377
result = cls.msgClient.connect(JobManager=True)
@@ -464,6 +468,7 @@ def export_deleteJob(self, jobIDs, force=False):
464468
jobdb=self.jobDB,
465469
taskqueuedb=self.taskQueueDB,
466470
pilotagentsdb=self.pilotAgentsDB,
471+
storagemanagementdb=self.storageManagementDB,
467472
)
468473

469474
result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()
@@ -501,6 +506,7 @@ def export_killJob(self, jobIDs, force=False):
501506
jobdb=self.jobDB,
502507
taskqueuedb=self.taskQueueDB,
503508
pilotagentsdb=self.pilotAgentsDB,
509+
storagemanagementdb=self.storageManagementDB,
504510
)
505511

506512
result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()

0 commit comments

Comments
 (0)