From add4f0d498cec98acc7e4aa36ccabd523df990e3 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Mon, 28 Jul 2025 19:12:52 +0200 Subject: [PATCH 1/2] Updated DataFrame documentation --- .../components/core/batch-processing.md | 66 ++++++++++ .../components/core/building-blocks.md | 24 +--- documentation/components/core/constraints.md | 72 +++++++++++ documentation/components/core/core.md | 109 +++++++++++++--- .../components/core/data-manipulation.md | 118 ++++++++++++++++++ .../components/core/data-retrieval.md | 108 ++++++++++++++++ .../components/core/error-handling.md | 32 +++++ documentation/components/core/filter.md | 27 ++++ documentation/components/core/join.md | 43 ++++--- documentation/components/core/offset.md | 36 ++++++ documentation/components/core/partitioning.md | 77 ++++++++++++ documentation/components/core/pivot.md | 77 ++++++++++++ documentation/components/core/rename.md | 47 +++---- documentation/components/core/schema.md | 90 ++++++------- documentation/examples.md | 22 ++-- documentation/quick-start.md | 2 + 16 files changed, 803 insertions(+), 147 deletions(-) create mode 100644 documentation/components/core/batch-processing.md create mode 100644 documentation/components/core/constraints.md create mode 100644 documentation/components/core/data-manipulation.md create mode 100644 documentation/components/core/data-retrieval.md create mode 100644 documentation/components/core/offset.md create mode 100644 documentation/components/core/partitioning.md create mode 100644 documentation/components/core/pivot.md diff --git a/documentation/components/core/batch-processing.md b/documentation/components/core/batch-processing.md new file mode 100644 index 0000000000..1a5db0b93f --- /dev/null +++ b/documentation/components/core/batch-processing.md @@ -0,0 +1,66 @@ +# Batch Processing + +- [⬅️️ Back](/documentation/components/core/core.md) + +Batch processing controls how data flows through the DataFrame pipeline, affecting memory usage and performance. + +## Batch Size Control + +### batchSize() - Control processing chunks + +```php +read(from_array($largeDataset)) + ->batchSize(1000) // Process in batches of 1000 rows + ->map($expensiveTransformation) + ->write(to_output()) + ->run(); +``` + +> **Performance Tip**: Optimal batch size depends on your data and available memory. Larger batches reduce I/O +> operations but increase memory usage. Start with 1000-5000 rows and adjust based on your specific use case. + +## Data Collection + +### collect() - Load all data into memory + +```php +read($extractor) + ->filter($condition) + ->collect() // Collect all filtered data into single batch + ->sortBy(col('name')) // Now can sort the collected data + ->write($loader) + ->run(); +``` + +> **⚠️ Memory Warning**: The `collect()` method loads all data into memory at once. This can cause memory exhaustion +> with large datasets. Use only when: +> - You're certain the entire dataset fits comfortably in available memory +> - You need operations that require all data (like sorting) +> - You're working with small to medium datasets + +## Memory Management Strategies + +## Monitoring Memory Usage + +```php +read($extractor) + ->batchSize(1000) + ->map($transformation) + ->write($loader) + ->run(analyze: analzyze()); + +echo "Peak memory usage: " . $report->statistics()->memory->max()->inMb() . " bytes\n"; +``` \ No newline at end of file diff --git a/documentation/components/core/building-blocks.md b/documentation/components/core/building-blocks.md index 478064321e..05b5ff77e4 100644 --- a/documentation/components/core/building-blocks.md +++ b/documentation/components/core/building-blocks.md @@ -2,11 +2,12 @@ - [⬅️️ Back](/documentation/components/core/core.md) -Entries are the columns of the data frame, they are represented by the [Entry](/src/core/etl/src/Flow/ETL/Row/Entry.php) interface. +Entries are the columns of the [Data Frame](/documentation/components/core/core.md), they are represented by +the [Entry](/src/core/etl/src/Flow/ETL/Row/Entry.php) interface. Group of Entries is called `Row`, it is represented by the [Row](/src/core/etl/src/Flow/ETL/Row.php) class. Group of Rows is called `Rows`, it is represented by the [Rows](/src/core/etl/src/Flow/ETL/Rows.php) class. -Let's look at the following example: +Let's look at the following example: ```php read(from_array([ + ['email' => 'user1@example.com', 'username' => 'user1'], + ['email' => 'user2@example.com', 'username' => 'user2'], + ['email' => 'user1@example.com', 'username' => 'user3'], // This will cause constraint violation + ])) + ->constrain(constraint_unique('email')) + ->write(to_output()) + ->run(); +``` + +## Multiple Column Constraints + +Ensure unique combinations across multiple columns: + +```php +read($extractor) + ->constrain(constraint_unique('username', 'tenant_id')) + ->write($loader) + ->run(); +``` + +## Custom Constraints + +You can implement custom constraints by creating classes that implement the `Constraint` interface: + +```php +get('age')->value(); + return $age >= $this->minAge && $age <= $this->maxAge; + } + + public function toString(): string + { + return "Age must be between {$this->minAge} and {$this->maxAge}"; + } + + public function violation(Row $row): string + { + return "Age {$row->get('age')->value()} is outside allowed range"; + } +} +``` \ No newline at end of file diff --git a/documentation/components/core/core.md b/documentation/components/core/core.md index d902a86d2d..78f894b998 100644 --- a/documentation/components/core/core.md +++ b/documentation/components/core/core.md @@ -4,29 +4,106 @@ - [📚API Reference](/documentation/api/core) - [📁Files](/documentation/api/core/indices/files.html) -A Data Frame is a structured collection of tabular data, similar to a spreadsheet. -It organizes information into rows and columns, making it easy to understand, filter, and transform. -Using a Data Frame, you can quickly merge, clean, or modify data for your ETL processes, -allowing developers to focus more on transformations rather than low-level data handling. +A Data Frame is the core component of Flow PHP's ETL framework. It represents a structured collection of tabular data that can be processed, transformed, and loaded efficiently. Think of it as a programmable spreadsheet that can handle large datasets with minimal memory footprint. -Unlike loading an entire dataset at once, a Data Frame processes information in smaller, manageable chunks. -As it moves through the data, it only keeps a limited number of rows in memory at any given time. -This approach helps avoid running out of memory, making it efficient and scalable for handling large datasets. +## Key Features -Simple example of reading from php a array and writing to stdout. +- **Memory Efficient**: Processes data in chunks using generators, avoiding memory exhaustion +- **Lazy Evaluation**: Operations are only executed when needed +- **Immutable**: Each transformation returns a new DataFrame instance +- **Type Safe**: Strict typing throughout with comprehensive schema support +- **Chainable API**: Fluent interface for building complex data pipelines + +## Understanding DataFrame Operations + +DataFrame methods fall into two categories based on when they execute: + +### Lazy Operations (`@lazy`) + +These methods build the processing pipeline without executing it immediately: + +- **Transformations**: `filter()`, `map()`, `withEntry()`, `select()`, `drop()`, `rename()` +- **Memory-intensive**: `collect()`, `sortBy()`, `groupBy()`, `join()`, `cache()` +- **Processing control**: `batchSize()`, `limit()`, `offset()`, `partitionBy()` + +### Trigger Operations (`@trigger`) + +These methods execute the entire pipeline and return results: + +- **Data retrieval**: `get()`, `getEach()`, `fetch()`, `count()` +- **Output operations**: `run()`, `forEach()`, `printRows()`, `printSchema()` +- **Schema inspection**: `schema()`, `display()` + +> **Important**: Build your complete pipeline with lazy operations, then execute once with a trigger operation for optimal performance. + +## Creating DataFrames + +DataFrames are created using the `data_frame()` DSL function and populated with data through extractors. The framework supports various data sources through adapter-specific extractors. ```php read(from_array([ - ['id' => 1], - ['id' => 2], - ['id' => 3], - ['id' => 4], - ['id' => 5], + ['id' => 1, 'name' => 'John', 'age' => 30], + ['id' => 2, 'name' => 'Jane', 'age' => 25], + ['id' => 3, 'name' => 'Bob', 'age' => 35], ])) - ->collect() - ->write(to_stream(__DIR__ . '/output.txt', truncate: false)) + ->filter(col('age')->greaterThan(lit(25))) + ->select('id', 'name') + ->write(to_output()) ->run(); ``` + +> **Note**: Flow PHP supports many data sources through specialized adapters. See individual adapter documentation for specific extractor usage (CSV, JSON, Parquet, databases, APIs, etc.). + +## Memory Management Best Practices + +1. **Prefer Generator Methods**: Use `get()`, `getEach()`, `getEachAsArray()` over `fetch()` for large datasets +2. **Avoid Memory-Intensive Operations**: Be cautious with `collect()`, `sortBy()`, `groupBy()`, and `join()` on large datasets +3. **Use Appropriate Batch Sizes**: Start with 1000-5000 rows and adjust based on your memory constraints +4. **Monitor Memory Usage**: Use `run(analyze: true)` to track memory consumption during development + +## Performance Optimization + +- **Push Operations to Data Source**: When possible, perform filtering, sorting, and joins at the database/file level +- **Minimize Data Movement**: Apply filters early in the pipeline to reduce data volume +- **Cache Strategically**: Only cache expensive operations that will be reused multiple times +- **Avoid Large Offsets**: Use data source pagination instead of DataFrame `offset()` for large skips + +## Component Documentation + +For detailed information about specific DataFrame operations, see the following component documentation: + +### Core Operations +- **[Building Blocks](building-blocks.md)** - Understanding Rows, Entries, and basic data structures +- **[Select/Drop](select-drop.md)** - Column selection and removal +- **[Rename](rename.md)** - Column renaming strategies +- **[Map](map.md)** - Row transformations and data mapping +- **[Filter](filter.md)** - Row filtering and conditions + +### Data Processing +- **[Join](join.md)** - DataFrame joining operations +- **[Group By](group-by.md)** - Grouping and aggregation operations +- **[Pivot](pivot.md)** - Transform data from long to wide format +- **[Sort](sort.md)** - Data sorting +- **[Limit](limit.md)** - Result limiting and pagination +- **[Offset](offset.md)** - Skipping rows and pagination +- **[Until](until.md)** - Conditional processing termination +- **[Window Functions](window-functions.md)** - Advanced analytical functions + +### Memory & Performance +- **[Batch Processing](batch-processing.md)** - Controlling batch sizes and memory collection +- **[Partitioning](partitioning.md)** - Data partitioning for efficient processing +- **[Caching](caching.md)** - Performance optimization through caching +- **[Data Retrieval](data-retrieval.md)** - Methods for getting processed data + +### Data Quality & Validation +- **[Schema](schema.md)** - Schema management and validation +- **[Constraints](constraints.md)** - Data integrity constraints and business rules +- **[Error Handling](error-handling.md)** - Error management strategies + +### Output & Display +- **[Display](display.md)** - Data visualization and output diff --git a/documentation/components/core/data-manipulation.md b/documentation/components/core/data-manipulation.md new file mode 100644 index 0000000000..72fec2a04a --- /dev/null +++ b/documentation/components/core/data-manipulation.md @@ -0,0 +1,118 @@ +# Data Manipulation + +- [⬅️️ Back](/documentation/components/core/core.md) + +DataFrame provides several methods for manipulating data structures and values within your datasets. These operations +allow you to add, modify, cast, and clean data efficiently. + +## Type Casting with autoCast() + +Automatically cast data types based on content analysis: + +```php +read(from_array([ + ['id' => '1', 'price' => '19.99', 'active' => 'true'], + ['id' => '2', 'price' => '29.99', 'active' => 'false'], + ['id' => '3', 'price' => '39.99', 'active' => 'true'], + ])) + ->autoCast() // Automatically cast strings to appropriate types + ->write(to_output()) + ->run(); + +// Result: id becomes integer, price becomes float, active becomes boolean +``` + +> **Note**: `autoCast()` analyzes data patterns and attempts to convert string values to more appropriate types like +> integers, floats, booleans, and dates. Use with caution on large datasets as it requires data analysis. + +## Adding Entries with withEntry() + +Add new columns or modify existing ones using expressions: + +```php +read(from_array([ + ['first_name' => 'John', 'last_name' => 'Doe', 'salary' => 50000], + ['first_name' => 'Jane', 'last_name' => 'Smith', 'salary' => 60000], + ])) + ->withEntry('full_name', concat(col('first_name'), lit(' '), col('last_name'))) + ->withEntry('annual_bonus', col('salary')->multiply(lit(0.1))) + ->write(to_output()) + ->run(); +``` + +## Duplicating Rows + +Create duplicate rows for testing or data expansion: + +### duplicateRow() - Duplicate Specific Row + +```php +read(from_array([ + ['id' => 1, 'name' => 'Product A'], + ['id' => 2, 'name' => 'Product B'], + ['id' => 3, 'name' => 'Product C'], + ])) + ->duplicateRow(1) // Duplicate the second row (0-indexed) + ->write(to_output()) + ->run(); + +// Result: Row with id=2 appears twice in the output +``` + +## Removing Duplicates + +Remove duplicate rows from your dataset: + +```php +read(from_array([ + ['id' => 1, 'name' => 'Product A', 'category' => 'Electronics'], + ['id' => 2, 'name' => 'Product B', 'category' => 'Books'], + ['id' => 1, 'name' => 'Product A', 'category' => 'Electronics'], // Duplicate + ['id' => 3, 'name' => 'Product C', 'category' => 'Electronics'], + ['id' => 2, 'name' => 'Product B', 'category' => 'Books'], // Duplicate + ])) + ->dropDuplicates() // Remove all duplicate rows + ->write(to_output()) + ->run(); + +// Result: Only unique rows remain +``` + +### Selective Duplicate Removal + +Remove duplicates based on specific columns: + +```php +read(from_array([ + ['id' => 1, 'name' => 'Product A', 'version' => 1], + ['id' => 1, 'name' => 'Product A', 'version' => 2], // Same product, different version + ['id' => 2, 'name' => 'Product B', 'version' => 1], + ['id' => 3, 'name' => 'Product C', 'version' => 1], + ])) + ->dropDuplicates('id', 'name') // Remove duplicates based on id and name only + ->write(to_output()) + ->run(); + +// Result: Keep first occurrence of each id/name combination +``` diff --git a/documentation/components/core/data-retrieval.md b/documentation/components/core/data-retrieval.md new file mode 100644 index 0000000000..002d2a1ede --- /dev/null +++ b/documentation/components/core/data-retrieval.md @@ -0,0 +1,108 @@ +# Data Retrieval + +- [⬅️️ Back](/documentation/components/core/core.md) + +DataFrame provides several methods for retrieving processed data. These methods are trigger operations that execute the +entire pipeline. + +## Memory-Safe Retrieval (Recommended) + +These methods use generators to maintain constant memory usage regardless of dataset size: + +### get() - Retrieve as Rows batches + +```php +read(from_array($largeDataset)); + +foreach ($dataFrame->get() as $rows) { + echo "Processing batch of " . $rows->count() . " rows\n"; + // Process each batch + foreach ($rows as $row) { + // Process individual row + } +} +``` + +### getEach() - Retrieve individual Rows + +```php +getEach() as $row) { + echo "ID: " . $row->get('id')->value() . "\n"; + echo "Name: " . $row->get('name')->value() . "\n"; +} +``` + +### getAsArray() - Retrieve as array batches + +```php +getAsArray() as $rowsArray) { + // $rowsArray is an array of arrays + foreach ($rowsArray as $rowArray) { + echo "ID: " . $rowArray['id'] . "\n"; + } +} +``` + +### getEachAsArray() - Retrieve individual arrays + +```php +getEachAsArray() as $rowArray) { + echo "ID: " . $rowArray['id'] . "\n"; + echo "Name: " . $rowArray['name'] . "\n"; +} +``` + +### fetch() - Load into memory + +```php +fetch(10); +foreach ($firstTen as $row) { + // Process row +} + +// Fetch all results (dangerous for large datasets!) +$allRows = $dataFrame->fetch(); // Can cause memory exhaustion +``` + +> **⚠️ Memory Warning**: The `fetch()` method loads all requested rows into memory at once. Without a limit parameter, +> it will attempt to load the entire dataset into memory, which can cause memory exhaustion. Always use with a reasonable +> limit or prefer generator-based methods. + +### count() - Count total rows + +```php +count(); +echo "Total rows: $totalCount\n"; +``` + +> **⚠️ Performance Warning**: The `count()` method must process the entire dataset to return the total count, which can +> be expensive for large datasets. Consider whether you actually need the exact count or if an approximation would +> suffice. + +## Iteration with Callback + +### forEach() - Process with callback + +```php +forEach(function (Rows $rows) { + echo "Processing batch of " . $rows->count() . " rows\n"; + // Custom processing logic +}); +``` \ No newline at end of file diff --git a/documentation/components/core/error-handling.md b/documentation/components/core/error-handling.md index 2be56a7d4a..9fa9442724 100644 --- a/documentation/components/core/error-handling.md +++ b/documentation/components/core/error-handling.md @@ -31,3 +31,35 @@ data_frame() ->write(to_json(...)) ->run(); ``` + +## Row-level Error Handling + +For fine-grained error handling during row processing operations: + +```php +read($unreliableDataExtractor) + ->forEach(function(Row $row) use (&$successCount, &$errorCount) { + try { + validateAndProcess($row); + $successCount++; + } catch (InvalidArgumentException $e) { + logInvalidRow($row, $e->getMessage()); + $errorCount++; + } catch (Exception $e) { + logGeneralError($row, $e); + $errorCount++; + } + }); + +echo "Success: {$successCount}, Errors: {$errorCount}"; +``` + +> **Best Practice**: When processing unreliable data sources, implement row-level error handling to prevent entire pipeline failures and provide detailed error reporting. diff --git a/documentation/components/core/filter.md b/documentation/components/core/filter.md index 449ac68c8f..7422784336 100644 --- a/documentation/components/core/filter.md +++ b/documentation/components/core/filter.md @@ -20,4 +20,31 @@ data_frame() ->run(); ``` +## Complex Row-level Filtering + +For advanced filtering that requires custom business logic, you can use callback functions: + +```php +read($transactionExtractor) + ->filter(function(Row $row): bool { + $amount = $row->get('amount')->value(); + $type = $row->get('type')->value(); + $date = $row->get('date')->value(); + + // Complex business logic + return $amount > 1000 + && $type === 'purchase' + && $date > new DateTime('-30 days'); + }) + ->write($highValueTransactionLoader) + ->run(); +``` + +> **Performance Note**: Callback-based filtering cannot be optimized by the engine and should be used sparingly. When possible, prefer built-in scalar functions for better performance. + - [➡️ Until](until.md) diff --git a/documentation/components/core/join.md b/documentation/components/core/join.md index 20f21ea49c..5b28b5ef49 100644 --- a/documentation/components/core/join.md +++ b/documentation/components/core/join.md @@ -2,22 +2,36 @@ - [⬅️️ Back](/documentation/components/core/core.md) -Joining two data frames is a common operation in data processing. -It is used to combine data from two different sources into one data frame. -The join operation is performed on a common column or columns between the two data frames. +Joining two data frames is a common operation in data processing that combines data from two different sources. Flow PHP +implements joins using a **hash join algorithm** that creates a hash table from the right DataFrame and probes it with +rows from the left DataFrame. ## Join Methods -* `DataFrame::crossJoin` - join each row from the left side with each row on the right side creating `count(left) * count(right)` rows in total. -* `DataFrame::join` - right side is static for each left Rows set. -* `DataFrame::joinEach` - right side dynamically generated for each left Rows set. +### join() + +Main join method that loads the right DataFrame into memory as a hash table for efficient lookups. + +### crossJoin() - Cartesian Product + +Joins each row from the left side with each row on the right side, creating `count(left) * count(right)` rows total. + +### joinEach() - Streaming Join + +Right side is dynamically generated for each left row, useful for large right-side datasets that don't fit in memory. ## Join Types -* `left` -* `left_anti` (keep in left only what does not exist in right) -* `right` -* `inner` +Flow PHP supports four join types with specific behaviors: + +| Join Type | Description | +|----------------------------------------|--------------------------------------------------------------------------------------------| +| **Left Join** (`Join::left`) | Returns all rows from left DataFrame with matching rows from right (or NULL) - **Default** | +| **Inner Join** (`Join::inner`) | Returns only rows that exist in both DataFrames | +| **Right Join** (`Join::right`) | Returns all rows from right DataFrame with matching rows from left (or NULL) | +| **Left Anti Join** (`Join::left_anti`) | Returns rows from left DataFrame that have NO match in right DataFrame | + +> Flow uses hash join implementation where hashes are stored in sorted buckets to optimize memory usage and performance. ## Example @@ -35,12 +49,6 @@ $internalProducts = [ ['id' => 3, 'sku' => 'PRODUCT03'], ]; -/** - * DataFrame::join will perform joining having both dataframes in memory. - * This means that if if the right side dataframe is big (as the left side usually will be a batch) - * then it might become performance bottleneck. - * In that case please look at DataFrame::joinEach. - */ data_frame() ->read(from_array($externalProducts)) ->join( @@ -52,9 +60,10 @@ data_frame() ->run(); ``` -Output: +Output: ```console ++----+-----------+ | id | sku | +----+-----------+ | 1 | PRODUCT01 | diff --git a/documentation/components/core/offset.md b/documentation/components/core/offset.md new file mode 100644 index 0000000000..cad487375b --- /dev/null +++ b/documentation/components/core/offset.md @@ -0,0 +1,36 @@ +# Offset + +- [⬅️️ Back](/documentation/components/core/core.md) + +The offset operation skips a specified number of rows from the beginning of the dataset, commonly used for pagination +and data sampling. + +## Basic Offset Usage + +### offset() - Skip rows from beginning + +```php +read(from_array([ + ['id' => 1, 'name' => 'Alice'], + ['id' => 2, 'name' => 'Bob'], + ['id' => 3, 'name' => 'Charlie'], + ['id' => 4, 'name' => 'David'], + ['id' => 5, 'name' => 'Eve'], + ])) + ->offset(2) // Skip first 2 rows + ->write(to_output()) + ->run(); + +// Output: Charlie, David, Eve +``` + +## Performance Considerations + +> **⚠️ Performance Warning**: The `offset()` method must iterate through and process all skipped rows to reach the +> offset position. For large offsets (e.g., `offset(1000000)`), this can significantly impact performance as the DataFrame +> still needs to read and process all data up to the offset point. diff --git a/documentation/components/core/partitioning.md b/documentation/components/core/partitioning.md new file mode 100644 index 0000000000..a056bae02d --- /dev/null +++ b/documentation/components/core/partitioning.md @@ -0,0 +1,77 @@ +# Partitioning + +- [⬅️️ Back](/documentation/components/core/core.md) + +Partitioning divides data into logical groups based on column values, enabling more efficient processing of large datasets and reducing memory usage. + +## Basic Partitioning + +### partitionBy() - Partition by columns + +```php +read(from_array([ + ['date' => '2024-01-01', 'department' => 'sales', 'amount' => 100], + ['date' => '2024-01-01', 'department' => 'marketing', 'amount' => 200], + ['date' => '2024-01-02', 'department' => 'sales', 'amount' => 150], + ['date' => '2024-01-02', 'department' => 'marketing', 'amount' => 250], + ])) + ->partitionBy('date') // Partition by date + ->sortBy(col('amount')) // Sort within each date partition + ->write(to_output()) + ->run(); +``` + +## Multi-Column Partitioning + +```php +read($extractor) + ->partitionBy('date', 'department') // Partition by date AND department + ->aggregate(sum(col('amount'))->as('total_amount')) + ->write($loader) + ->run(); +``` + +### Dropping Partitions + +```php +read($extractor) + ->partitionBy('date') + ->map($transformation) + ->dropPartitions() // Remove partition information but keep data + ->write($loader) + ->run(); + +// Drop partitions AND partition columns +$dataFrame + ->partitionBy('date') + ->dropPartitions(dropPartitionColumns: true) // Also removes 'date' column + ->run(); +``` + +## Performance Considerations + +### Choosing Partition Columns + +```php +partitionBy('date'); // Assuming data is spread across dates + +// Bad partitioning - unbalanced partitions +$dataFrame->partitionBy('id'); // If IDs are unique, creates many tiny partitions + +// Good partitioning - moderate cardinality +$dataFrame->partitionBy('department'); // Assuming reasonable number of departments +``` \ No newline at end of file diff --git a/documentation/components/core/pivot.md b/documentation/components/core/pivot.md new file mode 100644 index 0000000000..5abeef671e --- /dev/null +++ b/documentation/components/core/pivot.md @@ -0,0 +1,77 @@ +# Pivot + +- [⬅️️ Back](/documentation/components/core/core.md) + +Pivot operations transform data from a long format to a wide format by rotating column values into column headers. This +is commonly used for creating cross-tabular reports and summary tables. + +## Basic Pivot Operation + +Pivot can only be used after a `groupBy()` operation and requires exactly one aggregation function. + +```php +read(from_array([ + ['region' => 'North', 'product' => 'Laptop', 'month' => 'Jan', 'sales' => 1000], + ['region' => 'North', 'product' => 'Laptop', 'month' => 'Feb', 'sales' => 1200], + ['region' => 'North', 'product' => 'Phone', 'month' => 'Jan', 'sales' => 800], + ['region' => 'North', 'product' => 'Phone', 'month' => 'Feb', 'sales' => 900], + ['region' => 'South', 'product' => 'Laptop', 'month' => 'Jan', 'sales' => 1100], + ['region' => 'South', 'product' => 'Laptop', 'month' => 'Feb', 'sales' => 1300], + ['region' => 'South', 'product' => 'Phone', 'month' => 'Jan', 'sales' => 700], + ['region' => 'South', 'product' => 'Phone', 'month' => 'Feb', 'sales' => 850], + ])) + ->groupBy('region', 'product') + ->aggregate(sum(col('sales'))->as('total_sales')) + ->pivot(col('product')) // Pivot by product - creates 'Laptop' and 'Phone' columns + ->write(to_output()) + ->run(); +``` + +**Result Structure:** + +``` +| region | Laptop | Phone | +|--------|--------|-------| +| North | 2200 | 1700 | +| South | 2400 | 1550 | +``` + +## Monthly Sales Pivot + +Create a pivot table showing sales by month: + +```php +read(from_array([ + ['region' => 'North', 'month' => 'Jan', 'sales' => 5000], + ['region' => 'North', 'month' => 'Feb', 'sales' => 5500], + ['region' => 'North', 'month' => 'Mar', 'sales' => 6000], + ['region' => 'South', 'month' => 'Jan', 'sales' => 4500], + ['region' => 'South', 'month' => 'Feb', 'sales' => 4800], + ['region' => 'South', 'month' => 'Mar', 'sales' => 5200], + ])) + ->groupBy('region') + ->aggregate(avg(col('sales'))->as('avg_sales')) + ->pivot(col('month')) // Pivot by month + ->write(to_output()) + ->run(); +``` + +**Result:** + +``` +| region | Jan | Feb | Mar | +|--------|------|------|------| +| North | 5000 | 5500 | 6000 | +| South | 4500 | 4800 | 5200 | +``` + diff --git a/documentation/components/core/rename.md b/documentation/components/core/rename.md index 03292329b9..ed5000291b 100644 --- a/documentation/components/core/rename.md +++ b/documentation/components/core/rename.md @@ -2,51 +2,32 @@ - [⬅️️ Back](/documentation/components/core/core.md) -There are multiple ways to rename entries in the data frame. +DataFrame provides several methods for renaming entries (columns) in your data. These operations are lazy and don't execute until a trigger operation is called. -## Rename +## Single Column Rename -To quickly rename single entry use Rows `DataFrame::rename` +To rename a single entry, use `DataFrame::rename()`: ```php read(from_array(...)) - ->rename("old_name", "new_name") - ->write($loader) - ->run(); -``` +use function Flow\ETL\DSL\{data_frame, from_array, to_output}; -## Rename All - -To quickly rename, all entries use Rows `DataFrame::renameAll` - -```php data_frame() ->read(from_array([ - ['e_id' => 1, 'e_name' => 2], - ['e_id' => 2, 'e_name' => 3], - ['e_id' => 3, 'e_name' => 4], - ['e_id' => 4, 'e_name' => 5], + ['old_name' => 'value1', 'other' => 'data1'], + ['old_name' => 'value2', 'other' => 'data2'], ])) - ->renameAll('e_', 'entry_') + ->rename('old_name', 'new_name') + ->write(to_output()) ->run(); - -// Output will be: -// [ -// ['entry_id' => 1, 'entry_name' => 2], -// ['entry_id' => 2, 'entry_name' => 3], -// ['entry_id' => 3, 'entry_name' => 4], -// ['entry_id' => 4, 'entry_name' => 5], -// ] ``` -## Remaining rename methods +## Batch Renaming + +The `renameEach()` method allows you to rename multiple columns at once using various strategies: -- `DataFrame::renameAllLowerCase()` -- `DataFrame::renameAllStyle(StringStyles|string $style)` -- `DataFrame::renameAllUpperCase()` -- `DataFrame::renameAllUpperCaseFirst()` -- `DataFrame::renameAllUpperCaseWord()` +### Available Strategies +- **`rename_style()`** - Changes entry names using string style conventions (camelCase, snake_case, etc.) +- **`rename_replace()`** - Replaces parts of entry names using search and replace patterns diff --git a/documentation/components/core/schema.md b/documentation/components/core/schema.md index ab2196b2fb..b974eefc7a 100644 --- a/documentation/components/core/schema.md +++ b/documentation/components/core/schema.md @@ -2,40 +2,50 @@ - [⬅️️ Back](/documentation/components/core/core.md) -Schema is a set of rules that defines how data is structured. -It is used to validate data and to provide information about data structure. +Schema defines the structure and validation rules for DataFrame data. It provides type safety, data validation, and +metadata management for your data processing pipelines. -Before loading data to sink it might be a good idea to validate it against the schema. -Row Schema is built from Entry Definitions, each definition is created from: +## Understanding Schema Components -* `entry` - name of entry -* `type` - type of entry (class string) -* `nullable` - if `true` NullEntry with matching name will also pass the validation regardless of the type -* `constraint` - additional, flexible validation. Useful for checking if entry value is for example one of expected values -* `metadata` - additional key-value collection that can carry additional context for the definition +A schema consists of entry definitions that specify: -There is more than one way to validate the schema, built in strategies are defined below: +- **Entry Name**: The column/field identifier +- **Type**: The expected data type (class string) +- **Nullable**: Whether NULL values are permitted +- **Metadata**: Key-value pairs for additional context -* [StrictValidator](/src/core/etl/src/Flow/ETL/Row/Schema/StrictValidator.php) - each row must exactly match the schema, extra entries will fail validation -* [SelectiveValidator](/src/core/etl/src/Flow/ETL/Row/Schema/SelectiveValidator.php) - only rows defined in the schema must match, any extra entry in row will be ignored +## Schema Validation Strategies -By default, ETL is initializing `StrictValidator`, but it's possible to override it by passing second argument to `DataFrame::validate()` method. +Flow PHP provides two built-in validation strategies: -## Example - schema validation +- **[StrictValidator](/src/core/etl/src/Flow/ETL/Row/Schema/StrictValidator.php)** - Rows must exactly match the schema; + extra entries cause validation failure +- **[SelectiveValidator](/src/core/etl/src/Flow/ETL/Row/Schema/SelectiveValidator.php)** - Only validates entries + defined in schema; ignores extra entries + +By default, DataFrame uses `StrictValidator`, but you can specify a different validator as the second parameter to +`DataFrame::match()`. + +## Basic Schema Matching + +Use `DataFrame::match()` to validate data against a schema: ```php read(from_array([ ['id' => 1, 'name' => 'Product 1', 'active' => true], ['id' => 2, 'name' => 'Product 2', 'active' => false], ['id' => 3, 'name' => 'Product 3', 'active' => true] ])) - ->validate( + ->match( schema( int_schema('id', $nullable = false), - str_schema('name', $nullable = true), + str_schema('name', $nullable = false), bool_schema('active', $nullable = false, Metadata::empty()->add('key', 'value')), ) ) @@ -43,48 +53,24 @@ data_frame() ->run(); ``` -Output: - -```console -Fatal error: Uncaught Flow\ETL\Exception\SchemaValidationException: Given schema: -schema -|-- id: integer -|-- name: ?string -|-- active: boolean - -Does not match rows: -schema -|-- id: integer -|-- name: string -|-- active: boolean -``` - -## Example - display schema +## Schema Validation with Selective Strategy ```php read(from_array([ - ['id' => 1, 'name' => 'Product 1', 'active' => true, 'tags' => ['tag1', 'tag2']], - ['id' => 2, 'name' => 'Product 2', 'active' => false, 'address' => ['city' => 'London', 'country' => 'UK']], - ['id' => 3, 'name' => 'Product 3', 'active' => true, 'tags' => ['tag1', 'tag2']], - ['id' => 3, 'name' => 'Product 3', 'active' => true] + ['id' => 1, 'name' => 'John', 'extra_field' => 'ignored'], + ['id' => 2, 'name' => 'Jane', 'another_extra' => 'also ignored'], ])) - ->collect() - ->write(to_output(false, Output::schema)) + ->match( + schema( + int_schema('id'), + str_schema('name') + ), + schema_selective_validator() // Only validate id and name, ignore other fields + ) + ->write(to_output()) ->run(); -``` - -Output: - -```console -schema -|-- id: integer -|-- name: string -|-- active: boolean -|-- tags: list -|-- address: structure -| |-- city: string -| |-- country: string ``` \ No newline at end of file diff --git a/documentation/examples.md b/documentation/examples.md index 87f469b73e..950977bf76 100644 --- a/documentation/examples.md +++ b/documentation/examples.md @@ -18,17 +18,17 @@ examples/ ├── run.php # Main execution script ├── clean.php # Cleanup utility └── topics/ # Topic-based organization - ├── aggregations/ # Data aggregation operations (8 examples) - ├── data_frame/ # DataFrame operations (7 examples) - ├── data_reading/ # Data input methods (13 examples) - ├── data_writing/ # Data output methods (9 examples) - ├── filesystem/ # Filesystem operations (4 examples) - ├── join/ # Data joining (2 examples) - ├── partitioning/ # Data partitioning (4 examples) - ├── schema/ # Schema management (4 examples) - ├── transformations/ # Data transformations (12 examples) - ├── types/ # Type system (2 examples) - └── window_functions/ # Window functions (1 example) + ├── aggregations/ # Data aggregation operations + ├── data_frame/ # DataFrame operations + ├── data_reading/ # Data input methods + ├── data_writing/ # Data output methods + ├── filesystem/ # Filesystem operations + ├── join/ # Data joining + ├── partitioning/ # Data partitioning + ├── schema/ # Schema management + ├── transformations/ # Data transformations + ├── types/ # Type system + └── window_functions/ # Window functions ``` ### Standard Example Structure diff --git a/documentation/quick-start.md b/documentation/quick-start.md index 846e5b7945..9bf38a1c92 100644 --- a/documentation/quick-start.md +++ b/documentation/quick-start.md @@ -47,6 +47,8 @@ The `data_frame()` function is the entry point to the Flow ETL DSL. It creates a new instance of the `Flow\ETL\Flow` class, which is the main class of the ETL. +For comprehensive DataFrame documentation, see the [Data Frame Guide](/documentation/components/core/core.md). + [Data Frame Examples](/data_frame/#example) [Examples Documentation](/documentation/examples) From 37bb42dccb4a8bfe227b18e893e1d77ac28e19d1 Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Mon, 28 Jul 2025 23:03:33 +0200 Subject: [PATCH 2/2] Updated documentation navigation --- .../documentation/navigation_left.html.twig | 48 ++++++++++++++----- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/web/landing/templates/documentation/navigation_left.html.twig b/web/landing/templates/documentation/navigation_left.html.twig index bb28955ee9..b8e4465bd5 100644 --- a/web/landing/templates/documentation/navigation_left.html.twig +++ b/web/landing/templates/documentation/navigation_left.html.twig @@ -64,6 +64,12 @@
  • Building Blocks
  • +
  • + Data Retrieval +
  • +
  • + Data Manipulation +
  • Select/Drop
  • @@ -77,36 +83,54 @@ Filter
  • - Error Handling + Join
  • - Join + Group By +
  • - Until + Pivot
  • - Sort + Window Functions
  • - Display + Sort
  • Limit
  • +
  • + Offset +
  • +
  • + Until +
  • +
  • + Batch Processing +
  • +
  • + Caching +
  • +
  • + Partitioning +
  • +
  • + Constraints +
  • Schema
  • - Group By - + Display
  • - Window Functions + Error Handling