Skip to content

Commit 46d2927

Browse files
committed
refactor: moved getJobParameters in JobDBUtils
1 parent 4033c11 commit 46d2927

6 files changed

Lines changed: 75 additions & 74 deletions

File tree

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle.
1+
"""The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle.
22
33
This agent will take care of:
44
- removing all jobs that are in status JobStatus.DELETED
@@ -22,6 +22,7 @@
2222
than 0.
2323
2424
"""
25+
2526
import datetime
2627
import os
2728

@@ -37,10 +38,10 @@
3738
from DIRAC.WorkloadManagementSystem.Client import JobStatus
3839
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
3940
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
41+
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import getJobParameters
4042
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
4143
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
4244
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE
43-
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
4445

4546

4647
class JobCleaningAgent(AgentModule):

src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" The Push Job Agent class inherits from Job Agent and aims to support job submission in
1+
"""The Push Job Agent class inherits from Job Agent and aims to support job submission in
22
sites with no external connectivity (e.g. some supercomputers).
33
44
.. literalinclude:: ../ConfigTemplate.cfg
@@ -32,6 +32,7 @@
3232
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus, PilotStatus
3333
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
3434
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
35+
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import getJobParameters
3536
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper
3637
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
3738
getJobWrapper,
@@ -41,7 +42,6 @@
4142
transferInputSandbox,
4243
)
4344
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
44-
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
4545
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
4646
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
4747

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
:dedent: 2
99
:caption: StalledJobAgent options
1010
"""
11+
1112
import concurrent.futures
1213
import datetime
1314

@@ -20,9 +21,9 @@
2021
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
2122
from DIRAC.Core.Utilities.TimeUtilities import fromString, second, toEpoch
2223
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
24+
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import getJobParameters
2325
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
2426
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_KILL
25-
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
2627
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
2728

2829

src/DIRAC/WorkloadManagementSystem/DB/JobDBUtils.py

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,20 @@
33
# Import stateless functions from DIRACCommon for backward compatibility
44
from DIRACCommon.WorkloadManagementSystem.DB.JobDBUtils import *
55

6+
from DIRAC import S_OK
67
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
78
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
89
from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise
910
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import makeJobManifestConfig
11+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
12+
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
1013

1114
getDIRACPlatform = returnValueOrRaise(
1215
ObjectLoader().loadObject("ConfigurationSystem.Client.Helpers.Resources", "getDIRACPlatform")
1316
)
1417

1518

16-
def checkAndPrepareJob(
17-
jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrs, vo
18-
): # pylint: disable=function-redefined
19+
def checkAndPrepareJob(jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrs, vo): # pylint: disable=function-redefined
1920
from DIRACCommon.WorkloadManagementSystem.DB.JobDBUtils import checkAndPrepareJob
2021

2122
config = {
@@ -31,3 +32,63 @@ def checkAndAddOwner(jdl: str, owner: str, ownerGroup: str): # pylint: disable=
3132
from DIRACCommon.WorkloadManagementSystem.DB.JobDBUtils import checkAndAddOwner
3233

3334
return checkAndAddOwner(jdl, owner, ownerGroup, job_manifest_config=makeJobManifestConfig(ownerGroup))
35+
36+
37+
def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict:
38+
"""Utility to get a job parameter for a list of jobIDs pertaining to a VO.
39+
If the jobID is not in the JobParametersDB, it will be looked up in the JobDB.
40+
41+
Requires direct access to the JobParametersDB and JobDB.
42+
43+
:param jobIDs: list of jobIDs
44+
:param parName: name of the parameter to be retrieved
45+
:param vo: VO of the jobIDs
46+
:return: dictionary with jobID as key and the parameter as value
47+
:rtype: dict
48+
"""
49+
50+
elasticJobParametersDB = JobParametersDB()
51+
jobDB = JobDB()
52+
53+
if vo: # a user is connecting, with a proxy
54+
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
55+
if not res["OK"]:
56+
return res
57+
parameters = res["Value"]
58+
else: # a service is connecting, no proxy, e.g. StalledJobAgent
59+
q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
60+
res = jobDB._query(q)
61+
if not res["OK"]:
62+
return res
63+
if not res["Value"]:
64+
return S_OK({})
65+
# get the VO for each jobID
66+
voDict = {}
67+
for jobID, vo in res["Value"]:
68+
if vo not in voDict:
69+
voDict[vo] = []
70+
voDict[vo].append(jobID)
71+
# get the parameters for each VO
72+
parameters = {}
73+
for vo, jobIDs in voDict.items():
74+
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
75+
if not res["OK"]:
76+
return res
77+
parameters.update(res["Value"])
78+
79+
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
80+
res = jobDB.getJobParameters(jobIDs, parName)
81+
if not res["OK"]:
82+
return res
83+
parametersM = res["Value"]
84+
85+
# and now combine
86+
final = dict(parametersM)
87+
# if job in JobDB, update with parameters from ES if any
88+
for jobID in final:
89+
final[jobID].update(parameters.get(jobID, {}))
90+
# if job in ES and not in JobDB, take ES
91+
for jobID in parameters:
92+
if jobID not in final:
93+
final[jobID] = parameters[jobID]
94+
return S_OK(final)

src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
""" JobMonitoringHandler is the implementation of the JobMonitoring service
2-
in the DISET framework
1+
"""JobMonitoringHandler is the implementation of the JobMonitoring service
2+
in the DISET framework
33
4-
The following methods are available in the Service interface
4+
The following methods are available in the Service interface
55
"""
66

77
from DIRAC import S_ERROR, S_OK
@@ -10,7 +10,7 @@
1010
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
1111
from DIRAC.Core.Utilities.JEncode import strToIntDict
1212
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
13-
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
13+
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import getJobParameters
1414

1515

1616
class JobMonitoringHandlerMixin:

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 0 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -141,68 +141,6 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None):
141141
return 0
142142

143143

144-
def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict:
145-
"""Utility to get a job parameter for a list of jobIDs pertaining to a VO.
146-
If the jobID is not in the JobParametersDB, it will be looked up in the JobDB.
147-
148-
Requires direct access to the JobParametersDB and JobDB.
149-
150-
:param jobIDs: list of jobIDs
151-
:param parName: name of the parameter to be retrieved
152-
:param vo: VO of the jobIDs
153-
:return: dictionary with jobID as key and the parameter as value
154-
:rtype: dict
155-
"""
156-
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
157-
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
158-
159-
elasticJobParametersDB = JobParametersDB()
160-
jobDB = JobDB()
161-
162-
if vo: # a user is connecting, with a proxy
163-
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
164-
if not res["OK"]:
165-
return res
166-
parameters = res["Value"]
167-
else: # a service is connecting, no proxy, e.g. StalledJobAgent
168-
q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
169-
res = jobDB._query(q)
170-
if not res["OK"]:
171-
return res
172-
if not res["Value"]:
173-
return S_OK({})
174-
# get the VO for each jobID
175-
voDict = {}
176-
for jobID, vo in res["Value"]:
177-
if vo not in voDict:
178-
voDict[vo] = []
179-
voDict[vo].append(jobID)
180-
# get the parameters for each VO
181-
parameters = {}
182-
for vo, jobIDs in voDict.items():
183-
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
184-
if not res["OK"]:
185-
return res
186-
parameters.update(res["Value"])
187-
188-
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
189-
res = jobDB.getJobParameters(jobIDs, parName)
190-
if not res["OK"]:
191-
return res
192-
parametersM = res["Value"]
193-
194-
# and now combine
195-
final = dict(parametersM)
196-
# if job in JobDB, update with parameters from ES if any
197-
for jobID in final:
198-
final[jobID].update(parameters.get(jobID, {}))
199-
# if job in ES and not in JobDB, take ES
200-
for jobID in parameters:
201-
if jobID not in final:
202-
final[jobID] = parameters[jobID]
203-
return S_OK(final)
204-
205-
206144
def getAvailableRAM(siteName=None, gridCE=None, queue=None):
207145
"""Gets the available RAM on a certain CE/queue/node (what the pilot administers)
208146

0 commit comments

Comments
 (0)