Skip to content

Commit 6894bb1

Browse files
authored
Merge pull request #97 from lsst/tickets/DM-52886
DM-52886: Update bps report function to aggregate panda task slices into task labels
2 parents 110c3fb + f0d77a3 commit 6894bb1

3 files changed

Lines changed: 292 additions & 135 deletions

File tree

doc/changes/DM-52866.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Update "bps report" function to aggregate panda task slices into task labels

python/lsst/ctrl/bps/panda/panda_service.py

Lines changed: 160 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,13 @@
4949
from lsst.ctrl.bps.panda.utils import (
5050
add_final_idds_work,
5151
add_idds_work,
52+
aggregate_by_basename,
5253
copy_files_for_distribution,
5354
create_idds_build_workflow,
55+
extract_taskname,
5456
get_idds_client,
5557
get_idds_result,
58+
idds_call_with_check,
5659
)
5760
from lsst.resources import ResourcePath
5861
from lsst.utils.timer import time_this
@@ -172,154 +175,177 @@ def report(
172175
return run_reports, message
173176

174177
idds_client = get_idds_client(self.config)
175-
ret = idds_client.get_requests(request_id=wms_workflow_id, with_detail=True)
176-
_LOG.debug("PanDA get workflow status returned = %s", str(ret))
177-
178-
request_status = ret[0]
179-
if request_status != 0:
180-
raise RuntimeError(f"Error to get workflow status: {ret} for id: {wms_workflow_id}")
178+
ret = idds_call_with_check(
179+
idds_client.get_requests,
180+
func_name="get workflow status",
181+
request_id=wms_workflow_id,
182+
with_detail=True,
183+
)
181184

182185
tasks = ret[1][1]
183186
if not tasks:
184187
message = f"No records found for workflow id '{wms_workflow_id}'. Hint: double check the id"
185-
else:
186-
head = tasks[0]
187-
wms_report = WmsRunReport(
188-
wms_id=str(head["request_id"]),
189-
operator=head["username"],
190-
project="",
191-
campaign="",
192-
payload="",
193-
run=head["name"],
194-
state=WmsStates.UNKNOWN,
195-
total_number_jobs=0,
196-
job_state_counts=dict.fromkeys(WmsStates, 0),
197-
job_summary={},
198-
run_summary="",
199-
exit_code_summary=[],
200-
)
188+
return run_reports, message
201189

202-
# The status of a task is taken from the first item of state_map.
203-
# The workflow is in status WmsStates.FAILED when:
204-
# All tasks have failed.
205-
# SubFinished tasks has jobs in
206-
# output_processed_files: Finished
207-
# output_failed_files: Failed
208-
# output_missing_files: Missing
209-
state_map = {
210-
"Finished": [WmsStates.SUCCEEDED],
211-
"SubFinished": [
212-
WmsStates.SUCCEEDED,
213-
WmsStates.FAILED,
214-
WmsStates.PRUNED,
215-
],
216-
"Transforming": [
217-
WmsStates.RUNNING,
218-
WmsStates.SUCCEEDED,
219-
WmsStates.FAILED,
220-
WmsStates.UNREADY,
221-
WmsStates.PRUNED,
222-
],
223-
"Failed": [WmsStates.FAILED, WmsStates.PRUNED],
224-
}
225-
226-
file_map = {
227-
WmsStates.SUCCEEDED: "output_processed_files",
228-
WmsStates.RUNNING: "output_processing_files",
229-
WmsStates.FAILED: "output_failed_files",
230-
WmsStates.UNREADY: "input_new_files",
231-
WmsStates.PRUNED: "output_missing_files",
232-
}
233-
234-
workflow_status = head["status"]["attributes"]["_name_"]
235-
if workflow_status in ["Finished", "SubFinished"]:
236-
wms_report.state = WmsStates.SUCCEEDED
237-
elif workflow_status in ["Failed", "Expired"]:
238-
wms_report.state = WmsStates.FAILED
239-
elif workflow_status in ["Cancelled"]:
240-
wms_report.state = WmsStates.DELETED
241-
elif workflow_status in ["Suspended"]:
242-
wms_report.state = WmsStates.HELD
243-
else:
244-
wms_report.state = WmsStates.RUNNING
245-
246-
try:
247-
tasks.sort(key=lambda x: x["transform_workload_id"])
248-
except Exception:
249-
tasks.sort(key=lambda x: x["transform_id"])
250-
251-
exit_codes_all = {}
252-
# Loop over all tasks data returned by idds_client
253-
for task in tasks:
254-
if task["transform_id"] is None:
255-
# Not created task (It happens because of an outer join
256-
# between requests table and transforms table).
257-
continue
258-
259-
exit_codes = []
260-
totaljobs = task["output_total_files"]
261-
wms_report.total_number_jobs += totaljobs
262-
tasklabel = task["transform_name"]
263-
tasklabel = re.sub(wms_report.run + "_", "", tasklabel)
264-
status = task["transform_status"]["attributes"]["_name_"]
265-
taskstatus = {}
266-
# if the state is failed, gather exit code information
267-
if status in ["SubFinished", "Failed"]:
268-
transform_workload_id = task["transform_workload_id"]
269-
if not (task["transform_name"] and task["transform_name"].startswith("build_task")):
270-
new_ret = idds_client.get_contents_output_ext(
271-
request_id=wms_workflow_id, workload_id=transform_workload_id
272-
)
273-
_LOG.debug(
274-
"PanDA get task %s detail returned = %s", transform_workload_id, str(new_ret)
275-
)
190+
# Create initial WmsRunReport
191+
head = tasks[0]
192+
wms_report = WmsRunReport(
193+
wms_id=str(head["request_id"]),
194+
operator=head["username"],
195+
project="",
196+
campaign="",
197+
payload="",
198+
run=head["name"],
199+
state=WmsStates.UNKNOWN,
200+
total_number_jobs=0,
201+
job_state_counts=dict.fromkeys(WmsStates, 0),
202+
job_summary={},
203+
run_summary="",
204+
exit_code_summary={},
205+
)
276206

277-
request_status = new_ret[0]
278-
if request_status != 0:
279-
raise RuntimeError(
280-
f"Error to get workflow status: {new_ret} for id: {wms_workflow_id}"
281-
)
207+
# Define workflow status mapping
208+
workflow_status = head["status"]["attributes"]["_name_"]
209+
if workflow_status in ("Finished", "SubFinished"):
210+
wms_report.state = WmsStates.SUCCEEDED
211+
elif workflow_status in ("Failed", "Expired"):
212+
wms_report.state = WmsStates.FAILED
213+
elif workflow_status == "Cancelled":
214+
wms_report.state = WmsStates.DELETED
215+
elif workflow_status == "Suspended":
216+
wms_report.state = WmsStates.HELD
217+
else:
218+
wms_report.state = WmsStates.RUNNING
219+
220+
# Define state mapping for job aggregation
221+
# The status of a task is taken from the first item of state_map.
222+
# The workflow is in status WmsStates.FAILED when:
223+
# All tasks have failed.
224+
# SubFinished tasks has jobs in
225+
# output_processed_files: Finished
226+
# output_failed_files: Failed
227+
# output_missing_files: Missing
228+
state_map = {
229+
"Finished": [WmsStates.SUCCEEDED],
230+
"SubFinished": [WmsStates.SUCCEEDED, WmsStates.FAILED, WmsStates.PRUNED],
231+
"Transforming": [
232+
WmsStates.RUNNING,
233+
WmsStates.SUCCEEDED,
234+
WmsStates.FAILED,
235+
# WmsStates.READY,
236+
WmsStates.UNREADY,
237+
WmsStates.PRUNED,
238+
],
239+
"Failed": [WmsStates.FAILED, WmsStates.PRUNED],
240+
}
241+
242+
file_map = {
243+
WmsStates.SUCCEEDED: "output_processed_files",
244+
WmsStates.RUNNING: "output_processing_files",
245+
WmsStates.FAILED: "output_failed_files",
246+
# WmsStates.READY: "output_activated_files",
247+
WmsStates.UNREADY: "input_new_files",
248+
WmsStates.PRUNED: "output_missing_files",
249+
}
250+
251+
# Sort tasks by workload_id or fallback
252+
try:
253+
tasks.sort(key=lambda x: x["transform_workload_id"])
254+
except (KeyError, TypeError):
255+
tasks.sort(key=lambda x: x["transform_id"])
256+
257+
exit_codes_all = {}
258+
259+
# --- Process each task sequentially ---
260+
for task in tasks:
261+
if task.get("transform_id") is None:
262+
# Not created task (It happens because of an outer join
263+
# between requests table and transforms table).
264+
continue
265+
266+
task_name = task.get("transform_name", "")
267+
tasklabel = extract_taskname(task_name)
268+
status = task["transform_status"]["attributes"]["_name_"]
269+
totaljobs = task.get("output_total_files", 0)
270+
wms_report.total_number_jobs += totaljobs
271+
272+
# --- If task failed/subfinished, fetch exit codes ---
273+
if status in ("SubFinished", "Failed") and not task_name.startswith("build_task"):
274+
transform_workload_id = task.get("transform_workload_id")
275+
if transform_workload_id:
276+
# When there are failed jobs, ctrl_bps check
277+
# the number of exit codes
278+
nfailed = task.get("output_failed_files", 0)
279+
exit_codes_all[tasklabel] = [1] * nfailed
280+
if return_exit_codes:
281+
new_ret = idds_call_with_check(
282+
idds_client.get_contents_output_ext,
283+
func_name=f"get task {transform_workload_id} detail",
284+
request_id=wms_workflow_id,
285+
workload_id=transform_workload_id,
286+
)
282287
# task_info is a dictionary of len 1 that contains
283288
# a list of dicts containing panda job info
284289
task_info = new_ret[1][1]
285-
286290
if len(task_info) == 1:
287-
wmskey = list(task_info.keys())[0]
288-
wmsjobs = task_info[wmskey]
291+
_, wmsjobs = next(iter(task_info.items()))
292+
exit_codes_all[tasklabel] = [
293+
j["trans_exit_code"]
294+
for j in wmsjobs
295+
if j.get("trans_exit_code") not in (None, 0, "0")
296+
]
297+
if nfailed > 0 and len(exit_codes_all[tasklabel]) == 0:
298+
_LOG.debug(
299+
f"No exit codes in iDDS task info for workload {transform_workload_id}"
300+
)
289301
else:
290-
err_msg = "Unexpected job return from PanDA: "
291-
err_msg += f"{task_info} for id: {transform_workload_id}"
292-
raise RuntimeError(err_msg)
293-
exit_codes = [
294-
wmsjob["trans_exit_code"]
295-
for wmsjob in wmsjobs
296-
if wmsjob["trans_exit_code"] is not None and int(wmsjob["trans_exit_code"]) != 0
297-
]
298-
exit_codes_all[tasklabel] = exit_codes
299-
# Fill number of jobs in all WmsStates
300-
for state in WmsStates:
301-
njobs = 0
302-
# Each WmsState have many iDDS status mapped to it.
303-
if status in state_map:
304-
for mappedstate in state_map[status]:
305-
if state in file_map and mappedstate == state:
306-
if task[file_map[mappedstate]] is not None:
307-
njobs = task[file_map[mappedstate]]
308-
if state == WmsStates.RUNNING:
309-
njobs += task["output_new_files"] - task["input_new_files"]
310-
break
311-
wms_report.job_state_counts[state] += njobs
312-
taskstatus[state] = njobs
313-
wms_report.job_summary[tasklabel] = taskstatus
302+
raise RuntimeError(
303+
f"Unexpected iDDS task info for workload {transform_workload_id}: {task_info}"
304+
)
314305

315-
# To fill the EXPECTED column
316-
if wms_report.run_summary:
317-
wms_report.run_summary += ";"
318-
wms_report.run_summary += f"{tasklabel}:{totaljobs}"
306+
# --- Aggregate job states ---
307+
taskstatus = {}
308+
mapped_states = state_map.get(status, [])
309+
for state in WmsStates:
310+
njobs = 0
311+
if state in mapped_states and state in file_map:
312+
val = task.get(file_map[state])
313+
if val:
314+
njobs = val
315+
if state == WmsStates.RUNNING:
316+
njobs += task.get("output_new_files", 0) - task.get("input_new_files", 0)
317+
if state != WmsStates.UNREADY:
318+
wms_report.job_state_counts[state] += njobs
319+
taskstatus[state] = njobs
319320

320-
wms_report.exit_code_summary = exit_codes_all
321-
run_reports.append(wms_report)
321+
# Count UNREADY
322+
unready = WmsStates.UNREADY
323+
taskstatus[unready] = totaljobs - sum(
324+
taskstatus[state] for state in WmsStates if state != unready
325+
)
326+
wms_report.job_state_counts[unready] += taskstatus[unready]
327+
328+
# Store task summary
329+
wms_report.job_summary[tasklabel] = taskstatus
330+
summary_part = f"{tasklabel}:{totaljobs}"
331+
if wms_report.run_summary:
332+
summary_part = f";{summary_part}"
333+
wms_report.run_summary += summary_part
334+
335+
# Store all exit codes
336+
wms_report.exit_code_summary = exit_codes_all
337+
338+
(
339+
wms_report.job_summary,
340+
wms_report.exit_code_summary,
341+
wms_report.run_summary,
342+
) = aggregate_by_basename(
343+
wms_report.job_summary,
344+
wms_report.exit_code_summary,
345+
wms_report.run_summary,
346+
)
322347

348+
run_reports.append(wms_report)
323349
return run_reports, message
324350

325351
def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):

0 commit comments

Comments
 (0)