Skip to content

Commit ad569d0

Browse files
committed
refactor: move DIRAC TS WebApp related RPC calls to dedicated service
1 parent 9134a00 commit ad569d0

5 files changed

Lines changed: 212 additions & 347 deletions

File tree

src/DIRAC/MonitoringSystem/Service/WebAppHandler.py

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,18 @@
88
from DIRAC.Core.DISET.RequestHandler import RequestHandler
99
from DIRAC.Core.Utilities.JEncode import strToIntDict
1010
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
11+
from DIRAC.RequestManagementSystem.Client.Operation import Operation
12+
from DIRAC.RequestManagementSystem.Client.Request import Request
13+
from DIRAC.TransformationSystem.Client import TransformationFilesStatus
14+
from DIRAC.WorkloadManagementSystem.Client import JobStatus
1115
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, JobPolicy
1216

17+
TASKS_STATE_NAMES = ["TotalCreated", "Created"] + sorted(
18+
set(JobStatus.JOB_STATES) | set(Request.ALL_STATES) | set(Operation.ALL_STATES)
19+
)
20+
FILES_STATE_NAMES = ["PercentProcessed", "Total"] + TransformationFilesStatus.TRANSFORMATION_FILES_STATES
21+
22+
1323

1424
class WebAppHandler(RequestHandler):
1525
@classmethod
@@ -320,3 +330,200 @@ def export_getSiteSummarySelectors(cls):
320330
resultDict["Site"] = siteList
321331

322332
return S_OK(resultDict)
333+
334+
335+
##############################################################################
336+
# Transformations
337+
##############################################################################
338+
339+
types_getDistinctAttributeValues = [str, dict]
340+
341+
@classmethod
342+
def export_getDistinctAttributeValues(cls, attribute, selectDict):
343+
res = cls.transformationDB.getTableDistinctAttributeValues("Transformations", [attribute], selectDict)
344+
if not res["OK"]:
345+
return res
346+
return S_OK(res["Value"][attribute])
347+
348+
types_getTransformationFilesSummaryWeb = [dict, list, int, int]
349+
350+
def export_getTransformationFilesSummaryWeb(self, selectDict, sortList, startItem, maxItems):
351+
selectColumns=["TransformationID", "Status", "UsedSE", "TargetSE"],
352+
timeStamp="LastUpdate",
353+
statusColumn="Status",
354+
fromDate = selectDict.get("FromDate", None)
355+
if fromDate:
356+
del selectDict["FromDate"]
357+
# if not fromDate:
358+
# fromDate = last_update
359+
toDate = selectDict.get("ToDate", None)
360+
if toDate:
361+
del selectDict["ToDate"]
362+
# Sorting instructions. Only one for the moment.
363+
if sortList:
364+
orderAttribute = sortList[0][0] + ":" + sortList[0][1]
365+
else:
366+
orderAttribute = None
367+
# Get the columns that match the selection
368+
fcn = None
369+
fcnName = "getTransformationFiles"
370+
if hasattr(self.transformationDB, fcnName) and callable(getattr(self.transformationDB, fcnName)):
371+
fcn = getattr(self.transformationDB, fcnName)
372+
if not fcn:
373+
return S_ERROR(f"Unable to invoke gTransformationDB.{fcnName}, it isn't a member function")
374+
res = fcn(condDict=selectDict, older=toDate, newer=fromDate, timeStamp=timeStamp, orderAttribute=orderAttribute)
375+
if not res["OK"]:
376+
return res
377+
378+
# The full list of columns in contained here
379+
allRows = res["Value"]
380+
# Prepare the standard structure now within the resultDict dictionary
381+
resultDict = {}
382+
# Create the total records entry
383+
resultDict["TotalRecords"] = len(allRows)
384+
385+
# Get the rows which are within the selected window
386+
if resultDict["TotalRecords"] == 0:
387+
return S_OK(resultDict)
388+
ini = startItem
389+
last = ini + maxItems
390+
if ini >= resultDict["TotalRecords"]:
391+
return S_ERROR("Item number out of range")
392+
if last > resultDict["TotalRecords"]:
393+
last = resultDict["TotalRecords"]
394+
395+
selectedRows = allRows[ini:last]
396+
resultDict["Records"] = []
397+
for row in selectedRows:
398+
resultDict["Records"].append(list(row.values()))
399+
400+
# Create the ParameterNames entry
401+
resultDict["ParameterNames"] = list(selectedRows[0].keys())
402+
# Find which element in the tuple contains the requested status
403+
if statusColumn not in resultDict["ParameterNames"]:
404+
return S_ERROR("Provided status column not present")
405+
406+
# Generate the status dictionary
407+
statusDict = {}
408+
for row in selectedRows:
409+
status = row[statusColumn]
410+
statusDict[status] = statusDict.setdefault(status, 0) + 1
411+
resultDict["Extras"] = statusDict
412+
413+
# Obtain the distinct values of the selection parameters
414+
res = self.transformationDB.getTableDistinctAttributeValues(
415+
"TransformationFiles", selectColumns, selectDict, older=toDate, newer=fromDate
416+
)
417+
distinctSelections = zip(selectColumns, [])
418+
if res["OK"]:
419+
distinctSelections = res["Value"]
420+
resultDict["Selections"] = distinctSelections
421+
422+
return S_OK(resultDict)
423+
424+
types_getTransformationSummaryWeb = [dict, list, int, int]
425+
426+
def export_getTransformationSummaryWeb(self, selectDict, sortList, startItem, maxItems):
427+
"""Get the summary of the transformation information for a given page in the generic format"""
428+
429+
# Obtain the timing information from the selectDict
430+
last_update = selectDict.get("CreationDate", None)
431+
if last_update:
432+
del selectDict["CreationDate"]
433+
fromDate = selectDict.get("FromDate", None)
434+
if fromDate:
435+
del selectDict["FromDate"]
436+
if not fromDate:
437+
fromDate = last_update
438+
toDate = selectDict.get("ToDate", None)
439+
if toDate:
440+
del selectDict["ToDate"]
441+
# Sorting instructions. Only one for the moment.
442+
if sortList:
443+
orderAttribute = []
444+
for i in sortList:
445+
orderAttribute += [i[0] + ":" + i[1]]
446+
else:
447+
orderAttribute = None
448+
449+
# Get the transformations that match the selection
450+
res = self.transformationDB.getTransformations(
451+
condDict=selectDict, older=toDate, newer=fromDate, orderAttribute=orderAttribute
452+
)
453+
if not res["OK"]:
454+
return res
455+
456+
ops = Operations()
457+
# Prepare the standard structure now within the resultDict dictionary
458+
resultDict = {}
459+
trList = res["Records"]
460+
# Create the total records entry
461+
nTrans = len(trList)
462+
resultDict["TotalRecords"] = nTrans
463+
# Create the ParameterNames entry
464+
# As this list is a reference to the list in the DB, we cannot extend it, therefore copy it
465+
resultDict["ParameterNames"] = list(res["ParameterNames"])
466+
# Add the job states to the ParameterNames entry
467+
taskStateNames = TASKS_STATE_NAMES + ops.getValue("Transformations/AdditionalTaskStates", [])
468+
resultDict["ParameterNames"] += ["Jobs_" + x for x in taskStateNames]
469+
# Add the file states to the ParameterNames entry
470+
fileStateNames = FILES_STATE_NAMES + ops.getValue("Transformations/AdditionalFileStates", [])
471+
resultDict["ParameterNames"] += ["Files_" + x for x in fileStateNames]
472+
473+
# Get the transformations which are within the selected window
474+
if nTrans == 0:
475+
return S_OK(resultDict)
476+
ini = startItem
477+
last = ini + maxItems
478+
if ini >= nTrans:
479+
return S_ERROR("Item number out of range")
480+
if last > nTrans:
481+
last = nTrans
482+
transList = trList[ini:last]
483+
484+
statusDict = {}
485+
extendableTranfs = ops.getValue("Transformations/ExtendableTransfTypes", ["Simulation", "MCsimulation"])
486+
givenUpFileStatus = ops.getValue("Transformations/GivenUpFileStatus", ["MissingInFC"])
487+
problematicStatuses = ops.getValue("Transformations/ProblematicStatuses", ["Problematic"])
488+
# Add specific information for each selected transformation
489+
for trans in transList:
490+
transDict = dict(zip(resultDict["ParameterNames"], trans))
491+
492+
# Update the status counters
493+
status = transDict["Status"]
494+
statusDict[status] = statusDict.setdefault(status, 0) + 1
495+
496+
# Get the statistics on the number of jobs for the transformation
497+
transID = transDict["TransformationID"]
498+
res = self.transformationDB.getTransformationTaskStats(transID)
499+
taskDict = {}
500+
if res["OK"] and res["Value"]:
501+
taskDict = res["Value"]
502+
for state in taskStateNames:
503+
trans.append(taskDict.get(state, 0))
504+
505+
# Get the statistics for the number of files for the transformation
506+
fileDict = {}
507+
transType = transDict["Type"]
508+
if transType.lower() in extendableTranfs:
509+
fileDict["PercentProcessed"] = "-"
510+
else:
511+
res = self.transformationDB.getTransformationStats(transID)
512+
if res["OK"]:
513+
fileDict = res["Value"]
514+
total = fileDict["Total"]
515+
for stat in givenUpFileStatus:
516+
total -= fileDict.get(stat, 0)
517+
processed = fileDict.get(TransformationFilesStatus.PROCESSED, 0)
518+
fileDict["PercentProcessed"] = f"{int(processed * 1000.0 / total) / 10.0:.1f}" if total else 0.0
519+
problematic = 0
520+
for stat in problematicStatuses:
521+
problematic += fileDict.get(stat, 0)
522+
fileDict["Problematic"] = problematic
523+
for state in fileStateNames:
524+
trans.append(fileDict.get(state, 0))
525+
526+
resultDict["Records"] = transList
527+
resultDict["Extras"] = statusDict
528+
return S_OK(resultDict)
529+

src/DIRAC/ProductionSystem/scripts/dirac_prod_get_trans.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ def main():
1616
Script.registerArgument("prodID: Production ID")
1717
_, args = Script.parseCommandLine()
1818

19+
from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient
1920
from DIRAC.ProductionSystem.Client.ProductionClient import ProductionClient
20-
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
2121

2222
# get arguments
2323
prodID = args[0]
2424

2525
prodClient = ProductionClient()
26-
transClient = TransformationClient()
26+
webAppClient = WebAppClient()
2727

2828
res = prodClient.getProductionTransformations(prodID)
2929
transIDs = []
@@ -70,7 +70,7 @@ def main():
7070
]
7171
resList = []
7272

73-
res = transClient.getTransformationSummaryWeb({"TransformationID": transIDs}, [], 0, len(transIDs))
73+
res = webAppClient.getTransformationSummaryWeb({"TransformationID": transIDs}, [], 0, len(transIDs))
7474

7575
if not res["OK"]:
7676
DIRAC.gLogger.error(res["Message"])

src/DIRAC/TransformationSystem/Client/Transformation.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from DIRAC.Core.Base.API import API
1010
from DIRAC.Core.Utilities.JEncode import encode
1111
from DIRAC.Core.Utilities.PromptUser import promptUser
12+
from DIRAC.MonitoringSystem.Client.WebAppClient import WebAppClient
1213
from DIRAC.RequestManagementSystem.Client.Operation import Operation
1314
from DIRAC.TransformationSystem.Client.BodyPlugin.BaseBody import BaseBody
1415
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
@@ -498,7 +499,7 @@ def getSummaryTransformations(self, transID=[]):
498499
]
499500
dictList = []
500501

501-
result = self.transClient.getTransformationSummaryWeb(condDict, orderby, start, maxitems)
502+
result = WebAppClient().getTransformationSummaryWeb(condDict, orderby, start, maxitems)
502503
if not result["OK"]:
503504
self._prettyPrint(result)
504505
return result

src/DIRAC/TransformationSystem/Client/TransformationClient.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,6 @@ class TransformationClient(Client):
4848
4949
getFileSummary(lfns)
5050
exists(lfns)
51-
52-
Web monitoring tools
53-
54-
getDistinctAttributeValues(attribute, selectDict)
55-
getTransformationStatusCounters()
56-
getTransformationSummary()
57-
getTransformationSummaryWeb(selectDict, sortList, startItem, maxItems)
5851
"""
5952

6053
def __init__(self, **kwargs):

0 commit comments

Comments
 (0)