diff --git a/.github/workflows/basic.yml b/.github/workflows/basic.yml
index 0f8baf0fcc2..dcccf39dc73 100644
--- a/.github/workflows/basic.yml
+++ b/.github/workflows/basic.yml
@@ -171,14 +171,26 @@ jobs:
path: diracx
- uses: prefix-dev/setup-pixi@v0.9.3
with:
- run-install: false
+ cache: false
post-cleanup: false
+ manifest-path: diracx/pixi.toml
+ environments: >-
+ diracx-core
+ diracx-db
+ diracx-logic
+ diracx-tasks
+ diracx-routers
+ diracx-client
+ diracx-api
+ diracx-cli
- name: Apply workarounds
run: |
cd diracx
# Workaround for https://github.com/prefix-dev/pixi/issues/3762
sed -i.bak 's@editable = true@editable = false@g' pixi.toml
rm pixi.toml.bak
+ sed -i.bak '/solve-group = "gubbins"/d' pixi.toml
+ rm pixi.toml.bak
# Add annotations to github actions
pixi add --pypi --feature diracx-core pytest-github-actions-annotate-failures
# Add the current DIRAC clone to the pixi.toml
@@ -189,11 +201,13 @@ jobs:
- uses: prefix-dev/setup-pixi@v0.9.3
with:
cache: false
+ locked: false
manifest-path: diracx/pixi.toml
environments: >-
diracx-core
diracx-db
diracx-logic
+ diracx-tasks
diracx-routers
diracx-client
diracx-api
diff --git a/README.rst b/README.rst
index 8d540053cce..80fb33ae1ee 100644
--- a/README.rst
+++ b/README.rst
@@ -16,28 +16,28 @@ DIRAC provides a complete solution to one or more user community requiring acces
DIRAC has been started by the `LHCb collaboration `_ who still maintains it. It is now used by several communities (AKA VO=Virtual Organizations) for their distributed computing workflows.
-DIRAC is written in python 3.9.
+DIRAC is written in python 3.11.
-Status rel-v8r0 series (stable, recommended):
+Status rel-v9r0 series (stable, recommended):
-.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Basic%20tests/badge.svg?branch=rel-v8r0
- :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Basic+tests%22+branch%3Arel-v8r0
+.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Basic%20tests/badge.svg?branch=rel-v9r0
+ :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Basic+tests%22+branch%3Arel-v9r0
:alt: Basic Tests Status
-.. image:: https://github.com/DIRACGrid/DIRAC/workflows/pilot%20wrapper/badge.svg?branch=rel-v8r0
- :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22pilot+wrapper%22+branch%3Arel-v8r0
+.. image:: https://github.com/DIRACGrid/DIRAC/workflows/pilot%20wrapper/badge.svg?branch=rel-v9r0
+ :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22pilot+wrapper%22+branch%3Arel-v9r0
:alt: Pilot Wrapper Status
-.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Integration%20tests/badge.svg?branch=rel-v8r0
- :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Integration+tests%22+branch%3Arel-v8r0
+.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Integration%20tests/badge.svg?branch=rel-v9r0
+ :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Integration+tests%22+branch%3Arel-v9r0
:alt: Integration Tests Status
-.. image:: https://readthedocs.org/projects/dirac/badge/?version=rel-v8r0
- :target: http://dirac.readthedocs.io/en/rel-v8r0/
+.. image:: https://readthedocs.org/projects/dirac/badge/?version=rel-v9r0
+ :target: http://dirac.readthedocs.io/en/rel-v9r0/
:alt: Documentation Status
-Status integration branch (devel):
+Status integration branch:
.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Basic%20tests/badge.svg?branch=integration
:target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Basic+tests%22+branch%3Aintegration
@@ -124,7 +124,7 @@ Code quality
~~~~~~~~~~~~
To ensure the code meets DIRAC's coding conventions we recommend installing ``pre-commit`` system wide using your operating system's package manager.
-Alteratively, ``pre-commit`` is included in the Python 3 development environment, see the `development guide `_ for details on how to create one.
+Alternatively, ``pre-commit`` is included in the development environment, see the `development guide `_ for details on how to create one.
Once ``pre-commit`` is installed you can enable it by running:
diff --git a/docs/docs.conf b/docs/docs.conf
index 870e5d28875..8fc711f54e2 100644
--- a/docs/docs.conf
+++ b/docs/docs.conf
@@ -31,6 +31,9 @@ no_inherited_members =
DIRAC.Core.Utilities.Graphs.GraphUtilities,
DIRAC.DataManagementSystem.private.HttpStorageAccessHandler,
DIRAC.FrameworkSystem.private.standardLogging.LogLevels,
+ DIRAC.Resources.IdProvider.OAuth2IdProvider,
+ DIRAC.Resources.IdProvider.CheckInIdProvider,
+ DIRAC.Resources.IdProvider.IAMIdProvider,
# only creating dummy files, because they cannot be safely imported due to sideEffects
create_dummy_files = lfc_dfc_copy, lfc_dfc_db_copy, JobWrapperTemplate, JobWrapperOfflineTemplate
diff --git a/docs/source/DeveloperGuide/CodeTesting/index.rst b/docs/source/DeveloperGuide/CodeTesting/index.rst
index 1f60b1d102e..5e6f91402a5 100644
--- a/docs/source/DeveloperGuide/CodeTesting/index.rst
+++ b/docs/source/DeveloperGuide/CodeTesting/index.rst
@@ -360,13 +360,11 @@ To deactivate a service from being used with DiracX, you can add it in `integrat
"WorkloadManagement/JobMonitoring",
]
-By setting `TEST_DIRACX=Yes` only, it will take the last version of DiracX by default. If you want to provide your own, you have to build your DiracX project, and provide the `dist` folder path when calling `prepare-client`. This path has to be absolute.
+If you only set `TEST_DIRACX=Yes`, the latest version of DiracX is used by default. To test against a local DiracX checkout instead, pass its source directory with `--diracx-src-dir` when calling `prepare-environment`. This builds the DiracX container images from source and builds the DiracX wheels that get mounted into the DIRAC containers.
.. code-block:: bash
- ./integration-tests.py prepare-client TEST_DIRACX=Yes --diracx-dist-dir my-dist-folder/
-
-It will then mount your dist folder into DIRAC and DiracX (in `/diracx_sources`) to install the right dependencies.
+ ./integration-tests.py prepare-environment TEST_DIRACX=Yes --diracx-src-dir /path/to/diracx/
For MacOS, there are two bugs that can be fixed.
diff --git a/integration_tests.py b/integration_tests.py
index 00931ce11a2..22b46170543 100755
--- a/integration_tests.py
+++ b/integration_tests.py
@@ -175,14 +175,14 @@ def create(
flags: Optional[list[str]] = typer.Argument(None),
editable: Optional[bool] = None,
extra_module: Optional[list[str]] = None,
- diracx_dist_dir: Optional[str] = None,
+ diracx_src_dir: Optional[str] = None,
release_var: Optional[str] = None,
run_server_tests: bool = True,
run_client_tests: bool = True,
run_pilot_tests: bool = True,
):
"""Start a local instance of the integration tests"""
- prepare_environment(flags, editable, extra_module, diracx_dist_dir, release_var)
+ prepare_environment(flags, editable, extra_module, diracx_src_dir, release_var)
install_server()
install_client()
install_pilot()
@@ -230,7 +230,7 @@ def prepare_environment(
flags: Optional[list[str]] = typer.Argument(None),
editable: Optional[bool] = None,
extra_module: Optional[list[str]] = None,
- diracx_dist_dir: Optional[str] = None,
+ diracx_src_dir: Optional[str] = None,
release_var: Optional[str] = None,
):
"""Prepare the local environment for installing DIRAC."""
@@ -277,7 +277,7 @@ def prepare_environment(
extra_services = list(chain(*[config["extra-services"] for config in module_configs.values()]))
typer.secho("Running docker compose to create containers", fg=c.GREEN)
- with _gen_docker_compose(modules, diracx_dist_dir=diracx_dist_dir) as docker_compose_fn:
+ with _gen_docker_compose(modules, diracx_src_dir=diracx_src_dir) as docker_compose_fn:
subprocess.run(
[*DOCKER_COMPOSE_CMD, "-f", docker_compose_fn, "up", "-d", "dirac-server", "dirac-client", "dirac-pilot"]
+ extra_services,
@@ -374,11 +374,24 @@ def prepare_environment(
typer.secho("Running docker compose to create DiracX containers", fg=c.GREEN)
typer.secho(f"Will leave a folder behind: {docker_compose_fn_final}", fg=c.YELLOW)
- with _gen_docker_compose(modules, diracx_dist_dir=diracx_dist_dir) as docker_compose_fn:
+ with _gen_docker_compose(modules, diracx_src_dir=diracx_src_dir) as docker_compose_fn:
# We cannot use the temporary directory created in the context manager because
# we don't stay in the contect manager (Popen)
# So we need something that outlives it.
shutil.copytree(docker_compose_fn.parent, docker_compose_fn_final, dirs_exist_ok=True)
+
+ docker_compose_file = docker_compose_fn_final / "docker-compose.yml"
+
+ # Pre-build any images defined with build: directives so that the
+ # background "up -d" below only needs to start containers (fast).
+ if diracx_src_dir is not None:
+ typer.secho("Building DiracX container images from source", fg=c.GREEN)
+ subprocess.run(
+ [*DOCKER_COMPOSE_CMD, "-f", docker_compose_file, "build"],
+ check=True,
+ env=docker_compose_env,
+ )
+
# We use Popen because we don't want to wait for this command to finish.
# It is going to start all the diracx containers, including one which waits
# for the DIRAC installation to be over.
@@ -386,7 +399,7 @@ def prepare_environment(
subStderr = open(docker_compose_fn_final / "stderr", "w")
subprocess.Popen(
- [*DOCKER_COMPOSE_CMD, "-f", docker_compose_fn_final / "docker-compose.yml", "up", "-d", "diracx"],
+ [*DOCKER_COMPOSE_CMD, "-f", docker_compose_file, "up", "-d", "diracx"],
env=docker_compose_env,
stdin=None,
stdout=subStdout,
@@ -591,7 +604,7 @@ class TestExit(typer.Exit):
@contextmanager
-def _gen_docker_compose(modules, *, diracx_dist_dir=None):
+def _gen_docker_compose(modules, *, diracx_src_dir=None):
# Load the docker compose configuration and mount the necessary volumes
input_fn = Path(__file__).parent / "tests/CI/docker-compose.yml"
docker_compose = yaml.safe_load(input_fn.read_text())
@@ -607,23 +620,50 @@ def _gen_docker_compose(modules, *, diracx_dist_dir=None):
docker_compose["services"]["diracx-wait-for-db"]["volumes"].extend(volumes[:])
module_configs = _load_module_configs(modules)
- if diracx_dist_dir is not None:
- for container_name in [
- "dirac-client",
- "dirac-pilot",
- "dirac-server",
- "diracx-init-cs",
- "diracx-wait-for-db",
- "diracx-init-db",
- "diracx",
- ]:
+ if diracx_src_dir is not None:
+ diracx_src_dir = str(Path(diracx_src_dir).absolute())
+
+ # Build wheels from the diracx source for the DIRAC containers
+ diracx_wheel_dir = Path(tempfile.mkdtemp(prefix="diracx-wheels-"))
+ typer.secho(f"Building diracx wheels in {diracx_wheel_dir}", fg=c.GREEN)
+ subprocess.run(
+ [
+ sys.executable,
+ "-m",
+ "pip",
+ "wheel",
+ "--no-deps",
+ f"--wheel-dir={diracx_wheel_dir}",
+ *[f"{diracx_src_dir}/diracx-{pkg}" for pkg in ["core", "client", "cli"]],
+ ],
+ check=True,
+ )
+
+ # Mount wheels into DIRAC containers so installDIRACX can find them
+ for container_name in ["dirac-client", "dirac-pilot", "dirac-server"]:
docker_compose["services"][container_name].setdefault("volumes", []).append(
- f"{diracx_dist_dir}:/diracx_sources"
+ f"{diracx_wheel_dir}:/diracx_sources"
)
docker_compose["services"][container_name].setdefault("environment", []).append(
"DIRACX_CUSTOM_SOURCE_PREFIXES=/diracx_sources"
)
+ # Build diracx container images from source instead of pulling pre-built ones
+ diracx_build_services = {
+ "container-services": ["diracx", "diracx-init-keystore", "diracx-init-db"],
+ "container-client": ["diracx-init-cs"],
+ }
+ for pixi_env, container_names in diracx_build_services.items():
+ for container_name in container_names:
+ service = docker_compose["services"][container_name]
+ del service["image"]
+ service.pop("pull_policy", None)
+ service["build"] = {
+ "context": diracx_src_dir,
+ "dockerfile": "containers/Dockerfile",
+ "args": {"PIXI_ENV": pixi_env},
+ }
+
# Add any extension services
for module_name, module_configs in module_configs.items():
for service_name, service_config in module_configs["extra-services"].items():
diff --git a/setup.cfg b/setup.cfg
index f63eb25bcfb..7becf2289f7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -30,7 +30,7 @@ install_requires =
certifi
cwltool
diraccfg
- DIRACCommon==v9.0.20
+ DIRACCommon
diracx-client >=v0.0.1
diracx-core >=v0.0.1
diracx-cli >=v0.0.1
diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py
index 46322129abd..2c01fc2fb63 100644
--- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py
+++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py
@@ -265,7 +265,7 @@ def getDoc(self, index: str, docID: str) -> dict:
"""
sLog.debug(f"Retrieving document {docID} in index {index}")
try:
- return S_OK(self.client.get(index, docID)["_source"])
+ return S_OK(self.client.get(index=index, id=docID)["_source"])
except NotFoundError:
sLog.warn("Could not find the document in index", index)
return S_OK({})
@@ -282,7 +282,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]:
sLog.debug(f"Retrieving documents {docIDs}")
docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs]
try:
- response = self.client.mget({"docs": docs})
+ response = self.client.mget(body={"docs": docs})
except RequestError as re:
return S_ERROR(re)
else:
@@ -300,12 +300,12 @@ def updateDoc(self, index: str, docID: str, body) -> dict:
"""
sLog.debug(f"Updating document {docID} in index {index}")
try:
- self.client.update(index, docID, body)
+ self.client.update(index=index, id=docID, body=body)
except ConflictError:
# updates are rather "heavy" operations from ES point of view, needing seqNo to be updated.
# Not ideal, but we just wait and retry.
time.sleep(1)
- self.client.update(index, docID, body, params={"retry_on_conflict": 3})
+ self.client.update(index=index, id=docID, body=body, params={"retry_on_conflict": 3})
except RequestError as re:
return S_ERROR(re)
return S_OK()
@@ -319,7 +319,7 @@ def deleteDoc(self, index: str, docID: str):
"""
sLog.debug(f"Deleting document {docID} in index {index}")
try:
- return S_OK(self.client.delete(index, docID))
+ return S_OK(self.client.delete(index=index, id=docID))
except RequestError as re:
return S_ERROR(re)
except NotFoundError:
@@ -334,7 +334,7 @@ def existsDoc(self, index: str, docID: str) -> bool:
:param docID: document ID
"""
sLog.debug(f"Checking if document {docID} in index {index} exists")
- return self.client.exists(index, docID)
+ return self.client.exists(index=index, id=docID)
@ifConnected
def _Search(self, indexname):
@@ -365,7 +365,7 @@ def getIndexes(self, indexName=None):
indexName = ""
sLog.debug(f"Getting indices alias of {indexName}")
# we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc.
- return list(self.client.indices.get_alias(f"{indexName}*"))
+ return list(self.client.indices.get_alias(index=f"{indexName}*"))
@ifConnected
def getDocTypes(self, indexName):
@@ -378,7 +378,7 @@ def getDocTypes(self, indexName):
result = []
try:
sLog.debug("Getting mappings for ", indexName)
- result = self.client.indices.get_mapping(indexName)
+ result = self.client.indices.get_mapping(index=indexName)
except Exception as e: # pylint: disable=broad-except
sLog.exception()
return S_ERROR(e)
@@ -409,7 +409,7 @@ def existingIndex(self, indexName):
"""
sLog.debug(f"Checking existance of index {indexName}")
try:
- return S_OK(self.client.indices.exists(indexName))
+ return S_OK(self.client.indices.exists(index=indexName))
except TransportError as e:
sLog.exception()
return S_ERROR(e)
@@ -446,7 +446,7 @@ def deleteIndex(self, indexName):
"""
sLog.info("Deleting index", indexName)
try:
- retVal = self.client.indices.delete(indexName)
+ retVal = self.client.indices.delete(index=indexName)
except NotFoundError:
sLog.warn("Index does not exist", indexName)
return S_OK("Nothing to delete")
diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py
index 823bac69b15..323249bba16 100755
--- a/src/DIRAC/Core/Utilities/MySQL.py
+++ b/src/DIRAC/Core/Utilities/MySQL.py
@@ -401,7 +401,7 @@ def __getWithRetry(self, dbName, totalRetries, retriesLeft):
def __ping(self, conn):
try:
- conn.ping(True)
+ conn.ping()
return True
except Exception:
return False
diff --git a/src/DIRAC/FrameworkSystem/Client/BundleDeliveryClient.py b/src/DIRAC/FrameworkSystem/Client/BundleDeliveryClient.py
index d51ab956769..66947900838 100644
--- a/src/DIRAC/FrameworkSystem/Client/BundleDeliveryClient.py
+++ b/src/DIRAC/FrameworkSystem/Client/BundleDeliveryClient.py
@@ -1,5 +1,5 @@
-""" Client for interacting with Framework/BundleDelivery service
-"""
+"""Client for interacting with Framework/BundleDelivery service"""
+
import getpass
import os
import tarfile
@@ -143,9 +143,10 @@ def syncCAs(self):
if "X509_CERT_DIR" in os.environ:
X509_CERT_DIR = os.environ["X509_CERT_DIR"]
del os.environ["X509_CERT_DIR"]
+ result = self.syncDir("CAs", Locations.getCAsLocation())
if X509_CERT_DIR:
os.environ["X509_CERT_DIR"] = X509_CERT_DIR
- return self.syncDir("CAs", Locations.getCAsLocation())
+ return result
def syncCRLs(self):
"""Synchronize CRLs
@@ -156,9 +157,10 @@ def syncCRLs(self):
if "X509_CERT_DIR" in os.environ:
X509_CERT_DIR = os.environ["X509_CERT_DIR"]
del os.environ["X509_CERT_DIR"]
+ result = self.syncDir("CRLs", Locations.getCAsLocation())
if X509_CERT_DIR:
os.environ["X509_CERT_DIR"] = X509_CERT_DIR
- return self.syncDir("CRLs", Locations.getCAsLocation())
+ return result
def getCAs(self):
"""This method can be used to create the CAs. If the file can not be created,
diff --git a/src/DIRAC/FrameworkSystem/Service/BundleDeliveryHandler.py b/src/DIRAC/FrameworkSystem/Service/BundleDeliveryHandler.py
index 46ace20e700..362904eee3d 100644
--- a/src/DIRAC/FrameworkSystem/Service/BundleDeliveryHandler.py
+++ b/src/DIRAC/FrameworkSystem/Service/BundleDeliveryHandler.py
@@ -1,9 +1,9 @@
-""" Handler for CAs + CRLs bundles
-"""
+"""Handler for CAs + CRLs bundles"""
import io
import os
import tarfile
+from pathlib import Path
from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.Core.DISET.RequestHandler import RequestHandler
@@ -66,12 +66,15 @@ def updateBundles(self):
buffer_ = io.BytesIO()
filesToBundle = sorted(File.getGlobbedFiles(bundlePaths))
if filesToBundle:
- commonPath = os.path.commonprefix(filesToBundle)
- commonEnd = len(commonPath)
- gLogger.info(f"Bundle will have {len(filesToBundle)} files with common path {commonPath}")
+ paths = [Path(f) for f in filesToBundle]
+ # os.path.commonpath is path-component-aware, unlike os.path.commonprefix
+ commonParent = (
+ Path(os.path.commonpath(paths)).parent if len(paths) == 1 else Path(os.path.commonpath(paths))
+ )
+ gLogger.info(f"Bundle will have {len(filesToBundle)} files with common path {commonParent}")
with tarfile.open("dummy", "w:gz", buffer_) as tarBuffer:
- for filePath in filesToBundle:
- tarBuffer.add(filePath, filePath[commonEnd:])
+ for p in paths:
+ tarBuffer.add(str(p), str(p.relative_to(commonParent)))
zippedData = buffer_.getvalue()
buffer_.close()
hash_ = File.getMD5ForFiles(filesToBundle)
diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py
index b8f7b5bf1ce..7e77e0d55ea 100644
--- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py
+++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py
@@ -7,6 +7,7 @@
:caption: TransformationCleaningAgent options
"""
+
# # imports
import ast
import errno
@@ -144,10 +145,13 @@ def initialize(self):
return result
self.taskQueueDB = result["Value"]()
- result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
- if not result["OK"]:
- return result
- self.storageManagementDB = result["Value"]()
+ try:
+ result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
+ if not result["OK"]:
+ return result
+ self.storageManagementDB = result["Value"]()
+ except RuntimeError:
+ pass
return S_OK()
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
index d8287c07dd4..f68e47782df 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
@@ -1,4 +1,4 @@
-""" The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle.
+"""The Job Cleaning Agent controls removing jobs from the WMS at the end of their life cycle.
This agent will take care of:
- removing all jobs that are in status JobStatus.DELETED
@@ -22,6 +22,7 @@
than 0.
"""
+
import datetime
import os
@@ -30,14 +31,13 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities import TimeUtilities
+from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
-from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
-from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
@@ -54,7 +54,10 @@ def __init__(self, *args, **kwargs):
# clients
self.jobDB = None
+ self.taskQueueDB = None
+ self.pilotAgentsDB = None
self.sandboxDB = None
+ self.storageManagementDB = None
self.maxJobsAtOnce = 500
self.prodTypes = []
@@ -66,8 +69,33 @@ def __init__(self, *args, **kwargs):
def initialize(self):
"""Sets defaults"""
- self.jobDB = JobDB()
- self.sandboxDB = SandboxMetadataDB()
+ result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
+ if not result["OK"]:
+ return result
+ self.jobDB = result["Value"]()
+
+ result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
+ if not result["OK"]:
+ return result
+ self.taskQueueDB = result["Value"]()
+
+ result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
+ if not result["OK"]:
+ return result
+ self.pilotAgentsDB = result["Value"]()
+
+ result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB")
+ if not result["OK"]:
+ return result
+ self.sandboxDB = result["Value"]()
+
+ try:
+ result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
+ if not result["OK"]:
+ return result
+ self.storageManagementDB = result["Value"]()
+ except RuntimeError:
+ pass
agentTSTypes = self.am_getOption("ProductionTypes", [])
if agentTSTypes:
@@ -238,7 +266,14 @@ def _deleteRemoveJobs(self, jobList, remove=False):
wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup)
result = wmsClient.removeJob(jobsList)
else:
- result = kill_delete_jobs(RIGHT_DELETE, jobsList)
+ result = kill_delete_jobs(
+ RIGHT_DELETE,
+ jobsList,
+ jobdb=self.jobDB,
+ taskqueuedb=self.taskQueueDB,
+ pilotagentsdb=self.pilotAgentsDB,
+ storagemanagementdb=self.storageManagementDB,
+ )
if not result["OK"]:
self.log.error(
f"Could not {'remove' if remove else 'delete'} jobs",
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py
index 5ddf97e5f45..b193e702475 100755
--- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py
@@ -72,10 +72,13 @@ def initialize(self):
return result
self.pilotAgentsDB = result["Value"]()
- result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
- if not result["OK"]:
- return result
- self.storageManagementDB = result["Value"]()
+ try:
+ result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
+ if not result["OK"]:
+ return result
+ self.storageManagementDB = result["Value"]()
+ except RuntimeError:
+ pass
# getting parameters
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py
index ae098c9f756..ef53fd61e93 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py
@@ -1,5 +1,5 @@
-""" Test class for Job Cleaning Agent
-"""
+"""Test class for Job Cleaning Agent"""
+
from unittest.mock import MagicMock
import pytest
@@ -16,6 +16,9 @@
mockNone = MagicMock()
mockNone.return_value = None
mockJMC = MagicMock()
+mockJobDB = MagicMock()
+mockJobDB.getDistinctJobAttributes = mockReply
+mockJobDB.selectJobs = mockReply
@pytest.fixture
@@ -27,16 +30,21 @@ def jca(mocker):
create=True,
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
+
+ def mock_load_object(module_path, class_name):
+ mocks = {
+ "JobDB": mockJobDB,
+ "TaskQueueDB": MagicMock(),
+ "PilotAgentsDB": MagicMock(),
+ "SandboxMetadataDB": MagicMock(),
+ "StorageManagementDB": MagicMock(),
+ }
+ return {"OK": True, "Value": lambda: mocks[class_name]}
+
mocker.patch(
- "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply
- )
- mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
- mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
- mocker.patch(
- "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB.__init__", side_effect=mockNone
+ "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject",
+ side_effect=mock_load_object,
)
- mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
-
jca = JobCleaningAgent()
jca.log = gLogger
jca.log.setLevel("DEBUG")
@@ -128,15 +136,28 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__")
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
- mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone)
- mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"])
)
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params)
+ def mock_load_object(module_path, class_name):
+ mocks = {
+ "JobDB": MagicMock(),
+ "TaskQueueDB": MagicMock(),
+ "PilotAgentsDB": MagicMock(),
+ "SandboxMetadataDB": MagicMock(),
+ "StorageManagementDB": MagicMock(),
+ }
+ return {"OK": True, "Value": lambda: mocks[class_name]}
+
+ mocker.patch(
+ "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject",
+ side_effect=mock_load_object,
+ )
jobCleaningAgent = JobCleaningAgent()
+
jobCleaningAgent.log = gLogger
jobCleaningAgent.log.setLevel("DEBUG")
jobCleaningAgent._AgentModule__configDefaults = mockAM
diff --git a/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py b/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py
index 8af19416d1f..30e695e5334 100644
--- a/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py
+++ b/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py
@@ -1,6 +1,6 @@
from DIRAC import S_ERROR, S_OK, gLogger
-from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
+from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL
from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition
@@ -96,11 +96,6 @@ def kill_delete_jobs(
if not result["OK"]:
return result
pilotagentsdb = result["Value"]()
- if storagemanagementdb is None:
- result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
- if not result["OK"]:
- return result
- storagemanagementdb = result["Value"]()
badIDs = []
@@ -133,6 +128,14 @@ def kill_delete_jobs(
stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING]
if stagingJobList:
+ if storagemanagementdb is None:
+ result = ObjectLoader().loadObject(
+ "StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB"
+ )
+ if not result["OK"]:
+ return result
+ storagemanagementdb = result["Value"]()
+
gLogger.info("Going to send killing signal to stager as well!")
result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList)
if not result["OK"]:
diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
index 0f1d82d4ee1..c2d39ad716b 100755
--- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
+++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
@@ -1,14 +1,15 @@
-""" JobManagerHandler is the implementation of the JobManager service
- in the DISET framework
+"""JobManagerHandler is the implementation of the JobManager service
+in the DISET framework
- The following methods are available in the Service interface
+The following methods are available in the Service interface
- submitJob()
- rescheduleJob()
- deleteJob()
- killJob()
+submitJob()
+rescheduleJob()
+deleteJob()
+killJob()
"""
+
from pydantic import ValidationError
from DIRAC import S_ERROR, S_OK
@@ -22,6 +23,7 @@
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.WorkloadManagementSystem.Client import JobStatus
+from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import (
RIGHT_DELETE,
RIGHT_KILL,
@@ -30,7 +32,6 @@
RIGHT_SUBMIT,
JobPolicy,
)
-from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs
@@ -44,34 +45,33 @@ class JobManagerHandlerMixin:
@classmethod
def initializeHandler(cls, serviceInfoDict):
"""Initialization of DB objects and OptimizationMind"""
- try:
- result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
- if not result["OK"]:
- return result
- cls.jobDB = result["Value"](parentLogger=cls.log)
+ result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB")
+ if not result["OK"]:
+ return result
+ cls.jobDB = result["Value"](parentLogger=cls.log)
- result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
- if not result["OK"]:
- return result
- cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
+ result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB")
+ if not result["OK"]:
+ return result
+ cls.jobLoggingDB = result["Value"](parentLogger=cls.log)
- result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
- if not result["OK"]:
- return result
- cls.taskQueueDB = result["Value"](parentLogger=cls.log)
+ result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB")
+ if not result["OK"]:
+ return result
+ cls.taskQueueDB = result["Value"](parentLogger=cls.log)
- result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
- if not result["OK"]:
- return result
- cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
+ result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB")
+ if not result["OK"]:
+ return result
+ cls.pilotAgentsDB = result["Value"](parentLogger=cls.log)
+ try:
result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB")
if not result["OK"]:
return result
cls.storageManagementDB = result["Value"](parentLogger=cls.log)
-
- except RuntimeError as excp:
- return S_ERROR(f"Can't connect to DB: {excp!r}")
+ except RuntimeError:
+ cls.storageManagementDB = None
cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind")
result = cls.msgClient.connect(JobManager=True)
diff --git a/tests/CI/docker-compose.yml b/tests/CI/docker-compose.yml
index 34c49700878..845cceb9829 100644
--- a/tests/CI/docker-compose.yml
+++ b/tests/CI/docker-compose.yml
@@ -1,6 +1,8 @@
volumes:
# Volume used to store the certificates of dirac
certs_data:
+ # Volume used to store the crls of dirac
+ crls_data:
# Volume used to store the config of diracx
diracx-cs-store:
# Volume used to store the pair of keys to sign the tokens
@@ -18,7 +20,13 @@ services:
ports:
- 3306:3306
healthcheck:
- test: ["CMD", "sh", "-c", "${MYSQL_ADMIN_COMMAND} ping -h localhost > /tmp/health.log 2>&1;"]
+ test:
+ [
+ "CMD",
+ "sh",
+ "-c",
+ "${MYSQL_ADMIN_COMMAND} ping -h localhost > /tmp/health.log 2>&1;",
+ ]
timeout: 20s
retries: 10
start_period: 60s
@@ -33,7 +41,8 @@ services:
- 9200:9200
env_file: "${ES_VER}.env"
healthcheck:
- test: ["CMD", "curl", "-f", "-u", "elastic:changeme", "http://localhost:9200"]
+ test:
+ ["CMD", "curl", "-f", "-u", "elastic:changeme", "http://localhost:9200"]
interval: 5s
timeout: 2s
retries: 15
@@ -53,7 +62,13 @@ services:
depends_on:
- iam-init-keystore
healthcheck:
- test: ["CMD", "curl", "-f", "http://localhost:8080/.well-known/openid-configuration"]
+ test:
+ [
+ "CMD",
+ "curl",
+ "-f",
+ "http://localhost:8080/.well-known/openid-configuration",
+ ]
interval: 5s
timeout: 2s
retries: 15
@@ -116,6 +131,7 @@ services:
container_name: dirac-init-certificates
volumes:
- certs_data:/ca/certs/
+ - crls_data:/ca/crl/
entrypoint: |
/entrypoint.sh
pull_policy: always
@@ -146,6 +162,7 @@ services:
nofile: 8192
volumes:
- certs_data:/ca/certs
+ - crls_data:/ca/crl/
- diracx-cs-store:/cs_store
- diracx-key-store:/keystore
environment:
@@ -154,7 +171,6 @@ services:
command: ["sleep", "infinity"] # This is necessary because of the issue described in https://github.com/moby/moby/issues/42275. What is added here is a hack/workaround.
pull_policy: always
-
dirac-client:
platform: linux/amd64
image: ${CI_REGISTRY_IMAGE}/${HOST_OS}-dirac
@@ -165,6 +181,7 @@ services:
- dirac-server
volumes:
- certs_data:/ca/certs
+ - crls_data:/ca/crl/
ulimits:
nofile: 8192
command: ["sleep", "infinity"] # This is necessary because of the issue described in https://github.com/moby/moby/issues/42275. What is added here is a hack/workaround.
@@ -180,6 +197,7 @@ services:
- dirac-server
volumes:
- certs_data:/ca/certs
+ - crls_data:/ca/crl/
- type: bind
source: ${CVMFS_DIR}
target: /cvmfs
@@ -195,7 +213,6 @@ services:
start_period: 60s
command: ["sleep", "infinity"] # This is necessary because of the issue described in https://github.com/moby/moby/issues/42275. What is added here is a hack/workaround.
-
diracx-chmod:
platform: linux/amd64
image: ghcr.io/diracgrid/diracx/secret-generation:latest
@@ -210,7 +227,6 @@ services:
bash -xc 'chmod -R o=u /keystore && chmod -R o=u /cs_store'
pull_policy: always
-
diracx-init-keystore:
platform: linux/amd64
image: ghcr.io/diracgrid/diracx/services:dev
@@ -255,7 +271,15 @@ services:
environment:
- DIRACX_DB_URL_AUTHDB=mysql+aiomysql://Dirac:Dirac@mysql/DiracXAuthDB
entrypoint: |
- /entrypoint.sh bash -xc 'micromamba install --yes -c conda-forge mysql-client && mysql -h mysql -u root --password=password -e "CREATE DATABASE DiracXAuthDB" && mysql -h mysql -u root --password=password -e "GRANT SELECT,INSERT,LOCK TABLES,UPDATE,DELETE,CREATE,DROP,ALTER,REFERENCES,CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE ON DiracXAuthDB.* TO Dirac@'"'"'%'"'"'" && python -m diracx.db init-sql && python -m diracx.db init-os'
+ /entrypoint.sh bash -xc 'python3 -c "
+ import pymysql
+ db = pymysql.connect(host=\"mysql\", user=\"root\", password=\"password\")
+ c = db.cursor()
+ c.execute(\"CREATE USER IF NOT EXISTS Dirac@\\x27%%\\x27 IDENTIFIED BY \\x27Dirac\\x27\")
+ c.execute(\"CREATE DATABASE DiracXAuthDB\")
+ c.execute(\"GRANT SELECT,INSERT,LOCK TABLES,UPDATE,DELETE,CREATE,DROP,ALTER,REFERENCES,CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE ON DiracXAuthDB.* TO Dirac@\\x27%%\\x27\")
+ db.commit()
+ " && python -m diracx.db init-sql && python -m diracx.db init-os'
pull_policy: always
diracx:
@@ -300,7 +324,14 @@ services:
/entrypoint.sh bash -xc 'uvicorn --factory diracx.routers:create_app --host=0.0.0.0'
healthcheck:
- test: ["CMD", "/opt/conda/bin/python", "-c", 'import requests; requests.get("http://localhost:8000/.well-known/openid-configuration").raise_for_status()']
+ test:
+ [
+ "CMD",
+ "/entrypoint.sh",
+ "python",
+ "-c",
+ "import requests; requests.get('http://localhost:8000/.well-known/openid-configuration').raise_for_status()",
+ ]
interval: 5s
timeout: 2s
retries: 15
diff --git a/tests/CI/run_pilot.sh b/tests/CI/run_pilot.sh
index f7827b8503f..a94ecce1b41 100755
--- a/tests/CI/run_pilot.sh
+++ b/tests/CI/run_pilot.sh
@@ -23,22 +23,27 @@ mkdir -p /home/dirac/etc/grid-security/vomsdir
mkdir -p /home/dirac/etc/grid-security/vomses
cp /ca/certs/ca.cert.pem /home/dirac/etc/grid-security/certificates
+cp /ca/crl/ca.crl.pem /home/dirac/etc/grid-security/certificates
touch /home/dirac/etc/grid-security/vomsdir/vomsdir
touch /home/dirac/etc/grid-security/vomses/vomses
+# Generate the hash link file required by openSSL to index CA certificates
+caHash=$(openssl x509 -in /home/dirac/etc/grid-security/certificates/ca.cert.pem -noout -hash)
+ln -s ca.cert.pem "/home/dirac/etc/grid-security/certificates/$caHash.0"
+tar --create --file "/home/dirac/etc/grid-security/certificates/$caHash.r0" --gzip /home/dirac/etc/grid-security/certificates/ca.crl.pem
# Copy over the pilot proxy
cp /ca/certs/pilot_proxy /tmp/x509up_u$UID
eval "${PILOT_DOWNLOAD_COMMAND}"
-echo "${PILOT_JSON}" > pilot.json
-jq < pilot.json
+echo "${PILOT_JSON}" >pilot.json
+jq <pilot.json
-if command -v python &> /dev/null; then
+if command -v python &>/dev/null; then
py='python'
-elif command -v python3 &> /dev/null; then
+elif command -v python3 &>/dev/null; then
py='python3'
-elif command -v python2 &> /dev/null; then
+elif command -v python2 &>/dev/null; then
py='python2'
fi
diff --git a/tests/Jenkins/dirac_ci.sh b/tests/Jenkins/dirac_ci.sh
index 87e67db2b66..32787e15bc0 100644
--- a/tests/Jenkins/dirac_ci.sh
+++ b/tests/Jenkins/dirac_ci.sh
@@ -117,8 +117,9 @@ installSite() {
echo "==> CAs and certificates"
- # Copy the CA to the list of trusted CA
+ # Copy the CA and CRL to the list of trusted CA
cp "/ca/certs/ca.cert.pem" "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/"
+ cp "/ca/crl/ca.crl.pem" "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/"
# Copy the cert and host key to the certificates directory
cp /ca/certs/hostcert.pem "${SERVERINSTALLDIR}/diracos/etc/grid-security/"
@@ -130,6 +131,7 @@ installSite() {
# because otherwise the BundleDeliveryClient will send the full path, which
# will be wrong on the client
ln -s "ca.cert.pem" "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/$caHash.0"
+ ln -s "ca.crl.pem" "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/$caHash.r0"
rm -rf "${SERVERINSTALLDIR}/etc"
ln -s "${SERVERINSTALLDIR}/diracos/etc" "${SERVERINSTALLDIR}/etc"