Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2108,7 +2108,8 @@ def installDatabase(self, dbName):

perms = (
"SELECT,INSERT,LOCK TABLES,UPDATE,DELETE,CREATE,DROP,ALTER,REFERENCES,"
"CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE"
"CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE,"
"CREATE TEMPORARY TABLES"
)
cmd = f"GRANT {perms} ON `{dbName}`.* TO '{self.mysqlUser}'@'%'"
result = self.execMySQL(cmd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def _deleteRemoveJobs(self, jobList, remove=False):
result = wmsClient.deleteJob(jobsList)
if not result["OK"]:
self.log.error(
"Could not {'remove' if remove else 'delete'} jobs",
f"Could not {'remove' if remove else 'delete'} jobs",
f"for {user} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}",
)
fail = True
Expand Down
114 changes: 50 additions & 64 deletions src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK
from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise, SErrorException
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
Expand Down Expand Up @@ -106,18 +106,7 @@ def getJobParameters(self, jobID, paramList=None):
Returns a dictionary with the Job Parameters.
If parameterList is empty - all the parameters are returned.
"""

if isinstance(jobID, (str, int)):
jobID = [jobID]

jobIDList = []
for jID in jobID:
ret = self._escapeString(str(jID))
if not ret["OK"]:
return ret
jobIDList.append(ret["Value"])

# self.log.debug('JobDB.getParameters: Getting Parameters for jobs %s' % ','.join(jobIDList))
jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID

resultDict = {}
if paramList:
Expand All @@ -130,7 +119,7 @@ def getJobParameters(self, jobID, paramList=None):
return ret
paramNameList.append(ret["Value"])
cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format(
",".join(jobIDList),
",".join(str(int(j)) for j in jobIDList),
",".join(paramNameList),
)
result = self._query(cmd)
Expand Down Expand Up @@ -207,13 +196,13 @@ def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
return S_ERROR("JobDB.getAtticJobParameters: failed to retrieve parameters")

#############################################################################
@convertToReturnValue
def getJobsAttributes(self, jobIDs, attrList=None):
"""Get all Job(s) Attributes for a given list of jobIDs.
Return a dictionary with all Job Attributes as value pairs
"""

if not jobIDs:
return S_OK({})
return {}

# If no list of attributes is given, return all attributes
if not attrList:
Expand All @@ -229,28 +218,29 @@ def getJobsAttributes(self, jobIDs, attrList=None):

attrNameListS = []
for x in attrList:
ret = self._escapeString(x)
if not ret["OK"]:
return ret
x = "`" + ret["Value"][1:-1] + "`"
x = "`" + returnValueOrRaise(self._escapeString(x))[1:-1] + "`"
attrNameListS.append(x)
attrNames = "JobID," + ",".join(attrNameListS)

cmd = f"SELECT {attrNames} FROM Jobs WHERE JobID IN ({','.join(str(jobID) for jobID in jobIDs)})"
res = self._query(cmd)
if not res["OK"]:
return res
if not res["Value"]:
return S_OK({})
sqlCmd = "CREATE TEMPORARY TABLE to_select_Jobs (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
returnValueOrRaise(self._update(sqlCmd))
try:
sqlCmd = "INSERT INTO to_select_Jobs (JobID) VALUES ( %s )"
returnValueOrRaise(self._updatemany(sqlCmd, [(int(j),) for j in jobIDs]))
sqlCmd = f"SELECT {attrNames} FROM Jobs JOIN to_select_Jobs USING (JobID)"
result = returnValueOrRaise(self._query(sqlCmd))
finally:
sqlCmd = "DROP TEMPORARY TABLE to_select_Jobs"
returnValueOrRaise(self._update(sqlCmd))

attributes = {}
for t_att in res["Value"]:
for t_att in result:
jobID = int(t_att[0])
attributes.setdefault(jobID, {})
for tx, ax in zip(t_att[1:], attrList):
attributes[jobID].setdefault(ax, tx)

return S_OK(attributes)
return attributes

#############################################################################
def getJobAttributes(self, jobID, attrList=None):
Expand Down Expand Up @@ -527,12 +517,10 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
if not isinstance(jobID, (list, tuple)):
jobIDList = [jobID]

jIDList = []
for jID in jobIDList:
ret = self._escapeString(jID)
if not ret["OK"]:
return ret
jIDList.append(ret["Value"])
try:
jIDList = [int(jID) for jID in jobIDList]
except ValueError as e:
return S_ERROR(f"JobDB.setAttributes: {e}")

if len(attrNames) != len(attrValues):
return S_ERROR("JobDB.setAttributes: incompatible Argument length")
Expand Down Expand Up @@ -561,7 +549,7 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
if not attr:
return S_ERROR("JobDB.setAttributes: Nothing to do")

cmd = f"UPDATE Jobs SET {', '.join(attr)} WHERE JobID in ( {', '.join(jIDList)} )"
cmd = f"UPDATE Jobs SET {', '.join(attr)} WHERE JobID in ( {', '.join(str(int(j)) for j in jIDList)} )"

if myDate:
cmd += f" AND LastUpdateTime < {myDate}"
Expand Down Expand Up @@ -987,44 +975,42 @@ def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup,
return S_OK()

#############################################################################
@convertToReturnValue
def removeJobFromDB(self, jobIDs):
"""
Remove jobs from the Job DB and clean up all the job related data in various tables
"""

# ret = self._escapeString(jobID)
# if not ret['OK']:
# return ret
# e_jobID = ret['Value']

if not jobIDs:
return S_OK()

if not isinstance(jobIDs, list):
jobIDList = [jobIDs]
else:
jobIDList = jobIDs
return None
jobIDList = jobIDs if isinstance(jobIDs, list) else [jobIDs]

failedTablesList = []
for table in [
"InputData",
"JobParameters",
"AtticJobParameters",
"HeartBeatLoggingInfo",
"OptimizerParameters",
"JobCommands",
"Jobs",
"JobJDLs",
]:
cmd = f"DELETE FROM {table} WHERE JobID in ({','.join(str(j) for j in jobIDList)})"
result = self._update(cmd)
if not result["OK"]:
failedTablesList.append(table)

if failedTablesList:
return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})")
sqlCmd = "CREATE TEMPORARY TABLE to_delete_Jobs (JobID INT(11) UNSIGNED NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
returnValueOrRaise(self._update(sqlCmd))
try:
sqlCmd = "INSERT INTO to_delete_Jobs (JobID) VALUES ( %s )"
returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobIDList]))

for table in [
"InputData",
"JobParameters",
"AtticJobParameters",
"HeartBeatLoggingInfo",
"OptimizerParameters",
"JobCommands",
"Jobs",
"JobJDLs",
]:
sqlCmd = f"DELETE m from `{table}` m JOIN to_delete_Jobs t USING (JobID)"
if not self._update(sqlCmd)["OK"]:
failedTablesList.append(table)
finally:
sqlCmd = "DROP TEMPORARY TABLE to_delete_Jobs"
returnValueOrRaise(self._update(sqlCmd))

return S_OK()
if failedTablesList:
raise SErrorException(f"Errors while job removal (tables {','.join(failedTablesList)})")

#############################################################################
def rescheduleJob(self, jobID):
Expand Down
25 changes: 19 additions & 6 deletions src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from DIRAC import S_ERROR, S_OK
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise, convertToReturnValue
from DIRAC.FrameworkSystem.Client.Logger import contextLogger

MAGIC_EPOC_NUMBER = 1270000000
Expand Down Expand Up @@ -145,21 +146,33 @@ def getJobLoggingInfo(self, jobID):
return S_OK(return_value)

#############################################################################
@convertToReturnValue
def deleteJob(self, jobID):
"""Delete logging records for given jobs"""
if not jobID:
return S_OK()
return None

# Make sure that we have a list of strings of jobIDs
if isinstance(jobID, int):
jobList = [str(jobID)]
jobList = [jobID]
elif isinstance(jobID, str):
jobList = jobID.replace(" ", "").split(",")
else:
jobList = list(str(j) for j in jobID)
jobList = jobID

req = f"DELETE FROM LoggingInfo WHERE JobID IN ({','.join(jobList)})"
return self._update(req)
sqlCmd = (
"CREATE TEMPORARY TABLE to_delete_LoggingInfo (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
)
returnValueOrRaise(self._update(sqlCmd))
try:
sqlCmd = "INSERT INTO to_delete_LoggingInfo (JobID) VALUES ( %s )"
returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobList]))
sqlCmd = "DELETE l from `LoggingInfo` l JOIN to_delete_LoggingInfo t USING (JobID)"
result = returnValueOrRaise(self._update(sqlCmd))
finally:
sqlCmd = "DROP TEMPORARY TABLE to_delete_LoggingInfo"
returnValueOrRaise(self._update(sqlCmd))

return result

#############################################################################
def getWMSTimeStamps(self, jobID):
Expand Down
16 changes: 13 additions & 3 deletions src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.MySQL import _quotedList
from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise
from DIRAC.FrameworkSystem.Client.Logger import contextLogger
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
Expand Down Expand Up @@ -401,9 +402,18 @@ def __getPilotID(self, pilotRef):
if result["Value"]:
return int(result["Value"][0][0])
return 0
refString = ",".join(["'" + ref + "'" for ref in pilotRef])
req = f"SELECT PilotID from PilotAgents WHERE PilotJobReference in ( {refString} )"
result = self._query(req)

sqlCmd = "CREATE TEMPORARY TABLE to_select_PilotAgents (PilotID VARCHAR(255) NOT NULL, PRIMARY KEY (PilotID)) ENGINE=MEMORY;"
returnValueOrRaise(self._update(sqlCmd))
try:
sqlCmd = "INSERT INTO to_select_PilotAgents (PilotID) VALUES ( %s )"
returnValueOrRaise(self._updatemany(sqlCmd, [(p,) for p in pilotRef]))
sqlCmd = "SELECT PilotID FROM PilotAgents JOIN to_select_PilotAgents USING (PilotID)"
result = self._query(sqlCmd)
finally:
sqlCmd = "DROP TEMPORARY TABLE to_select_PilotAgents"
returnValueOrRaise(self._update(sqlCmd))

if not result["OK"]:
return []
if result["Value"]:
Expand Down
64 changes: 29 additions & 35 deletions src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from DIRAC.Core.Base.DB import DB
from DIRAC.Core.Security import Properties
from DIRAC.Core.Utilities import List
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue, returnValueOrRaise


class SandboxMetadataDB(DB):
Expand Down Expand Up @@ -219,11 +220,8 @@ def assignSandboxesToEntities(self, enDict, requesterName, requesterGroup, owner
return result
return S_OK(assigned)

def __filterEntitiesByRequester(self, entitiesList, requesterName, requesterGroup):
"""
Given a list of entities and a requester, return the ones that the requester is allowed to modify
"""
sqlCond = ["s.OwnerId=o.OwnerId", "s.SBId=e.SBId"]
def __entitiesByRequesterCond(self, requesterName, requesterGroup):
sqlCond = []
requesterProps = Registry.getPropertiesForEntity(requesterGroup, name=requesterName)
if Properties.JOB_ADMINISTRATOR in requesterProps:
# Do nothing, just ensure it doesn't fit in the other cases
Expand All @@ -235,44 +233,40 @@ def __filterEntitiesByRequester(self, entitiesList, requesterName, requesterGrou
sqlCond.append(f"o.Owner='{requesterName}'")
else:
return S_ERROR("Not authorized to access sandbox")
for i in range(len(entitiesList)):
entitiesList[i] = self._escapeString(entitiesList[i])["Value"]
if len(entitiesList) == 1:
sqlCond.append(f"e.EntityId = {entitiesList[0]}")
else:
sqlCond.append(f"e.EntityId in ( {', '.join(entitiesList)} )")
sqlCmd = "SELECT DISTINCT e.EntityId FROM `sb_EntityMapping` e, `sb_SandBoxes` s, `sb_Owners` o WHERE"
sqlCmd = f"{sqlCmd} {' AND '.join(sqlCond)}"
result = self._query(sqlCmd)
if not result["OK"]:
return result
return S_OK([row[0] for row in result["Value"]])
return sqlCond

@convertToReturnValue
def unassignEntities(self, entities, requesterName, requesterGroup):
"""
Unassign jobs to sandboxes

:param list entities: list of entities to unassign
"""
updated = 0
if not entities:
return S_OK()
result = self.__filterEntitiesByRequester(entities, requesterName, requesterGroup)
if not result["OK"]:
gLogger.error("Cannot filter entities", result["Message"])
return result
ids = result["Value"]
if not ids:
return S_OK(0)
sqlCmd = "DELETE FROM `sb_EntityMapping` WHERE EntityId in ( %s )" % ", ".join(
["'%s'" % str(eid) for eid in ids]
)
result = self._update(sqlCmd)
if not result["OK"]:
gLogger.error("Cannot unassign entities", result["Message"])
else:
updated += 1
return S_OK(updated)
return None
conds = self.__entitiesByRequesterCond(requesterName, requesterGroup)

sqlCmd = "CREATE TEMPORARY TABLE to_delete_EntityId (EntityId VARCHAR(128) NOT NULL, PRIMARY KEY (EntityId)) ENGINE=MEMORY;"
returnValueOrRaise(self._update(sqlCmd))
try:
sqlCmd = "INSERT INTO to_delete_EntityId (EntityId) VALUES ( %s )"
returnValueOrRaise(self._updatemany(sqlCmd, [(e,) for e in entities]))
sqlCmd = "DELETE m from `sb_EntityMapping` m JOIN to_delete_EntityId t USING (EntityId)"
if conds:
sqlCmd = " ".join(
[
sqlCmd,
"JOIN `sb_SandBoxes` s ON s.SBId = m.SBId",
"JOIN `sb_Owners` o ON s.OwnerId = o.OwnerId",
"WHERE",
" AND ".join(conds),
]
)
returnValueOrRaise(self._update(sqlCmd))
finally:
sqlCmd = "DROP TEMPORARY TABLE to_delete_EntityId"
returnValueOrRaise(self._update(sqlCmd))
return 1

def getSandboxesAssignedToEntity(self, entityId, requesterName, requesterGroup, requestedVO):
"""
Expand Down
Loading