Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-52866.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Updated the "bps report" function to aggregate PanDA task slices into task labels.
294 changes: 160 additions & 134 deletions python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@
from lsst.ctrl.bps.panda.utils import (
add_final_idds_work,
add_idds_work,
aggregate_by_basename,
copy_files_for_distribution,
create_idds_build_workflow,
extract_taskname,
get_idds_client,
get_idds_result,
idds_call_with_check,
)
from lsst.resources import ResourcePath
from lsst.utils.timer import time_this
Expand Down Expand Up @@ -172,154 +175,177 @@ def report(
return run_reports, message

idds_client = get_idds_client(self.config)
ret = idds_client.get_requests(request_id=wms_workflow_id, with_detail=True)
_LOG.debug("PanDA get workflow status returned = %s", str(ret))

request_status = ret[0]
if request_status != 0:
raise RuntimeError(f"Error to get workflow status: {ret} for id: {wms_workflow_id}")
ret = idds_call_with_check(
idds_client.get_requests,
func_name="get workflow status",
request_id=wms_workflow_id,
with_detail=True,
)

tasks = ret[1][1]
if not tasks:
message = f"No records found for workflow id '{wms_workflow_id}'. Hint: double check the id"
else:
head = tasks[0]
wms_report = WmsRunReport(
wms_id=str(head["request_id"]),
operator=head["username"],
project="",
campaign="",
payload="",
run=head["name"],
state=WmsStates.UNKNOWN,
total_number_jobs=0,
job_state_counts=dict.fromkeys(WmsStates, 0),
job_summary={},
run_summary="",
exit_code_summary=[],
)
return run_reports, message

# The status of a task is taken from the first item of state_map.
# The workflow is in status WmsStates.FAILED when:
# All tasks have failed.
# SubFinished tasks has jobs in
# output_processed_files: Finished
# output_failed_files: Failed
# output_missing_files: Missing
state_map = {
"Finished": [WmsStates.SUCCEEDED],
"SubFinished": [
WmsStates.SUCCEEDED,
WmsStates.FAILED,
WmsStates.PRUNED,
],
"Transforming": [
WmsStates.RUNNING,
WmsStates.SUCCEEDED,
WmsStates.FAILED,
WmsStates.UNREADY,
WmsStates.PRUNED,
],
"Failed": [WmsStates.FAILED, WmsStates.PRUNED],
}

file_map = {
WmsStates.SUCCEEDED: "output_processed_files",
WmsStates.RUNNING: "output_processing_files",
WmsStates.FAILED: "output_failed_files",
WmsStates.UNREADY: "input_new_files",
WmsStates.PRUNED: "output_missing_files",
}

workflow_status = head["status"]["attributes"]["_name_"]
if workflow_status in ["Finished", "SubFinished"]:
wms_report.state = WmsStates.SUCCEEDED
elif workflow_status in ["Failed", "Expired"]:
wms_report.state = WmsStates.FAILED
elif workflow_status in ["Cancelled"]:
wms_report.state = WmsStates.DELETED
elif workflow_status in ["Suspended"]:
wms_report.state = WmsStates.HELD
else:
wms_report.state = WmsStates.RUNNING

try:
tasks.sort(key=lambda x: x["transform_workload_id"])
except Exception:
tasks.sort(key=lambda x: x["transform_id"])

exit_codes_all = {}
# Loop over all tasks data returned by idds_client
for task in tasks:
if task["transform_id"] is None:
# Not created task (It happens because of an outer join
# between requests table and transforms table).
continue

exit_codes = []
totaljobs = task["output_total_files"]
wms_report.total_number_jobs += totaljobs
tasklabel = task["transform_name"]
tasklabel = re.sub(wms_report.run + "_", "", tasklabel)
status = task["transform_status"]["attributes"]["_name_"]
taskstatus = {}
# if the state is failed, gather exit code information
if status in ["SubFinished", "Failed"]:
transform_workload_id = task["transform_workload_id"]
if not (task["transform_name"] and task["transform_name"].startswith("build_task")):
new_ret = idds_client.get_contents_output_ext(
request_id=wms_workflow_id, workload_id=transform_workload_id
)
_LOG.debug(
"PanDA get task %s detail returned = %s", transform_workload_id, str(new_ret)
)
# Create initial WmsRunReport
head = tasks[0]
wms_report = WmsRunReport(
wms_id=str(head["request_id"]),
operator=head["username"],
project="",
campaign="",
payload="",
run=head["name"],
state=WmsStates.UNKNOWN,
total_number_jobs=0,
job_state_counts=dict.fromkeys(WmsStates, 0),
job_summary={},
run_summary="",
exit_code_summary={},
)

request_status = new_ret[0]
if request_status != 0:
raise RuntimeError(
f"Error to get workflow status: {new_ret} for id: {wms_workflow_id}"
)
# Define workflow status mapping
workflow_status = head["status"]["attributes"]["_name_"]
if workflow_status in ("Finished", "SubFinished"):
wms_report.state = WmsStates.SUCCEEDED
elif workflow_status in ("Failed", "Expired"):
wms_report.state = WmsStates.FAILED
elif workflow_status == "Cancelled":
wms_report.state = WmsStates.DELETED
elif workflow_status == "Suspended":
wms_report.state = WmsStates.HELD
else:
wms_report.state = WmsStates.RUNNING

# Define state mapping for job aggregation
# The status of a task is taken from the first item of state_map.
# The workflow is in status WmsStates.FAILED when:
# All tasks have failed.
# SubFinished tasks has jobs in
# output_processed_files: Finished
# output_failed_files: Failed
# output_missing_files: Missing
state_map = {
"Finished": [WmsStates.SUCCEEDED],
"SubFinished": [WmsStates.SUCCEEDED, WmsStates.FAILED, WmsStates.PRUNED],
"Transforming": [
WmsStates.RUNNING,
WmsStates.SUCCEEDED,
WmsStates.FAILED,
# WmsStates.READY,
WmsStates.UNREADY,
WmsStates.PRUNED,
],
"Failed": [WmsStates.FAILED, WmsStates.PRUNED],
}

file_map = {
WmsStates.SUCCEEDED: "output_processed_files",
WmsStates.RUNNING: "output_processing_files",
WmsStates.FAILED: "output_failed_files",
# WmsStates.READY: "output_activated_files",
WmsStates.UNREADY: "input_new_files",
WmsStates.PRUNED: "output_missing_files",
}

# Sort tasks by workload_id or fallback
try:
tasks.sort(key=lambda x: x["transform_workload_id"])
except (KeyError, TypeError):
tasks.sort(key=lambda x: x["transform_id"])

exit_codes_all = {}

# --- Process each task sequentially ---
for task in tasks:
if task.get("transform_id") is None:
# Not created task (It happens because of an outer join
# between requests table and transforms table).
continue

task_name = task.get("transform_name", "")
tasklabel = extract_taskname(task_name)
status = task["transform_status"]["attributes"]["_name_"]
totaljobs = task.get("output_total_files", 0)
wms_report.total_number_jobs += totaljobs

# --- If task failed/subfinished, fetch exit codes ---
if status in ("SubFinished", "Failed") and not task_name.startswith("build_task"):
transform_workload_id = task.get("transform_workload_id")
if transform_workload_id:
# When there are failed jobs, ctrl_bps check
# the number of exit codes
nfailed = task.get("output_failed_files", 0)
exit_codes_all[tasklabel] = [1] * nfailed
if return_exit_codes:
Comment thread
mxk62 marked this conversation as resolved.
new_ret = idds_call_with_check(
idds_client.get_contents_output_ext,
func_name=f"get task {transform_workload_id} detail",
request_id=wms_workflow_id,
workload_id=transform_workload_id,
)
# task_info is a dictionary of len 1 that contains
# a list of dicts containing panda job info
task_info = new_ret[1][1]

if len(task_info) == 1:
wmskey = list(task_info.keys())[0]
wmsjobs = task_info[wmskey]
_, wmsjobs = next(iter(task_info.items()))
exit_codes_all[tasklabel] = [
j["trans_exit_code"]
for j in wmsjobs
if j.get("trans_exit_code") not in (None, 0, "0")
]
if nfailed > 0 and len(exit_codes_all[tasklabel]) == 0:
_LOG.debug(
f"No exit codes in iDDS task info for workload {transform_workload_id}"
)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since DM-51261 was merged, ctrl_bps performs a similar check and prints a warning when this situation occurs. For this reason, we may want to use the DEBUG level here to avoid printing two separate warnings about the same thing.

else:
err_msg = "Unexpected job return from PanDA: "
err_msg += f"{task_info} for id: {transform_workload_id}"
raise RuntimeError(err_msg)
exit_codes = [
wmsjob["trans_exit_code"]
for wmsjob in wmsjobs
if wmsjob["trans_exit_code"] is not None and int(wmsjob["trans_exit_code"]) != 0
]
exit_codes_all[tasklabel] = exit_codes
# Fill number of jobs in all WmsStates
for state in WmsStates:
njobs = 0
# Each WmsState have many iDDS status mapped to it.
if status in state_map:
for mappedstate in state_map[status]:
if state in file_map and mappedstate == state:
if task[file_map[mappedstate]] is not None:
njobs = task[file_map[mappedstate]]
if state == WmsStates.RUNNING:
njobs += task["output_new_files"] - task["input_new_files"]
break
wms_report.job_state_counts[state] += njobs
taskstatus[state] = njobs
wms_report.job_summary[tasklabel] = taskstatus
raise RuntimeError(
f"Unexpected iDDS task info for workload {transform_workload_id}: {task_info}"
)

# To fill the EXPECTED column
if wms_report.run_summary:
wms_report.run_summary += ";"
wms_report.run_summary += f"{tasklabel}:{totaljobs}"
# --- Aggregate job states ---
taskstatus = {}
mapped_states = state_map.get(status, [])
for state in WmsStates:
njobs = 0
if state in mapped_states and state in file_map:
val = task.get(file_map[state])
if val:
njobs = val
if state == WmsStates.RUNNING:
njobs += task.get("output_new_files", 0) - task.get("input_new_files", 0)
if state != WmsStates.UNREADY:
wms_report.job_state_counts[state] += njobs
taskstatus[state] = njobs

wms_report.exit_code_summary = exit_codes_all
run_reports.append(wms_report)
# Count UNREADY
unready = WmsStates.UNREADY
taskstatus[unready] = totaljobs - sum(
taskstatus[state] for state in WmsStates if state != unready
)
wms_report.job_state_counts[unready] += taskstatus[unready]

# Store task summary
wms_report.job_summary[tasklabel] = taskstatus
summary_part = f"{tasklabel}:{totaljobs}"
if wms_report.run_summary:
summary_part = f";{summary_part}"
wms_report.run_summary += summary_part

# Store all exit codes
wms_report.exit_code_summary = exit_codes_all

(
wms_report.job_summary,
wms_report.exit_code_summary,
wms_report.run_summary,
) = aggregate_by_basename(
wms_report.job_summary,
wms_report.exit_code_summary,
wms_report.run_summary,
)

run_reports.append(wms_report)
return run_reports, message

def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False):
Expand Down
Loading
Loading