Skip to content

Commit 8205f0c

Browse files
committed
[FLINK] Extend raw format to support line-delimiter option
Add a new optional `raw.line-delimiter` config option to the raw format.

- RawFormatOptions: add LINE_DELIMITER ConfigOption<String> with no default value.
- RawFormatFactory: read the option and pass it to the schema constructors; register it in optionalOptions().
- RawFormatDeserializationSchema: override deserialize(byte[], Collector) to split the message by the delimiter and emit one RowData per part when the delimiter is set; the single-record deserialize(byte[]) is unchanged for backward compatibility.
- RawFormatSerializationSchema: append the delimiter bytes to the serialized value when the delimiter is set; null rows are unaffected.
- RawFormatFactoryTest: add testLineDelimiterOption() covering factory wiring with the new option.
- RawFormatLineDelimiterTest: new test class covering deserialization splitting (newline, custom delimiter, GBK charset, null message) and serialization appending (newline, custom delimiter, null row).
1 parent c0265db commit 8205f0c

6 files changed

Lines changed: 347 additions & 7 deletions

File tree

flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,16 @@
2929
import org.apache.flink.table.data.StringData;
3030
import org.apache.flink.table.types.logical.LogicalType;
3131
import org.apache.flink.types.DeserializationException;
32+
import org.apache.flink.util.Collector;
33+
34+
import javax.annotation.Nullable;
3235

3336
import java.io.IOException;
3437
import java.io.Serializable;
3538
import java.nio.charset.Charset;
3639
import java.nio.charset.StandardCharsets;
3740
import java.util.Objects;
41+
import java.util.regex.Pattern;
3842

3943
import static org.apache.flink.util.Preconditions.checkNotNull;
4044

@@ -55,6 +59,8 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row
5559

5660
private final boolean isBigEndian;
5761

62+
@Nullable private final String lineDelimiter;
63+
5864
private final DeserializationRuntimeConverter converter;
5965

6066
private final DataLengthValidator validator;
@@ -64,12 +70,22 @@ public RawFormatDeserializationSchema(
6470
TypeInformation<RowData> producedTypeInfo,
6571
String charsetName,
6672
boolean isBigEndian) {
73+
this(deserializedType, producedTypeInfo, charsetName, isBigEndian, null);
74+
}
75+
76+
/**
 * Creates a deserialization schema with an optional line delimiter.
 *
 * @param deserializedType logical type of the single produced field; must not be null
 * @param producedTypeInfo type information of the produced rows; must not be null
 * @param charsetName name of the charset used for string conversion and delimiter splitting
 * @param isBigEndian whether numeric values are read in big-endian byte order
 * @param lineDelimiter delimiter used by {@code deserialize(byte[], Collector)} to split a
 *     message into several rows, or {@code null} to emit one row per message
 */
public RawFormatDeserializationSchema(
        LogicalType deserializedType,
        TypeInformation<RowData> producedTypeInfo,
        String charsetName,
        boolean isBigEndian,
        @Nullable String lineDelimiter) {
    // Null checks come first, in the same order as before.
    this.deserializedType = checkNotNull(deserializedType);
    this.producedTypeInfo = checkNotNull(producedTypeInfo);

    // Plain configuration values.
    this.charsetName = charsetName;
    this.isBigEndian = isBigEndian;
    this.lineDelimiter = lineDelimiter;

    // Helpers derived from the field type and the configuration.
    this.validator = createDataLengthValidator(deserializedType);
    this.converter = createConverter(deserializedType, charsetName, isBigEndian);
}
7490

7591
@Override
@@ -92,6 +108,34 @@ public RowData deserialize(byte[] message) throws IOException {
92108
return rowData;
93109
}
94110

111+
@Override
112+
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
113+
if (lineDelimiter == null) {
114+
// no delimiter: default single-record behavior
115+
RowData row = deserialize(message);
116+
if (row != null) {
117+
out.collect(row);
118+
}
119+
return;
120+
}
121+
122+
if (message == null) {
123+
return;
124+
}
125+
126+
Charset charset = Charset.forName(charsetName);
127+
String decoded = new String(message, charset);
128+
String[] parts = decoded.split(Pattern.quote(lineDelimiter), -1);
129+
for (String part : parts) {
130+
byte[] partBytes = part.getBytes(charset);
131+
validator.validate(partBytes);
132+
Object field = converter.convert(partBytes);
133+
GenericRowData rowData = new GenericRowData(1);
134+
rowData.setField(0, field);
135+
out.collect(rowData);
136+
}
137+
}
138+
95139
@Override
96140
public boolean isEndOfStream(RowData nextElement) {
97141
return false;
@@ -114,12 +158,14 @@ public boolean equals(Object o) {
114158
return producedTypeInfo.equals(that.producedTypeInfo)
115159
&& deserializedType.equals(that.deserializedType)
116160
&& charsetName.equals(that.charsetName)
117-
&& isBigEndian == that.isBigEndian;
161+
&& isBigEndian == that.isBigEndian
162+
&& Objects.equals(lineDelimiter, that.lineDelimiter);
118163
}
119164

120165
@Override
121166
public int hashCode() {
122-
return Objects.hash(producedTypeInfo, deserializedType, charsetName, isBigEndian);
167+
return Objects.hash(
168+
producedTypeInfo, deserializedType, charsetName, isBigEndian, lineDelimiter);
123169
}
124170

125171
// ------------------------------------------------------------------------

flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.nio.charset.Charset;
4646
import java.util.Collections;
4747
import java.util.HashSet;
48+
import java.util.Optional;
4849
import java.util.Set;
4950
import java.util.stream.Collectors;
5051

@@ -72,6 +73,7 @@ public Set<ConfigOption<?>> optionalOptions() {
7273
Set<ConfigOption<?>> options = new HashSet<>();
7374
options.add(RawFormatOptions.ENDIANNESS);
7475
options.add(RawFormatOptions.CHARSET);
76+
options.add(RawFormatOptions.LINE_DELIMITER);
7577
return options;
7678
}
7779

@@ -81,6 +83,8 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
8183
FactoryUtil.validateFactoryOptions(this, formatOptions);
8284
final String charsetName = validateAndGetCharsetName(formatOptions);
8385
final boolean isBigEndian = isBigEndian(formatOptions);
86+
final Optional<String> lineDelimiter =
87+
formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
8488

8589
return new DecodingFormat<DeserializationSchema<RowData>>() {
8690
@Override
@@ -91,7 +95,11 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
9195
final TypeInformation<RowData> producedTypeInfo =
9296
context.createTypeInformation(producedDataType);
9397
return new RawFormatDeserializationSchema(
94-
fieldType, producedTypeInfo, charsetName, isBigEndian);
98+
fieldType,
99+
producedTypeInfo,
100+
charsetName,
101+
isBigEndian,
102+
lineDelimiter.orElse(null));
95103
}
96104

97105
@Override
@@ -107,14 +115,17 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
107115
FactoryUtil.validateFactoryOptions(this, formatOptions);
108116
final String charsetName = validateAndGetCharsetName(formatOptions);
109117
final boolean isBigEndian = isBigEndian(formatOptions);
118+
final Optional<String> lineDelimiter =
119+
formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
110120

111121
return new EncodingFormat<SerializationSchema<RowData>>() {
112122
@Override
113123
public SerializationSchema<RowData> createRuntimeEncoder(
114124
DynamicTableSink.Context context, DataType consumedDataType) {
115125
final RowType physicalRowType = (RowType) consumedDataType.getLogicalType();
116126
final LogicalType fieldType = validateAndExtractSingleField(physicalRowType);
117-
return new RawFormatSerializationSchema(fieldType, charsetName, isBigEndian);
127+
return new RawFormatSerializationSchema(
128+
fieldType, charsetName, isBigEndian, lineDelimiter.orElse(null));
118129
}
119130

120131
@Override

flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,14 @@ public class RawFormatOptions {
4343
.defaultValue(StandardCharsets.UTF_8.displayName())
4444
.withDescription("Defines the string charset.");
4545

46+
public static final ConfigOption<String> LINE_DELIMITER =
47+
ConfigOptions.key("line-delimiter")
48+
.stringType()
49+
.noDefaultValue()
50+
.withDescription(
51+
"Optional line delimiter. Supports Java escape sequences (e.g. '\\n', '\\r\\n'). "
52+
+ "When set, deserialization splits each message by this delimiter and emits "
53+
+ "one RowData per part. Serialization appends the delimiter after each row's value.");
54+
4655
private RawFormatOptions() {}
4756
}

flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
import org.apache.flink.table.types.logical.LogicalType;
2828
import org.apache.flink.table.types.logical.RawType;
2929

30+
import javax.annotation.Nullable;
31+
3032
import java.io.IOException;
3133
import java.io.Serializable;
3234
import java.nio.charset.Charset;
3335
import java.nio.charset.StandardCharsets;
36+
import java.util.Arrays;
3437
import java.util.Objects;
3538

3639
/** Serialization schema that serializes an {@link RowData} object into raw (byte based) value. */
@@ -47,12 +50,23 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData
4750

4851
private final boolean isBigEndian;
4952

53+
@Nullable private final String lineDelimiter;
54+
5055
/**
 * Creates a serialization schema without a line delimiter.
 *
 * <p>Delegates to the four-argument constructor with a {@code null} delimiter.
 *
 * @param serializedType logical type of the single serialized field
 * @param charsetName name of the charset used for string conversion
 * @param isBigEndian whether numeric values are written in big-endian byte order
 */
public RawFormatSerializationSchema(
        LogicalType serializedType,
        String charsetName,
        boolean isBigEndian) {
    this(serializedType, charsetName, isBigEndian, null);
}
59+
60+
/**
 * Creates a serialization schema with an optional line delimiter.
 *
 * @param serializedType logical type of the single serialized field
 * @param charsetName name of the charset used for string conversion and for encoding the
 *     delimiter
 * @param isBigEndian whether numeric values are written in big-endian byte order
 * @param lineDelimiter delimiter appended after each non-null serialized value, or {@code
 *     null} to leave the serialized bytes unchanged
 */
public RawFormatSerializationSchema(
        LogicalType serializedType,
        String charsetName,
        boolean isBigEndian,
        @Nullable String lineDelimiter) {
    // Plain configuration values.
    this.lineDelimiter = lineDelimiter;
    this.isBigEndian = isBigEndian;
    this.charsetName = charsetName;
    this.serializedType = serializedType;

    // The converter is derived from the field type and the configuration.
    this.converter = createConverter(serializedType, charsetName, isBigEndian);
}
5771

5872
@Override
@@ -63,7 +77,14 @@ public void open(InitializationContext context) throws Exception {
6377
@Override
6478
public byte[] serialize(RowData row) {
6579
try {
66-
return converter.convert(row);
80+
byte[] valueBytes = converter.convert(row);
81+
if (lineDelimiter == null || valueBytes == null) {
82+
return valueBytes;
83+
}
84+
byte[] delimiterBytes = lineDelimiter.getBytes(Charset.forName(charsetName));
85+
byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + delimiterBytes.length);
86+
System.arraycopy(delimiterBytes, 0, result, valueBytes.length, delimiterBytes.length);
87+
return result;
6788
} catch (IOException e) {
6889
throw new RuntimeException("Could not serialize row '" + row + "'. ", e);
6990
}
@@ -80,12 +101,13 @@ public boolean equals(Object o) {
80101
RawFormatSerializationSchema that = (RawFormatSerializationSchema) o;
81102
return serializedType.equals(that.serializedType)
82103
&& charsetName.equals(that.charsetName)
83-
&& isBigEndian == that.isBigEndian;
104+
&& isBigEndian == that.isBigEndian
105+
&& Objects.equals(lineDelimiter, that.lineDelimiter);
84106
}
85107

86108
@Override
87109
public int hashCode() {
88-
return Objects.hash(serializedType, charsetName, isBigEndian);
110+
return Objects.hash(serializedType, charsetName, isBigEndian, lineDelimiter);
89111
}
90112

91113
// ------------------------------------------------------------------------

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,29 @@ void testInvalidFieldTypes() {
175175
.hasMessage("The 'raw' format doesn't supports 'MAP<INT, STRING>' as column type.");
176176
}
177177

178+
@Test
179+
void testLineDelimiterOption() {
180+
final Map<String, String> tableOptions =
181+
getModifiedOptions(
182+
options -> {
183+
options.put("raw.line-delimiter", "\n");
184+
});
185+
186+
// test deserialization schema contains line delimiter
187+
final RawFormatDeserializationSchema expectedDeser =
188+
new RawFormatDeserializationSchema(
189+
ROW_TYPE.getTypeAt(0), InternalTypeInfo.of(ROW_TYPE), "UTF-8", true, "\n");
190+
DeserializationSchema<RowData> actualDeser =
191+
createDeserializationSchema(SCHEMA, tableOptions);
192+
assertThat(actualDeser).isEqualTo(expectedDeser);
193+
194+
// test serialization schema contains line delimiter
195+
final RawFormatSerializationSchema expectedSer =
196+
new RawFormatSerializationSchema(ROW_TYPE.getTypeAt(0), "UTF-8", true, "\n");
197+
SerializationSchema<RowData> actualSer = createSerializationSchema(SCHEMA, tableOptions);
198+
assertThat(actualSer).isEqualTo(expectedSer);
199+
}
200+
178201
// ------------------------------------------------------------------------
179202
// Utilities
180203
// ------------------------------------------------------------------------

0 commit comments

Comments
 (0)