diff --git a/.github/workflows/basic.yml b/.github/workflows/basic.yml index c78f5d98d52..23b2778cf6d 100644 --- a/.github/workflows/basic.yml +++ b/.github/workflows/basic.yml @@ -108,3 +108,49 @@ jobs: tests/Integration/WorkloadManagementSystem/Test_GenerateAndExecutePilotWrapper.py \ src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py \ src/DIRAC/Resources/Computing/BatchSystems/*.py + + diracx: + name: DiracX Unit Tests + runs-on: ubuntu-latest + steps: + - name: Checkout DIRAC + uses: actions/checkout@v4 + with: + fetch-depth: 0 + path: DIRAC + - name: Checkout DiracX + uses: actions/checkout@v4 + with: + fetch-depth: 0 + repository: DIRACGrid/diracx + path: diracx + - uses: prefix-dev/setup-pixi@v0.8.8 + with: + run-install: false + post-cleanup: false + - name: Apply workarounds + run: | + cd diracx + # Workaround for https://github.com/prefix-dev/pixi/issues/3762 + sed -i.bak 's@editable = true@editable = false@g' pixi.toml + rm pixi.toml.bak + # Add annotations to github actions + pixi add --no-install --pypi --feature diracx-core pytest-github-actions-annotate-failures + # Add the current DIRAC clone to the pixi.toml + pixi add --no-install --pypi --feature diracx-core 'DIRAC @ file://'$PWD'/../DIRAC' + # Show any changes + git diff + - uses: prefix-dev/setup-pixi@v0.8.8 + with: + cache: false + manifest-path: diracx/pixi.toml + environments: >- + diracx-core + diracx-db + diracx-logic + diracx-routers + diracx-client + diracx-api + diracx-cli + - name: Run pytest + run: cd diracx && pixi run pytest-diracx-all-one-by-one diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py index dd27a33dbc9..d9451ae467a 100755 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StalledJobAgent.py @@ -25,7 +25,7 @@ from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters -from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs +from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs class StalledJobAgent(AgentModule): diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py index 0b09c2b197a..cce2d879129 100755 --- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py @@ -34,7 +34,7 @@ ) from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength -from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs +from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs MAX_PARAMETRIC_JOBS = 20 diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py index 8b8e263a983..5bdc81014f6 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py @@ -9,9 +9,10 @@ from DIRAC.Core.Utilities import TimeUtilities from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.WorkloadManagementSystem.Client import JobStatus -from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB -from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB -from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB + +if TYPE_CHECKING: + from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB + from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB class JobStatusUtility: @@ -242,35 +243,3 @@ def getNewStatus( minor = sDict.get("MinorStatus", minor) application = sDict.get("ApplicationStatus", application) return S_OK((status, minor, application)) - - -def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict: - """Utility to reschedule jobs (not atomic, nor bulk) - Requires direct access to the JobDB and TaskQueueDB - - :param jobIDs: list of jobIDs - :param source: source of the reschedule - :return: S_OK/S_ERROR - :rtype: dict - - """ - - failedJobs = [] - - for jobID in jobIDs: - result = JobDB().rescheduleJob(jobID) - if not result["OK"]: - failedJobs.append(jobID) - continue - TaskQueueDB().deleteJob(jobID) - JobLoggingDB().addLoggingRecord( - result["JobID"], - status=result["Status"], - minorStatus=result["MinorStatus"], - applicationStatus="Unknown", - source=source, - ) - - if failedJobs: - return S_ERROR(f"Failed to reschedule jobs {failedJobs}") - return S_OK() diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py index d20476f768c..83f35fb5abf 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py @@ -4,9 +4,12 @@ import sys import json -from DIRAC import gLogger, S_OK +from DIRAC import gLogger, S_OK, S_ERROR from DIRAC.Core.Utilities.File import mkDir from DIRAC.FrameworkSystem.private.standardLogging.Logging import Logging +from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB +from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB +from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB def createJobWrapper( @@ -113,3 +116,35 @@ def createJobWrapper( if rootLocation != wrapperPath: generatedFiles["JobExecutableRelocatedPath"] = os.path.join(rootLocation, os.path.basename(jobExeFile)) return S_OK(generatedFiles) + + +def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict: + """Utility to reschedule jobs (not atomic, nor bulk) + Requires direct access to the JobDB and TaskQueueDB + + :param jobIDs: list of jobIDs + :param source: source of the reschedule + :return: S_OK/S_ERROR + :rtype: dict + + """ + + failedJobs = [] + + for jobID in jobIDs: + result = JobDB().rescheduleJob(jobID) + if not result["OK"]: + failedJobs.append(jobID) + continue + TaskQueueDB().deleteJob(jobID) + JobLoggingDB().addLoggingRecord( + result["JobID"], + status=result["Status"], + minorStatus=result["MinorStatus"], + applicationStatus="Unknown", + source=source, + ) + + if failedJobs: + return S_ERROR(f"Failed to reschedule jobs {failedJobs}") + return S_OK()