Commit d775693

Bug parquet reader memory consumption (#1757)

* Split FlatColumnData to Read and Write
* Use Read/Write FlatColumnData in reader/writer
* Updated ReadFlatColumnValues constructor
* Make ParquetFile yield rows instead of buffering them in array
* Refactored Read/WriteFlatColumnValues iterator method
* Yield pages from column chunk reader instead of buffering them
* Optimized Dremel Assembler null level processing
* Make assemblyFlat yield rows instead of buffering them
* Fix yielding one page at a time from ColumnChunkReader
* Optimized microseconds to date time conversion in parquet
* Optimize DremelAssembler assemblyList method
* Optimize DremelAssembler assemblyList, assemblyMap, assemblyStructure methods
* Optimize using stack for parquet flat columns
* Optimize BinaryReader to yield values
* Make PlainValueUnpacker return generator
* Fix bug in write column values and read even deeply nested columns page by page
* Cleaned up ParquetFile
* Move to flat number of rows per page vs estimations based on rows in memory
* Small refactoring of ReadFlatColumnValues
* Fixed skipping rows in Read/Write flat column values
* Simplified read flat column values
* Make ReadFlatColumnValues take values as generator
* Updated dsl definitions
* CS Fixes
* Regenerate data for parquet extractor benchmark
* Added missing file
* Added a failing test related to data page sizes
* Temporarily save nested column children in one page
* Restored data page size option
* PoC of new Optimized Row Group Writer
* Fixed bug when writing empty pages
* Fix unequal row distribution across pages
* Fixed bug related to inverting booleans
* More performance optimizations
* Regenerated parquet fixtures in order to fix extractors benchmarks
* Removed legacy row group builder
* Reorganized parquet library namespaces
* Covered column Page Builders with unit tests
* Fixed namespace for ChunkColumnBuilder implementations
* Covered RowGroupBuilder and dependencies with unit tests
* Added Benchmarks for Parquet Library
* Updated github workflow with benchmarks
* Added snappy extension to parquet benchmarks
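
Several of the items above replace buffering with yielding. The snippet below is a minimal sketch of that general pattern, not code from this commit: `read_rows_buffered` and `read_rows_lazily` are hypothetical names, and the row shape is invented for illustration. It only shows that a generator hands rows to the consumer one at a time, while the buffered variant builds the whole array first.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical buffered reader: collects every row before returning.
 *
 * @return array<int, array{id: int}>
 */
function read_rows_buffered(int $total) : array
{
    $rows = [];

    for ($i = 0; $i < $total; $i++) {
        $rows[] = ['id' => $i];
    }

    return $rows;
}

/**
 * Hypothetical lazy reader: yields the same rows one at a time.
 *
 * @return \Generator<int, array{id: int}>
 */
function read_rows_lazily(int $total) : \Generator
{
    for ($i = 0; $i < $total; $i++) {
        yield ['id' => $i];
    }
}

// The buffered variant holds all 1000 rows at once.
$bufferedCount = \count(read_rows_buffered(1000));

// The lazy variant produces each row only when the loop asks for it.
$sum = 0;

foreach (read_rows_lazily(1000) as $row) {
    $sum += $row['id'];
}
```

Both variants produce the same rows; the difference is that the consumer of the generator never needs the full result set to exist as one array.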
1 parent 1ca7f29 commit d775693

119 files changed

Lines changed: 10086 additions & 2475 deletions

.github/workflows/test-benchmark.yml

Lines changed: 11 additions & 1 deletion
@@ -43,7 +43,9 @@ jobs:
           tools: composer:v2
           php-version: "${{ matrix.php-version }}"
           ini-values: memory_limit=-1
-          extensions: :psr, bcmath, dom, hash, json, mbstring, xml, xmlwriter, xmlreader, zlib
+          extensions: :psr, bcmath, dom, hash, json, mbstring, xml, xmlwriter, xmlreader, zlib, snappy-https://github.com/kjdev/php-ext-snappy@0.2.1
+        env:
+          SNAPPY_CONFIGURE_PREFIX_OPTS: "CXXFLAGS=-std=c++11"

       - name: "Get Composer Cache Directory"
         id: composer-cache
@@ -107,6 +109,14 @@ jobs:
             echo ' '
             echo '</details>'
             echo ' '
+            echo '<details><summary>Parquet Library</summary>'
+            echo ' '
+            echo '```shell'
+            composer test:benchmark:parquet-library -- --ref=1.x --progress=none
+            echo '```'
+            echo ' '
+            echo '</details>'
+            echo ' '
           } >> "./var/phpbench/summary.txt"

       - uses: actions/upload-artifact@v4

composer.json

Lines changed: 5 additions & 1 deletion
@@ -361,7 +361,8 @@
       "@test:benchmark:building_blocks",
       "@test:benchmark:extractor",
       "@test:benchmark:loader",
-      "@test:benchmark:transformer"
+      "@test:benchmark:transformer",
+      "@test:benchmark:parquet-library"
     ],
     "test:website": [
       "composer test --working-dir=./web/landing"
@@ -381,6 +382,9 @@
     "test:benchmark:transformer": [
       "tools/phpbench/vendor/bin/phpbench run --report=flow-report --group=transformer"
     ],
+    "test:benchmark:parquet-library": [
+      "tools/phpbench/vendor/bin/phpbench run --report=flow-report --group=parquet-library"
+    ],
     "test:mutation": [
       "tools/infection/vendor/bin/infection --threads=max"
     ],

phpbench.json

Lines changed: 2 additions & 1 deletion
@@ -22,7 +22,8 @@
     "src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Benchmark/",
     "src/adapter/etl-adapter-text/tests/Flow/ETL/Adapter/Text/Tests/Benchmark/",
     "src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Benchmark/",
-    "src/core/etl/tests/Flow/ETL/Tests/Benchmark/"
+    "src/core/etl/tests/Flow/ETL/Tests/Benchmark/",
+    "src/lib/parquet/tests/Flow/Parquet/Tests/Benchmark/"
   ],
   "runner.php_config": { "memory_limit": "1G" },
   "runner.iterations": 3,

phpstan.neon

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ parameters:
         - src/lib/parquet/src/Flow/Parquet/ThriftStream/*
         - src/lib/parquet/src/Flow/Parquet/Thrift/*
         - src/lib/parquet/src/Flow/Parquet/BinaryReader/*
-        - src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroupBuilder/ColumnData/DefinitionConverter.php
+        - src/lib/parquet/src/Flow/Parquet/Dremel/ColumnData/DefinitionConverter.php

     tmpDir: var/phpstan/cache

rector.tests.php

Lines changed: 4 additions & 1 deletion
@@ -57,7 +57,7 @@
 use Rector\Set\ValueObject\LevelSetList;
 use Rector\Transform\Rector\StaticCall\StaticCallToFuncCallRector;
 use \Rector\Transform\ValueObject\StaticCallToFuncCall;
-use Rector\Renaming\Rector\FuncCall\RenameFunctionRector;
+use Rector\PHPUnit\AnnotationsToAttributes\Rector\ClassMethod\DataProviderAnnotationToAttributeRector;

 return RectorConfig::configure()
     ->withPaths([
@@ -71,6 +71,9 @@
     ->withSets([
         LevelSetList::UP_TO_PHP_82
     ])
+    ->withRules([
+        DataProviderAnnotationToAttributeRector::class,
+    ])
     ->withConfiguredRule(
         StaticCallToFuncCallRector::class,
         [

src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/RowsNormalizer.php

Lines changed: 3 additions & 1 deletion
@@ -27,7 +27,9 @@ public function normalize(Rows $rows, Schema $schema) : array
             $columns = [];

             foreach ($row->entries() as $entry) {
-                if ($schema->get($entry->ref())->isNullable() && $entry->value() === null) {
+                $definition = $schema->get($entry->ref());
+
+                if ($definition->isNullable() && $entry->value() === null) {
                     $columns[$entry->name()] = null;

                     continue;

src/adapter/etl-adapter-parquet/src/Flow/ETL/Adapter/Parquet/functions.php

Lines changed: 26 additions & 1 deletion
@@ -4,7 +4,11 @@

 namespace Flow\ETL\Adapter\Parquet;

-use Flow\ETL\{Attribute\DocumentationDSL, Attribute\DocumentationExample, Attribute\Module, Attribute\Type as DSLType};
+use Flow\ETL\{Attribute\DocumentationDSL,
+    Attribute\DocumentationExample,
+    Attribute\Module,
+    Attribute\Type as DSLType
+};
 use Flow\ETL\Schema;
 use Flow\Filesystem\Path;
 use Flow\Parquet\{ByteOrder, Options};
@@ -68,3 +72,24 @@ function to_parquet(

     return $loader;
 }
+
+/**
+ * @template T
+ *
+ * @param array<T> $data
+ *
+ * @return \Generator<T>
+ */
+#[DocumentationDSL(module: Module::PARQUET, type: DSLType::HELPER)]
+function array_to_generator(array $data) : \Generator
+{
+    foreach ($data as $row) {
+        yield $row;
+    }
+}
+
+#[DocumentationDSL(module: Module::PARQUET, type: DSLType::HELPER)]
+function empty_generator() : \Generator
+{
+    yield from [];
+}
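
A small standalone sketch of how the two helpers added above behave. The function bodies are copied from the diff, but they are declared here in the global namespace and without the `DocumentationDSL` attribute so the snippet runs without the Flow packages; that setup is an assumption made for illustration, not how the adapter loads them.

```php
<?php

declare(strict_types=1);

// Same body as the helper in the diff, minus the Flow attribute.
function array_to_generator(array $data) : \Generator
{
    foreach ($data as $row) {
        yield $row;
    }
}

// Same body as the helper in the diff, minus the Flow attribute.
function empty_generator() : \Generator
{
    yield from [];
}

// Only values are yielded, so the original string keys are not carried over.
$values = iterator_to_array(array_to_generator(['a' => 1, 'b' => 2]), false);

// `yield from []` makes the function a generator that produces nothing.
$empty = iterator_to_array(empty_generator(), false);
```

Because `array_to_generator` yields only the values, callers that depend on the source array's keys would need a different helper.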

src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Integration/ParquetTest.php

Lines changed: 30 additions & 2 deletions
@@ -5,11 +5,13 @@
 namespace Flow\ETL\Adapter\Parquet\Tests\Integration;

 use function Flow\ETL\Adapter\Parquet\{from_parquet, to_parquet};
-use function Flow\ETL\DSL\{config, from_array, json_schema, schema, str_schema};
+use function Flow\ETL\DSL\{config, from_array, json_schema, overwrite, schema, str_schema};
 use function Flow\ETL\DSL\data_frame;
 use function Flow\Filesystem\DSL\{path};
 use Flow\ETL\Tests\Double\FakeExtractor;
-use Flow\ETL\{Tests\FlowTestCase};
+use Flow\ETL\{Tests\Double\FakeRandomOrdersExtractor, Tests\FlowTestCase};
+use Flow\Filesystem\SizeUnits;
+use Flow\Parquet\{Option, Options};
 use Ramsey\Uuid\Uuid;

 final class ParquetTest extends FlowTestCase
@@ -33,6 +35,32 @@ public function test_writing_and_reading_into_parquet() : void
         );
     }

+    public function test_writing_and_reading_parquet_orders() : void
+    {
+        $path = path(__DIR__ . '/var/orders.snappy.parquet');
+        $config = config();
+        data_frame($config)
+            ->read(new FakeRandomOrdersExtractor(1000))
+            ->mode(overwrite())
+            ->write(
+                to_parquet($path)
+                    ->withOptions(
+                        Options::default()
+                            ->set(Option::ROW_GROUP_SIZE_CHECK_INTERVAL, 500)
+                            ->set(Option::ROW_GROUP_SIZE_BYTES, SizeUnits::MiB_SIZE)
+                            ->set(Option::PAGE_MAXIMUM_ROWS_COUNT, 10)
+                    )
+            )
+            ->run();
+
+        self::assertEquals(
+            1000,
+            (data_frame($config))
+                ->read(from_parquet($path))
+                ->count()
+        );
+    }
+
     public function test_writing_with_provided_schema() : void
     {
         $path = path('memory://var/file_schema.snappy.parquet');
