Merged
66 changes: 66 additions & 0 deletions documentation/components/core/batch-processing.md
@@ -0,0 +1,66 @@
# Batch Processing

- [⬅️️ Back](/documentation/components/core/core.md)

Batch processing controls how data flows through the DataFrame pipeline, affecting memory usage and performance.

## Batch Size Control

### batchSize() - Control processing chunks

```php
<?php

use function Flow\ETL\DSL\{data_frame, from_array, to_output};

$dataFrame = data_frame()
    ->read(from_array($largeDataset))
    ->batchSize(1000) // Process in batches of 1000 rows
    ->map($expensiveTransformation)
    ->write(to_output())
    ->run();
```

> **Performance Tip**: Optimal batch size depends on your data and available memory. Larger batches reduce I/O
> operations but increase memory usage. Start with 1000-5000 rows and adjust based on your specific use case.
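
To make the memory trade-off concrete, here is a minimal plain-PHP sketch of chunked processing with a generator. It is not Flow's implementation; `in_batches()` is a hypothetical helper introduced only for illustration. Only one batch is held in memory at a time, so a larger `$size` means fewer iterations but a larger array per iteration.

```php
<?php

/**
 * Hypothetical helper (not part of Flow): groups any iterable into arrays
 * of at most $size rows, so only one batch is held in memory at a time.
 *
 * @return Generator<int, array<int, mixed>>
 */
function in_batches(iterable $rows, int $size): Generator
{
    if ($size < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }

    $batch = [];

    foreach ($rows as $row) {
        $batch[] = $row;

        if (count($batch) === $size) {
            yield $batch;
            $batch = [];
        }
    }

    // Emit the final, possibly smaller, batch.
    if ($batch !== []) {
        yield $batch;
    }
}

// 10 rows in batches of 4 produce batches of 4, 4 and 2 rows.
$batchSizes = [];
foreach (in_batches(range(1, 10), 4) as $batch) {
    $batchSizes[] = count($batch);
}
```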

## Data Collection

### collect() - Load all data into memory

```php
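<?php

use function Flow\ETL\DSL\{col, data_frame};

// $extractor, $condition and $loader are placeholders defined elsewhere.
$dataFrame = data_frame()
    ->read($extractor)
    ->filter($condition)
    ->collect() // Collect all filtered data into a single batch
    ->sortBy(col('name')) // Sort the collected data
    ->write($loader)
    ->run();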
```

> **⚠️ Memory Warning**: The `collect()` method loads all data into memory at once. This can cause memory exhaustion
> with large datasets. Use only when:
> - You're certain the entire dataset fits comfortably in available memory
> - You need operations that require all data (like sorting)
> - You're working with small to medium datasets
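
Sorting shows why some operations need every row: the row that sorts first may arrive in the last batch. The following plain-PHP sketch illustrates the idea under that assumption; `collect_batches()` is a hypothetical helper, not Flow's API.

```php
<?php

/**
 * Hypothetical helper (not part of Flow): merges every batch into one array.
 * Memory use grows with the total number of rows, not with the batch size.
 */
function collect_batches(iterable $batches): array
{
    $all = [];

    foreach ($batches as $batch) {
        foreach ($batch as $row) {
            $all[] = $row;
        }
    }

    return $all;
}

$batches = [
    [['name' => 'Zoe'], ['name' => 'Mia']],
    [['name' => 'Adam']], // The first name alphabetically arrives in the last batch.
];

// Sorting is only correct once all batches have been merged.
$rows = collect_batches($batches);
usort($rows, static fn (array $a, array $b): int => strcmp($a['name'], $b['name']));

$names = array_column($rows, 'name');
```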

## Memory Management Strategies

### Monitoring Memory Usage

```php
<?php

use function Flow\ETL\DSL\{analyze, data_frame};

$report = data_frame()
    ->read($extractor)
    ->batchSize(1000)
    ->map($transformation)
    ->write($loader)
    ->run(analyze: analyze());

echo "Peak memory usage: " . $report->statistics()->memory->max()->inMb() . " MB\n";
```
24 changes: 6 additions & 18 deletions documentation/components/core/building-blocks.md
@@ -2,11 +2,12 @@

- [⬅️️ Back](/documentation/components/core/core.md)

Entries are the columns of the [Data Frame](/documentation/components/core/core.md); they are represented by
the [Entry](/src/core/etl/src/Flow/ETL/Row/Entry.php) interface.
A group of Entries is called a `Row`, represented by the [Row](/src/core/etl/src/Flow/ETL/Row.php) class.
A group of Rows is called `Rows`, represented by the [Rows](/src/core/etl/src/Flow/ETL/Rows.php) class.

Let's look at the following example:

```php
<?php
@@ -24,7 +25,7 @@ $rows = rows(
```

Rows are the main data structure in Flow ETL; they represent data in the data frame.
Extractors yield Rows and Loaders save Rows.

The same can be achieved using the following code:

@@ -61,20 +62,7 @@ $rows = array_to_rows([
- [XML](/src/core/etl/src/Flow/ETL/Row/Entry/XMLEntry.php)
- [XMLElement](/src/core/etl/src/Flow/ETL/Row/Entry/XMLElementEntry.php)

Internally, Flow uses [EntryFactory](/src/core/etl/src/Flow/ETL/Row/EntryFactory.php) to create entries.
It tries to detect and create the most appropriate entry type based on the value.

Flow Entries are based on [PHP Types](/src/core/etl/src/Flow/ETL/PHP/Type/Type.php), which are divided into two groups:

- Native
  - Array
  - Callable
  - Enum
  - Object
  - Resource
  - Scalar
- Logical
  - List
  - Map
  - Structure

Flow Entries are based on the [Flow Types Library](/documentation/components/libs/types.md).
72 changes: 72 additions & 0 deletions documentation/components/core/constraints.md
@@ -0,0 +1,72 @@
# Constraints

- [⬅️️ Back](/documentation/components/core/core.md)

Data constraints allow you to apply business rules and data integrity checks to ensure data quality during processing. When a constraint is violated, a `ConstraintViolationException` is thrown with details about the violating row.

## Unique Constraints

Ensure that values in specified columns are unique across the entire dataset.

```php
<?php

use function Flow\ETL\DSL\{data_frame, from_array, constraint_unique, to_output};

$dataFrame = data_frame()
    ->read(from_array([
        ['email' => 'user1@example.com', 'username' => 'user1'],
        ['email' => 'user2@example.com', 'username' => 'user2'],
        ['email' => 'user1@example.com', 'username' => 'user3'], // Duplicate email: causes a constraint violation
    ]))
    ->constrain(constraint_unique('email'))
    ->write(to_output())
    ->run();
```

## Multiple Column Constraints

Ensure unique combinations across multiple columns:

```php
<?php

use function Flow\ETL\DSL\{constraint_unique, data_frame};

// $extractor and $loader are placeholders defined elsewhere.
$dataFrame = data_frame()
    ->read($extractor)
    ->constrain(constraint_unique('username', 'tenant_id'))
    ->write($loader)
    ->run();
```
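
As a rough illustration of what a composite uniqueness check involves, the plain-PHP sketch below tracks every combination it has already seen. It is an assumption-laden sketch, not Flow's implementation; `first_duplicate()` is a hypothetical helper. Note that the set of seen keys grows with the number of distinct combinations, which is worth keeping in mind for large datasets.

```php
<?php

/**
 * Hypothetical helper (not Flow's implementation): returns the index of the
 * first row whose values in $columns repeat an earlier row, or null if every
 * combination is unique.
 */
function first_duplicate(iterable $rows, string ...$columns): ?int
{
    $seen = [];

    foreach ($rows as $index => $row) {
        // Encode the selected values so that ['a', 'bc'] and ['ab', 'c'] stay distinct.
        $key = json_encode(array_map(static fn (string $column) => $row[$column], $columns));

        if (isset($seen[$key])) {
            return $index;
        }

        $seen[$key] = true;
    }

    return null;
}

$rows = [
    ['username' => 'anna', 'tenant_id' => 1],
    ['username' => 'anna', 'tenant_id' => 2], // Same username, different tenant: allowed.
    ['username' => 'anna', 'tenant_id' => 1], // Repeats the first combination.
];

$byPair = first_duplicate($rows, 'username', 'tenant_id');
$byUsername = first_duplicate($rows, 'username');
```

Checking the pair flags only the third row, while checking `username` alone already flags the second.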

## Custom Constraints

You can implement custom constraints by creating classes that implement the `Constraint` interface:

```php
<?php

use Flow\ETL\{Constraint, Row};

class AgeRangeConstraint implements Constraint
{
    public function __construct(private int $minAge, private int $maxAge) {}

    public function isSatisfiedBy(Row $row): bool
    {
        $age = $row->get('age')->value();

        return $age >= $this->minAge && $age <= $this->maxAge;
    }

    public function toString(): string
    {
        return "Age must be between {$this->minAge} and {$this->maxAge}";
    }

    public function violation(Row $row): string
    {
        return "Age {$row->get('age')->value()} is outside allowed range";
    }
}
```
109 changes: 93 additions & 16 deletions documentation/components/core/core.md
@@ -4,29 +4,106 @@
- [📚API Reference](/documentation/api/core)
- [📁Files](/documentation/api/core/indices/files.html)

A Data Frame is a structured collection of tabular data, similar to a spreadsheet.
It organizes information into rows and columns, making it easy to understand, filter, and transform.
Using a Data Frame, you can quickly merge, clean, or modify data for your ETL processes,
allowing developers to focus more on transformations rather than low-level data handling.
A Data Frame is the core component of Flow PHP's ETL framework. It represents a structured collection of tabular data that can be processed, transformed, and loaded efficiently. Think of it as a programmable spreadsheet that can handle large datasets with minimal memory footprint.

Unlike loading an entire dataset at once, a Data Frame processes information in smaller, manageable chunks.
As it moves through the data, it only keeps a limited number of rows in memory at any given time.
This approach helps avoid running out of memory, making it efficient and scalable for handling large datasets.
## Key Features

Simple example of reading from a PHP array and writing to stdout.
- **Memory Efficient**: Processes data in chunks using generators, avoiding memory exhaustion
- **Lazy Evaluation**: Operations are only executed when needed
- **Immutable**: Each transformation returns a new DataFrame instance
- **Type Safe**: Strict typing throughout with comprehensive schema support
- **Chainable API**: Fluent interface for building complex data pipelines

## Understanding DataFrame Operations

DataFrame methods fall into two categories based on when they execute:

### Lazy Operations (`@lazy`)

These methods build the processing pipeline without executing it immediately:

- **Transformations**: `filter()`, `map()`, `withEntry()`, `select()`, `drop()`, `rename()`
- **Memory-intensive**: `collect()`, `sortBy()`, `groupBy()`, `join()`, `cache()`
- **Processing control**: `batchSize()`, `limit()`, `offset()`, `partitionBy()`

### Trigger Operations (`@trigger`)

These methods execute the entire pipeline and return results:

- **Data retrieval**: `get()`, `getEach()`, `fetch()`, `count()`
- **Output operations**: `run()`, `forEach()`, `printRows()`, `printSchema()`
- **Schema inspection**: `schema()`, `display()`

> **Important**: Build your complete pipeline with lazy operations, then execute once with a trigger operation for optimal performance.
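
The lazy/trigger split can be illustrated with plain PHP generators. This is only a sketch of the concept, not Flow's internals; `lazy_filter()` and `lazy_map()` are hypothetical helpers. Building the pipeline runs no callbacks, and iterating it plays the role of a trigger operation.

```php
<?php

$log = [];

/** Hypothetical lazy step: yields only rows accepted by $predicate. */
function lazy_filter(iterable $rows, callable $predicate): Generator
{
    foreach ($rows as $row) {
        if ($predicate($row)) {
            yield $row;
        }
    }
}

/** Hypothetical lazy step: yields each row transformed by $fn. */
function lazy_map(iterable $rows, callable $fn): Generator
{
    foreach ($rows as $row) {
        yield $fn($row);
    }
}

// Building the pipeline runs no callbacks yet.
$pipeline = lazy_map(
    lazy_filter([1, 2, 3, 4], function (int $n) use (&$log): bool {
        $log[] = "filter {$n}";

        return $n % 2 === 0;
    }),
    static fn (int $n): int => $n * 10
);

$callsBeforeTrigger = count($log);

// Iterating is the "trigger": rows are now pulled through every step.
$result = iterator_to_array($pipeline, false);
```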

## Creating DataFrames

DataFrames are created using the `data_frame()` DSL function and populated with data through extractors. The framework supports various data sources through adapter-specific extractors.

```php
<?php

use function Flow\ETL\DSL\{col, data_frame, from_array, lit, to_output};

$dataFrame = data_frame()
    ->read(from_array([
        ['id' => 1, 'name' => 'John', 'age' => 30],
        ['id' => 2, 'name' => 'Jane', 'age' => 25],
        ['id' => 3, 'name' => 'Bob', 'age' => 35],
    ]))
    ->filter(col('age')->greaterThan(lit(25)))
    ->select('id', 'name')
    ->write(to_output())
    ->run();
```

> **Note**: Flow PHP supports many data sources through specialized adapters. See individual adapter documentation for specific extractor usage (CSV, JSON, Parquet, databases, APIs, etc.).

## Memory Management Best Practices

1. **Prefer Generator Methods**: Use `get()`, `getEach()`, `getEachAsArray()` over `fetch()` for large datasets
2. **Avoid Memory-Intensive Operations**: Be cautious with `collect()`, `sortBy()`, `groupBy()`, and `join()` on large datasets
3. **Use Appropriate Batch Sizes**: Start with 1000-5000 rows and adjust based on your memory constraints
4. **Monitor Memory Usage**: Use `run(analyze: true)` to track memory consumption during development

## Performance Optimization

- **Push Operations to Data Source**: When possible, perform filtering, sorting, and joins at the database/file level
- **Minimize Data Movement**: Apply filters early in the pipeline to reduce data volume
- **Cache Strategically**: Only cache expensive operations that will be reused multiple times
- **Avoid Large Offsets**: Use data source pagination instead of DataFrame `offset()` for large skips
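
The effect of filtering early can be shown with a small plain-PHP sketch that counts how often a costly step runs. The data and the `$expensive` and `$wanted` closures are made up for illustration and do not use Flow's API.

```php
<?php

$rows = range(1, 1000);
$calls = 0;

// Stand-in for an expensive transformation; it doubles the value and counts its invocations.
$expensive = function (int $n) use (&$calls): int {
    $calls++;

    return $n * 2;
};

// Only multiples of 100 are needed downstream.
$wanted = static fn (int $n): bool => $n % 100 === 0;

// Filter last: every row pays for the transformation.
$calls = 0;
$late = array_values(array_filter(
    array_map($expensive, $rows),
    static fn (int $doubled): bool => $wanted(intdiv($doubled, 2))
));
$callsFilterLast = $calls;

// Filter first: only matching rows reach the transformation.
$calls = 0;
$early = array_map($expensive, array_values(array_filter($rows, $wanted)));
$callsFilterFirst = $calls;
```

Both orderings produce the same rows, but the second runs the transformation on 10 rows instead of 1000.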

## Component Documentation

For detailed information about specific DataFrame operations, see the following component documentation:

### Core Operations
- **[Building Blocks](building-blocks.md)** - Understanding Rows, Entries, and basic data structures
- **[Select/Drop](select-drop.md)** - Column selection and removal
- **[Rename](rename.md)** - Column renaming strategies
- **[Map](map.md)** - Row transformations and data mapping
- **[Filter](filter.md)** - Row filtering and conditions

### Data Processing
- **[Join](join.md)** - DataFrame joining operations
- **[Group By](group-by.md)** - Grouping and aggregation operations
- **[Pivot](pivot.md)** - Transform data from long to wide format
- **[Sort](sort.md)** - Data sorting
- **[Limit](limit.md)** - Result limiting and pagination
- **[Offset](offset.md)** - Skipping rows and pagination
- **[Until](until.md)** - Conditional processing termination
- **[Window Functions](window-functions.md)** - Advanced analytical functions

### Memory & Performance
- **[Batch Processing](batch-processing.md)** - Controlling batch sizes and memory collection
- **[Partitioning](partitioning.md)** - Data partitioning for efficient processing
- **[Caching](caching.md)** - Performance optimization through caching
- **[Data Retrieval](data-retrieval.md)** - Methods for getting processed data

### Data Quality & Validation
- **[Schema](schema.md)** - Schema management and validation
- **[Constraints](constraints.md)** - Data integrity constraints and business rules
- **[Error Handling](error-handling.md)** - Error management strategies

### Output & Display
- **[Display](display.md)** - Data visualization and output