|
17 | 17 |
|
18 | 18 | from __future__ import annotations |
19 | 19 |
|
| 20 | +import importlib |
20 | 21 | import logging |
21 | 22 | import re |
22 | 23 | import uuid |
|
77 | 78 |
|
78 | 79 | TOKEN = "token" |
79 | 80 | TYPE = "type" |
| 81 | +PY_CATALOG_IMPL = "py-catalog-impl" |
80 | 82 | ICEBERG = "iceberg" |
81 | 83 | TABLE_TYPE = "table_type" |
82 | 84 | WAREHOUSE_LOCATION = "warehouse" |
@@ -233,6 +235,19 @@ def load_catalog(name: Optional[str] = None, **properties: Optional[str]) -> Cat |
233 | 235 | catalog_type: Optional[CatalogType] |
234 | 236 | provided_catalog_type = conf.get(TYPE) |
235 | 237 |
|
| 238 | + if catalog_impl := properties.get(PY_CATALOG_IMPL): |
| 239 | + if provided_catalog_type: |
| 240 | + raise ValueError( |
| 241 | + "Must not set both catalog type and py-catalog-impl configurations, " |
| 242 | + f"but found type {provided_catalog_type} and py-catalog-impl {catalog_impl}" |
| 243 | + ) |
| 244 | + |
| 245 | + if catalog := _import_catalog(name, catalog_impl, properties): |
| 246 | + logger.info("Loaded Catalog: %s", catalog_impl) |
| 247 | + return catalog |
| 248 | + else: |
| 249 | + raise ValueError(f"Could not initialize Catalog: {catalog_impl}") |
| 250 | + |
236 | 251 | catalog_type = None |
237 | 252 | if provided_catalog_type and isinstance(provided_catalog_type, str): |
238 | 253 | catalog_type = CatalogType[provided_catalog_type.upper()] |
@@ -283,6 +298,20 @@ def delete_data_files(io: FileIO, manifests_to_delete: List[ManifestFile]) -> No |
283 | 298 | deleted_files[path] = True |
284 | 299 |
|
285 | 300 |
|
| 301 | +def _import_catalog(name: str, catalog_impl: str, properties: Properties) -> Optional[Catalog]: |
| 302 | + try: |
| 303 | + path_parts = catalog_impl.split(".") |
| 304 | + if len(path_parts) < 2: |
| 305 | + raise ValueError(f"py-catalog-impl should be full path (module.CustomCatalog), got: {catalog_impl}") |
| 306 | + module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] |
| 307 | + module = importlib.import_module(module_name) |
| 308 | + class_ = getattr(module, class_name) |
| 309 | + return class_(name, **properties) |
| 310 | + except ModuleNotFoundError: |
| 311 | + logger.warning("Could not initialize Catalog: %s", catalog_impl) |
| 312 | + return None |
| 313 | + |
| 314 | + |
286 | 315 | @dataclass |
287 | 316 | class PropertiesUpdateSummary: |
288 | 317 | removed: List[str] |
|
0 commit comments