-
Notifications
You must be signed in to change notification settings - Fork 1
DM-52886: Update bps report function to aggregate panda task slices into task labels #97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Update "bps report" function to aggregate panda task slices into task labels |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,10 +49,13 @@ | |
| from lsst.ctrl.bps.panda.utils import ( | ||
| add_final_idds_work, | ||
| add_idds_work, | ||
| aggregate_by_basename, | ||
| copy_files_for_distribution, | ||
| create_idds_build_workflow, | ||
| extract_taskname, | ||
| get_idds_client, | ||
| get_idds_result, | ||
| idds_call_with_check, | ||
| ) | ||
| from lsst.resources import ResourcePath | ||
| from lsst.utils.timer import time_this | ||
|
|
@@ -172,154 +175,177 @@ def report( | |
| return run_reports, message | ||
|
|
||
| idds_client = get_idds_client(self.config) | ||
| ret = idds_client.get_requests(request_id=wms_workflow_id, with_detail=True) | ||
| _LOG.debug("PanDA get workflow status returned = %s", str(ret)) | ||
|
|
||
| request_status = ret[0] | ||
| if request_status != 0: | ||
| raise RuntimeError(f"Error to get workflow status: {ret} for id: {wms_workflow_id}") | ||
| ret = idds_call_with_check( | ||
| idds_client.get_requests, | ||
| func_name="get workflow status", | ||
| request_id=wms_workflow_id, | ||
| with_detail=True, | ||
| ) | ||
|
|
||
| tasks = ret[1][1] | ||
| if not tasks: | ||
| message = f"No records found for workflow id '{wms_workflow_id}'. Hint: double check the id" | ||
| else: | ||
| head = tasks[0] | ||
| wms_report = WmsRunReport( | ||
| wms_id=str(head["request_id"]), | ||
| operator=head["username"], | ||
| project="", | ||
| campaign="", | ||
| payload="", | ||
| run=head["name"], | ||
| state=WmsStates.UNKNOWN, | ||
| total_number_jobs=0, | ||
| job_state_counts=dict.fromkeys(WmsStates, 0), | ||
| job_summary={}, | ||
| run_summary="", | ||
| exit_code_summary=[], | ||
| ) | ||
| return run_reports, message | ||
|
|
||
| # The status of a task is taken from the first item of state_map. | ||
| # The workflow is in status WmsStates.FAILED when: | ||
| # All tasks have failed. | ||
| # SubFinished tasks has jobs in | ||
| # output_processed_files: Finished | ||
| # output_failed_files: Failed | ||
| # output_missing_files: Missing | ||
| state_map = { | ||
| "Finished": [WmsStates.SUCCEEDED], | ||
| "SubFinished": [ | ||
| WmsStates.SUCCEEDED, | ||
| WmsStates.FAILED, | ||
| WmsStates.PRUNED, | ||
| ], | ||
| "Transforming": [ | ||
| WmsStates.RUNNING, | ||
| WmsStates.SUCCEEDED, | ||
| WmsStates.FAILED, | ||
| WmsStates.UNREADY, | ||
| WmsStates.PRUNED, | ||
| ], | ||
| "Failed": [WmsStates.FAILED, WmsStates.PRUNED], | ||
| } | ||
|
|
||
| file_map = { | ||
| WmsStates.SUCCEEDED: "output_processed_files", | ||
| WmsStates.RUNNING: "output_processing_files", | ||
| WmsStates.FAILED: "output_failed_files", | ||
| WmsStates.UNREADY: "input_new_files", | ||
| WmsStates.PRUNED: "output_missing_files", | ||
| } | ||
|
|
||
| workflow_status = head["status"]["attributes"]["_name_"] | ||
| if workflow_status in ["Finished", "SubFinished"]: | ||
| wms_report.state = WmsStates.SUCCEEDED | ||
| elif workflow_status in ["Failed", "Expired"]: | ||
| wms_report.state = WmsStates.FAILED | ||
| elif workflow_status in ["Cancelled"]: | ||
| wms_report.state = WmsStates.DELETED | ||
| elif workflow_status in ["Suspended"]: | ||
| wms_report.state = WmsStates.HELD | ||
| else: | ||
| wms_report.state = WmsStates.RUNNING | ||
|
|
||
| try: | ||
| tasks.sort(key=lambda x: x["transform_workload_id"]) | ||
| except Exception: | ||
| tasks.sort(key=lambda x: x["transform_id"]) | ||
|
|
||
| exit_codes_all = {} | ||
| # Loop over all tasks data returned by idds_client | ||
| for task in tasks: | ||
| if task["transform_id"] is None: | ||
| # Not created task (It happens because of an outer join | ||
| # between requests table and transforms table). | ||
| continue | ||
|
|
||
| exit_codes = [] | ||
| totaljobs = task["output_total_files"] | ||
| wms_report.total_number_jobs += totaljobs | ||
| tasklabel = task["transform_name"] | ||
| tasklabel = re.sub(wms_report.run + "_", "", tasklabel) | ||
| status = task["transform_status"]["attributes"]["_name_"] | ||
| taskstatus = {} | ||
| # if the state is failed, gather exit code information | ||
| if status in ["SubFinished", "Failed"]: | ||
| transform_workload_id = task["transform_workload_id"] | ||
| if not (task["transform_name"] and task["transform_name"].startswith("build_task")): | ||
| new_ret = idds_client.get_contents_output_ext( | ||
| request_id=wms_workflow_id, workload_id=transform_workload_id | ||
| ) | ||
| _LOG.debug( | ||
| "PanDA get task %s detail returned = %s", transform_workload_id, str(new_ret) | ||
| ) | ||
| # Create initial WmsRunReport | ||
| head = tasks[0] | ||
| wms_report = WmsRunReport( | ||
| wms_id=str(head["request_id"]), | ||
| operator=head["username"], | ||
| project="", | ||
| campaign="", | ||
| payload="", | ||
| run=head["name"], | ||
| state=WmsStates.UNKNOWN, | ||
| total_number_jobs=0, | ||
| job_state_counts=dict.fromkeys(WmsStates, 0), | ||
| job_summary={}, | ||
| run_summary="", | ||
| exit_code_summary={}, | ||
| ) | ||
|
|
||
| request_status = new_ret[0] | ||
| if request_status != 0: | ||
| raise RuntimeError( | ||
| f"Error to get workflow status: {new_ret} for id: {wms_workflow_id}" | ||
| ) | ||
| # Define workflow status mapping | ||
| workflow_status = head["status"]["attributes"]["_name_"] | ||
| if workflow_status in ("Finished", "SubFinished"): | ||
| wms_report.state = WmsStates.SUCCEEDED | ||
| elif workflow_status in ("Failed", "Expired"): | ||
| wms_report.state = WmsStates.FAILED | ||
| elif workflow_status == "Cancelled": | ||
| wms_report.state = WmsStates.DELETED | ||
| elif workflow_status == "Suspended": | ||
| wms_report.state = WmsStates.HELD | ||
| else: | ||
| wms_report.state = WmsStates.RUNNING | ||
|
|
||
| # Define state mapping for job aggregation | ||
| # The status of a task is taken from the first item of state_map. | ||
| # The workflow is in status WmsStates.FAILED when: | ||
| # All tasks have failed. | ||
| # SubFinished tasks has jobs in | ||
| # output_processed_files: Finished | ||
| # output_failed_files: Failed | ||
| # output_missing_files: Missing | ||
| state_map = { | ||
| "Finished": [WmsStates.SUCCEEDED], | ||
| "SubFinished": [WmsStates.SUCCEEDED, WmsStates.FAILED, WmsStates.PRUNED], | ||
| "Transforming": [ | ||
| WmsStates.RUNNING, | ||
| WmsStates.SUCCEEDED, | ||
| WmsStates.FAILED, | ||
| # WmsStates.READY, | ||
| WmsStates.UNREADY, | ||
| WmsStates.PRUNED, | ||
| ], | ||
| "Failed": [WmsStates.FAILED, WmsStates.PRUNED], | ||
| } | ||
|
|
||
| file_map = { | ||
| WmsStates.SUCCEEDED: "output_processed_files", | ||
| WmsStates.RUNNING: "output_processing_files", | ||
| WmsStates.FAILED: "output_failed_files", | ||
| # WmsStates.READY: "output_activated_files", | ||
| WmsStates.UNREADY: "input_new_files", | ||
| WmsStates.PRUNED: "output_missing_files", | ||
| } | ||
|
|
||
| # Sort tasks by workload_id or fallback | ||
| try: | ||
| tasks.sort(key=lambda x: x["transform_workload_id"]) | ||
| except (KeyError, TypeError): | ||
| tasks.sort(key=lambda x: x["transform_id"]) | ||
|
|
||
| exit_codes_all = {} | ||
|
|
||
| # --- Process each task sequentially --- | ||
| for task in tasks: | ||
| if task.get("transform_id") is None: | ||
| # Not created task (It happens because of an outer join | ||
| # between requests table and transforms table). | ||
| continue | ||
|
|
||
| task_name = task.get("transform_name", "") | ||
| tasklabel = extract_taskname(task_name) | ||
| status = task["transform_status"]["attributes"]["_name_"] | ||
| totaljobs = task.get("output_total_files", 0) | ||
| wms_report.total_number_jobs += totaljobs | ||
|
|
||
| # --- If task failed/subfinished, fetch exit codes --- | ||
| if status in ("SubFinished", "Failed") and not task_name.startswith("build_task"): | ||
| transform_workload_id = task.get("transform_workload_id") | ||
| if transform_workload_id: | ||
| # When there are failed jobs, ctrl_bps check | ||
| # the number of exit codes | ||
| nfailed = task.get("output_failed_files", 0) | ||
| exit_codes_all[tasklabel] = [1] * nfailed | ||
| if return_exit_codes: | ||
| new_ret = idds_call_with_check( | ||
| idds_client.get_contents_output_ext, | ||
| func_name=f"get task {transform_workload_id} detail", | ||
| request_id=wms_workflow_id, | ||
| workload_id=transform_workload_id, | ||
| ) | ||
| # task_info is a dictionary of len 1 that contains | ||
| # a list of dicts containing panda job info | ||
| task_info = new_ret[1][1] | ||
|
|
||
| if len(task_info) == 1: | ||
| wmskey = list(task_info.keys())[0] | ||
| wmsjobs = task_info[wmskey] | ||
| _, wmsjobs = next(iter(task_info.items())) | ||
| exit_codes_all[tasklabel] = [ | ||
| j["trans_exit_code"] | ||
| for j in wmsjobs | ||
| if j.get("trans_exit_code") not in (None, 0, "0") | ||
| ] | ||
| if nfailed > 0 and len(exit_codes_all[tasklabel]) == 0: | ||
| _LOG.debug( | ||
| f"No exit codes in iDDS task info for workload {transform_workload_id}" | ||
| ) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since DM-51261 was merged, ctrl_bps performs a similar check and prints out a warning if such a situation occurs. For this reason, we may consider using DEBUG level here to avoid printing two separate warnings concerning the same thing. |
||
| else: | ||
| err_msg = "Unexpected job return from PanDA: " | ||
| err_msg += f"{task_info} for id: {transform_workload_id}" | ||
| raise RuntimeError(err_msg) | ||
| exit_codes = [ | ||
| wmsjob["trans_exit_code"] | ||
| for wmsjob in wmsjobs | ||
| if wmsjob["trans_exit_code"] is not None and int(wmsjob["trans_exit_code"]) != 0 | ||
| ] | ||
| exit_codes_all[tasklabel] = exit_codes | ||
| # Fill number of jobs in all WmsStates | ||
| for state in WmsStates: | ||
| njobs = 0 | ||
| # Each WmsState have many iDDS status mapped to it. | ||
| if status in state_map: | ||
| for mappedstate in state_map[status]: | ||
| if state in file_map and mappedstate == state: | ||
| if task[file_map[mappedstate]] is not None: | ||
| njobs = task[file_map[mappedstate]] | ||
| if state == WmsStates.RUNNING: | ||
| njobs += task["output_new_files"] - task["input_new_files"] | ||
| break | ||
| wms_report.job_state_counts[state] += njobs | ||
| taskstatus[state] = njobs | ||
| wms_report.job_summary[tasklabel] = taskstatus | ||
| raise RuntimeError( | ||
| f"Unexpected iDDS task info for workload {transform_workload_id}: {task_info}" | ||
| ) | ||
|
|
||
| # To fill the EXPECTED column | ||
| if wms_report.run_summary: | ||
| wms_report.run_summary += ";" | ||
| wms_report.run_summary += f"{tasklabel}:{totaljobs}" | ||
| # --- Aggregate job states --- | ||
| taskstatus = {} | ||
| mapped_states = state_map.get(status, []) | ||
| for state in WmsStates: | ||
| njobs = 0 | ||
| if state in mapped_states and state in file_map: | ||
| val = task.get(file_map[state]) | ||
| if val: | ||
| njobs = val | ||
| if state == WmsStates.RUNNING: | ||
| njobs += task.get("output_new_files", 0) - task.get("input_new_files", 0) | ||
| if state != WmsStates.UNREADY: | ||
| wms_report.job_state_counts[state] += njobs | ||
| taskstatus[state] = njobs | ||
|
|
||
| wms_report.exit_code_summary = exit_codes_all | ||
| run_reports.append(wms_report) | ||
| # Count UNREADY | ||
| unready = WmsStates.UNREADY | ||
| taskstatus[unready] = totaljobs - sum( | ||
| taskstatus[state] for state in WmsStates if state != unready | ||
| ) | ||
| wms_report.job_state_counts[unready] += taskstatus[unready] | ||
|
|
||
| # Store task summary | ||
| wms_report.job_summary[tasklabel] = taskstatus | ||
| summary_part = f"{tasklabel}:{totaljobs}" | ||
| if wms_report.run_summary: | ||
| summary_part = f";{summary_part}" | ||
| wms_report.run_summary += summary_part | ||
|
|
||
| # Store all exit codes | ||
| wms_report.exit_code_summary = exit_codes_all | ||
|
|
||
| ( | ||
| wms_report.job_summary, | ||
| wms_report.exit_code_summary, | ||
| wms_report.run_summary, | ||
| ) = aggregate_by_basename( | ||
| wms_report.job_summary, | ||
| wms_report.exit_code_summary, | ||
| wms_report.run_summary, | ||
| ) | ||
|
|
||
| run_reports.append(wms_report) | ||
| return run_reports, message | ||
|
|
||
| def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thru=None, is_global=False): | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.