Skip to content

Commit c1f9a9e

Browse files
authored
fix(glam_fog_release): split scalar_bucket_counts (#2349)
1 parent 74997d1 commit c1f9a9e

1 file changed

Lines changed: 24 additions & 1 deletion

File tree

dags/glam_fog_release.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,30 @@
7070
)
7171

7272
# stage 2 - downstream for export
73-
scalar_bucket_counts = query(task_name=f"{product}__scalar_bucket_counts_v1", use_slots=False)
73+
with TaskGroup(
74+
group_id=f"{product}__scalar_bucket_counts_v1", dag=dag, default_args=default_args
75+
) as scalar_bucket_counts:
76+
prev_task = None
77+
# Windows + Release data is in [0-9] so we're further splitting that range.
78+
for sample_range in (
79+
[0, 0], [1, 1], [2, 2], [3, 3], [4, 4], [5, 5], [6, 6],
80+
[7, 7], [8, 8], [9, 9], [10, 19], [20, 29], [30, 39],
81+
[40, 49], [50, 59], [60, 69], [70, 79], [80, 89], [90, 99]
82+
):
83+
scalar_bucket_counts_sampled = query(
84+
task_name=(
85+
f"{product}__scalar_bucket_counts_v1_sampled_"
86+
f"{sample_range[0]}_{sample_range[1]}"
87+
),
88+
min_sample_id=sample_range[0],
89+
max_sample_id=sample_range[1],
90+
replace_table=(sample_range[0] == 0),
91+
use_slots=False,
92+
)
93+
if prev_task:
94+
scalar_bucket_counts_sampled.set_upstream(prev_task)
95+
prev_task = scalar_bucket_counts_sampled
96+
7497
scalar_probe_counts = query(task_name=f"{product}__scalar_probe_counts_v1")
7598

7699
with TaskGroup(

0 commit comments

Comments
 (0)