Skip to content

Commit 39fc6c7

Browse files
committed
refactor: extracted getJobParameters in utility, using it in agents
1 parent 185d2c6 commit 39fc6c7

5 files changed

Lines changed: 69 additions & 60 deletions

File tree

src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@
1212
import hashlib
1313
import json
1414
import os
15-
from pathlib import Path
1615
import random
1716
import shutil
1817
import sys
19-
from collections import defaultdict
2018
import time
19+
from collections import defaultdict
20+
from pathlib import Path
2121

22-
from diraccfg import CFG
23-
24-
from DIRAC import gConfig, S_OK, S_ERROR
22+
from DIRAC import S_ERROR, S_OK, gConfig
2523
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
2624
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getQueues
2725
from DIRAC.Core.Utilities import DErrno
@@ -30,6 +28,7 @@
3028
from DIRAC.Core.Utilities.Version import getVersion
3129
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
3230
from DIRAC.Resources.Computing import ComputingElement
31+
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
3332
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus, PilotStatus
3433
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
3534
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
@@ -41,11 +40,11 @@
4140
resolveInputData,
4241
transferInputSandbox,
4342
)
44-
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
45-
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
46-
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
4743
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
4844
from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved
45+
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
46+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
47+
4948

5049
MAX_JOBS_MANAGED = 100
5150

@@ -740,7 +739,7 @@ def _checkSubmittedJobWrappers(self, ce: ComputingElement, site: str):
740739
return S_OK()
741740

742741
# Get their parameters
743-
if not (result := self.jobMonitoring.getJobParameters(jobs, ["GridCE", "TaskID", "Stamp"]))["OK"]:
742+
if not (result := getJobParameters(jobs, ["GridCE", "TaskID", "Stamp"]))["OK"]:
744743
self.log.error("Failed to get the list of taskIDs", result["Message"])
745744
return result
746745

src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
from DIRAC.Core.Utilities.TimeUtilities import fromString, second, toEpoch
2222
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus, JobStatus
2323
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
24-
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
2524
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
2625
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
2726
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
2827
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
28+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
2929

3030

3131
class StalledJobAgent(AgentModule):
@@ -254,11 +254,11 @@ def _failStalledJobs(self, jobID):
254254

255255
def _getJobPilotStatus(self, jobID):
256256
"""Get the job pilot status."""
257-
result = JobMonitoringClient().getJobParameter(jobID, "Pilot_Reference")
257+
result = getJobParameters(jobID, "Pilot_Reference")
258258
if not result["OK"]:
259259
return result
260-
pilotReference = result["Value"].get("Pilot_Reference", "Unknown")
261-
if pilotReference == "Unknown":
260+
pilotReference = result["Value"].get("Pilot_Reference")
261+
if not pilotReference:
262262
# There is no pilot reference, hence its status is unknown
263263
return S_OK("NoPilot")
264264

@@ -389,7 +389,7 @@ def _sendAccounting(self, jobID):
389389
if lastHeartBeatTime is not None and lastHeartBeatTime > endTime:
390390
endTime = lastHeartBeatTime
391391

392-
result = JobMonitoringClient().getJobParameter(jobID, "CPUNormalizationFactor")
392+
result = getJobParameters(jobID, "CPUNormalizationFactor")
393393
if not result["OK"] or not result["Value"]:
394394
self.log.error(
395395
"Error getting Job Parameter CPUNormalizationFactor, setting 0",

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_StalledJobAgent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ def sja(mocker):
2525
)
2626
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobDB")
2727
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobLoggingDB")
28-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobMonitoringClient", return_value=MagicMock())
2928
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.JobManagerClient", return_value=MagicMock())
3029
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.PilotAgentsDB", return_value=MagicMock())
30+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getJobParameters", return_value=MagicMock())
3131
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.WMSClient", return_value=MagicMock())
3232
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.getDNForUsername", return_value=MagicMock())
3333

src/DIRAC/WorkloadManagementSystem/Service/JobMonitoringHandler.py

Lines changed: 3 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@
44
The following methods are available in the Service interface
55
"""
66

7-
import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities
87
from DIRAC import S_ERROR, S_OK
98
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
109
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
1110
from DIRAC.Core.DISET.RequestHandler import RequestHandler
1211
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
1312
from DIRAC.Core.Utilities.JEncode import strToIntDict
1413
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
15-
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
1614
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
1715
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_GET_INFO, JobPolicy
16+
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
1817

1918

2019
class JobMonitoringHandlerMixin:
@@ -444,48 +443,8 @@ def export_getJobParameters(self, jobIDs, parName=None):
444443
if not isinstance(jobIDs, list):
445444
jobIDs = [jobIDs]
446445
jobIDs = [int(jobID) for jobID in jobIDs]
447-
if self.vo: # a user is connecting, with a proxy
448-
res = self.elasticJobParametersDB.getJobParameters(jobIDs, self.vo, parName)
449-
if not res["OK"]:
450-
return res
451-
parameters = res["Value"]
452-
else: # a service is connecting, no proxy, e.g. StalledJobAgent
453-
q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
454-
res = self.jobDB._query(q)
455-
if not res["OK"]:
456-
return res
457-
if not res["Value"]:
458-
return S_OK({})
459-
# get the VO for each jobID
460-
voDict = {}
461-
for jobID, vo in res["Value"]:
462-
if vo not in voDict:
463-
voDict[vo] = []
464-
voDict[vo].append(jobID)
465-
# get the parameters for each VO
466-
parameters = {}
467-
for vo, jobIDs in voDict.items():
468-
res = self.elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
469-
if not res["OK"]:
470-
return res
471-
parameters.update(res["Value"])
472-
473-
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
474-
res = self.jobDB.getJobParameters(jobIDs, parName)
475-
if not res["OK"]:
476-
return res
477-
parametersM = res["Value"]
478-
479-
# and now combine
480-
final = dict(parametersM)
481-
# if job in JobDB, update with parameters from ES if any
482-
for jobID in final:
483-
final[jobID].update(parameters.get(jobID, {}))
484-
# if job in ES and not in JobDB, take ES
485-
for jobID in parameters:
486-
if jobID not in final:
487-
final[jobID] = parameters[jobID]
488-
return S_OK(final)
446+
447+
return getJobParameters(jobIDs, parName, self.vo or "")
489448

490449
##############################################################################
491450
types_getAtticJobParameters = [int]

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
"""
33
import multiprocessing
44

5-
from DIRAC import gConfig, gLogger
5+
from DIRAC import gConfig, gLogger, S_OK
66
from DIRAC.Core.Utilities.List import fromChar
7+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
8+
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
79

810

911
def getMemoryFromProc():
@@ -139,3 +141,52 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None):
139141
# 3) return 0
140142
gLogger.info("NumberOfGPUs could not be found in CS")
141143
return 0
144+
145+
146+
def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> dict:
    """Get the parameters of one or more jobs, merging both parameter backends.

    Parameters are read from JobParametersDB and from JobDB and combined per
    job. When a job has the same parameter in both, the JobParametersDB value
    overrides the JobDB one.

    :param jobIDs: job IDs to look up. A single job ID (not wrapped in a list)
        is also accepted and treated as a one-element list.
    :param parName: parameter selection, passed unchanged to both backends
        (``None`` is forwarded as is).
    :param vo: VO used for the JobParametersDB lookup. When empty, the VO of
        each job is read from the ``Jobs`` table of JobDB.
    :return: ``S_OK`` whose value is a dict keyed by job ID, or the failed
        result of the first backend call that did not succeed. An empty dict
        is returned when no job IDs are given or, in the no-VO case, when
        none of them is found in the ``Jobs`` table.
    :raises ValueError, TypeError: if a job ID cannot be converted to ``int``.
    """
    # Some callers pass a bare job ID: accept it as a one-element list
    if isinstance(jobIDs, (int, str)):
        jobIDs = [jobIDs]
    # The IDs are interpolated into an SQL statement below, so force integers
    jobIDs = [int(jobID) for jobID in jobIDs]
    if not jobIDs:
        # Nothing to look up, and an empty "IN ()" list is not a valid query
        return S_OK({})

    elasticJobParametersDB = JobParametersDB()
    jobDB = JobDB()

    if vo:  # a user is connecting, with a proxy
        res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
        if not res["OK"]:
            return res
        parameters = res["Value"]
    else:  # a service is connecting, no proxy, e.g. StalledJobAgent
        q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join(str(jobID) for jobID in jobIDs)})"
        res = jobDB._query(q)
        if not res["OK"]:
            return res
        if not res["Value"]:
            return S_OK({})
        # get the VO for each jobID
        voDict = {}
        for jobID, jobVO in res["Value"]:
            voDict.setdefault(jobVO, []).append(jobID)
        # get the parameters for each VO
        # NOTE: the loop variables must not be called "jobIDs"/"vo": rebinding
        # "jobIDs" here would restrict the JobDB lookup below to the jobs of
        # the last VO only.
        parameters = {}
        for jobVO, voJobIDs in voDict.items():
            res = elasticJobParametersDB.getJobParameters(voJobIDs, jobVO, parName)
            if not res["OK"]:
                return res
            parameters.update(res["Value"])

    # Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
    res = jobDB.getJobParameters(jobIDs, parName)
    if not res["OK"]:
        return res

    # and now combine
    final = dict(res["Value"])
    for jobID, esParameters in parameters.items():
        if jobID in final:
            # job in both backends: JobParametersDB values take precedence
            final[jobID].update(esParameters)
        else:
            # job only in JobParametersDB
            final[jobID] = esParameters
    return S_OK(final)

0 commit comments

Comments
 (0)