Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .php-cs-fixer.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
__DIR__ . '/tools/rector/src',
])
->exclude([
'Flow/Parquet/Thrift',
'Flow/Parquet/ThriftModel',
'Flow/CLI/Tests/Integration',
'Flow/ETL/Tests/Unit/Loader',
'Flow/ETL/Tests/Unit/Exception'
Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@
"./tools/phpdocumentor/vendor/bin/phpdoc --config=./phpdoc/bridge.symfony.http-foundation.xml"
],
"build:parquet:thrift": [
"grep -q 'namespace php Flow.Parquet.Thrift' src/lib/parquet/src/Flow/Parquet/Resources/Thrift/parquet.thrift || { echo \"Flow php namespace not found in thrift definition!\"; exit 1; }\n",
"rm src/lib/parquet/src/Flow/Parquet/Thrift/*.php",
"grep -q 'namespace php Flow.Parquet.ThriftModel' src/lib/parquet/src/Flow/Parquet/Resources/Thrift/parquet.thrift || { echo \"Flow php namespace not found in thrift definition!\"; exit 1; }\n",
"rm src/lib/parquet/src/Flow/Parquet/ThriftModel/*.php",
"thrift --gen php --out src/lib/parquet/src src/lib/parquet/src/Flow/Parquet/Resources/Thrift/parquet.thrift",
"@cs:php:fix"
],
Expand Down
2 changes: 1 addition & 1 deletion phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ parameters:
- src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/SearchParams.php
- src/adapter/etl-adapter-elasticsearch/src/Flow/ETL/Adapter/Elasticsearch/ElasticsearchPHP/PointInTime.php
- src/adapter/etl-adapter-excel/src/Flow/ETL/Adapter/Excel/Sheet/SheetsManager.php
- src/lib/parquet/src/Flow/Parquet/ThriftStream/*
- src/lib/parquet/src/Flow/Parquet/Thrift/*
- src/lib/parquet/src/Flow/Parquet/ThriftModel/*
- src/lib/parquet/src/Flow/Parquet/BinaryReader/*
- src/lib/parquet/src/Flow/Parquet/Dremel/ColumnData/DefinitionConverter.php

Expand Down
2 changes: 1 addition & 1 deletion rector.src.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
])
->withSkip([
StringClassNameToClassConstantRector::class,
__DIR__ . '/src/lib/parquet/src/Flow/Parquet/Thrift/*',
__DIR__ . '/src/lib/parquet/src/Flow/Parquet/ThriftModel/*',
])
->withCache(__DIR__ . '/var/rector/src')
->withImportNames(importShortClasses: false, removeUnusedImports: true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,15 @@ function empty_generator() : \Generator
{
yield from [];
}

#[DocumentationDSL(module: Module::PARQUET, type: DSLType::HELPER)]
function schema_to_parquet(Schema $schema) : \Flow\Parquet\ParquetFile\Schema
{
return (new SchemaConverter())->toParquet($schema);
}

#[DocumentationDSL(module: Module::PARQUET, type: DSLType::HELPER)]
function schema_from_parquet(\Flow\Parquet\ParquetFile\Schema $schema) : Schema
{
return (new SchemaConverter())->toFlow($schema);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@

namespace Flow\ETL\Tests\Double;

use function Flow\ETL\DSL\{array_to_rows, datetime_schema, float_schema, list_schema, schema, string_schema, structure_schema, uuid_schema};
use function Flow\ETL\DSL\{array_to_rows,
datetime_schema,
float_schema,
integer_schema,
list_schema,
schema,
string_schema,
structure_schema,
uuid_schema};
use function Flow\Types\DSL\{type_float, type_integer, type_list, type_string, type_structure};
use Flow\ETL\{Extractor, FlowContext, Schema};

Expand All @@ -17,6 +25,7 @@ public function __construct(private int $count = 1_000)
public static function schema() : Schema
{
return schema(
integer_schema('index'),
uuid_schema('order_id'),
datetime_schema('created_at'),
datetime_schema('updated_at', true),
Expand Down Expand Up @@ -65,6 +74,7 @@ public function rawData() : \Generator

for ($i = 0; $i < $this->count; $i++) {
yield [
'index' => $i,
'order_id' => '254d61c5-22c8-4407-83a2-76f1cab53af2',
'created_at' => new \DateTimeImmutable('2025-01-01 12:00:00'),
'updated_at' => \random_int(0, 1) === 1 ? new \DateTimeImmutable('2025-01-01 12:10:00') : null,
Expand Down
12 changes: 6 additions & 6 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
ParquetFile\Metadata,
ParquetFile\Page\ColumnPageHeader,
ParquetFile\Schema,
Reader\PageReader};
Reader\PageReader,
Thrift\CompactProtocol,
Thrift\MemoryBuffer};
use Flow\Parquet\Exception\{InvalidArgumentException, RuntimeException};
use Flow\Parquet\ParquetFile\Data\DataConverter;
use Flow\Parquet\ParquetFile\Schema\{Column, FlatColumn};
use Flow\Parquet\ParquetFile\Schema\NestedColumn;
use Flow\Parquet\Reader\{ColumnChunkReader, ColumnChunkViewer};
use Flow\Parquet\Thrift\FileMetaData;
use Thrift\Protocol\TCompactProtocol;
use Thrift\Transport\TMemoryBuffer;
use Flow\Parquet\ThriftModel\FileMetaData;

final class ParquetFile
{
Expand Down Expand Up @@ -64,8 +64,8 @@ public function metadata() : Metadata

$thriftMetadata = new FileMetaData();
$thriftMetadata->read(
new TCompactProtocol(
new TMemoryBuffer($metadata)
new CompactProtocol(
new MemoryBuffer($metadata)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
use Flow\Parquet\Thrift\FileMetaData;
use Flow\Parquet\ThriftModel\FileMetaData;

final readonly class Metadata
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public function __construct(
) {
}

public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeader $thrift) : self
public static function fromThrift(\Flow\Parquet\ThriftModel\DataPageHeader $thrift) : self
{
return new self(
Encodings::from($thrift->encoding),
Expand All @@ -41,9 +41,9 @@ public function repetitionLevelEncoding() : Encodings
return $this->repetitionLevelEncoding;
}

public function toThrift() : \Flow\Parquet\Thrift\DataPageHeader
public function toThrift() : \Flow\Parquet\ThriftModel\DataPageHeader
{
return new \Flow\Parquet\Thrift\DataPageHeader([
return new \Flow\Parquet\ThriftModel\DataPageHeader([
'num_values' => $this->valuesCount,
'encoding' => $this->encoding->value,
'definition_level_encoding' => $this->definitionLevelEncoding->value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public function __construct(
) {
}

public static function fromThrift(\Flow\Parquet\Thrift\DataPageHeaderV2 $thrift, Options $options) : self
public static function fromThrift(\Flow\Parquet\ThriftModel\DataPageHeaderV2 $thrift, Options $options) : self
{
return new self(
$thrift->num_values,
Expand Down Expand Up @@ -61,9 +61,9 @@ public function statistics(Options $options) : ?StatisticsReader
return new StatisticsReader($this->statistics, $options);
}

public function toThrift() : \Flow\Parquet\Thrift\DataPageHeaderV2
public function toThrift() : \Flow\Parquet\ThriftModel\DataPageHeaderV2
{
return new \Flow\Parquet\Thrift\DataPageHeaderV2([
return new \Flow\Parquet\ThriftModel\DataPageHeaderV2([
'num_values' => $this->valuesCount,
'num_nulls' => $this->nullsCount,
'num_rows' => $this->rowsCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
) {
}

public static function fromThrift(\Flow\Parquet\Thrift\DictionaryPageHeader $thrift) : self
public static function fromThrift(\Flow\Parquet\ThriftModel\DictionaryPageHeader $thrift) : self
{
return new self(
Encodings::from($thrift->encoding),
Expand All @@ -27,9 +27,9 @@ public function encoding() : Encodings
return $this->encoding;
}

public function toThrift() : \Flow\Parquet\Thrift\DictionaryPageHeader
public function toThrift() : \Flow\Parquet\ThriftModel\DictionaryPageHeader
{
return new \Flow\Parquet\Thrift\DictionaryPageHeader([
return new \Flow\Parquet\ThriftModel\DictionaryPageHeader([
'encoding' => $this->encoding->value,
'num_values' => $this->valuesCount,
'is_sorted' => false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public function __construct(
) {
}

public static function fromThrift(\Flow\Parquet\Thrift\PageHeader $thrift, Options $options) : self
public static function fromThrift(\Flow\Parquet\ThriftModel\PageHeader $thrift, Options $options) : self
{
return new self(
Type::from($thrift->type),
Expand Down Expand Up @@ -90,9 +90,9 @@ public function encoding() : Encodings
return $this->dataPageHeader->encoding();
}

public function toThrift() : \Flow\Parquet\Thrift\PageHeader
public function toThrift() : \Flow\Parquet\ThriftModel\PageHeader
{
return new \Flow\Parquet\Thrift\PageHeader([
return new \Flow\Parquet\ThriftModel\PageHeader([
'type' => $this->type->value,
'compressed_page_size' => $this->compressedPageSize,
'uncompressed_page_size' => $this->uncompressedPageSize,
Expand Down
8 changes: 4 additions & 4 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroup.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public function __construct(
) {
}

public static function fromThrift(\Flow\Parquet\Thrift\RowGroup $thrift, Options $options) : self
public static function fromThrift(\Flow\Parquet\ThriftModel\RowGroup $thrift, Options $options) : self
{
return new self(
\array_map(static fn (\Flow\Parquet\Thrift\ColumnChunk $columnChunk) => ColumnChunk::fromThrift($columnChunk, $options), $thrift->columns),
\array_map(static fn (\Flow\Parquet\ThriftModel\ColumnChunk $columnChunk) => ColumnChunk::fromThrift($columnChunk, $options), $thrift->columns),
$thrift->num_rows
);
}
Expand Down Expand Up @@ -72,13 +72,13 @@ public function totalByteSize() : int
return \array_sum(\array_map(static fn (ColumnChunk $chunk) => $chunk->totalUncompressedSize(), $this->columnChunks));
}

public function toThrift() : \Flow\Parquet\Thrift\RowGroup
public function toThrift() : \Flow\Parquet\ThriftModel\RowGroup
{
$fileOffset = \count($this->columnChunks) ? \current($this->columnChunks)->fileOffset() : 0;
$chunksUncompressedSize = \array_map(static fn (ColumnChunk $chunk) => $chunk->totalUncompressedSize(), $this->columnChunks);
$chunksCompressedSize = \array_map(static fn (ColumnChunk $chunk) => $chunk->totalCompressedSize(), $this->columnChunks);

return new \Flow\Parquet\Thrift\RowGroup([
return new \Flow\Parquet\ThriftModel\RowGroup([
'columns' => \array_map(static fn (ColumnChunk $columnChunk) => $columnChunk->toThrift(), $this->columnChunks),
'num_rows' => $this->rowsCount,
'file_offset' => $fileOffset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Flow\Parquet\Options;
use Flow\Parquet\ParquetFile\{Compressions, Encodings, Statistics};
use Flow\Parquet\ParquetFile\Schema\PhysicalType;
use Flow\Parquet\Thrift\ColumnMetaData;
use Flow\Parquet\ThriftModel\ColumnMetaData;

final readonly class ColumnChunk
{
Expand Down Expand Up @@ -41,7 +41,7 @@ public function __construct(
) {
}

public static function fromThrift(\Flow\Parquet\Thrift\ColumnChunk $thrift, Options $options) : self
public static function fromThrift(\Flow\Parquet\ThriftModel\ColumnChunk $thrift, Options $options) : self
{
return new self(
PhysicalType::from($thrift->meta_data->type),
Expand Down Expand Up @@ -126,9 +126,9 @@ public function totalUncompressedSize() : int
return $this->totalUncompressedSize;
}

public function toThrift() : \Flow\Parquet\Thrift\ColumnChunk
public function toThrift() : \Flow\Parquet\ThriftModel\ColumnChunk
{
return new \Flow\Parquet\Thrift\ColumnChunk([
return new \Flow\Parquet\ThriftModel\ColumnChunk([
'file_offset' => $this->fileOffset,
'meta_data' => new ColumnMetaData([
'type' => $this->type->value,
Expand Down
4 changes: 2 additions & 2 deletions src/lib/parquet/src/Flow/Parquet/ParquetFile/RowGroups.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public function __construct(private array $rowGroups)
}

/**
* @param array<\Flow\Parquet\Thrift\RowGroup> $rowGroups
* @param array<\Flow\Parquet\ThriftModel\RowGroup> $rowGroups
*/
public static function fromThrift(array $rowGroups, Options $options) : self
{
Expand Down Expand Up @@ -54,7 +54,7 @@ public function rowsCount() : int
}

/**
* @return array<\Flow\Parquet\Thrift\RowGroup>
* @return array<\Flow\Parquet\ThriftModel\RowGroup>
*/
public function toThrift() : array
{
Expand Down
2 changes: 1 addition & 1 deletion src/lib/parquet/src/Flow/Parquet/ParquetFile/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Flow\Parquet\Exception\InvalidArgumentException;
use Flow\Parquet\ParquetFile\Schema\{Column, FlatColumn, NestedColumn};
use Flow\Parquet\Thrift\SchemaElement;
use Flow\Parquet\ThriftModel\SchemaElement;

final class Schema
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\Parquet\ParquetFile\Schema;

use Flow\Parquet\Thrift\SchemaElement;
use Flow\Parquet\ThriftModel\SchemaElement;

interface Column
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Flow\Parquet\Consts;
use Flow\Parquet\Exception\InvalidArgumentException;
use Flow\Parquet\Thrift\SchemaElement;
use Flow\Parquet\ThriftModel\SchemaElement;

final class FlatColumn implements Column
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Flow\Parquet\Exception\InvalidArgumentException;
use Flow\Parquet\ParquetFile\Schema\LogicalType\{Decimal, Time, Timestamp};
use Flow\Parquet\Thrift\{BsonType, DateType, DecimalType, EnumType, IntType, JsonType, ListType, MapType, MicroSeconds, MilliSeconds, NanoSeconds, NullType, StringType, TimeType, TimeUnit, TimestampType, UUIDType};
use Flow\Parquet\ThriftModel\{BsonType, DateType, DecimalType, EnumType, IntType, JsonType, ListType, MapType, MicroSeconds, MilliSeconds, NanoSeconds, NullType, StringType, TimeType, TimeUnit, TimestampType, UUIDType};

final readonly class LogicalType
{
Expand Down Expand Up @@ -64,7 +64,7 @@ public static function enum() : self
return new self(self::ENUM);
}

public static function fromThrift(\Flow\Parquet\Thrift\LogicalType $logicalType) : self
public static function fromThrift(\Flow\Parquet\ThriftModel\LogicalType $logicalType) : self
{
$name = null;

Expand Down Expand Up @@ -202,9 +202,9 @@ public function timestampData() : ?Timestamp
return $this->timestamp;
}

public function toThrift() : \Flow\Parquet\Thrift\LogicalType
public function toThrift() : \Flow\Parquet\ThriftModel\LogicalType
{
return new \Flow\Parquet\Thrift\LogicalType([
return new \Flow\Parquet\ThriftModel\LogicalType([
self::BSON => $this->is(self::BSON) ? new BsonType() : null,
self::DATE => $this->is(self::DATE) ? new DateType() : null,
self::DECIMAL => $this->is(self::DECIMAL) ? new DecimalType([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\Parquet\ParquetFile\Schema\LogicalType;

use Flow\Parquet\Thrift\DecimalType;
use Flow\Parquet\ThriftModel\DecimalType;

final readonly class Decimal
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\Parquet\ParquetFile\Schema\LogicalType;

use Flow\Parquet\Thrift\TimeType;
use Flow\Parquet\ThriftModel\TimeType;

final readonly class Time
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Flow\Parquet\ParquetFile\Schema\LogicalType;

use Flow\Parquet\Thrift\TimestampType;
use Flow\Parquet\ThriftModel\TimestampType;

final readonly class Timestamp
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Flow\Parquet\ParquetFile\Schema;

use Flow\Parquet\Exception\InvalidArgumentException;
use Flow\Parquet\Thrift\SchemaElement;
use Flow\Parquet\ThriftModel\SchemaElement;

final class NestedColumn implements Column
{
Expand Down
Loading
Loading