Entity Providers

Entity Providers are a core component of the Kindling Framework, responsible for abstracting the storage and access mechanisms for data entities. They provide a consistent interface for performing CRUD operations, regardless of the underlying storage technology.

Core Concepts

Entity Provider

An Entity Provider is responsible for:

  • Creating and managing entity tables
  • Reading entity data
  • Writing and updating entity data
  • Handling merge operations
  • Managing entity versions

Provider Abstraction

The framework uses an abstract interface (EntityProvider) to enable multiple provider implementations for different storage systems (e.g., Delta Lake, Parquet, Hive tables).

Delta Lake Integration

The primary implementation, DeltaEntityProvider, integrates with Delta Lake and relies on features such as:

  • ACID transactions
  • Schema evolution
  • Time travel
  • Merge operations
  • Partitioning

Key Components

EntityProvider Interface

The abstract interface defining all operations that entity providers must implement.

from abc import ABC, abstractmethod


class EntityProvider(ABC):
    @abstractmethod
    def ensure_entity_table(self, entity):
        """Ensure the entity table exists, create if necessary"""
        pass

    @abstractmethod
    def check_entity_exists(self, entity):
        """Check if entity table exists"""
        pass

    @abstractmethod
    def merge_to_entity(self, df, entity):
        """Merge DataFrame into entity using merge columns"""
        pass

    @abstractmethod
    def append_to_entity(self, df, entity):
        """Append DataFrame to entity"""
        pass

    @abstractmethod
    def read_entity(self, entity):
        """Read entire entity as DataFrame"""
        pass

    @abstractmethod
    def read_entity_since_version(self, entity, since_version):
        """Read entity changes since specific version"""
        pass

    @abstractmethod
    def write_to_entity(self, df, entity):
        """Write DataFrame to entity (overwrite)"""
        pass

    @abstractmethod
    def get_entity_version(self, entity):
        """Get current version of entity"""
        pass

DeltaEntityProvider

The default implementation for Delta Lake storage.

@GlobalInjector.singleton_autobind()
class DeltaEntityProvider(EntityProvider):
    @inject
    def __init__(self, entity_name_mapper: EntityNameMapper,
                 path_locator: EntityPathLocator):
        # Initialize provider with required dependencies
        pass

    # Implementation of all EntityProvider methods
    def ensure_entity_table(self, entity):
        # Create the table if it doesn't exist
        pass

    def merge_to_entity(self, df, entity):
        # Perform a Delta merge operation
        pass

    # Additional methods...

DeltaTableReference

A utility class for handling different ways of accessing Delta tables.

class DeltaTableReference:
    """Encapsulates how to reference a Delta table"""
    def __init__(self, table_name: str, table_path: str, access_mode: DeltaAccessMode):
        # Initialize with both name and path
        pass

    def get_delta_table(self) -> DeltaTable:
        """Get DeltaTable instance using appropriate method"""
        pass

    def get_read_path(self) -> str:
        """Get path for spark.read operations"""
        pass

DeltaAccessMode

An enumeration of different ways to access Delta tables.

from enum import Enum


class DeltaAccessMode(Enum):
    """Defines how Delta tables are accessed"""
    FOR_NAME = "forName"  # Catalog-managed table names (default convention)
    FOR_PATH = "forPath"  # Direct path access (override when needed)
    AUTO = "auto"         # Auto-detect based on environment
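How AUTO chooses between the two concrete modes is not spelled out here. The sketch below shows one plausible rule, under the assumption that the caller can tell whether a catalog is available. The function `resolve_access_mode` and the `catalog_available` flag are hypothetical and not part of Kindling; the enum is repeated only so the block runs on its own.

```python
from enum import Enum


class DeltaAccessMode(Enum):
    """Copy of the enumeration above, repeated so the sketch is standalone."""
    FOR_NAME = "forName"
    FOR_PATH = "forPath"
    AUTO = "auto"


def resolve_access_mode(mode: DeltaAccessMode, catalog_available: bool) -> DeltaAccessMode:
    """Collapse AUTO into a concrete mode (illustrative rule, not Kindling's)."""
    if mode is not DeltaAccessMode.AUTO:
        # Explicit modes are honored exactly as given.
        return mode
    # Assumption: prefer catalog names when a catalog exists, otherwise use paths.
    return DeltaAccessMode.FOR_NAME if catalog_available else DeltaAccessMode.FOR_PATH
```

Resolving the mode once, up front, keeps the rest of the provider free of environment checks.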

Usage Examples

Basic CRUD Operations

# Get entity provider
from kindling.injection import get_kindling_service
entity_provider = get_kindling_service(EntityProvider)

# Get entity definition
entity = data_entity_registry.get_entity_definition("sales.transactions")

# Read entity
df = entity_provider.read_entity(entity)

# Write to entity (overwrite)
entity_provider.write_to_entity(transformed_df, entity)

# Append to entity
entity_provider.append_to_entity(new_records_df, entity)

Merge Operations

# Get entity provider
from kindling.injection import get_kindling_service
entity_provider = get_kindling_service(EntityProvider)

# Define entity with merge columns
@DataEntities.entity(
    entityid="customers.profiles",
    name="Customer Profiles",
    partition_columns=["country"],
    merge_columns=["customer_id"],  # These columns define the merge key
    tags={"domain": "customer"},
    schema=customer_schema
)

# Later, merge new/updated records
entity_provider.merge_to_entity(
    updated_customers_df,
    data_entity_registry.get_entity_definition("customers.profiles")
)

Versioning and Time Travel

# Get entity version
current_version = entity_provider.get_entity_version(entity)

# Read changes since a specific version
changes_df = entity_provider.read_entity_since_version(entity, last_processed_version)
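The two calls above are typically combined into a watermark loop for incremental processing. The following is a minimal sketch of that pattern, assuming versions are comparable integers and that `since_version` is exclusive (only later versions are returned); neither is confirmed here. `process_new_changes` and `_StubProvider` are hypothetical names, and the stub replaces Spark with plain lists so the example runs anywhere.

```python
def process_new_changes(provider, entity, last_processed_version, handle_changes):
    """Process changes committed after last_processed_version; return the new watermark."""
    current_version = provider.get_entity_version(entity)
    if current_version <= last_processed_version:
        # Nothing new has been committed since the last run.
        return last_processed_version
    changes = provider.read_entity_since_version(entity, last_processed_version)
    handle_changes(changes)
    # The caller should persist this value durably before the next run.
    return current_version


class _StubProvider:
    """Hypothetical in-memory stand-in so the sketch runs without Spark."""

    def __init__(self, commits):
        self._commits = commits  # list index == version number

    def get_entity_version(self, entity):
        return len(self._commits) - 1

    def read_entity_since_version(self, entity, since_version):
        # Assumption: "since" is exclusive, so only later versions are returned.
        return [row for commit in self._commits[since_version + 1:] for row in commit]


provider = _StubProvider([["a"], ["b", "c"], ["d"]])
seen = []
watermark = process_new_changes(provider, "sales.transactions", 0, seen.extend)
```

Storing the returned watermark only after `handle_changes` succeeds means a failed run is retried from the same version rather than skipped.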

Implementation Details

Table Creation

The ensure_entity_table method creates tables with appropriate configuration:

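def ensure_entity_table(self, entity):
    # Nothing to do if the table already exists
    if self.check_entity_exists(entity):
        return

    # Resolve the storage location (assumes get_read_path() returns a path)
    table_path = self._get_table_reference(entity).get_read_path()

    # Create empty DataFrame with entity schema
    empty_df = self.spark.createDataFrame([], entity.schema)

    # Write with appropriate options
    empty_df.write \
        .format("delta") \
        .partitionBy(*entity.partition_columns) \
        .save(table_path)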

Merge Operations

The merge_to_entity method implements Delta Lake's merge capabilities:

def merge_to_entity(self, df, entity):
    # Get Delta table reference
    delta_table = self._get_table_reference(entity).get_delta_table()

    # Build merge condition from entity's merge columns
    merge_condition = " AND ".join([f"source.{col} = target.{col}"
                                   for col in entity.merge_columns])

    # Perform merge operation
    delta_table.alias("target") \
        .merge(df.alias("source"), merge_condition) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
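The condition-building step can be isolated and tested without Spark. Here is a small sketch of it as a standalone function; the name `build_merge_condition` and the empty-key check are additions for illustration, not part of the provider.

```python
def build_merge_condition(merge_columns, source_alias="source", target_alias="target"):
    """Return the SQL predicate that matches source rows to target rows."""
    if not merge_columns:
        # An empty key list would produce an empty string, which is not a valid condition.
        raise ValueError("entity must define at least one merge column")
    return " AND ".join(
        f"{source_alias}.{col} = {target_alias}.{col}" for col in merge_columns
    )


print(build_merge_condition(["customer_id", "country"]))
# source.customer_id = target.customer_id AND source.country = target.country
```

Because the predicate uses `=`, rows whose merge columns contain NULL never match and are inserted as new rows. Spark SQL's null-safe operator `<=>` is an option if NULL keys should match each other.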

Advanced Features

SCD Type 2 Merge

When an entity carries tags={"scd.type": "2"}, merge_to_entity automatically applies the staged-updates SCD2 pattern instead of the plain upsert merge, which updates matched rows in place. For each incoming row:

  • If the row matches a current target row and tracked columns have changed → the existing row is closed (__effective_to = now, __is_current = false) and a new version is inserted.
  • If the row has no match in the target → it is inserted as a new current row.
  • If the row matches but tracked columns are unchanged → no action (no false history entries).

This is handled entirely by DeltaMergeStrategies — no pipe-level changes are needed.
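The three rules above can be illustrated on plain Python dictionaries. This is a simulation of the semantics only, not the DeltaMergeStrategies code. It assumes a `__effective_from` column and uses `None` as the open-ended `__effective_to` value; both are assumptions, as are the function name `scd2_merge` and its parameters.

```python
from datetime import datetime


def scd2_merge(target, incoming, key_cols, tracked_cols, now):
    """Apply SCD Type 2 rules to lists of dict rows and return the new history."""
    def key_of(row):
        return tuple(row[c] for c in key_cols)

    result = [dict(row) for row in target]  # copy so the input is not mutated
    current = {key_of(row): row for row in result if row["__is_current"]}

    for row in incoming:
        existing = current.get(key_of(row))
        if existing is not None:
            if all(existing[c] == row[c] for c in tracked_cols):
                continue  # unchanged: no action, no false history entry
            # Changed: close the existing current version.
            existing["__effective_to"] = now
            existing["__is_current"] = False
        # New key or changed row: insert a fresh current version.
        result.append({**row, "__effective_from": now,
                       "__effective_to": None, "__is_current": True})
    return result


t0, t1 = datetime(2024, 1, 1), datetime(2024, 6, 1)
target = [{"customer_id": 1, "city": "Oslo", "__effective_from": t0,
           "__effective_to": None, "__is_current": True}]
incoming = [{"customer_id": 1, "city": "Bergen"}, {"customer_id": 2, "city": "Rome"}]
merged = scd2_merge(target, incoming, ["customer_id"], ["city"], now=t1)
```

After the call, `merged` holds three rows: the closed Oslo version, the current Bergen version, and the new Rome row. Running the same `incoming` batch against `merged` again adds nothing, which is the "no false history entries" rule.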

Point-in-Time Reads (read_entity_as_of)

For SCD2 entities, read_entity_as_of returns the entity state as it appeared at a specific instant:

from datetime import datetime

provider = get_kindling_service(EntityProvider)

# Returns rows where effective_from <= point_in_time < effective_to (or is_current=true)
snapshot_df = provider.read_entity_as_of(entity, datetime(2024, 6, 1))

# Also accepts a string timestamp
snapshot_df = provider.read_entity_as_of(entity, "2024-06-01 00:00:00")

For non-SCD2 entities the method falls back to Delta's native timestampAsOf time travel.
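For SCD2 entities, the point-in-time filter amounts to a half-open interval test on each row version. The sketch below shows that predicate on plain dictionaries, assuming the `__effective_from` and `__effective_to` column names and `None` for open-ended rows; these are assumptions, and `rows_as_of` is a hypothetical helper rather than the provider's implementation.

```python
from datetime import datetime


def rows_as_of(rows, point_in_time):
    """Keep the row versions that were in effect at point_in_time."""
    def in_effect(row):
        started = row["__effective_from"] <= point_in_time
        end = row["__effective_to"]
        # Current rows have no end; closed rows cover [effective_from, effective_to).
        return started and (end is None or point_in_time < end)

    return [row for row in rows if in_effect(row)]


history = [
    {"customer_id": 1, "city": "Oslo",
     "__effective_from": datetime(2024, 1, 1), "__effective_to": datetime(2024, 6, 1)},
    {"customer_id": 1, "city": "Bergen",
     "__effective_from": datetime(2024, 6, 1), "__effective_to": None},
]
snapshot = rows_as_of(history, datetime(2024, 3, 15))
```

The half-open interval guarantees that exactly one version per key is returned at a boundary instant such as 2024-06-01 00:00:00.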

Schema Evolution

The DeltaEntityProvider supports automatic schema evolution:

# Enable automatic schema merging for the current Spark session
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# During merge operations, new columns will be added automatically
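When automatic merging is enabled, it can help to know in advance which columns a batch would add. A minimal sketch of such a pre-merge check follows, assuming column names are compared case-insensitively (Spark's default); `new_columns` is a hypothetical helper, not part of Kindling.

```python
def new_columns(target_columns, incoming_columns):
    """Return incoming columns missing from the target, in incoming order."""
    existing = {name.lower() for name in target_columns}
    return [name for name in incoming_columns if name.lower() not in existing]


added = new_columns(["customer_id", "name"], ["Customer_ID", "name", "email"])
```

In production, a pipeline could log `added` or refuse the merge when it is non-empty, rather than letting the schema change silently.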

Z-Ordering

Optimize table read performance with Z-ordering:

def optimize_entity(self, entity, z_order_cols=None):
    # Get table reference
    table_ref = self._get_table_reference(entity)

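    # Build optimize command; in SQL a path must be written as delta.`<path>`
    # (assumes get_read_path() returns a filesystem path, not a catalog name)
    optimize_cmd = f"OPTIMIZE delta.`{table_ref.get_read_path()}`"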

    # Add Z-ORDER if columns specified
    if z_order_cols:
        z_order_cols_str = ", ".join(z_order_cols)
        optimize_cmd += f" ZORDER BY ({z_order_cols_str})"

    # Execute command
    self.spark.sql(optimize_cmd)

Vacuum Operations

Control data retention through vacuum operations:

def vacuum_entity(self, entity, retention_hours=168):  # Default 7 days
    # Get table reference
    table_ref = self._get_table_reference(entity)

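    # Execute vacuum command; in SQL a path must be written as delta.`<path>`
    # (assumes get_read_path() returns a filesystem path, not a catalog name)
    self.spark.sql(f"VACUUM delta.`{table_ref.get_read_path()}` RETAIN {retention_hours} HOURS")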
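By default Delta Lake rejects a retention period shorter than 168 hours unless the `spark.databricks.delta.retentionDurationCheck.enabled` setting is turned off. The sketch below adds a similar guard on the caller's side before any SQL is issued. The function `build_vacuum_sql` and its `allow_short` flag are hypothetical, and the path is assumed to be a filesystem location.

```python
DEFAULT_RETENTION_HOURS = 168  # 7 days, matching the default used above


def build_vacuum_sql(table_path, retention_hours=DEFAULT_RETENTION_HOURS, allow_short=False):
    """Build a VACUUM statement for a path-based Delta table, with a retention guard."""
    if retention_hours < 0:
        raise ValueError("retention_hours must not be negative")
    if retention_hours < DEFAULT_RETENTION_HOURS and not allow_short:
        # Short retention can delete files still needed by readers or time travel.
        raise ValueError(
            f"retention below {DEFAULT_RETENTION_HOURS} hours requires allow_short=True"
        )
    return f"VACUUM delta.`{table_path}` RETAIN {retention_hours} HOURS"


print(build_vacuum_sql("/data/customers"))
# VACUUM delta.`/data/customers` RETAIN 168 HOURS
```

Vacuuming removes files that older versions depend on, so the retention period also bounds how far back time travel and `read_entity_since_version` can reach.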

Best Practices

  1. Use Merge Operations: Prefer merge operations over appends for upsert scenarios.

  2. Define Proper Merge Keys: Carefully select merge columns that uniquely identify records.

  3. Partitioning Strategy: Choose partitioning columns based on query patterns and data distribution.

  4. Schema Evolution: Enable schema evolution for development, but manage schema carefully in production.

  5. Version Management: Use versioning for incremental processing and auditing.

  6. Performance Optimization: Consider Z-ordering for frequently filtered columns.

  7. Data Retention: Plan vacuum operations according to your retention requirements.
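Practice 2 can be checked mechanically: a Delta merge fails when several source rows match the same target row, so duplicate merge keys in an incoming batch are worth detecting early. The following sketch does this on plain dictionaries; `duplicate_merge_keys` is a hypothetical helper, and a Spark version would group by the merge columns instead.

```python
from collections import Counter


def duplicate_merge_keys(rows, merge_columns):
    """Return the merge-key tuples that occur more than once in rows."""
    counts = Counter(tuple(row[col] for col in merge_columns) for row in rows)
    return [key for key, count in counts.items() if count > 1]


batch = [
    {"customer_id": 1, "country": "NO"},
    {"customer_id": 2, "country": "IT"},
    {"customer_id": 1, "country": "SE"},
]
duplicates = duplicate_merge_keys(batch, ["customer_id"])
```

Here `customer_id` 1 appears twice, so the batch should be deduplicated, or the merge key widened, before calling merge_to_entity.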

Custom Entity Providers

To implement a custom entity provider:

  1. Create a class that implements the EntityProvider interface
  2. Register it with the dependency injection system
  3. Ensure all method contracts are properly fulfilled

Example:

@GlobalInjector.singleton_autobind()
class CustomEntityProvider(EntityProvider):
    # Implement all required methods

    def ensure_entity_table(self, entity):
        # Custom implementation
        pass

    # ... other methods
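To make the method contracts concrete, here is a sketch of a toy provider that keeps rows in memory as lists of dictionaries. It is illustrative only. In a Kindling project the class would subclass EntityProvider and be registered with the injector, both omitted here so the block runs standalone. The `entity.entityid` attribute, the version numbering (version 0 is the empty table), and the exclusive `since_version` are assumptions.

```python
from types import SimpleNamespace


class InMemoryEntityProvider:
    """Hypothetical provider storing rows as lists of dicts, keyed by entity id."""

    def __init__(self):
        self._rows = {}     # entity id -> current table contents
        self._commits = {}  # entity id -> one row batch per version (index == version)

    def _commit(self, entity, table_rows, batch):
        self._rows[entity.entityid] = list(table_rows)
        self._commits[entity.entityid].append(list(batch))

    def ensure_entity_table(self, entity):
        if not self.check_entity_exists(entity):
            self._rows[entity.entityid] = []
            self._commits[entity.entityid] = [[]]  # version 0: empty table

    def check_entity_exists(self, entity):
        return entity.entityid in self._rows

    def write_to_entity(self, rows, entity):
        self.ensure_entity_table(entity)
        rows = list(rows)
        self._commit(entity, rows, rows)  # overwrite replaces all rows

    def append_to_entity(self, rows, entity):
        self.ensure_entity_table(entity)
        rows = list(rows)
        self._commit(entity, self._rows[entity.entityid] + rows, rows)

    def merge_to_entity(self, rows, entity):
        self.ensure_entity_table(entity)
        rows = list(rows)

        def key_of(row):
            return tuple(row[col] for col in entity.merge_columns)

        merged = {key_of(row): row for row in self._rows[entity.entityid]}
        merged.update({key_of(row): row for row in rows})  # update matches, insert the rest
        self._commit(entity, merged.values(), rows)

    def read_entity(self, entity):
        return list(self._rows[entity.entityid])

    def read_entity_since_version(self, entity, since_version):
        later = self._commits[entity.entityid][since_version + 1:]
        return [row for batch in later for row in batch]

    def get_entity_version(self, entity):
        return len(self._commits[entity.entityid]) - 1


# Stand-in for an entity definition; only the attributes used above are provided.
entity = SimpleNamespace(entityid="customers.profiles", merge_columns=["customer_id"])
provider = InMemoryEntityProvider()
provider.write_to_entity([{"customer_id": 1, "country": "NO"}], entity)
provider.merge_to_entity(
    [{"customer_id": 1, "country": "SE"}, {"customer_id": 2, "country": "IT"}], entity
)
```

A provider like this is mainly useful in unit tests, where pipeline logic can be exercised against the same eight methods without a Spark session.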