Skip to content

Commit 01377d7

Browse files
committed
refactor: load StorageManagementDB if present
1 parent ea34091 commit 01377d7

File tree

4 files changed

+55
-21
lines changed

4 files changed

+55
-21
lines changed

src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ def __init__(self, *args, **kwargs):
7070
self.jobDB = None
7171
self.pilotAgentsDB = None
7272
self.taskQueueDB = None
73+
self.storageManagementDB = None
7374

7475
# # transformations types
7576
self.transformationTypes = None
@@ -144,6 +145,14 @@ def initialize(self):
144145
return result
145146
self.taskQueueDB = result["Value"]()
146147

148+
try:
149+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
150+
if not result["OK"]:
151+
return result
152+
self.storageManagementDB = result["Value"]()
153+
except RuntimeError:
154+
pass
155+
147156
return S_OK()
148157

149158
#############################################################################
@@ -635,6 +644,7 @@ def __removeWMSTasks(self, transJobIDs):
635644
jobdb=self.jobDB,
636645
taskqueuedb=self.taskQueueDB,
637646
pilotagentsdb=self.pilotAgentsDB,
647+
storagemanagementdb=self.storageManagementDB,
638648
)
639649
# Prevent 0 job IDs
640650
jobIDs = [int(j) for j in transJobIDs if int(j)]

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def __init__(self, *args, **kwargs):
5757
self.taskQueueDB = None
5858
self.pilotAgentsDB = None
5959
self.sandboxDB = None
60+
self.storageManagementDB = None
6061

6162
self.maxJobsAtOnce = 500
6263
self.prodTypes = []
@@ -88,6 +89,14 @@ def initialize(self):
8889
return result
8990
self.sandboxDB = result["Value"]()
9091

92+
try:
93+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
94+
if not result["OK"]:
95+
return result
96+
self.storageManagementDB = result["Value"]()
97+
except RuntimeError:
98+
pass
99+
91100
agentTSTypes = self.am_getOption("ProductionTypes", [])
92101
if agentTSTypes:
93102
self.prodTypes = agentTSTypes
@@ -263,6 +272,7 @@ def _deleteRemoveJobs(self, jobList, remove=False):
263272
jobdb=self.jobDB,
264273
taskqueuedb=self.taskQueueDB,
265274
pilotagentsdb=self.pilotAgentsDB,
275+
storagemanagementdb=self.storageManagementDB,
266276
)
267277
if not result["OK"]:
268278
self.log.error(

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def __init__(self, *args, **kwargs):
4141
self.logDB = None
4242
self.taskQueueDB = None
4343
self.pilotAgentsDB = None
44+
self.storageManagementDB = None
4445
self.matchedTime = 7200
4546
self.rescheduledTime = 600
4647
self.submittingTime = 300
@@ -72,6 +73,14 @@ def initialize(self):
7273
return result
7374
self.pilotAgentsDB = result["Value"]()
7475

76+
try:
77+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
78+
if not result["OK"]:
79+
return result
80+
self.storageManagementDB = result["Value"]()
81+
except RuntimeError:
82+
pass
83+
7584
# getting parameters
7685

7786
if not self.am_getOption("Enable", True):
@@ -261,6 +270,7 @@ def _failStalledJobs(self, jobID):
261270
jobdb=self.jobDB,
262271
taskqueuedb=self.taskQueueDB,
263272
pilotagentsdb=self.pilotAgentsDB,
273+
storagemanagementdb=self.storageManagementDB,
264274
)
265275
if not res["OK"]:
266276
self.log.error("Failed to kill job", jobID)

src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
from DIRAC import S_ERROR, S_OK
1616
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getVOForGroup
17-
from DIRAC.Core.DISET.MessageClient import MessageClient
1817
from DIRAC.Core.DISET.RequestHandler import RequestHandler
1918
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2019
from DIRAC.Core.Utilities.DErrno import EWMSJDL, EWMSSUBM
@@ -23,6 +22,7 @@
2322
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
2423
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
2524
from DIRAC.WorkloadManagementSystem.Client import JobStatus
25+
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
2626
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
2727
RIGHT_DELETE,
2828
RIGHT_KILL,
@@ -31,7 +31,6 @@
3131
RIGHT_SUBMIT,
3232
JobPolicy,
3333
)
34-
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
3534
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
3635
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
3736
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
@@ -45,31 +44,34 @@ class JobManagerHandlerMixin:
4544
@classmethod
4645
def initializeHandler(cls, serviceInfoDict):
4746
"""Initialization of DB objects and OptimizationMind"""
48-
try:
49-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
50-
if not result["OK"]:
51-
return result
52-
cls.jobDB = result["Value"](parentLogger=cls.log)
47+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
48+
if not result["OK"]:
49+
return result
50+
cls.jobDB = result["Value"](parentLogger=cls.log)
5351

54-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
55-
if not result["OK"]:
56-
return result
57-
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
52+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
53+
if not result["OK"]:
54+
return result
55+
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
5856

59-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
60-
if not result["OK"]:
61-
return result
62-
cls.taskQueueDB = result["Value"](parentLogger=cls.log)
57+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
58+
if not result["OK"]:
59+
return result
60+
cls.taskQueueDB = result["Value"](parentLogger=cls.log)
6361

64-
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
62+
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
63+
if not result["OK"]:
64+
return result
65+
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
66+
67+
try:
68+
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
6569
if not result["OK"]:
6670
return result
67-
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
68-
69-
except RuntimeError as excp:
70-
return S_ERROR(f"Can't connect to DB: {excp!r}")
71+
cls.storageManagementDB = result["Value"]()
72+
except RuntimeError:
73+
cls.storageManagementDB = None
7174

72-
cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind")
7375
result = cls.msgClient.connect(JobManager=True)
7476
if not result["OK"]:
7577
cls.log.warn("Cannot connect to OptimizationMind!", result["Message"])
@@ -464,6 +466,7 @@ def export_deleteJob(self, jobIDs, force=False):
464466
jobdb=self.jobDB,
465467
taskqueuedb=self.taskQueueDB,
466468
pilotagentsdb=self.pilotAgentsDB,
469+
storagemanagementDB=self.storageManagementDB,
467470
)
468471

469472
result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()
@@ -501,6 +504,7 @@ def export_killJob(self, jobIDs, force=False):
501504
jobdb=self.jobDB,
502505
taskqueuedb=self.taskQueueDB,
503506
pilotagentsdb=self.pilotAgentsDB,
507+
storagemanagementDB=self.storageManagementDB,
504508
)
505509

506510
result["requireProxyUpload"] = len(ownerJobList) > 0 and self.__checkIfProxyUploadIsRequired()

0 commit comments

Comments
 (0)