Skip to content

Commit 4959309

Browse files
committed
Fixed issue with non cleaning page coumn chunk buidlers properly after flushing
1 parent 24c49cb commit 4959309

13 files changed

Lines changed: 271 additions & 87 deletions

src/lib/parquet/src/Flow/Parquet/Writer/ColumnChunkBuilder/DeltaBinaryPackedColumnChunkBuilder.php

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
use Flow\Parquet\ParquetFile\Page\PageHeader;
2828
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
2929
use Flow\Parquet\ParquetFile\Schema\{Column, FlatColumn, PhysicalType};
30-
use Flow\Parquet\Thrift\{CompactProtocol, MemoryBuffer};
3130
use Flow\Parquet\Writer\PageBuilder\{RLEBitPackedPacker};
3231
use Flow\Parquet\Writer\ValueStorage\{DeltaBinaryPackedValueStorage, ValueStorage};
3332

@@ -44,7 +43,7 @@ final class DeltaBinaryPackedColumnChunkBuilder implements ColumnChunkBuilder
4443

4544
private int $nullCount = 0;
4645

47-
private readonly PageContainers $pages;
46+
private PageContainers $pages;
4847

4948
private StatisticsCounter $pageStatistics;
5049

@@ -99,6 +98,10 @@ public function addRow(WriteColumnData $columnData) : void
9998

10099
public function closePage() : void
101100
{
101+
if ($this->isEmpty()) {
102+
return;
103+
}
104+
102105
$codec = new Codec($this->options);
103106

104107
$pageContainer = match ($writerVersion = $this->options->getInt(Option::WRITER_VERSION)) {
@@ -126,11 +129,9 @@ public function column() : Column
126129

127130
public function flush(int $fileOffset) : array
128131
{
129-
// if (!$this->valueStorage->isEmpty() || \count($this->repetitionLevels) > 0 || \count($this->definitionLevels) > 0) {
130132
$this->closePage();
131-
// }
132133

133-
return [new ColumnChunkContainer(
134+
$containers = [new ColumnChunkContainer(
134135
$this->pages->buffer(),
135136
new ColumnChunk(
136137
type: $this->column->type(),
@@ -148,6 +149,29 @@ public function flush(int $fileOffset) : array
148149
options: $this->options
149150
)
150151
)];
152+
153+
$this->pages = new PageContainers();
154+
$this->chunkStatistics = new StatisticsCounter($this->column);
155+
$this->pageStatistics = new StatisticsCounter($this->column);
156+
$this->repetitionLevels = [];
157+
$this->definitionLevels = [];
158+
$this->valueStorage->reset();
159+
$this->rowsCount = 0;
160+
$this->nullCount = 0;
161+
$this->nonNullValuesCount = 0;
162+
163+
return $containers;
164+
}
165+
166+
/**
167+
* Checks if the builder has any data that needs to be written.
168+
* Used to prevent writing empty pages.
169+
*/
170+
public function isEmpty() : bool
171+
{
172+
return $this->valueStorage->size() === 0
173+
&& count($this->definitionLevels) === 0
174+
&& count($this->repetitionLevels) === 0;
151175
}
152176

153177
public function isFull() : bool
@@ -193,13 +217,9 @@ private function buildDataPage(Codec $codec, Compressions $compression) : PageCo
193217
dataPageHeaderV2: null,
194218
dictionaryPageHeader: null,
195219
);
196-
$pageHeader->toThrift()->write(new CompactProtocol($pageHeaderBuffer = new MemoryBuffer()));
197220

198221
return new PageContainer(
199-
$pageHeaderBuffer->data(),
200222
$compressedBuffer,
201-
[],
202-
null,
203223
$pageHeader
204224
);
205225
}
@@ -246,13 +266,9 @@ private function buildDataPageV2(Codec $codec, Compressions $compression) : Page
246266
),
247267
dictionaryPageHeader: null,
248268
);
249-
$pageHeader->toThrift()->write(new CompactProtocol($pageHeaderBuffer = new MemoryBuffer()));
250269

251270
return new PageContainer(
252-
$pageHeaderBuffer->data(),
253271
$repetitionsBuffer . $definitionsBuffer . $compressedBuffer,
254-
[],
255-
null,
256272
$pageHeader
257273
);
258274
}

src/lib/parquet/src/Flow/Parquet/Writer/ColumnChunkBuilder/PlainFlatColumnChunkBuilder.php

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
use Flow\Parquet\ParquetFile\Page\PageHeader;
2626
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
2727
use Flow\Parquet\ParquetFile\Schema\{Column, FlatColumn, PhysicalType};
28-
use Flow\Parquet\Thrift\{CompactProtocol, MemoryBuffer};
2928
use Flow\Parquet\Writer\PageBuilder\{RLEBitPackedPacker};
3029
use Flow\Parquet\Writer\ValueStorage\{BooleanValueStorage, BufferValueStorage, ValueStorage};
3130

@@ -97,6 +96,10 @@ public function addRow(WriteColumnData $columnData) : void
9796

9897
public function closePage() : void
9998
{
99+
if ($this->isEmpty()) {
100+
return;
101+
}
102+
100103
$codec = new Codec($this->options);
101104

102105
$pageContainer = match ($writerVersion = $this->options->getInt(Option::WRITER_VERSION)) {
@@ -146,10 +149,29 @@ public function flush(int $fileOffset) : array
146149
)];
147150

148151
$this->pages = new PageContainers();
152+
$this->chunkStatistics = new StatisticsCounter($this->column);
153+
$this->pageStatistics = new StatisticsCounter($this->column);
154+
$this->repetitionLevels = [];
155+
$this->definitionLevels = [];
156+
$this->valueStorage->reset();
157+
$this->rowsCount = 0;
158+
$this->nullCount = 0;
159+
$this->nonNullValuesCount = 0;
149160

150161
return $containers;
151162
}
152163

164+
/**
165+
* Checks if the builder has any data that needs to be written.
166+
* Used to prevent writing empty pages.
167+
*/
168+
public function isEmpty() : bool
169+
{
170+
return $this->valueStorage->size() === 0
171+
&& count($this->definitionLevels) === 0
172+
&& count($this->repetitionLevels) === 0;
173+
}
174+
153175
public function isFull() : bool
154176
{
155177
return $this->valueStorage->size() >= $this->options->get(Option::PAGE_SIZE_BYTES);
@@ -192,13 +214,9 @@ private function buildDataPage(Codec $codec, Compressions $compression) : PageCo
192214
dataPageHeaderV2: null,
193215
dictionaryPageHeader: null,
194216
);
195-
$pageHeader->toThrift()->write(new CompactProtocol($pageHeaderBuffer = new MemoryBuffer()));
196217

197218
return new PageContainer(
198-
$pageHeaderBuffer->data(),
199219
$compressedBuffer,
200-
[],
201-
null,
202220
$pageHeader
203221
);
204222
}
@@ -244,13 +262,9 @@ private function buildDataPageV2(Codec $codec, Compressions $compression) : Page
244262
),
245263
dictionaryPageHeader: null,
246264
);
247-
$pageHeader->toThrift()->write(new CompactProtocol($pageHeaderBuffer = new MemoryBuffer()));
248265

249266
return new PageContainer(
250-
$pageHeaderBuffer->data(),
251267
$repetitionsBuffer . $definitionsBuffer . $compressedBuffer,
252-
[],
253-
null,
254268
$pageHeader
255269
);
256270
}

src/lib/parquet/src/Flow/Parquet/Writer/ColumnChunkBuilder/RLEDictionaryChunkBuilder.php

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
use Flow\Parquet\ParquetFile\Page\PageHeader;
2929
use Flow\Parquet\ParquetFile\RowGroup\ColumnChunk;
3030
use Flow\Parquet\ParquetFile\Schema\{Column, FlatColumn};
31-
use Flow\Parquet\Thrift\{CompactProtocol, MemoryBuffer};
3231
use Flow\Parquet\Writer\PageBuilder\{Dictionary, DictionaryBuilder};
3332
use Flow\Parquet\Writer\PageBuilder\RLEBitPackedPacker;
3433

@@ -96,6 +95,10 @@ public function addRow(WriteColumnData $columnData) : void
9695

9796
public function closePage() : void
9897
{
98+
if ($this->isEmpty()) {
99+
return;
100+
}
101+
99102
$codec = new Codec($this->options);
100103

101104
if (\count($this->pageValues) > 0) {
@@ -138,11 +141,9 @@ public function column() : Column
138141

139142
public function flush(int $fileOffset) : array
140143
{
141-
if (\count($this->pageValues) > 0 || \count($this->definitionLevels) > 0) {
142-
$this->closePage();
143-
}
144+
$this->closePage();
144145

145-
$contaiers = [new ColumnChunkContainer(
146+
$containers = [new ColumnChunkContainer(
146147
$this->pages->buffer(),
147148
new ColumnChunk(
148149
type: $this->column->type(),
@@ -161,9 +162,29 @@ public function flush(int $fileOffset) : array
161162
)
162163
)];
163164

165+
// Reset all state after flush
164166
$this->pages = new PageContainers();
167+
$this->chunkStatistics = new StatisticsCounter($this->column);
168+
$this->pageStatistics = new StatisticsCounter($this->column);
169+
$this->definitionLevels = [];
170+
$this->repetitionLevels = [];
171+
$this->pageValues = [];
172+
$this->dictionary = null;
173+
$this->rowsCount = 0;
174+
$this->nullCount = 0;
175+
176+
return $containers;
177+
}
165178

166-
return $contaiers;
179+
/**
180+
* Checks if the builder has any data that needs to be written.
181+
* Used to prevent writing empty pages.
182+
*/
183+
public function isEmpty() : bool
184+
{
185+
return count($this->pageValues) === 0
186+
&& count($this->definitionLevels) === 0
187+
&& count($this->repetitionLevels) === 0;
167188
}
168189

169190
public function isFull() : bool
@@ -211,13 +232,9 @@ private function buildDataPage(Codec $codec, Compressions $compression) : PageCo
211232
dataPageHeaderV2: null,
212233
dictionaryPageHeader: null,
213234
);
214-
$pageHeader->toThrift()->write(new CompactProtocol($pageHeaderBuffer = new MemoryBuffer()));
215235

216236
return new PageContainer(
217-
$pageHeaderBuffer->data(),
218237
$compressedBuffer,
219-
$this->dictionary->indices ?? [],
220-
$this->dictionary->dictionary ?? [],
221238
$pageHeader
222239
);
223240
}
@@ -270,13 +287,9 @@ private function buildDataPageV2(Codec $codec, Compressions $compression) : Page
270287
),
271288
dictionaryPageHeader: null,
272289
);
273-
$pageHeader->toThrift()->write(new CompactProtocol($pageHeaderBuffer = new MemoryBuffer()));
274290

275291
return new PageContainer(
276-
$pageHeaderBuffer->data(),
277292
$repetitionsBuffer . $definitionsBuffer . $compressedBuffer,
278-
$this->dictionary->indices ?? [],
279-
$this->dictionary->dictionary ?? [],
280293
$pageHeader
281294
);
282295
}
@@ -304,13 +317,9 @@ private function buildDictionaryPage(Codec $codec, Compressions $compression) :
304317
\count($this->dictionary->dictionary)
305318
),
306319
);
307-
$pageHeader->toThrift()->write(new CompactProtocol($pageHeaderBuffer = new MemoryBuffer()));
308320

309321
return new PageContainer(
310-
$pageHeaderBuffer->data(),
311322
$compressedBuffer,
312-
$this->dictionary->indices,
313-
$this->dictionary->dictionary,
314323
$pageHeader
315324
);
316325
}

src/lib/parquet/src/Flow/Parquet/Writer/ColumnChunkBuilders.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ public function add(WriteColumnData $columnData) : void
4646
$this->builders[$columnData->column->name()]->addRow($columnData);
4747
}
4848

49+
/**
50+
* @return array<ColumnChunkBuilder>
51+
*/
52+
public function builders() : array
53+
{
54+
return $this->builders;
55+
}
56+
4957
/**
5058
* Close all pages in the column chunk builders.
5159
*/

src/lib/parquet/src/Flow/Parquet/Writer/PageContainer.php

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,43 @@
55
namespace Flow\Parquet\Writer;
66

77
use Flow\Parquet\ParquetFile\Page\PageHeader;
8+
use Flow\Parquet\Thrift\{CompactProtocol, MemoryBuffer};
89

9-
final readonly class PageContainer
10+
final class PageContainer
1011
{
12+
private ?string $serializedHeader = null;
13+
1114
/**
12-
* @param string $pageHeaderBuffer
13-
* @param string $pageBuffer
14-
* @param array<array-key, mixed> $values - when dictionary is present values are indices
15-
* @param null|array<array-key, mixed> $dictionary
15+
* @param string $compressedData - Compressed page data (repetition levels, definition levels, and values)
1616
* @param PageHeader $pageHeader
1717
*/
1818
public function __construct(
19-
public string $pageHeaderBuffer,
20-
public string $pageBuffer,
21-
public array $values,
22-
public ?array $dictionary,
19+
public string $compressedData,
2320
public PageHeader $pageHeader,
2421
) {
2522
}
2623

2724
public function dataSize() : int
2825
{
29-
return \strlen($this->pageBuffer);
26+
return \strlen($this->compressedData);
3027
}
3128

3229
public function headerSize() : int
3330
{
34-
return \strlen($this->pageHeaderBuffer);
31+
return \strlen($this->serializedHeader());
32+
}
33+
34+
public function serializedHeader() : string
35+
{
36+
if ($this->serializedHeader !== null) {
37+
return $this->serializedHeader;
38+
}
39+
40+
$this->pageHeader->toThrift()->write(new CompactProtocol($buffer = new MemoryBuffer()));
41+
42+
$this->serializedHeader = $buffer->data();
43+
44+
return $this->serializedHeader;
3545
}
3646

3747
public function totalCompressedSize() : int
@@ -41,6 +51,6 @@ public function totalCompressedSize() : int
4151

4252
public function totalUncompressedSize() : int
4353
{
44-
return $this->headerSize() + $this->dataSize();
54+
return $this->headerSize() + $this->pageHeader->uncompressedPageSize();
4555
}
4656
}

src/lib/parquet/src/Flow/Parquet/Writer/PageContainers.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,13 @@ public function buffer() : string
4747
$buffer = '';
4848

4949
if ($this->dictionaryPageContainer) {
50-
$buffer .= $this->dictionaryPageContainer->pageHeaderBuffer;
51-
$buffer .= $this->dictionaryPageContainer->pageBuffer;
50+
$buffer .= $this->dictionaryPageContainer->serializedHeader();
51+
$buffer .= $this->dictionaryPageContainer->compressedData;
5252
}
5353

5454
foreach ($this->dataPageContainers as $pageContainer) {
55-
$buffer .= $pageContainer->pageHeaderBuffer;
56-
$buffer .= $pageContainer->pageBuffer;
55+
$buffer .= $pageContainer->serializedHeader();
56+
$buffer .= $pageContainer->compressedData;
5757
}
5858

5959
return $buffer;

0 commit comments

Comments
 (0)