Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dirac-common/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ classifiers = [
]
dependencies = [
"typing-extensions>=4.0.0",
"diraccfg",
"pydantic>=2.0.0",
]
dynamic = ["version"]

Expand Down
199 changes: 199 additions & 0 deletions dirac-common/src/DIRACCommon/Core/Utilities/JDL.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""Transformation classes around the JDL format."""

from diraccfg import CFG
from pydantic import ValidationError

from DIRACCommon.Core.Utilities.ReturnValues import S_OK, S_ERROR
from DIRACCommon.Core.Utilities import List
from DIRACCommon.Core.Utilities.ClassAd.ClassAdLight import ClassAd
from DIRACCommon.WorkloadManagementSystem.Utilities.JobModel import BaseJobDescriptionModel

ARGUMENTS = "Arguments"
BANNED_SITES = "BannedSites"
CPU_TIME = "CPUTime"
EXECUTABLE = "Executable"
EXECUTION_ENVIRONMENT = "ExecutionEnvironment"
GRID_CE = "GridCE"
INPUT_DATA = "InputData"
INPUT_DATA_POLICY = "InputDataPolicy"
INPUT_SANDBOX = "InputSandbox"
JOB_CONFIG_ARGS = "JobConfigArgs"
JOB_TYPE = "JobType"
JOB_GROUP = "JobGroup"
LOG_LEVEL = "LogLevel"
NUMBER_OF_PROCESSORS = "NumberOfProcessors"
MAX_NUMBER_OF_PROCESSORS = "MaxNumberOfProcessors"
MIN_NUMBER_OF_PROCESSORS = "MinNumberOfProcessors"
OUTPUT_DATA = "OutputData"
OUTPUT_PATH = "OutputPath"
OUTPUT_SE = "OutputSE"
PLATFORM = "Platform"
PRIORITY = "Priority"
STD_ERROR = "StdError"
STD_OUTPUT = "StdOutput"
OUTPUT_SANDBOX = "OutputSandbox"
JOB_NAME = "JobName"
SITE = "Site"
TAGS = "Tags"

OWNER = "Owner"
OWNER_GROUP = "OwnerGroup"
VO = "VirtualOrganization"

CREDENTIALS_FIELDS = {OWNER, OWNER_GROUP, VO}


def loadJDLAsCFG(jdl):
"""
Load a JDL as CFG
"""

def cleanValue(value):
value = value.strip()
if value[0] == '"':
entries = []
iPos = 1
current = ""
state = "in"
while iPos < len(value):
if value[iPos] == '"':
if state == "in":
entries.append(current)
current = ""
state = "out"
elif state == "out":
current = current.strip()
if current not in (",",):
return S_ERROR("value seems a list but is not separated in commas")
current = ""
state = "in"
else:
current += value[iPos]
iPos += 1
if state == "in":
return S_ERROR('value is opened with " but is not closed')
return S_OK(", ".join(entries))
else:
return S_OK(value.replace('"', ""))

def assignValue(key, value, cfg):
key = key.strip()
if len(key) == 0:
return S_ERROR("Invalid key name")
value = value.strip()
if not value:
return S_ERROR(f"No value for key {key}")
if value[0] == "{":
if value[-1] != "}":
return S_ERROR("Value '%s' seems a list but does not end in '}'" % (value))
valList = List.fromChar(value[1:-1])
for i in range(len(valList)):
result = cleanValue(valList[i])
if not result["OK"]:
return S_ERROR(f"Var {key} : {result['Message']}")
valList[i] = result["Value"]
if valList[i] is None:
return S_ERROR(f"List value '{value}' seems invalid for item {i}")
value = ", ".join(valList)
else:
result = cleanValue(value)
if not result["OK"]:
return S_ERROR(f"Var {key} : {result['Message']}")
nV = result["Value"]
if nV is None:
return S_ERROR(f"Value '{value} seems invalid")
value = nV
cfg.setOption(key, value)
return S_OK()

if jdl[0] == "[":
iPos = 1
else:
iPos = 0
key = ""
value = ""
action = "key"
insideLiteral = False
cfg = CFG()
while iPos < len(jdl):
char = jdl[iPos]
if char == ";" and not insideLiteral:
if key.strip():
result = assignValue(key, value, cfg)
if not result["OK"]:
return result
key = ""
value = ""
action = "key"
elif char == "[" and not insideLiteral:
key = key.strip()
if not key:
return S_ERROR("Invalid key in JDL")
if value.strip():
return S_ERROR(f"Key {key} seems to have a value and open a sub JDL at the same time")
result = loadJDLAsCFG(jdl[iPos:])
if not result["OK"]:
return result
subCfg, subPos = result["Value"]
cfg.createNewSection(key, contents=subCfg)
key = ""
value = ""
action = "key"
insideLiteral = False
iPos += subPos
elif char == "=" and not insideLiteral:
if action == "key":
action = "value"
insideLiteral = False
else:
value += char
elif char == "]" and not insideLiteral:
key = key.strip()
if len(key) > 0:
result = assignValue(key, value, cfg)
if not result["OK"]:
return result
return S_OK((cfg, iPos))
else:
if action == "key":
key += char
else:
value += char
if char == '"':
insideLiteral = not insideLiteral
iPos += 1

return S_OK((cfg, iPos))


def dumpCFGAsJDL(cfg, level=1, tab=" "):
indent = tab * level
contents = [f"{tab * (level - 1)}["]
sections = cfg.listSections()

for key in cfg:
if key in sections:
contents.append(f"{indent}{key} =")
contents.append(f"{dumpCFGAsJDL(cfg[key], level + 1, tab)};")
else:
val = List.fromChar(cfg[key])
# Some attributes are never lists
if len(val) < 2 or key in [ARGUMENTS, EXECUTABLE, STD_OUTPUT, STD_ERROR]:
value = cfg[key]
try:
try_value = float(value)
contents.append(f"{tab * level}{key} = {value};")
except Exception:
contents.append(f'{tab * level}{key} = "{value}";')
else:
contents.append(f"{indent}{key} =")
contents.append("%s{" % indent)
for iPos in range(len(val)):
try:
value = float(val[iPos])
except Exception:
val[iPos] = f'"{val[iPos]}"'
contents.append(",\n".join([f"{tab * (level + 1)}{value}" for value in val]))
contents.append("%s};" % indent)
contents.append(f"{tab * (level - 1)}]")
return "\n".join(contents)
127 changes: 127 additions & 0 deletions dirac-common/src/DIRACCommon/Core/Utilities/List.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Collection of DIRAC useful list related modules.
By default on Error they return None.
"""
import random
import sys
from typing import Any, TypeVar
from collections.abc import Iterable

T = TypeVar("T")


def uniqueElements(aList: list) -> list:
"""Utility to retrieve list of unique elements in a list (order is kept)."""

# Use dict.fromkeys instead of set ensure the order is preserved
return list(dict.fromkeys(aList))


def appendUnique(aList: list, anObject: Any):
"""Append to list if object does not exist.

:param aList: list of elements
:param anObject: object you want to append
"""
if anObject not in aList:
aList.append(anObject)


def fromChar(inputString: str, sepChar: str = ","):
"""Generates a list splitting a string by the required character(s)
resulting string items are stripped and empty items are removed.

:param inputString: list serialised to string
:param sepChar: separator
:return: list of strings or None if sepChar has a wrong type
"""
# to prevent getting an empty String as argument
if not (isinstance(inputString, str) and isinstance(sepChar, str) and sepChar):
return None
return [fieldString.strip() for fieldString in inputString.split(sepChar) if len(fieldString.strip()) > 0]


def randomize(aList: Iterable[T]) -> list[T]:
"""Return a randomly sorted list.

:param aList: list to permute
"""
tmpList = list(aList)
random.shuffle(tmpList)
return tmpList


def pop(aList, popElement):
"""Pop the first element equal to popElement from the list.

:param aList: list
:type aList: python:list
:param popElement: element to pop
"""
if popElement in aList:
return aList.pop(aList.index(popElement))


def stringListToString(aList: list) -> str:
"""This function is used for making MySQL queries with a list of string elements.

:param aList: list to be serialized to string for making queries
"""
return ",".join(f"'{x}'" for x in aList)


def intListToString(aList: list) -> str:
"""This function is used for making MySQL queries with a list of int elements.

:param aList: list to be serialized to string for making queries
"""
return ",".join(str(x) for x in aList)


def getChunk(aList: list, chunkSize: int):
"""Generator yielding chunk from a list of a size chunkSize.

:param aList: list to be splitted
:param chunkSize: lenght of one chunk
:raise: StopIteration

Usage:

>>> for chunk in getChunk( aList, chunkSize=10):
process( chunk )

"""
chunkSize = int(chunkSize)
for i in range(0, len(aList), chunkSize):
yield aList[i : i + chunkSize]


def breakListIntoChunks(aList: list, chunkSize: int):
"""This function takes a list as input and breaks it into list of size 'chunkSize'.
It returns a list of lists.

:param aList: list of elements
:param chunkSize: len of a single chunk
:return: list of lists of length of chunkSize
:raise: RuntimeError if numberOfFilesInChunk is less than 1
"""
if chunkSize < 1:
raise RuntimeError("chunkSize cannot be less than 1")
if isinstance(aList, (set, dict, tuple, {}.keys().__class__, {}.items().__class__, {}.values().__class__)):
aList = list(aList)
return [chunk for chunk in getChunk(aList, chunkSize)]


def getIndexInList(anItem: Any, aList: list) -> int:
"""Return the index of the element x in the list l
or sys.maxint if it does not exist

:param anItem: element to look for
:param aList: list to look into

:return: the index or sys.maxint
"""
# try:
if anItem in aList:
return aList.index(anItem)
else:
return sys.maxsize
Loading
Loading