Skip to content

Commit 7f0cd6d

Browse files
authored
Fixed reading multine strings in CSV files (#1740)
* Fixed reading multine strings in CSV files * Moved BOM removal to CSV Line Reader * Move Rows Normalization to standalone class
1 parent 31d4580 commit 7f0cd6d

13 files changed

Lines changed: 10859 additions & 52 deletions

File tree

src/adapter/etl-adapter-csv/src/Flow/ETL/Adapter/CSV/CSVExtractor.php

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ public function extract(FlowContext $context) : \Generator
5252

5353
$headers = [];
5454
$headersCount = 0;
55+
$streamUri = $shouldPutInputIntoRows ? $stream->path()->uri() : null;
56+
$partitions = $stream->path()->partitions();
5557

56-
foreach ($stream->readLines(length: $this->charactersReadInLine) as $line => $csvLine) {
57-
if ($line === 0 && $this->removeBOM) {
58-
$csvLine = preg_replace('/^(\xEF\xBB\xBF|\xFF\xFE|\xFE\xFF|\xFF\xFE\x00\x00|\x00\x00\xFE\xFF)/', '', $csvLine);
59-
}
58+
$csvLineReader = new CSVLineReader($enclosure, $this->charactersReadInLine, $this->removeBOM);
59+
$rowNormalizer = new CSVRowNormalizer($this->emptyToNull);
6060

61-
/** @var non-empty-list<null|string> $rowData */
62-
$rowData = \str_getcsv((string) $csvLine, $separator, $enclosure, $escape);
61+
foreach ($csvLineReader->readLines($stream) as $csvLine) {
62+
$rowData = \str_getcsv($csvLine, $separator, $enclosure, $escape);
6363
$rowDataCount = \count($rowData);
6464

6565
if ([] === $headers) {
@@ -71,36 +71,19 @@ public function extract(FlowContext $context) : \Generator
7171
continue;
7272
}
7373

74-
$headers = \array_map(fn (int $e) : string => 'e' . \str_pad((string) $e, 2, '0', STR_PAD_LEFT), \range(0, \count($rowData) - 1));
75-
$headers = $this->mapHeaders($headers);
76-
$headersCount = \count($headers);
77-
}
78-
79-
// Expand columns to the size of the previous row
80-
for ($i = $rowDataCount; $i < $headersCount; $i++) {
81-
$rowData[$i] = $this->emptyToNull ? null : '';
82-
}
83-
84-
// Cut columns to the size of the header row
85-
if ($rowDataCount > $headersCount) {
86-
$rowData = \array_slice($rowData, 0, $headersCount);
74+
$headers = $this->generateAutoHeaders($rowDataCount);
75+
$headersCount = $rowDataCount;
8776
}
8877

89-
if ($this->emptyToNull) {
90-
foreach ($rowData as $i => $data) {
91-
if ($data === '') {
92-
$rowData[$i] = null;
93-
}
94-
}
95-
}
78+
$rowData = $rowNormalizer->normalize($rowData, $headersCount);
9679

9780
$row = \array_combine($headers, $rowData);
9881

99-
if ($shouldPutInputIntoRows) {
100-
$row['_input_file_uri'] = $stream->path()->uri();
82+
if ($streamUri !== null) {
83+
$row['_input_file_uri'] = $streamUri;
10184
}
10285

103-
$signal = yield array_to_rows($row, $context->entryFactory(), $stream->path()->partitions(), $this->schema);
86+
$signal = yield array_to_rows($row, $context->entryFactory(), $partitions, $this->schema);
10487
$this->incrementReturnedRows();
10588

10689
if ($signal === Signal::STOP || $this->reachedLimit()) {
@@ -185,6 +168,20 @@ public function withSeparator(string $separator) : self
185168
return $this;
186169
}
187170

171+
/**
172+
* @return array<int, string>
173+
*/
174+
private function generateAutoHeaders(int $count) : array
175+
{
176+
$headers = [];
177+
178+
for ($i = 0; $i < $count; $i++) {
179+
$headers[$i] = 'e' . \str_pad((string) $i, 2, '0', STR_PAD_LEFT);
180+
}
181+
182+
return $headers;
183+
}
184+
188185
/**
189186
* @param array<array-key, mixed> $headers
190187
*
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\CSV;
6+
7+
use Flow\Filesystem\SourceStream;
8+
9+
final readonly class CSVLineReader
10+
{
11+
/**
12+
* @param null|int<1, max> $charactersReadInLine
13+
*/
14+
public function __construct(
15+
private string $enclosure,
16+
private ?int $charactersReadInLine = null,
17+
private bool $removeBOM = true,
18+
) {
19+
}
20+
21+
/**
22+
* @return \Generator<int, string>
23+
*/
24+
public function readLines(SourceStream $stream) : \Generator
25+
{
26+
$lineNumber = 0;
27+
$buffer = '';
28+
29+
foreach ($stream->readLines(length: $this->charactersReadInLine) as $rawLine) {
30+
$buffer .= $rawLine;
31+
32+
if (!\str_contains($buffer, $this->enclosure)) {
33+
yield $this->removeBOM && $lineNumber === 0 ? $this->removeBOMFromLine(\rtrim($buffer, "\r\n")) : rtrim($buffer, "\r\n");
34+
$lineNumber++;
35+
$buffer = '';
36+
} else {
37+
if ($this->isCompleteCSVRecord($buffer)) {
38+
yield $this->removeBOM && $lineNumber === 0 ? $this->removeBOMFromLine(\rtrim($buffer, "\r\n")) : \rtrim($buffer, "\r\n");
39+
$lineNumber++;
40+
$buffer = '';
41+
} else {
42+
$buffer .= "\n";
43+
}
44+
}
45+
}
46+
47+
if ($buffer !== '') {
48+
yield $this->removeBOM && $lineNumber === 0 ? $this->removeBOMFromLine(\rtrim($buffer, "\r\n")) : \rtrim($buffer, "\r\n");
49+
}
50+
}
51+
52+
/**
53+
* Check if the current buffer contains a complete CSV record
54+
* by counting enclosures and ensuring they are properly paired.
55+
*/
56+
private function isCompleteCSVRecord(string $buffer) : bool
57+
{
58+
if (!\str_contains($buffer, $this->enclosure)) {
59+
return true;
60+
}
61+
62+
return \substr_count($buffer, $this->enclosure) % 2 === 0;
63+
}
64+
65+
/**
66+
* Remove Byte Order Mark (BOM) from the beginning of a line if present.
67+
*/
68+
private function removeBOMFromLine(string $line) : string
69+
{
70+
if (\str_starts_with($line, "\xEF\xBB\xBF")) {
71+
return \substr($line, 3);
72+
}
73+
74+
if (\str_starts_with($line, "\xFF\xFE\x00\x00")) {
75+
return \substr($line, 4);
76+
}
77+
78+
if (\str_starts_with($line, "\x00\x00\xFE\xFF")) {
79+
return \substr($line, 4);
80+
}
81+
82+
if (\str_starts_with($line, "\xFF\xFE")) {
83+
return \substr($line, 2);
84+
}
85+
86+
if (\str_starts_with($line, "\xFE\xFF")) {
87+
return \substr($line, 2);
88+
}
89+
90+
return $line;
91+
}
92+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\CSV;
6+
7+
final readonly class CSVRowNormalizer
8+
{
9+
public function __construct(private bool $emptyToNull = true)
10+
{
11+
}
12+
13+
/**
14+
* Normalize CSV row data to match the expected number of headers.
15+
* - Expands rows with fewer columns by adding fill values
16+
* - Truncates rows with more columns to match header count
17+
* - Converts empty strings to null if emptyToNull is enabled.
18+
*
19+
* @param array<int, null|string> $rowData
20+
* @param int $headersCount
21+
*
22+
* @return array<int, null|string>
23+
*/
24+
public function normalize(array $rowData, int $headersCount) : array
25+
{
26+
$rowDataCount = \count($rowData);
27+
28+
if ($rowDataCount < $headersCount) {
29+
$fillValue = $this->emptyToNull ? null : '';
30+
31+
for ($i = $rowDataCount; $i < $headersCount; $i++) {
32+
$rowData[$i] = $fillValue;
33+
}
34+
} elseif ($rowDataCount > $headersCount) {
35+
$rowData = \array_slice($rowData, 0, $headersCount, true);
36+
}
37+
38+
if ($this->emptyToNull) {
39+
foreach ($rowData as $i => $data) {
40+
if ($data === '') {
41+
$rowData[$i] = null;
42+
}
43+
}
44+
}
45+
46+
return $rowData;
47+
}
48+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"name","description"
2+
"John ""The Great""","Description with ""quotes"""
3+
"Jane","Normal description"
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
"id","content"
2+
"0","Line 0 content
3+
Line 0 content
4+
Line 0 content
5+
Line 0 content
6+
Line 0 content
7+
Line 0 content
8+
Line 0 content
9+
Line 0 content
10+
Line 0 content
11+
Line 0 content"
12+
"1","Line 1 content
13+
Line 1 content
14+
Line 1 content
15+
Line 1 content
16+
Line 1 content
17+
Line 1 content
18+
Line 1 content
19+
Line 1 content
20+
Line 1 content
21+
Line 1 content"
22+
"2","Line 2 content
23+
Line 2 content
24+
Line 2 content
25+
Line 2 content
26+
Line 2 content
27+
Line 2 content
28+
Line 2 content
29+
Line 2 content
30+
Line 2 content
31+
Line 2 content"
32+
"3","Line 3 content
33+
Line 3 content
34+
Line 3 content
35+
Line 3 content
36+
Line 3 content
37+
Line 3 content
38+
Line 3 content
39+
Line 3 content
40+
Line 3 content
41+
Line 3 content"
42+
"4","Line 4 content
43+
Line 4 content
44+
Line 4 content
45+
Line 4 content
46+
Line 4 content
47+
Line 4 content
48+
Line 4 content
49+
Line 4 content
50+
Line 4 content
51+
Line 4 content"
52+
"5","Line 5 content
53+
Line 5 content
54+
Line 5 content
55+
Line 5 content
56+
Line 5 content
57+
Line 5 content
58+
Line 5 content
59+
Line 5 content
60+
Line 5 content
61+
Line 5 content"
62+
"6","Line 6 content
63+
Line 6 content
64+
Line 6 content
65+
Line 6 content
66+
Line 6 content
67+
Line 6 content
68+
Line 6 content
69+
Line 6 content
70+
Line 6 content
71+
Line 6 content"
72+
"7","Line 7 content
73+
Line 7 content
74+
Line 7 content
75+
Line 7 content
76+
Line 7 content
77+
Line 7 content
78+
Line 7 content
79+
Line 7 content
80+
Line 7 content
81+
Line 7 content"
82+
"8","Line 8 content
83+
Line 8 content
84+
Line 8 content
85+
Line 8 content
86+
Line 8 content
87+
Line 8 content
88+
Line 8 content
89+
Line 8 content
90+
Line 8 content
91+
Line 8 content"
92+
"9","Line 9 content
93+
Line 9 content
94+
Line 9 content
95+
Line 9 content
96+
Line 9 content
97+
Line 9 content
98+
Line 9 content
99+
Line 9 content
100+
Line 9 content
101+
Line 9 content"

0 commit comments

Comments
 (0)