diff --git a/src/config_override/.gitkeep b/src/config_override/.gitkeep new file mode 100644 index 0000000..5e45830 --- /dev/null +++ b/src/config_override/.gitkeep @@ -0,0 +1,8 @@ +Use this ./config_override folder for your own customised framework-level config, using the same layout as the default ./config folder. +Doing so isolates your fork from upstream changes to ./config in the open-source project, so merges do not overwrite or conflict with your settings. +When this folder contains any non-hidden files, the framework reads from here instead of ./config for: + * global config + * substitutions + * secrets + * dataflow spec mappings + * operational metadata. diff --git a/src/constants.py b/src/constants.py index 2122a22..11c2cfc 100644 --- a/src/constants.py +++ b/src/constants.py @@ -1,5 +1,20 @@ from dataclasses import dataclass from enum import Enum +import os + + +def _has_visible_children(directory: str) -> bool: + """ + Return True if `directory` exists and contains at least one child name not prefixed with `.`. + """ + if not os.path.isdir(directory): + return False + try: + names = os.listdir(directory) + except OSError: + return False + return any(not n.startswith(".") for n in names) + @dataclass(frozen=True) class FrameworkSettings: @@ -39,13 +54,18 @@ class FrameworkPaths: """ FrameworkPaths is a class that contains constants for various paths and file masks used in the Lakeflow Framework. + CONFIG_PATH and CONFIG_OVERRIDE_PATH are static path segments (./config and ./config_override). + At runtime, which root to use for framework config files should be chosen with + resolve_framework_config_path(framework_path). + Attributes: - CONFIG_PATH (str): Path to the config directory. + CONFIG_PATH (str): Path to the config directory (./config). + CONFIG_OVERRIDE_PATH (str): Path to the config override directory (./config_override). EXTENSIONS_PATH (str): The path for extensions. - GLOBAL_CONFIG (tuple): Paths to the global configuration files. 
+ GLOBAL_CONFIG (tuple): Basenames of global configuration files (under the resolved config root). GLOBAL_SUBSTITUTIONS (tuple): Paths to the global substitutions files. GLOBAL_SECRETS (tuple): Paths to the global secrets files. - DATAFLOW_SPEC_MAPPING_PATH (str): Path to the dataflow spec mapping directory. + DATAFLOW_SPEC_MAPPING (str): Directory segment for dataflow spec mapping (under the resolved root). MAIN_SPEC_SCHEMA_PATH (str): Path to the main specification schema file. FLOW_GROUP_SPEC_SCHEMA_PATH (str): Path to the flow group specification schema file. EXPECTATIONS_SPEC_SCHEMA_PATH (str): Path to the expectations specification schema file. @@ -54,11 +74,12 @@ class FrameworkPaths: TEMPLATE_SPEC_SCHEMA_PATH (str): Path to the template specification schema file. """ CONFIG_PATH: str = "./config" + CONFIG_OVERRIDE_PATH: str = "./config_override" EXTENSIONS_PATH: str = "./extensions" - GLOBAL_CONFIG: tuple = ("./config/global.json", "./config/global.yaml", "./config/global.yml") + GLOBAL_CONFIG: tuple = ("global.json", "global.yaml", "global.yml") GLOBAL_SUBSTITUTIONS: tuple = ("_substitutions.json", "_substitutions.yaml", "_substitutions.yml") GLOBAL_SECRETS: tuple = ("_secrets.json", "_secrets.yaml", "_secrets.yml") - DATAFLOW_SPEC_MAPPING_PATH: str = "./config/dataflow_spec_mapping" + DATAFLOW_SPEC_MAPPING: str = "dataflow_spec_mapping" REQUIREMENTS_FILE: str = "requirements.txt" # Spec schema definitions paths @@ -69,7 +90,18 @@ class FrameworkPaths: SECRETS_SCHEMA_PATH: str = "./schemas/secrets.json" TEMPLATE_DEFINITION_SPEC_SCHEMA_PATH: str = "./schemas/spec_template_definition.json" TEMPLATE_SPEC_SCHEMA_PATH: str = "./schemas/spec_template.json" - + + +def resolve_framework_config_path(framework_path: str) -> str: + """ + Return FrameworkPaths.CONFIG_OVERRIDE_PATH when framework_path/config_override + exists and has at least one non-hidden entry; otherwise FrameworkPaths.CONFIG_PATH. 
+ """ + override_dir = os.path.join(framework_path, FrameworkPaths.CONFIG_OVERRIDE_PATH) + if _has_visible_children(override_dir): + return FrameworkPaths.CONFIG_OVERRIDE_PATH + return FrameworkPaths.CONFIG_PATH + class SupportedSpecFormat(str, Enum): """Supported specification file formats.""" diff --git a/src/dataflow_spec_builder/spec_mapper.py b/src/dataflow_spec_builder/spec_mapper.py index 9ce1039..51fe0ca 100644 --- a/src/dataflow_spec_builder/spec_mapper.py +++ b/src/dataflow_spec_builder/spec_mapper.py @@ -2,7 +2,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, Tuple, Optional, Any -from constants import FrameworkPaths +from constants import FrameworkPaths, resolve_framework_config_path import pipeline_config import utility @@ -44,6 +44,7 @@ def __init__(self, framework_path: str, max_workers: int = 1): max_workers: Maximum parallel workers for processing """ self.framework_path = framework_path + self._framework_config_path = resolve_framework_config_path(framework_path) self.max_workers = max_workers self._mapping_cache: Dict[str, Dict] = {} @@ -121,7 +122,8 @@ def get_mapping(self, version: str) -> Dict: mapping_path = os.path.join( self.framework_path, - FrameworkPaths.DATAFLOW_SPEC_MAPPING_PATH, + self._framework_config_path, + FrameworkPaths.DATAFLOW_SPEC_MAPPING, version, "dataflow_spec_mapping.json" ) diff --git a/src/dlt_pipeline_builder.py b/src/dlt_pipeline_builder.py index 2f1af72..46e804c 100644 --- a/src/dlt_pipeline_builder.py +++ b/src/dlt_pipeline_builder.py @@ -10,8 +10,13 @@ from pyspark.sql import SparkSession from typing import Dict, Any -from constants import( - FrameworkPaths, FrameworkSettings, PipelineBundlePaths, DLTPipelineSettingKeys, SupportedSpecFormat +from constants import ( + FrameworkPaths, + FrameworkSettings, + PipelineBundlePaths, + DLTPipelineSettingKeys, + SupportedSpecFormat, + resolve_framework_config_path, ) from dataflow import DataFlow from dataflow_spec_builder import 
DataflowSpecBuilder @@ -114,6 +119,7 @@ def _init_configurations(self) -> None: self.bundle_path = config_values[DLTPipelineSettingKeys.BUNDLE_SOURCE_PATH] self.framework_path = config_values[DLTPipelineSettingKeys.FRAMEWORK_SOURCE_PATH] + self._framework_config_path = resolve_framework_config_path(self.framework_path) self.workspace_host = config_values[DLTPipelineSettingKeys.WORKSPACE_HOST] # Load optional parameters @@ -186,7 +192,10 @@ def _init_pipeline_components(self) -> None: def _load_framework_global_config_file(self) -> Dict[str, Any]: """Load a global config file""" - global_config_paths = [os.path.join(self.framework_path, path) for path in FrameworkPaths.GLOBAL_CONFIG] + global_config_paths = [ + os.path.join(self.framework_path, self._framework_config_path, path) + for path in FrameworkPaths.GLOBAL_CONFIG + ] # Check if more than one global config exists existing_configs = [path for path in global_config_paths if os.path.exists(path)] @@ -284,7 +293,7 @@ def _init_substitution_manager(self) -> None: # Build framework substitutions paths framework_subs_paths = [ - os.path.join(self.framework_path, FrameworkPaths.CONFIG_PATH, workspace_env + path) + os.path.join(self.framework_path, self._framework_config_path, workspace_env + path) for path in FrameworkPaths.GLOBAL_SUBSTITUTIONS ] self.logger.info("Framework substitutions paths: %s", framework_subs_paths) @@ -315,7 +324,7 @@ def _init_secrets_manager(self) -> None: # Build framework secrets paths framework_secrets_config_paths = [ - os.path.join(self.framework_path, FrameworkPaths.CONFIG_PATH, workspace_env + path) + os.path.join(self.framework_path, self._framework_config_path, workspace_env + path) for path in FrameworkPaths.GLOBAL_SECRETS ] @@ -370,7 +379,9 @@ def _setup_operational_metadata(self) -> None: return self.logger.info("Operational Metadata: layer set to %s", layer) - metadata_path = os.path.join(self.framework_path, f"config/operational_metadata_{layer}.json") + metadata_path = 
os.path.join( + self.framework_path, self._framework_config_path, f"operational_metadata_{layer}.json" + ) self.logger.info("Operational Metadata Path: %s", metadata_path) metadata_json = utility.get_json_from_file(metadata_path, False) self.operational_metadata_schema = (