Skip to content

Commit a739f23

Browse files
author
zhaoyu
committed
Addressed the docstring and iDDS client function call comments from the PR review
1 parent 0df3cf8 commit a739f23

3 files changed

Lines changed: 75 additions & 52 deletions

File tree

doc/changes/DM-52866.feature.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
Update bps report function to aggregate panda task slices into task labels
1+
Update "bps report" function to aggregate panda task slices into task labels

python/lsst/ctrl/bps/panda/panda_service.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
extract_taskname,
5656
get_idds_client,
5757
get_idds_result,
58+
idds_call_with_check,
5859
)
5960
from lsst.resources import ResourcePath
6061
from lsst.utils.timer import time_this
@@ -174,12 +175,12 @@ def report(
174175
return run_reports, message
175176

176177
idds_client = get_idds_client(self.config)
177-
ret = idds_client.get_requests(request_id=wms_workflow_id, with_detail=True)
178-
_LOG.debug("PanDA get workflow status returned = %s", str(ret))
179-
180-
request_status = ret[0]
181-
if request_status != 0:
182-
raise RuntimeError(f"Error to get workflow status: {ret} for id: {wms_workflow_id}")
178+
ret = idds_call_with_check(
179+
idds_client.get_requests,
180+
func_name="get workflow status",
181+
request_id=wms_workflow_id,
182+
with_detail=True,
183+
)
183184

184185
tasks = ret[1][1]
185186
if not tasks:
@@ -268,8 +269,6 @@ def report(
268269
totaljobs = task.get("output_total_files", 0)
269270
wms_report.total_number_jobs += totaljobs
270271

271-
taskstatus = {}
272-
273272
# --- If task failed/subfinished, fetch exit codes ---
274273
if status in ("SubFinished", "Failed") and not task_name.startswith("build_task"):
275274
transform_workload_id = task.get("transform_workload_id")
@@ -279,18 +278,12 @@ def report(
279278
nfailed = task.get("output_failed_files", 0)
280279
exit_codes_all[tasklabel] = [1] * nfailed
281280
if return_exit_codes:
282-
new_ret = idds_client.get_contents_output_ext(
283-
request_id=wms_workflow_id, workload_id=transform_workload_id
281+
new_ret = idds_call_with_check(
282+
idds_client.get_contents_output_ext,
283+
func_name=f"get task {transform_workload_id} detail",
284+
request_id=wms_workflow_id,
285+
workload_id=transform_workload_id,
284286
)
285-
_LOG.debug(
286-
"PanDA get task %s detail returned = %s",
287-
transform_workload_id,
288-
str(new_ret),
289-
)
290-
if new_ret[0] != 0:
291-
raise RuntimeError(
292-
f"Error getting workflow status: {new_ret} for id: {wms_workflow_id}"
293-
)
294287
# task_info is a dictionary of len 1 that contains
295288
# a list of dicts containing panda job info
296289
task_info = new_ret[1][1]
@@ -302,15 +295,16 @@ def report(
302295
if j.get("trans_exit_code") not in (None, 0, "0")
303296
]
304297
if nfailed > 0 and len(exit_codes_all[tasklabel]) == 0:
305-
_LOG.warning(
298+
_LOG.debug(
306299
f"No exit codes in iDDS task info for workload {transform_workload_id}"
307300
)
308301
else:
309-
err_msg = "Unexpected iDDS task info for workload "
310-
err_msg += f"{transform_workload_id}: {task_info}"
311-
raise RuntimeError(err_msg)
302+
raise RuntimeError(
303+
f"Unexpected iDDS task info for workload {transform_workload_id}: {task_info}"
304+
)
312305

313306
# --- Aggregate job states ---
307+
taskstatus = {}
314308
mapped_states = state_map.get(status, [])
315309
for state in WmsStates:
316310
njobs = 0

python/lsst/ctrl/bps/panda/utils.py

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
"extract_taskname",
3636
"get_idds_client",
3737
"get_idds_result",
38+
"idds_call_with_check",
3839
]
3940

4041
import binascii
@@ -79,32 +80,34 @@
7980

8081

8182
def extract_taskname(s: str) -> str:
    """Extract the task name from a string that follows a pattern
    CampaignName_timestamp_TaskNumber_TaskLabel_ChunkNumber.

    The task name is everything that follows the last all-digit field
    that is delimited by underscores on both sides.

    Parameters
    ----------
    s : `str`
        The input string from which to extract the task name. Surrounding
        whitespace and quote characters are ignored.

    Returns
    -------
    taskname : `str`
        The extracted task name as per the rule above, or the cleaned
        input string if it contains no underscore-delimited numeric field.
    """
    # remove surrounding quotes/spaces if present
    s = s.strip().strip("'\"")

    # Find every "_<digits>" that is immediately followed by an underscore.
    # The trailing underscore is only looked at, not consumed, so two
    # adjacent numeric fields ("_1_2_") are both seen; consuming it would
    # hide the second one because regex matches never overlap.
    matches = list(re.finditer(r"_\d+(?=_)", s))
    if matches:
        # Slice from the position the regex actually matched (skipping the
        # delimiter underscore) instead of searching the string again,
        # which could land on a different occurrence.
        taskname = s[matches[-1].end() + 1 :]
        return taskname

    # fallback: if no such pattern, return everything
    taskname = s
    return taskname
108111

109112

110113
def aggregate_by_basename(job_summary, exit_code_summary, run_summary):
@@ -113,35 +116,31 @@ def aggregate_by_basename(job_summary, exit_code_summary, run_summary):
113116
114117
Parameters
115118
----------
116-
job_summary : `dict`[`str`, `dict`[`str`, `int`]]
119+
job_summary : `dict` [`str`, `dict` [`str`, `int`]]
117120
A mapping of job labels to state-count mappings.
118-
exit_code_summary : `dict`[`str`, `list`[`int`]]
121+
exit_code_summary : `dict` [`str`, `list` [`int`]]
119122
A mapping of job labels to lists of exit codes.
120123
run_summary : `str`
121124
A semicolon-separated string of job summaries
122125
where each entry has the format "<label>:<count>".
123126
124127
Returns
125128
-------
126-
`tuple`[`dict`[`str`, `dict`[`str`, `int`]],
127-
`dict`[`str`, `list`[`int`]], `str`]
128-
aggregated_jobs : `dict`
129-
A dictionary mapping each base label to the summed
130-
job state counts across all matching labels.
131-
aggregated_exits : `dict`
132-
A dictionary mapping each base label to a combined list of
133-
exit codes from all matching labels.
134-
aggregated_run: `str`
135-
A semicolon-separated string with aggregated job counts
136-
by base label.
129+
aggregated_jobs : `dict` [`str`, `dict` [`str`, `int`]]
130+
A dictionary mapping each base label to the summed job state counts
131+
across all matching labels.
132+
aggregated_exits : `dict` [`str`, `list` [`int`]]
133+
A dictionary mapping each base label to a combined list of exit codes
134+
from all matching labels.
135+
aggregated_run : `str`
136+
A semicolon-separated string with aggregated job counts by base label.
137137
"""
138138

139139
def base_label(label):
140140
return re.sub(r"_\d+$", "", label)
141141

142142
aggregated_jobs = {}
143143
aggregated_exits = {}
144-
aggregated_run = ""
145144

146145
for label, states in job_summary.items():
147146
base = base_label(label)
@@ -155,7 +154,6 @@ def base_label(label):
155154
aggregated_exits.setdefault(base, []).extend(codes)
156155

157156
aggregated = {}
158-
ordered_labels = []
159157
for entry in run_summary.split(";"):
160158
entry = entry.strip()
161159
if not entry:
@@ -167,12 +165,9 @@ def base_label(label):
167165
continue
168166

169167
base = base_label(label)
170-
if base not in aggregated:
171-
aggregated[base] = 0
172-
ordered_labels.append(base)
173-
aggregated[base] += num
168+
aggregated[base] = aggregated.get(base, 0) + num
174169

175-
aggregated_run = ";".join(f"{base}:{aggregated[base]}" for base in ordered_labels)
170+
aggregated_run = ";".join(f"{base}:{count}" for base, count in aggregated.items())
176171
return aggregated_jobs, aggregated_exits, aggregated_run
177172

178173

@@ -294,6 +289,40 @@ def get_idds_result(ret):
294289
return status, result, error
295290

296291

292+
def idds_call_with_check(func, *, func_name: str, request_id: int, **kwargs):
    """Call an iDDS client function, log the result, and check its status.

    Parameters
    ----------
    func : `~collections.abc.Callable`
        The iDDS client function to call. It is invoked with keyword
        arguments only.
    func_name : `str`
        Description of the call, used in the log and error messages.
    request_id : `int`
        The request or workflow ID. It is forwarded to ``func`` as the
        ``request_id`` keyword unless it is `None`.
    **kwargs
        Additional keyword arguments passed to the function.

    Returns
    -------
    ret : `~typing.Any`
        The unmodified return value from the iDDS client function.

    Raises
    ------
    RuntimeError
        Raised if the first element of the returned value is not 0.
    """
    # ``kwargs`` is already a dict private to this call, so it can be
    # extended in place without affecting the caller.
    if request_id is not None:
        kwargs["request_id"] = request_id

    ret = func(**kwargs)

    # Hand ``ret`` to the logger unformatted: the response can be large and
    # it is only rendered to text when DEBUG logging is actually enabled.
    _LOG.debug("PanDA %s returned = %s", func_name, ret)

    request_status = ret[0]
    if request_status != 0:
        raise RuntimeError(f"Error calling {func_name}: {ret} for id: {request_id}")

    return ret
324+
325+
297326
def _make_pseudo_filename(config, gwjob):
298327
"""Make the job pseudo filename.
299328

0 commit comments

Comments
 (0)