Skip to content

Commit 0a74411

Browse files
authored
Close Excel extractor when limit is reached (#1659)
* Close Excel extractor when the limit is reached * Prevent memory leak in ExcelExtractor by not copying each row in loop
1 parent 4c36ac4 commit 0a74411

6 files changed

Lines changed: 49 additions & 20 deletions

File tree

src/adapter/etl-adapter-excel/src/Flow/ETL/Adapter/Excel/ExcelExtractor.php

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,20 @@
44

55
namespace Flow\ETL\Adapter\Excel;
66

7-
use function Flow\ETL\DSL\array_to_rows;
7+
use function Flow\ETL\DSL\{array_to_rows};
88
use Flow\ETL\{Adapter\Excel\Sheet\SheetNameAssertion,
99
Adapter\Excel\Sheet\SheetsManager,
1010
Exception\InvalidArgumentException,
1111
Extractor,
12-
FlowContext,
13-
Loader\Closure};
12+
FlowContext
13+
};
1414
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
1515
use Flow\Filesystem\{Path, SourceStream};
1616
use OpenSpout\Common\Entity\{Cell, Row};
1717
use OpenSpout\Reader\ODS\Reader as OdsReader;
1818
use OpenSpout\Reader\XLSX\Reader as XlsxReader;
1919

20-
final class ExcelExtractor implements Closure, Extractor, FileExtractor, LimitableExtractor
20+
final class ExcelExtractor implements Extractor, FileExtractor, LimitableExtractor
2121
{
2222
use Limitable;
2323
use PathFiltering;
@@ -43,11 +43,6 @@ public function __construct(private readonly Path $path)
4343
$this->resetLimit();
4444
}
4545

46-
public function closure(FlowContext $context) : void
47-
{
48-
$this->reader()->close();
49-
}
50-
5146
public function extract(FlowContext $context) : \Generator
5247
{
5348
$headers = [];
@@ -61,13 +56,17 @@ public function extract(FlowContext $context) : \Generator
6156

6257
foreach ($context->streams()->list($this->path, $this->filter()) as $stream) {
6358
foreach ($this->extractRows($stream, $headers, $offset) as $row) {
64-
$signal = yield array_to_rows($row, $context->entryFactory());
59+
$signal = yield array_to_rows($row, $context->entryFactory(), $stream->path()->partitions());
6560
$this->incrementReturnedRows();
6661

6762
if ($signal === Signal::STOP || $this->reachedLimit()) {
63+
$stream->close();
64+
6865
return;
6966
}
7067
}
68+
69+
$stream->close();
7170
}
7271
}
7372

@@ -136,7 +135,7 @@ private function createRowsFromCells(Row $row, int $previousRowDataCount = 0) :
136135
return $rowData;
137136
}
138137

139-
private function extractRows(SourceStream $stream, array $headers, int $offset) : array
138+
private function extractRows(SourceStream $stream, array $headers, int $offset) : \Generator
140139
{
141140
try {
142141
$this->reader()->open($stream->path()->path());
@@ -148,9 +147,9 @@ private function extractRows(SourceStream $stream, array $headers, int $offset)
148147

149148
$sheet = $this->sheetName ? $manager->get($this->sheetName) : $manager->first();
150149

151-
foreach ($sheet->getRowIterator() as $rowIndex => $row) {
150+
foreach ($sheet->getRowIterator() as $rowIndex => $sheetRow) {
152151
if (1 === $rowIndex && $this->withHeader) {
153-
$headers = $this->createRowsFromCells($row);
152+
$headers = $this->createRowsFromCells($sheetRow);
154153

155154
continue;
156155
}
@@ -161,17 +160,17 @@ private function extractRows(SourceStream $stream, array $headers, int $offset)
161160
}
162161

163162
// ODS format reader skips empty cells when reading rows
164-
$rowData = $this->createRowsFromCells($row, $previousRowDataCount);
165-
$previousRowDataCount = \count($rowData);
163+
$row = $this->createRowsFromCells($sheetRow, $previousRowDataCount);
164+
$previousRowDataCount = \count($row);
166165

167166
if ($this->withHeader) {
168-
$rowData = \array_combine($headers, $rowData);
167+
yield \array_combine($headers, $row);
168+
} else {
169+
yield $row;
169170
}
170-
171-
$rows[] = $rowData;
172171
}
173172

174-
return $rows;
173+
$this->reader()->close();
175174
} catch (\Throwable $e) {
176175
throw new InvalidArgumentException('Failed to open file: ' . $e->getMessage(), previous: $e);
177176
}

src/adapter/etl-adapter-excel/tests/Flow/ETL/Adapter/Excel/Tests/Integration/ExcelExtractorTest.php

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
use function Flow\ETL\DSL\{config, df, flow_context};
99
use Flow\ETL\Adapter\Excel\ExcelReader;
1010
use Flow\ETL\Exception\InvalidArgumentException;
11-
use Flow\ETL\Row;
11+
use Flow\ETL\{Extractor\Signal, Row, Rows};
1212
use Flow\ETL\Tests\FlowTestCase;
13+
use Flow\Filesystem\{Partition, Path};
1314
use PHPUnit\Framework\Attributes\DataProvider;
1415

1516
final class ExcelExtractorTest extends FlowTestCase
@@ -199,4 +200,33 @@ public function test_extract_with_wrongly_selected_reader() : void
199200

200201
iterator_to_array($extractor->extract(flow_context(config())));
201202
}
203+
204+
public function test_loading_data_from_all_partitions() : void
205+
{
206+
df()
207+
->read(from_excel(__DIR__ . '/../Fixtures/partitioned/group=*/*.xlsx'))
208+
->run(function (Rows $rows) : void {
209+
$this->assertSame(
210+
['group'],
211+
\array_map(
212+
fn (Partition $p) => $p->name,
213+
$rows->partitions()->toArray()
214+
)
215+
);
216+
});
217+
}
218+
219+
public function test_signal_stop() : void
220+
{
221+
$generator = from_excel(Path::realpath(__DIR__ . '/../Fixtures/fixture.xlsx'))
222+
->extract(flow_context(config()));
223+
224+
self::assertTrue($generator->valid());
225+
$generator->next();
226+
self::assertTrue($generator->valid());
227+
$generator->next();
228+
self::assertTrue($generator->valid());
229+
$generator->send(Signal::STOP);
230+
self::assertFalse($generator->valid());
231+
}
202232
}

0 commit comments

Comments
 (0)