Skip to content

Commit fdd1fc8

Browse files
committed
refactor: migrate getNewStatus and getStartAndEndTime to DIRACCommon
Move getNewStatus and getStartAndEndTime functions from DIRAC JobStatusUtility to DIRACCommon as stateless utility functions. The original DIRAC module now imports these functions from DIRACCommon. The migrated functions intentionally retain references to undefined dependencies (TimeUtilities, JobStatus) as part of the stateless utility migration strategy.
1 parent 0a4f397 commit fdd1fc8

File tree

2 files changed

+92
-63
lines changed

2 files changed

+92
-63
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
"""Stateless job status utility functions"""
2+
3+
from __future__ import annotations
4+
5+
from datetime import datetime
6+
from typing import Any
7+
8+
from DIRACCommon.Core.Utilities.ReturnValues import S_OK, S_ERROR
9+
10+
11+
def getStartAndEndTime(startTime, endTime, updateTimes, timeStamps, statusDict):
12+
"""Get start and end times from job status updates
13+
14+
:param startTime: current start time
15+
:param endTime: current end time
16+
:param updateTimes: list of update times
17+
:param timeStamps: list of (timestamp, status) tuples
18+
:param statusDict: dictionary mapping update times to status dictionaries
19+
:return: tuple of (newStartTime, newEndTime)
20+
"""
21+
newStat = ""
22+
firstUpdate = TimeUtilities.toEpoch(TimeUtilities.fromString(updateTimes[0]))
23+
for ts, st in timeStamps:
24+
if firstUpdate >= ts:
25+
newStat = st
26+
# Pick up start and end times from all updates
27+
for updTime in updateTimes:
28+
sDict = statusDict[updTime]
29+
newStat = sDict.get("Status", newStat)
30+
31+
if not startTime and newStat == JobStatus.RUNNING:
32+
# Pick up the start date when the job starts running if not existing
33+
startTime = updTime
34+
elif not endTime and newStat in JobStatus.JOB_FINAL_STATES:
35+
# Pick up the end time when the job is in a final status
36+
endTime = updTime
37+
38+
return startTime, endTime
39+
40+
41+
def getNewStatus(
42+
jobID: int,
43+
updateTimes: list[datetime],
44+
lastTime: datetime,
45+
statusDict: dict[datetime, Any],
46+
currentStatus,
47+
force: bool,
48+
log,
49+
):
50+
"""Get new job status from status updates
51+
52+
:param jobID: job ID
53+
:param updateTimes: list of update times
54+
:param lastTime: last update time
55+
:param statusDict: dictionary mapping update times to status dictionaries
56+
:param currentStatus: current job status
57+
:param force: whether to force status update without state machine validation
58+
:param log: logger object
59+
:return: S_OK((status, minor, application)) or S_ERROR
60+
"""
61+
status = ""
62+
minor = ""
63+
application = ""
64+
# Get the last status values looping on the most recent upupdateTimes in chronological order
65+
for updTime in [dt for dt in updateTimes if dt >= lastTime]:
66+
sDict = statusDict[updTime]
67+
log.debug(f"\tTime {updTime} - Statuses {str(sDict)}")
68+
status = sDict.get("Status", currentStatus)
69+
# evaluate the state machine if the status is changing
70+
if not force and status != currentStatus:
71+
res = JobStatus.JobsStateMachine(currentStatus).getNextState(status)
72+
if not res["OK"]:
73+
return res
74+
newStat = res["Value"]
75+
# If the JobsStateMachine does not accept the candidate, don't update
76+
if newStat != status:
77+
# keeping the same status
78+
log.error(
79+
f"Job Status Error: {jobID} can't move from {currentStatus} to {status}: using {newStat}",
80+
)
81+
status = newStat
82+
sDict["Status"] = newStat
83+
# Change the source to indicate this is not what was requested
84+
source = sDict.get("Source", "")
85+
sDict["Source"] = source + "(SM)"
86+
# at this stage status == newStat. Set currentStatus to this new status
87+
currentStatus = newStat
88+
89+
minor = sDict.get("MinorStatus", minor)
90+
application = sDict.get("ApplicationStatus", application)
91+
return S_OK((status, minor, application))

src/DIRAC/WorkloadManagementSystem/Utilities/JobStatusUtility.py

Lines changed: 1 addition & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from DIRAC.Core.Utilities import TimeUtilities
1010
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1111
from DIRAC.WorkloadManagementSystem.Client import JobStatus
12+
from DIRACCommon.WorkloadManagementSystem.Utilities.JobStatusUtility import getStartAndEndTime, getNewStatus
1213

1314
if TYPE_CHECKING:
1415
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
@@ -180,66 +181,3 @@ def setJobStatusBulk(self, jobID: int, statusDict: dict, force: bool = False):
180181
return result
181182

182183
return S_OK((attrNames, attrValues))
183-
184-
185-
def getStartAndEndTime(startTime, endTime, updateTimes, timeStamps, statusDict):
186-
newStat = ""
187-
firstUpdate = TimeUtilities.toEpoch(TimeUtilities.fromString(updateTimes[0]))
188-
for ts, st in timeStamps:
189-
if firstUpdate >= ts:
190-
newStat = st
191-
# Pick up start and end times from all updates
192-
for updTime in updateTimes:
193-
sDict = statusDict[updTime]
194-
newStat = sDict.get("Status", newStat)
195-
196-
if not startTime and newStat == JobStatus.RUNNING:
197-
# Pick up the start date when the job starts running if not existing
198-
startTime = updTime
199-
elif not endTime and newStat in JobStatus.JOB_FINAL_STATES:
200-
# Pick up the end time when the job is in a final status
201-
endTime = updTime
202-
203-
return startTime, endTime
204-
205-
206-
def getNewStatus(
207-
jobID: int,
208-
updateTimes: list[datetime],
209-
lastTime: datetime,
210-
statusDict: dict[datetime, Any],
211-
currentStatus,
212-
force: bool,
213-
log,
214-
):
215-
status = ""
216-
minor = ""
217-
application = ""
218-
# Get the last status values looping on the most recent upupdateTimes in chronological order
219-
for updTime in [dt for dt in updateTimes if dt >= lastTime]:
220-
sDict = statusDict[updTime]
221-
log.debug(f"\tTime {updTime} - Statuses {str(sDict)}")
222-
status = sDict.get("Status", currentStatus)
223-
# evaluate the state machine if the status is changing
224-
if not force and status != currentStatus:
225-
res = JobStatus.JobsStateMachine(currentStatus).getNextState(status)
226-
if not res["OK"]:
227-
return res
228-
newStat = res["Value"]
229-
# If the JobsStateMachine does not accept the candidate, don't update
230-
if newStat != status:
231-
# keeping the same status
232-
log.error(
233-
f"Job Status Error: {jobID} can't move from {currentStatus} to {status}: using {newStat}",
234-
)
235-
status = newStat
236-
sDict["Status"] = newStat
237-
# Change the source to indicate this is not what was requested
238-
source = sDict.get("Source", "")
239-
sDict["Source"] = source + "(SM)"
240-
# at this stage status == newStat. Set currentStatus to this new status
241-
currentStatus = newStat
242-
243-
minor = sDict.get("MinorStatus", minor)
244-
application = sDict.get("ApplicationStatus", application)
245-
return S_OK((status, minor, application))

0 commit comments

Comments
 (0)