2121from pydantic .fields import Field
2222from upath import UPath
2323
24- ENV_DOMAINS = {
25- "lcl" : {"scheme" : "http" , "domain" : "localhost:8000" },
26- "dev" : {"scheme" : "https" , "domain" : "headev.fews.net" },
27- "tst" : {"scheme" : "https" , "domain" : "heastage.fews.net" },
28- "prd" : {"scheme" : "https" , "domain" : "hea.fews.net" },
29- }
3024
31-
32- class DownloadMetadataMixin :
25+ class DownloadAssetMixin :
3326 """
34- Mixin that adds download URL metadata to any IOManager.
35- Override handle_output to inject download metadata.
27+ Mixin that adds download URL of the asset to the metadata.
3628 """
3729
3830 def get_download_url (self , context : OutputContext ) -> str :
3931 if not context or not getattr (context , "has_asset_key" , False ) or not context .has_asset_key :
4032 return ""
4133
4234 asset_name = context .asset_key .path [- 1 ]
43- env = os .getenv ("ENV" , "lcl" )
44-
45- config = ENV_DOMAINS .get (env , ENV_DOMAINS ["lcl" ])
46- scheme , domain = config ["scheme" ], config ["domain" ]
35+ root_url = os .getenv ("BASELINE_EXPLORER_API_ROOT_URL" , "http://localhost:8000" )
4736
4837 kwargs = {"asset_name" : asset_name }
4938 if context .has_partition_key :
@@ -52,10 +41,20 @@ def get_download_url(self, context: OutputContext) -> str:
5241 else :
5342 relative_url = reverse ("asset_download" , kwargs = kwargs )
5443
55- return urljoin (f"{ scheme } ://{ domain } " , relative_url )
44+ return urljoin (root_url , relative_url )
45+
46+ def handle_output (self , context : OutputContext , obj ):
47+ download_url = self .get_download_url (context )
48+ if download_url :
49+ context .add_output_metadata (
50+ {
51+ "download" : MetadataValue .url (download_url ),
52+ }
53+ )
54+ super ().handle_output (context , obj )
5655
5756
58- class PickleFilesystemIOManager (UPathIOManager , DownloadMetadataMixin ):
57+ class PickleFilesystemIOManager (UPathIOManager ):
5958 """
6059 Dagster I/O Manager that serializes Python objects to files using Pickle.
6160
@@ -93,18 +92,8 @@ def load_from_path(self, context: InputContext, path: "UPath") -> Any:
9392 with path .open ("rb" ) as file :
9493 return pickle .load (file )
9594
96- def handle_output (self , context : OutputContext , obj ):
97- super ().handle_output (context , obj )
98- download_url = self .get_download_url (context )
99- if download_url :
100- context .add_output_metadata (
101- {
102- "download" : MetadataValue .url (download_url ),
103- }
104- )
105-
10695
107- class JSONFilesystemIOManager (UPathIOManager , DownloadMetadataMixin ):
96+ class JSONFilesystemIOManager (DownloadAssetMixin , UPathIOManager ):
10897 """
10998 Dagster I/O Manager that serializes Python objects to JSON files.
11099 """
@@ -123,18 +112,8 @@ def load_from_path(self, context: InputContext, path: "UPath") -> Any:
123112 with path .open ("r" ) as file :
124113 return json .loads (file .read ())
125114
126- def handle_output (self , context : OutputContext , obj ):
127- super ().handle_output (context , obj )
128- download_url = self .get_download_url (context )
129- if download_url :
130- context .add_output_metadata (
131- {
132- "download" : MetadataValue .url (download_url ),
133- }
134- )
135-
136115
137- class DataFrameCSVFilesystemIOManager (UPathIOManager , DownloadMetadataMixin ):
116+ class DataFrameCSVFilesystemIOManager (DownloadAssetMixin , UPathIOManager ):
138117 """
139118 Dagster I/O Manager that serializes DataFrames to CSV files.
140119 """
@@ -151,18 +130,8 @@ def dump_to_path(self, context: OutputContext, obj: pd.DataFrame, path: UPath) -
151130 def load_from_path (self , context : InputContext , path : UPath ) -> pd .DataFrame :
152131 return pd .read_csv (path )
153132
154- def handle_output (self , context : OutputContext , obj ):
155- super ().handle_output (context , obj )
156- download_url = self .get_download_url (context )
157- if download_url :
158- context .add_output_metadata (
159- {
160- "download" : MetadataValue .url (download_url ),
161- }
162- )
163-
164133
165- class DataFrameExcelFilesystemIOManager (UPathIOManager , DownloadMetadataMixin ):
134+ class DataFrameExcelFilesystemIOManager (DownloadAssetMixin , UPathIOManager ):
166135 """
167136 Dagster I/O Manager that serializes DataFrames to Excel .xlsx using the OpenPyXL engine.
168137
@@ -217,16 +186,6 @@ def load_from_path(self, context: InputContext, path: UPath) -> dict[str, pd.Dat
217186 with pd .ExcelFile (path , engine = "openpyxl" ) as xls :
218187 return xls .parse (sheet_name = None ) # Get all sheets as a dict
219188
220- def handle_output (self , context : OutputContext , obj ):
221- super ().handle_output (context , obj )
222- download_url = self .get_download_url (context )
223- if download_url :
224- context .add_output_metadata (
225- {
226- "download" : MetadataValue .url (download_url ),
227- }
228- )
229-
230189
231190class ConfigurableUPathIOManagerFactory (
232191 ConfigurableIOManagerFactory [
@@ -267,12 +226,12 @@ def create_io_manager(self, context: InitResourceContext) -> UPathIOManager:
267226 )
268227
269228
270- class PickleIOManager (ConfigurableUPathIOManagerFactory , DownloadMetadataMixin ):
229+ class PickleIOManager (ConfigurableUPathIOManagerFactory ):
271230 def get_iomanager_class (self ) -> Type [UPathIOManager ]:
272231 return PickleFilesystemIOManager
273232
274233
275- class JSONIOManager (ConfigurableUPathIOManagerFactory , DownloadMetadataMixin ):
234+ class JSONIOManager (ConfigurableUPathIOManagerFactory ):
276235 def get_iomanager_class (self ) -> Type [UPathIOManager ]:
277236 return JSONFilesystemIOManager
278237
0 commit comments