Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .github/workflows/basic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,49 @@ jobs:
tests/Integration/WorkloadManagementSystem/Test_GenerateAndExecutePilotWrapper.py \
src/DIRAC/WorkloadManagementSystem/Utilities/PilotWrapper.py \
src/DIRAC/Resources/Computing/BatchSystems/*.py

diracx:
name: DiracX Unit Tests
runs-on: ubuntu-latest
steps:
- name: Checkout DIRAC
uses: actions/checkout@v4
with:
fetch-depth: 0
path: DIRAC
- name: Checkout DiracX
uses: actions/checkout@v4
with:
fetch-depth: 0
repository: DIRACGrid/diracx
path: diracx
- uses: prefix-dev/setup-pixi@v0.8.8
with:
run-install: false
post-cleanup: false
- name: Apply workarounds
run: |
cd diracx
# Workaround for https://github.com/prefix-dev/pixi/issues/3762
sed -i.bak 's@editable = true@editable = false@g' pixi.toml
rm pixi.toml.bak
# Add annotations to github actions
pixi add --no-install --pypi --feature diracx-core pytest-github-actions-annotate-failures
# Add the current DIRAC clone to the pixi.toml
pixi add --no-install --pypi --feature diracx-core 'DIRAC @ file://'$PWD'/../DIRAC'
# Show any changes
git diff
- uses: prefix-dev/setup-pixi@v0.8.8
with:
cache: false
manifest-path: diracx/pixi.toml
environments: >-
diracx-core
diracx-db
diracx-logic
diracx-routers
diracx-client
diracx-api
diracx-cli
- name: Run pytest
run: cd diracx && pixi run pytest-diracx-all-one-by-one
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs


class StalledJobAgent(AgentModule):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
)
from DIRAC.WorkloadManagementSystem.Utilities.JobModel import JobDescriptionModel
from DIRAC.WorkloadManagementSystem.Utilities.ParametricJob import generateParametricJobs, getParameterVectorLength
from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import rescheduleJobs
from DIRAC.WorkloadManagementSystem.Utilities.Utils import rescheduleJobs

MAX_PARAMETRIC_JOBS = 20

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
from DIRAC.Core.Utilities import TimeUtilities
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB

if TYPE_CHECKING:
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB


class JobStatusUtility:
Expand Down Expand Up @@ -242,35 +243,3 @@ def getNewStatus(
minor = sDict.get("MinorStatus", minor)
application = sDict.get("ApplicationStatus", application)
return S_OK((status, minor, application))


def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict:
"""Utility to reschedule jobs (not atomic, nor bulk)
Requires direct access to the JobDB and TaskQueueDB

:param jobIDs: list of jobIDs
:param source: source of the reschedule
:return: S_OK/S_ERROR
:rtype: dict

"""

failedJobs = []

for jobID in jobIDs:
result = JobDB().rescheduleJob(jobID)
if not result["OK"]:
failedJobs.append(jobID)
continue
TaskQueueDB().deleteJob(jobID)
JobLoggingDB().addLoggingRecord(
result["JobID"],
status=result["Status"],
minorStatus=result["MinorStatus"],
applicationStatus="Unknown",
source=source,
)

if failedJobs:
return S_ERROR(f"Failed to reschedule jobs {failedJobs}")
return S_OK()
37 changes: 36 additions & 1 deletion src/DIRAC/WorkloadManagementSystem/Utilities/Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import sys
import json

from DIRAC import gLogger, S_OK
from DIRAC import gLogger, S_OK, S_ERROR
from DIRAC.Core.Utilities.File import mkDir
from DIRAC.FrameworkSystem.private.standardLogging.Logging import Logging
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB


def createJobWrapper(
Expand Down Expand Up @@ -113,3 +116,35 @@ def createJobWrapper(
if rootLocation != wrapperPath:
generatedFiles["JobExecutableRelocatedPath"] = os.path.join(rootLocation, os.path.basename(jobExeFile))
return S_OK(generatedFiles)


def rescheduleJobs(jobIDs: list[int], source: str = "") -> dict:
"""Utility to reschedule jobs (not atomic, nor bulk)
Requires direct access to the JobDB and TaskQueueDB

:param jobIDs: list of jobIDs
:param source: source of the reschedule
:return: S_OK/S_ERROR
:rtype: dict

"""

failedJobs = []

for jobID in jobIDs:
result = JobDB().rescheduleJob(jobID)
if not result["OK"]:
failedJobs.append(jobID)
continue
TaskQueueDB().deleteJob(jobID)
JobLoggingDB().addLoggingRecord(
result["JobID"],
status=result["Status"],
minorStatus=result["MinorStatus"],
applicationStatus="Unknown",
source=source,
)

if failedJobs:
return S_ERROR(f"Failed to reschedule jobs {failedJobs}")
return S_OK()
Loading