From fa4cc0481d9d204ad658245c141024bf4a723d8e Mon Sep 17 00:00:00 2001 From: George Marshall Date: Wed, 15 Apr 2026 16:31:15 -0700 Subject: [PATCH 1/8] add merge in channel rules --- workflow/rules/dsp_pars_geds.smk | 63 ++++---- workflow/rules/merge_in_channel.smk | 105 +++++++++++++ workflow/rules/psp_pars_geds.smk | 140 ++++++++++++++++++ .../scripts/flow/merge_channels.py | 15 +- .../scripts/flow/merge_in_channel.py | 94 ++++++++++++ .../src/legenddataflow/scripts/flow/utils.py | 13 ++ 6 files changed, 388 insertions(+), 42 deletions(-) create mode 100644 workflow/rules/merge_in_channel.smk create mode 100644 workflow/src/legenddataflow/scripts/flow/merge_in_channel.py create mode 100644 workflow/src/legenddataflow/scripts/flow/utils.py diff --git a/workflow/rules/dsp_pars_geds.smk b/workflow/rules/dsp_pars_geds.smk index 629358d..71b6f5a 100644 --- a/workflow/rules/dsp_pars_geds.smk +++ b/workflow/rules/dsp_pars_geds.smk @@ -2,6 +2,8 @@ Snakemake rules for building dsp pars for HPGes, before running build_dsp() - extraction of pole zero constant(s) for each channel from cal data - extraction of energy filter parameters and charge trapping correction for each channel from cal data + +Outputs should be dsp_pars, dsp_plots, dsp_pars_lh5 and dsp_objects, to be fed into merge rules """ from legenddataflow.methods import ParsKeyResolve @@ -19,7 +21,6 @@ from legenddataflow.methods.paths import ( ) from legenddataflowscripts.workflow import execenv_pyexe - rule build_pars_dsp_pz_geds: input: files=( @@ -32,8 +33,8 @@ rule build_pars_dsp_pz_geds: ), pulser=get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids"), output: - decay_const=temp(get_pattern_pars_tmp_channel(config, "dsp", "decay_constant")), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "decay_constant")), + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "decay_constant")), + dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "decay_constant")), log: get_pattern_log_channel(config, "par_dsp_decay_constant", time), group: @@ -78,8 +79,8 @@ rule build_pars_dsp_pz_geds: "--config-file {params.config_file} " "--processing-chain {params.processing_chain} " "--raw-table-name {params.raw_table_name} " - "--plot-path {output.plots} " - "--output-file {output.decay_const} " + "--plot-path {output.dsp_plots} " + "--output-file {output.dsp_pars} " "--pulser-file {input.pulser} " "--raw-files {input.files} " "--pz-files {input.pz_files}" @@ -91,7 +92,7 @@ rule build_pars_evtsel_geds: filelist_path(config), "all-{experiment}-{period}-{run}-cal-raw.filelist" ), pulser_file=get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids"), - database=rules.build_pars_dsp_pz_geds.output.decay_const, + database=rules.build_pars_dsp_pz_geds.output.dsp_pars, raw_cal_curve=get_blinding_curve_file, output: peak_file=temp( @@ -157,13 +158,13 @@ rule build_pars_dsp_nopt_geds: files=os.path.join( filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" ), - database=rules.build_pars_dsp_pz_geds.output.decay_const, - inplots=rules.build_pars_dsp_pz_geds.output.plots, + database=rules.build_pars_dsp_pz_geds.output.dsp_pars, + inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: dsp_pars_nopt=temp( get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization") ), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization")), + dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization")), log: get_pattern_log_channel(config, "par_dsp_noise_optimization", time), group: @@ -210,7 +211,7 @@ rule build_pars_dsp_nopt_geds: "--processing-chain {params.processing_chain} " "--raw-table-name {params.raw_table_name} " "--inplots {input.inplots} " - "--plot-path {output.plots} " + "--plot-path {output.dsp_plots} " "--dsp-pars {output.dsp_pars_nopt} " "--raw-filelist {input.files}" @@ -222,12 +223,12 @@ rule build_pars_dsp_dplms_geds: filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" ), peak_file=rules.build_pars_evtsel_geds.output.peak_file, - database=rules.build_pars_dsp_nopt_geds.output.dsp_pars_nopt, - inplots=rules.build_pars_dsp_nopt_geds.output.plots, + dsp_pars=rules.build_pars_dsp_pz_geds.output.dsp_pars, + inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")), - lh5_path=temp(get_pattern_pars_tmp_channel(config, "dsp", extension="lh5")), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), + dsp_pars_lh5=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms", extension="lh5")), + dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), log: get_pattern_log_channel(config, "pars_dsp_dplms", time), group: @@ -270,7 +271,7 @@ rule build_pars_dsp_dplms_geds: shell: execenv_pyexe(config, "par-geds-dsp-dplms") + "--peak-file {input.peak_file} " "--fft-raw-filelist {input.fft_files} " - "--database {input.database} " + "--database {input.dsp_pars} " "--inplots {input.inplots} " "--log {log} " "--log-config {params.log_config} " @@ -279,22 +280,22 @@ rule build_pars_dsp_dplms_geds: "--processing-chain {params.processing_chain} " "--raw-table-name {params.raw_table_name} " "--dsp-pars {output.dsp_pars} " - "--lh5-path {output.lh5_path} " - "--plot-path {output.plots} " + "--lh5-path {output.dsp_pars_lh5} " + "--plot-path {output.dsp_plots} " # This rule builds the optimal energy filter parameters for the dsp using calibration dsp files rule build_pars_dsp_eopt_geds: input: peak_file=rules.build_pars_evtsel_geds.output.peak_file, - decay_const=rules.build_pars_dsp_dplms_geds.output.dsp_pars, - inplots=rules.build_pars_dsp_dplms_geds.output.plots, + dsp_pars=rules.build_pars_dsp_pz_geds.output.dsp_pars, + inplots=rules.build_pars_dsp_dplms_geds.output.dsp_plots, output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp_eopt")), - qbb_grid=temp( - get_pattern_pars_tmp_channel(config, "dsp", "objects", extension="pkl") + dsp_objects=temp( + get_pattern_pars_tmp_channel(config, "dsp", "eopt_objects", extension="pkl") ), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp")), + dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "eopt")), log: get_pattern_log_channel(config, "pars_dsp_eopt", time), group: @@ -340,9 +341,9 @@ rule build_pars_dsp_eopt_geds: "--raw-table-name {params.raw_table_name} " "--peak-file {input.peak_file} " "--inplots {input.inplots} " - "--decay-const {input.decay_const} " - "--plot-path {output.plots} " - "--qbb-grid-path {output.qbb_grid} " + "--decay-const {input.dsp_pars} " + "--plot-path {output.dsp_plots} " + "--qbb-grid-path {output.dsp_objects} " "--final-dsp-pars {output.dsp_pars}" @@ -400,10 +401,9 @@ rule build_svm_dsp_geds: rule build_pars_dsp_svm_geds: input: - dsp_pars=rules.build_pars_dsp_eopt_geds.output.dsp_pars, - svm_file=rules.build_svm_dsp_geds.output.dsp_pars, + svm_file=rules.build_svm_dsp_geds.output.dsp_pars output: - dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp")), + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "svm")), log: get_pattern_log_channel(config, "pars_dsp_svm", time), group: @@ -415,3 +415,10 @@ rule build_pars_dsp_svm_geds: "--input-file {input.dsp_pars} " "--output-file {output.dsp_pars} " "--svm-file {input.svm_file}" + +build_in_channel_merge_rules([ + build_pars_dsp_eopt_geds, + build_pars_dsp_nopt_geds, + build_pars_dsp_svm_geds, + build_pars_dsp_dplms_geds, +], "dsp") \ No newline at end of file diff --git a/workflow/rules/merge_in_channel.smk b/workflow/rules/merge_in_channel.smk new file mode 100644 index 0000000..891d6af --- /dev/null +++ b/workflow/rules/merge_in_channel.smk @@ -0,0 +1,105 @@ +import inspect + +from legenddataflow.methods import patterns +from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name +from legenddataflow.scripts.flow.build_chanlist import get_plt_chanlist, get_par_chanlist + + +def build_in_channel_merge_rules(rules, tier, output_name=None): + + + input_dsp_pars = [] + input_dsp_pars_lh5 = [] + input_dsp_objects = [] + input_dsp_plots = [] + + name = f"tier" if output_name is None else f"{tier}_{output_name}" + + group_name = f"merge-inchan-{tier}" + if output_name is not None: + group_name += f"_{output_name}" + + for r in [rules]: + if hasattr(r.output, dsp_pars): + input_dsp_pars.append(r.output.dsp_pars) + if hasattr(r.output, dsp_pars_lh5): + input_dsp_pars_lh5.append(r.output.dsp_pars_lh5) + if hasattr(r.output, plots): + input_dsp_plots.append(r.output.dsp_plots) + if hasattr(r.output, dsp_objects): + input_dsp_objects.append(r.output.dsp_objects) + + par_file = temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name)), + lh5_file = temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name, extension="lh5")), + object_file = + plot_file = + + rule: + input: + input_dsp_plots, + output: + temp(get_pattern_plts_tmp_channel(config, "dsp", name=output_name)), + group: + group_name + shell: + execenv_pyexe(config, "merge-in-channels") + \ + "--input {input} " + "--output {output} " + + set_last_rule_name(workflow, f"build_plts_inchan_{name}") + + rule: + input: + input_dsp_objects, + output: + temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name, extension="pkl")), + group: + group_name + shell: + execenv_pyexe(config, "merge-in-channels") + \ + "--input {input} " + "--output {output} " + + set_last_rule_name(workflow, f"build_pars_inchan_{name}_objects") + + if len(input_dsp_pars_lh5) > 0: + rule: + input: + input_dsp_pars_lh5, + output: + temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name + "_tmp" if output_name is not None else "tmp")), + group: + group_name + shell: + execenv_pyexe(config, "merge-in-channels") + \ + "--input {input} " + "--output {output} " + + set_last_rule_name(workflow, f"build_pars_inchan_{name}_db") + + + rule: + input: + in_files=input_dsp_pars_lh5 if len(input_dsp_pars_lh5) > 0 else input_dsp_pars, + in_db=temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name + "_tmp" if output_name is not None else "tmp")) if len(input_dsp_pars_lh5) > 0 else [], + plts=temp(get_pattern_plts_tmp_channel(config, "dsp", name=output_name)), + objects=temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name, extension="pkl")), + output: + out_file=temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name, extension="lh5")) if len(input_dsp_pars_lh5) > 0 else temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name)), + out_db=temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name)) if len(input_dsp_pars_lh5) > 0 else [], + group: + group_name + run: + shell_string = ( + execenv_pyexe(config, "merge-in-channels") + \ + "--output {output.out_file} " + "--input {input.in_files} " + ) + if len(input_dsp_pars_lh5) > 0: + shell_string += ( + "--in-db {input.in_db} " + "--out-db {output.out_db} " + ) + shell(shell_string) + + set_last_rule_name(workflow, f"build_pars_inchan_{name}") diff --git a/workflow/rules/psp_pars_geds.smk b/workflow/rules/psp_pars_geds.smk index fa790eb..999a71c 100644 --- a/workflow/rules/psp_pars_geds.smk +++ b/workflow/rules/psp_pars_geds.smk @@ -16,6 +16,10 @@ from legenddataflow.methods.patterns import ( from legenddataflow.methods.paths import config_path from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name +# merge just needed rules here, eopt, nopt + + + psp_rules = {} for key, dataset in part.datasets.items(): for partition in dataset.keys(): @@ -111,6 +115,74 @@ for key, dataset in part.datasets.items(): psp_rules[key] = [list(workflow.rules)[-1]] + # This rule builds the dplms energy filter for the dsp using fft and cal files + rule build_pars_psp_dplms_geds_fallback: + input: + fft_files=os.path.join( + filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" + ), + peak_file=rules.build_pars_evtsel_geds.output.peak_file, + database=rules.build_pars_dsp_pz_geds.output.dsp_pars, + inplots=rules.build_pars_dsp_pz_geds.output.plots, + output: + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")), + lh5_path=temp(get_pattern_pars_tmp_channel(config, "dsp", extension="lh5")), + plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), + log: + get_pattern_log_channel(config, "pars_psp_dplms", time), + group: + "par-psp" + resources: + runtime=300, + params: + config_file=lambda wildcards: get_config_files( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + wildcards.channel, + "pars_psp_dplms", + "dplms_pars", + ), + processing_chain=lambda wildcards: get_config_files( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + wildcards.channel, + "pars_psp_dplms", + "proc_chain", + ), + log_config=lambda wildcards: get_log_config( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + "pars_psp_dplms", + ), + raw_table_name=lambda wildcards: get_table_name( + channelmap_textdb, + config, + "cal", + wildcards.timestamp, + wildcards.channel, + "raw", + ), + configs=config_path(config), + channel="{channel}", + shell: + execenv_pyexe(config, "par-geds-psp-dplms") + "--peak-file {input.peak_file} " + "--fft-raw-filelist {input.fft_files} " + "--database {input.database} " + "--inplots {input.inplots} " + "--log {log} " + "--log-config {params.log_config} " + "--config-file {params.config_file} " + "--channel {params.channel} " + "--processing-chain {params.processing_chain} " + "--raw-table-name {params.raw_table_name} " + "--dsp-pars {output.dsp_pars} " + "--lh5-path {output.lh5_path} " + "--plot-path {output.plots} " + + # Merged energy and a/e supercalibrations to reduce number of rules as they have same inputs/outputs # This rule builds the a/e calibration using the calibration dsp files for the whole partition rule build_par_psp_fallback: @@ -159,6 +231,74 @@ rule_order_list.append(fallback_psp_rule.name) workflow._ruleorder.add(*rule_order_list) # [::-1] +# This rule builds the dplms energy filter for the dsp using fft and cal files +rule build_pars_psp_dplms_geds_fallback: + input: + fft_files=os.path.join( + filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" + ), + peak_file=rules.build_pars_evtsel_geds.output.peak_file, + database=rules.build_pars_dsp_pz_geds.output.dsp_pars, + inplots=rules.build_pars_dsp_pz_geds.output.plots, + output: + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")), + lh5_path=temp(get_pattern_pars_tmp_channel(config, "dsp", extension="lh5")), + plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), + log: + get_pattern_log_channel(config, "pars_psp_dplms", time), + group: + "par-psp" + resources: + runtime=300, + params: + config_file=lambda wildcards: get_config_files( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + wildcards.channel, + "pars_psp_dplms", + "dplms_pars", + ), + processing_chain=lambda wildcards: get_config_files( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + wildcards.channel, + "pars_psp_dplms", + "proc_chain", + ), + log_config=lambda wildcards: get_log_config( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + "pars_psp_dplms", + ), + raw_table_name=lambda wildcards: get_table_name( + channelmap_textdb, + config, + "cal", + wildcards.timestamp, + wildcards.channel, + "raw", + ), + configs=config_path(config), + channel="{channel}", + shell: + execenv_pyexe(config, "par-geds-psp-dplms") + "--peak-file {input.peak_file} " + "--fft-raw-filelist {input.fft_files} " + "--database {input.database} " + "--inplots {input.inplots} " + "--log {log} " + "--log-config {params.log_config} " + "--config-file {params.config_file} " + "--channel {params.channel} " + "--processing-chain {params.processing_chain} " + "--raw-table-name {params.raw_table_name} " + "--dsp-pars {output.dsp_pars} " + "--lh5-path {output.lh5_path} " + "--plot-path {output.plots} " + + rule build_svm_psp: input: hyperpars=lambda wildcards: get_input_par_file( diff --git a/workflow/src/legenddataflow/scripts/flow/merge_channels.py b/workflow/src/legenddataflow/scripts/flow/merge_channels.py index e648577..95a5d81 100644 --- a/workflow/src/legenddataflow/scripts/flow/merge_channels.py +++ b/workflow/src/legenddataflow/scripts/flow/merge_channels.py @@ -10,20 +10,7 @@ from lgdo import lh5 from legenddataflow.methods import ChannelProcKey - - -def replace_path(d, old_path, new_path): - if isinstance(d, dict): - for k, v in d.items(): - d[k] = replace_path(v, old_path, new_path) - elif isinstance(d, list): - for i in range(len(d)): - d[i] = replace_path(d[i], old_path, new_path) - elif isinstance(d, str) and old_path in d: - d = d.replace(old_path, new_path) - d = d.replace(new_path, f"$_/{Path(new_path).name}") - return d - +from .utils import replace_path def merge_channels() -> None: argparser = argparse.ArgumentParser() diff --git a/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py b/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py new file mode 100644 index 0000000..3d918e9 --- /dev/null +++ b/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import argparse +import pickle as pkl +from pathlib import Path + +from dbetto.catalog import Props +from lgdo import lh5 +from lgdo.types import Struct + +from legenddataflow.methods import ChannelProcKey +from .utils import replace_path + + +def replace_path(d, old_path, new_path): + if isinstance(d, dict): + for k, v in d.items(): + d[k] = replace_path(v, old_path, new_path) + elif isinstance(d, list): + for i in range(len(d)): + d[i] = replace_path(d[i], old_path, new_path) + elif isinstance(d, str) and old_path in d: + d = d.replace(old_path, new_path) + d = d.replace(new_path, f"$_/{Path(new_path).name}") + return d + + +def merge_in_channel() -> None: + argparser = argparse.ArgumentParser() + argparser.add_argument( + "--input", help="input file", nargs="*", type=str, required=True + ) + argparser.add_argument("--output", help="output file", type=str, required=True) + argparser.add_argument( + "--in-db", + help="in db file (used for when lh5 files referred to in db)", + type=str, + required=False, + ) + argparser.add_argument( + "--out-db", + help="lh5 file (used for when lh5 files referred to in db)", + type=str, + required=False, + ) + args = argparser.parse_args() + + # change to only have 1 output file for multiple inputs + # don't care about processing step, check if extension matches + + input_files = args.input.infiles if hasattr(args.input, "infiles") else args.input + + file_extension = Path(args.output).suffix + out_file = args.output + + Path(args.output).parent.mkdir(parents=True, exist_ok=True) + + if file_extension in (".json", ".yaml", ".yml"): + Props.write_to(out_file, Props.read_from(input_files)) + + elif file_extension == ".pkl": + out_dict = {} + for infile in input_files: + with Path(infile).open("rb") as r: + indict = pkl.load(r) + out_dict.update(indict) + + with Path(out_file).open("wb") as w: + pkl.dump(out_dict, w, protocol=pkl.HIGHEST_PROTOCOL) + + elif file_extension == ".lh5": + if args.in_db: + db_dict = Props.read_from(args.in_db) + objects = {} + for infile in input_files: + fkey = ChannelProcKey.get_filekey_from_pattern(Path(infile).name) + tb_in = lh5.read(f"{fkey.channel}", infile) + + objects[f"{fkey.processing_step}".split("_", maxsplit=3)[2]] = tb_in + + lh5.write( + Struct(objects), + name=fkey.channel, + lh5_file=out_file, + wo_mode="a", + ) + + if args.out_db: + if args.in_db: + db_dict = replace_path( + db_dict, infile, args.output + ) + + Props.write_to(args.out_db, db_dict) \ No newline at end of file diff --git a/workflow/src/legenddataflow/scripts/flow/utils.py b/workflow/src/legenddataflow/scripts/flow/utils.py new file mode 100644 index 0000000..af33844 --- /dev/null +++ b/workflow/src/legenddataflow/scripts/flow/utils.py @@ -0,0 +1,13 @@ +from pathlib import Path + +def replace_path(d, old_path, new_path): + if isinstance(d, dict): + for k, v in d.items(): + d[k] = replace_path(v, old_path, new_path) + elif isinstance(d, list): + for i in range(len(d)): + d[i] = replace_path(d[i], old_path, new_path) + elif isinstance(d, str) and old_path in d: + d = d.replace(old_path, new_path) + d = d.replace(new_path, f"$_/{Path(new_path).name}") + return d \ No newline at end of file From 310647fc1bd6682339a13ddbbd96e0a0a023803e Mon Sep 17 00:00:00 2001 From: ggmarshall Date: Thu, 23 Apr 2026 06:31:18 +0200 Subject: [PATCH 2/8] add in channel merging methods, add dplms for psp --- pyproject.toml | 2 + workflow/Snakefile | 1 + workflow/profiles/lngs/config.yaml | 2 +- workflow/rules/common.smk | 1 - workflow/rules/dsp_pars_geds.smk | 18 +- workflow/rules/merge_in_channel.smk | 39 +- workflow/rules/pht_pars_geds.smk | 11 +- workflow/rules/psp.smk | 2 +- workflow/rules/psp_pars_geds.smk | 264 ++++++++++---- .../scripts/flow/merge_channels.py | 2 + .../scripts/flow/merge_in_channel.py | 22 +- .../src/legenddataflow/scripts/flow/utils.py | 5 +- .../scripts/par/geds/pht/util.py | 19 +- .../scripts/par/geds/psp/dplms.py | 336 ++++++++++++++++++ 14 files changed, 587 insertions(+), 137 deletions(-) create mode 100644 workflow/src/legenddataflow/scripts/par/geds/psp/dplms.py diff --git a/pyproject.toml b/pyproject.toml index 4a658d7..90f0a02 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,6 +96,7 @@ docs = [ [project.scripts] create-chankeylist = "legenddataflow.scripts.create_chankeylist:create_chankeylist" merge-channels = "legenddataflow.scripts.flow.merge_channels:merge_channels" +merge-in-channels = "legenddataflow.scripts.flow.merge_in_channel:merge_in_channel" build-tier-evt = "legenddataflow.scripts.tier.evt:build_tier_evt" build-tier-raw-blind = "legenddataflow.scripts.tier.raw_blind:build_tier_raw_blind" build-tier-raw-fcio = "legenddataflow.scripts.tier.raw_fcio:build_tier_raw_fcio" @@ -108,6 +109,7 @@ par-geds-pht-fast = "legenddataflow.scripts.par.geds.pht.fast:par_geds_pht par-geds-pht-qc-phy = "legenddataflow.scripts.par.geds.pht.qc_phy:par_geds_pht_qc_phy" par-geds-pht-qc = "legenddataflow.scripts.par.geds.pht.qc:par_geds_pht_qc" par-geds-psp-average = "legenddataflow.scripts.par.geds.psp.average:par_geds_psp_average" +par-geds-psp-dplms = "legenddataflow.scripts.par.geds.psp.dplms:par_geds_psp_dplms" par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds_raw_blindcal" par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck" par-geds-tcm-pulser = "legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser" diff --git a/workflow/Snakefile b/workflow/Snakefile index 1589848..ff26cc1 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -73,6 +73,7 @@ wildcard_constraints: include: "rules/channel_merge.smk" +include: "rules/merge_in_channel.smk" include: "rules/common.smk" include: "rules/main.smk" include: "rules/tcm.smk" diff --git a/workflow/profiles/lngs/config.yaml b/workflow/profiles/lngs/config.yaml index 689c134..961f818 100644 --- a/workflow/profiles/lngs/config.yaml +++ b/workflow/profiles/lngs/config.yaml @@ -1,5 +1,5 @@ cores: 90 -restart-times: 2 +restart-times: 0 resources: - mem_swap=3500 configfile: dataflow-config.yaml diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk index 01227f3..287e335 100644 --- a/workflow/rules/common.smk +++ b/workflow/rules/common.smk @@ -149,7 +149,6 @@ def get_table_name(metadata, config, datatype, timestamp, detector, tier): return config.table_format[tier].format(ch=chmap[detector[0]].daq.rawid) return config.table_format[tier].format(ch=chmap[detector].daq.rawid) - def get_all_channels(channelmap, timestamp, datatype): if isinstance(channelmap, (str, Path)): channelmap = TextDB(channelmap, lazy=True) diff --git a/workflow/rules/dsp_pars_geds.smk b/workflow/rules/dsp_pars_geds.smk index 71b6f5a..6965702 100644 --- a/workflow/rules/dsp_pars_geds.smk +++ b/workflow/rules/dsp_pars_geds.smk @@ -161,7 +161,7 @@ rule build_pars_dsp_nopt_geds: database=rules.build_pars_dsp_pz_geds.output.dsp_pars, inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: - dsp_pars_nopt=temp( + dsp_pars=temp( get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization") ), dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization")), @@ -212,7 +212,7 @@ rule build_pars_dsp_nopt_geds: "--raw-table-name {params.raw_table_name} " "--inplots {input.inplots} " "--plot-path {output.dsp_plots} " - "--dsp-pars {output.dsp_pars_nopt} " + "--dsp-pars {output.dsp_pars} " "--raw-filelist {input.files}" @@ -289,7 +289,7 @@ rule build_pars_dsp_eopt_geds: input: peak_file=rules.build_pars_evtsel_geds.output.peak_file, dsp_pars=rules.build_pars_dsp_pz_geds.output.dsp_pars, - inplots=rules.build_pars_dsp_dplms_geds.output.dsp_plots, + inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp_eopt")), dsp_objects=temp( @@ -412,13 +412,13 @@ rule build_pars_dsp_svm_geds: runtime=300, shell: execenv_pyexe(config, "par-geds-dsp-svm") + "--log {log} " - "--input-file {input.dsp_pars} " + #"--input-file {input.dsp_pars} " "--output-file {output.dsp_pars} " "--svm-file {input.svm_file}" build_in_channel_merge_rules([ - build_pars_dsp_eopt_geds, - build_pars_dsp_nopt_geds, - build_pars_dsp_svm_geds, - build_pars_dsp_dplms_geds, -], "dsp") \ No newline at end of file + rules.build_pars_dsp_eopt_geds, + rules.build_pars_dsp_nopt_geds, + rules.build_pars_dsp_svm_geds, + rules.build_pars_dsp_dplms_geds, +], "dsp") diff --git a/workflow/rules/merge_in_channel.smk b/workflow/rules/merge_in_channel.smk index 891d6af..0fccf84 100644 --- a/workflow/rules/merge_in_channel.smk +++ b/workflow/rules/merge_in_channel.smk @@ -4,7 +4,6 @@ from legenddataflow.methods import patterns from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name from legenddataflow.scripts.flow.build_chanlist import get_plt_chanlist, get_par_chanlist - def build_in_channel_merge_rules(rules, tier, output_name=None): @@ -15,30 +14,25 @@ def build_in_channel_merge_rules(rules, tier, output_name=None): name = f"tier" if output_name is None else f"{tier}_{output_name}" - group_name = f"merge-inchan-{tier}" + group_name = f"merge-inchan-{tier}" if output_name is not None: group_name += f"_{output_name}" - for r in [rules]: - if hasattr(r.output, dsp_pars): + for r in rules: + if hasattr(r.output, f"{tier}_pars"): input_dsp_pars.append(r.output.dsp_pars) - if hasattr(r.output, dsp_pars_lh5): + if hasattr(r.output, f"{tier}_pars_lh5"): input_dsp_pars_lh5.append(r.output.dsp_pars_lh5) - if hasattr(r.output, plots): + if hasattr(r.output, f"{tier}_plots"): input_dsp_plots.append(r.output.dsp_plots) - if hasattr(r.output, dsp_objects): + if hasattr(r.output, f"{tier}_objects"): input_dsp_objects.append(r.output.dsp_objects) - par_file = temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name)), - lh5_file = temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name, extension="lh5")), - object_file = - plot_file = - rule: input: input_dsp_plots, output: - temp(get_pattern_plts_tmp_channel(config, "dsp", name=output_name)), + temp(get_pattern_plts_tmp_channel(config, tier, name=output_name)), group: group_name shell: @@ -48,11 +42,14 @@ def build_in_channel_merge_rules(rules, tier, output_name=None): set_last_rule_name(workflow, f"build_plts_inchan_{name}") + object_output_name = "objects" + if output_name is not None: + object_output_name += f"_{output_name}" rule: input: input_dsp_objects, output: - temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name, extension="pkl")), + temp(get_pattern_pars_tmp_channel(config, tier, name=object_output_name, extension="pkl")), group: group_name shell: @@ -65,9 +62,9 @@ def build_in_channel_merge_rules(rules, tier, output_name=None): if len(input_dsp_pars_lh5) > 0: rule: input: - input_dsp_pars_lh5, + input_dsp_pars, output: - temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name + "_tmp" if output_name is not None else "tmp")), + temp(get_pattern_pars_tmp_channel(config, tier, name=output_name + "_tmp" if output_name is not None else "tmp")), group: group_name shell: @@ -81,12 +78,12 @@ def build_in_channel_merge_rules(rules, tier, output_name=None): rule: input: in_files=input_dsp_pars_lh5 if len(input_dsp_pars_lh5) > 0 else input_dsp_pars, - in_db=temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name + "_tmp" if output_name is not None else "tmp")) if len(input_dsp_pars_lh5) > 0 else [], - plts=temp(get_pattern_plts_tmp_channel(config, "dsp", name=output_name)), - objects=temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name, extension="pkl")), + in_db=get_pattern_pars_tmp_channel(config, tier, name=output_name + "_tmp" if output_name is not None else "tmp") if len(input_dsp_pars_lh5) > 0 else [], + plts=get_pattern_plts_tmp_channel(config, tier, name=output_name), + objects=get_pattern_pars_tmp_channel(config, tier, name=object_output_name, extension="pkl"), output: - out_file=temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name, extension="lh5")) if len(input_dsp_pars_lh5) > 0 else temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name)), - out_db=temp(get_pattern_pars_tmp_channel(config, "dsp", name=output_name)) if len(input_dsp_pars_lh5) > 0 else [], + out_file=temp(get_pattern_pars_tmp_channel(config, tier, name=output_name, extension="lh5")) if len(input_dsp_pars_lh5) > 0 else temp(get_pattern_pars_tmp_channel(config, tier, name=output_name)), + out_db=temp(get_pattern_pars_tmp_channel(config, tier, name=output_name)) if len(input_dsp_pars_lh5) > 0 else [], group: group_name run: diff --git a/workflow/rules/pht_pars_geds.smk b/workflow/rules/pht_pars_geds.smk index f8c65cf..3609f2e 100644 --- a/workflow/rules/pht_pars_geds.smk +++ b/workflow/rules/pht_pars_geds.smk @@ -70,12 +70,7 @@ for key, dataset in part.datasets.items(): overwrite_files=get_input_par_file( config, tier="pht", - timestamp=part.get_timestamp( - pht_par_catalog, - partition, - key, - tier="pht", - ), + timestamp=tstamp, ), output: hit_pars=[ @@ -117,9 +112,7 @@ for key, dataset in part.datasets.items(): params: datatype="cal", channel="{channel}" if key == "default" else key, - timestamp=part.get_timestamp( - pht_par_catalog, partition, key, tier="pht" - ), + timestamp=tstamp, dsp_table_name=dsp_table_name, configs=config_path(config), meta=metadata_path(config), diff --git a/workflow/rules/psp.smk b/workflow/rules/psp.smk index 3ee3377..4f314f9 100644 --- a/workflow/rules/psp.smk +++ b/workflow/rules/psp.smk @@ -24,7 +24,7 @@ psp_par_catalog = ParsKeyResolve.get_par_catalog( ignore_keys_file=Path(det_status) / "ignored_daq_cycles.yaml", ) -build_merge_rules("psp", lh5_merge=True, lh5_tier="dsp") +build_merge_rules("psp", lh5_merge=True, lh5_tier="psp") rule build_psp: diff --git a/workflow/rules/psp_pars_geds.smk b/workflow/rules/psp_pars_geds.smk index 999a71c..38ef790 100644 --- a/workflow/rules/psp_pars_geds.smk +++ b/workflow/rules/psp_pars_geds.smk @@ -17,13 +17,40 @@ from legenddataflow.methods.paths import config_path from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name # merge just needed rules here, eopt, nopt - - +build_in_channel_merge_rules([ + rules.build_pars_dsp_eopt_geds, + rules.build_pars_dsp_nopt_geds, +], "dsp", "psp_input") psp_rules = {} +psp_dplms_rules = {} for key, dataset in part.datasets.items(): for partition in dataset.keys(): + fft_files = part.get_filelists(partition, key, "raw", datatype="fft") + tstamp = part.get_timestamp(psp_par_catalog, partition, key, tier="psp") + wildcard_constrain = part.get_wildcard_constraints(partition, key) + + if key == "default": + def raw_table_name(wildcards): + return get_table_name( + channelmap_textdb, + config, + "cal", + tstamp, + wildcards.channel, + "raw", + ) + + else: + raw_table_name = get_table_name( + channelmap_textdb, + config, + "cal", + tstamp, + key, + "raw", + ) rule: input: dsp_pars=part.get_par_files( @@ -31,18 +58,18 @@ for key, dataset in part.datasets.items(): partition, key, tier="dsp", - name="eopt", + name="psp_input", ), dsp_objs=part.get_par_files( dsp_par_catalog, partition, key, tier="dsp", - name="objects", + name="objects_psp_input", extension="pkl", ), dsp_plots=part.get_plt_files( - dsp_par_catalog, partition, key, tier="dsp" + dsp_par_catalog, partition, key, tier="dsp", name = "psp_input", ), output: psp_pars=temp( @@ -70,6 +97,7 @@ for key, dataset in part.datasets.items(): partition, key, tier="psp", + name="eopt" ) ), log: @@ -90,9 +118,7 @@ for key, dataset in part.datasets.items(): params: datatype="cal", channel="{channel}" if key == "default" else key, - timestamp=part.get_timestamp( - psp_par_catalog, partition, key, tier="psp" - ), + timestamp=tstamp, configs=ro(config_path(config)), shell: execenv_pyexe(config, "par-geds-psp-average") + "--log {log} " @@ -114,62 +140,127 @@ for key, dataset in part.datasets.items(): else: psp_rules[key] = [list(workflow.rules)[-1]] - - # This rule builds the dplms energy filter for the dsp using fft and cal files - rule build_pars_psp_dplms_geds_fallback: - input: - fft_files=os.path.join( - filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" - ), - peak_file=rules.build_pars_evtsel_geds.output.peak_file, - database=rules.build_pars_dsp_pz_geds.output.dsp_pars, - inplots=rules.build_pars_dsp_pz_geds.output.plots, - output: - dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")), - lh5_path=temp(get_pattern_pars_tmp_channel(config, "dsp", extension="lh5")), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), - log: - get_pattern_log_channel(config, "pars_psp_dplms", time), - group: - "par-psp" - resources: - runtime=300, - params: - config_file=lambda wildcards: get_config_files( + if key == "default": + def config_file(wildcards): + return get_config_files( dataflow_configs_texdb, - wildcards.timestamp, + tstamp, "cal", wildcards.channel, "pars_psp_dplms", "dplms_pars", - ), - processing_chain=lambda wildcards: get_config_files( + ) + def processing_chain(wildcards): + return get_config_files( dataflow_configs_texdb, - wildcards.timestamp, + tstamp, "cal", wildcards.channel, "pars_psp_dplms", "proc_chain", - ), - log_config=lambda wildcards: get_log_config( + ) + else: + config_file=get_config_files( dataflow_configs_texdb, - wildcards.timestamp, + tstamp, "cal", + key, "pars_psp_dplms", + "dplms_pars", ), - raw_table_name=lambda wildcards: get_table_name( - channelmap_textdb, - config, + processing_chain=get_config_files( + dataflow_configs_texdb, + tstamp, + "cal", + key, + "pars_psp_dplms", + "proc_chain", + ) + + + # This rule builds the dplms energy filter for the dsp using fft and cal files + rule: + input: + fft_files=fft_files, + peak_file=part.get_par_files( + dsp_par_catalog, + partition, + key, + tier="dsp", + name="peaks", + extension="lh5" + ), + database=part.get_par_files( + psp_par_catalog, + partition, + key, + tier="psp", + name="eopt", + ), + inplots=part.get_plt_files( + psp_par_catalog, + partition, + key, + tier="psp", + name = "eopt" + ), + output: + psp_pars=temp( + part.get_par_files( + psp_par_catalog, + partition, + key, + tier="psp", + name="dplms", + ) + ), + psp_pars_lh5=temp( + part.get_par_files( + psp_par_catalog, + partition, + key, + tier="psp", + extension="lh5", + ) + ), + psp_plots=temp( + part.get_plt_files( + psp_par_catalog, + partition, + key, + tier="psp", + ) + ), + log: + part.get_log_file( + psp_par_catalog, + partition, + key, + "psp", + time, + name="pars_psp_dplms", + ) + wildcard_constraints: + channel=wildcard_constrain, + group: + "par-psp" + resources: + runtime=300, + params: + config_file=config_file, + processing_chain= processing_chain, + log_config= get_log_config( + dataflow_configs_texdb, + tstamp, "cal", - wildcards.timestamp, - wildcards.channel, - "raw", + "pars_psp_dplms", ), + raw_table_name=raw_table_name, configs=config_path(config), - channel="{channel}", + channel="{channel}" if key == "default" else key, shell: - execenv_pyexe(config, "par-geds-psp-dplms") + "--peak-file {input.peak_file} " - "--fft-raw-filelist {input.fft_files} " + execenv_pyexe(config, "par-geds-psp-dplms") + "--peak-files {input.peak_file} " + "--fft-raw-filelists {input.fft_files} " "--database {input.database} " "--inplots {input.inplots} " "--log {log} " @@ -178,24 +269,31 @@ for key, dataset in part.datasets.items(): "--channel {params.channel} " "--processing-chain {params.processing_chain} " "--raw-table-name {params.raw_table_name} " - "--dsp-pars {output.dsp_pars} " - "--lh5-path {output.lh5_path} " - "--plot-path {output.plots} " + "--dsp-pars {output.psp_pars} " + "--lh5-path {output.psp_pars_lh5} " + "--plot-path {output.psp_plots} " + + set_last_rule_name(workflow, f"{key}-{partition}-build_par_psp_dplms") + + if key in psp_dplms_rules: + psp_dplms_rules[key].append(list(workflow.rules)[-1]) + else: + psp_dplms_rules[key] = [list(workflow.rules)[-1]] # Merged energy and a/e supercalibrations to reduce number of rules as they have same inputs/outputs # This rule builds the a/e calibration using the calibration dsp files for the whole partition rule build_par_psp_fallback: input: - dsp_pars=get_pattern_pars_tmp_channel(config, "dsp", "eopt"), - dsp_objs=get_pattern_pars_tmp_channel(config, "dsp", "objects", extension="pkl"), - dsp_plots=get_pattern_plts_tmp_channel(config, "dsp"), + dsp_pars=get_pattern_pars_tmp_channel(config, "dsp", "psp_input"), + dsp_objs=get_pattern_pars_tmp_channel(config, "dsp", "objects_psp_input", extension="pkl"), + dsp_plots=get_pattern_plts_tmp_channel(config, "dsp", "psp_input"), output: psp_pars=temp(get_pattern_pars_tmp_channel(config, "psp", "eopt")), psp_objs=temp( get_pattern_pars_tmp_channel(config, "psp", "objects", extension="pkl") ), - psp_plots=temp(get_pattern_plts_tmp_channel(config, "psp")), + psp_plots=temp(get_pattern_plts_tmp_channel(config, "psp", "eopt")), log: get_pattern_log_channel(config, "pars_psp", time), group: @@ -238,12 +336,12 @@ rule build_pars_psp_dplms_geds_fallback: filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" ), peak_file=rules.build_pars_evtsel_geds.output.peak_file, - database=rules.build_pars_dsp_pz_geds.output.dsp_pars, - inplots=rules.build_pars_dsp_pz_geds.output.plots, + database=rules.build_par_psp_fallback.output.psp_pars, + inplots=rules.build_par_psp_fallback.output.psp_plots, output: - dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")), - lh5_path=temp(get_pattern_pars_tmp_channel(config, "dsp", extension="lh5")), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "psp", "dplms")), + lh5_path=temp(get_pattern_pars_tmp_channel(config, "psp", extension="lh5")), + plots=temp(get_pattern_plts_tmp_channel(config, "psp")), log: get_pattern_log_channel(config, "pars_psp_dplms", time), group: @@ -298,19 +396,50 @@ rule build_pars_psp_dplms_geds_fallback: "--lh5-path {output.lh5_path} " "--plot-path {output.plots} " +fallback_psp_dplms_rule = list(workflow.rules)[-1] +rule_order_list = [] +ordered = OrderedDict(psp_dplms_rules) +ordered.move_to_end("default") +for key, items in ordered.items(): + rule_order_list += [item.name for item in items] +rule_order_list.append(fallback_psp_dplms_rule.name) +workflow._ruleorder.add(*rule_order_list) # [::-1] rule build_svm_psp: input: hyperpars=lambda wildcards: get_input_par_file( - config=config, wildcards=wildcards, tier="psp", name="svm_hyperpars" + config=config, + wildcards=wildcards, + tier="psp", + name="svm_hyperpars", + allow_none=True, ), - train_data=lambda wildcards: str( - get_input_par_file( - config=config, wildcards=wildcards, tier="psp", name="svm_hyperpars" + train_data=lambda wildcards: ( + str( + get_input_par_file( + config=config, + wildcards=wildcards, + tier="psp", + name="svm_hyperpars", + allow_none=True, + overwrite=False, + ) + ).replace("hyperpars.yaml", "train.lh5") + if not isinstance( + get_input_par_file( + config=config, + wildcards=wildcards, + tier="psp", + name="svm_hyperpars", + allow_none=True, + overwrite=False, + ), + list, ) - ).replace("hyperpars.yaml", "train.lh5"), + else [] + ), output: - dsp_pars=get_pattern_pars(config, "psp", "svm", extension="pkl"), + psp_pars=get_pattern_pars(config, "psp", "svm", extension="pkl"), log: str(get_pattern_log(config, "pars_psp_svm", time)).replace("{datatype}", "cal"), group: @@ -325,15 +454,12 @@ rule build_svm_psp: execenv_pyexe(config, "par-geds-dsp-svm-build") + "--log {log} " "--train-data {input.train_data} " "--train-hyperpars {input.hyperpars} " - "--output-file {output.dsp_pars} " - "--timestamp {params.timestamp} " - "--datatype {params.datatype} " - "--configs {params.configs} " + "--output-file {output.psp_pars} " rule build_pars_psp_svm: input: - dsp_pars=get_pattern_pars_tmp_channel(config, "psp_eopt"), + dsp_pars=get_pattern_pars_tmp_channel(config, "psp_dplms"), svm_model=get_pattern_pars(config, "psp", "svm", extension="pkl"), output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "psp")), diff --git a/workflow/src/legenddataflow/scripts/flow/merge_channels.py b/workflow/src/legenddataflow/scripts/flow/merge_channels.py index 95a5d81..7458690 100644 --- a/workflow/src/legenddataflow/scripts/flow/merge_channels.py +++ b/workflow/src/legenddataflow/scripts/flow/merge_channels.py @@ -10,8 +10,10 @@ from lgdo import lh5 from legenddataflow.methods import ChannelProcKey + from .utils import replace_path + def merge_channels() -> None: argparser = argparse.ArgumentParser() argparser.add_argument( diff --git a/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py b/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py index 3d918e9..7111ce8 100644 --- a/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py +++ b/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py @@ -9,20 +9,8 @@ from lgdo.types import Struct from legenddataflow.methods import ChannelProcKey -from .utils import replace_path - -def replace_path(d, old_path, new_path): - if isinstance(d, dict): - for k, v in d.items(): - d[k] = replace_path(v, old_path, new_path) - elif isinstance(d, list): - for i in range(len(d)): - d[i] = replace_path(d[i], old_path, new_path) - elif isinstance(d, str) and old_path in d: - d = d.replace(old_path, new_path) - d = d.replace(new_path, f"$_/{Path(new_path).name}") - return d +from .utils import replace_path def merge_in_channel() -> None: @@ -87,8 +75,6 @@ def merge_in_channel() -> None: if args.out_db: if args.in_db: - db_dict = replace_path( - db_dict, infile, args.output - ) - - Props.write_to(args.out_db, db_dict) \ No newline at end of file + db_dict = replace_path(db_dict, infile, args.output) + + Props.write_to(args.out_db, db_dict) diff --git a/workflow/src/legenddataflow/scripts/flow/utils.py b/workflow/src/legenddataflow/scripts/flow/utils.py index af33844..3699137 100644 --- a/workflow/src/legenddataflow/scripts/flow/utils.py +++ b/workflow/src/legenddataflow/scripts/flow/utils.py @@ -1,5 +1,8 @@ +from __future__ import annotations + from pathlib import Path + def replace_path(d, old_path, new_path): if isinstance(d, dict): for k, v in d.items(): @@ -10,4 +13,4 @@ def replace_path(d, old_path, new_path): elif isinstance(d, str) and old_path in d: d = d.replace(old_path, new_path) d = d.replace(new_path, f"$_/{Path(new_path).name}") - return d \ No newline at end of file + return d diff --git a/workflow/src/legenddataflow/scripts/par/geds/pht/util.py b/workflow/src/legenddataflow/scripts/par/geds/pht/util.py index eb2df4c..d909022 100644 --- a/workflow/src/legenddataflow/scripts/par/geds/pht/util.py +++ b/workflow/src/legenddataflow/scripts/par/geds/pht/util.py @@ -6,7 +6,13 @@ import numpy as np from dbetto import Props -from legenddataflow.FileKey import ChannelProcKey, ProcessingFileKey, run_splitter +from legenddataflowscripts.utils import convert_dict_np_to_float + +from legenddataflow.methods.FileKey import ( + ChannelProcKey, + ProcessingFileKey, + run_splitter, +) def update_cal_dicts(cal_dicts, update_dict): @@ -37,17 +43,16 @@ def get_run_dict(files): return out_dicts -def split_files_by_run(files): +def split_files_by_run(infiles): # sort files in dictionary where keys are first timestamp from run - if isinstance(files, list): + if isinstance(infiles, list): files = [] - for file in files: + for file in infiles: with Path(file).open() as f: files += f.read().splitlines() else: - with Path(files).open() as f: + with Path(infiles).open() as f: files = f.read().splitlines() - files = sorted( np.unique(files) ) # need this as sometimes files get double counted as it somehow puts in the p%-* filelist and individual runs also @@ -68,7 +73,7 @@ def save_dict_to_files(files, out_dict): with Path(file).open("wb") as w: pkl.dump(out_dict[fk.timestamp], w, protocol=pkl.HIGHEST_PROTOCOL) elif Path(file).suffix in (".json", ".yml", ".yaml"): - Props.write_to(file, out_dict[fk.timestamp]) + Props.write_to(file, convert_dict_np_to_float(out_dict[fk.timestamp])) else: err = f"File {file} not recognized as .pkl, .json, .yml or .yaml" raise ValueError(err) diff --git a/workflow/src/legenddataflow/scripts/par/geds/psp/dplms.py b/workflow/src/legenddataflow/scripts/par/geds/psp/dplms.py new file mode 100644 index 0000000..1a504fa --- /dev/null +++ b/workflow/src/legenddataflow/scripts/par/geds/psp/dplms.py @@ -0,0 +1,336 @@ +from __future__ import annotations + +import argparse +import time +from pathlib import Path + +import numpy as np +import pygama.math.distributions as pmd # noqa: F401 +from dbetto.catalog import Props +from legenddataflowscripts.utils import build_log +from lgdo import Array, Table, WaveformTable, lh5 +from pygama.evt.build_tcm import _concat_tables +from pygama.pargen.data_cleaning import generate_cuts +from pygama.pargen.dplms_ge_dict import dplms_ge_dict +from pygama.pargen.dsp_optimize import run_one_dsp + +from legenddataflow.scripts.par.geds.pht.util import ( + get_run_dict, + save_dict_to_files, + split_files_by_run, +) + + +def filter_table(raw_data_tables, final_masks): + + waveform_windowed = WaveformTable( + t0=np.concatenate( + [ + tbl["waveform_windowed"]["t0"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ), + t0_units=raw_data_tables[0]["waveform_windowed"]["t0"].attrs["units"], + dt=np.concatenate( + [ + tbl["waveform_windowed"]["dt"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ), + dt_units=raw_data_tables[0]["waveform_windowed"]["dt"].attrs["units"], + values=np.concatenate( + [ + tbl["waveform_windowed"]["values"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ), + ) + waveform_presummed = WaveformTable( + t0=np.concatenate( + [ + tbl["waveform_presummed"]["t0"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ), + t0_units=raw_data_tables[0]["waveform_presummed"]["t0"].attrs["units"], + dt=np.concatenate( + [ + tbl["waveform_presummed"]["dt"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ), + dt_units=raw_data_tables[0]["waveform_presummed"]["dt"].attrs["units"], + values=np.concatenate( + [ + tbl["waveform_presummed"]["values"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ), + ) + + col_dict = { + "waveform_presummed": waveform_presummed, + "waveform_windowed": waveform_windowed, + "presum_rate": Array( + np.concatenate( + [ + tbl["presum_rate"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ) + ), + "timestamp": Array( + np.concatenate( + [ + tbl["timestamp"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ) + ), + "baseline": Array( + np.concatenate( + [ + tbl["baseline"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ) + ), + "daqenergy": Array( + np.concatenate( + [ + tbl["daqenergy"].nda[mask] + for tbl, mask in zip(raw_data_tables, final_masks, strict=True) + ] + ) + ), + } + return Table(col_dict) + + +def par_geds_psp_dplms() -> None: + """Compute DPLMS optimal filter coefficients for HPGe detectors. + This is the partition version of dplms, it combines multiple runs + with an equal number of fft and cal events from each. + + CLI entry point registered as ``par-geds-psp-dplms``. The *Discrete + Prolate Laplacian Maximum Signal* (DPLMS) filter is constructed from two + data samples: + + 1. **FFT baselines** - low-energy (``daqenergy <= 10`` ADC) events from + dedicated FFT run files used to characterise the noise power spectrum. + 2. **Calibration peak events** - gamma-line events read from the + *peak-file* produced by :func:`par_geds_dsp_evtsel`. + + The coefficients are computed by + :func:`pygama.pargen.dplms_ge_dict.dplms_ge_dict` and written to an LH5 + file at *lh5-path* (so that the DSP chain can load them with + ``loadlh5(...)``). The DSP parameter database is updated with the filter + configuration and written to *dsp-pars*. + + Notes + ----- + **Command-line arguments** + + ``--fft-raw-filelist`` : str + Path to a text file listing the FFT raw LH5 input files. + ``--peak-file`` : str + LH5 file containing preselected calibration peak events. + ``--inplots`` : str, optional + Existing pickle plot file to update with DPLMS plots. + ``--database`` : str + Path to the existing DSP parameter database (JSON/YAML). + ``--log`` : str, optional + Path to the log file. + ``--log-config`` : str, optional + Logging configuration file. + ``--processing-chain`` : list of str + Processing chain configuration file(s). + ``--config-file`` : list of str + DPLMS configuration file(s). Must contain ``run_dplms`` (bool), + ``n_baselines`` (int), and ``peaks_kev`` (list). + ``--channel`` : str + Channel identifier; used as the HDF5 group name in *lh5-path*. + ``--raw-table-name`` : str + LH5 table path within the raw and peak files. + ``--dsp-pars`` : str + Output path for the updated DSP parameter database (JSON/YAML). + ``--lh5-path`` : str + Output LH5 file path for the DPLMS filter coefficients. + ``--plot-path`` : str, optional + Output path for diagnostic plots (pickle). + """ + argparser = argparse.ArgumentParser() + argparser.add_argument( + "--fft-raw-filelists", + help="fft_raw_filelists", + type=str, + nargs="*", + required=True, + ) + argparser.add_argument( + "--peak-files", help="peak files", nargs="*", type=str, required=True + ) + argparser.add_argument("--inplots", help="in_plot_path", nargs="*", type=str) + argparser.add_argument( + "--database", help="database", type=str, nargs="*", required=True + ) + + argparser.add_argument("--log", help="log_file", type=str) + argparser.add_argument( + "--log-config", help="Log config file", type=str, required=False, default={} + ) + + argparser.add_argument( + "--processing-chain", + help="Processing chain config", + type=str, + nargs="*", + required=True, + ) + argparser.add_argument( + "--config-file", help="Config file", type=str, nargs="*", required=True + ) + + argparser.add_argument("--channel", help="channel", type=str, required=True) + argparser.add_argument( + "--raw-table-name", help="raw table name", type=str, required=True + ) + + argparser.add_argument( + "--dsp-pars", help="dsp_pars", type=str, nargs="*", required=True + ) + argparser.add_argument( + "--lh5-path", help="lh5_path", type=str, nargs="*", required=True + ) + argparser.add_argument("--plot-path", help="plot_path", nargs="*", type=str) + + args = argparser.parse_args() + + dsp_config = Props.read_from(args.processing_chain) + log = build_log(args.log_config, args.log) + + t0 = time.time() + + dplms_dict = Props.read_from(args.config_file) + + db_dicts = get_run_dict(args.database) + db_dict = db_dicts[next(iter(db_dicts))] + + if dplms_dict["run_dplms"] is True: + fft_runs, _ = split_files_by_run(args.fft_raw_filelists) + + n_runs = len(fft_runs) + n_per_run = dplms_dict["n_baselines"] // n_runs + + t0 = time.time() + log.info("\nLoad fft data") + fft_tables = [] + eidxs_list = [] + raw_tables = [] + for _, files in fft_runs.items(): + energies = lh5.read_as( + f"{args.raw_table_name}/daqenergy", files, library="np" + ) + eidxs = np.where(energies <= 10)[0] + eidxs_list.append(eidxs) + raw_fft = lh5.read( + args.raw_table_name, + files, + n_rows=int(dplms_dict["n_baselines"] * 1.5), + idx=eidxs, + ) + raw_tables.append(raw_fft) + dsp_fft = run_one_dsp(raw_fft, dsp_config, db_dict=db_dict) + fft_tables.append(dsp_fft) + + all_fft = _concat_tables(fft_tables) + + cut_dict = generate_cuts(all_fft, cut_dict=dplms_dict["bls_cut_pars"]) + log.debug("Cuts are %s", cut_dict) + + cut_list = [] + for tbl in fft_tables: + idxs = np.full(len(tbl), True, dtype=bool) + for outname, info in cut_dict.items(): + outcol = tbl.eval(info["expression"], info.get("parameters", None)) + tbl.add_column(outname, outcol) + for cut in cut_dict: + idxs = tbl[cut].nda & idxs + cut_list.append(np.where(idxs)[0][:n_per_run]) + + raw_fft = filter_table(raw_tables, cut_list) + + log.debug("Applied Cuts") + + t1 = time.time() + msg = f"Time to load fft data {(t1 - t0):.2f} s, total events {len(raw_fft)}" + log.info(msg) + + log.info("\nRunning event selection") + peaks_kev = np.array(dplms_dict["peaks_kev"]) + # kev_widths = [tuple(kev_width) for kev_width in dplms_dict["kev_widths"]] + + peaks_rounded = [int(peak) for peak in peaks_kev] + + n_per_run_signals = dplms_dict["n_signals"] // n_runs + cal_tbs = [] + for file in args.peak_files: + peaks = lh5.read_as(f"{args.raw_table_name}/peak", file, library="np") + ids = np.isin(peaks, peaks_rounded) + peaks = peaks[ids] + + cal_tbs.append( + lh5.read(args.raw_table_name, file, idx=ids, n_rows=n_per_run_signals) + ) + + raw_cal = filter_table(cal_tbs, [np.ones(len(t), dtype=bool) for t in cal_tbs]) + + msg = f"Time to run event selection {(time.time() - t1):.2f} s, total events {len(raw_cal)}" + log.info(msg) + + if isinstance(dsp_config, str | list): + dsp_config = Props.read_from(dsp_config) + + out_dict, plot_dict = dplms_ge_dict( + raw_fft, + raw_cal, + dsp_config, + db_dict, + dplms_dict, + fom_func=eval(dplms_dict.get("fom_func", "pmd.gauss_on_step")), + display=1 if args.plot_path else 0, + ) + + coeffs = out_dict["dplms"].pop("coefficients") + dplms_pars = Table(col_dict={"coefficients": Array(coeffs)}) + out_dict["dplms"]["coefficients"] = ( + f"loadlh5('{args.lh5_path}', '{args.channel}/dplms/coefficients')" + ) + msg = f"DPLMS creation finished in {(time.time() - t0) / 60} minutes" + log.info(msg) + else: + out_dict = {} + dplms_pars = Table(col_dict={"coefficients": Array([])}) + plot_dict = {} + + inplot_dict = get_run_dict(args.inplots) if args.inplots else {} + + for _, entry in inplot_dict.items(): + entry.update({"dplms": plot_dict}) + + for _, db_dict in db_dicts.items(): + db_dict.update(out_dict) + + for outfile in args.lh5_path: + Path(outfile).parent.mkdir(parents=True, exist_ok=True) + lh5.write( + Table(col_dict={"dplms": dplms_pars}), + name=args.channel, + lh5_file=outfile, + wo_mode="overwrite", + ) + + save_dict_to_files(args.dsp_pars, db_dicts) + + if args.plot_path: + save_dict_to_files(args.plot_path, inplot_dict) From b1700e814f7c6bfcf38c00e71ec762ea2bdfdcae Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 23 Apr 2026 22:23:35 +0000 Subject: [PATCH 3/8] style: pre-commit fixes --- workflow/rules/common.smk | 1 + workflow/rules/dsp_pars_geds.smk | 35 ++++++++----- workflow/rules/evt.smk | 3 +- workflow/rules/merge_in_channel.smk | 78 +++++++++++++++++++++-------- workflow/rules/psp_pars_geds.smk | 73 +++++++++++++++------------ 5 files changed, 123 insertions(+), 67 deletions(-) diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk index 287e335..01227f3 100644 --- a/workflow/rules/common.smk +++ b/workflow/rules/common.smk @@ -149,6 +149,7 @@ def get_table_name(metadata, config, datatype, timestamp, detector, tier): return config.table_format[tier].format(ch=chmap[detector[0]].daq.rawid) return config.table_format[tier].format(ch=chmap[detector].daq.rawid) + def get_all_channels(channelmap, timestamp, datatype): if isinstance(channelmap, (str, Path)): channelmap = TextDB(channelmap, lazy=True) diff --git a/workflow/rules/dsp_pars_geds.smk b/workflow/rules/dsp_pars_geds.smk index 6965702..3b761c1 100644 --- a/workflow/rules/dsp_pars_geds.smk +++ b/workflow/rules/dsp_pars_geds.smk @@ -21,6 +21,7 @@ from legenddataflow.methods.paths import ( ) from legenddataflowscripts.workflow import execenv_pyexe + rule build_pars_dsp_pz_geds: input: files=( @@ -161,10 +162,10 @@ rule build_pars_dsp_nopt_geds: database=rules.build_pars_dsp_pz_geds.output.dsp_pars, inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: - dsp_pars=temp( - get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization") + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization")), + dsp_plots=temp( + get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization") ), - dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization")), log: get_pattern_log_channel(config, "par_dsp_noise_optimization", time), group: @@ -227,7 +228,9 @@ rule build_pars_dsp_dplms_geds: inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")), - dsp_pars_lh5=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms", extension="lh5")), + dsp_pars_lh5=temp( + get_pattern_pars_tmp_channel(config, "dsp", "dplms", extension="lh5") + ), dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), log: get_pattern_log_channel(config, "pars_dsp_dplms", time), @@ -293,7 +296,9 @@ rule build_pars_dsp_eopt_geds: output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp_eopt")), dsp_objects=temp( - get_pattern_pars_tmp_channel(config, "dsp", "eopt_objects", extension="pkl") + get_pattern_pars_tmp_channel( + config, "dsp", "eopt_objects", extension="pkl" + ) ), dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "eopt")), log: @@ -401,7 +406,7 @@ rule build_svm_dsp_geds: rule build_pars_dsp_svm_geds: input: - svm_file=rules.build_svm_dsp_geds.output.dsp_pars + svm_file=rules.build_svm_dsp_geds.output.dsp_pars, output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "svm")), log: @@ -412,13 +417,17 @@ rule build_pars_dsp_svm_geds: runtime=300, shell: execenv_pyexe(config, "par-geds-dsp-svm") + "--log {log} " - #"--input-file {input.dsp_pars} " + # "--input-file {input.dsp_pars} " "--output-file {output.dsp_pars} " "--svm-file {input.svm_file}" -build_in_channel_merge_rules([ - rules.build_pars_dsp_eopt_geds, - rules.build_pars_dsp_nopt_geds, - rules.build_pars_dsp_svm_geds, - rules.build_pars_dsp_dplms_geds, -], "dsp") + +build_in_channel_merge_rules( + [ + rules.build_pars_dsp_eopt_geds, + rules.build_pars_dsp_nopt_geds, + rules.build_pars_dsp_svm_geds, + rules.build_pars_dsp_dplms_geds, + ], + "dsp", +) diff --git a/workflow/rules/evt.smk b/workflow/rules/evt.smk index 15eab4d..696be61 100644 --- a/workflow/rules/evt.smk +++ b/workflow/rules/evt.smk @@ -128,7 +128,8 @@ for evt_tier in ("evt", "pet"): rule: input: os.path.join( - filelist_path(config), "all-{experiment}-{period}-{run}-phy-"+f"{evt_tier}.filelist" + filelist_path(config), + "all-{experiment}-{period}-{run}-phy-" + f"{evt_tier}.filelist", ), output: get_pattern_tier( diff --git a/workflow/rules/merge_in_channel.smk b/workflow/rules/merge_in_channel.smk index 0fccf84..cab01da 100644 --- a/workflow/rules/merge_in_channel.smk +++ b/workflow/rules/merge_in_channel.smk @@ -2,10 +2,13 @@ import inspect from legenddataflow.methods import patterns from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name -from legenddataflow.scripts.flow.build_chanlist import get_plt_chanlist, get_par_chanlist +from legenddataflow.scripts.flow.build_chanlist import ( + get_plt_chanlist, + get_par_chanlist, +) -def build_in_channel_merge_rules(rules, tier, output_name=None): +def build_in_channel_merge_rules(rules, tier, output_name=None): input_dsp_pars = [] input_dsp_pars_lh5 = [] @@ -36,8 +39,7 @@ def build_in_channel_merge_rules(rules, tier, output_name=None): group: group_name shell: - execenv_pyexe(config, "merge-in-channels") + \ - "--input {input} " + execenv_pyexe(config, "merge-in-channels") + "--input {input} " "--output {output} " set_last_rule_name(workflow, f"build_plts_inchan_{name}") @@ -45,58 +47,90 @@ def build_in_channel_merge_rules(rules, tier, output_name=None): object_output_name = "objects" if output_name is not None: object_output_name += f"_{output_name}" + rule: input: input_dsp_objects, output: - temp(get_pattern_pars_tmp_channel(config, tier, name=object_output_name, extension="pkl")), + temp( + get_pattern_pars_tmp_channel( + config, tier, name=object_output_name, extension="pkl" + ) + ), group: group_name shell: - execenv_pyexe(config, "merge-in-channels") + \ - "--input {input} " + execenv_pyexe(config, "merge-in-channels") + "--input {input} " "--output {output} " set_last_rule_name(workflow, f"build_pars_inchan_{name}_objects") if len(input_dsp_pars_lh5) > 0: + rule: input: input_dsp_pars, output: - temp(get_pattern_pars_tmp_channel(config, tier, name=output_name + "_tmp" if output_name is not None else "tmp")), + temp( + get_pattern_pars_tmp_channel( + config, + tier, + name=( + output_name + "_tmp" if output_name is not None else "tmp" + ), + ) + ), group: group_name shell: - execenv_pyexe(config, "merge-in-channels") + \ - "--input {input} " + execenv_pyexe(config, "merge-in-channels") + "--input {input} " "--output {output} " set_last_rule_name(workflow, f"build_pars_inchan_{name}_db") - rule: input: - in_files=input_dsp_pars_lh5 if len(input_dsp_pars_lh5) > 0 else input_dsp_pars, - in_db=get_pattern_pars_tmp_channel(config, tier, name=output_name + "_tmp" if output_name is not None else "tmp") if len(input_dsp_pars_lh5) > 0 else [], + in_files=( + input_dsp_pars_lh5 if len(input_dsp_pars_lh5) > 0 else input_dsp_pars + ), + in_db=( + get_pattern_pars_tmp_channel( + config, + tier, + name=output_name + "_tmp" if output_name is not None else "tmp", + ) + if len(input_dsp_pars_lh5) > 0 + else [] + ), plts=get_pattern_plts_tmp_channel(config, tier, name=output_name), - objects=get_pattern_pars_tmp_channel(config, tier, name=object_output_name, extension="pkl"), + objects=get_pattern_pars_tmp_channel( + config, tier, name=object_output_name, extension="pkl" + ), output: - out_file=temp(get_pattern_pars_tmp_channel(config, tier, name=output_name, extension="lh5")) if len(input_dsp_pars_lh5) > 0 else temp(get_pattern_pars_tmp_channel(config, tier, name=output_name)), - out_db=temp(get_pattern_pars_tmp_channel(config, tier, name=output_name)) if len(input_dsp_pars_lh5) > 0 else [], + out_file=( + temp( + get_pattern_pars_tmp_channel( + config, tier, name=output_name, extension="lh5" + ) + ) + if len(input_dsp_pars_lh5) > 0 + else temp(get_pattern_pars_tmp_channel(config, tier, name=output_name)) + ), + out_db=( + temp(get_pattern_pars_tmp_channel(config, tier, name=output_name)) + if len(input_dsp_pars_lh5) > 0 + else [] + ), group: group_name run: shell_string = ( - execenv_pyexe(config, "merge-in-channels") + \ - "--output {output.out_file} " + execenv_pyexe(config, "merge-in-channels") + + "--output {output.out_file} " "--input {input.in_files} " ) if len(input_dsp_pars_lh5) > 0: - shell_string += ( - "--in-db {input.in_db} " - "--out-db {output.out_db} " - ) + shell_string += "--in-db {input.in_db} " "--out-db {output.out_db} " shell(shell_string) set_last_rule_name(workflow, f"build_pars_inchan_{name}") diff --git a/workflow/rules/psp_pars_geds.smk b/workflow/rules/psp_pars_geds.smk index 38ef790..a6a6df6 100644 --- a/workflow/rules/psp_pars_geds.smk +++ b/workflow/rules/psp_pars_geds.smk @@ -17,10 +17,14 @@ from legenddataflow.methods.paths import config_path from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name # merge just needed rules here, eopt, nopt -build_in_channel_merge_rules([ - rules.build_pars_dsp_eopt_geds, - rules.build_pars_dsp_nopt_geds, -], "dsp", "psp_input") +build_in_channel_merge_rules( + [ + rules.build_pars_dsp_eopt_geds, + rules.build_pars_dsp_nopt_geds, + ], + "dsp", + "psp_input", +) psp_rules = {} psp_dplms_rules = {} @@ -32,6 +36,7 @@ for key, dataset in part.datasets.items(): wildcard_constrain = part.get_wildcard_constraints(partition, key) if key == "default": + def raw_table_name(wildcards): return get_table_name( channelmap_textdb, @@ -51,6 +56,7 @@ for key, dataset in part.datasets.items(): key, "raw", ) + rule: input: dsp_pars=part.get_par_files( @@ -69,7 +75,11 @@ for key, dataset in part.datasets.items(): extension="pkl", ), dsp_plots=part.get_plt_files( - dsp_par_catalog, partition, key, tier="dsp", name = "psp_input", + dsp_par_catalog, + partition, + key, + tier="dsp", + name="psp_input", ), output: psp_pars=temp( @@ -93,11 +103,7 @@ for key, dataset in part.datasets.items(): ), psp_plots=temp( part.get_plt_files( - psp_par_catalog, - partition, - key, - tier="psp", - name="eopt" + psp_par_catalog, partition, key, tier="psp", name="eopt" ) ), log: @@ -141,6 +147,7 @@ for key, dataset in part.datasets.items(): psp_rules[key] = [list(workflow.rules)[-1]] if key == "default": + def config_file(wildcards): return get_config_files( dataflow_configs_texdb, @@ -150,6 +157,7 @@ for key, dataset in part.datasets.items(): "pars_psp_dplms", "dplms_pars", ) + def processing_chain(wildcards): return get_config_files( dataflow_configs_texdb, @@ -159,8 +167,10 @@ for key, dataset in part.datasets.items(): "pars_psp_dplms", "proc_chain", ) + else: - config_file=get_config_files( + config_file = ( + get_config_files( dataflow_configs_texdb, tstamp, "cal", @@ -168,7 +178,8 @@ for key, dataset in part.datasets.items(): "pars_psp_dplms", "dplms_pars", ), - processing_chain=get_config_files( + ) + processing_chain = get_config_files( dataflow_configs_texdb, tstamp, "cal", @@ -177,7 +188,6 @@ for key, dataset in part.datasets.items(): "proc_chain", ) - # This rule builds the dplms energy filter for the dsp using fft and cal files rule: input: @@ -188,22 +198,18 @@ for key, dataset in part.datasets.items(): key, tier="dsp", name="peaks", - extension="lh5" + extension="lh5", ), database=part.get_par_files( - psp_par_catalog, - partition, - key, - tier="psp", - name="eopt", - ), + psp_par_catalog, + partition, + key, + tier="psp", + name="eopt", + ), inplots=part.get_plt_files( - psp_par_catalog, - partition, - key, - tier="psp", - name = "eopt" - ), + psp_par_catalog, partition, key, tier="psp", name="eopt" + ), output: psp_pars=temp( part.get_par_files( @@ -239,7 +245,7 @@ for key, dataset in part.datasets.items(): "psp", time, name="pars_psp_dplms", - ) + ), wildcard_constraints: channel=wildcard_constrain, group: @@ -248,8 +254,8 @@ for key, dataset in part.datasets.items(): runtime=300, params: config_file=config_file, - processing_chain= processing_chain, - log_config= get_log_config( + processing_chain=processing_chain, + log_config=get_log_config( dataflow_configs_texdb, tstamp, "cal", @@ -259,7 +265,8 @@ for key, dataset in part.datasets.items(): configs=config_path(config), channel="{channel}" if key == "default" else key, shell: - execenv_pyexe(config, "par-geds-psp-dplms") + "--peak-files {input.peak_file} " + execenv_pyexe(config, "par-geds-psp-dplms") + + "--peak-files {input.peak_file} " "--fft-raw-filelists {input.fft_files} " "--database {input.database} " "--inplots {input.inplots} " @@ -286,7 +293,9 @@ for key, dataset in part.datasets.items(): rule build_par_psp_fallback: input: dsp_pars=get_pattern_pars_tmp_channel(config, "dsp", "psp_input"), - dsp_objs=get_pattern_pars_tmp_channel(config, "dsp", "objects_psp_input", extension="pkl"), + dsp_objs=get_pattern_pars_tmp_channel( + config, "dsp", "objects_psp_input", extension="pkl" + ), dsp_plots=get_pattern_plts_tmp_channel(config, "dsp", "psp_input"), output: psp_pars=temp(get_pattern_pars_tmp_channel(config, "psp", "eopt")), @@ -396,6 +405,7 @@ rule build_pars_psp_dplms_geds_fallback: "--lh5-path {output.lh5_path} " "--plot-path {output.plots} " + fallback_psp_dplms_rule = list(workflow.rules)[-1] rule_order_list = [] ordered = OrderedDict(psp_dplms_rules) @@ -405,6 +415,7 @@ for key, items in ordered.items(): rule_order_list.append(fallback_psp_dplms_rule.name) workflow._ruleorder.add(*rule_order_list) # [::-1] + rule build_svm_psp: input: hyperpars=lambda wildcards: get_input_par_file( From d4c1d26671cb7ae2be254456cc80b19e34e7447b Mon Sep 17 00:00:00 2001 From: George Marshall Date: Thu, 23 Apr 2026 15:24:43 -0700 Subject: [PATCH 4/8] Update pygama and legend-dataflow-scripts versions --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 90f0a02..48c4d12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,12 +52,12 @@ dynamic = ["version"] dependencies = [ "colorlog", "dbetto==1.2.4", - "pygama==2.3.8a1", + "pygama==2.3.8a3", "dspeed==2.1.2", "pylegendmeta==1.3.1", "legend-pydataobj==1.17.2", "legend-daq2lh5==1.6.3", - "legend-dataflow-scripts==0.3.0a3", + "legend-dataflow-scripts==0.3.0a7", "pip", "snakemake-logger-plugin-rich", "snakemake-logger-plugin-snkmt", From b1f9a3cd752dce98e7dcd0276754bb8296610df6 Mon Sep 17 00:00:00 2001 From: George Marshall Date: Tue, 28 Apr 2026 16:43:16 -0700 Subject: [PATCH 5/8] Update pygama and legend-dataflow-scripts versions --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 48c4d12..052391e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,12 +52,12 @@ dynamic = ["version"] dependencies = [ "colorlog", "dbetto==1.2.4", - "pygama==2.3.8a3", + "pygama==2.3.8", "dspeed==2.1.2", "pylegendmeta==1.3.1", "legend-pydataobj==1.17.2", "legend-daq2lh5==1.6.3", - "legend-dataflow-scripts==0.3.0a7", + "legend-dataflow-scripts==0.3.0", "pip", "snakemake-logger-plugin-rich", "snakemake-logger-plugin-snkmt", From 4845485cc33536a1a07502385797bb692c57c45d Mon Sep 17 00:00:00 2001 From: George Marshall Date: Tue, 28 Apr 2026 16:43:34 -0700 Subject: [PATCH 6/8] Update legend_metadata_version to v1.4.0 --- dataflow-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataflow-config.yaml b/dataflow-config.yaml index 706d256..133bc0c 100644 --- a/dataflow-config.yaml +++ b/dataflow-config.yaml @@ -1,4 +1,4 @@ -legend_metadata_version: v1.2.2 +legend_metadata_version: v1.4.0 allow_none_par: false build_file_dbs: true check_log_files: true From b1a36d517918211f1738c28c698b11a8afbae3b2 Mon Sep 17 00:00:00 2001 From: ggmarshall Date: Wed, 29 Apr 2026 07:07:01 +0200 Subject: [PATCH 7/8] fix tests --- tests/runprod/test-argon-char-dataprod.sh | 2 ++ tests/runprod/test-evt.sh | 1 + 2 files changed, 3 insertions(+) diff --git a/tests/runprod/test-argon-char-dataprod.sh b/tests/runprod/test-argon-char-dataprod.sh index 7e91d43..e6725d5 100755 --- a/tests/runprod/test-argon-char-dataprod.sh +++ b/tests/runprod/test-argon-char-dataprod.sh @@ -17,6 +17,8 @@ rawfiles=( anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5 anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5 acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5 + cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5 + cal/p16/r000/l200-p16-r000-cal-20260101T000000Z-tier_raw.lh5 ) ( diff --git a/tests/runprod/test-evt.sh b/tests/runprod/test-evt.sh index 9521a9d..48c0e2d 100755 --- a/tests/runprod/test-evt.sh +++ b/tests/runprod/test-evt.sh @@ -24,6 +24,7 @@ rawfiles=( cal/p03/r001/l200-p03-r001-cal-20230317T211819Z-tier_raw.lh5 cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5 cal/p03/r002/l200-p03-r002-cal-20230324T161401Z-tier_raw.lh5 + cal/p16/r000/l200-p16-r000-cal-20260101T000000Z-tier_raw.lh5 ) ( From 20c6af51a7a75cbc0544caeebef32ab5c6f7e5d9 Mon Sep 17 00:00:00 2001 From: George Marshall Date: Tue, 28 Apr 2026 22:24:05 -0700 Subject: [PATCH 8/8] more test fixes --- workflow/rules/common.smk | 1 - workflow/rules/pht_pars_geds.smk | 2 ++ workflow/rules/pht_pars_geds_fast.smk | 7 +++++-- workflow/rules/psp_pars_geds.smk | 3 ++- workflow/src/legenddataflow/methods/cal_grouping.py | 2 +- 5 files changed, 10 insertions(+), 5 deletions(-) diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk index 01227f3..9299b55 100644 --- a/workflow/rules/common.smk +++ b/workflow/rules/common.smk @@ -142,7 +142,6 @@ def get_table_name(metadata, config, datatype, timestamp, detector, tier): ) if isinstance(detector, tuple): if detector[0] == "default": - print(detector) return config.table_format[tier].format( ch=chmap[detector[1].channel].daq.rawid ) diff --git a/workflow/rules/pht_pars_geds.smk b/workflow/rules/pht_pars_geds.smk index 3609f2e..2549ad8 100644 --- a/workflow/rules/pht_pars_geds.smk +++ b/workflow/rules/pht_pars_geds.smk @@ -38,6 +38,8 @@ for key, dataset in part.datasets.items(): ] tstamp = part.get_timestamp(pht_par_catalog, partition, key, tier="pht") wildcard_constrain = part.get_wildcard_constraints(partition, key) + if tstamp == "20000101T000000Z": + continue if key == "default": diff --git a/workflow/rules/pht_pars_geds_fast.smk b/workflow/rules/pht_pars_geds_fast.smk index 07804f7..7488add 100644 --- a/workflow/rules/pht_pars_geds_fast.smk +++ b/workflow/rules/pht_pars_geds_fast.smk @@ -21,6 +21,9 @@ from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name pht_fast_rules = {} for key, dataset in part.datasets.items(): for partition in dataset.keys(): + tstamp = part.get_timestamp(pht_par_catalog, partition, key, tier="pht") + if tstamp == "20000101T000000Z": + continue if key == "default": def dsp_table_name(wildcards): @@ -28,7 +31,7 @@ for key, dataset in part.datasets.items(): channelmap_textdb, config, "cal", - part.get_timestamp(pht_par_catalog, partition, key, tier="pht"), + tstamp, wildcards.channel, "dsp", ) @@ -38,7 +41,7 @@ for key, dataset in part.datasets.items(): channelmap_textdb, config, "cal", - part.get_timestamp(pht_par_catalog, partition, key, tier="pht"), + tstamp, key, "dsp", ) diff --git a/workflow/rules/psp_pars_geds.smk b/workflow/rules/psp_pars_geds.smk index a6a6df6..8d606cb 100644 --- a/workflow/rules/psp_pars_geds.smk +++ b/workflow/rules/psp_pars_geds.smk @@ -30,9 +30,10 @@ psp_rules = {} psp_dplms_rules = {} for key, dataset in part.datasets.items(): for partition in dataset.keys(): - fft_files = part.get_filelists(partition, key, "raw", datatype="fft") tstamp = part.get_timestamp(psp_par_catalog, partition, key, tier="psp") + if tstamp == "20000101T000000Z": + continue wildcard_constrain = part.get_wildcard_constraints(partition, key) if key == "default": diff --git a/workflow/src/legenddataflow/methods/cal_grouping.py b/workflow/src/legenddataflow/methods/cal_grouping.py index 6f1a865..2065cc8 100644 --- a/workflow/src/legenddataflow/methods/cal_grouping.py +++ b/workflow/src/legenddataflow/methods/cal_grouping.py @@ -192,7 +192,7 @@ def get_timestamp( if len(par_files) > 0: fk = ChannelProcKey.get_filekey_from_pattern(Path(par_files[0]).name) return fk.timestamp - return "20240101T000000Z" + return "20000101T000000Z" def get_wildcard_constraints(self, dataset, channel): if channel == "default":