Skip to content

Commit c568403

Browse files
committed
use a map file to reduce the request size
1 parent 7d70c6a commit c568403

4 files changed

Lines changed: 144 additions & 41 deletions

File tree

doc/changes/DM-50973.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Use a map file to reduce the size of the bps PanDA submission request.

python/lsst/ctrl/bps/panda/edgenode/cmd_line_decoder.py

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ def replace_event_file(params, files):
184184
Example params:
185185
isr:eventservice_90^10+somethingelse. This part
186186
'isr:eventservice_90^10' is the EventService parameter.
187+
isr:orderIdMap_10. This part uses the order_id map file, but it
188+
is not an EventService parameter.
187189
The format for the EventService parameter for LSST is
188190
'label:eventservice_<baseid>^<localid>'. The '<localid>' should
189191
start from 1, which means the first event of the file
@@ -217,6 +219,7 @@ def replace_event_file(params, files):
217219
"""
218220
ret_status = True
219221
with_events = False
222+
with_order_id_map = False
220223
files = files.split("+")
221224
file_map = {}
222225
for file in files:
@@ -272,32 +275,63 @@ def replace_event_file(params, files):
272275
ret_status = False
273276
break
274277

275-
params_map[param] = {"event_index": event_index, "order_id_map": order_id_map[label]}
276-
return ret_status, with_events, params_map
278+
params_map[param] = {"order_id": event_index, "order_id_map": order_id_map[label]}
279+
elif 'orderIdMap_' in param:
280+
with_order_id_map = True
281+
label, event = param.split(":")
282+
order_id = event.split("_")[1]
283+
if not order_id_map:
284+
print("orderIdMap is enabled but order_id_map file doesn't exist.")
285+
ret_status = False
286+
break
287+
288+
if label not in order_id_map:
289+
print(
290+
f"orderIdMap is enabled but label {label} doesn't in the keys"
291+
f" of order_id_map {order_id_map.keys()}"
292+
)
293+
ret_status = False
294+
break
295+
if order_id not in order_id_map[label]:
296+
print(
297+
f"orderIdMap is enabled but order_id {order_id} is not"
298+
f" in order_id_map[{label}] {order_id_map[label].keys()}"
299+
)
300+
ret_status = False
301+
break
302+
303+
params_map[param] = {"order_id": order_id, "order_id_map": order_id_map[label]}
304+
return ret_status, with_events, with_order_id_map, params_map
277305

278306

279307
deliver_input_files(sys.argv[3], sys.argv[4], sys.argv[5])
280308
cmd_line = str(binascii.unhexlify(sys.argv[1]).decode())
281309
data_params = sys.argv[2]
282310
cmd_line = replace_environment_vars(cmd_line)
283311

312+
print(f"cmd_line: {cmd_line}")
313+
print(f"data_params: {data_params}")
314+
284315
# If EventService is enabled, data_params will only contain event information.
285316
# So we need to convert the event information to LSST pseudo file names.
286317
# If EventService is not enabled, this part will not change data_params.
287-
ret_event_status, with_events, event_params_map = replace_event_file(data_params, sys.argv[4])
288-
print(f"ret_event_status: {ret_event_status}, with_events: {with_events}")
318+
ret_event_status, with_events, with_order_id_map, event_params_map = replace_event_file(data_params, sys.argv[4])
319+
print(f"ret_event_status: {ret_event_status}, with_events: {with_events}, with_order_id_map: {with_order_id_map}")
289320
if not ret_event_status:
290-
print("failed to map EventService parameters to original LSST pseudo file names")
321+
print("failed to map EventService/orderIdMap parameters to original LSST pseudo file names")
291322
exit_code = 1
292323
sys.exit(exit_code)
293324

294325
for event_param in event_params_map:
295-
event_index = event_params_map[event_param]["event_index"]
296-
pseudo_file_name = event_params_map[event_param]["order_id_map"][event_index]
297-
print(f"replacing event {event_param} with event_index {event_index} to: {pseudo_file_name}")
326+
order_id = event_params_map[event_param]["order_id"]
327+
pseudo_file_name = event_params_map[event_param]["order_id_map"][order_id]
328+
print(f"replacing event {event_param} with order_id {order_id} to: {pseudo_file_name}")
298329
cmd_line = cmd_line.replace(event_param, pseudo_file_name)
299330
data_params = data_params.replace(event_param, pseudo_file_name)
300331

332+
# If job name map is enabled, data_params will only contain order_id information.
333+
# Here we will convert order_id information to LSST pseudo file names.
334+
301335
data_params = data_params.split("+")
302336

303337
"""Replace the pipetask command line placeholders

python/lsst/ctrl/bps/panda/panda_service.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737

3838
from idds.workflowv2.workflow import Workflow as IDDS_client_workflow
3939

40+
from lsst.utils.timer import time_this
41+
from lsst.ctrl.bps import DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT
4042
from lsst.ctrl.bps import BaseWmsService, BaseWmsWorkflow, WmsRunReport, WmsStates
4143
from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS
4244
from lsst.ctrl.bps.panda.utils import (
@@ -58,10 +60,22 @@ class PanDAService(BaseWmsService):
5860
def prepare(self, config, generic_workflow, out_prefix=None):
5961
# Docstring inherited from BaseWmsService.prepare.
6062
_LOG.debug("out_prefix = '%s'", out_prefix)
61-
workflow = PandaBpsWmsWorkflow.from_generic_workflow(
62-
config, generic_workflow, out_prefix, f"{self.__class__.__module__}.{self.__class__.__name__}"
63-
)
64-
workflow.write(out_prefix)
63+
64+
_LOG.info("Starting PanDA prepare stage (creating specific implementation of workflow)")
65+
66+
with time_this(
67+
log=_LOG,
68+
level=logging.INFO,
69+
prefix=None,
70+
msg="PanDA prepare stage completed",
71+
mem_usage=True,
72+
mem_unit=DEFAULT_MEM_UNIT,
73+
mem_fmt=DEFAULT_MEM_FMT,
74+
):
75+
workflow = PandaBpsWmsWorkflow.from_generic_workflow(
76+
config, generic_workflow, out_prefix, f"{self.__class__.__module__}.{self.__class__.__name__}"
77+
)
78+
workflow.write(out_prefix)
6579
return workflow
6680

6781
def submit(self, workflow, **kwargs):

python/lsst/ctrl/bps/panda/utils.py

Lines changed: 83 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ def _make_doma_work(
222222
task_count,
223223
task_chunk,
224224
enable_event_service=False,
225-
es_files=None,
225+
enable_job_name_map=False,
226+
order_id_map_files=None,
226227
es_label=None,
227228
max_payloads_per_panda_job=PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
228229
max_wms_job_wall_time=None,
@@ -248,8 +249,8 @@ def _make_doma_work(
248249
local_pfns : `dict` [`str`, `str`]
249250
Files which need to be copied to a workflow staging area.
250251
"""
251-
if es_files is None:
252-
es_files = {}
252+
if order_id_map_files is None:
253+
order_id_map_files = {}
253254
_LOG.debug("Using gwjob %s to create new PanDA task (gwjob=%s)", gwjob.name, gwjob)
254255
cvals = {"curr_cluster": gwjob.label}
255256
_, site = config.search("computeSite", opt={"curvals": cvals, "required": True})
@@ -338,13 +339,15 @@ def _make_doma_work(
338339

339340
maxwalltime = gwjob.request_walltime if gwjob.request_walltime else PANDA_DEFAULT_MAX_WALLTIME
340341
if enable_event_service:
341-
for es_name in es_files:
342-
local_pfns[es_name] = es_files[es_name]
343342
if gwjob.request_walltime and max_payloads_per_panda_job:
344343
maxwalltime = gwjob.request_walltime * max_payloads_per_panda_job
345344
elif max_wms_job_wall_time:
346345
maxwalltime = max_wms_job_wall_time
347346

347+
if enable_event_service or enable_job_name_map:
348+
for es_name in order_id_map_files:
349+
local_pfns[es_name] = order_id_map_files[es_name]
350+
348351
for gwfile in generic_workflow.get_job_inputs(gwjob.name, transfer_only=True):
349352
local_pfns[gwfile.name] = gwfile.src_uri
350353
if os.path.isdir(gwfile.src_uri):
@@ -596,6 +599,16 @@ def add_idds_work(config, generic_workflow, idds_workflow):
596599
)
597600
_LOG.info(my_log)
598601

602+
# Job name map: use a short job name that maps to the long job name.
603+
_, enable_job_name_map = config.search("enableJobNameMap", opt={"default": None})
604+
_LOG.info(f"enable_job_name_map: {enable_job_name_map}, {type(enable_job_name_map)}")
605+
if enable_event_service and not enable_job_name_map:
606+
enable_job_name_map = True
607+
my_log = (
608+
"enable_event_service is set, set enable_job_name_map True."
609+
)
610+
_LOG.info(my_log)
611+
599612
# Limit number of jobs in single PanDA task
600613
_, max_jobs_per_task = config.search("maxJobsPerTask", opt={"default": PANDA_DEFAULT_MAX_JOBS_PER_TASK})
601614

@@ -614,23 +627,31 @@ def add_idds_work(config, generic_workflow, idds_workflow):
614627
archive_filename = create_archive_file(submit_path, archive_filename, files)
615628
remote_archive_filename = copy_files_to_pandacache(archive_filename)
616629

617-
es_files = {}
630+
order_id_map_files = {}
618631
name_works = {}
619632
order_id_map = {}
633+
job_name_to_order_id_map = {}
620634
doma_tree = None
621635
order_id_map_file = None
622636
if enable_event_service:
623637
enable_event_service = enable_event_service.split(",")
624638
enable_event_service = [i.strip() for i in enable_event_service]
639+
if enable_job_name_map:
625640
doma_tree = DomaTree(name=generic_workflow.name)
626641
submit_path = config[".bps_defined.submitPath"]
627642
_, order_id_map_filename = config.search(
628643
"orderIdMapFilename", opt={"default": PANDA_DEFAULT_ORDER_ID_MAP_FILE}
629644
)
630645
order_id_map_file = os.path.join(submit_path, order_id_map_filename)
631646
order_id_map = doma_tree.order_jobs_from_generic_workflow(generic_workflow, order_id_map_file)
632-
es_files = {"orderIdMapFilename": order_id_map_file}
633-
files_to_pre_stage.update(es_files)
647+
order_id_map_files = {"orderIdMapFilename": order_id_map_file}
648+
files_to_pre_stage.update(order_id_map_files)
649+
650+
# job name to order id map
651+
job_name_to_order_id_map = {
652+
label: {job_name: order_id for order_id, job_name in orders.items()}
653+
for label, orders in order_id_map.items()
654+
}
634655

635656
# To avoid dying due to optimizing number of times through workflow,
636657
# catch dependency issues to loop through again later.
@@ -684,7 +705,8 @@ def add_idds_work(config, generic_workflow, idds_workflow):
684705
task_count,
685706
task_chunk,
686707
enable_event_service=work_enable_event_service,
687-
es_files=es_files,
708+
enable_job_name_map=enable_job_name_map,
709+
order_id_map_files=order_id_map_files,
688710
es_label=job_label,
689711
max_payloads_per_panda_job=max_payloads_per_panda_job,
690712
max_wms_job_wall_time=max_wms_job_wall_time,
@@ -707,19 +729,34 @@ def add_idds_work(config, generic_workflow, idds_workflow):
707729
missing_deps = True
708730
break
709731
else:
710-
deps.append(
711-
{
712-
"task": job_to_task[parent_job_name],
713-
"inputname": job_to_pseudo_filename[parent_job_name],
714-
"available": False,
715-
}
716-
)
732+
if enable_job_name_map:
733+
parent_job = generic_workflow.get_job(parent_job_name)
734+
parent_job_label = parent_job.label
735+
parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
736+
deps.append(
737+
{
738+
"task": job_to_task[parent_job_name],
739+
"inputname": f"{parent_job_label}:orderIdMap_{parent_order_id}",
740+
}
741+
)
742+
else:
743+
deps.append(
744+
{
745+
"task": job_to_task[parent_job_name],
746+
"inputname": job_to_pseudo_filename[parent_job_name],
747+
}
748+
)
717749
if not missing_deps:
718-
work.dependency_map.append(
719-
{"name": pseudo_filename, "order_id": order_id, "dependencies": deps}
720-
)
750+
if enable_job_name_map:
751+
work.dependency_map.append(
752+
{"name": f"{job_label}:orderIdMap_{order_id}", "order_id": order_id, "dependencies": deps}
753+
)
754+
else:
755+
work.dependency_map.append(
756+
{"name": pseudo_filename, "order_id": order_id, "dependencies": deps}
757+
)
721758
else:
722-
jobs_with_dependency_issues[gwjob.name] = {"work": work, "order_id": order_id}
759+
jobs_with_dependency_issues[gwjob.name] = {"work": work, "order_id": order_id, 'label': job_label}
723760

724761
# If there were any issues figuring out dependencies through earlier loop
725762
if jobs_with_dependency_issues:
@@ -729,22 +766,39 @@ def add_idds_work(config, generic_workflow, idds_workflow):
729766
deps = []
730767
work = work_item["work"]
731768
order_id = work_item["order_id"]
769+
job_label = work_item["label"]
732770

733771
for parent_job_name in generic_workflow.predecessors(job_name):
734772
if parent_job_name not in job_to_task:
735773
_LOG.debug("job_to_task.keys() = %s", job_to_task.keys())
736774
raise RuntimeError(
737775
"Could not recover from dependency issues ({job_name} missing {parent_job_name})."
738776
)
739-
deps.append(
740-
{
741-
"task": job_to_task[parent_job_name],
742-
"inputname": job_to_pseudo_filename[parent_job_name],
743-
"available": False,
744-
}
777+
if enable_job_name_map:
778+
parent_job = generic_workflow.get_job(parent_job_name)
779+
parent_job_label = parent_job.label
780+
parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
781+
deps.append(
782+
{
783+
"task": job_to_task[parent_job_name],
784+
"inputname": f"{parent_job_label}:orderIdMap_{parent_order_id}",
785+
}
786+
)
787+
else:
788+
deps.append(
789+
{
790+
"task": job_to_task[parent_job_name],
791+
"inputname": job_to_pseudo_filename[parent_job_name],
792+
}
793+
)
794+
if enable_job_name_map:
795+
work.dependency_map.append(
796+
{"name": f"{job_label}:orderIdMap_{order_id}", "order_id": order_id, "dependencies": deps}
745797
)
746-
pseudo_filename = job_to_pseudo_filename[job_name]
747-
work.dependency_map.append({"name": pseudo_filename, "order_id": order_id, "dependencies": deps})
798+
else:
799+
pseudo_filename = job_to_pseudo_filename[job_name]
800+
work.dependency_map.append({"name": pseudo_filename, "order_id": order_id, "dependencies": deps})
801+
748802
_LOG.info("Successfully recovered.")
749803

750804
for task_name in name_works:
@@ -754,7 +808,7 @@ def add_idds_work(config, generic_workflow, idds_workflow):
754808
# 2) check to avoid duplicated items.
755809
work.dependency_map = work.dependency_map
756810

757-
if enable_event_service:
811+
if enable_job_name_map:
758812
for label_name in order_id_map:
759813
for order_id in order_id_map[label_name]:
760814
job_name = order_id_map[label_name][order_id]

0 commit comments

Comments
 (0)