Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
:caption: TransformationCleaningAgent options

"""

# # imports
import ast
import errno
Expand Down Expand Up @@ -144,10 +145,13 @@ def initialize(self):
return result
self.taskQueueDB = result["Value"]()

result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
except RuntimeError:
pass
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why `pass` here rather than logging a warning?
Based on #8511 (comment)


return S_OK()

Expand Down
44 changes: 39 additions & 5 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import getJobParameters
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE

Expand All @@ -55,7 +54,10 @@ def __init__(self, *args, **kwargs):

# clients
self.jobDB = None
self.taskQueueDB = None
self.pilotAgentsDB = None
self.sandboxDB = None
self.storageManagementDB = None

self.maxJobsAtOnce = 500
self.prodTypes = []
Expand All @@ -67,8 +69,33 @@ def __init__(self, *args, **kwargs):
def initialize(self):
"""Sets defaults"""

self.jobDB = JobDB()
self.sandboxDB = SandboxMetadataDB()
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
if not result["OK"]:
return result
self.jobDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
if not result["OK"]:
return result
self.taskQueueDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
if not result["OK"]:
return result
self.pilotAgentsDB = result["Value"]()

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB")
if not result["OK"]:
return result
self.sandboxDB = result["Value"]()

try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
except RuntimeError:
pass
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here


agentTSTypes = self.am_getOption("ProductionTypes", [])
if agentTSTypes:
Expand Down Expand Up @@ -239,7 +266,14 @@ def _deleteRemoveJobs(self, jobList, remove=False):
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
result = wmsClient.removeJob(jobsList)
else:
result = kill_delete_jobs(RIGHT_DELETE, jobsList)
result = kill_delete_jobs(
RIGHT_DELETE,
jobsList,
jobdb=self.jobDB,
taskqueuedb=self.taskQueueDB,
pilotagentsdb=self.pilotAgentsDB,
storagemanagementdb=self.storageManagementDB,
)
if not result["OK"]:
self.log.error(
f"Could not {'remove' if remove else 'delete'} jobs",
Expand Down
11 changes: 7 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ def initialize(self):
return result
self.pilotAgentsDB = result["Value"]()

result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
self.storageManagementDB = result["Value"]()
except RuntimeError:
pass
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here


# getting parameters

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
""" Test class for Job Cleaning Agent
"""
"""Test class for Job Cleaning Agent"""

from unittest.mock import MagicMock

import pytest
Expand All @@ -16,6 +16,9 @@
mockNone = MagicMock()
mockNone.return_value = None
mockJMC = MagicMock()
mockJobDB = MagicMock()
mockJobDB.getDistinctJobAttributes = mockReply
mockJobDB.selectJobs = mockReply


@pytest.fixture
Expand All @@ -27,16 +30,21 @@ def jca(mocker):
create=True,
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)

def mock_load_object(module_path, class_name):
mocks = {
"JobDB": mockJobDB,
"TaskQueueDB": MagicMock(),
"PilotAgentsDB": MagicMock(),
"SandboxMetadataDB": MagicMock(),
"StorageManagementDB": MagicMock(),
}
return {"OK": True, "Value": lambda: mocks[class_name]}

mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB.__init__", side_effect=mockNone
"DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.ObjectLoader.loadObject",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you patching the right module here? Shouldn't it be JobCleaningAgent?

side_effect=mock_load_object,
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)

jca = JobCleaningAgent()
jca.log = gLogger
jca.log.setLevel("DEBUG")
Expand Down Expand Up @@ -128,15 +136,28 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):

mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"])
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params)

def mock_load_object(module_path, class_name):
mocks = {
"JobDB": MagicMock(),
"TaskQueueDB": MagicMock(),
"PilotAgentsDB": MagicMock(),
"SandboxMetadataDB": MagicMock(),
"StorageManagementDB": MagicMock(),
}
return {"OK": True, "Value": lambda: mocks[class_name]}

mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.StalledJobAgent.ObjectLoader.loadObject",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here?

side_effect=mock_load_object,
)
jobCleaningAgent = JobCleaningAgent()

jobCleaningAgent.log = gLogger
jobCleaningAgent.log.setLevel("DEBUG")
jobCleaningAgent._AgentModule__configDefaults = mockAM
Expand Down
15 changes: 9 additions & 6 deletions src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from DIRAC import S_ERROR, S_OK, gLogger
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition

Expand Down Expand Up @@ -96,11 +96,6 @@ def kill_delete_jobs(
if not result["OK"]:
return result
pilotagentsdb = result["Value"]()
if storagemanagementdb is None:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
storagemanagementdb = result["Value"]()

badIDs = []

Expand Down Expand Up @@ -133,6 +128,14 @@ def kill_delete_jobs(
stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING]

if stagingJobList:
if storagemanagementdb is None:
result = ObjectLoader().loadObject(
"StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB"
)
if not result["OK"]:
return result
storagemanagementdb = result["Value"]()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here if an exception is raised?


gLogger.info("Going to send killing signal to stager as well!")
result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList)
if not result["OK"]:
Expand Down
58 changes: 29 additions & 29 deletions src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
""" JobManagerHandler is the implementation of the JobManager service
in the DISET framework
"""JobManagerHandler is the implementation of the JobManager service
in the DISET framework

The following methods are available in the Service interface
The following methods are available in the Service interface

submitJob()
rescheduleJob()
deleteJob()
killJob()
submitJob()
rescheduleJob()
deleteJob()
killJob()

"""

from pydantic import ValidationError

from DIRAC import S_ERROR, S_OK
Expand All @@ -22,6 +23,7 @@
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
RIGHT_KILL,
Expand All @@ -30,7 +32,6 @@
RIGHT_SUBMIT,
JobPolicy,
)
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
Expand All @@ -44,34 +45,33 @@ class JobManagerHandlerMixin:
@classmethod
def initializeHandler(cls, serviceInfoDict):
"""Initialization of DB objects and OptimizationMind"""
try:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you remove the try/except here? Don't you want to return S_ERROR?

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
if not result["OK"]:
return result
cls.jobDB = result["Value"](parentLogger=cls.log)
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
if not result["OK"]:
return result
cls.jobDB = result["Value"](parentLogger=cls.log)

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
if not result["OK"]:
return result
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
if not result["OK"]:
return result
cls.jobLoggingDB = result["Value"](parentLogger=cls.log)

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
if not result["OK"]:
return result
cls.taskQueueDB = result["Value"](parentLogger=cls.log)
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
if not result["OK"]:
return result
cls.taskQueueDB = result["Value"](parentLogger=cls.log)

result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
if not result["OK"]:
return result
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
if not result["OK"]:
return result
cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)

try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
cls.storageManagementDB = result["Value"](parentLogger=cls.log)

except RuntimeError as excp:
return S_ERROR(f"Can't connect to DB: {excp!r}")
cls.storageManagementDB = result["Value"]()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No more parentLogger here. Is that expected?

Suggested change
cls.storageManagementDB = result["Value"]()
cls.storageManagementDB = result["Value"](parentLogger=cls.log)

except RuntimeError:
cls.storageManagementDB = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here too


cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind")
result = cls.msgClient.connect(JobManager=True)
Expand Down
Loading