Skip to content

Commit 91fae29

Browse files
authored
[core] support csv parse mode (#6350)
1 parent e1dcc85 commit 91fae29

5 files changed

Lines changed: 191 additions & 16 deletions

File tree

docs/content/concepts/spec/fileformat.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,18 @@ Format Options:
419419
<td>String</td>
420420
<td>Null literal string that is interpreted as a null value (disabled by default).</td>
421421
</tr>
422+
<tr>
423+
<td><h5>csv.mode</h5></td>
424+
<td style="word-wrap: break-word;"><code>PERMISSIVE</code></td>
425+
<td>String</td>
426+
<td>Selects how corrupt records are handled during reading. Supported values are <code>'PERMISSIVE'</code>, <code>'DROPMALFORMED'</code> and <code>'FAILFAST'</code>:
427+
<ul>
428+
<li>Option <code>'PERMISSIVE'</code> sets malformed fields to null.</li>
429+
<li>Option <code>'DROPMALFORMED'</code> drops the entire corrupted record.</li>
430+
<li>Option <code>'FAILFAST'</code> throws an exception when it encounters a corrupted record.</li>
431+
</ul>
432+
</td>
433+
</tr>
422434
</tbody>
423435
</table>
424436

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class CsvFileReader extends BaseTextFileReader {
4444

4545
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
4646
private static final CsvMapper CSV_MAPPER = new CsvMapper();
47+
private static final InternalRow DROP_ROW = new GenericRow(1);
4748

4849
// Performance optimization: Cache frequently used cast executors
4950
private static final Map<String, CastExecutor<?, ?>> CAST_EXECUTOR_CACHE =
@@ -98,8 +99,25 @@ protected void setupReading() throws IOException {
9899
}
99100

100101
private class CsvRecordIterator extends BaseTextRecordIterator {
101-
// Inherits all functionality from BaseTextRecordIterator
102-
// No additional CSV-specific iterator logic needed
102+
@Override
103+
public InternalRow next() throws IOException {
104+
while (true) {
105+
if (readerClosed) {
106+
return null;
107+
}
108+
String nextLine = bufferedReader.readLine();
109+
if (nextLine == null) {
110+
end = true;
111+
return null;
112+
}
113+
114+
currentPosition++;
115+
InternalRow row = parseLine(nextLine);
116+
if (row != DROP_ROW) {
117+
return row;
118+
}
119+
}
120+
}
103121
}
104122

105123
protected static String[] parseCsvLineToArray(String line, CsvSchema schema)
@@ -148,8 +166,21 @@ private InternalRow parseCsvLine(String line, CsvSchema schema) throws IOExcepti
148166
}
149167

150168
// Optimized field parsing with cached cast executors
151-
projectedValues[i] =
152-
parseFieldOptimized(field.trim(), dataSchemaRowType.getTypeAt(readIndex));
169+
try {
170+
projectedValues[i] =
171+
parseFieldOptimized(
172+
field.trim(), dataSchemaRowType.getTypeAt(readIndex));
173+
} catch (Exception e) {
174+
switch (formatOptions.mode()) {
175+
case PERMISSIVE:
176+
projectedValues[i] = null;
177+
break;
178+
case DROPMALFORMED:
179+
return DROP_ROW;
180+
case FAILFAST:
181+
throw e;
182+
}
183+
}
153184
} else {
154185
projectedValues[i] = null; // Field not present in the CSV line
155186
}

paimon-format/src/main/java/org/apache/paimon/format/csv/CsvOptions.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import org.apache.paimon.options.ConfigOption;
2222
import org.apache.paimon.options.ConfigOptions;
2323
import org.apache.paimon.options.Options;
24+
import org.apache.paimon.options.description.DescribedEnum;
25+
import org.apache.paimon.options.description.InlineElement;
26+
27+
import static org.apache.paimon.options.description.TextElement.text;
2428

2529
/** Options for csv format. */
2630
public class CsvOptions {
@@ -63,12 +67,21 @@ public class CsvOptions {
6367
.defaultValue("")
6468
.withDescription("The literal for null values in CSV format");
6569

70+
public static final ConfigOption<Mode> MODE =
71+
ConfigOptions.key("csv.mode")
72+
.enumType(Mode.class)
73+
.defaultValue(Mode.PERMISSIVE)
74+
.withFallbackKeys("mode")
75+
.withDescription(
76+
"Allows a mode for dealing with corrupt records during reading.");
77+
6678
private final String fieldDelimiter;
6779
private final String lineDelimiter;
6880
private final String nullLiteral;
6981
private final boolean includeHeader;
7082
private final String quoteCharacter;
7183
private final String escapeCharacter;
84+
private final Mode mode;
7285

7386
public CsvOptions(Options options) {
7487
this.fieldDelimiter = options.get(FIELD_DELIMITER);
@@ -77,6 +90,7 @@ public CsvOptions(Options options) {
7790
this.includeHeader = options.get(INCLUDE_HEADER);
7891
this.quoteCharacter = options.get(QUOTE_CHARACTER);
7992
this.escapeCharacter = options.get(ESCAPE_CHARACTER);
93+
this.mode = options.get(MODE);
8094
}
8195

8296
public String fieldDelimiter() {
@@ -102,4 +116,37 @@ public String quoteCharacter() {
102116
public String escapeCharacter() {
103117
return escapeCharacter;
104118
}
119+
120+
public Mode mode() {
121+
return mode;
122+
}
123+
124+
/** Mode for dealing with corrupt records during reading. */
125+
public enum Mode implements DescribedEnum {
126+
PERMISSIVE("permissive", "Sets malformed fields to null."),
127+
DROPMALFORMED("dropmalformed", "Ignores the whole corrupted records."),
128+
FAILFAST("failfast", "Throws an exception when it meets corrupted records.");
129+
130+
private final String value;
131+
private final String description;
132+
133+
Mode(String value, String description) {
134+
this.value = value;
135+
this.description = description;
136+
}
137+
138+
@Override
139+
public String toString() {
140+
return value;
141+
}
142+
143+
@Override
144+
public InlineElement getDescription() {
145+
return text(description);
146+
}
147+
148+
public String getValue() {
149+
return value;
150+
}
151+
}
105152
}

paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
import static org.apache.paimon.data.BinaryString.fromString;
5555
import static org.assertj.core.api.Assertions.assertThat;
56+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5657

5758
/** Test for {@link CsvFileFormat}. */
5859
public class CsvFileFormatTest extends FormatReadWriteTest {
@@ -444,6 +445,71 @@ public void testCsvOptionsCombinationWriteRead() throws IOException {
444445
assertThat(result.get(3).isNullAt(3)).isTrue();
445446
}
446447

448+
@Test
449+
public void testCsvModeWriteRead() throws IOException {
450+
RowType rowType =
451+
DataTypes.ROW(DataTypes.INT().notNull(), DataTypes.STRING(), DataTypes.DOUBLE());
452+
453+
// Test PERMISSIVE mode
454+
Options permissiveOptions = new Options();
455+
permissiveOptions.set(CsvOptions.MODE, CsvOptions.Mode.PERMISSIVE);
456+
FileFormat format =
457+
new CsvFileFormatFactory().create(new FormatContext(permissiveOptions, 1024, 1024));
458+
Path testFile = new Path(parent, "test_mode_" + UUID.randomUUID() + ".csv");
459+
460+
fileIO.writeFile(testFile, "1,Alice,aaaa,100.23\n2,Bob,200.75", false);
461+
List<InternalRow> permissiveResult = read(format, rowType, rowType, testFile);
462+
assertThat(permissiveResult).hasSize(2);
463+
assertThat(permissiveResult.get(0).getInt(0)).isEqualTo(1);
464+
assertThat(permissiveResult.get(0).getString(1).toString()).isEqualTo("Alice");
465+
assertThat(permissiveResult.get(0).isNullAt(2)).isTrue();
466+
assertThat(permissiveResult.get(1).getInt(0)).isEqualTo(2);
467+
assertThat(permissiveResult.get(1).getString(1).toString()).isEqualTo("Bob");
468+
assertThat(permissiveResult.get(1).getDouble(2)).isEqualTo(200.75);
469+
470+
// Test DROPMALFORMED mode
471+
Options dropMalformedOptions = new Options();
472+
dropMalformedOptions.set(CsvOptions.MODE, CsvOptions.Mode.DROPMALFORMED);
473+
format =
474+
new CsvFileFormatFactory()
475+
.create(new FormatContext(dropMalformedOptions, 1024, 1024));
476+
List<InternalRow> dropMalformedResult = read(format, rowType, rowType, testFile);
477+
assertThat(dropMalformedResult).hasSize(1);
478+
assertThat(dropMalformedResult.get(0).getInt(0)).isEqualTo(2);
479+
assertThat(dropMalformedResult.get(0).getString(1).toString()).isEqualTo("Bob");
480+
assertThat(dropMalformedResult.get(0).getDouble(2)).isEqualTo(200.75);
481+
482+
// Test FAILFAST mode
483+
Options failFastOptions = new Options();
484+
failFastOptions.set(CsvOptions.MODE, CsvOptions.Mode.FAILFAST);
485+
assertThatThrownBy(
486+
() -> {
487+
read(
488+
new CsvFileFormatFactory()
489+
.create(new FormatContext(failFastOptions, 1024, 1024)),
490+
rowType,
491+
rowType,
492+
testFile);
493+
})
494+
.isInstanceOf(IllegalArgumentException.class);
495+
}
496+
497+
private List<InternalRow> read(
498+
FileFormat format, RowType fullRowType, RowType readRowType, Path testFile)
499+
throws IOException {
500+
try (RecordReader<InternalRow> reader =
501+
format.createReaderFactory(fullRowType, readRowType, new ArrayList<>())
502+
.createReader(
503+
new FormatReaderContext(
504+
fileIO, testFile, fileIO.getFileSize(testFile)))) {
505+
506+
InternalRowSerializer serializer = new InternalRowSerializer(readRowType);
507+
List<InternalRow> result = new ArrayList<>();
508+
reader.forEachRemaining(row -> result.add(serializer.copy(row)));
509+
return result;
510+
}
511+
}
512+
447513
@Override
448514
protected RowType rowTypeForFullTypesTest() {
449515
RowType.Builder builder =
@@ -581,16 +647,6 @@ private List<InternalRow> writeThenRead(
581647
writer.addElement(row);
582648
}
583649
}
584-
try (RecordReader<InternalRow> reader =
585-
format.createReaderFactory(fullRowType, rowType, new ArrayList<>())
586-
.createReader(
587-
new FormatReaderContext(
588-
fileIO, testFile, fileIO.getFileSize(testFile)))) {
589-
590-
InternalRowSerializer serializer = new InternalRowSerializer(rowType);
591-
List<InternalRow> result = new ArrayList<>();
592-
reader.forEachRemaining(row -> result.add(serializer.copy(row)));
593-
return result;
594-
}
650+
return read(format, fullRowType, rowType, testFile);
595651
}
596652
}

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,34 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
3535
sql("USE test_db")
3636
}
3737

38+
test("PaimonFormatTableRead table: csv mode") {
39+
val tableName = "paimon_format_test_csv_malformed"
40+
withTable(tableName) {
41+
sql(
42+
s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV OPTIONS ('" +
43+
s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
44+
s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
45+
.key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}') PARTITIONED BY (`ds` bigint)")
46+
val table =
47+
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
48+
val partition = 20250920
49+
val csvFile =
50+
new Path(
51+
table.location(),
52+
s"ds=$partition/part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
53+
table.fileIO().writeFile(csvFile, "1|asfasdfsdf|aaaa|10\n2|asfasdfsdf|11", false)
54+
checkAnswer(
55+
sql(s"SELECT * FROM $tableName"),
56+
Seq(Row(1, "asfasdfsdf", null, partition), Row(2, "asfasdfsdf", 11, partition))
57+
)
58+
sql(s"Alter table $tableName SET TBLPROPERTIES ('mode'='dropmalformed')")
59+
checkAnswer(
60+
sql(s"SELECT * FROM $tableName"),
61+
Seq(Row(2, "asfasdfsdf", 11, partition))
62+
)
63+
}
64+
}
65+
3866
test("PaimonFormatTableRead table: csv with field-delimiter") {
3967
val tableName = "paimon_format_test_csv_options"
4068
withTable(tableName) {
@@ -53,7 +81,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
5381
table.fileIO().writeFile(csvFile, "1|asfasdfsdf\n2|asfasdfsdf", false)
5482
checkAnswer(
5583
sql(s"SELECT * FROM $tableName"),
56-
Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition)))
84+
Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition))
85+
)
5786
}
5887
}
5988

0 commit comments

Comments
 (0)