Skip to content

Commit 0bce206

Browse files
adding schedule for pricing_data
1 parent ac4418f commit 0bce206

1 file changed

Lines changed: 15 additions & 3 deletions

File tree

card_data/pipelines/definitions.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@
55

66
import dagster as dg
77

8+
from .defs.extract.extract_pricing_data import build_dataframe
9+
from .defs.load.load_pricing_data import load_pricing_data
10+
811

912
@definitions
1013
def defs():
11-
return load_from_defs_folder(project_root=Path(__file__).parent.parent)
14+
folder_defs = load_from_defs_folder(project_root=Path(__file__).parent.parent)
15+
return dg.Definitions.merge(folder_defs, defs_pricing)
1216

1317
dbt_project_directory = Path(__file__).absolute().parent / "poke_cli_dbt"
1418
dbt_project = DbtProject(project_dir=dbt_project_directory)
@@ -23,5 +27,13 @@ def defs():
2327
def dbt_models(context: dg.AssetExecutionContext, dbt: DbtCliResource):
2428
yield from dbt.cli(["build"], context=context).stream()
2529

26-
# Dagster object that contains the dbt assets and resource
27-
defs_dbt = dg.Definitions(assets=[dbt_models], resources={"dbt": dbt_resource})
30+
price_schedule = dg.ScheduleDefinition(
31+
name="price_schedule",
32+
cron_schedule="10 10 * * *",
33+
target=[build_dataframe, load_pricing_data],
34+
execution_timezone="America/Los_Angeles",
35+
)
36+
37+
defs_pricing = dg.Definitions(
38+
schedules=[price_schedule],
39+
)

0 commit comments

Comments
 (0)