Skip to content

Commit 8105367

Browse files
committed
refactor: move DIRAC WebApp related RPC calls to dedicated service
1 parent 44c5e0d commit 8105367

9 files changed

Lines changed: 350 additions & 318 deletions

File tree

src/DIRAC/MonitoringSystem/ConfigTemplate.cfg

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,13 @@ Services
2828
}
2929
}
3030
##END
31+
##BEGIN WebApp
32+
WebApp
33+
{
34+
Port = 9199
35+
Authorization
36+
{
37+
Default = authenticated
38+
}
39+
}
3140
}
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
"""
2+
The WebAppHandler module provides a class to handle web requests from the DIRAC WebApp.
3+
It is not intended to be used in diracx
4+
"""
5+
from DIRAC import S_ERROR, S_OK
6+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
7+
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getSites
8+
from DIRAC.Core.DISET.RequestHandler import RequestHandler
9+
from DIRAC.Core.Utilities.JEncode import strToIntDict
10+
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
11+
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, JobPolicy
12+
13+
14+
class WebAppHandler(RequestHandler):
    """Service handler grouping the RPC calls issued by the DIRAC WebApp.

    The handler holds two class-level DB objects, ``pilotAgentsDB`` and ``jobDB``,
    created once in :py:meth:`initializeHandler`. Every ``export_*`` method is paired
    with a ``types_*`` list describing its positional arguments.
    """

    @classmethod
    def initializeHandler(cls, serviceInfoDict):
        """Load and instantiate the DB objects used by the exported methods.

        :param dict serviceInfoDict: service information (not used here)

        :return: S_OK()/S_ERROR()
        """
        try:
            result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
            if not result["OK"]:
                return result
            cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)

            result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
            if not result["OK"]:
                return result
            cls.jobDB = result["Value"](parentLogger=cls.log)

        except RuntimeError as excp:
            return S_ERROR(f"Can't connect to DB: {excp}")

        return S_OK()

    @staticmethod
    def _popTimeWindow(selectDict):
        """Remove the time selectors from ``selectDict`` and return them.

        ``FromDate`` has priority; ``LastUpdate`` is only consulted (and removed)
        when ``FromDate`` is absent or None, for backward compatibility.
        The keys are popped even when their value is falsy, so that they are never
        forwarded to the DB as ordinary selection conditions.

        :param dict selectDict: selectors, modified in place

        :return: tuple (startDate, endDate), each possibly None
        """
        startDate = selectDict.pop("FromDate", None)
        if startDate is None:
            startDate = selectDict.pop("LastUpdate", None)
        endDate = selectDict.pop("ToDate", None)
        return startDate, endDate

    ##############################################################################
    # PilotAgents
    ##############################################################################

    types_getPilotMonitorWeb = [dict, list, int, int]

    @classmethod
    def export_getPilotMonitorWeb(cls, selectDict, sortList, startItem, maxItems):
        """Get the summary of the pilot information for a given page in the
        pilot monitor in a generic format

        :param dict selectDict: selectors
        :param list sortList: sorting list
        :param int startItem: start item number
        :param int maxItems: maximum of items

        :return: the result of ``pilotAgentsDB.getPilotMonitorWeb``, unchanged
        """
        return cls.pilotAgentsDB.getPilotMonitorWeb(selectDict, sortList, startItem, maxItems)

    types_getPilotMonitorSelectors = []

    @classmethod
    def export_getPilotMonitorSelectors(cls):
        """Get all the distinct selector values for the Pilot Monitor web portal page

        :return: the result of ``pilotAgentsDB.getPilotMonitorSelectors``, unchanged
        """
        return cls.pilotAgentsDB.getPilotMonitorSelectors()

    types_getPilotSummaryWeb = [dict, list, int, int]

    @classmethod
    def export_getPilotSummaryWeb(cls, selectDict, sortList, startItem, maxItems):
        """Get the pilot summary for a given page in a generic format

        :param dict selectDict: selectors
        :param list sortList: sorting list
        :param int startItem: start item number
        :param int maxItems: maximum of items

        :return: the result of ``pilotAgentsDB.getPilotSummaryWeb``, unchanged
        """
        return cls.pilotAgentsDB.getPilotSummaryWeb(selectDict, sortList, startItem, maxItems)

    types_getPilotStatistics = [str, dict]

    @classmethod
    def export_getPilotStatistics(cls, attribute, selectDict):
        """Get pilot statistics distribution per attribute value with a given selection

        :param str attribute: attribute to count the pilots by
        :param dict selectDict: selectors; the time selectors (``FromDate``,
                                ``LastUpdate``, ``ToDate``) are removed from it

        :return: S_OK(dict) mapping each attribute value to its count. The dict is
                 empty when the DB query fails (the failure is logged).
        """
        startDate, endDate = cls._popTimeWindow(selectDict)

        result = cls.pilotAgentsDB.getCounters(
            "PilotAgents", [attribute], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime"
        )
        if not result["OK"]:
            # Historical contract: a failed query still yields S_OK with empty statistics.
            # Keep that, but do not hide the failure from the service logs.
            cls.log.error("Failed to get pilot counters", result["Message"])
            return S_OK({})

        return S_OK({status[attribute]: count for status, count in result["Value"]})

    types_getPilotsCounters = [str, list, dict]

    # This was PilotManagerHandler.getCounters
    @classmethod
    def export_getPilotsCounters(cls, table, keys, condDict, newer=None, timeStamp="SubmissionTime"):
        """Count the pilot records grouped by the distinct values of the given keys

        :param str table: table to query
        :param list keys: attributes to group by
        :param dict condDict: selection conditions
        :param newer: lower time bound, forwarded to the DB
        :param str timeStamp: name of the time attribute the bound applies to

        :return: the result of ``pilotAgentsDB.getCounters``, unchanged
        """
        return cls.pilotAgentsDB.getCounters(table, keys, condDict, newer=newer, timeStamp=timeStamp)

    ##############################################################################
    # Jobs
    ##############################################################################

    types_getJobPageSummaryWeb = [dict, list, int, int]

    def export_getJobPageSummaryWeb(self, selectDict, sortList, startItem, maxItems, selectJobs=True):
        """Get the summary of the job information for a given page in the
        job monitor in a generic format

        :param dict selectDict: selectors
        :param list sortList: sorting list, only the first (attribute, direction) pair is used
        :param int startItem: start item number
        :param int maxItems: maximum of items
        :param bool selectJobs: if False, only the counters are returned

        :return: S_OK(dict)/S_ERROR() -- the dict always holds ``TotalRecords``; when jobs
                 match it also holds ``Extras`` (count per status) and, if ``selectJobs``
                 is set and jobs are visible, ``ParameterNames`` and ``Records``
        """
        resultDict = {}

        startDate, endDate, selectDict = self.parseSelectors(selectDict)

        # Restrict the selection to what the caller is allowed to see
        credDict = self.getRemoteCredentials()
        owner = credDict["username"]
        ownerGroup = credDict["group"]
        operations = Operations(group=ownerGroup)
        globalJobsInfo = operations.getValue("/Services/JobMonitoring/GlobalJobsInfo", True)
        jobPolicy = JobPolicy(owner, ownerGroup, globalJobsInfo)
        jobPolicy.jobDB = self.jobDB
        result = jobPolicy.getControlledUsers(RIGHT_GET_INFO)
        if not result["OK"]:
            return result
        if not result["Value"]:
            return S_ERROR(f"User and group combination has no job rights ({owner!r}, {ownerGroup!r})")
        if result["Value"] != "ALL":
            selectDict[("Owner", "OwnerGroup")] = result["Value"]

        # Sorting instructions. Only one for the moment.
        orderAttribute = f"{sortList[0][0]}:{sortList[0][1]}" if sortList else None

        result = self.jobDB.getCounters(
            "Jobs", ["Status"], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime"
        )
        if not result["OK"]:
            return result

        statusDict = {}
        nJobs = 0
        for stDict, count in result["Value"]:
            nJobs += count
            statusDict[stDict["Status"]] = count

        resultDict["TotalRecords"] = nJobs
        if nJobs == 0:
            return S_OK(resultDict)

        resultDict["Extras"] = statusDict

        if selectJobs:
            iniJob = startItem
            if iniJob >= nJobs:
                return S_ERROR("Item number out of range")

            result = self.jobDB.selectJobs(
                selectDict, orderAttribute=orderAttribute, newer=startDate, older=endDate, limit=(maxItems, iniJob)
            )
            if not result["OK"]:
                return result

            summaryJobList = result["Value"]
            if not globalJobsInfo:
                validJobs, _invalidJobs, _nonauthJobs, _ownJobs = jobPolicy.evaluateJobRights(
                    summaryJobList, RIGHT_GET_INFO
                )
                summaryJobList = validJobs

            res = self.jobDB.getJobsAttributes(summaryJobList)
            if not res["OK"]:
                return res
            # Keep going after the conversion: the page content is built from this dict
            summaryDict = strToIntDict(res["Value"])

            # If no jobs can be selected after the properties check
            if not summaryDict:
                return S_OK(resultDict)

            # Evaluate last sign of life time
            for jobDict in summaryDict.values():
                heartBeatTime = jobDict.get("HeartBeatTime")
                if not heartBeatTime or heartBeatTime == "None":
                    jobDict["LastSignOfLife"] = jobDict["LastUpdateTime"]
                else:
                    jobDict["LastSignOfLife"] = heartBeatTime

            # prepare the standard structure now
            # summaryDict is not empty here, the first job provides the column names
            resultDict["ParameterNames"] = list(next(iter(summaryDict.values())))
            resultDict["Records"] = [list(jobDict.values()) for jobDict in summaryDict.values()]

        return S_OK(resultDict)

    types_getJobStats = [str, dict]

    @classmethod
    def export_getJobStats(cls, attribute, selectDict):
        """Get job statistics distribution per attribute value with a given selection

        :param str attribute: attribute to count the jobs by
        :param dict selectDict: selectors

        :return: S_OK(dict)/S_ERROR() -- the dict maps each attribute value to its count
        """
        startDate, endDate, selectDict = cls.parseSelectors(selectDict)
        result = cls.jobDB.getCounters(
            "Jobs", [attribute], selectDict, newer=startDate, older=endDate, timeStamp="LastUpdateTime"
        )
        if not result["OK"]:
            return result

        return S_OK({cDict[attribute]: count for cDict, count in result["Value"]})

    @classmethod
    def parseSelectors(cls, selectDict=None):
        """Parse selectors before DB query

        The time selectors and ``PilotJobReference`` are removed from the selectors.
        When no ``JobID`` is given, the pilot references are translated into the
        job IDs reported by ``pilotAgentsDB.getPilotInfo``.

        :param dict selectDict: selectors, modified in place

        :return: str, str, dict -- start/end date, selectors
        """
        selectDict = selectDict or {}

        # Get time period
        startDate, endDate = cls._popTimeWindow(selectDict)

        # Provide JobID bound to a specific PilotJobReference
        # There is no reason to have both PilotJobReference and JobID in selectDict
        # If that occurs, use the JobID instead of the PilotJobReference
        pilotJobRefs = selectDict.pop("PilotJobReference", None)
        if pilotJobRefs and not selectDict.get("JobID"):
            if isinstance(pilotJobRefs, str):
                pilotJobRefs = [pilotJobRefs]

            jobIDs = []
            jobsFound = False
            for pilotJobRef in pilotJobRefs:
                res = cls.pilotAgentsDB.getPilotInfo(pilotJobRef)
                if not res["OK"]:
                    continue
                pilotInfo = res["Value"].get(pilotJobRef, {})
                if "Jobs" in pilotInfo:
                    jobsFound = True
                    jobIDs.extend(pilotInfo["Jobs"])

            # JobID is only set when at least one pilot reports a list of jobs.
            # A fresh list is used: the previous JobID value is falsy (possibly None).
            if jobsFound:
                selectDict["JobID"] = jobIDs

        return startDate, endDate, selectDict

    types_getJobsCounters = [list]

    # This was JobManagerHandler.getCounters
    @classmethod
    def export_getJobsCounters(cls, attrList, attrDict=None, cutDate=""):
        """
        Retrieve list of distinct attributes values from attrList
        with attrDict as condition.
        For each set of distinct values, count number of occurrences.
        Return a list. Each item is a list with 2 items, the list of distinct
        attribute values and the counter

        :param list attrList: attributes to group by
        :param dict attrDict: selection conditions, parsed with :py:meth:`parseSelectors`
        :param cutDate: lower time bound, converted to str before being sent to the DB

        :return: the result of ``jobDB.getCounters``, unchanged
        """
        _, _, attrDict = cls.parseSelectors(attrDict)
        return cls.jobDB.getCounters("Jobs", attrList, attrDict, newer=str(cutDate), timeStamp="LastUpdateTime")

    types_getSiteSummaryWeb = [dict, list, int, int]

    @classmethod
    def export_getSiteSummaryWeb(cls, selectDict, sortList, startItem, maxItems):
        """Get the summary of the jobs running on sites in a generic format

        :param dict selectDict: selectors
        :param list sortList: sorting list
        :param int startItem: start item number
        :param int maxItems: maximum of items

        :return: S_OK(dict)/S_ERROR()
        """
        return cls.jobDB.getSiteSummaryWeb(selectDict, sortList, startItem, maxItems)

    types_getSiteSummarySelectors = []

    @classmethod
    def export_getSiteSummarySelectors(cls):
        """Get all the distinct selector values for the site summary web portal page

        The country is taken from the third dot-separated component of the site name;
        site names with fewer than three components contribute no country.

        :return: S_OK(dict)/S_ERROR()
        """
        resultDict = {
            "Status": ["Good", "Fair", "Poor", "Bad", "Idle"],
            "MaskStatus": ["Active", "Banned", "NoMask", "Reduced"],
        }

        res = getSites()
        if not res["OK"]:
            return res
        siteList = res["Value"]

        countries = set()
        for site in siteList:
            nameParts = site.split(".")
            # Names such as "A.B" have no country component: skip instead of failing
            if len(nameParts) > 2:
                countries.add(nameParts[2].lower())

        resultDict["Country"] = sorted(countries)
        resultDict["Site"] = sorted(siteList)

        return S_OK(resultDict)

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
2323
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
2424
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
25-
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
2625
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
2726
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
2827
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
28+
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
2929

3030

3131
class StalledJobAgent(AgentModule):
@@ -262,7 +262,7 @@ def _getJobPilotStatus(self, jobID):
262262
# There is no pilot reference, hence its status is unknown
263263
return S_OK("NoPilot")
264264

265-
result = PilotManagerClient().getPilotInfo(pilotReference)
265+
result = PilotAgentsDB().getPilotInfo(pilotReference)
266266
if not result["OK"]:
267267
if DErrno.cmpError(result, DErrno.EWMSNOPILOT):
268268
self.log.warn("No pilot found", f"for job {jobID}: {result['Message']}")

0 commit comments

Comments
 (0)