Entity Providers are a core component of the Kindling Framework, responsible for abstracting the storage and access mechanisms for data entities. They provide a consistent interface for performing CRUD operations, regardless of the underlying storage technology.
An Entity Provider is responsible for:
- Creating and managing entity tables
- Reading entity data
- Writing and updating entity data
- Handling merge operations
- Managing entity versions
The framework uses an abstract interface (EntityProvider) to enable multiple provider implementations for different storage systems (e.g., Delta Lake, Parquet, Hive tables).
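EntityProvider is a Python abstract base class (ABC) whose operations are all marked with @abstractmethod. As a result, Python refuses to instantiate any subclass that leaves one of them unimplemented. The sketch below demonstrates this with a hypothetical two-method MiniProvider standing in for the full interface. All class names in it are illustrative only.

```python
from abc import ABC, abstractmethod


class MiniProvider(ABC):
    """Hypothetical two-method stand-in for the full EntityProvider interface."""

    @abstractmethod
    def read_entity(self, entity):
        """Read entire entity"""

    @abstractmethod
    def write_to_entity(self, df, entity):
        """Write (overwrite) entity"""


class IncompleteProvider(MiniProvider):
    # Implements only one of the two abstract methods
    def read_entity(self, entity):
        return []


class CompleteProvider(IncompleteProvider):
    # Fills in the remaining abstract method, so it can be instantiated
    def write_to_entity(self, df, entity):
        pass


def can_instantiate(cls) -> bool:
    """Return True if cls can be instantiated, False if abstract methods remain."""
    try:
        cls()
    except TypeError:
        return False
    return True


print(can_instantiate(IncompleteProvider))  # False
print(can_instantiate(CompleteProvider))    # True
```

A provider that forgets a method therefore fails when the class is instantiated, not later when a pipeline first calls the missing operation.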
The primary implementation, DeltaEntityProvider, is built on Delta Lake and uses features such as:
- ACID transactions
- Schema evolution
- Time travel
- Merge operations
- Partitioning
EntityProvider is the abstract interface that defines every operation an entity provider must implement:
```python
from abc import ABC, abstractmethod


class EntityProvider(ABC):
    @abstractmethod
    def ensure_entity_table(self, entity):
        """Ensure the entity table exists, create if necessary"""
        pass

    @abstractmethod
    def check_entity_exists(self, entity):
        """Check if entity table exists"""
        pass

    @abstractmethod
    def merge_to_entity(self, df, entity):
        """Merge DataFrame into entity using merge columns"""
        pass

    @abstractmethod
    def append_to_entity(self, df, entity):
        """Append DataFrame to entity"""
        pass

    @abstractmethod
    def read_entity(self, entity):
        """Read entire entity as DataFrame"""
        pass

    @abstractmethod
    def read_entity_since_version(self, entity, since_version):
        """Read entity changes since specific version"""
        pass

    @abstractmethod
    def write_to_entity(self, df, entity):
        """Write DataFrame to entity (overwrite)"""
        pass

    @abstractmethod
    def get_entity_version(self, entity):
        """Get current version of entity"""
        pass
```

DeltaEntityProvider is the default implementation for Delta Lake storage:
```python
@GlobalInjector.singleton_autobind()
class DeltaEntityProvider(EntityProvider):
    @inject
    def __init__(self, entity_name_mapper: EntityNameMapper,
                 path_locator: EntityPathLocator):
        # Initialize provider with required dependencies
        pass

    # Implementation of all EntityProvider methods
    def ensure_entity_table(self, entity):
        # Create the table if it doesn't exist
        pass

    def merge_to_entity(self, df, entity):
        # Perform a Delta merge operation
        pass

    # Additional methods...
```

DeltaTableReference is a utility class that handles the different ways of accessing a Delta table:
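```python
from __future__ import annotations

from delta.tables import DeltaTable


class DeltaTableReference:
    """Encapsulates how to reference a Delta table"""

    def __init__(self, table_name: str, table_path: str, access_mode: DeltaAccessMode):
        # Initialize with both name and path
        pass

    def get_delta_table(self) -> DeltaTable:
        """Get DeltaTable instance using appropriate method"""
        # e.g. DeltaTable.forName(spark, table_name) or
        # DeltaTable.forPath(spark, table_path), depending on access_mode
        pass

    def get_read_path(self) -> str:
        """Get path for spark.read operations"""
        pass
```

DeltaAccessMode is an enumeration of the ways a Delta table can be accessed: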
```python
from enum import Enum


class DeltaAccessMode(Enum):
    """Defines how Delta tables are accessed"""

    FOR_NAME = "forName"  # Catalog-managed table names (default convention)
    FOR_PATH = "forPath"  # Direct path access (override when needed)
    AUTO = "auto"  # Auto-detect based on environment
```

A typical flow reads, overwrites, or appends through the provider obtained from the dependency injection system:

```python
# Get entity provider
from kindling.injection import get_kindling_service

entity_provider = get_kindling_service(EntityProvider)

# Get entity definition
entity = data_entity_registry.get_entity_definition("sales.transactions")

# Read entity
df = entity_provider.read_entity(entity)

# Write to entity (overwrite)
entity_provider.write_to_entity(transformed_df, entity)

# Append to entity
entity_provider.append_to_entity(new_records_df, entity)
```

For upserts, register the entity with merge columns and call merge_to_entity:

```python
# Get entity provider
from kindling.injection import get_kindling_service

entity_provider = get_kindling_service(EntityProvider)

# Define entity with merge columns
DataEntities.entity(
    entityid="customers.profiles",
    name="Customer Profiles",
    partition_columns=["country"],
    merge_columns=["customer_id"],  # These columns define the merge key
    tags={"domain": "customer"},
    schema=customer_schema,
)

# Later, merge new/updated records
entity_provider.merge_to_entity(
    updated_customers_df,
    data_entity_registry.get_entity_definition("customers.profiles"),
)
```

Entity versions support incremental processing:

```python
# Get entity version
current_version = entity_provider.get_entity_version(entity)

# Read changes since a specific version
changes_df = entity_provider.read_entity_since_version(entity, last_processed_version)
```

The ensure_entity_table method creates tables with the appropriate configuration:
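```python
def ensure_entity_table(self, entity):
    # Nothing to do if the table already exists
    if self.check_entity_exists(entity):
        return

    # Create empty DataFrame with entity schema
    empty_df = self.spark.createDataFrame([], entity.schema)

    # Write with appropriate options; table_path is the entity's storage
    # location (its resolution is omitted here)
    empty_df.write \
        .format("delta") \
        .partitionBy(*(entity.partition_columns or [])) \
        .save(table_path)
```

The merge_to_entity method implements Delta Lake's merge capabilities: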
```python
def merge_to_entity(self, df, entity):
    # Get Delta table reference
    delta_table = self._get_table_reference(entity).get_delta_table()

    # Build merge condition from entity's merge columns
    merge_condition = " AND ".join([f"source.{col} = target.{col}"
                                    for col in entity.merge_columns])

    # Perform merge operation
    delta_table.alias("target") \
        .merge(df.alias("source"), merge_condition) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
```

When an entity carries tags={"scd.type": "2"}, merge_to_entity automatically applies the staged-updates SCD2 pattern instead of a simple overwrite-in-place merge. For each incoming row:
- If the row matches a current target row and tracked columns have changed → the existing row is closed (__effective_to = now, __is_current = false) and a new version is inserted.
- If the row has no match in the target → it is inserted as a new current row.
- If the row matches but tracked columns are unchanged → no action (no false history entries).
This is handled entirely by DeltaMergeStrategies — no pipe-level changes are needed.
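The three rules can be made concrete with a plain-Python model of their row-level outcome on lists of dicts. This is not DeltaMergeStrategies' implementation, and it does not reproduce the staged-updates mechanics, which let a single Delta MERGE both close the old row and insert the new version. It only shows which rows end up closed, inserted, or untouched. The function name scd2_merge is hypothetical. __effective_from is an assumed name for the start-of-validity column, used alongside the __effective_to and __is_current columns described above. Tracked columns are passed explicitly.

```python
from datetime import datetime


def scd2_merge(target, incoming, key_cols, tracked_cols, now):
    """Model the SCD2 merge rules on lists of dict rows and return the new target.

    Target rows carry __effective_from (assumed name), __effective_to and
    __is_current; current rows have __effective_to = None.
    """
    result = [dict(row) for row in target]  # copy so the input is not mutated

    # Index the current version of each key
    current = {
        tuple(row[c] for c in key_cols): row
        for row in result
        if row["__is_current"]
    }

    for src in incoming:
        key = tuple(src[c] for c in key_cols)
        existing = current.get(key)
        if existing is not None:
            if all(existing[c] == src[c] for c in tracked_cols):
                continue  # unchanged: no false history entry
            # Changed: close the existing current row
            existing["__effective_to"] = now
            existing["__is_current"] = False
        # Changed or unmatched: insert a new current version
        new_row = dict(src)
        new_row.update({"__effective_from": now, "__effective_to": None, "__is_current": True})
        result.append(new_row)
        current[key] = new_row

    return result


t0, t1 = datetime(2024, 1, 1), datetime(2024, 6, 1)
target = [
    {"customer_id": 1, "city": "Oslo",
     "__effective_from": t0, "__effective_to": None, "__is_current": True},
    {"customer_id": 2, "city": "Rome",
     "__effective_from": t0, "__effective_to": None, "__is_current": True},
]
incoming = [
    {"customer_id": 1, "city": "Bergen"},  # tracked column changed
    {"customer_id": 2, "city": "Rome"},    # unchanged
    {"customer_id": 3, "city": "Lima"},    # no match in target
]

rows = scd2_merge(target, incoming, key_cols=["customer_id"], tracked_cols=["city"], now=t1)
for row in rows:
    print(row["customer_id"], row["city"], row["__is_current"])
# 1 Oslo False
# 2 Rome True
# 1 Bergen True
# 3 Lima True
```

Customer 1 gains a closed Oslo row and a current Bergen row. Customer 2 is left untouched, and customer 3 arrives as a new current row.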
For SCD2 entities, read_entity_as_of returns the entity state as it appeared at a specific instant:
```python
from datetime import datetime

from kindling.injection import get_kindling_service

provider = get_kindling_service(EntityProvider)

# Returns rows where effective_from <= point_in_time < effective_to (or is_current=true)
snapshot_df = provider.read_entity_as_of(entity, datetime(2024, 6, 1))

# Also accepts a string timestamp
snapshot_df = provider.read_entity_as_of(entity, "2024-06-01 00:00:00")
```

For non-SCD2 entities, the method falls back to Delta's native timestampAsOf time travel.
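The as-of rule in the comment above can be modelled on in-memory rows as a half-open validity interval. This plain-Python sketch assumes that current rows carry __effective_to = None. It also applies the lower bound to current rows, so a version that starts after the requested instant is excluded. rows_as_of is a hypothetical helper, not the provider's API, and __effective_from is an assumed column name.

```python
from datetime import datetime


def rows_as_of(rows, point_in_time):
    """Return the SCD2 rows that were valid at point_in_time.

    Accepts a datetime or a "YYYY-MM-DD HH:MM:SS" string. Assumes current rows
    carry __effective_to = None.
    """
    if isinstance(point_in_time, str):
        point_in_time = datetime.fromisoformat(point_in_time)
    return [
        row
        for row in rows
        if row["__effective_from"] <= point_in_time
        and (row["__effective_to"] is None or point_in_time < row["__effective_to"])
    ]


history = [
    # customer 1 moved from Oslo to Bergen on 2024-06-01
    {"customer_id": 1, "city": "Oslo", "__effective_from": datetime(2024, 1, 1),
     "__effective_to": datetime(2024, 6, 1), "__is_current": False},
    {"customer_id": 1, "city": "Bergen", "__effective_from": datetime(2024, 6, 1),
     "__effective_to": None, "__is_current": True},
]

print([r["city"] for r in rows_as_of(history, datetime(2024, 3, 15))])  # ['Oslo']
print([r["city"] for r in rows_as_of(history, "2024-06-01 00:00:00")])  # ['Bergen']
```

Because the upper bound is exclusive, a row closed at exactly the requested instant is not returned. With non-overlapping validity intervals, each key therefore appears at most once per snapshot.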
The DeltaEntityProvider supports schema evolution once it is enabled in the Spark configuration:

```python
# Enable schema evolution in Spark config
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# During merge operations, new columns will be added automatically
```

Optimize table read performance with Z-ordering:
```python
def optimize_entity(self, entity, z_order_cols=None):
    # Get table reference
    table_ref = self._get_table_reference(entity)

    # Build optimize command; the target must be something SQL can resolve:
    # a table name, or delta.`<path>` for a path-based table
    optimize_cmd = f"OPTIMIZE {table_ref.get_read_path()}"

    # Add Z-ORDER if columns specified
    if z_order_cols:
        z_order_cols_str = ", ".join(z_order_cols)
        optimize_cmd += f" ZORDER BY ({z_order_cols_str})"

    # Execute command
    self.spark.sql(optimize_cmd)
```

Control data retention through vacuum operations:
```python
def vacuum_entity(self, entity, retention_hours=168):  # Default 7 days
    # Get table reference
    table_ref = self._get_table_reference(entity)

    # Execute vacuum command; by default Delta rejects retention periods shorter
    # than the configured threshold (7 days) unless
    # spark.databricks.delta.retentionDurationCheck.enabled is set to false
    self.spark.sql(f"VACUUM {table_ref.get_read_path()} RETAIN {retention_hours} HOURS")
```

- Use Merge Operations: Prefer merge operations over appends for upsert scenarios.
- Define Proper Merge Keys: Carefully select merge columns that uniquely identify records.
- Partitioning Strategy: Choose partitioning columns based on query patterns and data distribution.
- Schema Evolution: Enable schema evolution for development, but manage schema carefully in production.
- Version Management: Use versioning for incremental processing and auditing.
- Performance Optimization: Consider Z-ordering for frequently filtered columns.
- Data Retention: Plan vacuum operations according to your retention requirements.
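Merge keys matter in practice because a Delta Lake MERGE can fail when multiple source rows match and try to update the same target row. It is therefore worth checking that a batch is unique on its merge columns before calling merge_to_entity. The following is a plain-Python sketch of that check. The helper name duplicate_merge_keys and the sample rows are hypothetical.

```python
from collections import Counter


def duplicate_merge_keys(rows, merge_columns):
    """Return merge-key tuples that occur more than once, in first-seen order."""
    counts = Counter(tuple(row[c] for c in merge_columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


batch = [
    {"customer_id": 1, "country": "NO"},
    {"customer_id": 2, "country": "IT"},
    {"customer_id": 1, "country": "SE"},
]

print(duplicate_merge_keys(batch, ["customer_id"]))             # [(1,)]
print(duplicate_merge_keys(batch, ["customer_id", "country"]))  # []
```

If records are really per customer and country, customer_id alone is not a valid merge key for this batch. On a Spark DataFrame, the equivalent check is df.groupBy(*merge_columns).count().filter("count > 1").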
To implement a custom entity provider:
- Create a class that implements the EntityProvider interface
- Register it with the dependency injection system
- Ensure all method contracts are properly fulfilled
Example:
```python
@GlobalInjector.singleton_autobind()
class CustomEntityProvider(EntityProvider):
    # Implement all required methods
    def ensure_entity_table(self, entity):
        # Custom implementation
        pass

    # ... other methods
```
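Fulfilling the contract can be illustrated with a toy provider that keeps each entity in memory as a list of dict rows. Everything here is hypothetical. The Entity dataclass stands in for Kindling's entity definitions, and plain lists stand in for DataFrames. read_entity_since_version is assumed to return rows written in versions strictly after since_version. A real provider would subclass EntityProvider and carry the @GlobalInjector.singleton_autobind() registration shown above.

```python
from dataclasses import dataclass, field


@dataclass
class Entity:
    """Hypothetical stand-in for a Kindling entity definition."""

    entityid: str
    merge_columns: list = field(default_factory=list)


class InMemoryEntityProvider:
    """Toy provider: each entity is a list of dict rows; `df` is such a list."""

    def __init__(self):
        self._rows = {}     # entityid -> current rows
        self._changes = {}  # entityid -> [(version, rows written in that version)]

    def ensure_entity_table(self, entity):
        # Create an empty table at version 0 if it does not exist yet
        if not self.check_entity_exists(entity):
            self._rows[entity.entityid] = []
            self._changes[entity.entityid] = [(0, [])]

    def check_entity_exists(self, entity):
        return entity.entityid in self._rows

    def get_entity_version(self, entity):
        # The latest recorded version number
        return self._changes[entity.entityid][-1][0]

    def _commit(self, entity, new_rows, written_rows):
        # Replace the table contents and record the written rows as a new version
        version = self.get_entity_version(entity) + 1
        self._rows[entity.entityid] = new_rows
        self._changes[entity.entityid].append((version, list(written_rows)))

    def read_entity(self, entity):
        return list(self._rows[entity.entityid])

    def read_entity_since_version(self, entity, since_version):
        # Assumed semantics: rows written in versions strictly after since_version
        return [
            row
            for version, rows in self._changes[entity.entityid]
            if version > since_version
            for row in rows
        ]

    def write_to_entity(self, df, entity):
        self._commit(entity, list(df), df)

    def append_to_entity(self, df, entity):
        self._commit(entity, self.read_entity(entity) + list(df), df)

    def merge_to_entity(self, df, entity):
        # Upsert keyed on merge_columns: replace matched rows, insert unmatched ones
        def key(row):
            return tuple(row[c] for c in entity.merge_columns)

        merged = {key(row): row for row in self.read_entity(entity)}
        for row in df:
            merged[key(row)] = dict(row)
        self._commit(entity, list(merged.values()), df)


# Usage
provider = InMemoryEntityProvider()
customers = Entity("customers.profiles", merge_columns=["customer_id"])

provider.ensure_entity_table(customers)
provider.append_to_entity([{"customer_id": 1, "city": "Oslo"}], customers)
provider.merge_to_entity(
    [{"customer_id": 1, "city": "Bergen"}, {"customer_id": 2, "city": "Rome"}],
    customers,
)

print(provider.get_entity_version(customers))  # 2
print(provider.read_entity(customers))
print(provider.read_entity_since_version(customers, 1))
```

Recording the written rows per version is the simplest way to honour read_entity_since_version here. A Delta-backed provider would instead rely on the table's own commit history.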