Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dataflow-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
legend_metadata_version: v1.2.2
legend_metadata_version: v1.4.0
allow_none_par: false
build_file_dbs: true
check_log_files: true
Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ dynamic = ["version"]
dependencies = [
"colorlog",
"dbetto==1.2.4",
"pygama==2.3.8a1",
"pygama==2.3.8",
"dspeed==2.1.2",
"pylegendmeta==1.3.1",
"legend-pydataobj==1.17.2",
"legend-daq2lh5==1.6.3",
"legend-dataflow-scripts==0.3.0a3",
"legend-dataflow-scripts==0.3.0",
"pip",
"snakemake-logger-plugin-rich",
"snakemake-logger-plugin-snkmt",
Expand Down Expand Up @@ -96,6 +96,7 @@ docs = [
[project.scripts]
create-chankeylist = "legenddataflow.scripts.create_chankeylist:create_chankeylist"
merge-channels = "legenddataflow.scripts.flow.merge_channels:merge_channels"
merge-in-channels = "legenddataflow.scripts.flow.merge_in_channel:merge_in_channel"
build-tier-evt = "legenddataflow.scripts.tier.evt:build_tier_evt"
build-tier-raw-blind = "legenddataflow.scripts.tier.raw_blind:build_tier_raw_blind"
build-tier-raw-fcio = "legenddataflow.scripts.tier.raw_fcio:build_tier_raw_fcio"
Expand All @@ -108,6 +109,7 @@ par-geds-pht-fast = "legenddataflow.scripts.par.geds.pht.fast:par_geds_pht
par-geds-pht-qc-phy = "legenddataflow.scripts.par.geds.pht.qc_phy:par_geds_pht_qc_phy"
par-geds-pht-qc = "legenddataflow.scripts.par.geds.pht.qc:par_geds_pht_qc"
par-geds-psp-average = "legenddataflow.scripts.par.geds.psp.average:par_geds_psp_average"
par-geds-psp-dplms = "legenddataflow.scripts.par.geds.psp.dplms:par_geds_psp_dplms"
par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds_raw_blindcal"
par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck"
par-geds-tcm-pulser = "legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser"
Expand Down
2 changes: 2 additions & 0 deletions tests/runprod/test-argon-char-dataprod.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ rawfiles=(
anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5
anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5
acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5
cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5
cal/p16/r000/l200-p16-r000-cal-20260101T000000Z-tier_raw.lh5
)

(
Expand Down
1 change: 1 addition & 0 deletions tests/runprod/test-evt.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ rawfiles=(
cal/p03/r001/l200-p03-r001-cal-20230317T211819Z-tier_raw.lh5
cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5
cal/p03/r002/l200-p03-r002-cal-20230324T161401Z-tier_raw.lh5
cal/p16/r000/l200-p16-r000-cal-20260101T000000Z-tier_raw.lh5
)

(
Expand Down
1 change: 1 addition & 0 deletions workflow/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ wildcard_constraints:


include: "rules/channel_merge.smk"
include: "rules/merge_in_channel.smk"
include: "rules/common.smk"
include: "rules/main.smk"
include: "rules/tcm.smk"
Expand Down
2 changes: 1 addition & 1 deletion workflow/profiles/lngs/config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cores: 90
restart-times: 2
restart-times: 0
resources:
- mem_swap=3500
configfile: dataflow-config.yaml
Expand Down
1 change: 0 additions & 1 deletion workflow/rules/common.smk
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ def get_table_name(metadata, config, datatype, timestamp, detector, tier):
)
if isinstance(detector, tuple):
if detector[0] == "default":
print(detector)
return config.table_format[tier].format(
ch=chmap[detector[1].channel].daq.rawid
)
Expand Down
76 changes: 46 additions & 30 deletions workflow/rules/dsp_pars_geds.smk
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Snakemake rules for building dsp pars for HPGes, before running build_dsp()
- extraction of pole zero constant(s) for each channel from cal data
- extraction of energy filter parameters and charge trapping correction for each channel from cal data

Rule outputs must be named dsp_pars, dsp_plots, dsp_pars_lh5 or dsp_objects so that the in-channel merge rules can collect them.
"""

from legenddataflow.methods import ParsKeyResolve
Expand Down Expand Up @@ -32,8 +34,8 @@ rule build_pars_dsp_pz_geds:
),
pulser=get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids"),
output:
decay_const=temp(get_pattern_pars_tmp_channel(config, "dsp", "decay_constant")),
plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "decay_constant")),
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "decay_constant")),
dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "decay_constant")),
log:
get_pattern_log_channel(config, "par_dsp_decay_constant", time),
group:
Expand Down Expand Up @@ -78,8 +80,8 @@ rule build_pars_dsp_pz_geds:
"--config-file {params.config_file} "
"--processing-chain {params.processing_chain} "
"--raw-table-name {params.raw_table_name} "
"--plot-path {output.plots} "
"--output-file {output.decay_const} "
"--plot-path {output.dsp_plots} "
"--output-file {output.dsp_pars} "
"--pulser-file {input.pulser} "
"--raw-files {input.files} "
"--pz-files {input.pz_files}"
Expand All @@ -91,7 +93,7 @@ rule build_pars_evtsel_geds:
filelist_path(config), "all-{experiment}-{period}-{run}-cal-raw.filelist"
),
pulser_file=get_pattern_pars_tmp_channel(config, "tcm", "pulser_ids"),
database=rules.build_pars_dsp_pz_geds.output.decay_const,
database=rules.build_pars_dsp_pz_geds.output.dsp_pars,
raw_cal_curve=get_blinding_curve_file,
output:
peak_file=temp(
Expand Down Expand Up @@ -157,13 +159,13 @@ rule build_pars_dsp_nopt_geds:
files=os.path.join(
filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist"
),
database=rules.build_pars_dsp_pz_geds.output.decay_const,
inplots=rules.build_pars_dsp_pz_geds.output.plots,
database=rules.build_pars_dsp_pz_geds.output.dsp_pars,
inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots,
output:
dsp_pars_nopt=temp(
get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization")
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "noise_optimization")),
dsp_plots=temp(
get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization")
),
plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "noise_optimization")),
log:
get_pattern_log_channel(config, "par_dsp_noise_optimization", time),
group:
Expand Down Expand Up @@ -210,8 +212,8 @@ rule build_pars_dsp_nopt_geds:
"--processing-chain {params.processing_chain} "
"--raw-table-name {params.raw_table_name} "
"--inplots {input.inplots} "
"--plot-path {output.plots} "
"--dsp-pars {output.dsp_pars_nopt} "
"--plot-path {output.dsp_plots} "
"--dsp-pars {output.dsp_pars} "
"--raw-filelist {input.files}"


Expand All @@ -222,12 +224,14 @@ rule build_pars_dsp_dplms_geds:
filelist_path(config), "all-{experiment}-{period}-{run}-fft-raw.filelist"
),
peak_file=rules.build_pars_evtsel_geds.output.peak_file,
database=rules.build_pars_dsp_nopt_geds.output.dsp_pars_nopt,
inplots=rules.build_pars_dsp_nopt_geds.output.plots,
dsp_pars=rules.build_pars_dsp_pz_geds.output.dsp_pars,
inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots,
output:
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "dplms")),
lh5_path=temp(get_pattern_pars_tmp_channel(config, "dsp", extension="lh5")),
plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")),
dsp_pars_lh5=temp(
get_pattern_pars_tmp_channel(config, "dsp", "dplms", extension="lh5")
),
dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "dplms")),
log:
get_pattern_log_channel(config, "pars_dsp_dplms", time),
group:
Expand Down Expand Up @@ -270,7 +274,7 @@ rule build_pars_dsp_dplms_geds:
shell:
execenv_pyexe(config, "par-geds-dsp-dplms") + "--peak-file {input.peak_file} "
"--fft-raw-filelist {input.fft_files} "
"--database {input.database} "
"--database {input.dsp_pars} "
"--inplots {input.inplots} "
"--log {log} "
"--log-config {params.log_config} "
Expand All @@ -279,22 +283,24 @@ rule build_pars_dsp_dplms_geds:
"--processing-chain {params.processing_chain} "
"--raw-table-name {params.raw_table_name} "
"--dsp-pars {output.dsp_pars} "
"--lh5-path {output.lh5_path} "
"--plot-path {output.plots} "
"--lh5-path {output.dsp_pars_lh5} "
"--plot-path {output.dsp_plots} "


# This rule builds the optimal energy filter parameters for the dsp using calibration dsp files
rule build_pars_dsp_eopt_geds:
input:
peak_file=rules.build_pars_evtsel_geds.output.peak_file,
decay_const=rules.build_pars_dsp_dplms_geds.output.dsp_pars,
inplots=rules.build_pars_dsp_dplms_geds.output.plots,
dsp_pars=rules.build_pars_dsp_pz_geds.output.dsp_pars,
inplots=rules.build_pars_dsp_pz_geds.output.dsp_plots,
output:
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp_eopt")),
qbb_grid=temp(
get_pattern_pars_tmp_channel(config, "dsp", "objects", extension="pkl")
dsp_objects=temp(
get_pattern_pars_tmp_channel(
config, "dsp", "eopt_objects", extension="pkl"
)
),
plots=temp(get_pattern_plts_tmp_channel(config, "dsp")),
dsp_plots=temp(get_pattern_plts_tmp_channel(config, "dsp", "eopt")),
log:
get_pattern_log_channel(config, "pars_dsp_eopt", time),
group:
Expand Down Expand Up @@ -340,9 +346,9 @@ rule build_pars_dsp_eopt_geds:
"--raw-table-name {params.raw_table_name} "
"--peak-file {input.peak_file} "
"--inplots {input.inplots} "
"--decay-const {input.decay_const} "
"--plot-path {output.plots} "
"--qbb-grid-path {output.qbb_grid} "
"--decay-const {input.dsp_pars} "
"--plot-path {output.dsp_plots} "
"--qbb-grid-path {output.dsp_objects} "
"--final-dsp-pars {output.dsp_pars}"


Expand Down Expand Up @@ -400,10 +406,9 @@ rule build_svm_dsp_geds:

rule build_pars_dsp_svm_geds:
input:
dsp_pars=rules.build_pars_dsp_eopt_geds.output.dsp_pars,
svm_file=rules.build_svm_dsp_geds.output.dsp_pars,
output:
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp")),
dsp_pars=temp(get_pattern_pars_tmp_channel(config, "dsp", "svm")),
log:
get_pattern_log_channel(config, "pars_dsp_svm", time),
group:
Expand All @@ -412,6 +417,17 @@ rule build_pars_dsp_svm_geds:
runtime=300,
shell:
execenv_pyexe(config, "par-geds-dsp-svm") + "--log {log} "
"--input-file {input.dsp_pars} "
# "--input-file {input.dsp_pars} "
"--output-file {output.dsp_pars} "
"--svm-file {input.svm_file}"


# Generate the in-channel merge rules for the "dsp" tier from the outputs of
# the eopt, nopt, svm and dplms parameter rules listed below.
build_in_channel_merge_rules(
[
rules.build_pars_dsp_eopt_geds,
rules.build_pars_dsp_nopt_geds,
rules.build_pars_dsp_svm_geds,
rules.build_pars_dsp_dplms_geds,
],
"dsp",
)
3 changes: 2 additions & 1 deletion workflow/rules/evt.smk
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ for evt_tier in ("evt", "pet"):
rule:
input:
os.path.join(
filelist_path(config), "all-{experiment}-{period}-{run}-phy-"+f"{evt_tier}.filelist"
filelist_path(config),
"all-{experiment}-{period}-{run}-phy-" + f"{evt_tier}.filelist",
),
output:
get_pattern_tier(
Expand Down
136 changes: 136 additions & 0 deletions workflow/rules/merge_in_channel.smk
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import inspect

from legenddataflow.methods import patterns
from legenddataflowscripts.workflow import execenv_pyexe, set_last_rule_name
from legenddataflow.scripts.flow.build_chanlist import (
get_plt_chanlist,
get_par_chanlist,
)


def build_in_channel_merge_rules(rules, tier, output_name=None):

input_dsp_pars = []
input_dsp_pars_lh5 = []
input_dsp_objects = []
input_dsp_plots = []

name = f"tier" if output_name is None else f"{tier}_{output_name}"

group_name = f"merge-inchan-{tier}"
if output_name is not None:
group_name += f"_{output_name}"

for r in rules:
if hasattr(r.output, f"{tier}_pars"):
input_dsp_pars.append(r.output.dsp_pars)
if hasattr(r.output, f"{tier}_pars_lh5"):
input_dsp_pars_lh5.append(r.output.dsp_pars_lh5)
if hasattr(r.output, f"{tier}_plots"):
input_dsp_plots.append(r.output.dsp_plots)
if hasattr(r.output, f"{tier}_objects"):
input_dsp_objects.append(r.output.dsp_objects)

rule:
input:
input_dsp_plots,
output:
temp(get_pattern_plts_tmp_channel(config, tier, name=output_name)),
group:
group_name
shell:
execenv_pyexe(config, "merge-in-channels") + "--input {input} "
"--output {output} "

set_last_rule_name(workflow, f"build_plts_inchan_{name}")

object_output_name = "objects"
if output_name is not None:
object_output_name += f"_{output_name}"

rule:
input:
input_dsp_objects,
output:
temp(
get_pattern_pars_tmp_channel(
config, tier, name=object_output_name, extension="pkl"
)
),
group:
group_name
shell:
execenv_pyexe(config, "merge-in-channels") + "--input {input} "
"--output {output} "

set_last_rule_name(workflow, f"build_pars_inchan_{name}_objects")

if len(input_dsp_pars_lh5) > 0:

rule:
input:
input_dsp_pars,
output:
temp(
get_pattern_pars_tmp_channel(
config,
tier,
name=(
output_name + "_tmp" if output_name is not None else "tmp"
),
)
),
group:
group_name
shell:
execenv_pyexe(config, "merge-in-channels") + "--input {input} "
"--output {output} "

set_last_rule_name(workflow, f"build_pars_inchan_{name}_db")

rule:
input:
in_files=(
input_dsp_pars_lh5 if len(input_dsp_pars_lh5) > 0 else input_dsp_pars
),
in_db=(
get_pattern_pars_tmp_channel(
config,
tier,
name=output_name + "_tmp" if output_name is not None else "tmp",
)
if len(input_dsp_pars_lh5) > 0
else []
),
plts=get_pattern_plts_tmp_channel(config, tier, name=output_name),
objects=get_pattern_pars_tmp_channel(
config, tier, name=object_output_name, extension="pkl"
),
output:
out_file=(
temp(
get_pattern_pars_tmp_channel(
config, tier, name=output_name, extension="lh5"
)
)
if len(input_dsp_pars_lh5) > 0
else temp(get_pattern_pars_tmp_channel(config, tier, name=output_name))
),
out_db=(
temp(get_pattern_pars_tmp_channel(config, tier, name=output_name))
if len(input_dsp_pars_lh5) > 0
else []
),
group:
group_name
run:
shell_string = (
execenv_pyexe(config, "merge-in-channels")
+ "--output {output.out_file} "
"--input {input.in_files} "
)
if len(input_dsp_pars_lh5) > 0:
shell_string += "--in-db {input.in_db} " "--out-db {output.out_db} "
shell(shell_string)

set_last_rule_name(workflow, f"build_pars_inchan_{name}")
Loading
Loading