Skip to content

Commit beaeefb

Browse files
committed
Merge branch 'main' into resource_requirements
2 parents cd29c0f + 4cf8c29 commit beaeefb

16 files changed

Lines changed: 390 additions & 103 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ cython_debug/
169169
workernode
170170
sandboxstore
171171
filecatalog
172+
status
172173

173174
# Generated schemas - these are build artifacts generated by CI/CD
174175
generated_schemas/*

src/dirac_cwl/execution_hooks/core.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from pydantic import BaseModel, ConfigDict, Field, PrivateAttr
3333

3434
from dirac_cwl.commands import PostProcessCommand, PreProcessCommand
35-
from dirac_cwl.data_management_mocks.data_manager import MockDataManager
35+
from dirac_cwl.mocks.data_manager import MockDataManager
3636

3737
logger = logging.getLogger(__name__)
3838

@@ -105,7 +105,7 @@ def name(cls) -> str:
105105
"""Auto-derive hook plugin identifier from class name."""
106106
return cls.__name__
107107

108-
def store_output(
108+
async def store_output(
109109
self,
110110
outputs: dict[str, str | Path | Sequence[str | Path]],
111111
**kwargs: Any,
@@ -119,14 +119,13 @@ def store_output(
119119
Additional keyword arguments for extensibility.
120120
"""
121121
for output_name, src_path in outputs.items():
122-
logger.info("Storing output %s, with source %s", output_name, src_path)
123-
124122
if not src_path:
125123
raise RuntimeError(f"src_path parameter required for filesystem storage of {output_name}")
126124

127125
lfn = self.output_paths.get(output_name, None)
128126

129127
if lfn:
128+
logger.info("Storing output %s, with source %s", output_name, src_path)
130129
if isinstance(src_path, str) or isinstance(src_path, Path):
131130
src_path = [src_path]
132131
for src in src_path:

src/dirac_cwl/job/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ def run_job(job_id: int, job: JobModel, logger: logging.Logger) -> bool:
231231
"-m",
232232
"dirac_cwl.job.job_wrapper_template",
233233
str(job_json_path),
234+
str(job_id),
234235
],
235236
capture_output=True,
236237
text=True,

src/dirac_cwl/job/job_report.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""All classes related to job reports."""
2+
3+
from datetime import datetime, timezone
4+
from enum import StrEnum
5+
6+
from diracx.client.aio import AsyncDiracClient
7+
from diracx.core.models.job import JobStatus, JobStatusUpdate
8+
9+
10+
class JobMinorStatus(StrEnum):
11+
"""List of all available job minor statuses."""
12+
13+
APPLICATION = "Executing Payload"
14+
APP_ERRORS = "Application Finished With Errors"
15+
# APP_NOT_FOUND = "Application not found"
16+
APP_SUCCESS = "Application Finished Successfully"
17+
# APP_THREAD_FAILED = "Application thread failed"
18+
# APP_THREAD_NOT_COMPLETE = "Application thread did not complete"
19+
DOWNLOADING_INPUT_SANDBOX = "Downloading InputSandbox"
20+
# DOWNLOADING_INPUT_SANDBOX_LFN = "Downloading InputSandbox LFN(s)"
21+
# EXCEPTION_DURING_EXEC = "Exception During Execution"
22+
EXEC_COMPLETE = "Execution Complete"
23+
FAILED_DOWNLOADING_INPUT_SANDBOX = "Failed Downloading InputSandbox"
24+
# FAILED_DOWNLOADING_INPUT_SANDBOX_LFN = "Failed Downloading InputSandbox LFN(s)"
25+
# FAILED_SENDING_REQUESTS = "Failed sending requests"
26+
# GOING_RESCHEDULE = "Going to reschedule job"
27+
# ILLEGAL_JOB_JDL = "Illegal Job JDL"
28+
INPUT_DATA_RESOLUTION = "Resolving Input Data"
29+
# INPUT_NOT_AVAILABLE = "Input Data Not Available"
30+
# JOB_EXCEEDED_CPU = "Job has reached the CPU limit of the queue"
31+
# JOB_EXCEEDED_WALL_CLOCK = "Job has exceeded maximum wall clock time"
32+
JOB_INITIALIZATION = "Initializing Job"
33+
# JOB_INSUFFICIENT_DISK = "Job has insufficient disk space to continue"
34+
# JOB_WRAPPER_EXECUTION = "JobWrapper execution"
35+
# JOB_WRAPPER_INITIALIZATION = "Job Wrapper Initialization"
36+
# MARKED_FOR_TERMINATION = "Marked for termination"
37+
# NO_CANDIDATE_SITE_FOUND = "No candidate sites available"
38+
OUTPUT_DATA_UPLOADED = "Output Data Uploaded"
39+
OUTPUT_SANDBOX_UPLOADED = "Output Sandbox Uploaded"
40+
# PENDING_REQUESTS = "Pending Requests"
41+
# PILOT_AGENT_SUBMISSION = "Pilot Agent Submission"
42+
# RECEIVED_KILL_SIGNAL = "Received Kill signal"
43+
# REQUESTS_DONE = "Requests done"
44+
# RESCHEDULED = "Job Rescheduled"
45+
RESOLVING_OUTPUT_SANDBOX = "Resolving Output Sandbox"
46+
# STALLED_PILOT_NOT_RUNNING = "Job stalled: pilot not running"
47+
# UPLOADING_JOB_OUTPUTS = "Uploading Outputs"
48+
UPLOADING_OUTPUT_DATA = "Uploading Output Data"
49+
UPLOADING_OUTPUT_SANDBOX = "Uploading Output Sandbox"
50+
# WATCHDOG_STALLED = "Watchdog identified this job as stalled"
51+
52+
53+
class JobReport:
54+
"""JobReport."""
55+
56+
def __init__(self, job_id: int, source: str, client: AsyncDiracClient) -> None:
57+
"""
58+
Initialize Job Report.
59+
60+
:param job_id: the job ID
61+
:param source: source for the reports
62+
:param client: DiracX client instance
63+
"""
64+
self.job_status_info: dict[str, dict[str, str]] = {} # where job status updates are cumulated
65+
self.job_id = job_id
66+
self.source = source
67+
self._client = client
68+
69+
def set_job_status(
70+
self,
71+
status: JobStatus | None = None,
72+
minor_status: JobMinorStatus | None = None,
73+
application_status: str | None = None,
74+
) -> None:
75+
"""
76+
Add a new job status to the job report.
77+
78+
:param status: job status
79+
:param minor_status: job minor status
80+
:param application_status: application status
81+
"""
82+
timestamp = str(datetime.now(timezone.utc))
83+
# add job status record
84+
self.job_status_info.update(
85+
{
86+
timestamp: JobStatusUpdate(
87+
Status=status,
88+
MinorStatus=minor_status,
89+
ApplicationStatus=application_status,
90+
Source=self.source,
91+
).model_dump()
92+
}
93+
)
94+
95+
async def send_stored_status_info(self):
96+
"""Send all the accumulated job status information."""
97+
if not self.job_status_info:
98+
return
99+
body = {self.job_id: self.job_status_info}
100+
ret = await self._client.jobs.set_job_statuses(body)
101+
if ret.success:
102+
self.job_status_info = {}
103+
else:
104+
raise RuntimeError(f"Could not set job statuses: {ret}")
105+
106+
async def commit(self):
107+
"""Send all the accumulated information."""
108+
await self.send_stored_status_info()

0 commit comments

Comments
 (0)