|
2 | 2 | """ |
3 | 3 | import multiprocessing |
4 | 4 |
|
5 | | -from DIRAC import gConfig, gLogger |
| 5 | +from DIRAC import gConfig, gLogger, S_OK |
6 | 6 | from DIRAC.Core.Utilities.List import fromChar |
| 7 | +from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB |
| 8 | +from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB |
7 | 9 |
|
8 | 10 |
|
9 | 11 | def getMemoryFromProc(): |
@@ -139,3 +141,61 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None): |
139 | 141 | # 3) return 0 |
140 | 142 | gLogger.info("NumberOfGPUs could not be found in CS") |
141 | 143 | return 0 |
| 144 | + |
| 145 | + |
| 146 | +def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict: |
| 147 | + """Utility to get a job parameter for a list of jobIDs pertaining to a VO. |
| 148 | + If the jobID is not in the JobParametersDB, it will be looked up in the JobDB. |
| 149 | + |
| 150 | + Requires direct access to the JobParametersDB and JobDB. |
| 151 | + :param jobIDs: list of jobIDs |
| 152 | + :param parName: name of the parameter to be retrieved |
| 153 | + :param vo: VO of the jobIDs |
| 154 | + :return: dictionary with jobID as key and the parameter as value |
| 155 | + :rtype: dict |
| 156 | + """ |
| 157 | + elasticJobParametersDB = JobParametersDB() |
| 158 | + jobDB = JobDB() |
| 159 | + |
| 160 | + if vo: # a user is connecting, with a proxy |
| 161 | + res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName) |
| 162 | + if not res["OK"]: |
| 163 | + return res |
| 164 | + parameters = res["Value"] |
| 165 | + else: # a service is connecting, no proxy, e.g. StalledJobAgent |
| 166 | + q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})" |
| 167 | + res = jobDB._query(q) |
| 168 | + if not res["OK"]: |
| 169 | + return res |
| 170 | + if not res["Value"]: |
| 171 | + return S_OK({}) |
| 172 | + # get the VO for each jobID |
| 173 | + voDict = {} |
| 174 | + for jobID, vo in res["Value"]: |
| 175 | + if vo not in voDict: |
| 176 | + voDict[vo] = [] |
| 177 | + voDict[vo].append(jobID) |
| 178 | + # get the parameters for each VO |
| 179 | + parameters = {} |
| 180 | + for vo, jobIDs in voDict.items(): |
| 181 | + res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName) |
| 182 | + if not res["OK"]: |
| 183 | + return res |
| 184 | + parameters.update(res["Value"]) |
| 185 | + |
| 186 | + # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends |
| 187 | + res = jobDB.getJobParameters(jobIDs, parName) |
| 188 | + if not res["OK"]: |
| 189 | + return res |
| 190 | + parametersM = res["Value"] |
| 191 | + |
| 192 | + # and now combine |
| 193 | + final = dict(parametersM) |
| 194 | + # if job in JobDB, update with parameters from ES if any |
| 195 | + for jobID in final: |
| 196 | + final[jobID].update(parameters.get(jobID, {})) |
| 197 | + # if job in ES and not in JobDB, take ES |
| 198 | + for jobID in parameters: |
| 199 | + if jobID not in final: |
| 200 | + final[jobID] = parameters[jobID] |
| 201 | + return S_OK(final) |
0 commit comments