-
Notifications
You must be signed in to change notification settings - Fork 186
Expand file tree
/
Copy pathGE.py
More file actions
279 lines (238 loc) · 8.85 KB
/
GE.py
File metadata and controls
279 lines (238 loc) · 8.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
############################################################################
# GE class representing SGE batch system
# 10.11.2014
# Author: A.T.
############################################################################
""" GE.py is a DIRAC independent class representing the Grid Engine (SGE) batch system.
GE objects are used as the backend batch system representation for the
LocalComputingElement and SSHComputingElement classes.
The GE class relies on the SubmitOptions parameter to choose the right queue.
This should be specified in the Queue description in the CS, e.g.
SubmitOptions = -l ct=6000
"""
import re
import shlex
import subprocess
import os
class GE(object):
    """Wrapper around the Grid Engine command-line tools (qsub, qdel, qstat).

    Every public method takes its inputs as keyword arguments and returns a
    plain dictionary. The dictionary always has a ``Status`` key; a non-zero
    status is accompanied by a ``Message`` key describing the problem.
    """

    # Mapping from the qstat state column to the status name reported to callers.
    # State codes that are not listed here are ignored.
    _STATE_NAMES = {
        "Tt": "Done",
        "Tr": "Done",
        "Rr": "Running",
        "r": "Running",
        "qw": "Waiting",
        "h": "Waiting",
    }
    # Zero-based position of the state column in a whitespace-split qstat line.
    _STATE_COLUMN = 4

    @staticmethod
    def _missingParameter(kwargs, mandatory):
        """Return an error dictionary for the first absent mandatory key, else None."""
        for argument in mandatory:
            if argument not in kwargs:
                return {"Status": -1, "Message": "No %s" % argument}
        return None

    @staticmethod
    def _execute(cmd, shell=False):
        """Run a command to completion and return (returncode, stdout, stderr) as text."""
        sp = subprocess.Popen(
            cmd,
            shell=shell,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        output, error = sp.communicate()
        return sp.returncode, output, error

    @classmethod
    def _jobState(cls, fields):
        """Translate the state column of a split qstat line, or return None.

        None is returned both for lines too short to have a state column and
        for state codes that are not in the mapping.
        """
        if len(fields) <= cls._STATE_COLUMN:
            return None
        return cls._STATE_NAMES.get(fields[cls._STATE_COLUMN])

    def submitJob(self, **kwargs):
        """Submit NJobs copies of the executable with qsub.

        Mandatory keywords: Executable, OutputDir, ErrorDir, SubmitOptions.
        Optional keywords: NJobs (default 1) and Preamble, a shell snippet
        executed before qsub.

        Submission stops at the first qsub failure. If at least one job was
        submitted the result is ``{"Status": 0, "Jobs": [...]}`` with the job
        IDs parsed from the qsub output; otherwise it carries the failing
        status and the error text.
        """
        problem = self._missingParameter(kwargs, ["Executable", "OutputDir", "ErrorDir", "SubmitOptions"])
        if problem:
            return problem

        nJobs = int(kwargs.get("NJobs", 1))
        preamble = kwargs.get("Preamble")

        # The command line is the same for every submission, so build it once.
        # NOTE(review): it is handed to a shell because the preamble is a shell
        # snippet; all values are interpolated unquoted, so they must be trusted.
        cmd = "%s; " % preamble if preamble else ""
        cmd += "qsub -o %(OutputDir)s -e %(ErrorDir)s -N DIRACPilot %(SubmitOptions)s %(Executable)s" % kwargs

        # Defaults cover the case where the loop body never runs (NJobs <= 0).
        status = -1
        error = "No jobs submitted"
        outputs = []
        for _ in range(nJobs):
            status, output, error = self._execute(cmd, shell=True)
            if status != 0:
                break
            outputs.append(output)

        if not outputs:
            return {"Status": status, "Message": error}

        jobs = []
        for submitted in outputs:
            match = re.match(r"Your job (\d*) ", submitted)
            if match:
                jobs.append(match.group(1))
        return {"Status": 0, "Jobs": jobs}

    def killJob(self, **kwargs):
        """Delete every job in JobIDList with qdel.

        The result has ``Successful`` and ``Failed`` lists. ``Status`` is 0
        when every qdel succeeded and 1 otherwise, in which case ``Message``
        holds the concatenated error output.
        """
        problem = self._missingParameter(kwargs, ["JobIDList"])
        if problem:
            return problem

        jobIDList = kwargs.get("JobIDList")
        if not jobIDList:
            return {"Status": -1, "Message": "Empty job list"}

        successful = []
        failed = []
        errors = []
        for job in jobIDList:
            status, _output, error = self._execute(["qdel", str(job)])
            if status == 0:
                successful.append(job)
            else:
                failed.append(job)
                errors.append(error)

        resultDict = {"Status": 0}
        if failed:
            resultDict["Status"] = 1
            resultDict["Message"] = "".join(errors)
        resultDict["Successful"] = successful
        resultDict["Failed"] = failed
        return resultDict

    def getJobStatus(self, **kwargs):
        """Report Done / Running / Waiting / Unknown for each job in JobIDList.

        The user name is taken from the User keyword or, failing that, from
        the USER environment variable. On success the result is
        ``{"Status": 0, "Jobs": {jobID: statusName}}``.
        """
        problem = self._missingParameter(kwargs, ["JobIDList"])
        if problem:
            return problem

        user = kwargs.get("User") or os.environ.get("USER")
        if not user:
            return {"Status": -1, "Message": "No user name"}

        jobIDList = kwargs.get("JobIDList")
        if not jobIDList:
            return {"Status": -1, "Message": "Empty job list"}

        status, output, error = self._execute(["qstat", "-u", user])
        if status != 0:
            return {"Status": status, "Message": error}

        # Look jobs up by the exact first token of each line; a prefix test
        # would let job "12" pick up the state of job "123".
        wanted = {str(job): job for job in jobIDList}

        jobDict = {}
        for line in (output or "").split("\n"):
            fields = line.split()
            if not fields or fields[0] not in wanted:
                continue
            state = self._jobState(fields)
            if state:
                jobDict[wanted[fields[0]]] = state

        # Second pass: "-s z" restricts the listing to finished jobs, so any
        # requested job that shows up here is Done. A failure of this call is
        # tolerated and simply leaves the first-pass results untouched.
        status, output, _error = self._execute(["qstat", "-u", user, "-s", "z"])
        if status == 0:
            for line in (output or "").split("\n"):
                fields = line.split()
                if fields and fields[0] in wanted:
                    jobDict[wanted[fields[0]]] = "Done"

        # Whatever was not seen in either listing is reported as Unknown.
        for job in jobIDList:
            jobDict.setdefault(job, "Unknown")

        return {"Status": 0, "Jobs": jobDict}

    def getCEStatus(self, **kwargs):
        """Count the user's DIRACPilot jobs by state.

        The user name is taken from the User keyword or, failing that, from
        the USER environment variable. On success the result holds
        ``Waiting``, ``Running`` and ``Done`` counters next to ``Status`` 0.
        """
        user = kwargs.get("User") or os.environ.get("USER")
        if not user:
            return {"Status": -1, "Message": "No user name"}

        status, output, error = self._execute(["qstat", "-u", user])
        if status != 0:
            return {"Status": status, "Message": error}

        counters = {"Waiting": 0, "Running": 0, "Done": 0}
        # Only lines showing the pilot job name followed by the user are counted.
        marker = "DIRACPilot %s" % user
        for line in (output or "").split("\n"):
            if marker not in line:
                continue
            state = self._jobState(line.split())
            if state:
                counters[state] += 1

        return {
            "Status": 0,
            "Waiting": counters["Waiting"],
            "Running": counters["Running"],
            "Done": counters["Done"],
        }

    def getJobOutputFiles(self, **kwargs):
        """Build the stdout/stderr file names for each job in JobIDList.

        Mandatory keywords: JobIDList, OutputDir, ErrorDir. Environment
        variables in the directories are expanded. The result contains the
        per-job ``Output`` and ``Error`` paths under ``Jobs`` together with the
        ``OutputTemplate`` and ``ErrorTemplate`` strings, each of which has a
        single ``%s`` placeholder for the job ID.
        """
        problem = self._missingParameter(kwargs, ["JobIDList", "OutputDir", "ErrorDir"])
        if problem:
            return problem

        outputTemplate = os.path.expandvars("%s/DIRACPilot.o%%s" % kwargs["OutputDir"])
        errorTemplate = os.path.expandvars("%s/DIRACPilot.e%%s" % kwargs["ErrorDir"])

        jobDict = {}
        for job in kwargs["JobIDList"]:
            jobDict[job] = {
                "Output": outputTemplate % job,
                "Error": errorTemplate % job,
            }

        return {
            "Status": 0,
            "Jobs": jobDict,
            "OutputTemplate": outputTemplate,
            "ErrorTemplate": errorTemplate,
        }