Skip to content

Commit 546245a

Browse files
committed
fix: Optimize JobDB.getJobsAttributes
1 parent 6797f53 commit 546245a

2 files changed

Lines changed: 17 additions & 16 deletions

File tree

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
2121
from DIRAC.Core.Utilities.Decorators import deprecated
2222
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
23-
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
23+
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise
2424
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
2525
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
2626
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
@@ -207,13 +207,13 @@ def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
207207
return S_ERROR("JobDB.getAtticJobParameters: failed to retrieve parameters")
208208

209209
#############################################################################
210+
@convertToReturnValue
210211
def getJobsAttributes(self, jobIDs, attrList=None):
211212
"""Get all Job(s) Attributes for a given list of jobIDs.
212213
Return a dictionary with all Job Attributes as value pairs
213214
"""
214-
215215
if not jobIDs:
216-
return S_OK({})
216+
return {}
217217

218218
# If no list of attributes is given, return all attributes
219219
if not attrList:
@@ -229,28 +229,29 @@ def getJobsAttributes(self, jobIDs, attrList=None):
229229

230230
attrNameListS = []
231231
for x in attrList:
232-
ret = self._escapeString(x)
233-
if not ret["OK"]:
234-
return ret
235-
x = "`" + ret["Value"][1:-1] + "`"
232+
x = "`" + returnValueOrRaise(self._escapeString(x))[1:-1] + "`"
236233
attrNameListS.append(x)
237234
attrNames = "JobID," + ",".join(attrNameListS)
238235

239-
cmd = f"SELECT {attrNames} FROM Jobs WHERE JobID IN ({','.join(str(jobID) for jobID in jobIDs)})"
240-
res = self._query(cmd)
241-
if not res["OK"]:
242-
return res
243-
if not res["Value"]:
244-
return S_OK({})
236+
sqlCmd = "CREATE TEMPORARY TABLE to_select_Jobs (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
237+
returnValueOrRaise(self._update(sqlCmd))
238+
try:
239+
sqlCmd = "INSERT INTO to_select_Jobs (JobID) VALUES ( %s )"
240+
returnValueOrRaise(self._updatemany(sqlCmd, [(int(j),) for j in jobIDs]))
241+
sqlCmd = f"SELECT {attrNames} FROM Jobs JOIN to_select_Jobs USING (JobID)"
242+
result = returnValueOrRaise(self._query(sqlCmd))
243+
finally:
244+
sqlCmd = "DROP TEMPORARY TABLE to_select_Jobs"
245+
returnValueOrRaise(self._update(sqlCmd))
245246

246247
attributes = {}
247-
for t_att in res["Value"]:
248+
for t_att in result:
248249
jobID = int(t_att[0])
249250
attributes.setdefault(jobID, {})
250251
for tx, ax in zip(t_att[1:], attrList):
251252
attributes[jobID].setdefault(ax, tx)
252253

253-
return S_OK(attributes)
254+
return attributes
254255

255256
#############################################################################
256257
def getJobAttributes(self, jobID, attrList=None):

src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def deleteJob(self, jobID):
164164
)
165165
returnValueOrRaise(self._update(sqlCmd))
166166
try:
167-
sqlCmd = "INSERT INTO to_delete_LoggingInfo (EntityId) VALUES ( %s )"
167+
sqlCmd = "INSERT INTO to_delete_LoggingInfo (JobID) VALUES ( %s )"
168168
returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobList]))
169169
sqlCmd = "DELETE l from `LoggingInfo` l JOIN to_delete_LoggingInfo t USING (JobID)"
170170
result = returnValueOrRaise(self._update(sqlCmd))

0 commit comments

Comments
 (0)