Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-52283.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the order_id for EventService. EventService requires the order_id to be continuous, but when a big task was split into multiple tasks, the order_id was not continuous.
42 changes: 17 additions & 25 deletions python/lsst/ctrl/bps/panda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import idds.common.utils as idds_utils
import pandaclient.idds_api
from idds.doma.workflowv2.domapandawork import DomaPanDAWork
from idds.doma.workflowv2.domatree import DomaTree
from idds.workflowv2.workflow import AndCondition
from idds.workflowv2.workflow import Workflow as IDDS_client_workflow

Expand Down Expand Up @@ -636,27 +635,18 @@ def add_idds_work(config, generic_workflow, idds_workflow):
name_works = {}
order_id_map = {}
job_name_to_order_id_map = {}
doma_tree = None
order_id_map_file = None
if enable_event_service:
enable_event_service = enable_event_service.split(",")
enable_event_service = [i.strip() for i in enable_event_service]
if enable_job_name_map:
doma_tree = DomaTree(name=generic_workflow.name)
_, order_id_map_filename = config.search(
"orderIdMapFilename", opt={"default": PANDA_DEFAULT_ORDER_ID_MAP_FILE}
)
order_id_map_file = os.path.join(submit_path, order_id_map_filename)
order_id_map = doma_tree.order_jobs_from_generic_workflow(generic_workflow, order_id_map_file)
order_id_map_files = {"orderIdMapFilename": order_id_map_file}
files_to_pre_stage.update(order_id_map_files)

# job name to order id map
job_name_to_order_id_map = {
label: {job_name: order_id for order_id, job_name in orders.items()}
for label, orders in order_id_map.items()
}

# To avoid dying due to optimizing number of times through workflow,
# catch dependency issues to loop through again later.
jobs_with_dependency_issues = {}
Expand All @@ -672,12 +662,18 @@ def add_idds_work(config, generic_workflow, idds_workflow):
# Assume jobs with same label share config values
for job_label in generic_workflow.labels:
_LOG.debug("job_label = %s", job_label)

if enable_job_name_map:
order_id_map[job_label] = {}
job_name_to_order_id_map[job_label] = {}

# Add each job with a particular label to a corresponding PanDA task
# A PanDA task has a limit on number of jobs, so break into multiple
# PanDA tasks if needed.
job_count = 0 # Number of jobs in idds task used for task chunking
task_chunk = 1 # Task chunk number within job label used for unique name
work = None
order_id = -1

# Instead of changing code to make chunks up front and round-robin
# assign jobs to chunks, for now keeping chunk creation in loop
Expand All @@ -694,16 +690,18 @@ def add_idds_work(config, generic_workflow, idds_workflow):
max_jobs_per_task_this_label,
)
for gwjob in jobs_by_label:
order_id += 1
pseudo_filename = _make_pseudo_filename(config, gwjob)
job_to_pseudo_filename[gwjob.name] = pseudo_filename
if enable_job_name_map:
order_id_map[job_label][str(order_id)] = pseudo_filename
job_name_to_order_id_map[job_label][gwjob.name] = str(order_id)

job_count += 1
if job_count > max_jobs_per_task_this_label:
job_count = 1
task_chunk += 1

order_id = job_count
job_order_id = gwjob.attrs.get("order_id", None)
if job_order_id is not None:
order_id = job_order_id

if job_count == 1:
# Create new PanDA task object
task_count += 1
Expand Down Expand Up @@ -731,9 +729,6 @@ def add_idds_work(config, generic_workflow, idds_workflow):
if generic_workflow.out_degree(gwjob.name) == 0:
dag_sink_work.append(work)

pseudo_filename = _make_pseudo_filename(config, gwjob)
job_to_pseudo_filename[gwjob.name] = pseudo_filename

if enable_qnode_map:
job_name_PH = "PH:" + gwjob.name
job_to_pseudo_filename[gwjob.name] = job_name_PH
Expand Down Expand Up @@ -829,15 +824,12 @@ def add_idds_work(config, generic_workflow, idds_workflow):
# trigger the setter function which will validate the dependency_map:
# 1) check the name length to avoid the name being too long,
# 2) check to avoid duplicated items.
work.dependency_map = work.dependency_map
sorted_dep_map = sorted(work.dependency_map, key=lambda x: x["order_id"])
work.dependency_map = sorted_dep_map

if enable_job_name_map:
for label_name in order_id_map:
for order_id in order_id_map[label_name]:
job_name = order_id_map[label_name][order_id]
if job_name in job_to_pseudo_filename:
order_id_map[label_name][order_id] = job_to_pseudo_filename[job_name]
doma_tree.save_order_id_map(order_id_map, order_id_map_file)
with open(order_id_map_file, "w") as f:
json.dump(order_id_map, f)

return files_to_pre_stage, dag_sink_work, task_count

Expand Down
Loading