Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-52585.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add parts to set task dependency
11 changes: 9 additions & 2 deletions python/lsst/ctrl/bps/panda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ def add_idds_work(config, generic_workflow, idds_workflow):
remote_filename=remote_archive_filename,
qnode_map_filename=qnode_map_filename,
)
work.dependency_tasks = []
name_works[work.task_name] = work
files_to_pre_stage.update(files)
idds_workflow.add_work(work)
Expand Down Expand Up @@ -751,12 +752,15 @@ def add_idds_work(config, generic_workflow, idds_workflow):
else:
inputname = job_to_pseudo_filename[parent_job_name]

parent_task_name = job_to_task[parent_job_name]
deps.append(
{
"task": job_to_task[parent_job_name],
"task": parent_task_name,
"inputname": inputname,
}
)
if parent_task_name not in work.dependency_tasks:
work.dependency_tasks.append(parent_task_name)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like some of the iDDS implementation details (deps structure as well as appending to the dependency_tasks) could be put in an iDDS client function so that multiple lines of code don't have to be repeated in ctrl_bps_panda as well as generically better. Since this is important to get Rubin running better, I'm not asking for this to be done with this ticket, but after seeing how these changes work in production perhaps create a dependency-related function (that handles both a given update name for the deps as well as saving the parent in the dependency_tasks) in the iDDS client and update ctrl_bps_panda then.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the latest iDDS client, I already have such functions (It will check dependency_task. If it's not set, the client will generate it). I adapt it in the ctrl_bps_panda here because I found that this part scans all jobs and want to avoid looping the dependency_map again (to find the task dependency, I need to go through all jobs). For a small task, scan the dependency_map is ok. For recent big workflows, with 100 tasks and with many 1M dependencies for 10K jobs in one task, the scanning is expensive.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, a function that creates it after entire workflow is created is not what I mean. I meant something like
work.add_dependency(task_name, parent_task_name, inputname) that would make the dependency_map and dependency_tasks entries. It shouldn't be any more expensive than this new ctrl_bps_panda code and the internal details can be hidden from ctrl_bps_panda code.

Speaking of speed, have you tried a set for dependency_tasks? if parent_task_name not in work.dependency_tasks:

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh. I missunderstood. I will add these functions in the idds client, then we can optimize ctrl_bps_panda. Thanks.

if not missing_deps:
j_name = job_to_pseudo_filename[gwjob.name]
f_name = f"{job_label}:orderIdMap_{order_id}" if enable_job_name_map else j_name
Expand Down Expand Up @@ -802,12 +806,15 @@ def add_idds_work(config, generic_workflow, idds_workflow):
else:
inputname = job_to_pseudo_filename[parent_job_name]

parent_task_name = job_to_task[parent_job_name]
deps.append(
{
"task": job_to_task[parent_job_name],
"task": parent_task_name,
"inputname": inputname,
}
)
if parent_task_name not in work.dependency_tasks:
work.dependency_tasks.append(parent_task_name)

work.dependency_map.append(
{
Expand Down
Loading