|
20 | 20 | from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd |
21 | 21 | from DIRAC.Core.Utilities.Decorators import deprecated |
22 | 22 | from DIRAC.Core.Utilities.DErrno import EWMSJMAN, EWMSSUBM, cmpError |
23 | | -from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise |
| 23 | +from DIRAC.Core.Utilities.ReturnValues import S_ERROR, S_OK, convertToReturnValue, returnValueOrRaise, SErrorException |
24 | 24 | from DIRAC.FrameworkSystem.Client.Logger import contextLogger |
25 | 25 | from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus |
26 | 26 | from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus |
@@ -975,44 +975,42 @@ def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup, |
975 | 975 | return S_OK() |
976 | 976 |
|
977 | 977 | ############################################################################# |
| 978 | + @convertToReturnValue |
978 | 979 | def removeJobFromDB(self, jobIDs): |
979 | 980 | """ |
980 | 981 | Remove jobs from the Job DB and clean up all the job related data in various tables |
981 | 982 | """ |
982 | | - |
983 | | - # ret = self._escapeString(jobID) |
984 | | - # if not ret['OK']: |
985 | | - # return ret |
986 | | - # e_jobID = ret['Value'] |
987 | | - |
988 | 983 | if not jobIDs: |
989 | | - return S_OK() |
990 | | - |
991 | | - if not isinstance(jobIDs, list): |
992 | | - jobIDList = [jobIDs] |
993 | | - else: |
994 | | - jobIDList = jobIDs |
| 984 | + return None |
| 985 | + jobIDList = jobIDs if isinstance(jobIDs, list) else [jobIDs] |
995 | 986 |
|
996 | 987 | failedTablesList = [] |
997 | | - for table in [ |
998 | | - "InputData", |
999 | | - "JobParameters", |
1000 | | - "AtticJobParameters", |
1001 | | - "HeartBeatLoggingInfo", |
1002 | | - "OptimizerParameters", |
1003 | | - "JobCommands", |
1004 | | - "Jobs", |
1005 | | - "JobJDLs", |
1006 | | - ]: |
1007 | | - cmd = f"DELETE FROM {table} WHERE JobID in ({','.join(str(j) for j in jobIDList)})" |
1008 | | - result = self._update(cmd) |
1009 | | - if not result["OK"]: |
1010 | | - failedTablesList.append(table) |
1011 | 988 |
|
1012 | | - if failedTablesList: |
1013 | | - return S_ERROR(f"Errors while job removal (tables {','.join(failedTablesList)})") |
| 989 | + sqlCmd = "CREATE TEMPORARY TABLE to_delete_Jobs (JobID INT(11) UNSIGNED NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;" |
| 990 | + returnValueOrRaise(self._update(sqlCmd)) |
| 991 | + try: |
| 992 | + sqlCmd = "INSERT INTO to_delete_Jobs (JobID) VALUES ( %s )" |
| 993 | + returnValueOrRaise(self._updatemany(sqlCmd, [(j,) for j in jobIDList])) |
| 994 | + |
| 995 | + for table in [ |
| 996 | + "InputData", |
| 997 | + "JobParameters", |
| 998 | + "AtticJobParameters", |
| 999 | + "HeartBeatLoggingInfo", |
| 1000 | + "OptimizerParameters", |
| 1001 | + "JobCommands", |
| 1002 | + "Jobs", |
| 1003 | + "JobJDLs", |
| 1004 | + ]: |
| 1005 | + sqlCmd = f"DELETE m from `{table}` m JOIN to_delete_Jobs t USING (JobID)" |
| 1006 | + if not self._update(sqlCmd)["OK"]: |
| 1007 | + failedTablesList.append(table) |
| 1008 | + finally: |
| 1009 | + sqlCmd = "DROP TEMPORARY TABLE to_delete_Jobs" |
| 1010 | + returnValueOrRaise(self._update(sqlCmd)) |
1014 | 1011 |
|
1015 | | - return S_OK() |
| 1012 | + if failedTablesList: |
| 1013 | + raise SErrorException(f"Errors while job removal (tables {','.join(failedTablesList)})") |
1016 | 1014 |
|
1017 | 1015 | ############################################################################# |
1018 | 1016 | def rescheduleJob(self, jobID): |
|
0 commit comments