Skip to content

Commit a45a915

Browse files
[WORKAROUND] Traditional schedule for e assets, eager for h* assets (#1060)
* disable upstream sensor; add schedule for e assets * weekly forecast H and weekly forecast E groups that we can supply new automation conditions on * some tidying and bug fixes * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * submitting pyrenew-e jobs wednesday only --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 2b84164 commit a45a915

2 files changed

Lines changed: 113 additions & 64 deletions

File tree

EpiAutoGP/test/test_output.jl

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,23 +114,27 @@ end
114114
Symbol(".variable"),
115115
:resolution,
116116
:geo_value,
117-
:disease
117+
:disease,
118118
]
119119

120120
con = DBInterface.connect(DuckDB.DB, ":memory:")
121121
try
122-
read_df = DataFrame(DBInterface.execute(
123-
con,
124-
"SELECT * FROM read_parquet($(EpiAutoGP._quote_duckdb_string(parquet_path)))"
125-
))
122+
read_df = DataFrame(
123+
DBInterface.execute(
124+
con,
125+
"SELECT * FROM read_parquet($(EpiAutoGP._quote_duckdb_string(parquet_path)))"
126+
)
127+
)
126128

127129
@test eltype(read_df.date) == Date
128130
@test propertynames(read_df) == propertynames(result_df)
129131
@test read_df.date == forecast_dates[[1, 2, 1, 2]]
130132
@test read_df[!, Symbol(".draw")] == [1, 1, 2, 2]
131133
@test read_df[!, Symbol(".value")] == [10.0, 20.0, 30.0, 40.0]
132-
@test all(read_df[!, Symbol(".variable")] .==
133-
"observed_hospital_admissions")
134+
@test all(
135+
read_df[!, Symbol(".variable")] .==
136+
"observed_hospital_admissions"
137+
)
134138
@test all(read_df.resolution .== "epiweekly")
135139
@test all(read_df.geo_value .== "CA")
136140
@test all(read_df.disease .== "COVID-19")

dagster_defs.py

Lines changed: 102 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -459,10 +459,55 @@ def _fuse_pyrenew_timeseries(
459459
# SCHEDULES AND AUTOMATION CONDITION SENSORS
460460
# ============================================================================
461461

462-
weekly_forecast_upstream_sensor = dg.AutomationConditionSensorDefinition(
463-
name="WeeklyForecastUpstream",
464-
target=dg.AssetSelection.groups("WeeklyForecastUpstream"),
465-
use_user_code_server=True, # allows for custom automation conditions
462+
# Custom Automation Condition. Relies on use_user_code_server=True on the sensor
463+
# class IsWeekday(dg.AutomationCondition):
464+
# def __init__(self, weekday: int):
465+
# """
466+
# Check if evaluation time falls on a specific weekday.
467+
# This is is a simple evaluation, rather than a stateful operation,
468+
# such as with cron_tick_passed().
469+
470+
# Args:
471+
# weekday: 0=Monday, 1=Tuesday, 2=Wednesday, 3=Thursday,
472+
# 4=Friday, 5=Saturday, 6=Sunday
473+
# """
474+
# self.weekday = weekday
475+
# super().__init__()
476+
477+
# def evaluate(self, context: dg.AutomationContext) -> dg.AutomationResult:
478+
# # If the current weekday is equal to the desired weekday,
479+
# # return the candidate_subset -> a dagster context's "true" case
480+
# if context.evaluation_time.weekday() == self.weekday:
481+
# true_subset = context.candidate_subset
482+
# else:
483+
# true_subset = context.get_empty_subset()
484+
485+
# return dg.AutomationResult(context=context, true_subset=true_subset)
486+
487+
# @property
488+
# def name(self) -> str:
489+
# """Define the label that will appear in the UI"""
490+
# days = [
491+
# "Monday",
492+
# "Tuesday",
493+
# "Wednesday",
494+
# "Thursday",
495+
# "Friday",
496+
# "Saturday",
497+
# "Sunday",
498+
# ]
499+
# return f"is_{days[self.weekday].lower()}"
500+
501+
# weekly_forecast_upstream_sensor = dg.AutomationConditionSensorDefinition(
502+
# name="WeeklyForecastUpstream",
503+
# target=dg.AssetSelection.groups("WeeklyForecastUpstream"),
504+
# use_user_code_server=True, # allows for custom automation conditions
505+
# )
506+
507+
weekly_forecast_h_sensor = dg.AutomationConditionSensorDefinition(
508+
name="WeeklyForecastH",
509+
target=dg.AssetSelection.groups("WeeklyForecastH"),
510+
use_user_code_server=False, # does NOT allow custom conditions
466511
)
467512

468513
weekly_forecast_fusion_sensor = dg.AutomationConditionSensorDefinition(
@@ -472,44 +517,28 @@ def _fuse_pyrenew_timeseries(
472517
)
473518

474519

475-
# Custom Automation Condition. Relies on use_user_code_server=True on the sensor
476-
class IsWeekday(dg.AutomationCondition):
477-
def __init__(self, weekday: int):
478-
"""
479-
Check if evaluation time falls on a specific weekday.
480-
This is is a simple evaluation, rather than a stateful operation,
481-
such as with cron_tick_passed().
482-
483-
Args:
484-
weekday: 0=Monday, 1=Tuesday, 2=Wednesday, 3=Thursday,
485-
4=Friday, 5=Saturday, 6=Sunday
486-
"""
487-
self.weekday = weekday
488-
super().__init__()
489-
490-
def evaluate(self, context: dg.AutomationContext) -> dg.AutomationResult:
491-
# If the current weekday is equal to the desired weekday,
492-
# return the candidate_subset -> a dagster context's "true" case
493-
if context.evaluation_time.weekday() == self.weekday:
494-
true_subset = context.candidate_subset
495-
else:
496-
true_subset = context.get_empty_subset()
497-
498-
return dg.AutomationResult(context=context, true_subset=true_subset)
499-
500-
@property
501-
def name(self) -> str:
502-
"""Define the label that will appear in the UI"""
503-
days = [
504-
"Monday",
505-
"Tuesday",
506-
"Wednesday",
507-
"Thursday",
508-
"Friday",
509-
"Saturday",
510-
"Sunday",
511-
]
512-
return f"is_{days[self.weekday].lower()}"
520+
# Temporary workaround schedule while we debug the custom automation condition
521+
@dg.schedule(
522+
target=dg.AssetSelection.assets(
523+
"timeseries_e", "epiweekly_timeseries_e", "pyrenew_e"
524+
),
525+
cron_schedule="30 6 * * WED", # 6:30am on Wednesday (day 3)
526+
execution_timezone="America/New_York",
527+
)
528+
def weekly_forecast_e_schedule(context: dg.ScheduleEvaluationContext):
529+
_partition_key = daily_partitions_def.get_last_partition_key()
530+
context.log.info(f"Submitting job request for partition: {_partition_key}")
531+
return dg.RunRequest(
532+
partition_key=_partition_key,
533+
run_config=dg.RunConfig(
534+
ops={
535+
"timeseries_e": TimeseriesConfig(),
536+
"epiweekly_timeseries_e": TimeseriesConfig(),
537+
"pyrenew_e": PyrenewEConfig(),
538+
},
539+
execution=azure_batch_execution_config.to_run_config(),
540+
),
541+
)
513542

514543

515544
# ---------- Shared Asset Decorator Arguments ----------
@@ -518,22 +547,26 @@ def name(self) -> str:
518547
# partitions, graph_dimensions, automation conditions, and asset groups
519548
# The only thing that differs between them are their dependencies
520549

521-
weekly_forecast_upstream_asset_args = {
550+
weekly_forecast_base_asset_args = {
522551
"partitions_def": daily_partitions_def,
523552
"graph_dimensions": ["diseases", "locations"],
524-
"group_name": "WeeklyForecastUpstream",
525-
"automation_condition": (
526-
# We specifically don't want these to run unless it's Wednesday
527-
# 0=monday,1=tuesday,2=wednesday,etc.
528-
# Note this is different from cron which is 1-indexed
529-
dg.AutomationCondition.eager() & IsWeekday(2)
530-
).with_label("eager_on_wednesday"),
531553
}
532554

555+
# weekly_forecast_upstream_asset_args = {
556+
# **weekly_forecast_base_asset_args,
557+
# "group_name": "WeeklyForecastUpstream",
558+
# "automation_condition": (
559+
# # We specifically don't want these to run unless it's Wednesday
560+
# # 0=monday,1=tuesday,2=wednesday,etc.
561+
# # Note this is different from cron which is 1-indexed
562+
# dg.AutomationCondition.eager() & IsWeekday(2)
563+
# ).with_label("eager_on_wednesday"),
564+
# }
565+
533566
weekly_forecast_fusion_asset_args = {
534-
**weekly_forecast_upstream_asset_args,
567+
**weekly_forecast_base_asset_args,
535568
"group_name": "WeeklyForecastFusion",
536-
# we want vanilla eager for the fusion assets and post-processing
569+
# we want vanilla eager for the fusion assets
537570
"automation_condition": dg.AutomationCondition.eager(),
538571
}
539572

@@ -564,7 +597,9 @@ def name(self) -> str:
564597

565598
# Timeseries E
566599
@dynamic_graph_asset(
567-
**weekly_forecast_upstream_asset_args,
600+
# **weekly_forecast_upstream_asset_args,
601+
**weekly_forecast_base_asset_args,
602+
group_name="WeeklyForecastE", # This will override what's in the asset args for now
568603
ins={"nssp_gold_v1": dg.In(dg.Nothing)},
569604
)
570605
def timeseries_e(context: DynamicGraphAssetExecutionContext, config: TimeseriesConfig):
@@ -573,7 +608,9 @@ def timeseries_e(context: DynamicGraphAssetExecutionContext, config: TimeseriesC
573608

574609
# Epiweekly Timeseries E
575610
@dynamic_graph_asset(
576-
**weekly_forecast_upstream_asset_args,
611+
# **weekly_forecast_upstream_asset_args,
612+
**weekly_forecast_base_asset_args,
613+
group_name="WeeklyForecastE", # This will override what's in the asset args for now
577614
ins={"nssp_gold_v1": dg.In(dg.Nothing)},
578615
)
579616
def epiweekly_timeseries_e(
@@ -584,7 +621,9 @@ def epiweekly_timeseries_e(
584621

585622
# Pyrenew E
586623
@dynamic_graph_asset(
587-
**weekly_forecast_upstream_asset_args,
624+
# **weekly_forecast_upstream_asset_args,
625+
**weekly_forecast_base_asset_args,
626+
group_name="WeeklyForecastE", # This will override what's in the asset args for now
588627
ins={
589628
"nssp_gold_v1": dg.In(dg.Nothing),
590629
},
@@ -598,7 +637,10 @@ def pyrenew_e(
598637

599638
# Pyrenew H
600639
@dynamic_graph_asset(
601-
**weekly_forecast_upstream_asset_args,
640+
# **weekly_forecast_upstream_asset_args,
641+
**weekly_forecast_base_asset_args,
642+
automation_condition=dg.AutomationCondition.eager(), # H assets can be eager, inheriting their schedule from dataops
643+
group_name="WeeklyForecastH", # This will override what's in the asset args for now
602644
ins={
603645
"nhsn_hrd_prelim": dg.In(dg.Nothing),
604646
},
@@ -609,7 +651,10 @@ def pyrenew_h(context: DynamicGraphAssetExecutionContext, config: PyrenewConfig)
609651

610652
# Pyrenew HE
611653
@dynamic_graph_asset(
612-
**weekly_forecast_upstream_asset_args,
654+
# **weekly_forecast_upstream_asset_args,
655+
**weekly_forecast_base_asset_args,
656+
automation_condition=dg.AutomationCondition.eager(), # H assets can be eager, inheriting their schedule from dataops
657+
group_name="WeeklyForecastH", # This will override what's in the asset args for now
613658
ins={
614659
"nhsn_hrd_prelim": dg.In(dg.Nothing),
615660
},

0 commit comments

Comments
 (0)