diff --git a/.github/workflows/basic.yml b/.github/workflows/basic.yml index 0f8baf0fcc2..dcccf39dc73 100644 --- a/.github/workflows/basic.yml +++ b/.github/workflows/basic.yml @@ -171,14 +171,26 @@ jobs: path: diracx - uses: prefix-dev/setup-pixi@v0.9.3 with: - run-install: false + cache: false post-cleanup: false + manifest-path: diracx/pixi.toml + environments: >- + diracx-core + diracx-db + diracx-logic + diracx-tasks + diracx-routers + diracx-client + diracx-api + diracx-cli - name: Apply workarounds run: | cd diracx # Workaround for https://github.com/prefix-dev/pixi/issues/3762 sed -i.bak 's@editable = true@editable = false@g' pixi.toml rm pixi.toml.bak + sed -i.bak '/solve-group = "gubbins"/d' pixi.toml + rm pixi.toml.bak # Add annotations to github actions pixi add --pypi --feature diracx-core pytest-github-actions-annotate-failures # Add the current DIRAC clone to the pixi.toml @@ -189,11 +201,13 @@ jobs: - uses: prefix-dev/setup-pixi@v0.9.3 with: cache: false + locked: false manifest-path: diracx/pixi.toml environments: >- diracx-core diracx-db diracx-logic + diracx-tasks diracx-routers diracx-client diracx-api diff --git a/README.rst b/README.rst index 8d540053cce..80fb33ae1ee 100644 --- a/README.rst +++ b/README.rst @@ -16,28 +16,28 @@ DIRAC provides a complete solution to one or more user community requiring acces DIRAC has been started by the `LHCb collaboration `_ who still maintains it. It is now used by several communities (AKA VO=Virtual Organizations) for their distributed computing workflows. -DIRAC is written in python 3.9. +DIRAC is written in python 3.11. -Status rel-v8r0 series (stable, recommended): +Status rel-v9r0 series (stable, recommended): -.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Basic%20tests/badge.svg?branch=rel-v8r0 - :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Basic+tests%22+branch%3Arel-v8r0 +.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Basic%20tests/badge.svg?branch=rel-v9r0 + :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Basic+tests%22+branch%3Arel-v9r0 :alt: Basic Tests Status -.. image:: https://github.com/DIRACGrid/DIRAC/workflows/pilot%20wrapper/badge.svg?branch=rel-v8r0 - :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22pilot+wrapper%22+branch%3Arel-v8r0 +.. image:: https://github.com/DIRACGrid/DIRAC/workflows/pilot%20wrapper/badge.svg?branch=rel-v9r0 + :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22pilot+wrapper%22+branch%3Arel-v9r0 :alt: Pilot Wrapper Status -.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Integration%20tests/badge.svg?branch=rel-v8r0 - :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Integration+tests%22+branch%3Arel-v8r0 +.. image:: https://github.com/DIRACGrid/DIRAC/workflows/Integration%20tests/badge.svg?branch=rel-v9r0 + :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Integration+tests%22+branch%3Arel-v9r0 :alt: Integration Tests Status -.. image:: https://readthedocs.org/projects/dirac/badge/?version=rel-v8r0 - :target: http://dirac.readthedocs.io/en/rel-v8r0/ +.. image:: https://readthedocs.org/projects/dirac/badge/?version=rel-v9r0 + :target: http://dirac.readthedocs.io/en/rel-v9r0/ :alt: Documentation Status -Status integration branch (devel): +Status integration branch: .. image:: https://github.com/DIRACGrid/DIRAC/workflows/Basic%20tests/badge.svg?branch=integration :target: https://github.com/DIRACGrid/DIRAC/actions?query=workflow%3A%22Basic+tests%22+branch%3Aintegration @@ -124,7 +124,7 @@ Code quality ~~~~~~~~~~~~ To ensure the code meets DIRAC's coding conventions we recommend installing ``pre-commit`` system wide using your operating system's package manager. -Alteratively, ``pre-commit`` is included in the Python 3 development environment, see the `development guide `_ for details on how to create one. +Alteratively, ``pre-commit`` is included in the development environment, see the `development guide `_ for details on how to create one. Once ``pre-commit`` is installed you can enable it by running: diff --git a/docs/docs.conf b/docs/docs.conf index 870e5d28875..8fc711f54e2 100644 --- a/docs/docs.conf +++ b/docs/docs.conf @@ -31,6 +31,9 @@ no_inherited_members = DIRAC.Core.Utilities.Graphs.GraphUtilities, DIRAC.DataManagementSystem.private.HttpStorageAccessHandler, DIRAC.FrameworkSystem.private.standardLogging.LogLevels, + DIRAC.Resources.IdProvider.OAuth2IdProvider, + DIRAC.Resources.IdProvider.CheckInIdProvider, + DIRAC.Resources.IdProvider.IAMIdProvider, # only creating dummy files, because they cannot be safely imported due to sideEffects create_dummy_files = lfc_dfc_copy, lfc_dfc_db_copy, JobWrapperTemplate, JobWrapperOfflineTemplate diff --git a/docs/source/DeveloperGuide/CodeTesting/index.rst b/docs/source/DeveloperGuide/CodeTesting/index.rst index 1f60b1d102e..5e6f91402a5 100644 --- a/docs/source/DeveloperGuide/CodeTesting/index.rst +++ b/docs/source/DeveloperGuide/CodeTesting/index.rst @@ -360,13 +360,11 @@ To deactivate a service from being used with DiracX, you can add it in `integrat "WorkloadManagement/JobMonitoring", ] -By setting `TEST_DIRACX=Yes` only, it will take the last version of DiracX by default. If you want to provide your own, you have to build your DiracX project, and provide the `dist` folder path when calling `prepare-client`. This path has to be absolute. +By setting `TEST_DIRACX=Yes` only, it will take the last version of DiracX by default. If you want to test against a local DiracX checkout, provide the source directory path when calling `prepare-environment`. This will build the DiracX container images from source and build wheels for the DIRAC containers. .. code-block:: bash - ./integration-tests.py prepare-client TEST_DIRACX=Yes --diracx-dist-dir my-dist-folder/ - -It will then mount your dist folder into DIRAC and DiracX (in `/diracx_sources`) to install the right dependencies. + ./integration-tests.py prepare-environment TEST_DIRACX=Yes --diracx-src-dir /path/to/diracx/ For MacOS, there are two bugs that can be fixed. diff --git a/integration_tests.py b/integration_tests.py index 00931ce11a2..22b46170543 100755 --- a/integration_tests.py +++ b/integration_tests.py @@ -175,14 +175,14 @@ def create( flags: Optional[list[str]] = typer.Argument(None), editable: Optional[bool] = None, extra_module: Optional[list[str]] = None, - diracx_dist_dir: Optional[str] = None, + diracx_src_dir: Optional[str] = None, release_var: Optional[str] = None, run_server_tests: bool = True, run_client_tests: bool = True, run_pilot_tests: bool = True, ): """Start a local instance of the integration tests""" - prepare_environment(flags, editable, extra_module, diracx_dist_dir, release_var) + prepare_environment(flags, editable, extra_module, diracx_src_dir, release_var) install_server() install_client() install_pilot() @@ -230,7 +230,7 @@ def prepare_environment( flags: Optional[list[str]] = typer.Argument(None), editable: Optional[bool] = None, extra_module: Optional[list[str]] = None, - diracx_dist_dir: Optional[str] = None, + diracx_src_dir: Optional[str] = None, release_var: Optional[str] = None, ): """Prepare the local environment for installing DIRAC.""" @@ -277,7 +277,7 @@ def prepare_environment( extra_services = list(chain(*[config["extra-services"] for config in module_configs.values()])) typer.secho("Running docker compose to create containers", fg=c.GREEN) - with _gen_docker_compose(modules, diracx_dist_dir=diracx_dist_dir) as docker_compose_fn: + with _gen_docker_compose(modules, diracx_src_dir=diracx_src_dir) as docker_compose_fn: subprocess.run( [*DOCKER_COMPOSE_CMD, "-f", docker_compose_fn, "up", "-d", "dirac-server", "dirac-client", "dirac-pilot"] + extra_services, @@ -374,11 +374,24 @@ def prepare_environment( typer.secho("Running docker compose to create DiracX containers", fg=c.GREEN) typer.secho(f"Will leave a folder behind: {docker_compose_fn_final}", fg=c.YELLOW) - with _gen_docker_compose(modules, diracx_dist_dir=diracx_dist_dir) as docker_compose_fn: + with _gen_docker_compose(modules, diracx_src_dir=diracx_src_dir) as docker_compose_fn: # We cannot use the temporary directory created in the context manager because # we don't stay in the contect manager (Popen) # So we need something that outlives it. shutil.copytree(docker_compose_fn.parent, docker_compose_fn_final, dirs_exist_ok=True) + + docker_compose_file = docker_compose_fn_final / "docker-compose.yml" + + # Pre-build any images defined with build: directives so that the + # background "up -d" below only needs to start containers (fast). + if diracx_src_dir is not None: + typer.secho("Building DiracX container images from source", fg=c.GREEN) + subprocess.run( + [*DOCKER_COMPOSE_CMD, "-f", docker_compose_file, "build"], + check=True, + env=docker_compose_env, + ) + # We use Popen because we don't want to wait for this command to finish. # It is going to start all the diracx containers, including one which waits # for the DIRAC installation to be over. @@ -386,7 +399,7 @@ def prepare_environment( subStderr = open(docker_compose_fn_final / "stderr", "w") subprocess.Popen( - [*DOCKER_COMPOSE_CMD, "-f", docker_compose_fn_final / "docker-compose.yml", "up", "-d", "diracx"], + [*DOCKER_COMPOSE_CMD, "-f", docker_compose_file, "up", "-d", "diracx"], env=docker_compose_env, stdin=None, stdout=subStdout, @@ -591,7 +604,7 @@ class TestExit(typer.Exit): @contextmanager -def _gen_docker_compose(modules, *, diracx_dist_dir=None): +def _gen_docker_compose(modules, *, diracx_src_dir=None): # Load the docker compose configuration and mount the necessary volumes input_fn = Path(__file__).parent / "tests/CI/docker-compose.yml" docker_compose = yaml.safe_load(input_fn.read_text()) @@ -607,23 +620,50 @@ def _gen_docker_compose(modules, *, diracx_dist_dir=None): docker_compose["services"]["diracx-wait-for-db"]["volumes"].extend(volumes[:]) module_configs = _load_module_configs(modules) - if diracx_dist_dir is not None: - for container_name in [ - "dirac-client", - "dirac-pilot", - "dirac-server", - "diracx-init-cs", - "diracx-wait-for-db", - "diracx-init-db", - "diracx", - ]: + if diracx_src_dir is not None: + diracx_src_dir = str(Path(diracx_src_dir).absolute()) + + # Build wheels from the diracx source for the DIRAC containers + diracx_wheel_dir = Path(tempfile.mkdtemp(prefix="diracx-wheels-")) + typer.secho(f"Building diracx wheels in {diracx_wheel_dir}", fg=c.GREEN) + subprocess.run( + [ + sys.executable, + "-m", + "pip", + "wheel", + "--no-deps", + f"--wheel-dir={diracx_wheel_dir}", + *[f"{diracx_src_dir}/diracx-{pkg}" for pkg in ["core", "client", "cli"]], + ], + check=True, + ) + + # Mount wheels into DIRAC containers so installDIRACX can find them + for container_name in ["dirac-client", "dirac-pilot", "dirac-server"]: docker_compose["services"][container_name].setdefault("volumes", []).append( - f"{diracx_dist_dir}:/diracx_sources" + f"{diracx_wheel_dir}:/diracx_sources" ) docker_compose["services"][container_name].setdefault("environment", []).append( "DIRACX_CUSTOM_SOURCE_PREFIXES=/diracx_sources" ) + # Build diracx container images from source instead of pulling pre-built ones + diracx_build_services = { + "container-services": ["diracx", "diracx-init-keystore", "diracx-init-db"], + "container-client": ["diracx-init-cs"], + } + for pixi_env, container_names in diracx_build_services.items(): + for container_name in container_names: + service = docker_compose["services"][container_name] + del service["image"] + service.pop("pull_policy", None) + service["build"] = { + "context": diracx_src_dir, + "dockerfile": "containers/Dockerfile", + "args": {"PIXI_ENV": pixi_env}, + } + # Add any extension services for module_name, module_configs in module_configs.items(): for service_name, service_config in module_configs["extra-services"].items(): diff --git a/setup.cfg b/setup.cfg index f63eb25bcfb..7becf2289f7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,7 +30,7 @@ install_requires = certifi cwltool diraccfg - DIRACCommon==v9.0.20 + DIRACCommon diracx-client >=v0.0.1 diracx-core >=v0.0.1 diracx-cli >=v0.0.1 diff --git a/src/DIRAC/Core/Utilities/ElasticSearchDB.py b/src/DIRAC/Core/Utilities/ElasticSearchDB.py index 46322129abd..2c01fc2fb63 100644 --- a/src/DIRAC/Core/Utilities/ElasticSearchDB.py +++ b/src/DIRAC/Core/Utilities/ElasticSearchDB.py @@ -265,7 +265,7 @@ def getDoc(self, index: str, docID: str) -> dict: """ sLog.debug(f"Retrieving document {docID} in index {index}") try: - return S_OK(self.client.get(index, docID)["_source"]) + return S_OK(self.client.get(index=index, id=docID)["_source"]) except NotFoundError: sLog.warn("Could not find the document in index", index) return S_OK({}) @@ -282,7 +282,7 @@ def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]: sLog.debug(f"Retrieving documents {docIDs}") docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs] try: - response = self.client.mget({"docs": docs}) + response = self.client.mget(body={"docs": docs}) except RequestError as re: return S_ERROR(re) else: @@ -300,12 +300,12 @@ def updateDoc(self, index: str, docID: str, body) -> dict: """ sLog.debug(f"Updating document {docID} in index {index}") try: - self.client.update(index, docID, body) + self.client.update(index=index, id=docID, body=body) except ConflictError: # updates are rather "heavy" operations from ES point of view, needing seqNo to be updated. # Not ideal, but we just wait and retry. time.sleep(1) - self.client.update(index, docID, body, params={"retry_on_conflict": 3}) + self.client.update(index=index, id=docID, body=body, params={"retry_on_conflict": 3}) except RequestError as re: return S_ERROR(re) return S_OK() @@ -319,7 +319,7 @@ def deleteDoc(self, index: str, docID: str): """ sLog.debug(f"Deleting document {docID} in index {index}") try: - return S_OK(self.client.delete(index, docID)) + return S_OK(self.client.delete(index=index, id=docID)) except RequestError as re: return S_ERROR(re) except NotFoundError: @@ -334,7 +334,7 @@ def existsDoc(self, index: str, docID: str) -> bool: :param docID: document ID """ sLog.debug(f"Checking if document {docID} in index {index} exists") - return self.client.exists(index, docID) + return self.client.exists(index=index, id=docID) @ifConnected def _Search(self, indexname): @@ -365,7 +365,7 @@ def getIndexes(self, indexName=None): indexName = "" sLog.debug(f"Getting indices alias of {indexName}") # we only return indexes which belong to a specific prefix for example 'lhcb-production' or 'dirac-production etc. - return list(self.client.indices.get_alias(f"{indexName}*")) + return list(self.client.indices.get_alias(index=f"{indexName}*")) @ifConnected def getDocTypes(self, indexName): @@ -378,7 +378,7 @@ def getDocTypes(self, indexName): result = [] try: sLog.debug("Getting mappings for ", indexName) - result = self.client.indices.get_mapping(indexName) + result = self.client.indices.get_mapping(index=indexName) except Exception as e: # pylint: disable=broad-except sLog.exception() return S_ERROR(e) @@ -409,7 +409,7 @@ def existingIndex(self, indexName): """ sLog.debug(f"Checking existance of index {indexName}") try: - return S_OK(self.client.indices.exists(indexName)) + return S_OK(self.client.indices.exists(index=indexName)) except TransportError as e: sLog.exception() return S_ERROR(e) @@ -446,7 +446,7 @@ def deleteIndex(self, indexName): """ sLog.info("Deleting index", indexName) try: - retVal = self.client.indices.delete(indexName) + retVal = self.client.indices.delete(index=indexName) except NotFoundError: sLog.warn("Index does not exist", indexName) return S_OK("Nothing to delete") diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py index 823bac69b15..323249bba16 100755 --- a/src/DIRAC/Core/Utilities/MySQL.py +++ b/src/DIRAC/Core/Utilities/MySQL.py @@ -401,7 +401,7 @@ def __getWithRetry(self, dbName, totalRetries, retriesLeft): def __ping(self, conn): try: - conn.ping(True) + conn.ping() return True except Exception: return False diff --git a/src/DIRAC/FrameworkSystem/Client/BundleDeliveryClient.py b/src/DIRAC/FrameworkSystem/Client/BundleDeliveryClient.py index d51ab956769..66947900838 100644 --- a/src/DIRAC/FrameworkSystem/Client/BundleDeliveryClient.py +++ b/src/DIRAC/FrameworkSystem/Client/BundleDeliveryClient.py @@ -1,5 +1,5 @@ -""" Client for interacting with Framework/BundleDelivery service -""" +"""Client for interacting with Framework/BundleDelivery service""" + import getpass import os import tarfile @@ -143,9 +143,10 @@ def syncCAs(self): if "X509_CERT_DIR" in os.environ: X509_CERT_DIR = os.environ["X509_CERT_DIR"] del os.environ["X509_CERT_DIR"] + result = self.syncDir("CAs", Locations.getCAsLocation()) if X509_CERT_DIR: os.environ["X509_CERT_DIR"] = X509_CERT_DIR - return self.syncDir("CAs", Locations.getCAsLocation()) + return result def syncCRLs(self): """Synchronize CRLs @@ -156,9 +157,10 @@ def syncCRLs(self): if "X509_CERT_DIR" in os.environ: X509_CERT_DIR = os.environ["X509_CERT_DIR"] del os.environ["X509_CERT_DIR"] + result = self.syncDir("CRLs", Locations.getCAsLocation()) if X509_CERT_DIR: os.environ["X509_CERT_DIR"] = X509_CERT_DIR - return self.syncDir("CRLs", Locations.getCAsLocation()) + return result def getCAs(self): """This method can be used to create the CAs. If the file can not be created, diff --git a/src/DIRAC/FrameworkSystem/Service/BundleDeliveryHandler.py b/src/DIRAC/FrameworkSystem/Service/BundleDeliveryHandler.py index 46ace20e700..362904eee3d 100644 --- a/src/DIRAC/FrameworkSystem/Service/BundleDeliveryHandler.py +++ b/src/DIRAC/FrameworkSystem/Service/BundleDeliveryHandler.py @@ -1,9 +1,9 @@ -""" Handler for CAs + CRLs bundles -""" +"""Handler for CAs + CRLs bundles""" import io import os import tarfile +from pathlib import Path from DIRAC import S_ERROR, S_OK, gConfig, gLogger from DIRAC.Core.DISET.RequestHandler import RequestHandler @@ -66,12 +66,15 @@ def updateBundles(self): buffer_ = io.BytesIO() filesToBundle = sorted(File.getGlobbedFiles(bundlePaths)) if filesToBundle: - commonPath = os.path.commonprefix(filesToBundle) - commonEnd = len(commonPath) - gLogger.info(f"Bundle will have {len(filesToBundle)} files with common path {commonPath}") + paths = [Path(f) for f in filesToBundle] + # Path.parents is path-component-aware, unlike os.path.commonprefix + commonParent = ( + Path(os.path.commonpath(paths)).parent if len(paths) == 1 else Path(os.path.commonpath(paths)) + ) + gLogger.info(f"Bundle will have {len(filesToBundle)} files with common path {commonParent}") with tarfile.open("dummy", "w:gz", buffer_) as tarBuffer: - for filePath in filesToBundle: - tarBuffer.add(filePath, filePath[commonEnd:]) + for p in paths: + tarBuffer.add(str(p), str(p.relative_to(commonParent))) zippedData = buffer_.getvalue() buffer_.close() hash_ = File.getMD5ForFiles(filesToBundle) diff --git a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py index b8f7b5bf1ce..7e77e0d55ea 100644 --- a/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py +++ b/src/DIRAC/TransformationSystem/Agent/TransformationCleaningAgent.py @@ -7,6 +7,7 @@ :caption: TransformationCleaningAgent options """ + # # imports import ast import errno @@ -144,10 +145,13 @@ def initialize(self): return result self.taskQueueDB = result["Value"]() - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - self.storageManagementDB = result["Value"]() + try: + result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") + if not result["OK"]: + return result + self.storageManagementDB = result["Value"]() + except RuntimeError: + pass return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py index d8287c07dd4..f68e47782df 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py @@ -1,4 +1,4 @@ -""" The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle. +"""The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle. This agent will take care of: - removing all jobs that are in status JobStatus.DELETED @@ -22,6 +22,7 @@ than 0. """ + import datetime import os @@ -30,14 +31,13 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.Utilities import TimeUtilities +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.RequestManagementSystem.Client.File import File from DIRAC.RequestManagementSystem.Client.Operation import Operation from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient from DIRAC.RequestManagementSystem.Client.Request import Request from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient -from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB -from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters @@ -54,7 +54,10 @@ def __init__(self, *args, **kwargs): # clients self.jobDB = None + self.taskQueueDB = None + self.pilotAgentsDB = None self.sandboxDB = None + self.storageManagementDB = None self.maxJobsAtOnce = 500 self.prodTypes = [] @@ -66,8 +69,33 @@ def __init__(self, *args, **kwargs): def initialize(self): """Sets defaults""" - self.jobDB = JobDB() - self.sandboxDB = SandboxMetadataDB() + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") + if not result["OK"]: + return result + self.jobDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") + if not result["OK"]: + return result + self.taskQueueDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") + if not result["OK"]: + return result + self.pilotAgentsDB = result["Value"]() + + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.SandboxMetadataDB", "SandboxMetadataDB") + if not result["OK"]: + return result + self.sandboxDB = result["Value"]() + + try: + result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") + if not result["OK"]: + return result + self.storageManagementDB = result["Value"]() + except RuntimeError: + pass agentTSTypes = self.am_getOption("ProductionTypes", []) if agentTSTypes: @@ -238,7 +266,14 @@ def _deleteRemoveJobs(self, jobList, remove=False): wmsClient = WMSClient(useCertificates=True, delegatedDN=res["Value"][0], delegatedGroup=ownerGroup) result = wmsClient.removeJob(jobsList) else: - result = kill_delete_jobs(RIGHT_DELETE, jobsList) + result = kill_delete_jobs( + RIGHT_DELETE, + jobsList, + jobdb=self.jobDB, + taskqueuedb=self.taskQueueDB, + pilotagentsdb=self.pilotAgentsDB, + storagemanagementdb=self.storageManagementDB, + ) if not result["OK"]: self.log.error( f"Could not {'remove' if remove else 'delete'} jobs", diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index 5ddf97e5f45..b193e702475 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -72,10 +72,13 @@ def initialize(self): return result self.pilotAgentsDB = result["Value"]() - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - self.storageManagementDB = result["Value"]() + try: + result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") + if not result["OK"]: + return result + self.storageManagementDB = result["Value"]() + except RuntimeError: + pass # getting parameters diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py index ae098c9f756..ef53fd61e93 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py @@ -1,5 +1,5 @@ -""" Test class for Job Cleaning Agent -""" +"""Test class for Job Cleaning Agent""" + from unittest.mock import MagicMock import pytest @@ -16,6 +16,9 @@ mockNone = MagicMock() mockNone.return_value = None mockJMC = MagicMock() +mockJobDB = MagicMock() +mockJobDB.getDistinctJobAttributes = mockReply +mockJobDB.selectJobs = mockReply @pytest.fixture @@ -27,16 +30,21 @@ def jca(mocker): create=True, ) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM) + + def mock_load_object(module_path, class_name): + mocks = { + "JobDB": mockJobDB, + "TaskQueueDB": MagicMock(), + "PilotAgentsDB": MagicMock(), + "SandboxMetadataDB": MagicMock(), + "StorageManagementDB": MagicMock(), + } + return {"OK": True, "Value": lambda: mocks[class_name]} + mocker.patch( - "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes", side_effect=mockReply - ) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone) - mocker.patch( - "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB.__init__", side_effect=mockNone + "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject", + side_effect=mock_load_object, ) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone) - jca = JobCleaningAgent() jca.log = gLogger jca.log.setLevel("DEBUG") @@ -128,15 +136,28 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected): mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__") mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.SandboxMetadataDB", return_value=mockNone) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone) mocker.patch( "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getDNForUsername", return_value=S_OK(["/bih/boh/DN"]) ) mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.getJobParameters", return_value=params) + def mock_load_object(module_path, class_name): + mocks = { + "JobDB": MagicMock(), + "TaskQueueDB": MagicMock(), + "PilotAgentsDB": MagicMock(), + "SandboxMetadataDB": MagicMock(), + "StorageManagementDB": MagicMock(), + } + return {"OK": True, "Value": lambda: mocks[class_name]} + + mocker.patch( + "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ObjectLoader.loadObject", + side_effect=mock_load_object, + ) jobCleaningAgent = JobCleaningAgent() + jobCleaningAgent.log = gLogger jobCleaningAgent.log.setLevel("DEBUG") jobCleaningAgent._AgentModule__configDefaults = mockAM diff --git a/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py b/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py index 8af19416d1f..30e695e5334 100644 --- a/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/StatusUtils.py @@ -1,6 +1,6 @@ from DIRAC import S_ERROR, S_OK, gLogger -from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.WorkloadManagementSystem.Client import JobStatus from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE, RIGHT_KILL from DIRAC.WorkloadManagementSystem.Utilities.jobAdministration import _filterJobStateTransition @@ -96,11 +96,6 @@ def kill_delete_jobs( if not result["OK"]: return result pilotagentsdb = result["Value"]() - if storagemanagementdb is None: - result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") - if not result["OK"]: - return result - storagemanagementdb = result["Value"]() badIDs = [] @@ -133,6 +128,14 @@ def kill_delete_jobs( stagingJobList = [jobID for jobID, sDict in jobStates.items() if sDict["Status"] == JobStatus.STAGING] if stagingJobList: + if storagemanagementdb is None: + result = ObjectLoader().loadObject( + "StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB" + ) + if not result["OK"]: + return result + storagemanagementdb = result["Value"]() + gLogger.info("Going to send killing signal to stager as well!") result = storagemanagementdb.killTasksBySourceTaskID(stagingJobList) if not result["OK"]: diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index 0f1d82d4ee1..c2d39ad716b 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -1,14 +1,15 @@ -""" JobManagerHandler is the implementation of the JobManager service - in the DISET framework +"""JobManagerHandler is the implementation of the JobManager service +in the DISET framework - The following methods are available in the Service interface +The following methods are available in the Service interface - submitJob() - rescheduleJob() - deleteJob() - killJob() +submitJob() +rescheduleJob() +deleteJob() +killJob() """ + from pydantic import ValidationError from DIRAC import S_ERROR, S_OK @@ -22,6 +23,7 @@ from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.WorkloadManagementSystem.Client import JobStatus +from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Service.JobPolicy import ( RIGHT_DELETE, RIGHT_KILL, @@ -30,7 +32,6 @@ RIGHT_SUBMIT, JobPolicy, ) -from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs @@ -44,34 +45,33 @@ class JobManagerHandlerMixin: @classmethod def initializeHandler(cls, serviceInfoDict): """Initialization of DB objects and OptimizationMind""" - try: - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") - if not result["OK"]: - return result - cls.jobDB = result["Value"](parentLogger=cls.log) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobDB", "JobDB") + if not result["OK"]: + return result + cls.jobDB = result["Value"](parentLogger=cls.log) - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB") - if not result["OK"]: - return result - cls.jobLoggingDB = result["Value"](parentLogger=cls.log) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobLoggingDB", "JobLoggingDB") + if not result["OK"]: + return result + cls.jobLoggingDB = result["Value"](parentLogger=cls.log) - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") - if not result["OK"]: - return result - cls.taskQueueDB = result["Value"](parentLogger=cls.log) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") + if not result["OK"]: + return result + cls.taskQueueDB = result["Value"](parentLogger=cls.log) - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") - if not result["OK"]: - return result - cls.pilotAgentsDB = result["Value"](parentLogger=cls.log) + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.PilotAgentsDB", "PilotAgentsDB") + if not result["OK"]: + return result + cls.pilotAgentsDB = result["Value"](parentLogger=cls.log) + try: result = ObjectLoader().loadObject("StorageManagementSystem.DB.StorageManagementDB", "StorageManagementDB") if not result["OK"]: return result cls.storageManagementDB = result["Value"](parentLogger=cls.log) - - except RuntimeError as excp: - return S_ERROR(f"Can't connect to DB: {excp!r}") + except RuntimeError: + cls.storageManagementDB = None cls.msgClient = MessageClient("WorkloadManagement/OptimizationMind") result = cls.msgClient.connect(JobManager=True) diff --git a/tests/CI/docker-compose.yml b/tests/CI/docker-compose.yml index 34c49700878..845cceb9829 100644 --- a/tests/CI/docker-compose.yml +++ b/tests/CI/docker-compose.yml @@ -1,6 +1,8 @@ volumes: # Volume used to store the certificates of dirac certs_data: + # Volume used to store the crls of dirac + crls_data: # Volume used to store the config of diracx diracx-cs-store: # Volume used to store the pair of keys to sign the tokens @@ -18,7 +20,13 @@ services: ports: - 3306:3306 healthcheck: - test: ["CMD", "sh", "-c", "${MYSQL_ADMIN_COMMAND} ping -h localhost > /tmp/health.log 2>&1;"] + test: + [ + "CMD", + "sh", + "-c", + "${MYSQL_ADMIN_COMMAND} ping -h localhost > /tmp/health.log 2>&1;", + ] timeout: 20s retries: 10 start_period: 60s @@ -33,7 +41,8 @@ services: - 9200:9200 env_file: "${ES_VER}.env" healthcheck: - test: ["CMD", "curl", "-f", "-u", "elastic:changeme", "http://localhost:9200"] + test: + ["CMD", "curl", "-f", "-u", "elastic:changeme", "http://localhost:9200"] interval: 5s timeout: 2s retries: 15 @@ -53,7 +62,13 @@ services: depends_on: - iam-init-keystore healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8080/.well-known/openid-configuration"] + test: + [ + "CMD", + "curl", + "-f", + "http://localhost:8080/.well-known/openid-configuration", + ] interval: 5s timeout: 2s retries: 15 @@ -116,6 +131,7 @@ services: container_name: dirac-init-certificates volumes: - certs_data:/ca/certs/ + - crls_data:/ca/crl/ entrypoint: | /entrypoint.sh pull_policy: always @@ -146,6 +162,7 @@ services: nofile: 8192 volumes: - certs_data:/ca/certs + - crls_data:/ca/crl/ - diracx-cs-store:/cs_store - diracx-key-store:/keystore environment: @@ -154,7 +171,6 @@ services: command: ["sleep", "infinity"] # This is necessary because of the issue described in https://github.com/moby/moby/issues/42275. What is added here is a hack/workaround. pull_policy: always - dirac-client: platform: linux/amd64 image: ${CI_REGISTRY_IMAGE}/${HOST_OS}-dirac @@ -165,6 +181,7 @@ services: - dirac-server volumes: - certs_data:/ca/certs + - crls_data:/ca/crl/ ulimits: nofile: 8192 command: ["sleep", "infinity"] # This is necessary because of the issue described in https://github.com/moby/moby/issues/42275. What is added here is a hack/workaround. @@ -180,6 +197,7 @@ services: - dirac-server volumes: - certs_data:/ca/certs + - crls_data:/ca/crl/ - type: bind source: ${CVMFS_DIR} target: /cvmfs @@ -195,7 +213,6 @@ services: start_period: 60s command: ["sleep", "infinity"] # This is necessary because of the issue described in https://github.com/moby/moby/issues/42275. What is added here is a hack/workaround. - diracx-chmod: platform: linux/amd64 image: ghcr.io/diracgrid/diracx/secret-generation:latest @@ -210,7 +227,6 @@ services: bash -xc 'chmod -R o=u /keystore && chmod -R o=u /cs_store' pull_policy: always - diracx-init-keystore: platform: linux/amd64 image: ghcr.io/diracgrid/diracx/services:dev @@ -255,7 +271,15 @@ services: environment: - DIRACX_DB_URL_AUTHDB=mysql+aiomysql://Dirac:Dirac@mysql/DiracXAuthDB entrypoint: | - /entrypoint.sh bash -xc 'micromamba install --yes -c conda-forge mysql-client && mysql -h mysql -u root --password=password -e "CREATE DATABASE DiracXAuthDB" && mysql -h mysql -u root --password=password -e "GRANT SELECT,INSERT,LOCK TABLES,UPDATE,DELETE,CREATE,DROP,ALTER,REFERENCES,CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE ON DiracXAuthDB.* TO Dirac@'"'"'%'"'"'" && python -m diracx.db init-sql && python -m diracx.db init-os' + /entrypoint.sh bash -xc 'python3 -c " + import pymysql + db = pymysql.connect(host=\"mysql\", user=\"root\", password=\"password\") + c = db.cursor() + c.execute(\"CREATE USER IF NOT EXISTS Dirac@\\x27%%\\x27 IDENTIFIED BY \\x27Dirac\\x27\") + c.execute(\"CREATE DATABASE DiracXAuthDB\") + c.execute(\"GRANT SELECT,INSERT,LOCK TABLES,UPDATE,DELETE,CREATE,DROP,ALTER,REFERENCES,CREATE VIEW,SHOW VIEW,INDEX,TRIGGER,ALTER ROUTINE,CREATE ROUTINE ON DiracXAuthDB.* TO Dirac@\\x27%%\\x27\") + db.commit() + " && python -m diracx.db init-sql && python -m diracx.db init-os' pull_policy: always diracx: @@ -300,7 +324,14 @@ services: /entrypoint.sh bash -xc 'uvicorn --factory diracx.routers:create_app --host=0.0.0.0' healthcheck: - test: ["CMD", "/opt/conda/bin/python", "-c", 'import requests; requests.get("http://localhost:8000/.well-known/openid-configuration").raise_for_status()'] + test: + [ + "CMD", + "/entrypoint.sh", + "python", + "-c", + "import requests; requests.get('http://localhost:8000/.well-known/openid-configuration').raise_for_status()", + ] interval: 5s timeout: 2s retries: 15 diff --git a/tests/CI/run_pilot.sh b/tests/CI/run_pilot.sh index f7827b8503f..a94ecce1b41 100755 --- a/tests/CI/run_pilot.sh +++ b/tests/CI/run_pilot.sh @@ -23,22 +23,27 @@ mkdir -p /home/dirac/etc/grid-security/vomsdir mkdir -p /home/dirac/etc/grid-security/vomses cp /ca/certs/ca.cert.pem /home/dirac/etc/grid-security/certificates +cp /ca/certs/ca.crl.pem /home/dirac/etc/grid-security/certificates touch /home/dirac/etc/grid-security/vomsdir/vomsdir touch /home/dirac/etc/grid-security/vomses/vomses +# Generate the hash link file required by openSSL to index CA certificates +caHash=$(openssl x509 -in /home/dirac/etc/grid-security/certificates/ca.cert.pem -noout -hash) +ln -s ca.cert.pem "/home/dirac/etc/grid-security/certificates/$caHash.0" +tar --create --file "/home/dirac/etc/grid-security/certificates/$caHash.r0" --gzip /home/dirac/etc/grid-security/certificates/ca.crl.pem # Copy over the pilot proxy cp /ca/certs/pilot_proxy /tmp/x509up_u$UID eval "${PILOT_DOWNLOAD_COMMAND}" -echo "${PILOT_JSON}" > pilot.json -jq < pilot.json +echo "${PILOT_JSON}" >pilot.json +jq /dev/null; then +if command -v python &>/dev/null; then py='python' -elif command -v python3 &> /dev/null; then +elif command -v python3 &>/dev/null; then py='python3' -elif command -v python2 &> /dev/null; then +elif command -v python2 &>/dev/null; then py='python2' fi diff --git a/tests/Jenkins/dirac_ci.sh b/tests/Jenkins/dirac_ci.sh index 87e67db2b66..32787e15bc0 100644 --- a/tests/Jenkins/dirac_ci.sh +++ b/tests/Jenkins/dirac_ci.sh @@ -117,8 +117,9 @@ installSite() { echo "==> CAs and certificates" - # Copy the CA to the list of trusted CA + # Copy the CA and CRL to the list of trusted CA cp "/ca/certs/ca.cert.pem" "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/" + cp "/ca/certs/ca.crl.pem" "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/" # Copy the cert and host key to the certificates directory cp /ca/certs/hostcert.pem "${SERVERINSTALLDIR}/diracos/etc/grid-security/" @@ -130,6 +131,7 @@ installSite() { # because otherwise the BundleDeliveryClient will send the full path, which # will be wrong on the client ln -s "ca.cert.pem" "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/$caHash.0" + tar --create --file "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/$caHash.r0" --gzip "${SERVERINSTALLDIR}/diracos/etc/grid-security/certificates/ca.crl.pem" rm -rf "${SERVERINSTALLDIR}/etc" ln -s "${SERVERINSTALLDIR}/diracos/etc" "${SERVERINSTALLDIR}/etc"