Skip to content

Commit fc2c815

Browse files
authored
Merge pull request #8119 from chrisburr/diracx-client-fixes
JobStateUpdate legacy adapter
2 parents 576e146 + fde5b3d commit fc2c815

3 files changed

Lines changed: 298 additions & 45 deletions

File tree

Lines changed: 79 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1-
import importlib
1+
from __future__ import annotations
2+
23
import functools
4+
import time
35
from datetime import datetime, timezone
4-
6+
from typing import TYPE_CHECKING
57

68
from DIRAC.Core.Security.DiracX import DiracXClient, FutureClient, addRPCStub
7-
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue
9+
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue, returnValueOrRaise
810
from DIRAC.Core.Utilities.TimeUtilities import fromString
11+
from DIRAC.WorkloadManagementSystem.Client import JobStatus
12+
13+
if TYPE_CHECKING:
14+
from diracx.client.models import JobCommand
915

1016

1117
def stripValueIfOK(func):
@@ -21,65 +27,85 @@ def stripValueIfOK(func):
2127
def wrapper(*args, **kwargs):
2228
result = func(*args, **kwargs)
2329
if result.get("OK"):
24-
assert result.pop("Value") is None, "Value should be None if OK"
30+
if result.get("Value") is None:
31+
result.pop("Value")
2532
return result
2633

2734
return wrapper
2835

2936

3037
class JobStateUpdateClient(FutureClient):
31-
@stripValueIfOK
3238
@convertToReturnValue
3339
def sendHeartBeat(self, jobID: str | int, dynamicData: dict, staticData: dict):
34-
print("HACK: This is a no-op until we decide what to do")
40+
"""Send a heartbeat from a Job.
41+
42+
The behaviour of this function is not strictly the same as in legacy
43+
DIRAC. Most notably, in legacy DIRAC the heartbeat always overrides the
44+
job status to Running whereas in DiracX the job state machine is still
45+
respected. Additionally, DiracX updates the job logging information
46+
when status transitions occur as a result of the heartbeat.
47+
"""
48+
49+
with DiracXClient() as api:
50+
body = {jobID: dynamicData | staticData}
51+
if len(body[jobID]) != len(dynamicData) + len(staticData):
52+
raise NotImplementedError(f"Duplicate keys: {dynamicData=} {staticData=}")
53+
commands: list[JobCommand] = api.jobs.add_heartbeat(body)
54+
# Legacy DIRAC returns a dictionary of {command: arguments}
55+
result = {}
56+
for command in commands:
57+
if command.job_id != jobID:
58+
raise NotImplementedError(f"Job ID mismatch: {jobID=} {command.job_id=}")
59+
result[command.command] = command.arguments
60+
return result
3561

3662
@stripValueIfOK
3763
@convertToReturnValue
3864
def setJobApplicationStatus(self, jobID: str | int, appStatus: str, source: str = "Unknown"):
3965
statusDict = {
40-
"application_status": appStatus,
66+
"ApplicationStatus": appStatus,
4167
}
4268
if source:
4369
statusDict["Source"] = source
4470
with DiracXClient() as api:
45-
api.jobs.set_single_job_status(
46-
jobID,
47-
{datetime.now(tz=timezone.utc): statusDict},
71+
api.jobs.set_job_statuses(
72+
{jobID: {datetime.now(tz=timezone.utc): statusDict}},
4873
)
4974

5075
@stripValueIfOK
5176
@convertToReturnValue
5277
def setJobAttribute(self, jobID: str | int, attribute: str, value: str):
5378
with DiracXClient() as api:
5479
if attribute == "Status":
55-
api.jobs.set_single_job_status(
56-
jobID,
57-
{datetime.now(tz=timezone.utc): {"status": value}},
80+
api.jobs.set_job_statuses(
81+
{jobID: {datetime.now(tz=timezone.utc): {"Status": value}}},
5882
)
5983
else:
60-
api.jobs.set_single_job_properties(jobID, {attribute: value})
84+
api.jobs.patch_metadata({jobID: {attribute: value}})
6185

6286
@stripValueIfOK
6387
@convertToReturnValue
6488
def setJobFlag(self, jobID: str | int, flag: str):
6589
with DiracXClient() as api:
66-
api.jobs.set_single_job_properties(jobID, {flag: True})
90+
api.jobs.patch_metadata({jobID: {flag: True}})
6791

6892
@stripValueIfOK
6993
@convertToReturnValue
7094
def setJobParameter(self, jobID: str | int, name: str, value: str):
71-
print("HACK: This is a no-op until we decide what to do")
95+
with DiracXClient() as api:
96+
api.jobs.patch_metadata({jobID: {name: value}})
7297

7398
@stripValueIfOK
7499
@convertToReturnValue
75100
def setJobParameters(self, jobID: str | int, parameters: list):
76-
print("HACK: This is a no-op until we decide what to do")
101+
with DiracXClient() as api:
102+
api.jobs.patch_metadata({jobID: {k: v for k, v in parameters}})
77103

78104
@stripValueIfOK
79105
@convertToReturnValue
80106
def setJobSite(self, jobID: str | int, site: str):
81107
with DiracXClient() as api:
82-
api.jobs.set_single_job_properties(jobID, {"Site": site})
108+
api.jobs.patch_metadata({jobID: {"Site": site}})
83109

84110
@stripValueIfOK
85111
@convertToReturnValue
@@ -102,9 +128,8 @@ def setJobStatus(
102128
if datetime_ is None:
103129
datetime_ = datetime.utcnow()
104130
with DiracXClient() as api:
105-
api.jobs.set_single_job_status(
106-
jobID,
107-
{fromString(datetime_).replace(tzinfo=timezone.utc): statusDict},
131+
api.jobs.set_job_statuses(
132+
{jobID: {fromString(datetime_).replace(tzinfo=timezone.utc): statusDict}},
108133
force=force,
109134
)
110135

@@ -114,21 +139,50 @@ def setJobStatus(
114139
def setJobStatusBulk(self, jobID: str | int, statusDict: dict, force=False):
115140
statusDict = {fromString(k).replace(tzinfo=timezone.utc): v for k, v in statusDict.items()}
116141
with DiracXClient() as api:
117-
api.jobs.set_job_status_bulk(
142+
api.jobs.set_job_statuses(
118143
{jobID: statusDict},
119144
force=force,
120145
)
121146

122147
@stripValueIfOK
123148
@convertToReturnValue
124149
def setJobsParameter(self, jobsParameterDict: dict):
125-
print("HACK: This is a no-op until we decide what to do")
150+
with DiracXClient() as api:
151+
updates = {job_id: {k: v} for job_id, (k, v) in jobsParameterDict.items()}
152+
api.jobs.patch_metadata(updates)
126153

127154
@stripValueIfOK
128155
@convertToReturnValue
129156
def unsetJobFlag(self, jobID: str | int, flag: str):
130157
with DiracXClient() as api:
131-
api.jobs.set_single_job_properties(jobID, {flag: False})
158+
api.jobs.patch_metadata({jobID: {flag: False}})
132159

160+
@stripValueIfOK
161+
@convertToReturnValue
133162
def updateJobFromStager(self, jobID: str | int, status: str):
134-
raise NotImplementedError("TODO")
163+
if status == "Done":
164+
jobStatus = JobStatus.CHECKING
165+
minorStatus = "JobScheduling"
166+
else:
167+
jobStatus = None
168+
minorStatus = "Staging input files failed"
169+
170+
trials = 10
171+
query = [{"parameter": "JobID", "operator": "eq", "value": jobID}]
172+
with DiracXClient() as api:
173+
for i in range(trials):
174+
result = api.jobs.search(parameters=["Status"], search=query)
175+
if not result:
176+
return None
177+
if result[0]["Status"] == JobStatus.STAGING:
178+
break
179+
time.sleep(1)
180+
else:
181+
return f"Job is not in Staging after {trials} seconds"
182+
183+
retVal = self.setJobStatus(jobID, status=jobStatus, minorStatus=minorStatus, source="StagerSystem")
184+
# As there might not be a value (see stripValueIfOK), only call
185+
# returnValueOrRaise if the return value is not OK
186+
if not retVal["OK"]: # pylint: disable=unsubscriptable-object
187+
returnValueOrRaise(retVal)
188+
return None if i == 0 else f"Found job in Staging after {i} seconds"

0 commit comments

Comments
 (0)