44
55import dagster as dg
66
7+ from .defs .extract .limitless .extract_standings import create_standings_dataframe
78from .defs .extract .tcgcsv .extract_pricing import build_dataframe
89from .defs .extract .tcgdex .extract_sets import extract_sets_data
910from .defs .extract .tcgdex .extract_series import extract_series_data
11+ from .defs .load .limitless .load_standings import load_standings_data
1012from .defs .load .tcgcsv .load_pricing import load_pricing_data , data_quality_checks_on_pricing
1113from .defs .load .tcgdex .load_sets import load_sets_data , data_quality_check_on_sets
1214from .defs .load .tcgdex .load_series import load_series_data , data_quality_check_on_series
1719def defs () -> dg .Definitions :
1820 # load_from_defs_folder discovers dbt assets from transform_data.py
1921 folder_defs : dg .Definitions = load_from_defs_folder (project_root = Path (__file__ ).parent .parent )
20- return dg .Definitions .merge (folder_defs , defs_discord_sensors , defs_pricing , defs_sets , defs_series )
22+ return dg .Definitions .merge (folder_defs , defs_discord_sensors , defs_pricing , defs_sets , defs_series , defs_standings )
2123
2224
2325defs_discord_sensors : dg .Definitions = dg .Definitions (
@@ -63,4 +65,15 @@ def defs() -> dg.Definitions:
6365defs_sets : dg .Definitions = dg .Definitions (
6466 assets = [extract_sets_data , load_sets_data , data_quality_check_on_sets ],
6567 jobs = [sets_pipeline ],
68+ )
69+
70+ # Standings pipeline job
71+ standings_pipeline = dg .define_asset_job (
72+ name = "standings_pipeline_job" ,
73+ selection = dg .AssetSelection .assets (create_standings_dataframe ).downstream (include_self = True ),
74+ )
75+
76+ defs_standings : dg .Definitions = dg .Definitions (
77+ assets = [create_standings_dataframe , load_standings_data ],
78+ jobs = [standings_pipeline ],
6679)
0 commit comments