Skip to content

Commit 990efee

Browse files
chrisburrclaude
andcommitted
refactor: migrate core utilities and job management to DIRACCommon
- Move JDL, List utilities from DIRAC.Core.Utilities to DIRACCommon - Migrate JobManifest and JobDBUtils from WorkloadManagementSystem to DIRACCommon - Add Pydantic-based JobModel with validation for job descriptions - Update DIRAC modules to import from DIRACCommon maintaining backward compatibility - Add diraccfg and pydantic dependencies to support enhanced functionality 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 365c718 commit 990efee

25 files changed

Lines changed: 1860 additions & 1086 deletions

File tree

dirac-common/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ classifiers = [
2121
]
2222
dependencies = [
2323
"typing-extensions>=4.0.0",
24+
"diraccfg",
25+
"pydantic>=2.0.0",
2426
]
2527
dynamic = ["version"]
2628

Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,359 @@
1+
"""Transformation classes around the JDL format."""
2+
3+
from diraccfg import CFG
4+
from pydantic import ValidationError
5+
6+
from DIRACCommon.Core.Utilities.ReturnValues import S_OK, S_ERROR
7+
from DIRACCommon.Core.Utilities import List
8+
from DIRACCommon.Core.Utilities.ClassAd.ClassAdLight import ClassAd
9+
from DIRACCommon.WorkloadManagementSystem.Utilities.JobModel import BaseJobDescriptionModel
10+
11+
ARGUMENTS = "Arguments"
12+
BANNED_SITES = "BannedSites"
13+
CPU_TIME = "CPUTime"
14+
EXECUTABLE = "Executable"
15+
EXECUTION_ENVIRONMENT = "ExecutionEnvironment"
16+
GRID_CE = "GridCE"
17+
INPUT_DATA = "InputData"
18+
INPUT_DATA_POLICY = "InputDataPolicy"
19+
INPUT_SANDBOX = "InputSandbox"
20+
JOB_CONFIG_ARGS = "JobConfigArgs"
21+
JOB_TYPE = "JobType"
22+
JOB_GROUP = "JobGroup"
23+
LOG_LEVEL = "LogLevel"
24+
NUMBER_OF_PROCESSORS = "NumberOfProcessors"
25+
MAX_NUMBER_OF_PROCESSORS = "MaxNumberOfProcessors"
26+
MIN_NUMBER_OF_PROCESSORS = "MinNumberOfProcessors"
27+
OUTPUT_DATA = "OutputData"
28+
OUTPUT_PATH = "OutputPath"
29+
OUTPUT_SE = "OutputSE"
30+
PLATFORM = "Platform"
31+
PRIORITY = "Priority"
32+
STD_ERROR = "StdError"
33+
STD_OUTPUT = "StdOutput"
34+
OUTPUT_SANDBOX = "OutputSandbox"
35+
JOB_NAME = "JobName"
36+
SITE = "Site"
37+
TAGS = "Tags"
38+
39+
OWNER = "Owner"
40+
OWNER_GROUP = "OwnerGroup"
41+
VO = "VirtualOrganization"
42+
43+
CREDENTIALS_FIELDS = {OWNER, OWNER_GROUP, VO}
44+
45+
46+
def loadJDLAsCFG(jdl):
47+
"""
48+
Load a JDL as CFG
49+
"""
50+
51+
def cleanValue(value):
52+
value = value.strip()
53+
if value[0] == '"':
54+
entries = []
55+
iPos = 1
56+
current = ""
57+
state = "in"
58+
while iPos < len(value):
59+
if value[iPos] == '"':
60+
if state == "in":
61+
entries.append(current)
62+
current = ""
63+
state = "out"
64+
elif state == "out":
65+
current = current.strip()
66+
if current not in (",",):
67+
return S_ERROR("value seems a list but is not separated in commas")
68+
current = ""
69+
state = "in"
70+
else:
71+
current += value[iPos]
72+
iPos += 1
73+
if state == "in":
74+
return S_ERROR('value is opened with " but is not closed')
75+
return S_OK(", ".join(entries))
76+
else:
77+
return S_OK(value.replace('"', ""))
78+
79+
def assignValue(key, value, cfg):
80+
key = key.strip()
81+
if len(key) == 0:
82+
return S_ERROR("Invalid key name")
83+
value = value.strip()
84+
if not value:
85+
return S_ERROR(f"No value for key {key}")
86+
if value[0] == "{":
87+
if value[-1] != "}":
88+
return S_ERROR("Value '%s' seems a list but does not end in '}'" % (value))
89+
valList = List.fromChar(value[1:-1])
90+
for i in range(len(valList)):
91+
result = cleanValue(valList[i])
92+
if not result["OK"]:
93+
return S_ERROR(f"Var {key} : {result['Message']}")
94+
valList[i] = result["Value"]
95+
if valList[i] is None:
96+
return S_ERROR(f"List value '{value}' seems invalid for item {i}")
97+
value = ", ".join(valList)
98+
else:
99+
result = cleanValue(value)
100+
if not result["OK"]:
101+
return S_ERROR(f"Var {key} : {result['Message']}")
102+
nV = result["Value"]
103+
if nV is None:
104+
return S_ERROR(f"Value '{value} seems invalid")
105+
value = nV
106+
cfg.setOption(key, value)
107+
return S_OK()
108+
109+
if jdl[0] == "[":
110+
iPos = 1
111+
else:
112+
iPos = 0
113+
key = ""
114+
value = ""
115+
action = "key"
116+
insideLiteral = False
117+
cfg = CFG()
118+
while iPos < len(jdl):
119+
char = jdl[iPos]
120+
if char == ";" and not insideLiteral:
121+
if key.strip():
122+
result = assignValue(key, value, cfg)
123+
if not result["OK"]:
124+
return result
125+
key = ""
126+
value = ""
127+
action = "key"
128+
elif char == "[" and not insideLiteral:
129+
key = key.strip()
130+
if not key:
131+
return S_ERROR("Invalid key in JDL")
132+
if value.strip():
133+
return S_ERROR(f"Key {key} seems to have a value and open a sub JDL at the same time")
134+
result = loadJDLAsCFG(jdl[iPos:])
135+
if not result["OK"]:
136+
return result
137+
subCfg, subPos = result["Value"]
138+
cfg.createNewSection(key, contents=subCfg)
139+
key = ""
140+
value = ""
141+
action = "key"
142+
insideLiteral = False
143+
iPos += subPos
144+
elif char == "=" and not insideLiteral:
145+
if action == "key":
146+
action = "value"
147+
insideLiteral = False
148+
else:
149+
value += char
150+
elif char == "]" and not insideLiteral:
151+
key = key.strip()
152+
if len(key) > 0:
153+
result = assignValue(key, value, cfg)
154+
if not result["OK"]:
155+
return result
156+
return S_OK((cfg, iPos))
157+
else:
158+
if action == "key":
159+
key += char
160+
else:
161+
value += char
162+
if char == '"':
163+
insideLiteral = not insideLiteral
164+
iPos += 1
165+
166+
return S_OK((cfg, iPos))
167+
168+
169+
def dumpCFGAsJDL(cfg, level=1, tab=" "):
170+
indent = tab * level
171+
contents = [f"{tab * (level - 1)}["]
172+
sections = cfg.listSections()
173+
174+
for key in cfg:
175+
if key in sections:
176+
contents.append(f"{indent}{key} =")
177+
contents.append(f"{dumpCFGAsJDL(cfg[key], level + 1, tab)};")
178+
else:
179+
val = List.fromChar(cfg[key])
180+
# Some attributes are never lists
181+
if len(val) < 2 or key in [ARGUMENTS, EXECUTABLE, STD_OUTPUT, STD_ERROR]:
182+
value = cfg[key]
183+
try:
184+
try_value = float(value)
185+
contents.append(f"{tab * level}{key} = {value};")
186+
except Exception:
187+
contents.append(f'{tab * level}{key} = "{value}";')
188+
else:
189+
contents.append(f"{indent}{key} =")
190+
contents.append("%s{" % indent)
191+
for iPos in range(len(val)):
192+
try:
193+
value = float(val[iPos])
194+
except Exception:
195+
val[iPos] = f'"{val[iPos]}"'
196+
contents.append(",\n".join([f"{tab * (level + 1)}{value}" for value in val]))
197+
contents.append("%s};" % indent)
198+
contents.append(f"{tab * (level - 1)}]")
199+
return "\n".join(contents)
200+
201+
202+
def jdlToBaseJobDescriptionModel(classAd: ClassAd):
203+
"""
204+
Converts a JDL string into a JSON string for data validation from the BaseJob model
205+
This method allows compatibility with older Client versions that used the _toJDL method
206+
"""
207+
try:
208+
jobDescription = BaseJobDescriptionModel(
209+
executable=classAd.getAttributeString(EXECUTABLE),
210+
)
211+
if classAd.lookupAttribute(ARGUMENTS):
212+
jobDescription.arguments = classAd.getAttributeString(ARGUMENTS)
213+
classAd.deleteAttribute(ARGUMENTS)
214+
215+
if classAd.lookupAttribute(BANNED_SITES):
216+
jobDescription.bannedSites = classAd.getListFromExpression(BANNED_SITES)
217+
classAd.deleteAttribute(BANNED_SITES)
218+
219+
if classAd.lookupAttribute(CPU_TIME):
220+
jobDescription.cpuTime = classAd.getAttributeInt(CPU_TIME)
221+
classAd.deleteAttribute(CPU_TIME)
222+
223+
if classAd.lookupAttribute(EXECUTABLE):
224+
jobDescription.executable = classAd.getAttributeString(EXECUTABLE)
225+
classAd.deleteAttribute(EXECUTABLE)
226+
227+
if classAd.lookupAttribute(EXECUTION_ENVIRONMENT):
228+
executionEnvironment = classAd.getListFromExpression(EXECUTION_ENVIRONMENT)
229+
if executionEnvironment:
230+
jobDescription.executionEnvironment = {}
231+
for element in executionEnvironment:
232+
key, value = element.split("=")
233+
if value.isdigit():
234+
value = int(value)
235+
else:
236+
try:
237+
value = float(value)
238+
except ValueError:
239+
pass
240+
jobDescription.executionEnvironment[key] = value
241+
classAd.deleteAttribute(EXECUTION_ENVIRONMENT)
242+
243+
if classAd.lookupAttribute(GRID_CE):
244+
jobDescription.gridCE = classAd.getAttributeString(GRID_CE)
245+
classAd.deleteAttribute(GRID_CE)
246+
247+
if classAd.lookupAttribute(INPUT_DATA):
248+
jobDescription.inputData = classAd.getListFromExpression(INPUT_DATA)
249+
classAd.deleteAttribute(INPUT_DATA)
250+
251+
if classAd.lookupAttribute(INPUT_DATA_POLICY):
252+
jobDescription.inputDataPolicy = classAd.getAttributeString(INPUT_DATA_POLICY)
253+
classAd.deleteAttribute(INPUT_DATA_POLICY)
254+
255+
if classAd.lookupAttribute(INPUT_SANDBOX):
256+
jobDescription.inputSandbox = classAd.getListFromExpression(INPUT_SANDBOX)
257+
classAd.deleteAttribute(INPUT_SANDBOX)
258+
259+
if classAd.lookupAttribute(JOB_CONFIG_ARGS):
260+
jobDescription.jobConfigArgs = classAd.getAttributeString(JOB_CONFIG_ARGS)
261+
classAd.deleteAttribute(JOB_CONFIG_ARGS)
262+
263+
if classAd.lookupAttribute(JOB_GROUP):
264+
jobDescription.jobGroup = classAd.getAttributeString(JOB_GROUP)
265+
classAd.deleteAttribute(JOB_GROUP)
266+
267+
if classAd.lookupAttribute(JOB_NAME):
268+
jobDescription.jobName = classAd.getAttributeString(JOB_NAME)
269+
classAd.deleteAttribute(JOB_NAME)
270+
271+
if classAd.lookupAttribute(JOB_TYPE):
272+
jobDescription.jobType = classAd.getAttributeString(JOB_TYPE)
273+
classAd.deleteAttribute(JOB_TYPE)
274+
275+
if classAd.lookupAttribute(LOG_LEVEL):
276+
jobDescription.logLevel = classAd.getAttributeString(LOG_LEVEL)
277+
classAd.deleteAttribute(LOG_LEVEL)
278+
279+
if classAd.lookupAttribute(NUMBER_OF_PROCESSORS):
280+
jobDescription.maxNumberOfProcessors = classAd.getAttributeInt(NUMBER_OF_PROCESSORS)
281+
jobDescription.minNumberOfProcessors = classAd.getAttributeInt(NUMBER_OF_PROCESSORS)
282+
classAd.deleteAttribute(NUMBER_OF_PROCESSORS)
283+
classAd.deleteAttribute(MAX_NUMBER_OF_PROCESSORS)
284+
classAd.deleteAttribute(MIN_NUMBER_OF_PROCESSORS)
285+
else:
286+
if classAd.lookupAttribute(MAX_NUMBER_OF_PROCESSORS):
287+
jobDescription.maxNumberOfProcessors = classAd.getAttributeInt(MAX_NUMBER_OF_PROCESSORS)
288+
classAd.deleteAttribute(MAX_NUMBER_OF_PROCESSORS)
289+
if classAd.lookupAttribute(MIN_NUMBER_OF_PROCESSORS):
290+
jobDescription.minNumberOfProcessors = classAd.getAttributeInt(MIN_NUMBER_OF_PROCESSORS)
291+
classAd.deleteAttribute(MIN_NUMBER_OF_PROCESSORS)
292+
293+
if classAd.lookupAttribute(OUTPUT_DATA):
294+
jobDescription.outputData = set(classAd.getListFromExpression(OUTPUT_DATA))
295+
classAd.deleteAttribute(OUTPUT_DATA)
296+
297+
if classAd.lookupAttribute(OUTPUT_SANDBOX):
298+
jobDescription.outputSandbox = set(classAd.getListFromExpression(OUTPUT_SANDBOX))
299+
classAd.deleteAttribute(OUTPUT_SANDBOX)
300+
301+
if classAd.lookupAttribute(OUTPUT_PATH):
302+
jobDescription.outputPath = classAd.getAttributeString(OUTPUT_PATH)
303+
classAd.deleteAttribute(OUTPUT_PATH)
304+
305+
if classAd.lookupAttribute(OUTPUT_SE):
306+
jobDescription.outputSE = classAd.getAttributeString(OUTPUT_SE)
307+
classAd.deleteAttribute(OUTPUT_SE)
308+
309+
if classAd.lookupAttribute(SITE):
310+
jobDescription.sites = classAd.getListFromExpression(SITE)
311+
classAd.deleteAttribute(SITE)
312+
313+
if classAd.lookupAttribute(PLATFORM):
314+
jobDescription.platform = classAd.getAttributeString(PLATFORM)
315+
classAd.deleteAttribute(PLATFORM)
316+
317+
if classAd.lookupAttribute(PRIORITY):
318+
jobDescription.priority = classAd.getAttributeInt(PRIORITY)
319+
classAd.deleteAttribute(PRIORITY)
320+
321+
if classAd.lookupAttribute(STD_OUTPUT):
322+
jobDescription.stdout = classAd.getAttributeString(STD_OUTPUT)
323+
classAd.deleteAttribute(STD_OUTPUT)
324+
325+
if classAd.lookupAttribute(STD_ERROR):
326+
jobDescription.stderr = classAd.getAttributeString(STD_ERROR)
327+
classAd.deleteAttribute(STD_ERROR)
328+
329+
if classAd.lookupAttribute(TAGS):
330+
jobDescription.tags = classAd.getListFromExpression(TAGS)
331+
classAd.deleteAttribute(TAGS)
332+
333+
# Remove credentials
334+
for attribute in CREDENTIALS_FIELDS:
335+
classAd.deleteAttribute(attribute)
336+
337+
# Remove legacy attributes
338+
for attribute in {"DIRACSetup", "OwnerDN"}:
339+
classAd.deleteAttribute(attribute)
340+
341+
for attribute in classAd.getAttributes():
342+
if not jobDescription.extraFields:
343+
jobDescription.extraFields = {}
344+
345+
value = classAd.getAttributeString(attribute)
346+
if value.isdigit():
347+
value = int(value)
348+
else:
349+
try:
350+
value = float(value)
351+
except ValueError:
352+
pass
353+
354+
jobDescription.extraFields[attribute] = value
355+
356+
except ValidationError as e:
357+
return S_ERROR(f"Invalid JDL: {e}")
358+
359+
return S_OK(jobDescription)

0 commit comments

Comments
 (0)