The Data Pipes Framework is a notebook-based data processing system built on Apache Spark with dependency injection. It provides a declarative way to define, register, and execute data transformation pipelines with automatic dependency resolution and tracing capabilities.
Data pipes are individual transformation units that take input entities (DataFrames) and produce output entities. Each pipe is defined using a decorator and contains metadata about its inputs, outputs, and execution logic.
Entities represent data sources or transformed datasets, typically as Spark DataFrames. They are identified by unique entity IDs and managed through the framework's registry system.
The framework orchestrates the execution of multiple pipes, handling data flow between them and providing tracing and logging capabilities.
    from dataclasses import dataclass
    from typing import Callable, Dict, List

    @dataclass
    class PipeMetadata:
        pipeid: str
        name: str
        execute: Callable
        tags: Dict[str, str]
        input_entity_ids: List[str]
        output_entity_id: str
        output_type: str

Purpose: Defines metadata for a data pipe.
Fields:
- pipeid: Unique identifier for the pipe
- name: Human-readable name for the pipe
- execute: The function that performs the transformation
- tags: Key-value pairs for categorization and filtering
- input_entity_ids: List of input entity identifiers
- output_entity_id: Identifier for the output entity
- output_type: Type of the output (e.g., "table", "view")
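To make the fields concrete, here is a minimal sketch that builds one PipeMetadata instance by hand and filters on its tags. The dataclass is copied locally so the snippet runs on its own; `passthrough` and `has_tags` are hypothetical helpers for illustration and are not part of the framework.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class PipeMetadata:
    # Local copy of the documented dataclass so the sketch runs standalone
    pipeid: str
    name: str
    execute: Callable
    tags: Dict[str, str]
    input_entity_ids: List[str]
    output_entity_id: str
    output_type: str


def passthrough(raw_customers):
    # Hypothetical transformation: returns its input unchanged
    return raw_customers


meta = PipeMetadata(
    pipeid="clean_customer_data",
    name="Clean Customer Data",
    execute=passthrough,
    tags={"category": "cleaning", "domain": "customer"},
    input_entity_ids=["raw.customers"],
    output_entity_id="clean.customers",
    output_type="table",
)


def has_tags(pipe: PipeMetadata, **wanted: str) -> bool:
    # Hypothetical helper: True when every requested tag matches exactly
    return all(pipe.tags.get(key) == value for key, value in wanted.items())
```

In normal use the decorator fills in these fields, so constructing the dataclass directly is mostly useful in tests.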
    @DataPipes.pipe(
        pipeid="unique_pipe_id",
        name="Human Readable Name",
        tags={"category": "transformation", "env": "prod"},
        input_entity_ids=["input.entity1", "input.entity2"],
        output_entity_id="output.transformed_data",
        output_type="table"
    )
    def my_transformation_function(input_entity1, input_entity2):
        # Your transformation logic here
        return transformed_dataframe

Purpose: Decorator to register data transformation functions as pipes.
Parameters: All parameters correspond to PipeMetadata fields except execute (automatically set).
Usage Notes:
- All parameters are required
- Input entity IDs with dots (.) are converted to underscores (_) in function parameters
- The decorated function receives DataFrames as named parameters
- Function must return a DataFrame
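The dot-to-underscore rule in the usage notes can be sketched as follows. This is an illustration of the naming convention only, assuming the framework passes inputs as keyword arguments; `entity_id_to_param` and `call_pipe` are hypothetical names, and plain lists stand in for DataFrames.

```python
from typing import Any, Callable, Dict, List


def entity_id_to_param(entity_id: str) -> str:
    # "clean.customers" -> "clean_customers", per the usage note
    return entity_id.replace(".", "_")


def call_pipe(func: Callable, input_entity_ids: List[str], entities: Dict[str, Any]) -> Any:
    # Build keyword arguments keyed by the converted parameter names
    kwargs = {entity_id_to_param(eid): entities[eid] for eid in input_entity_ids}
    return func(**kwargs)


def join_sizes(clean_customers, clean_orders):
    # Stand-in for a transformation; lists play the role of DataFrames here
    return len(clean_customers) + len(clean_orders)


result = call_pipe(
    join_sizes,
    ["clean.customers", "clean.orders"],
    {"clean.customers": [1, 2], "clean.orders": [10, 20, 30]},
)
```

A practical consequence is that the parameter names of a pipe function must match its converted input entity IDs exactly, otherwise the keyword call fails with a TypeError.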
    from abc import ABC, abstractmethod

    class EntityReadPersistStrategy(ABC):
        @abstractmethod
        def create_pipe_entity_reader(self, pipe: str):
            """Create a reader function for pipe entities"""
            pass

        @abstractmethod
        def create_pipe_persist_activator(self, pipe: PipeMetadata):
            """Create a persist function for pipe output"""
            pass

Purpose: Abstract interface for defining how entities are read and persisted.
Implementation Required: Users must implement both methods to define their storage strategy.
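As a rough illustration of what an implementation might look like, the sketch below keeps entities in a dictionary. The interface does not spell out the signatures of the returned functions, so two assumptions are made here: the reader takes an entity ID and returns its data (or None), and the persist function takes the pipe's output and stores it under `output_entity_id`. `InMemoryStrategy` is a hypothetical name, and the interface is copied locally so the snippet runs on its own.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace
from typing import Any, Callable, Dict


class EntityReadPersistStrategy(ABC):
    # Local copy of the documented interface so the sketch runs standalone
    @abstractmethod
    def create_pipe_entity_reader(self, pipe: str): ...

    @abstractmethod
    def create_pipe_persist_activator(self, pipe): ...


class InMemoryStrategy(EntityReadPersistStrategy):
    """Hypothetical strategy that keeps entities in a dict (handy for tests)."""

    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}

    def create_pipe_entity_reader(self, pipe: str) -> Callable[[str], Any]:
        # Assumed contract: the reader maps an entity ID to its data, or None
        def read(entity_id: str) -> Any:
            return self.store.get(entity_id)
        return read

    def create_pipe_persist_activator(self, pipe) -> Callable[[Any], None]:
        # Assumed contract: the activator stores output under the pipe's output ID
        def persist(data: Any) -> None:
            self.store[pipe.output_entity_id] = data
        return persist


strategy = InMemoryStrategy()
strategy.store["raw.customers"] = ["alice", "bob"]

# SimpleNamespace stands in for a PipeMetadata instance
pipe = SimpleNamespace(pipeid="clean_customer_data", output_entity_id="clean.customers")

read = strategy.create_pipe_entity_reader(pipe.pipeid)
persist = strategy.create_pipe_persist_activator(pipe)
persist([name.upper() for name in read("raw.customers")])
```

A real strategy would replace the dictionary with reads and writes against tables or files; check the framework source for the exact callable signatures before relying on the ones assumed here.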
    class DataPipesRegistry(ABC):
        @abstractmethod
        def register_pipe(self, pipeid, **decorator_params):
            """Register a pipe with given parameters"""
            pass

        @abstractmethod
        def get_pipe_ids(self):
            """Get all registered pipe IDs"""
            pass

        @abstractmethod
        def get_pipe_definition(self, name):
            """Get pipe definition by name"""
            pass

Purpose: Abstract interface for pipe registry operations.
Default Implementation: DataPipesManager provides the concrete implementation.
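The registry contract is small enough to sketch with a dictionary. The following is not the DataPipesManager implementation, only a hypothetical `DictRegistry` plus a stand-in `pipe` decorator, showing how a decorator could forward its parameters to `register_pipe`. Definitions are stored as plain dicts here rather than PipeMetadata, which is an assumption made to keep the sketch short.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class DataPipesRegistry(ABC):
    # Local copy of the documented interface so the sketch runs standalone
    @abstractmethod
    def register_pipe(self, pipeid, **decorator_params): ...

    @abstractmethod
    def get_pipe_ids(self): ...

    @abstractmethod
    def get_pipe_definition(self, name): ...


class DictRegistry(DataPipesRegistry):
    """Hypothetical dict-backed registry, keyed by pipe ID."""

    def __init__(self) -> None:
        self._pipes: Dict[str, Dict[str, Any]] = {}

    def register_pipe(self, pipeid, **decorator_params):
        self._pipes[pipeid] = {"pipeid": pipeid, **decorator_params}

    def get_pipe_ids(self):
        return self._pipes.keys()

    def get_pipe_definition(self, name):
        return self._pipes[name]


registry = DictRegistry()


def pipe(**params):
    # Hypothetical stand-in for @DataPipes.pipe: registers, then returns func unchanged
    def decorator(func):
        registry.register_pipe(execute=func, **params)
        return func
    return decorator


@pipe(
    pipeid="clean_customer_data",
    name="Clean Customer Data",
    tags={"category": "cleaning"},
    input_entity_ids=["raw.customers"],
    output_entity_id="clean.customers",
    output_type="table",
)
def clean_customers(raw_customers):
    return raw_customers
```

Returning the original function from the decorator keeps it directly callable, which makes pipe functions easy to unit test without going through the registry.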
    class DataPipesExecution(ABC):
        @abstractmethod
        def run_datapipes(self, pipes):
            """Execute a list of pipes"""
            pass

Purpose: Abstract interface for pipe execution.
Default Implementation: DataPipesExecuter provides the concrete implementation.
    @GlobalInjector.singleton_autobind()
    class DataPipesManager(DataPipesRegistry):
        def get_pipe_ids(self):
            """Returns all registered pipe IDs"""

        def get_pipe_definition(self, name):
            """Returns PipeMetadata for given pipe name"""

Purpose: Concrete implementation of pipe registry with automatic dependency injection.
Key Features:
- Singleton pattern with automatic binding
- Debug logging for pipe registration
- Thread-safe registry storage
    @GlobalInjector.singleton_autobind()
    class DataPipesExecuter(DataPipesExecution):
        def run_datapipes(self, pipes):
            """Execute a list of pipes in order"""

Purpose: Concrete implementation of pipe execution engine.
Key Features:
- Distributed tracing support
- Automatic entity reading and persistence
- Conditional execution (skips if first input entity is None)
- Debug logging throughout execution
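The sequential behaviour listed above (read inputs, skip when the first input is None, run, persist) can be approximated in a few lines. This is a simplified sketch under stated assumptions, not the DataPipesExecuter code: a dictionary replaces the read/persist strategy, tracing is omitted, and `run_pipes` is a hypothetical name.

```python
import logging
from types import SimpleNamespace
from typing import Any, Dict, List

logger = logging.getLogger("datapipes.sketch")


def run_pipes(pipe_ids: List[str], pipes: Dict[str, Any], store: Dict[str, Any]) -> List[str]:
    """Run pipes in the given order and return the IDs that actually executed."""
    executed = []
    for pipe_id in pipe_ids:
        pipe = pipes[pipe_id]
        inputs = [store.get(eid) for eid in pipe.input_entity_ids]
        # Conditional execution: skip the pipe when its first input is missing
        if inputs and inputs[0] is None:
            logger.debug("Skipping %s: first input entity is None", pipe_id)
            continue
        # Entity IDs become parameter names with dots replaced by underscores
        kwargs = {eid.replace(".", "_"): data for eid, data in zip(pipe.input_entity_ids, inputs)}
        store[pipe.output_entity_id] = pipe.execute(**kwargs)
        executed.append(pipe_id)
    return executed


# SimpleNamespace objects stand in for PipeMetadata; lists stand in for DataFrames
pipes = {
    "clean_customer_data": SimpleNamespace(
        input_entity_ids=["raw.customers"],
        output_entity_id="clean.customers",
        execute=lambda raw_customers: sorted(set(raw_customers)),
    ),
    "clean_order_data": SimpleNamespace(
        input_entity_ids=["raw.orders"],
        output_entity_id="clean.orders",
        execute=lambda raw_orders: raw_orders,
    ),
}
store = {"raw.customers": ["b", "a", "a"]}  # "raw.orders" is deliberately absent
executed = run_pipes(["clean_customer_data", "clean_order_data"], pipes, store)
```

Note that only the first input is checked, as the feature list states, so a pipe whose second input is missing would still be invoked with None for that argument.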
    # Spark's sum is aliased so it does not shadow Python's built-in sum
    from pyspark.sql.functions import col, count, sum as spark_sum

    # Single-input pipe: drop rows without an email, keep one row per customer
    @DataPipes.pipe(
        pipeid="clean_customer_data",
        name="Clean Customer Data",
        tags={"category": "cleaning", "domain": "customer"},
        input_entity_ids=["raw.customers"],
        output_entity_id="clean.customers",
        output_type="table"
    )
    def clean_customers(raw_customers):
        return (
            raw_customers
            .filter(col("email").isNotNull())
            .dropDuplicates(["customer_id"])
        )

    # Multi-input pipe: join customers to orders and aggregate per customer
    @DataPipes.pipe(
        pipeid="customer_orders_summary",
        name="Customer Orders Summary",
        tags={"category": "aggregation", "domain": "analytics"},
        input_entity_ids=["clean.customers", "clean.orders"],
        output_entity_id="summary.customer_orders",
        output_type="table"
    )
    def create_customer_summary(clean_customers, clean_orders):
        return (
            clean_customers.join(clean_orders, "customer_id")
            .groupBy("customer_id", "customer_name")
            .agg(
                count("order_id").alias("total_orders"),
                spark_sum("order_amount").alias("total_spent"),
            )
        )

    # Get the executer from dependency injection
    executer = GlobalInjector.get(DataPipesExecuter)

    # Execute specific pipes
    pipes_to_run = ["clean_customer_data", "customer_orders_summary"]
    executer.run_datapipes(pipes_to_run)

run_datapipes accepts a use_dag=True flag to delegate to the
ExecutionOrchestrator, which builds a dependency graph and runs pipes
in topological generation order. This is the recommended path for any
pipeline with non-trivial dependencies.
    # Dependency-aware execution: runs pipes in the correct order automatically
    executer.run_datapipes(pipes_to_run, use_dag=True)

    # With options: parallel within each generation, fail-fast on error
    from kindling.generation_executor import ErrorStrategy

    executer.run_datapipes(
        pipes_to_run,
        use_dag=True,
        parallel=True,
        max_workers=4,
        error_strategy=ErrorStrategy.FAIL_FAST,
        auto_cache=True,
    )

You can also use ExecutionOrchestrator directly for batch or streaming:
    from kindling.execution_orchestrator import ExecutionOrchestrator

    orchestrator = GlobalInjector.get(ExecutionOrchestrator)

    # Batch mode
    result = orchestrator.execute_batch(pipes_to_run, parallel=True)

    # Streaming mode
    result = orchestrator.execute_streaming(pipes_to_run)

    # Inspect result
    print(f"Succeeded: {result.succeeded}, Failed: {result.failed}")

ExecutionOrchestrator emits an orchestrator.plan_generated signal before
execution begins, carrying the resolved strategy, pipe count, and generation
count for observability hooks.
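To illustrate what "topological generation order" means for use_dag=True, the sketch below groups pipes into generations, where every pipe in a generation depends only on pipes from earlier generations. How ExecutionOrchestrator actually builds its graph is not shown in this document, so this is a generic layered topological sort under that assumption; `topological_generations` and the `clean_order_data` pipe are hypothetical.

```python
from typing import Dict, List


def topological_generations(pipes: Dict[str, dict]) -> List[List[str]]:
    """Group pipe IDs into generations that can run one after another."""
    # Map each output entity to the pipe that produces it
    producer = {meta["output"]: pid for pid, meta in pipes.items()}
    # A pipe depends on every listed pipe that produces one of its inputs
    deps = {
        pid: {producer[eid] for eid in meta["inputs"] if eid in producer}
        for pid, meta in pipes.items()
    }
    generations: List[List[str]] = []
    done: set = set()
    while len(done) < len(pipes):
        # Ready pipes are those whose dependencies have all completed
        ready = sorted(p for p in pipes if p not in done and deps[p] <= done)
        if not ready:
            raise ValueError("Cycle detected among pipes")
        generations.append(ready)
        done.update(ready)
    return generations


pipes = {
    "customer_orders_summary": {
        "inputs": ["clean.customers", "clean.orders"],
        "output": "summary.customer_orders",
    },
    "clean_customer_data": {"inputs": ["raw.customers"], "output": "clean.customers"},
    # Hypothetical pipe added so the summary has two upstream producers
    "clean_order_data": {"inputs": ["raw.orders"], "output": "clean.orders"},
}
plan = topological_generations(pipes)
# plan == [["clean_customer_data", "clean_order_data"], ["customer_orders_summary"]]
```

Pipes within one generation have no dependencies on each other, which is what makes the parallel=True option safe to apply per generation.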
    # Get the registry from dependency injection
    registry = GlobalInjector.get(DataPipesRegistry)

    # List all registered pipes
    all_pipes = registry.get_pipe_ids()
    print(f"Registered pipes: {list(all_pipes)}")

    # Get specific pipe definition
    pipe_def = registry.get_pipe_definition("clean_customer_data")
    print(f"Pipe: {pipe_def.name}, Inputs: {pipe_def.input_entity_ids}")

To use this framework, you must implement:
- EntityReadPersistStrategy: Define how your data is read from and written to storage
- Data Entity Registry: Implement entity definition lookup (referenced but not shown in code)
- Logging and Tracing Providers: Set up logging and distributed tracing infrastructure
The framework requires these components to be available through dependency injection:
- PythonLoggerProvider: For logging capabilities
- DataEntityRegistry: For entity definition management
- SparkTraceProvider: For distributed tracing
- EntityReadPersistStrategy: For data I/O operations
- Missing Decorator Parameters: Raises ValueError if required PipeMetadata fields are missing
- Entity Not Found: Pipes are skipped if the first input entity returns None
- Execution Failures: Individual pipe failures are logged but don't stop the entire pipeline
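The missing-parameter check can be pictured roughly as below. This is a hedged sketch of the described behaviour, not the framework's validation code: `validate_decorator_params` is a hypothetical helper, the exact error message is an assumption, and the required names are taken from the PipeMetadata fields minus `execute`, which the decorator sets itself.

```python
from typing import Any, Dict

# PipeMetadata fields the decorator expects from the caller (execute is set automatically)
REQUIRED_PARAMS = (
    "pipeid",
    "name",
    "tags",
    "input_entity_ids",
    "output_entity_id",
    "output_type",
)


def validate_decorator_params(params: Dict[str, Any]) -> None:
    # Collect every missing name so the caller sees them all in one error
    missing = sorted(set(REQUIRED_PARAMS) - set(params))
    if missing:
        raise ValueError(f"Missing required pipe parameters: {', '.join(missing)}")


complete = {
    "pipeid": "clean_customer_data",
    "name": "Clean Customer Data",
    "tags": {"category": "cleaning"},
    "input_entity_ids": ["raw.customers"],
    "output_entity_id": "clean.customers",
    "output_type": "table",
}
validate_decorator_params(complete)  # passes silently

incomplete = {key: value for key, value in complete.items() if key != "output_type"}
try:
    validate_decorator_params(incomplete)
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```

Because the check runs at decoration time, a missing parameter would surface when the notebook cell defining the pipe is executed rather than when the pipeline runs.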
- Pipe Design: Keep pipes focused on single transformations
- Entity Naming: Use consistent, hierarchical naming (e.g., domain.entity_name)
- Tags: Use tags for categorization and pipeline filtering
- Error Handling: Implement robust error handling in pipe functions
- Testing: Test pipe functions independently before registration
- Documentation: Document complex transformation logic within pipe functions
The framework provides built-in logging at debug level for:
- Pipe registration events
- Pipeline execution start/end
- Individual pipe execution
- Pipe skipping due to missing inputs
Distributed tracing spans are automatically created for:
- Overall pipeline execution
- Individual pipe execution
This enables comprehensive monitoring and debugging of data pipeline performance and behavior.