diff --git a/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py b/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py index 0e173e38190..6607cc998e0 100644 --- a/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py +++ b/src/DIRAC/FrameworkSystem/Client/ComponentInstaller.py @@ -2108,7 +2108,8 @@ def installDatabase(self, dbName): perms = ( "SELECT,INSERT,LOCK TABLES,UPDATE,DELETE,CREATE,DROP,ALTER,REFERENCES," - "CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE" + "CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE," + "CREATE TEMPORARY TABLES" ) cmd = f"GRANT {perms} ON `{dbName}`.* TO '{self.mysqlUser}'@'%'" result = self.execMySQL(cmd) diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index 28631f4a7fa..e3fd627a4b3 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -236,7 +236,7 @@ def _deleteRemoveJobs(self, jobList, remove=False): result = wmsClient.deleteJob(jobsList) if not result["OK"]: self.log.error( - "Could not {'remove' if remove else 'delete'} jobs", + f"Could not {'remove' if remove else 'delete'} jobs", f"for {user} : {ownerGroup} (n={len(jobsList)}) : {result['Message']}", ) fail = True diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py index d1391da4288..90f95a7f7fc 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py @@ -20,7 +20,7 @@ from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd from DIRAC.Core.Utilities.Decorators import deprecated from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError -from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK +from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise, SErrorException from DIRAC.FrameworkSystem.Client.Logger import contextLogger from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus @@ -106,18 +106,7 @@ def getJobParameters(self, jobID, paramList=None): Returns a dictionary with the Job Parameters. If parameterList is empty - all the parameters are returned. """ - - if isinstance(jobID, (str, int)): - jobID = [jobID] - - jobIDList = [] - for jID in jobID: - ret = self._escapeString(str(jID)) - if not ret["OK"]: - return ret - jobIDList.append(ret["Value"]) - - # self.log.debug('JobDB.getParameters: Getting Parameters for jobs %s' % ','.join(jobIDList)) + jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID resultDict = {} if paramList: @@ -130,7 +119,7 @@ def getJobParameters(self, jobID, paramList=None): return ret paramNameList.append(ret["Value"]) cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format( - ",".join(jobIDList), + ",".join(str(int(j)) for j in jobIDList), ",".join(paramNameList), ) result = self._query(cmd) @@ -207,13 +196,13 @@ def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1): return S_ERROR("JobDB.getAtticJobParameters: failed to retrieve parameters") ############################################################################# + @convertToReturnValue def getJobsAttributes(self, jobIDs, attrList=None): """Get all Job(s) Attributes for a given list of jobIDs. Return a dictionary with all Job Attributes as value pairs """ - if not jobIDs: - return S_OK({}) + return {} # If no list of attributes is given, return all attributes if not attrList: @@ -229,28 +218,29 @@ def getJobsAttributes(self, jobIDs, attrList=None): attrNameListS = [] for x in attrList: - ret = self._escapeString(x) - if not ret["OK"]: - return ret - x = "`" + ret["Value"][1:-1] + "`" + x = "`" + returnValueOrRaise(self._escapeString(x))[1:-1] + "`" attrNameListS.append(x) attrNames = "JobID," + ",".join(attrNameListS) - cmd = f"SELECT {attrNames} FROM Jobs WHERE JobID IN ({','.join(str(jobID) for jobID in jobIDs)})" - res = self._query(cmd) - if not res["OK"]: - return res - if not res["Value"]: - return S_OK({}) + sqlCmd = "CREATE TEMPORARY TABLE to_select_Jobs (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;" + returnValueOrRaise(self._update(sqlCmd)) + try: + sqlCmd = "INSERT INTO to_select_Jobs (JobID) VALUES ( %s )" + returnValueOrRaise(self._updatemany(sqlCmd, [(int(j),) for j in jobIDs])) + sqlCmd = f"SELECT {attrNames} FROM Jobs JOIN to_select_Jobs USING (JobID)" + result = returnValueOrRaise(self._query(sqlCmd)) + finally: + sqlCmd = "DROP TEMPORARY TABLE to_select_Jobs" + returnValueOrRaise(self._update(sqlCmd)) attributes = {} - for t_att in res["Value"]: + for t_att in result: jobID = int(t_att[0]) attributes.setdefault(jobID, {}) for tx, ax in zip(t_att[1:], attrList): attributes[jobID].setdefault(ax, tx) - return S_OK(attributes) + return attributes ############################################################################# def getJobAttributes(self, jobID, attrList=None): @@ -527,12 +517,10 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No if not isinstance(jobID, (list, tuple)): jobIDList = [jobID] - jIDList = [] - for jID in jobIDList: - ret = self._escapeString(jID) - if not ret["OK"]: - return ret - jIDList.append(ret["Value"]) + try: + jIDList = [int(jID) for jID in jobIDList] + except ValueError as e: + return S_ERROR(f"JobDB.setAttributes: {e}") if len(attrNames) != len(attrValues): return S_ERROR("JobDB.setAttributes: incompatible Argument length") @@ -561,7 +549,7 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No if not attr: return S_ERROR("JobDB.setAttributes: Nothing to do") - cmd = f"UPDATE Jobs SET {', '.join(attr)} WHERE JobID in ( {', '.join(jIDList)} )" + cmd = f"UPDATE Jobs SET {', '.join(attr)} WHERE JobID in ( {', '.join(str(int(j)) for j in jIDList)} )" if myDate: cmd += f" AND LastUpdateTime < {myDate}" @@ -987,44 +975,42 @@ def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup, return S_OK() ############################################################################# + @convertToReturnValue def removeJobFromDB(self, jobIDs): """ Remove jobs from the Job DB and clean up all the job related data in various tables """ - - # ret = self._escapeString(jobID) - # if not ret['OK']: - # return ret - # e_jobID = ret['Value'] - if not jobIDs: - return S_OK() - - if not isinstance(jobIDs, list): - jobIDList = [jobIDs] - else: - jobIDList = jobIDs + return None + jobIDList = jobIDs if isinstance(jobIDs, list) else [jobIDs] failedTablesList = [] - for table in [ - "InputData", - "JobParameters", - "AtticJobParameters", - "HeartBeatLoggingInfo", - "OptimizerParameters", - "JobCommands", - "Jobs", - "JobJDLs", - ]: - cmd = f"DELETE FROM {table} WHERE JobID in ({','.join(str(j) for j in jobIDList)})" - result = self._update(cmd) - if not result["OK"]: - failedTablesList.append(table) - if failedTablesList: - return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})") + sqlCmd = "CREATE TEMPORARY TABLE to_delete_Jobs (JobID INT(11) UNSIGNED NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;" + returnValueOrRaise(self._update(sqlCmd)) + try: + sqlCmd = "INSERT INTO to_delete_Jobs (JobID) VALUES ( %s )" + returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobIDList])) + + for table in [ + "InputData", + "JobParameters", + "AtticJobParameters", + "HeartBeatLoggingInfo", + "OptimizerParameters", + "JobCommands", + "Jobs", + "JobJDLs", + ]: + sqlCmd = f"DELETE m from `{table}` m JOIN to_delete_Jobs t USING (JobID)" + if not self._update(sqlCmd)["OK"]: + failedTablesList.append(table) + finally: + sqlCmd = "DROP TEMPORARY TABLE to_delete_Jobs" + returnValueOrRaise(self._update(sqlCmd)) - return S_OK() + if failedTablesList: + raise SErrorException(f"Errors while job removal (tables {','.join(failedTablesList)})") ############################################################################# def rescheduleJob(self, jobID): diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py index e4304344e58..6c3aacbb03d 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py @@ -11,6 +11,7 @@ from DIRAC import S_ERROR, S_OK from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import TimeUtilities +from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise, convertToReturnValue from DIRAC.FrameworkSystem.Client.Logger import contextLogger MAGIC_EPOC_NUMBER = 1270000000 @@ -145,21 +146,33 @@ def getJobLoggingInfo(self, jobID): return S_OK(return_value) ############################################################################# + @convertToReturnValue def deleteJob(self, jobID): """Delete logging records for given jobs""" if not jobID: - return S_OK() + return None - # Make sure that we have a list of strings of jobIDs if isinstance(jobID, int): - jobList = [str(jobID)] + jobList = [jobID] elif isinstance(jobID, str): jobList = jobID.replace(" ", "").split(",") else: - jobList = list(str(j) for j in jobID) + jobList = jobID - req = f"DELETE FROM LoggingInfo WHERE JobID IN ({','.join(jobList)})" - return self._update(req) + sqlCmd = ( + "CREATE TEMPORARY TABLE to_delete_LoggingInfo (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;" + ) + returnValueOrRaise(self._update(sqlCmd)) + try: + sqlCmd = "INSERT INTO to_delete_LoggingInfo (JobID) VALUES ( %s )" + returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobList])) + sqlCmd = "DELETE l from `LoggingInfo` l JOIN to_delete_LoggingInfo t USING (JobID)" + result = returnValueOrRaise(self._update(sqlCmd)) + finally: + sqlCmd = "DROP TEMPORARY TABLE to_delete_LoggingInfo" + returnValueOrRaise(self._update(sqlCmd)) + + return result ############################################################################# def getWMSTimeStamps(self, jobID): diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index 5e66cd54a0b..da3ccb46ed1 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -28,6 +28,7 @@ from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.MySQL import _quotedList +from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise from DIRAC.FrameworkSystem.Client.Logger import contextLogger from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus from DIRAC.WorkloadManagementSystem.Client import PilotStatus @@ -401,9 +402,18 @@ def __getPilotID(self, pilotRef): if result["Value"]: return int(result["Value"][0][0]) return 0 - refString = ",".join(["'" + ref + "'" for ref in pilotRef]) - req = f"SELECT PilotID from PilotAgents WHERE PilotJobReference in ( {refString} )" - result = self._query(req) + + sqlCmd = "CREATE TEMPORARY TABLE to_select_PilotAgents (PilotID VARCHAR(255) NOT NULL, PRIMARY KEY (PilotID)) ENGINE=MEMORY;" + returnValueOrRaise(self._update(sqlCmd)) + try: + sqlCmd = "INSERT INTO to_select_PilotAgents (PilotID) VALUES ( %s )" + returnValueOrRaise(self._updatemany(sqlCmd, [(p,) for p in pilotRef])) + sqlCmd = "SELECT PilotID FROM PilotAgents JOIN to_select_PilotAgents USING (PilotID)" + result = self._query(sqlCmd) + finally: + sqlCmd = "DROP TEMPORARY TABLE to_select_PilotAgents" + returnValueOrRaise(self._update(sqlCmd)) + if not result["OK"]: return [] if result["Value"]: diff --git a/src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py b/src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py index 04d43dbcc39..1978f34bd4a 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/SandboxMetadataDB.py @@ -5,6 +5,7 @@ from DIRAC.Core.Base.DB import DB from DIRAC.Core.Security import Properties from DIRAC.Core.Utilities import List +from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue, returnValueOrRaise class SandboxMetadataDB(DB): @@ -219,11 +220,8 @@ def assignSandboxesToEntities(self, enDict, requesterName, requesterGroup, owner return result return S_OK(assigned) - def __filterEntitiesByRequester(self, entitiesList, requesterName, requesterGroup): - """ - Given a list of entities and a requester, return the ones that the requester is allowed to modify - """ - sqlCond = ["s.OwnerId=o.OwnerId", "s.SBId=e.SBId"] + def __entitiesByRequesterCond(self, requesterName, requesterGroup): + sqlCond = [] requesterProps = Registry.getPropertiesForEntity(requesterGroup, name=requesterName) if Properties.JOB_ADMINISTRATOR in requesterProps: # Do nothing, just ensure it doesn't fit in the other cases @@ -235,44 +233,40 @@ def __filterEntitiesByRequester(self, entitiesList, requesterName, requesterGrou sqlCond.append(f"o.Owner='{requesterName}'") else: return S_ERROR("Not authorized to access sandbox") - for i in range(len(entitiesList)): - entitiesList[i] = self._escapeString(entitiesList[i])["Value"] - if len(entitiesList) == 1: - sqlCond.append(f"e.EntityId = {entitiesList[0]}") - else: - sqlCond.append(f"e.EntityId in ( {', '.join(entitiesList)} )") - sqlCmd = "SELECT DISTINCT e.EntityId FROM `sb_EntityMapping` e, `sb_SandBoxes` s, `sb_Owners` o WHERE" - sqlCmd = f"{sqlCmd} {' AND '.join(sqlCond)}" - result = self._query(sqlCmd) - if not result["OK"]: - return result - return S_OK([row[0] for row in result["Value"]]) + return sqlCond + @convertToReturnValue def unassignEntities(self, entities, requesterName, requesterGroup): """ Unassign jobs to sandboxes :param list entities: list of entities to unassign """ - updated = 0 if not entities: - return S_OK() - result = self.__filterEntitiesByRequester(entities, requesterName, requesterGroup) - if not result["OK"]: - gLogger.error("Cannot filter entities", result["Message"]) - return result - ids = result["Value"] - if not ids: - return S_OK(0) - sqlCmd = "DELETE FROM `sb_EntityMapping` WHERE EntityId in ( %s )" % ", ".join( - ["'%s'" % str(eid) for eid in ids] - ) - result = self._update(sqlCmd) - if not result["OK"]: - gLogger.error("Cannot unassign entities", result["Message"]) - else: - updated += 1 - return S_OK(updated) + return None + conds = self.__entitiesByRequesterCond(requesterName, requesterGroup) + + sqlCmd = "CREATE TEMPORARY TABLE to_delete_EntityId (EntityId VARCHAR(128) NOT NULL, PRIMARY KEY (EntityId)) ENGINE=MEMORY;" + returnValueOrRaise(self._update(sqlCmd)) + try: + sqlCmd = "INSERT INTO to_delete_EntityId (EntityId) VALUES ( %s )" + returnValueOrRaise(self._updatemany(sqlCmd, [(e,) for e in entities])) + sqlCmd = "DELETE m from `sb_EntityMapping` m JOIN to_delete_EntityId t USING (EntityId)" + if conds: + sqlCmd = " ".join( + [ + sqlCmd, + "JOIN `sb_SandBoxes` s ON s.SBId = m.SBId", + "JOIN `sb_Owners` o ON s.OwnerId = o.OwnerId", + "WHERE", + " AND ".join(conds), + ] + ) + returnValueOrRaise(self._update(sqlCmd)) + finally: + sqlCmd = "DROP TEMPORARY TABLE to_delete_EntityId" + returnValueOrRaise(self._update(sqlCmd)) + return 1 def getSandboxesAssignedToEntity(self, entityId, requesterName, requesterGroup, requestedVO): """