Skip to content

[FLINK-39401] Extend raw format to support line-delimiter option#27897

Open
featzhang wants to merge 1 commit intoapache:masterfrom
featzhang:feature/raw-format-line-delimiter
Open

[FLINK-39401] Extend raw format to support line-delimiter option#27897
featzhang wants to merge 1 commit intoapache:masterfrom
featzhang:feature/raw-format-line-delimiter

Conversation

@featzhang
Copy link
Copy Markdown
Member

Summary

This PR extends the raw format to support a new optional raw.line-delimiter config option.

When raw.line-delimiter is set:

  • Deserialization: each incoming message is decoded to a string using raw.charset, split by the delimiter (String.split(Pattern.quote(delimiter), -1)), and one RowData is emitted per segment via deserialize(byte[], Collector<T>).
  • Serialization: the delimiter bytes are appended to the serialized value.

When raw.line-delimiter is not set, all existing behavior is preserved exactly (backward compatible).

Example SQL

CREATE TABLE nginx_log (log STRING) WITH (
  'connector'          = 'kafka',
  'topic'              = 'nginx_log',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format'             = 'raw',
  'raw.line-delimiter' = '\n'
);
-- Each Kafka message "line1\nline2\nline3" produces 3 rows
SELECT log FROM nginx_log;

Changes

Class Change
RawFormatOptions Add LINE_DELIMITER ConfigOption<String> with no default value
RawFormatFactory Read new option; pass to schema builders; register in optionalOptions()
RawFormatDeserializationSchema Override deserialize(byte[], Collector) to split by delimiter when set; add lineDelimiter field to equals/hashCode
RawFormatSerializationSchema Append delimiter bytes when set; add lineDelimiter field to equals/hashCode
RawFormatFactoryTest Add testLineDelimiterOption()
RawFormatLineDelimiterTest New test class: 9 tests covering splitting, GBK charset, null message, serialization appending, null row

Test Plan

  • RawFormatLineDelimiterTest (9 tests):
    • No delimiter → single row (regression)
    • \n delimiter → 3 rows
    • Custom multi-char delimiter || → 3 rows
    • Null message with delimiter → 0 rows
    • GBK charset with \n delimiter → correct splitting
    • Serialization without delimiter → no change
    • Serialization with \n → appends \n
    • Serialization with || → appends ||
    • Serialization of null row → returns null
  • RawFormatFactoryTest.testLineDelimiterOption(): verifies factory produces schemas with correct delimiter
  • All 50 existing raw format tests continue to pass

Add a new optional `raw.line-delimiter` config option to the raw format.

- RawFormatOptions: add LINE_DELIMITER ConfigOption<String> with no default value
- RawFormatFactory: read the option and pass it to schema constructors;
  register it in optionalOptions()
- RawFormatDeserializationSchema: override deserialize(byte[], Collector)
  to split the message by the delimiter and emit one RowData per part
  when the delimiter is set; single-record deserialize(byte[]) is
  unchanged for backward compatibility
- RawFormatSerializationSchema: append delimiter bytes to the serialized
  value when the delimiter is set; null rows are unaffected
- RawFormatFactoryTest: add testLineDelimiterOption() covering factory
  wiring with the new option
- RawFormatLineDelimiterTest: new test class covering deserialization
  splitting (newline, custom delimiter, GBK charset, null message) and
  serialization appending (newline, custom delimiter, null row)
@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 5, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@featzhang featzhang changed the title [FLINK] Extend raw format to support line-delimiter option [FLINK-39401] Extend raw format to support line-delimiter option Apr 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants