Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/config_override/.gitkeep
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Use this ./config_override folder for your own customised framework-level config, using the same layout as the default ./config folder.
Doing so isolates your fork from upstream changes to ./config in the open-source project, so merges do not overwrite or conflict with your settings.
When this folder contains any non-hidden files, the framework reads from here instead of ./config for:
* global config
* substitutions
* secrets
* dataflow spec mappings
* operational metadata
44 changes: 38 additions & 6 deletions src/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
from dataclasses import dataclass
from enum import Enum
import os


def _has_visible_children(directory: str) -> bool:
"""
Return True if `directory` exists and contains at least one child name not prefixed with `.`
"""
if not os.path.isdir(directory):
return False
try:
names = os.listdir(directory)
except OSError:
return False
return any(not n.startswith(".") for n in names)


@dataclass(frozen=True)
class FrameworkSettings:
Expand Down Expand Up @@ -39,13 +54,18 @@ class FrameworkPaths:
"""
FrameworkPaths is a class that contains constants for various paths and file masks used in the Lakeflow Framework.

CONFIG_PATH and CONFIG_OVERRIDE_PATH are static path segments (./config and ./config_override).
At runtime, which root to use for framework config files should be chosen with
resolve_framework_config_path(framework_path).

Attributes:
CONFIG_PATH (str): Path to the config directory.
CONFIG_PATH (str): Path to the config directory (./config).
CONFIG_OVERRIDE_PATH (str): Overrides the config directory (./config_override).
EXTENSIONS_PATH (str): The path for extensions.
GLOBAL_CONFIG (tuple): Paths to the global configuration files.
GLOBAL_CONFIG (tuple): Basenames of global configuration files (under the resolved config root).
GLOBAL_SUBSTITUTIONS (tuple): Paths to the global substitutions files.
GLOBAL_SECRETS (tuple): Paths to the global secrets files.
DATAFLOW_SPEC_MAPPING_PATH (str): Path to the dataflow spec mapping directory.
DATAFLOW_SPEC_MAPPING (str): Directory segment for dataflow spec mapping (under the resolved root).
MAIN_SPEC_SCHEMA_PATH (str): Path to the main specification schema file.
FLOW_GROUP_SPEC_SCHEMA_PATH (str): Path to the flow group specification schema file.
EXPECTATIONS_SPEC_SCHEMA_PATH (str): Path to the expectations specification schema file.
Expand All @@ -54,11 +74,12 @@ class FrameworkPaths:
TEMPLATE_SPEC_SCHEMA_PATH (str): Path to the template specification schema file.
"""
CONFIG_PATH: str = "./config"
CONFIG_OVERRIDE_PATH: str = "./config_override"
EXTENSIONS_PATH: str = "./extensions"
GLOBAL_CONFIG: tuple = ("./config/global.json", "./config/global.yaml", "./config/global.yml")
GLOBAL_CONFIG: tuple = ("global.json", "global.yaml", "global.yml")
GLOBAL_SUBSTITUTIONS: tuple = ("_substitutions.json", "_substitutions.yaml", "_substitutions.yml")
GLOBAL_SECRETS: tuple = ("_secrets.json", "_secrets.yaml", "_secrets.yml")
DATAFLOW_SPEC_MAPPING_PATH: str = "./config/dataflow_spec_mapping"
DATAFLOW_SPEC_MAPPING: str = "dataflow_spec_mapping"
REQUIREMENTS_FILE: str = "requirements.txt"

# Spec schema definitions paths
Expand All @@ -69,7 +90,18 @@ class FrameworkPaths:
SECRETS_SCHEMA_PATH: str = "./schemas/secrets.json"
TEMPLATE_DEFINITION_SPEC_SCHEMA_PATH: str = "./schemas/spec_template_definition.json"
TEMPLATE_SPEC_SCHEMA_PATH: str = "./schemas/spec_template.json"



def resolve_framework_config_path(framework_path: str) -> str:
    """
    Pick the config root segment for a framework installation.

    Prefers FrameworkPaths.CONFIG_OVERRIDE_PATH when `framework_path`/config_override
    exists and contains at least one non-hidden entry; otherwise falls back to
    FrameworkPaths.CONFIG_PATH.
    """
    candidate = os.path.join(framework_path, FrameworkPaths.CONFIG_OVERRIDE_PATH)
    return (
        FrameworkPaths.CONFIG_OVERRIDE_PATH
        if _has_visible_children(candidate)
        else FrameworkPaths.CONFIG_PATH
    )


class SupportedSpecFormat(str, Enum):
"""Supported specification file formats."""
Expand Down
6 changes: 4 additions & 2 deletions src/dataflow_spec_builder/spec_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Tuple, Optional, Any

from constants import FrameworkPaths
from constants import FrameworkPaths, resolve_framework_config_path
import pipeline_config
import utility

Expand Down Expand Up @@ -44,6 +44,7 @@ def __init__(self, framework_path: str, max_workers: int = 1):
max_workers: Maximum parallel workers for processing
"""
self.framework_path = framework_path
self._framework_config_path = resolve_framework_config_path(framework_path)
self.max_workers = max_workers
self._mapping_cache: Dict[str, Dict] = {}

Expand Down Expand Up @@ -121,7 +122,8 @@ def get_mapping(self, version: str) -> Dict:

mapping_path = os.path.join(
self.framework_path,
FrameworkPaths.DATAFLOW_SPEC_MAPPING_PATH,
self._framework_config_path,
FrameworkPaths.DATAFLOW_SPEC_MAPPING,
version,
"dataflow_spec_mapping.json"
)
Expand Down
23 changes: 17 additions & 6 deletions src/dlt_pipeline_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
from pyspark.sql import SparkSession
from typing import Dict, Any

from constants import(
FrameworkPaths, FrameworkSettings, PipelineBundlePaths, DLTPipelineSettingKeys, SupportedSpecFormat
from constants import (
FrameworkPaths,
FrameworkSettings,
PipelineBundlePaths,
DLTPipelineSettingKeys,
SupportedSpecFormat,
resolve_framework_config_path,
)
from dataflow import DataFlow
from dataflow_spec_builder import DataflowSpecBuilder
Expand Down Expand Up @@ -114,6 +119,7 @@ def _init_configurations(self) -> None:

self.bundle_path = config_values[DLTPipelineSettingKeys.BUNDLE_SOURCE_PATH]
self.framework_path = config_values[DLTPipelineSettingKeys.FRAMEWORK_SOURCE_PATH]
self._framework_config_path = resolve_framework_config_path(self.framework_path)
self.workspace_host = config_values[DLTPipelineSettingKeys.WORKSPACE_HOST]

# Load optional parameters
Expand Down Expand Up @@ -186,7 +192,10 @@ def _init_pipeline_components(self) -> None:

def _load_framework_global_config_file(self) -> Dict[str, Any]:
"""Load a global config file"""
global_config_paths = [os.path.join(self.framework_path, path) for path in FrameworkPaths.GLOBAL_CONFIG]
global_config_paths = [
os.path.join(self.framework_path, self._framework_config_path, path)
for path in FrameworkPaths.GLOBAL_CONFIG
]

# Check if more than one global config exists
existing_configs = [path for path in global_config_paths if os.path.exists(path)]
Expand Down Expand Up @@ -284,7 +293,7 @@ def _init_substitution_manager(self) -> None:

# Build framework substitutions paths
framework_subs_paths = [
os.path.join(self.framework_path, FrameworkPaths.CONFIG_PATH, workspace_env + path)
os.path.join(self.framework_path, self._framework_config_path, workspace_env + path)
for path in FrameworkPaths.GLOBAL_SUBSTITUTIONS
]
self.logger.info("Framework substitutions paths: %s", framework_subs_paths)
Expand Down Expand Up @@ -315,7 +324,7 @@ def _init_secrets_manager(self) -> None:

# Build framework secrets paths
framework_secrets_config_paths = [
os.path.join(self.framework_path, FrameworkPaths.CONFIG_PATH, workspace_env + path)
os.path.join(self.framework_path, self._framework_config_path, workspace_env + path)
for path in FrameworkPaths.GLOBAL_SECRETS
]

Expand Down Expand Up @@ -370,7 +379,9 @@ def _setup_operational_metadata(self) -> None:
return

self.logger.info("Operational Metadata: layer set to %s", layer)
metadata_path = os.path.join(self.framework_path, f"config/operational_metadata_{layer}.json")
metadata_path = os.path.join(
self.framework_path, self._framework_config_path, f"operational_metadata_{layer}.json"
)
self.logger.info("Operational Metadata Path: %s", metadata_path)
metadata_json = utility.get_json_from_file(metadata_path, False)
self.operational_metadata_schema = (
Expand Down