Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/bps_panda_orderid_map.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# enable the order_id map, which uses a short name in place of the long pseudo file name

enableJobNameMap: true
1 change: 1 addition & 0 deletions doc/changes/DM-50973.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use a map file to reduce the size of the bps PanDA submission
54 changes: 46 additions & 8 deletions python/lsst/ctrl/bps/panda/edgenode/cmd_line_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def replace_event_file(params, files):
Example params:
isr:eventservice_90^10+somethingelse. This part
'isr:eventservice_90^10' is the EventService parameter.
isr:orderIdMap_10. This part is using order_id map file. But it
is not EventService.
The format for the EventService parameter for LSST is
'label:eventservice_<baseid>^<localid>'. The '<localid>' should
start from 1, which means the first event of the file
Expand Down Expand Up @@ -217,6 +219,7 @@ def replace_event_file(params, files):
"""
ret_status = True
with_events = False
with_order_id_map = False
files = files.split("+")
file_map = {}
for file in files:
Expand Down Expand Up @@ -272,32 +275,67 @@ def replace_event_file(params, files):
ret_status = False
break

params_map[param] = {"event_index": event_index, "order_id_map": order_id_map[label]}
return ret_status, with_events, params_map
params_map[param] = {"order_id": event_index, "order_id_map": order_id_map[label]}
elif "orderIdMap_" in param:
Comment thread
wguanicedew marked this conversation as resolved.
with_order_id_map = True
label, event = param.split(":")
order_id = event.split("_")[1]
if not order_id_map:
print("orderIdMap is enabled but order_id_map file doesn't exist.")
ret_status = False
break

if label not in order_id_map:
print(
f"orderIdMap is enabled but label {label} doesn't in the keys"
f" of order_id_map {order_id_map.keys()}"
)
ret_status = False
break
if order_id not in order_id_map[label]:
print(
f"orderIdMap is enabled but order_id {order_id} is not"
f" in order_id_map[{label}] {order_id_map[label].keys()}"
)
ret_status = False
break

params_map[param] = {"order_id": order_id, "order_id_map": order_id_map[label]}
return ret_status, with_events, with_order_id_map, params_map


deliver_input_files(sys.argv[3], sys.argv[4], sys.argv[5])
cmd_line = str(binascii.unhexlify(sys.argv[1]).decode())
data_params = sys.argv[2]
cmd_line = replace_environment_vars(cmd_line)

print(f"cmd_line: {cmd_line}")
print(f"data_params: {data_params}")

# If EventService is enabled, data_params will only contain event information.
# So we need to convert the event information to LSST pseudo file names.
# If EventService is not enabled, this part will not change data_params.
ret_event_status, with_events, event_params_map = replace_event_file(data_params, sys.argv[4])
print(f"ret_event_status: {ret_event_status}, with_events: {with_events}")
ret_rep = replace_event_file(data_params, sys.argv[4])
ret_event_status, with_events, with_order_id_map, event_params_map = ret_rep
print(
f"ret_event_status: {ret_event_status}, with_events: {with_events} with_order_id_map: {with_order_id_map}"
Comment thread
wguanicedew marked this conversation as resolved.
)
if not ret_event_status:
print("failed to map EventService parameters to original LSST pseudo file names")
print("failed to map EventService/orderIdMap parameters to original LSST pseudo file names")
exit_code = 1
sys.exit(exit_code)

for event_param in event_params_map:
event_index = event_params_map[event_param]["event_index"]
pseudo_file_name = event_params_map[event_param]["order_id_map"][event_index]
print(f"replacing event {event_param} with event_index {event_index} to: {pseudo_file_name}")
order_id = event_params_map[event_param]["order_id"]
pseudo_file_name = event_params_map[event_param]["order_id_map"][order_id]
print(f"replacing event {event_param} with order_id {order_id} to: {pseudo_file_name}")
cmd_line = cmd_line.replace(event_param, pseudo_file_name)
data_params = data_params.replace(event_param, pseudo_file_name)

# If job name map is enabled, data_params will only contain order_id
# information. Here we will convert order_id information to LSST pseudo
# file names.

data_params = data_params.split("+")

"""Replace the pipetask command line placeholders
Expand Down
30 changes: 25 additions & 5 deletions python/lsst/ctrl/bps/panda/panda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@

from idds.workflowv2.workflow import Workflow as IDDS_client_workflow

from lsst.ctrl.bps import BaseWmsService, BaseWmsWorkflow, WmsRunReport, WmsStates
from lsst.ctrl.bps import (
DEFAULT_MEM_FMT,
DEFAULT_MEM_UNIT,
BaseWmsService,
BaseWmsWorkflow,
WmsRunReport,
WmsStates,
)
from lsst.ctrl.bps.panda.constants import PANDA_DEFAULT_MAX_COPY_WORKERS
from lsst.ctrl.bps.panda.utils import (
add_final_idds_work,
Expand All @@ -48,6 +55,7 @@
get_idds_result,
)
from lsst.resources import ResourcePath
from lsst.utils.timer import time_this

_LOG = logging.getLogger(__name__)

Expand All @@ -58,10 +66,22 @@
def prepare(self, config, generic_workflow, out_prefix=None):
# Docstring inherited from BaseWmsService.prepare.
_LOG.debug("out_prefix = '%s'", out_prefix)
workflow = PandaBpsWmsWorkflow.from_generic_workflow(
config, generic_workflow, out_prefix, f"{self.__class__.__module__}.{self.__class__.__name__}"
)
workflow.write(out_prefix)

_LOG.info("Starting PanDA prepare stage (creating specific implementation of workflow)")

Check warning on line 70 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L70

Added line #L70 was not covered by tests

with time_this(

Check warning on line 72 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L72

Added line #L72 was not covered by tests
log=_LOG,
level=logging.INFO,
prefix=None,
msg="PanDA prepare stage completed",
mem_usage=True,
mem_unit=DEFAULT_MEM_UNIT,
mem_fmt=DEFAULT_MEM_FMT,
):
workflow = PandaBpsWmsWorkflow.from_generic_workflow(

Check warning on line 81 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L81

Added line #L81 was not covered by tests
config, generic_workflow, out_prefix, f"{self.__class__.__module__}.{self.__class__.__name__}"
)
workflow.write(out_prefix)

Check warning on line 84 in python/lsst/ctrl/bps/panda/panda_service.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/panda_service.py#L84

Added line #L84 was not covered by tests
return workflow

def submit(self, workflow, **kwargs):
Expand Down
87 changes: 69 additions & 18 deletions python/lsst/ctrl/bps/panda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@
task_count,
task_chunk,
enable_event_service=False,
es_files=None,
enable_job_name_map=False,
order_id_map_files=None,
es_label=None,
max_payloads_per_panda_job=PANDA_DEFAULT_MAX_PAYLOADS_PER_PANDA_JOB,
max_wms_job_wall_time=None,
Expand All @@ -248,8 +249,8 @@
local_pfns : `dict` [`str`, `str`]
Files which need to be copied to a workflow staging area.
"""
if es_files is None:
es_files = {}
if order_id_map_files is None:
order_id_map_files = {}

Check warning on line 253 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L253

Added line #L253 was not covered by tests
_LOG.debug("Using gwjob %s to create new PanDA task (gwjob=%s)", gwjob.name, gwjob)
cvals = {"curr_cluster": gwjob.label}
_, site = config.search("computeSite", opt={"curvals": cvals, "required": True})
Expand Down Expand Up @@ -338,13 +339,15 @@

maxwalltime = gwjob.request_walltime if gwjob.request_walltime else PANDA_DEFAULT_MAX_WALLTIME
if enable_event_service:
for es_name in es_files:
local_pfns[es_name] = es_files[es_name]
if gwjob.request_walltime and max_payloads_per_panda_job:
maxwalltime = gwjob.request_walltime * max_payloads_per_panda_job
elif max_wms_job_wall_time:
maxwalltime = max_wms_job_wall_time

if enable_event_service or enable_job_name_map:
for es_name in order_id_map_files:
local_pfns[es_name] = order_id_map_files[es_name]

Check warning on line 349 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L349

Added line #L349 was not covered by tests

for gwfile in generic_workflow.get_job_inputs(gwjob.name, transfer_only=True):
local_pfns[gwfile.name] = gwfile.src_uri
if os.path.isdir(gwfile.src_uri):
Expand Down Expand Up @@ -596,6 +599,14 @@
)
_LOG.info(my_log)

# job name map: Use a short job name to map the long job name
_, enable_job_name_map = config.search("enableJobNameMap", opt={"default": None})
_LOG.info(f"enable_job_name_map: {enable_job_name_map}, {type(enable_job_name_map)}")

Check warning on line 604 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L603-L604

Added lines #L603 - L604 were not covered by tests
if enable_event_service and not enable_job_name_map:
enable_job_name_map = True
my_log = "enable_event_service is set, set enable_job_name_map True."
_LOG.info(my_log)

Check warning on line 608 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L606-L608

Added lines #L606 - L608 were not covered by tests

# Limit number of jobs in single PanDA task
_, max_jobs_per_task = config.search("maxJobsPerTask", opt={"default": PANDA_DEFAULT_MAX_JOBS_PER_TASK})

Expand All @@ -614,23 +625,31 @@
archive_filename = create_archive_file(submit_path, archive_filename, files)
remote_archive_filename = copy_files_to_pandacache(archive_filename)

es_files = {}
order_id_map_files = {}

Check warning on line 628 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L628

Added line #L628 was not covered by tests
name_works = {}
order_id_map = {}
job_name_to_order_id_map = {}

Check warning on line 631 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L631

Added line #L631 was not covered by tests
doma_tree = None
order_id_map_file = None
if enable_event_service:
enable_event_service = enable_event_service.split(",")
enable_event_service = [i.strip() for i in enable_event_service]
if enable_job_name_map:
doma_tree = DomaTree(name=generic_workflow.name)
submit_path = config[".bps_defined.submitPath"]
_, order_id_map_filename = config.search(
"orderIdMapFilename", opt={"default": PANDA_DEFAULT_ORDER_ID_MAP_FILE}
)
order_id_map_file = os.path.join(submit_path, order_id_map_filename)
order_id_map = doma_tree.order_jobs_from_generic_workflow(generic_workflow, order_id_map_file)
es_files = {"orderIdMapFilename": order_id_map_file}
files_to_pre_stage.update(es_files)
order_id_map_files = {"orderIdMapFilename": order_id_map_file}
files_to_pre_stage.update(order_id_map_files)

Check warning on line 646 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L645-L646

Added lines #L645 - L646 were not covered by tests

# job name to order id map
job_name_to_order_id_map = {

Check warning on line 649 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L649

Added line #L649 was not covered by tests
label: {job_name: order_id for order_id, job_name in orders.items()}
for label, orders in order_id_map.items()
}

# To avoid dying due to optimizing number of times through workflow,
# catch dependency issues to loop through again later.
Expand Down Expand Up @@ -684,7 +703,8 @@
task_count,
task_chunk,
enable_event_service=work_enable_event_service,
es_files=es_files,
enable_job_name_map=enable_job_name_map,
order_id_map_files=order_id_map_files,
es_label=job_label,
max_payloads_per_panda_job=max_payloads_per_panda_job,
max_wms_job_wall_time=max_wms_job_wall_time,
Expand All @@ -707,19 +727,35 @@
missing_deps = True
break
else:
if enable_job_name_map:
parent_job = generic_workflow.get_job(parent_job_name)
parent_job_label = parent_job.label
parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}"

Check warning on line 734 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L731-L734

Added lines #L731 - L734 were not covered by tests
else:
inputname = job_to_pseudo_filename[parent_job_name]

Check warning on line 736 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L736

Added line #L736 was not covered by tests

deps.append(
{
"task": job_to_task[parent_job_name],
"inputname": job_to_pseudo_filename[parent_job_name],
"available": False,
"inputname": inputname,
}
)
if not missing_deps:
f_name = f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else pseudo_filename

Check warning on line 745 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L745

Added line #L745 was not covered by tests
work.dependency_map.append(
{"name": pseudo_filename, "order_id": order_id, "dependencies": deps}
{
"name": f_name,
"order_id": order_id,
"dependencies": deps,
}
)
else:
jobs_with_dependency_issues[gwjob.name] = {"work": work, "order_id": order_id}
jobs_with_dependency_issues[gwjob.name] = {

Check warning on line 754 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L754

Added line #L754 was not covered by tests
"work": work,
"order_id": order_id,
"label": job_label,
}

# If there were any issues figuring out dependencies through earlier loop
if jobs_with_dependency_issues:
Expand All @@ -729,22 +765,37 @@
deps = []
work = work_item["work"]
order_id = work_item["order_id"]
job_label = work_item["label"]

Check warning on line 768 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L768

Added line #L768 was not covered by tests

for parent_job_name in generic_workflow.predecessors(job_name):
if parent_job_name not in job_to_task:
_LOG.debug("job_to_task.keys() = %s", job_to_task.keys())
raise RuntimeError(
"Could not recover from dependency issues ({job_name} missing {parent_job_name})."
)
if enable_job_name_map:
parent_job = generic_workflow.get_job(parent_job_name)
parent_job_label = parent_job.label
parent_order_id = job_name_to_order_id_map[parent_job_label][parent_job_name]
inputname = f"{parent_job_label}:orderIdMap_{parent_order_id}"

Check warning on line 780 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L777-L780

Added lines #L777 - L780 were not covered by tests
else:
inputname = job_to_pseudo_filename[parent_job_name]

Check warning on line 782 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L782

Added line #L782 was not covered by tests

deps.append(
{
"task": job_to_task[parent_job_name],
"inputname": job_to_pseudo_filename[parent_job_name],
"available": False,
"inputname": inputname,
}
)
pseudo_filename = job_to_pseudo_filename[job_name]
work.dependency_map.append({"name": pseudo_filename, "order_id": order_id, "dependencies": deps})

work.dependency_map.append(

Check warning on line 791 in python/lsst/ctrl/bps/panda/utils.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/panda/utils.py#L791

Added line #L791 was not covered by tests
{
"name": f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else pseudo_filename,
"order_id": order_id,
"dependencies": deps,
}
)

_LOG.info("Successfully recovered.")

for task_name in name_works:
Expand All @@ -754,7 +805,7 @@
# 2) check to avoid duplicated items.
work.dependency_map = work.dependency_map

if enable_event_service:
if enable_job_name_map:
for label_name in order_id_map:
for order_id in order_id_map[label_name]:
job_name = order_id_map[label_name][order_id]
Expand Down
Loading