diff --git a/dataflow-config.yaml b/dataflow-config.yaml index 706d256c..133bc0c2 100644 --- a/dataflow-config.yaml +++ b/dataflow-config.yaml @@ -1,4 +1,4 @@ -legend_metadata_version: v1.2.2 +legend_metadata_version: v1.4.0 allow_none_par: false build_file_dbs: true check_log_files: true diff --git a/pyproject.toml b/pyproject.toml index 4a658d73..052391eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,12 +52,12 @@ dynamic = ["version"] dependencies = [ "colorlog", "dbetto==1.2.4", - "pygama==2.3.8a1", + "pygama==2.3.8", "dspeed==2.1.2", "pylegendmeta==1.3.1", "legend-pydataobj==1.17.2", "legend-daq2lh5==1.6.3", - "legend-dataflow-scripts==0.3.0a3", + "legend-dataflow-scripts==0.3.0", "pip", "snakemake-logger-plugin-rich", "snakemake-logger-plugin-snkmt", @@ -96,6 +96,7 @@ docs = [ [project.scripts] create-chankeylist = "legenddataflow.scripts.create_chankeylist:create_chankeylist" merge-channels = "legenddataflow.scripts.flow.merge_channels:merge_channels" +merge-in-channels = "legenddataflow.scripts.flow.merge_in_channel:merge_in_channel" build-tier-evt = "legenddataflow.scripts.tier.evt:build_tier_evt" build-tier-raw-blind = "legenddataflow.scripts.tier.raw_blind:build_tier_raw_blind" build-tier-raw-fcio = "legenddataflow.scripts.tier.raw_fcio:build_tier_raw_fcio" @@ -108,6 +109,7 @@ par-geds-pht-fast = "legenddataflow.scripts.par.geds.pht.fast:par_geds_pht par-geds-pht-qc-phy = "legenddataflow.scripts.par.geds.pht.qc_phy:par_geds_pht_qc_phy" par-geds-pht-qc = "legenddataflow.scripts.par.geds.pht.qc:par_geds_pht_qc" par-geds-psp-average = "legenddataflow.scripts.par.geds.psp.average:par_geds_psp_average" +par-geds-psp-dplms = "legenddataflow.scripts.par.geds.psp.dplms:par_geds_psp_dplms" par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds_raw_blindcal" par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck" par-geds-tcm-pulser = 
"legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser" diff --git a/tests/runprod/test-argon-char-dataprod.sh b/tests/runprod/test-argon-char-dataprod.sh index 7e91d43b..e6725d55 100755 --- a/tests/runprod/test-argon-char-dataprod.sh +++ b/tests/runprod/test-argon-char-dataprod.sh @@ -17,6 +17,8 @@ rawfiles=( anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5 anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5 acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5 + cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5 + cal/p16/r000/l200-p16-r000-cal-20260101T000000Z-tier_raw.lh5 ) ( diff --git a/tests/runprod/test-evt.sh b/tests/runprod/test-evt.sh index 9521a9d0..48c0e2d6 100755 --- a/tests/runprod/test-evt.sh +++ b/tests/runprod/test-evt.sh @@ -24,6 +24,7 @@ rawfiles=( cal/p03/r001/l200-p03-r001-cal-20230317T211819Z-tier_raw.lh5 cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5 cal/p03/r002/l200-p03-r002-cal-20230324T161401Z-tier_raw.lh5 + cal/p16/r000/l200-p16-r000-cal-20260101T000000Z-tier_raw.lh5 ) ( diff --git a/workflow/Snakefile b/workflow/Snakefile index 1589848c..ff26cc1c 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -73,6 +73,7 @@ wildcard_constraints: include: "rules/channel_merge.smk" +include: "rules/merge_in_channel.smk" include: "rules/common.smk" include: "rules/main.smk" include: "rules/tcm.smk" diff --git a/workflow/profiles/lngs/config.yaml b/workflow/profiles/lngs/config.yaml index 689c134d..961f8188 100644 --- a/workflow/profiles/lngs/config.yaml +++ b/workflow/profiles/lngs/config.yaml @@ -1,5 +1,5 @@ cores: 90 -restart-times: 2 +restart-times: 0 resources: - mem_swap=3500 configfile: dataflow-config.yaml diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk index 01227f3b..9299b559 100644 --- a/workflow/rules/common.smk +++ b/workflow/rules/common.smk @@ -142,7 +142,6 @@ def get_table_name(metadata, config, datatype, timestamp, detector, tier): ) if 
isinstance(detector, tuple): if detector[0] == "default": - print(detector) return config.table_format[tier].format( ch=chmap[detector[1].channel].daq.rawid ) diff --git a/workflow/rules/dsp_pars_geds.smk b/workflow/rules/dsp_pars_geds.smk index 629358dc..3b761c15 100644 --- a/workflow/rules/dsp_pars_geds.smk +++ b/workflow/rules/dsp_pars_geds.smk @@ -2,6 +2,8 @@ Snakemake rules for building dsp pars for HPGes, before running build_dsp() - extraction of pole zero constant(s) for each channel from cal data - extraction of energy filter parameters and charge trapping correction for each channel from cal data + +Outputs should be dsp_pars, dsp_plots, dsp_pars_lh5 and dsp_objects, to be fed into merge rules """ from legenddataflow.methods import ParsKeyResolve @@ -32,8 +34,8 @@ rule build_pars_dsp_pz_geds: ), pulser=get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids"), output: - decay_const=temp(get_pattern_pars_tmp_channel(config, "dsp", "decay_constant")), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "decay_constant")), + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "decay_constant")), + dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "decay_constant")), log: get_pattern_log_channel(config, "par_dsp_decay_constant", time), group: @@ -78,8 +80,8 @@ rule build_pars_dsp_pz_geds: "--config-file {params.config_file} " "--processing-chain {params.processing_chain} " "--raw-table-name {params.raw_table_name} " - "--plot-path {output.plots} " - "--output-file {output.decay_const} " + "--plot-path {output.dsp_plots} " + "--output-file {output.dsp_pars} " "--pulser-file {input.pulser} " "--raw-files {input.files} " "--pz-files {input.pz_files}" @@ -91,7 +93,7 @@ rule build_pars_evtsel_geds: filelist_path(config), "all-{experiment}-{period}-{run}-cal-raw.filelist" ), pulser_file=get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids"), - database=rules.build_pars_dsp_pz_geds.output.decay_const, + 
database=rules.build_pars_dsp_pz_geds.output.dsp_pars, raw_cal_curve=get_blinding_curve_file, output: peak_file=temp( @@ -157,13 +159,13 @@ rule build_pars_dsp_nopt_geds: files=os.path.join( filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" ), - database=rules.build_pars_dsp_pz_geds.output.decay_const, - inplots=rules.build_pars_dsp_pz_geds.output.plots, + database=rules.build_pars_dsp_pz_geds.output.dsp_pars, + inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: - dsp_pars_nopt=temp( - get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization") + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization")), + dsp_plots=temp( + get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization") ), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization")), log: get_pattern_log_channel(config, "par_dsp_noise_optimization", time), group: @@ -210,8 +212,8 @@ rule build_pars_dsp_nopt_geds: "--processing-chain {params.processing_chain} " "--raw-table-name {params.raw_table_name} " "--inplots {input.inplots} " - "--plot-path {output.plots} " - "--dsp-pars {output.dsp_pars_nopt} " + "--plot-path {output.dsp_plots} " + "--dsp-pars {output.dsp_pars} " "--raw-filelist {input.files}" @@ -222,12 +224,14 @@ rule build_pars_dsp_dplms_geds: filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" ), peak_file=rules.build_pars_evtsel_geds.output.peak_file, - database=rules.build_pars_dsp_nopt_geds.output.dsp_pars_nopt, - inplots=rules.build_pars_dsp_nopt_geds.output.plots, + dsp_pars=rules.build_pars_dsp_pz_geds.output.dsp_pars, + inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")), - lh5_path=temp(get_pattern_pars_tmp_channel(config, "dsp", extension="lh5")), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), + dsp_pars_lh5=temp( + get_pattern_pars_tmp_channel(config, "dsp", "dplms", 
extension="lh5") + ), + dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")), log: get_pattern_log_channel(config, "pars_dsp_dplms", time), group: @@ -270,7 +274,7 @@ rule build_pars_dsp_dplms_geds: shell: execenv_pyexe(config, "par-geds-dsp-dplms") + "--peak-file {input.peak_file} " "--fft-raw-filelist {input.fft_files} " - "--database {input.database} " + "--database {input.dsp_pars} " "--inplots {input.inplots} " "--log {log} " "--log-config {params.log_config} " @@ -279,22 +283,24 @@ rule build_pars_dsp_dplms_geds: "--processing-chain {params.processing_chain} " "--raw-table-name {params.raw_table_name} " "--dsp-pars {output.dsp_pars} " - "--lh5-path {output.lh5_path} " - "--plot-path {output.plots} " + "--lh5-path {output.dsp_pars_lh5} " + "--plot-path {output.dsp_plots} " # This rule builds the optimal energy filter parameters for the dsp using calibration dsp files rule build_pars_dsp_eopt_geds: input: peak_file=rules.build_pars_evtsel_geds.output.peak_file, - decay_const=rules.build_pars_dsp_dplms_geds.output.dsp_pars, - inplots=rules.build_pars_dsp_dplms_geds.output.plots, + dsp_pars=rules.build_pars_dsp_pz_geds.output.dsp_pars, + inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots, output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp_eopt")), - qbb_grid=temp( - get_pattern_pars_tmp_channel(config, "dsp", "objects", extension="pkl") + dsp_objects=temp( + get_pattern_pars_tmp_channel( + config, "dsp", "eopt_objects", extension="pkl" + ) ), - plots=temp(get_pattern_plts_tmp_channel(config, "dsp")), + dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "eopt")), log: get_pattern_log_channel(config, "pars_dsp_eopt", time), group: @@ -340,9 +346,9 @@ rule build_pars_dsp_eopt_geds: "--raw-table-name {params.raw_table_name} " "--peak-file {input.peak_file} " "--inplots {input.inplots} " - "--decay-const {input.decay_const} " - "--plot-path {output.plots} " - "--qbb-grid-path {output.qbb_grid} " + "--decay-const {input.dsp_pars} 
" + "--plot-path {output.dsp_plots} " + "--qbb-grid-path {output.dsp_objects} " "--final-dsp-pars {output.dsp_pars}" @@ -400,10 +406,9 @@ rule build_svm_dsp_geds: rule build_pars_dsp_svm_geds: input: - dsp_pars=rules.build_pars_dsp_eopt_geds.output.dsp_pars, svm_file=rules.build_svm_dsp_geds.output.dsp_pars, output: - dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp")), + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "svm")), log: get_pattern_log_channel(config, "pars_dsp_svm", time), group: @@ -412,6 +417,17 @@ rule build_pars_dsp_svm_geds: runtime=300, shell: execenv_pyexe(config, "par-geds-dsp-svm") + "--log {log} " - "--input-file {input.dsp_pars} " + # "--input-file {input.dsp_pars} " "--output-file {output.dsp_pars} " "--svm-file {input.svm_file}" + + +build_in_channel_merge_rules( + [ + rules.build_pars_dsp_eopt_geds, + rules.build_pars_dsp_nopt_geds, + rules.build_pars_dsp_svm_geds, + rules.build_pars_dsp_dplms_geds, + ], + "dsp", +) diff --git a/workflow/rules/evt.smk b/workflow/rules/evt.smk index 15eab4d8..696be615 100644 --- a/workflow/rules/evt.smk +++ b/workflow/rules/evt.smk @@ -128,7 +128,8 @@ for evt_tier in ("evt", "pet"): rule: input: os.path.join( - filelist_path(config), "all-{experiment}-{period}-{run}-phy-"+f"{evt_tier}.filelist" + filelist_path(config), + "all-{experiment}-{period}-{run}-phy-" + f"{evt_tier}.filelist", ), output: get_pattern_tier( diff --git a/workflow/rules/merge_in_channel.smk b/workflow/rules/merge_in_channel.smk new file mode 100644 index 00000000..cab01da9 --- /dev/null +++ b/workflow/rules/merge_in_channel.smk @@ -0,0 +1,136 @@ +import inspect + +from legenddataflow.methods import patterns +from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name +from legenddataflow.scripts.flow.build_chanlist import ( + get_plt_chanlist, + get_par_chanlist, +) + + +def build_in_channel_merge_rules(rules, tier, output_name=None): + + input_dsp_pars = [] + input_dsp_pars_lh5 = [] + 
input_dsp_objects = [] + input_dsp_plots = [] + + name = f"tier" if output_name is None else f"{tier}_{output_name}" + + group_name = f"merge-inchan-{tier}" + if output_name is not None: + group_name += f"_{output_name}" + + for r in rules: + if hasattr(r.output, f"{tier}_pars"): + input_dsp_pars.append(r.output.dsp_pars) + if hasattr(r.output, f"{tier}_pars_lh5"): + input_dsp_pars_lh5.append(r.output.dsp_pars_lh5) + if hasattr(r.output, f"{tier}_plots"): + input_dsp_plots.append(r.output.dsp_plots) + if hasattr(r.output, f"{tier}_objects"): + input_dsp_objects.append(r.output.dsp_objects) + + rule: + input: + input_dsp_plots, + output: + temp(get_pattern_plts_tmp_channel(config, tier, name=output_name)), + group: + group_name + shell: + execenv_pyexe(config, "merge-in-channels") + "--input {input} " + "--output {output} " + + set_last_rule_name(workflow, f"build_plts_inchan_{name}") + + object_output_name = "objects" + if output_name is not None: + object_output_name += f"_{output_name}" + + rule: + input: + input_dsp_objects, + output: + temp( + get_pattern_pars_tmp_channel( + config, tier, name=object_output_name, extension="pkl" + ) + ), + group: + group_name + shell: + execenv_pyexe(config, "merge-in-channels") + "--input {input} " + "--output {output} " + + set_last_rule_name(workflow, f"build_pars_inchan_{name}_objects") + + if len(input_dsp_pars_lh5) > 0: + + rule: + input: + input_dsp_pars, + output: + temp( + get_pattern_pars_tmp_channel( + config, + tier, + name=( + output_name + "_tmp" if output_name is not None else "tmp" + ), + ) + ), + group: + group_name + shell: + execenv_pyexe(config, "merge-in-channels") + "--input {input} " + "--output {output} " + + set_last_rule_name(workflow, f"build_pars_inchan_{name}_db") + + rule: + input: + in_files=( + input_dsp_pars_lh5 if len(input_dsp_pars_lh5) > 0 else input_dsp_pars + ), + in_db=( + get_pattern_pars_tmp_channel( + config, + tier, + name=output_name + "_tmp" if output_name is not None else "tmp", + ) 
+ if len(input_dsp_pars_lh5) > 0 + else [] + ), + plts=get_pattern_plts_tmp_channel(config, tier, name=output_name), + objects=get_pattern_pars_tmp_channel( + config, tier, name=object_output_name, extension="pkl" + ), + output: + out_file=( + temp( + get_pattern_pars_tmp_channel( + config, tier, name=output_name, extension="lh5" + ) + ) + if len(input_dsp_pars_lh5) > 0 + else temp(get_pattern_pars_tmp_channel(config, tier, name=output_name)) + ), + out_db=( + temp(get_pattern_pars_tmp_channel(config, tier, name=output_name)) + if len(input_dsp_pars_lh5) > 0 + else [] + ), + group: + group_name + run: + shell_string = ( + execenv_pyexe(config, "merge-in-channels") + + "--output {output.out_file} " + "--input {input.in_files} " + ) + if len(input_dsp_pars_lh5) > 0: + shell_string += "--in-db {input.in_db} " "--out-db {output.out_db} " + shell(shell_string) + + set_last_rule_name(workflow, f"build_pars_inchan_{name}") diff --git a/workflow/rules/pht_pars_geds.smk b/workflow/rules/pht_pars_geds.smk index f8c65cf7..2549ad8a 100644 --- a/workflow/rules/pht_pars_geds.smk +++ b/workflow/rules/pht_pars_geds.smk @@ -38,6 +38,8 @@ for key, dataset in part.datasets.items(): ] tstamp = part.get_timestamp(pht_par_catalog, partition, key, tier="pht") wildcard_constrain = part.get_wildcard_constraints(partition, key) + if tstamp == "20000101T000000Z": + continue if key == "default": @@ -70,12 +72,7 @@ for key, dataset in part.datasets.items(): overwrite_files=get_input_par_file( config, tier="pht", - timestamp=part.get_timestamp( - pht_par_catalog, - partition, - key, - tier="pht", - ), + timestamp=tstamp, ), output: hit_pars=[ @@ -117,9 +114,7 @@ for key, dataset in part.datasets.items(): params: datatype="cal", channel="{channel}" if key == "default" else key, - timestamp=part.get_timestamp( - pht_par_catalog, partition, key, tier="pht" - ), + timestamp=tstamp, dsp_table_name=dsp_table_name, configs=config_path(config), meta=metadata_path(config), diff --git 
a/workflow/rules/pht_pars_geds_fast.smk b/workflow/rules/pht_pars_geds_fast.smk index 07804f77..7488add1 100644 --- a/workflow/rules/pht_pars_geds_fast.smk +++ b/workflow/rules/pht_pars_geds_fast.smk @@ -21,6 +21,9 @@ from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name pht_fast_rules = {} for key, dataset in part.datasets.items(): for partition in dataset.keys(): + tstamp = part.get_timestamp(pht_par_catalog, partition, key, tier="pht") + if tstamp == "20000101T000000Z": + continue if key == "default": def dsp_table_name(wildcards): @@ -28,7 +31,7 @@ for key, dataset in part.datasets.items(): channelmap_textdb, config, "cal", - part.get_timestamp(pht_par_catalog, partition, key, tier="pht"), + tstamp, wildcards.channel, "dsp", ) @@ -38,7 +41,7 @@ for key, dataset in part.datasets.items(): channelmap_textdb, config, "cal", - part.get_timestamp(pht_par_catalog, partition, key, tier="pht"), + tstamp, key, "dsp", ) diff --git a/workflow/rules/psp.smk b/workflow/rules/psp.smk index 3ee3377d..4f314f96 100644 --- a/workflow/rules/psp.smk +++ b/workflow/rules/psp.smk @@ -24,7 +24,7 @@ psp_par_catalog = ParsKeyResolve.get_par_catalog( ignore_keys_file=Path(det_status) / "ignored_daq_cycles.yaml", ) -build_merge_rules("psp", lh5_merge=True, lh5_tier="dsp") +build_merge_rules("psp", lh5_merge=True, lh5_tier="psp") rule build_psp: diff --git a/workflow/rules/psp_pars_geds.smk b/workflow/rules/psp_pars_geds.smk index fa790eb0..8d606cb0 100644 --- a/workflow/rules/psp_pars_geds.smk +++ b/workflow/rules/psp_pars_geds.smk @@ -16,9 +16,47 @@ from legenddataflow.methods.patterns import ( from legenddataflow.methods.paths import config_path from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name +# merge just needed rules here, eopt, nopt +build_in_channel_merge_rules( + [ + rules.build_pars_dsp_eopt_geds, + rules.build_pars_dsp_nopt_geds, + ], + "dsp", + "psp_input", +) + psp_rules = {} +psp_dplms_rules = {} for key, dataset in 
part.datasets.items(): for partition in dataset.keys(): + fft_files = part.get_filelists(partition, key, "raw", datatype="fft") + tstamp = part.get_timestamp(psp_par_catalog, partition, key, tier="psp") + if tstamp == "20000101T000000Z": + continue + wildcard_constrain = part.get_wildcard_constraints(partition, key) + + if key == "default": + + def raw_table_name(wildcards): + return get_table_name( + channelmap_textdb, + config, + "cal", + tstamp, + wildcards.channel, + "raw", + ) + + else: + raw_table_name = get_table_name( + channelmap_textdb, + config, + "cal", + tstamp, + key, + "raw", + ) rule: input: @@ -27,18 +65,22 @@ for key, dataset in part.datasets.items(): partition, key, tier="dsp", - name="eopt", + name="psp_input", ), dsp_objs=part.get_par_files( dsp_par_catalog, partition, key, tier="dsp", - name="objects", + name="objects_psp_input", extension="pkl", ), dsp_plots=part.get_plt_files( - dsp_par_catalog, partition, key, tier="dsp" + dsp_par_catalog, + partition, + key, + tier="dsp", + name="psp_input", ), output: psp_pars=temp( @@ -62,10 +104,7 @@ for key, dataset in part.datasets.items(): ), psp_plots=temp( part.get_plt_files( - psp_par_catalog, - partition, - key, - tier="psp", + psp_par_catalog, partition, key, tier="psp", name="eopt" ) ), log: @@ -86,9 +125,7 @@ for key, dataset in part.datasets.items(): params: datatype="cal", channel="{channel}" if key == "default" else key, - timestamp=part.get_timestamp( - psp_par_catalog, partition, key, tier="psp" - ), + timestamp=tstamp, configs=ro(config_path(config)), shell: execenv_pyexe(config, "par-geds-psp-average") + "--log {log} " @@ -110,20 +147,163 @@ for key, dataset in part.datasets.items(): else: psp_rules[key] = [list(workflow.rules)[-1]] + if key == "default": + + def config_file(wildcards): + return get_config_files( + dataflow_configs_texdb, + tstamp, + "cal", + wildcards.channel, + "pars_psp_dplms", + "dplms_pars", + ) + + def processing_chain(wildcards): + return get_config_files( + 
dataflow_configs_texdb, + tstamp, + "cal", + wildcards.channel, + "pars_psp_dplms", + "proc_chain", + ) + + else: + config_file = ( + get_config_files( + dataflow_configs_texdb, + tstamp, + "cal", + key, + "pars_psp_dplms", + "dplms_pars", + ), + ) + processing_chain = get_config_files( + dataflow_configs_texdb, + tstamp, + "cal", + key, + "pars_psp_dplms", + "proc_chain", + ) + + # This rule builds the dplms energy filter for the dsp using fft and cal files + rule: + input: + fft_files=fft_files, + peak_file=part.get_par_files( + dsp_par_catalog, + partition, + key, + tier="dsp", + name="peaks", + extension="lh5", + ), + database=part.get_par_files( + psp_par_catalog, + partition, + key, + tier="psp", + name="eopt", + ), + inplots=part.get_plt_files( + psp_par_catalog, partition, key, tier="psp", name="eopt" + ), + output: + psp_pars=temp( + part.get_par_files( + psp_par_catalog, + partition, + key, + tier="psp", + name="dplms", + ) + ), + psp_pars_lh5=temp( + part.get_par_files( + psp_par_catalog, + partition, + key, + tier="psp", + extension="lh5", + ) + ), + psp_plots=temp( + part.get_plt_files( + psp_par_catalog, + partition, + key, + tier="psp", + ) + ), + log: + part.get_log_file( + psp_par_catalog, + partition, + key, + "psp", + time, + name="pars_psp_dplms", + ), + wildcard_constraints: + channel=wildcard_constrain, + group: + "par-psp" + resources: + runtime=300, + params: + config_file=config_file, + processing_chain=processing_chain, + log_config=get_log_config( + dataflow_configs_texdb, + tstamp, + "cal", + "pars_psp_dplms", + ), + raw_table_name=raw_table_name, + configs=config_path(config), + channel="{channel}" if key == "default" else key, + shell: + execenv_pyexe(config, "par-geds-psp-dplms") + + "--peak-files {input.peak_file} " + "--fft-raw-filelists {input.fft_files} " + "--database {input.database} " + "--inplots {input.inplots} " + "--log {log} " + "--log-config {params.log_config} " + "--config-file {params.config_file} " + "--channel 
{params.channel} " + "--processing-chain {params.processing_chain} " + "--raw-table-name {params.raw_table_name} " + "--dsp-pars {output.psp_pars} " + "--lh5-path {output.psp_pars_lh5} " + "--plot-path {output.psp_plots} " + + set_last_rule_name(workflow, f"{key}-{partition}-build_par_psp_dplms") + + if key in psp_dplms_rules: + psp_dplms_rules[key].append(list(workflow.rules)[-1]) + else: + psp_dplms_rules[key] = [list(workflow.rules)[-1]] + # Merged energy and a/e supercalibrations to reduce number of rules as they have same inputs/outputs # This rule builds the a/e calibration using the calibration dsp files for the whole partition rule build_par_psp_fallback: input: - dsp_pars=get_pattern_pars_tmp_channel(config, "dsp", "eopt"), - dsp_objs=get_pattern_pars_tmp_channel(config, "dsp", "objects", extension="pkl"), - dsp_plots=get_pattern_plts_tmp_channel(config, "dsp"), + dsp_pars=get_pattern_pars_tmp_channel(config, "dsp", "psp_input"), + dsp_objs=get_pattern_pars_tmp_channel( + config, "dsp", "objects_psp_input", extension="pkl" + ), + dsp_plots=get_pattern_plts_tmp_channel(config, "dsp", "psp_input"), output: psp_pars=temp(get_pattern_pars_tmp_channel(config, "psp", "eopt")), psp_objs=temp( get_pattern_pars_tmp_channel(config, "psp", "objects", extension="pkl") ), - psp_plots=temp(get_pattern_plts_tmp_channel(config, "psp")), + psp_plots=temp(get_pattern_plts_tmp_channel(config, "psp", "eopt")), log: get_pattern_log_channel(config, "pars_psp", time), group: @@ -159,18 +339,119 @@ rule_order_list.append(fallback_psp_rule.name) workflow._ruleorder.add(*rule_order_list) # [::-1] +# This rule builds the dplms energy filter for the dsp using fft and cal files +rule build_pars_psp_dplms_geds_fallback: + input: + fft_files=os.path.join( + filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist" + ), + peak_file=rules.build_pars_evtsel_geds.output.peak_file, + database=rules.build_par_psp_fallback.output.psp_pars, + 
inplots=rules.build_par_psp_fallback.output.psp_plots, + output: + dsp_pars=temp(get_pattern_pars_tmp_channel(config, "psp", "dplms")), + lh5_path=temp(get_pattern_pars_tmp_channel(config, "psp", extension="lh5")), + plots=temp(get_pattern_plts_tmp_channel(config, "psp")), + log: + get_pattern_log_channel(config, "pars_psp_dplms", time), + group: + "par-psp" + resources: + runtime=300, + params: + config_file=lambda wildcards: get_config_files( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + wildcards.channel, + "pars_psp_dplms", + "dplms_pars", + ), + processing_chain=lambda wildcards: get_config_files( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + wildcards.channel, + "pars_psp_dplms", + "proc_chain", + ), + log_config=lambda wildcards: get_log_config( + dataflow_configs_texdb, + wildcards.timestamp, + "cal", + "pars_psp_dplms", + ), + raw_table_name=lambda wildcards: get_table_name( + channelmap_textdb, + config, + "cal", + wildcards.timestamp, + wildcards.channel, + "raw", + ), + configs=config_path(config), + channel="{channel}", + shell: + execenv_pyexe(config, "par-geds-psp-dplms") + "--peak-file {input.peak_file} " + "--fft-raw-filelist {input.fft_files} " + "--database {input.database} " + "--inplots {input.inplots} " + "--log {log} " + "--log-config {params.log_config} " + "--config-file {params.config_file} " + "--channel {params.channel} " + "--processing-chain {params.processing_chain} " + "--raw-table-name {params.raw_table_name} " + "--dsp-pars {output.dsp_pars} " + "--lh5-path {output.lh5_path} " + "--plot-path {output.plots} " + + +fallback_psp_dplms_rule = list(workflow.rules)[-1] +rule_order_list = [] +ordered = OrderedDict(psp_dplms_rules) +ordered.move_to_end("default") +for key, items in ordered.items(): + rule_order_list += [item.name for item in items] +rule_order_list.append(fallback_psp_dplms_rule.name) +workflow._ruleorder.add(*rule_order_list) # [::-1] + + rule build_svm_psp: input: hyperpars=lambda wildcards: 
get_input_par_file( - config=config, wildcards=wildcards, tier="psp", name="svm_hyperpars" + config=config, + wildcards=wildcards, + tier="psp", + name="svm_hyperpars", + allow_none=True, ), - train_data=lambda wildcards: str( - get_input_par_file( - config=config, wildcards=wildcards, tier="psp", name="svm_hyperpars" + train_data=lambda wildcards: ( + str( + get_input_par_file( + config=config, + wildcards=wildcards, + tier="psp", + name="svm_hyperpars", + allow_none=True, + overwrite=False, + ) + ).replace("hyperpars.yaml", "train.lh5") + if not isinstance( + get_input_par_file( + config=config, + wildcards=wildcards, + tier="psp", + name="svm_hyperpars", + allow_none=True, + overwrite=False, + ), + list, ) - ).replace("hyperpars.yaml", "train.lh5"), + else [] + ), output: - dsp_pars=get_pattern_pars(config, "psp", "svm", extension="pkl"), + psp_pars=get_pattern_pars(config, "psp", "svm", extension="pkl"), log: str(get_pattern_log(config, "pars_psp_svm", time)).replace("{datatype}", "cal"), group: @@ -185,15 +466,12 @@ rule build_svm_psp: execenv_pyexe(config, "par-geds-dsp-svm-build") + "--log {log} " "--train-data {input.train_data} " "--train-hyperpars {input.hyperpars} " - "--output-file {output.dsp_pars} " - "--timestamp {params.timestamp} " - "--datatype {params.datatype} " - "--configs {params.configs} " + "--output-file {output.psp_pars} " rule build_pars_psp_svm: input: - dsp_pars=get_pattern_pars_tmp_channel(config, "psp_eopt"), + dsp_pars=get_pattern_pars_tmp_channel(config, "psp_dplms"), svm_model=get_pattern_pars(config, "psp", "svm", extension="pkl"), output: dsp_pars=temp(get_pattern_pars_tmp_channel(config, "psp")), diff --git a/workflow/src/legenddataflow/methods/cal_grouping.py b/workflow/src/legenddataflow/methods/cal_grouping.py index 6f1a8659..2065cc83 100644 --- a/workflow/src/legenddataflow/methods/cal_grouping.py +++ b/workflow/src/legenddataflow/methods/cal_grouping.py @@ -192,7 +192,7 @@ def get_timestamp( if len(par_files) > 0: fk = 
ChannelProcKey.get_filekey_from_pattern(Path(par_files[0]).name) return fk.timestamp - return "20240101T000000Z" + return "20000101T000000Z" def get_wildcard_constraints(self, dataset, channel): if channel == "default": diff --git a/workflow/src/legenddataflow/scripts/flow/merge_channels.py b/workflow/src/legenddataflow/scripts/flow/merge_channels.py index e6485772..74586909 100644 --- a/workflow/src/legenddataflow/scripts/flow/merge_channels.py +++ b/workflow/src/legenddataflow/scripts/flow/merge_channels.py @@ -11,18 +11,7 @@ from legenddataflow.methods import ChannelProcKey - -def replace_path(d, old_path, new_path): - if isinstance(d, dict): - for k, v in d.items(): - d[k] = replace_path(v, old_path, new_path) - elif isinstance(d, list): - for i in range(len(d)): - d[i] = replace_path(d[i], old_path, new_path) - elif isinstance(d, str) and old_path in d: - d = d.replace(old_path, new_path) - d = d.replace(new_path, f"$_/{Path(new_path).name}") - return d +from .utils import replace_path def merge_channels() -> None: diff --git a/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py b/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py new file mode 100644 index 00000000..7111ce80 --- /dev/null +++ b/workflow/src/legenddataflow/scripts/flow/merge_in_channel.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import argparse +import pickle as pkl +from pathlib import Path + +from dbetto.catalog import Props +from lgdo import lh5 +from lgdo.types import Struct + +from legenddataflow.methods import ChannelProcKey + +from .utils import replace_path + + +def merge_in_channel() -> None: + argparser = argparse.ArgumentParser() + argparser.add_argument( + "--input", help="input file", nargs="*", type=str, required=True + ) + argparser.add_argument("--output", help="output file", type=str, required=True) + argparser.add_argument( + "--in-db", + help="in db file (used for when lh5 files referred to in db)", + type=str, + required=False, + ) + 
argparser.add_argument( + "--out-db", + help="lh5 file (used for when lh5 files referred to in db)", + type=str, + required=False, + ) + args = argparser.parse_args() + + # change to only have 1 output file for multiple inputs + # don't care about processing step, check if extension matches + + input_files = args.input.infiles if hasattr(args.input, "infiles") else args.input + + file_extension = Path(args.output).suffix + out_file = args.output + + Path(args.output).parent.mkdir(parents=True, exist_ok=True) + + if file_extension in (".json", ".yaml", ".yml"): + Props.write_to(out_file, Props.read_from(input_files)) + + elif file_extension == ".pkl": + out_dict = {} + for infile in input_files: + with Path(infile).open("rb") as r: + indict = pkl.load(r) + out_dict.update(indict) + + with Path(out_file).open("wb") as w: + pkl.dump(out_dict, w, protocol=pkl.HIGHEST_PROTOCOL) + + elif file_extension == ".lh5": + if args.in_db: + db_dict = Props.read_from(args.in_db) + objects = {} + for infile in input_files: + fkey = ChannelProcKey.get_filekey_from_pattern(Path(infile).name) + tb_in = lh5.read(f"{fkey.channel}", infile) + + objects[f"{fkey.processing_step}".split("_", maxsplit=3)[2]] = tb_in + + lh5.write( + Struct(objects), + name=fkey.channel, + lh5_file=out_file, + wo_mode="a", + ) + + if args.out_db: + if args.in_db: + db_dict = replace_path(db_dict, infile, args.output) + + Props.write_to(args.out_db, db_dict) diff --git a/workflow/src/legenddataflow/scripts/flow/utils.py b/workflow/src/legenddataflow/scripts/flow/utils.py new file mode 100644 index 00000000..36991378 --- /dev/null +++ b/workflow/src/legenddataflow/scripts/flow/utils.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from pathlib import Path + + +def replace_path(d, old_path, new_path): + if isinstance(d, dict): + for k, v in d.items(): + d[k] = replace_path(v, old_path, new_path) + elif isinstance(d, list): + for i in range(len(d)): + d[i] = replace_path(d[i], old_path, new_path) + elif 
isinstance(d, str) and old_path in d: + d = d.replace(old_path, new_path) + d = d.replace(new_path, f"$_/{Path(new_path).name}") + return d diff --git a/workflow/src/legenddataflow/scripts/par/geds/pht/util.py b/workflow/src/legenddataflow/scripts/par/geds/pht/util.py index eb2df4cd..d909022f 100644 --- a/workflow/src/legenddataflow/scripts/par/geds/pht/util.py +++ b/workflow/src/legenddataflow/scripts/par/geds/pht/util.py @@ -6,7 +6,13 @@ import numpy as np from dbetto import Props -from legenddataflow.FileKey import ChannelProcKey, ProcessingFileKey, run_splitter +from legenddataflowscripts.utils import convert_dict_np_to_float + +from legenddataflow.methods.FileKey import ( + ChannelProcKey, + ProcessingFileKey, + run_splitter, +) def update_cal_dicts(cal_dicts, update_dict): @@ -37,17 +43,16 @@ def get_run_dict(files): return out_dicts -def split_files_by_run(files): +def split_files_by_run(infiles): # sort files in dictionary where keys are first timestamp from run - if isinstance(files, list): + if isinstance(infiles, list): files = [] - for file in files: + for file in infiles: with Path(file).open() as f: files += f.read().splitlines() else: - with Path(files).open() as f: + with Path(infiles).open() as f: files = f.read().splitlines() - files = sorted( np.unique(files) ) # need this as sometimes files get double counted as it somehow puts in the p%-* filelist and individual runs also @@ -68,7 +73,7 @@ def save_dict_to_files(files, out_dict): with Path(file).open("wb") as w: pkl.dump(out_dict[fk.timestamp], w, protocol=pkl.HIGHEST_PROTOCOL) elif Path(file).suffix in (".json", ".yml", ".yaml"): - Props.write_to(file, out_dict[fk.timestamp]) + Props.write_to(file, convert_dict_np_to_float(out_dict[fk.timestamp])) else: err = f"File {file} not recognized as .pkl, .json, .yml or .yaml" raise ValueError(err) diff --git a/workflow/src/legenddataflow/scripts/par/geds/psp/dplms.py b/workflow/src/legenddataflow/scripts/par/geds/psp/dplms.py new file mode 100644 
def filter_table(raw_data_tables, final_masks):
    """Merge the selected rows of several raw tables into a single table.

    Each table in *raw_data_tables* is paired with the entry of *final_masks*
    at the same position; that entry is used as a NumPy index on the ``.nda``
    array of every column, and the selected rows of all tables are
    concatenated in order.

    Parameters
    ----------
    raw_data_tables
        Sequence of tables providing the columns ``waveform_windowed`` and
        ``waveform_presummed`` (each with ``t0``, ``dt`` and ``values``) as
        well as ``presum_rate``, ``timestamp``, ``baseline`` and
        ``daqenergy``.
    final_masks
        Sequence of NumPy index objects (boolean masks or integer index
        arrays), one per table.  The lengths of both sequences must agree,
        which ``zip(..., strict=True)`` enforces.

    Returns
    -------
    Table
        Table holding the six columns listed above.  The ``t0`` and ``dt``
        units of the waveform columns are copied from the first input table.
    """

    def gather(*keys):
        # Concatenate the selected rows of one (possibly nested) column
        # across all input tables.
        pieces = []
        for table, selection in zip(raw_data_tables, final_masks, strict=True):
            column = table
            for key in keys:
                column = column[key]
            pieces.append(column.nda[selection])
        return np.concatenate(pieces)

    def build_waveforms(name):
        # Units are looked up on the first table only.
        return WaveformTable(
            t0=gather(name, "t0"),
            t0_units=raw_data_tables[0][name]["t0"].attrs["units"],
            dt=gather(name, "dt"),
            dt_units=raw_data_tables[0][name]["dt"].attrs["units"],
            values=gather(name, "values"),
        )

    windowed = build_waveforms("waveform_windowed")
    presummed = build_waveforms("waveform_presummed")

    columns = {
        "waveform_presummed": presummed,
        "waveform_windowed": windowed,
    }
    for field in ("presum_rate", "timestamp", "baseline", "daqenergy"):
        columns[field] = Array(gather(field))

    return Table(columns)
def par_geds_psp_dplms() -> None:
    """Compute DPLMS optimal filter coefficients for HPGe detectors.

    This is the partition version of dplms: it combines multiple runs, taking
    an equal share of FFT (baseline) and calibration events from each run.

    CLI entry point registered as ``par-geds-psp-dplms``. The *Digital
    Penalized Least Mean Squares* (DPLMS) filter is constructed from two data
    samples:

    1. **FFT baselines** - events with ``daqenergy <= 10`` read from the files
       listed in the FFT raw filelists, processed with the DSP chain and
       filtered with the cuts built from ``bls_cut_pars``.
    2. **Calibration peak events** - rows of the peak files whose ``peak``
       value is one of the (integer-truncated) ``peaks_kev`` entries.

    The coefficients are computed by
    :func:`pygama.pargen.dplms_ge_dict.dplms_ge_dict` and the same coefficient
    table is written to every file given with ``--lh5-path``. Every per-run
    DSP parameter database is updated with the filter configuration and
    written to the ``--dsp-pars`` files.

    If ``run_dplms`` in the configuration is not ``True`` an empty
    coefficient array is written and the databases are saved unchanged.

    Notes
    -----
    **Command-line arguments**

    ``--fft-raw-filelists`` : list of str
        Text file(s) listing the FFT raw LH5 input files.
    ``--peak-files`` : list of str
        LH5 file(s) containing preselected calibration peak events.
    ``--inplots`` : list of str, optional
        Existing pickle plot file(s) to update with DPLMS plots.
    ``--database`` : list of str
        Existing DSP parameter database file(s).
    ``--log`` : str, optional
        Path to the log file.
    ``--log-config`` : str, optional
        Logging configuration file.
    ``--processing-chain`` : list of str
        Processing chain configuration file(s).
    ``--config-file`` : list of str
        DPLMS configuration file(s). The keys read here are ``run_dplms``,
        ``n_baselines``, ``n_signals``, ``bls_cut_pars``, ``peaks_kev`` and,
        optionally, ``fom_func``.
    ``--channel`` : str
        Channel identifier; used as the top-level group name in the LH5
        output.
    ``--raw-table-name`` : str
        LH5 table path within the raw and peak files.
    ``--dsp-pars`` : list of str
        Output path(s) for the updated DSP parameter databases.
    ``--lh5-path`` : list of str
        Output LH5 file path(s) for the DPLMS filter coefficients.
    ``--plot-path`` : list of str, optional
        Output path(s) for diagnostic plots.
    """
    argparser = argparse.ArgumentParser()
    argparser.add_argument(
        "--fft-raw-filelists",
        help="fft_raw_filelists",
        type=str,
        nargs="*",
        required=True,
    )
    argparser.add_argument(
        "--peak-files", help="peak files", nargs="*", type=str, required=True
    )
    argparser.add_argument("--inplots", help="in_plot_path", nargs="*", type=str)
    argparser.add_argument(
        "--database", help="database", type=str, nargs="*", required=True
    )

    argparser.add_argument("--log", help="log_file", type=str)
    argparser.add_argument(
        "--log-config", help="Log config file", type=str, required=False, default={}
    )

    argparser.add_argument(
        "--processing-chain",
        help="Processing chain config",
        type=str,
        nargs="*",
        required=True,
    )
    argparser.add_argument(
        "--config-file", help="Config file", type=str, nargs="*", required=True
    )

    argparser.add_argument("--channel", help="channel", type=str, required=True)
    argparser.add_argument(
        "--raw-table-name", help="raw table name", type=str, required=True
    )

    # NOTE: the output options below are list-valued (nargs="*").
    argparser.add_argument(
        "--dsp-pars", help="dsp_pars", type=str, nargs="*", required=True
    )
    argparser.add_argument(
        "--lh5-path", help="lh5_path", type=str, nargs="*", required=True
    )
    argparser.add_argument("--plot-path", help="plot_path", nargs="*", type=str)

    args = argparser.parse_args()

    dsp_config = Props.read_from(args.processing_chain)
    log = build_log(args.log_config, args.log)

    t0 = time.time()

    dplms_dict = Props.read_from(args.config_file)

    # One database per run; the first entry is the one handed to the DSP
    # and to dplms_ge_dict below.
    db_dicts = get_run_dict(args.database)
    db_dict = db_dicts[next(iter(db_dicts))]

    if dplms_dict["run_dplms"] is True:
        fft_runs, _ = split_files_by_run(args.fft_raw_filelists)

        # Share the requested number of baselines equally between the runs.
        # NOTE(review): raises ZeroDivisionError if no run is found.
        n_runs = len(fft_runs)
        n_per_run = dplms_dict["n_baselines"] // n_runs

        # Restart the clock: all timings below are relative to this point.
        t0 = time.time()
        log.info("\nLoad fft data")
        fft_tables = []
        # NOTE(review): eidxs_list is filled but never read in this function.
        eidxs_list = []
        raw_tables = []
        for _, files in fft_runs.items():
            energies = lh5.read_as(
                f"{args.raw_table_name}/daqenergy", files, library="np"
            )
            # Keep only low-energy events as baseline candidates.
            eidxs = np.where(energies <= 10)[0]
            eidxs_list.append(eidxs)
            # Read more rows than requested so that enough remain after cuts.
            raw_fft = lh5.read(
                args.raw_table_name,
                files,
                n_rows=int(dplms_dict["n_baselines"] * 1.5),
                idx=eidxs,
            )
            raw_tables.append(raw_fft)
            dsp_fft = run_one_dsp(raw_fft, dsp_config, db_dict=db_dict)
            fft_tables.append(dsp_fft)

        # The cuts are derived from all runs together ...
        all_fft = _concat_tables(fft_tables)

        cut_dict = generate_cuts(all_fft, cut_dict=dplms_dict["bls_cut_pars"])
        log.debug("Cuts are %s", cut_dict)

        # ... and then applied to each run separately.
        cut_list = []
        for tbl in fft_tables:
            idxs = np.full(len(tbl), True, dtype=bool)
            for outname, info in cut_dict.items():
                outcol = tbl.eval(info["expression"], info.get("parameters", None))
                tbl.add_column(outname, outcol)
            # Logical AND of every cut column.
            for cut in cut_dict:
                idxs = tbl[cut].nda & idxs
            # Row indices passing all cuts, truncated to this run's share.
            cut_list.append(np.where(idxs)[0][:n_per_run])

        # Select the surviving rows from the raw tables (not the DSP output).
        raw_fft = filter_table(raw_tables, cut_list)

        log.debug("Applied Cuts")

        t1 = time.time()
        msg = f"Time to load fft data {(t1 - t0):.2f} s, total events {len(raw_fft)}"
        log.info(msg)

        log.info("\nRunning event selection")
        peaks_kev = np.array(dplms_dict["peaks_kev"])
        # kev_widths = [tuple(kev_width) for kev_width in dplms_dict["kev_widths"]]

        # Peak energies truncated to int for comparison with the "peak" column.
        peaks_rounded = [int(peak) for peak in peaks_kev]

        # NOTE(review): the per-file share is derived from the number of FFT
        # runs, not from the number of peak files - confirm they always agree.
        n_per_run_signals = dplms_dict["n_signals"] // n_runs
        cal_tbs = []
        for file in args.peak_files:
            peaks = lh5.read_as(f"{args.raw_table_name}/peak", file, library="np")
            ids = np.isin(peaks, peaks_rounded)
            # NOTE(review): the filtered `peaks` is not used afterwards.
            peaks = peaks[ids]

            # NOTE(review): `ids` is a boolean mask, not an index array -
            # confirm that lh5.read accepts a mask for `idx`.
            cal_tbs.append(
                lh5.read(args.raw_table_name, file, idx=ids, n_rows=n_per_run_signals)
            )

        # All-true masks: every row read above is kept.
        raw_cal = filter_table(cal_tbs, [np.ones(len(t), dtype=bool) for t in cal_tbs])

        msg = f"Time to run event selection {(time.time() - t1):.2f} s, total events {len(raw_cal)}"
        log.info(msg)

        # NOTE(review): dsp_config is already the result of Props.read_from
        # above, so this branch is presumably never taken.
        if isinstance(dsp_config, str | list):
            dsp_config = Props.read_from(dsp_config)

        # SECURITY: "fom_func" comes from the configuration file and is passed
        # to eval(); the configuration must be trusted. The default refers to
        # the `pmd` module imported at the top of the file.
        out_dict, plot_dict = dplms_ge_dict(
            raw_fft,
            raw_cal,
            dsp_config,
            db_dict,
            dplms_dict,
            fom_func=eval(dplms_dict.get("fom_func", "pmd.gauss_on_step")),
            display=1 if args.plot_path else 0,
        )

        # The coefficients go to the LH5 file; the database only keeps a
        # loadlh5(...) reference to them.
        coeffs = out_dict["dplms"].pop("coefficients")
        dplms_pars = Table(col_dict={"coefficients": Array(coeffs)})
        # NOTE(review): args.lh5_path is a list (nargs="*"), so the list repr
        # is embedded in this string - confirm this is the intended
        # reference, or use the path of the matching run instead.
        out_dict["dplms"]["coefficients"] = (
            f"loadlh5('{args.lh5_path}', '{args.channel}/dplms/coefficients')"
        )
        msg = f"DPLMS creation finished in {(time.time() - t0) / 60} minutes"
        log.info(msg)
    else:
        # DPLMS disabled: write an empty coefficient table and leave the
        # databases unchanged.
        out_dict = {}
        dplms_pars = Table(col_dict={"coefficients": Array([])})
        plot_dict = {}

    inplot_dict = get_run_dict(args.inplots) if args.inplots else {}

    # The same plot dictionary is attached to every run.
    for _, entry in inplot_dict.items():
        entry.update({"dplms": plot_dict})

    # The same result is merged into every run's database (this rebinds the
    # `db_dict` name used above).
    for _, db_dict in db_dicts.items():
        db_dict.update(out_dict)

    # Identical coefficients are written to each requested LH5 output.
    for outfile in args.lh5_path:
        Path(outfile).parent.mkdir(parents=True, exist_ok=True)
        lh5.write(
            Table(col_dict={"dplms": dplms_pars}),
            name=args.channel,
            lh5_file=outfile,
            wo_mode="overwrite",
        )

    save_dict_to_files(args.dsp_pars, db_dicts)

    if args.plot_path:
        save_dict_to_files(args.plot_path, inplot_dict)